Coverage Report

Created: 2024-11-21 18:14

/root/doris/be/src/pipeline/pipeline_task.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "pipeline_task.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <glog/logging.h>
23
#include <stddef.h>
24
25
#include <ostream>
26
#include <vector>
27
28
#include "common/status.h"
29
#include "pipeline/exec/operator.h"
30
#include "pipeline/exec/scan_operator.h"
31
#include "pipeline/pipeline.h"
32
#include "pipeline/pipeline_fragment_context.h"
33
#include "pipeline/task_queue.h"
34
#include "runtime/descriptors.h"
35
#include "runtime/query_context.h"
36
#include "runtime/thread_context.h"
37
#include "util/container_util.hpp"
38
#include "util/defer_op.h"
39
#include "util/mem_info.h"
40
#include "util/runtime_profile.h"
41
42
namespace doris {
43
class RuntimeState;
44
} // namespace doris
45
46
namespace doris::pipeline {
47
48
// Constructs a task for `pipeline`.
//
// The operator list and the sink are taken from the pipeline; the first
// operator becomes `_source` and the last one `_root`. The execution
// dependency is obtained from the query context reachable through `state`.
// `le_state_map` is taken by value and moved into the task.
PipelineTask::PipelineTask(
        PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
        PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
        std::map<int,
                 std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
                le_state_map,
        int task_idx)
        : _index(task_id),
          _pipeline(pipeline),
          _opened(false),
          _state(state),
          _fragment_context(fragment_context),
          _parent_profile(parent_profile),
          _operators(pipeline->operators()),
          _source(_operators.front().get()),
          _root(_operators.back().get()),
          _sink(pipeline->sink_shared_pointer()),
          _le_state_map(std::move(le_state_map)),
          _task_idx(task_idx),
          _execution_dep(state->get_query_ctx()->get_execution_dependency()) {
    _pipeline_task_watcher.start();

    // Remember the sink's shared state only when the sink actually created one.
    if (auto sink_shared_state = _sink->create_shared_state(); sink_shared_state) {
        _sink_shared_state = sink_shared_state;
    }
}
75
76
// Prepares the task for execution.
//
// Steps, in order:
//   1. create the task profile and start the prepare timers;
//   2. set up the sink's local state;
//   3. resolve the scan ranges for the first operator's node id;
//   4. set up every operator's local state, walking from the last operator to
//      the first so each local state is given the profile of the one set up
//      before it (starting from the sink's profile);
//   5. publish the source operator's runtime-filter dependencies;
//   6. clear the blocking state if the query is already cancelled.
//
// Returns the first error produced by a local-state setup (or by the fault
// injection hook), otherwise Status::OK().
Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink,
                             QueryContext* query_ctx) {
    DCHECK(_sink);
    _init_profile();
    SCOPED_TIMER(_task_profile->total_time_counter());
    SCOPED_CPU_TIMER(_task_cpu_timer);
    SCOPED_TIMER(_prepare_timer);
    DBUG_EXECUTE_IF("fault_inject::PipelineXTask::prepare", {
        return Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task prepare failed");
    });

    {
        // Sink local state comes first; operators below chain their profiles off it.
        LocalSinkStateInfo sink_info {_task_idx,
                                      _task_profile.get(),
                                      local_params.sender_id,
                                      get_sink_shared_state().get(),
                                      _le_state_map,
                                      tsink};
        RETURN_IF_ERROR(_sink->setup_local_state(_state, sink_info));
    }

    _scan_ranges = find_with_default(local_params.per_node_scan_ranges,
                                     _operators.front()->node_id(), _scan_ranges);

    auto* sink_local_state = _state->get_sink_local_state();
    auto* parent_profile = sink_local_state->profile();
    query_ctx->register_query_statistics(
            _state->get_sink_local_state()->get_query_statistics_ptr());

    // Walk the operators back-to-front.
    for (auto op_it = _operators.rbegin(); op_it != _operators.rend(); ++op_it) {
        auto& op = *op_it;
        LocalStateInfo op_info {parent_profile, _scan_ranges,
                                get_op_shared_state(op->operator_id()), _le_state_map, _task_idx};
        RETURN_IF_ERROR(op->setup_local_state(_state, op_info));
        parent_profile = _state->get_local_state(op->operator_id())->profile();
        query_ctx->register_query_statistics(
                _state->get_local_state(op->operator_id())->get_query_statistics_ptr());
    }

    {
        // Build the new list outside the lock, then swap it in under the lock.
        const auto& source_filter_deps =
                _state->get_local_state(_source->operator_id())->filter_dependencies();
        std::vector<Dependency*> fresh_filter_deps(source_filter_deps.begin(),
                                                   source_filter_deps.end());

        std::unique_lock<std::mutex> lc(_dependency_lock);
        _filter_dependencies.swap(fresh_filter_deps);
    }

    if (query_context()->is_cancelled()) {
        clear_blocking_state();
    }
    return Status::OK();
}
127
128
0
// Collects the read, write and finish dependencies from the operators' and the
// sink's local states and installs them on the task under `_dependency_lock`.
//
// Read dependencies are stored per operator, at the operator's position in
// `_operators`. Returns the lookup error if an operator's local state cannot be
// obtained; in that case the task's dependency lists are left untouched.
Status PipelineTask::_extract_dependencies() {
    std::vector<std::vector<Dependency*>> read_deps(_operators.size());
    std::vector<Dependency*> write_deps;
    std::vector<Dependency*> finish_deps;

    for (size_t op_pos = 0; op_pos < _operators.size(); ++op_pos) {
        auto lookup = _state->get_local_state_result(_operators[op_pos]->operator_id());
        if (!lookup) {
            return lookup.error();
        }
        auto* op_state = lookup.value();
        read_deps[op_pos] = op_state->dependencies();
        if (auto* op_finish_dep = op_state->finishdependency(); op_finish_dep) {
            finish_deps.push_back(op_finish_dep);
        }
    }

    DBUG_EXECUTE_IF("fault_inject::PipelineXTask::_extract_dependencies", {
        return Status::Error<INTERNAL_ERROR>(
                "fault_inject pipeline_task _extract_dependencies failed");
    });

    {
        // The sink contributes the write dependencies and, optionally, one finish dependency.
        auto* sink_state = _state->get_sink_local_state();
        write_deps = sink_state->dependencies();
        if (auto* sink_finish_dep = sink_state->finishdependency(); sink_finish_dep) {
            finish_deps.push_back(sink_finish_dep);
        }
    }

    {
        std::unique_lock<std::mutex> lc(_dependency_lock);
        _read_dependencies.swap(read_deps);
        _write_dependencies.swap(write_deps);
        _finish_dependencies.swap(finish_deps);
    }
    return Status::OK();
}
168
169
0
void PipelineTask::_init_profile() {
170
0
    _task_profile =
171
0
            std::make_unique<RuntimeProfile>(fmt::format("PipelineTask (index={})", _index));
172
0
    _parent_profile->add_child(_task_profile.get(), true, nullptr);
173
0
    _task_cpu_timer = ADD_TIMER(_task_profile, "TaskCpuTime");
174
175
0
    static const char* exec_time = "ExecuteTime";
176
0
    _exec_timer = ADD_TIMER(_task_profile, exec_time);
177
0
    _prepare_timer = ADD_CHILD_TIMER(_task_profile, "PrepareTime", exec_time);
178
0
    _open_timer = ADD_CHILD_TIMER(_task_profile, "OpenTime", exec_time);
179
0
    _get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", exec_time);
180
0
    _get_block_counter = ADD_COUNTER(_task_profile, "GetBlockCounter", TUnit::UNIT);
181
0
    _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
182
0
    _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
183
184
0
    _wait_worker_timer = ADD_TIMER_WITH_LEVEL(_task_profile, "WaitWorkerTime", 1);
185
186
0
    _schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT);
187
0
    _yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
188
0
    _core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", TUnit::UNIT);
189
0
}
190
191
0
void PipelineTask::_fresh_profile_counter() {
192
0
    COUNTER_SET(_schedule_counts, (int64_t)_schedule_time);
193
0
    COUNTER_SET(_wait_worker_timer, (int64_t)_wait_worker_watcher.elapsed_time());
194
0
}
195
196
0
// Opens the local state of every operator and of the sink, extracts the task's
// dependencies and allocates the working block.
//
// `_opened` is set only when every step succeeded; any failure is returned to
// the caller as-is.
Status PipelineTask::_open() {
    SCOPED_TIMER(_task_profile->total_time_counter());
    SCOPED_CPU_TIMER(_task_cpu_timer);
    SCOPED_TIMER(_open_timer);

    _dry_run = _sink->should_dry_run(_state);

    for (auto& op : _operators) {
        auto* op_state = _state->get_local_state(op->operator_id());
        auto open_status = op_state->open(_state);
        // A PIP_WAIT_FOR_RF result is only expected when filter dependencies exist.
        DCHECK(!open_status.is<ErrorCode::PIP_WAIT_FOR_RF>() || !_filter_dependencies.empty())
                << debug_string();
        RETURN_IF_ERROR(open_status);
    }

    RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state));
    RETURN_IF_ERROR(_extract_dependencies());
    _block = doris::vectorized::Block::create_unique();

    DBUG_EXECUTE_IF("fault_inject::PipelineXTask::open", {
        return Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task open failed");
    });

    _opened = true;
    return Status::OK();
}
218
219
0
bool PipelineTask::_wait_to_start() {
220
    // Before task starting, we should make sure
221
    // 1. Execution dependency is ready (which is controlled by FE 2-phase commit)
222
    // 2. Runtime filter dependencies are ready
223
0
    _blocked_dep = _execution_dep->is_blocked_by(this);
224
0
    if (_blocked_dep != nullptr) {
225
0
        static_cast<Dependency*>(_blocked_dep)->start_watcher();
226
0
        if (_wake_up_by_downstream) {
227
0
            _eos = true;
228
0
        }
229
0
        return true;
230
0
    }
231
232
0
    for (auto* op_dep : _filter_dependencies) {
233
0
        _blocked_dep = op_dep->is_blocked_by(this);
234
0
        if (_blocked_dep != nullptr) {
235
0
            _blocked_dep->start_watcher();
236
0
            if (_wake_up_by_downstream) {
237
0
                _eos = true;
238
0
            }
239
0
            return true;
240
0
        }
241
0
    }
242
0
    return false;
243
0
}
244
245
0
bool PipelineTask::_is_blocked() {
246
0
    Defer defer([this] {
247
0
        if (_blocked_dep != nullptr) {
248
0
            _task_profile->add_info_string("TaskState", "Blocked");
249
0
            _task_profile->add_info_string("BlockedByDependency", _blocked_dep->name());
250
0
        }
251
0
    });
252
    // `_dry_run = true` means we do not need data from source operator.
253
0
    if (!_dry_run) {
254
0
        for (int i = _read_dependencies.size() - 1; i >= 0; i--) {
255
            // `_read_dependencies` is organized according to operators. For each operator, running condition is met iff all dependencies are ready.
256
0
            for (auto* dep : _read_dependencies[i]) {
257
0
                _blocked_dep = dep->is_blocked_by(this);
258
0
                if (_blocked_dep != nullptr) {
259
0
                    _blocked_dep->start_watcher();
260
0
                    if (_wake_up_by_downstream) {
261
0
                        _eos = true;
262
0
                    }
263
0
                    return true;
264
0
                }
265
0
            }
266
            // If all dependencies are ready for this operator, we can execute this task if no datum is needed from upstream operators.
267
0
            if (!_operators[i]->need_more_input_data(_state)) {
268
0
                if (VLOG_DEBUG_IS_ON) {
269
0
                    VLOG_DEBUG << "query: " << print_id(_state->query_id())
270
0
                               << ", task id: " << _index << ", operator " << i
271
0
                               << " not need_more_input_data";
272
0
                }
273
0
                break;
274
0
            }
275
0
        }
276
0
    }
277
278
0
    for (auto* op_dep : _write_dependencies) {
279
0
        _blocked_dep = op_dep->is_blocked_by(this);
280
0
        if (_blocked_dep != nullptr) {
281
0
            _blocked_dep->start_watcher();
282
0
            if (_wake_up_by_downstream) {
283
0
                _eos = true;
284
0
            }
285
0
            return true;
286
0
        }
287
0
    }
288
0
    return false;
289
0
}
290
291
0
// Runs the task until it finishes, gets blocked, is cancelled or has used up
// its time slice.
//
// @param eos  out-parameter; set to true when the task reached end-of-stream
//             (sink finished, dry run, downstream wake-up, or the source/sink
//             reported the end).
// @return     Status::OK() when the task finished, got blocked or yielded;
//             otherwise the first error returned by open / get_block / sink /
//             revoke_memory (END_OF_FILE from the sink is converted to eos).
//
// If the loop exits without finishing or blocking (cancelled or time slice
// exceeded), the task re-enqueues itself on its task queue.
//
// NOTE: `time_spent` is accumulated per loop iteration through
// SCOPED_RAW_TIMER. Without that, it stayed 0 forever: the time-slice check
// below could never fire and the task queue statistics always received 0.
Status PipelineTask::execute(bool* eos) {
    SCOPED_TIMER(_task_profile->total_time_counter());
    SCOPED_TIMER(_exec_timer);
    SCOPED_ATTACH_TASK(_state);
    _eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream;
    *eos = _eos;
    if (_eos) {
        // If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here.
        return Status::OK();
    }
    int64_t time_spent = 0;
    DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
        Status status = Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task execute failed");
        return status;
    });
    ThreadCpuStopWatch cpu_time_stop_watch;
    cpu_time_stop_watch.start();
    // Runs on every exit path below: reports the time spent in this call to the
    // task queue and the consumed CPU time to the profile / query statistics.
    Defer defer {[&]() {
        if (_task_queue) {
            _task_queue->update_statistics(this, time_spent);
        }
        int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
        _task_cpu_timer->update(delta_cpu_time);
        auto cpu_qs = query_context()->get_cpu_statistics();
        if (cpu_qs) {
            cpu_qs->add_cpu_nanos(delta_cpu_time);
        }
        query_context()->update_wg_cpu_adder(delta_cpu_time);
    }};
    if (_wait_to_start()) {
        return Status::OK();
    }
    if (_wake_up_by_downstream) {
        _eos = true;
        *eos = true;
        return Status::OK();
    }
    // The status must be runnable
    if (!_opened && !_fragment_context->is_canceled()) {
        RETURN_IF_ERROR(_open());
    }

    _task_profile->add_info_string("TaskState", "Runnable");
    _task_profile->add_info_string("BlockedByDependency", "");
    while (!_fragment_context->is_canceled()) {
        if (_is_blocked()) {
            return Status::OK();
        }
        if (_wake_up_by_downstream) {
            _eos = true;
            *eos = true;
            return Status::OK();
        }

        /// When a task is cancelled,
        /// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready).
        /// Here, checking whether it is cancelled to prevent tasks in a blocking state from being re-executed.
        if (_fragment_context->is_canceled()) {
            break;
        }

        if (time_spent > THREAD_TIME_SLICE) {
            COUNTER_UPDATE(_yield_counts, 1);
            break;
        }
        // Adds the duration of the rest of this iteration to `time_spent` when the
        // iteration's scope ends (including early returns and `continue`), so the
        // time-slice check above and the deferred statistics see real values.
        SCOPED_RAW_TIMER(&time_spent);
        _block->clear_column_data(_root->row_desc().num_materialized_slots());
        auto* block = _block.get();

        auto sink_revocable_mem_size = _sink->revocable_mem_size(_state);
        if (should_revoke_memory(_state, sink_revocable_mem_size)) {
            RETURN_IF_ERROR(_sink->revoke_memory(_state));
            continue;
        }
        *eos = _eos;
        DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
            Status status =
                    Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task executing failed");
            return status;
        });
        // `_dry_run` means sink operator need no more data
        // `_sink->is_finished(_state)` means sink operator should be finished
        if (_dry_run || _sink->is_finished(_state)) {
            *eos = true;
            _eos = true;
        } else {
            SCOPED_TIMER(_get_block_timer);
            _get_block_counter->update(1);
            RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
        }

        if (_block->rows() != 0 || *eos) {
            SCOPED_TIMER(_sink_timer);
            // END_OF_FILE from the sink is not an error: it means the sink needs no
            // more data, so it is translated into eos instead of being returned.
            Status status = _sink->sink(_state, block, *eos);
            if (status.is<ErrorCode::END_OF_FILE>()) {
                *eos = true;
            } else {
                RETURN_IF_ERROR(status);
            }
            if (*eos) { // just return, the scheduler will do finish work
                _eos = true;
                _task_profile->add_info_string("TaskState", "Finished");
                return Status::OK();
            }
        }
    }

    static_cast<void>(get_task_queue()->push_back(this));
    return Status::OK();
}
407
408
0
// Decides whether `revocable_mem_bytes` of revocable memory should be revoked now.
//
// Decision order:
//   - no workload group: never revoke;
//   - force-spill enabled and revocable >= min_revocable_mem: revoke;
//   - workload group at high water mark: revoke iff revocable > min_revocable_mem;
//   - workload group at low water mark: only when a spill threshold is set, the
//     query's memory usage has reached it and at least one big-memory operator
//     is running; then revoke iff revocable exceeds either the per-operator
//     share of the threshold or min_revocable_mem;
//   - otherwise: do not revoke.
bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes) {
    auto* query_ctx = state->get_query_ctx();
    auto wg = query_ctx->workload_group();
    if (!wg) {
        LOG_ONCE(INFO) << "no workload group for query " << print_id(state->query_id());
        return false;
    }
    const auto min_revocable_mem_bytes = state->min_revocable_mem();

    if (UNLIKELY(state->enable_force_spill()) &&
        revocable_mem_bytes >= min_revocable_mem_bytes) {
        LOG_ONCE(INFO) << "spill force, query: " << print_id(state->query_id());
        return true;
    }

    bool reached_low_water_mark = false;
    bool reached_high_water_mark = false;
    wg->check_mem_used(&reached_low_water_mark, &reached_high_water_mark);

    if (reached_high_water_mark) {
        if (revocable_mem_bytes <= min_revocable_mem_bytes) {
            return false;
        }
        VLOG_DEBUG << "query " << print_id(state->query_id())
                   << " revoke memory, hight water mark";
        return true;
    }

    if (!reached_low_water_mark) {
        return false;
    }

    const int64_t spill_threshold = query_ctx->spill_threshold();
    const int64_t memory_usage = query_ctx->query_mem_tracker->consumption();
    if (spill_threshold == 0 || memory_usage < spill_threshold) {
        return false;
    }

    auto big_memory_operator_num = query_ctx->get_running_big_mem_op_num();
    DCHECK(big_memory_operator_num >= 0);
    if (0 == big_memory_operator_num) {
        return false;
    }
    // Each running big-memory operator gets an equal share of the spill threshold.
    const int64_t mem_limit_of_op = spill_threshold / big_memory_operator_num;

    LOG_EVERY_T(INFO, 1) << "query " << print_id(state->query_id())
                         << " revoke memory, low water mark, revocable_mem_bytes: "
                         << PrettyPrinter::print_bytes(revocable_mem_bytes)
                         << ", mem_limit_of_op: " << PrettyPrinter::print_bytes(mem_limit_of_op)
                         << ", min_revocable_mem_bytes: "
                         << PrettyPrinter::print_bytes(min_revocable_mem_bytes)
                         << ", memory_usage: " << PrettyPrinter::print_bytes(memory_usage)
                         << ", spill_threshold: " << PrettyPrinter::print_bytes(spill_threshold)
                         << ", big_memory_operator_num: " << big_memory_operator_num;
    return revocable_mem_bytes > mem_limit_of_op || revocable_mem_bytes > min_revocable_mem_bytes;
}
464
465
0
void PipelineTask::finalize() {
466
0
    std::unique_lock<std::mutex> lc(_dependency_lock);
467
0
    _finalized = true;
468
0
    _sink_shared_state.reset();
469
0
    _op_shared_states.clear();
470
0
    _le_state_map.clear();
471
0
}
472
473
0
// Closes the sink and then every operator, and records the time this took.
//
// All operators are closed even if an earlier close failed. The returned
// status is the sink's status if it failed, otherwise the first operator
// failure, otherwise OK. The elapsed close time is reported to the task queue
// (if any) on exit, and to the profile only when the task had been opened.
Status PipelineTask::close(Status exec_status) {
    int64_t close_ns = 0;
    Defer defer {[&]() {
        if (_task_queue) {
            _task_queue->update_statistics(this, close_ns);
        }
    }};

    Status result;
    {
        SCOPED_RAW_TIMER(&close_ns);
        result = _sink->close(_state, exec_status);
        for (auto& op : _operators) {
            auto op_status = op->close(_state);
            // Keep only the first failure.
            if (result.ok() && !op_status.ok()) {
                result = op_status;
            }
        }
    }

    if (_opened) {
        _fresh_profile_counter();
        COUNTER_SET(_close_timer, close_ns);
        COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns);
    }
    return result;
}
498
499
0
std::string PipelineTask::debug_string() {
500
0
    std::unique_lock<std::mutex> lc(_dependency_lock);
501
0
    fmt::memory_buffer debug_string_buffer;
502
503
0
    fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id()));
504
0
    fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
505
0
                   print_id(_state->fragment_instance_id()));
506
507
0
    auto* cur_blocked_dep = _blocked_dep;
508
0
    auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
509
0
    fmt::format_to(debug_string_buffer,
510
0
                   "PipelineTask[this = {}, id = {}, open = {}, eos = {}, finish = {}, dry run = "
511
0
                   "{}, elapse time = {}s, _wake_up_by_downstream = {}], block dependency = {}, is "
512
0
                   "running = {}\noperators: ",
513
0
                   (void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed,
514
0
                   _wake_up_by_downstream.load(),
515
0
                   cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
516
0
                   is_running());
517
0
    for (size_t i = 0; i < _operators.size(); i++) {
518
0
        fmt::format_to(debug_string_buffer, "\n{}",
519
0
                       _opened && !_finalized ? _operators[i]->debug_string(_state, i)
520
0
                                              : _operators[i]->debug_string(i));
521
0
    }
522
0
    fmt::format_to(debug_string_buffer, "\n{}\n",
523
0
                   _opened && !_finalized ? _sink->debug_string(_state, _operators.size())
524
0
                                          : _sink->debug_string(_operators.size()));
525
0
    if (_finalized) {
526
0
        return fmt::to_string(debug_string_buffer);
527
0
    }
528
529
0
    size_t i = 0;
530
0
    for (; i < _read_dependencies.size(); i++) {
531
0
        for (size_t j = 0; j < _read_dependencies[i].size(); j++) {
532
0
            fmt::format_to(debug_string_buffer, "{}. {}\n", i,
533
0
                           _read_dependencies[i][j]->debug_string(i + 1));
534
0
        }
535
0
    }
536
537
0
    fmt::format_to(debug_string_buffer, "Write Dependency Information: \n");
538
0
    for (size_t j = 0; j < _write_dependencies.size(); j++, i++) {
539
0
        fmt::format_to(debug_string_buffer, "{}. {}\n", i,
540
0
                       _write_dependencies[j]->debug_string(i + 1));
541
0
    }
542
543
0
    fmt::format_to(debug_string_buffer, "\nRuntime Filter Dependency Information: \n");
544
0
    for (size_t j = 0; j < _filter_dependencies.size(); j++, i++) {
545
0
        fmt::format_to(debug_string_buffer, "{}. {}\n", i,
546
0
                       _filter_dependencies[j]->debug_string(i + 1));
547
0
    }
548
549
0
    fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
550
0
    for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) {
551
0
        fmt::format_to(debug_string_buffer, "{}. {}\n", i,
552
0
                       _finish_dependencies[j]->debug_string(j + 1));
553
0
    }
554
0
    return fmt::to_string(debug_string_buffer);
555
0
}
556
557
0
void PipelineTask::wake_up() {
558
    // call by dependency
559
0
    static_cast<void>(get_task_queue()->push_back(this));
560
0
}
561
562
0
// Returns the query context owned by this task's fragment context.
QueryContext* PipelineTask::query_context() {
    QueryContext* ctx = _fragment_context->get_query_ctx();
    return ctx;
}
565
} // namespace doris::pipeline