Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/async_result_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/writer/async_result_writer.h"
19
20
#include "common/status.h"
21
#include "core/block/block.h"
22
#include "core/block/materialize_block.h"
23
#include "exec/pipeline/dependency.h"
24
#include "exprs/vexpr_context.h"
25
#include "runtime/exec_env.h"
26
#include "runtime/fragment_mgr.h"
27
#include "runtime/runtime_state.h"
28
29
namespace doris {
30
class ObjectPool;
31
class RowDescriptor;
32
class TExpr;
33
34
#include "common/compile_check_begin.h"
35
36
AsyncResultWriter::AsyncResultWriter(const doris::VExprContextSPtrs& output_expr_ctxs,
37
                                     std::shared_ptr<Dependency> dep,
38
                                     std::shared_ptr<Dependency> fin_dep)
39
13
        : _vec_output_expr_ctxs(output_expr_ctxs), _dependency(dep), _finish_dependency(fin_dep) {}
40
41
0
Status AsyncResultWriter::sink(Block* block, bool eos) {
42
0
    auto rows = block->rows();
43
0
    std::unique_ptr<Block> add_block;
44
0
    if (rows) {
45
0
        add_block = _get_free_block(block, rows);
46
0
    }
47
48
0
    std::lock_guard l(_m);
49
    // if io task failed, just return error status to
50
    // end the query
51
0
    if (!_writer_status.ok()) {
52
0
        return _writer_status.status();
53
0
    }
54
55
0
    DCHECK(_dependency);
56
0
    if (_is_finished()) {
57
0
        _dependency->set_ready();
58
0
    }
59
0
    if (rows) {
60
0
        _memory_used_counter->update(add_block->allocated_bytes());
61
0
        _data_queue.emplace_back(std::move(add_block));
62
0
        if (!_data_queue_is_available() && !_is_finished()) {
63
0
            _dependency->block();
64
0
        }
65
0
    }
66
    // In process_block() we check _eos first and _data_queue second, so here,
67
    // inside the lock, _eos must be modified only after _data_queue is changed,
68
    // to avoid a logic error under multi-threading.
69
0
    _eos = eos;
70
71
0
    _cv.notify_one();
72
0
    return Status::OK();
73
0
}
74
75
0
std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() {
76
0
    std::lock_guard l(_m);
77
0
    DCHECK(!_data_queue.empty());
78
0
    auto block = std::move(_data_queue.front());
79
0
    _data_queue.pop_front();
80
0
    DCHECK(_dependency);
81
0
    if (_data_queue_is_available()) {
82
0
        _dependency->set_ready();
83
0
    }
84
0
    _memory_used_counter->update(-block->allocated_bytes());
85
0
    return block;
86
0
}
87
88
0
Status AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* operator_profile) {
89
    // Attention!!!
90
    // AsyncResultWriter::open is called asynchronously,
91
    // so we need to set up the operator_profile and memory counter here,
92
    // or else the counter can be nullptr when AsyncResultWriter::sink is called.
93
0
    _operator_profile = operator_profile;
94
0
    DCHECK(_operator_profile->get_child("CommonCounters") != nullptr);
95
0
    _memory_used_counter =
96
0
            _operator_profile->get_child("CommonCounters")->get_counter("MemoryUsage");
97
0
    DCHECK(_memory_used_counter != nullptr);
98
    // Block the finish dependency here, so it only becomes ready after the async writer thread exits.
99
0
    DCHECK(_finish_dependency);
100
0
    _finish_dependency->block();
101
    // This is an async thread; lock the task ctx to make sure the RuntimeState and operator_profile
102
    // are not destroyed before the thread exits.
103
0
    auto task_ctx = state->get_task_execution_context();
104
0
    RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
105
0
            [this, state, operator_profile, task_ctx]() {
106
0
                SCOPED_ATTACH_TASK(state);
107
0
                auto task_lock = task_ctx.lock();
108
0
                if (task_lock == nullptr) {
109
0
                    return;
110
0
                }
111
0
                this->process_block(state, operator_profile);
112
0
                task_lock.reset();
113
0
            }));
114
0
    return Status::OK();
115
0
}
116
117
0
void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* operator_profile) {
118
0
    if (auto status = open(state, operator_profile); !status.ok()) {
119
0
        force_close(status);
120
0
    }
121
122
0
    if (state && state->get_query_ctx() && state->get_query_ctx()->workload_group()) {
123
0
        if (auto cg_ctl_sptr =
124
0
                    state->get_query_ctx()->workload_group()->get_cgroup_cpu_ctl_wptr().lock()) {
125
0
            Status ret = cg_ctl_sptr->add_thread_to_cgroup();
126
0
            if (ret.ok()) {
127
0
                std::string wg_tname =
128
0
                        "asyc_wr_" + state->get_query_ctx()->workload_group()->name();
129
0
                Thread::set_self_name(wg_tname);
130
0
            }
131
0
        }
132
0
    }
133
134
0
    DCHECK(_dependency);
135
0
    while (_writer_status.ok()) {
136
0
        ThreadCpuStopWatch cpu_time_stop_watch;
137
0
        cpu_time_stop_watch.start();
138
0
        Defer defer {[&]() {
139
0
            if (state && state->get_query_ctx()) {
140
0
                state->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(
141
0
                        cpu_time_stop_watch.elapsed_time());
142
0
            }
143
0
        }};
144
145
        //1) wait for the scan operator to write data
146
0
        {
147
0
            std::unique_lock l(_m);
148
            // When the query is cancelled, _writer_status may be set to error status in force_close method.
149
            // When the BE process is exit gracefully, the fragment mgr's thread pool will be shutdown,
150
            // and the async thread will exit.
151
0
            while (!_eos && _data_queue.empty() && _writer_status.ok() &&
152
0
                   !ExecEnv::GetInstance()->fragment_mgr()->shutting_down()) {
153
                // Wake up every 1s to re-check the condition, in case a signal is lost
154
0
                _cv.wait_for(l, std::chrono::seconds(1));
155
0
            }
156
            // If writer status is not ok, then we should not change its status to avoid lost the actual error status.
157
0
            if (ExecEnv::GetInstance()->fragment_mgr()->shutting_down() && _writer_status.ok()) {
158
0
                _writer_status.update(Status::InternalError<false>("FragmentMgr is shutting down"));
159
0
            }
160
161
            //check if eos or writer error
162
0
            if ((_eos && _data_queue.empty()) || !_writer_status.ok()) {
163
0
                _data_queue.clear();
164
0
                break;
165
0
            }
166
0
        }
167
168
        //2) get the block from the data queue and write to downstream
169
0
        auto block = _get_block_from_queue();
170
0
        auto status = write(state, *block);
171
0
        if (!status.ok()) [[unlikely]] {
172
0
            std::unique_lock l(_m);
173
0
            _writer_status.update(status);
174
0
            if (_is_finished()) {
175
0
                _dependency->set_ready();
176
0
            }
177
0
            break;
178
0
        }
179
180
0
        _return_free_block(std::move(block));
181
0
    }
182
183
0
    bool need_finish = false;
184
0
    {
185
        // If the last block is sent successfully, then call finish to clear the buffer or commit
186
        // transactions.
187
        // Using lock to make sure the writer status is not modified
188
        // There is a unique ptr err_msg in Status, if it is modified, the unique ptr
189
        // maybe released. And it will core because use after free.
190
0
        std::lock_guard l(_m);
191
0
        if (_writer_status.ok() && _eos) {
192
0
            need_finish = true;
193
0
        }
194
0
    }
195
    // eos only means the last block has been put into the queue and no more blocks will be added;
196
    // it does not guarantee that the block has been written to the stream.
197
0
    if (need_finish) {
198
        // Should not call finish in lock because it may hang, and it will lock _m too long.
199
        // And get_writer_status will also need this lock, it will block pipeline exec thread.
200
0
        Status st = finish(state);
201
0
        _writer_status.update(st);
202
0
    }
203
0
    Status st = Status::OK();
204
0
    { st = _writer_status.status(); }
205
206
0
    Status close_st = close(st);
207
0
    {
208
        // If it already failed before, do not update the writer status so that we can get
209
        // the real reason.
210
0
        std::lock_guard l(_m);
211
0
        if (_writer_status.ok()) {
212
0
            _writer_status.update(close_st);
213
0
        }
214
0
    }
215
    // should set _finish_dependency first, as the close function may be blocked by wait_close of execution_timeout
216
0
    _set_ready_to_finish();
217
0
}
218
219
0
void AsyncResultWriter::_set_ready_to_finish() {
220
0
    DCHECK(_finish_dependency);
221
0
    _finish_dependency->set_ready();
222
0
}
223
224
0
Status AsyncResultWriter::_projection_block(doris::Block& input_block, doris::Block* output_block) {
225
0
    Status status = Status::OK();
226
0
    if (input_block.rows() == 0) {
227
0
        return status;
228
0
    }
229
0
    RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_vec_output_expr_ctxs,
230
0
                                                                       input_block, output_block));
231
0
    materialize_block_inplace(*output_block);
232
0
    return status;
233
0
}
234
235
0
void AsyncResultWriter::force_close(Status s) {
236
0
    std::lock_guard l(_m);
237
0
    _writer_status.update(s);
238
0
    DCHECK(_dependency);
239
0
    if (_is_finished()) {
240
0
        _dependency->set_ready();
241
0
    }
242
0
    _cv.notify_one();
243
0
}
244
245
0
void AsyncResultWriter::_return_free_block(std::unique_ptr<Block> b) {
246
0
    if (_low_memory_mode) {
247
0
        return;
248
0
    }
249
250
0
    const auto allocated_bytes = b->allocated_bytes();
251
0
    if (_free_blocks.enqueue(std::move(b))) {
252
0
        _memory_used_counter->update(allocated_bytes);
253
0
    }
254
0
}
255
256
0
std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::Block* block, size_t rows) {
257
0
    std::unique_ptr<Block> b;
258
0
    if (!_free_blocks.try_dequeue(b)) {
259
0
        b = block->create_same_struct_block(rows, true);
260
0
    } else {
261
0
        _memory_used_counter->update(-b->allocated_bytes());
262
0
    }
263
0
    b->swap(*block);
264
0
    return b;
265
0
}
266
267
template <typename T>
268
void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks,
269
                  RuntimeProfile::Counter* memory_used_counter = nullptr);
270
0
void AsyncResultWriter::set_low_memory_mode() {
271
0
    _low_memory_mode = true;
272
0
    clear_blocks(_free_blocks, _memory_used_counter);
273
0
}
274
} // namespace doris