Coverage Report

Created: 2026-03-16 01:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/async_result_writer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <concurrentqueue.h>
20
21
#include <condition_variable>
22
#include <queue> // IWYU pragma: keep
23
24
#include "exec/sink/writer/result_writer.h"
25
#include "exprs/vexpr_fwd.h"
26
#include "runtime/runtime_profile.h"
27
28
namespace doris {
29
class ObjectPool;
30
class RowDescriptor;
31
class RuntimeState;
32
class TDataSink;
33
class TExpr;
34
35
class Dependency;
36
class PipelineTask;
37
38
class Block;
39
/*
40
 *  In the pipeline execution engine, there are usually a large number of IO operations on the sink side that
41
 *  will block the limited execution threads of the pipeline execution engine, resulting in a sharp performance
42
 *  degradation of the pipeline execution engine when there are import tasks.
43
 *
44
 *  So every ResultWriter in a Sink should use AsyncResultWriter, which does the real IO work in a thread pool, to keep the
45
 *  pipeline execution engine performant.
46
 *
47
 *  Subclasses of AsyncResultWriter need to implement two virtual functions:
48
 *     * Status open(): the one-time IO setup work, e.g. creating a file or connecting to the network
49
 *     * Status write(): does the real IO work for each block
50
 */
51
class AsyncResultWriter : public ResultWriter {
52
public:
53
    AsyncResultWriter(const VExprContextSPtrs& output_expr_ctxs, std::shared_ptr<Dependency> dep,
54
                      std::shared_ptr<Dependency> fin_dep);
55
56
    void force_close(Status s);
57
58
0
    Status init(RuntimeState* state) override { return Status::OK(); }
59
60
    virtual Status open(RuntimeState* state, RuntimeProfile* operator_profile) = 0;
61
62
    // sink the block data to data queue, it is async
63
    Status sink(Block* block, bool eos);
64
65
    // Add the IO thread task process block() to thread pool to dispose the IO
66
    Status start_writer(RuntimeState* state, RuntimeProfile* operator_profile);
67
68
0
    Status get_writer_status() { return _writer_status.status(); }
69
70
    void set_low_memory_mode();
71
72
protected:
73
    Status _projection_block(Block& input_block, Block* output_block);
74
    const VExprContextSPtrs& _vec_output_expr_ctxs;
75
    RuntimeProfile* _operator_profile = nullptr; // not owned, set when open
76
77
    std::unique_ptr<Block> _get_free_block(Block*, size_t rows);
78
79
private:
80
    void process_block(RuntimeState* state, RuntimeProfile* operator_profile);
81
0
    [[nodiscard]] bool _data_queue_is_available() const { return _data_queue.size() < QUEUE_SIZE; }
82
0
    [[nodiscard]] bool _is_finished() const { return !_writer_status.ok() || _eos; }
83
    void _set_ready_to_finish();
84
85
    void _return_free_block(std::unique_ptr<Block>);
86
    std::unique_ptr<Block> _get_block_from_queue();
87
88
    static constexpr auto QUEUE_SIZE = 3;
89
    std::mutex _m;
90
    std::condition_variable _cv;
91
    std::deque<std::unique_ptr<Block>> _data_queue;
92
    // Default value is ok
93
    AtomicStatus _writer_status;
94
    bool _eos = false;
95
    std::atomic_bool _low_memory_mode = false;
96
97
    std::shared_ptr<Dependency> _dependency;
98
    std::shared_ptr<Dependency> _finish_dependency;
99
100
    moodycamel::ConcurrentQueue<std::unique_ptr<Block>> _free_blocks;
101
    RuntimeProfile::Counter* _memory_used_counter = nullptr;
102
};
103
104
} // namespace doris