Coverage Report

Created: 2026-04-17 00:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/async_result_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/writer/async_result_writer.h"
19
20
#include "common/status.h"
21
#include "core/block/block.h"
22
#include "core/block/materialize_block.h"
23
#include "exec/pipeline/dependency.h"
24
#include "exprs/vexpr_context.h"
25
#include "runtime/exec_env.h"
26
#include "runtime/fragment_mgr.h"
27
#include "runtime/runtime_state.h"
28
29
namespace doris {
30
class ObjectPool;
31
class RowDescriptor;
32
class TExpr;
33
34
AsyncResultWriter::AsyncResultWriter(const doris::VExprContextSPtrs& output_expr_ctxs,
35
                                     std::shared_ptr<Dependency> dep,
36
                                     std::shared_ptr<Dependency> fin_dep)
37
43
        : _vec_output_expr_ctxs(output_expr_ctxs), _dependency(dep), _finish_dependency(fin_dep) {}
38
39
0
Status AsyncResultWriter::sink(Block* block, bool eos) {
40
0
    auto rows = block->rows();
41
0
    std::unique_ptr<Block> add_block;
42
0
    if (rows) {
43
0
        add_block = _get_free_block(block, rows);
44
0
    }
45
46
0
    std::lock_guard l(_m);
47
    // if io task failed, just return error status to
48
    // end the query
49
0
    if (!_writer_status.ok()) {
50
0
        return _writer_status.status();
51
0
    }
52
53
0
    DCHECK(_dependency);
54
0
    if (_is_finished()) {
55
0
        _dependency->set_ready();
56
0
    }
57
0
    if (rows) {
58
0
        _memory_used_counter->update(add_block->allocated_bytes());
59
0
        _data_queue.emplace_back(std::move(add_block));
60
0
        if (!_data_queue_is_available() && !_is_finished()) {
61
0
            _dependency->block();
62
0
        }
63
0
    }
64
    // process_block() checks _eos first and _data_queue second, so here,
65
    // inside the lock, _eos must be modified after _data_queue is changed to
66
    // avoid a logic error when running multi-threaded.
67
0
    _eos = eos;
68
69
0
    _cv.notify_one();
70
0
    return Status::OK();
71
0
}
72
73
0
std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() {
74
0
    std::lock_guard l(_m);
75
0
    DCHECK(!_data_queue.empty());
76
0
    auto block = std::move(_data_queue.front());
77
0
    _data_queue.pop_front();
78
0
    DCHECK(_dependency);
79
0
    if (_data_queue_is_available()) {
80
0
        _dependency->set_ready();
81
0
    }
82
0
    _memory_used_counter->update(-block->allocated_bytes());
83
0
    return block;
84
0
}
85
86
0
Status AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* operator_profile) {
87
    // Attention!!!
88
    // AsyncResultWriter::open is called asynchronously,
89
    // so we need to set up the operator_profile and memory counter here,
90
    // or else the counter can be nullptr when AsyncResultWriter::sink is called.
91
0
    _operator_profile = operator_profile;
92
0
    DCHECK(_operator_profile->get_child("CommonCounters") != nullptr);
93
0
    _memory_used_counter =
94
0
            _operator_profile->get_child("CommonCounters")->get_counter("MemoryUsage");
95
0
    DCHECK(_memory_used_counter != nullptr);
96
    // Block the finish dependency here; it is set ready again in _set_ready_to_finish()
97
0
    DCHECK(_finish_dependency);
98
0
    _finish_dependency->block();
99
    // This is an async thread; lock the task ctx to make sure RuntimeState and operator_profile
100
    // are not destructed before the thread exits.
101
0
    auto task_ctx = state->get_task_execution_context();
102
0
    RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
103
0
            [this, state, operator_profile, task_ctx]() {
104
0
                SCOPED_ATTACH_TASK(state);
105
0
                auto task_lock = task_ctx.lock();
106
0
                if (task_lock == nullptr) {
107
0
                    return;
108
0
                }
109
0
                this->process_block(state, operator_profile);
110
0
                task_lock.reset();
111
0
            }));
112
0
    return Status::OK();
113
0
}
114
115
0
void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* operator_profile) {
116
0
    if (auto status = open(state, operator_profile); !status.ok()) {
117
0
        force_close(status);
118
0
    }
119
120
0
    if (state && state->get_query_ctx() && state->get_query_ctx()->workload_group()) {
121
0
        if (auto cg_ctl_sptr =
122
0
                    state->get_query_ctx()->workload_group()->get_cgroup_cpu_ctl_wptr().lock()) {
123
0
            Status ret = cg_ctl_sptr->add_thread_to_cgroup();
124
0
            if (ret.ok()) {
125
0
                std::string wg_tname =
126
0
                        "asyc_wr_" + state->get_query_ctx()->workload_group()->name();
127
0
                Thread::set_self_name(wg_tname);
128
0
            }
129
0
        }
130
0
    }
131
132
0
    DCHECK(_dependency);
133
0
    while (_writer_status.ok()) {
134
0
        ThreadCpuStopWatch cpu_time_stop_watch;
135
0
        cpu_time_stop_watch.start();
136
0
        Defer defer {[&]() {
137
0
            if (state && state->get_query_ctx()) {
138
0
                state->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(
139
0
                        cpu_time_stop_watch.elapsed_time());
140
0
            }
141
0
        }};
142
143
        // 1) wait for the scan operator to write data
144
0
        {
145
0
            std::unique_lock l(_m);
146
            // When the query is cancelled, _writer_status may be set to error status in force_close method.
147
            // When the BE process exits gracefully, the fragment mgr's thread pool will be shut down,
148
            // and the async thread will exit.
149
0
            while (!_eos && _data_queue.empty() && _writer_status.ok() &&
150
0
                   !ExecEnv::GetInstance()->fragment_mgr()->shutting_down()) {
151
                // Wake up every 1s to re-check the condition and avoid a lost signal
152
0
                _cv.wait_for(l, std::chrono::seconds(1));
153
0
            }
154
            // If writer status is not ok, then we should not change its status, to avoid losing the actual error status.
155
0
            if (ExecEnv::GetInstance()->fragment_mgr()->shutting_down() && _writer_status.ok()) {
156
0
                _writer_status.update(Status::InternalError<false>("FragmentMgr is shutting down"));
157
0
            }
158
159
            // check whether eos is reached or the writer hit an error
160
0
            if ((_eos && _data_queue.empty()) || !_writer_status.ok()) {
161
0
                _data_queue.clear();
162
0
                break;
163
0
            }
164
0
        }
165
166
        // 2) get the block from the data queue and write it downstream
167
0
        auto block = _get_block_from_queue();
168
0
        auto status = write(state, *block);
169
0
        if (!status.ok()) [[unlikely]] {
170
0
            std::unique_lock l(_m);
171
0
            _writer_status.update(status);
172
0
            if (_is_finished()) {
173
0
                _dependency->set_ready();
174
0
            }
175
0
            break;
176
0
        }
177
178
0
        _return_free_block(std::move(block));
179
0
    }
180
181
0
    bool need_finish = false;
182
0
    {
183
        // If the last block is sent successfully, then call finish to clear the buffer or commit
184
        // transactions.
185
        // Using lock to make sure the writer status is not modified
186
        // There is a unique ptr err_msg in Status, if it is modified, the unique ptr
187
        // maybe released. And it will core because use after free.
188
0
        std::lock_guard l(_m);
189
0
        if (_writer_status.ok() && _eos) {
190
0
            need_finish = true;
191
0
        }
192
0
    }
193
    // eos only means the last block is input to the queue and there is no more block to be added,
194
    // it is not sure that the block is written to stream.
195
0
    if (need_finish) {
196
        // Should not call finish in lock because it may hang, and it will lock _m too long.
197
        // And get_writer_status will also need this lock, it will block pipeline exec thread.
198
0
        Status st = finish(state);
199
0
        _writer_status.update(st);
200
0
    }
201
0
    Status st = Status::OK();
202
0
    { st = _writer_status.status(); }
203
204
0
    Status close_st = close(st);
205
0
    {
206
        // If it already failed before, do not update the writer status so that we can get
207
        // the real reason.
208
0
        std::lock_guard l(_m);
209
0
        if (_writer_status.ok()) {
210
0
            _writer_status.update(close_st);
211
0
        }
212
0
    }
213
    // should set _finish_dependency ready first, as the close function may be blocked by wait_close of execution_timeout
214
0
    _set_ready_to_finish();
215
0
}
216
217
0
void AsyncResultWriter::_set_ready_to_finish() {
218
0
    DCHECK(_finish_dependency);
219
0
    _finish_dependency->set_ready();
220
0
}
221
222
3
Status AsyncResultWriter::_projection_block(doris::Block& input_block, doris::Block* output_block) {
223
3
    Status status = Status::OK();
224
3
    if (input_block.rows() == 0) {
225
0
        return status;
226
0
    }
227
3
    RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_vec_output_expr_ctxs,
228
3
                                                                       input_block, output_block));
229
3
    materialize_block_inplace(*output_block);
230
3
    return status;
231
3
}
232
233
0
void AsyncResultWriter::force_close(Status s) {
234
0
    std::lock_guard l(_m);
235
0
    _writer_status.update(s);
236
0
    DCHECK(_dependency);
237
0
    if (_is_finished()) {
238
0
        _dependency->set_ready();
239
0
    }
240
0
    _cv.notify_one();
241
0
}
242
243
0
void AsyncResultWriter::_return_free_block(std::unique_ptr<Block> b) {
244
0
    if (_low_memory_mode) {
245
0
        return;
246
0
    }
247
248
0
    const auto allocated_bytes = b->allocated_bytes();
249
0
    if (_free_blocks.enqueue(std::move(b))) {
250
0
        _memory_used_counter->update(allocated_bytes);
251
0
    }
252
0
}
253
254
0
std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::Block* block, size_t rows) {
255
0
    std::unique_ptr<Block> b;
256
0
    if (!_free_blocks.try_dequeue(b)) {
257
0
        b = block->create_same_struct_block(rows, true);
258
0
    } else {
259
0
        _memory_used_counter->update(-b->allocated_bytes());
260
0
    }
261
0
    b->swap(*block);
262
0
    return b;
263
0
}
264
265
template <typename T>
266
void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks,
267
                  RuntimeProfile::Counter* memory_used_counter = nullptr);
268
0
void AsyncResultWriter::set_low_memory_mode() {
269
0
    _low_memory_mode = true;
270
0
    clear_blocks(_free_blocks, _memory_used_counter);
271
0
}
272
} // namespace doris