Coverage Report

Created: 2025-04-25 12:58

/root/doris/be/src/pipeline/pipeline_task.cpp
Source listing (per-line hit counts omitted: every instrumented line in this file reported a count of 0, i.e. the file is entirely uncovered).
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "pipeline_task.h"

#include <fmt/format.h>
#include <gen_cpp/Metrics_types.h>
#include <glog/logging.h>
#include <stddef.h>

#include <algorithm>
#include <ostream>
#include <vector>

#include "common/status.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/scan_operator.h"
#include "pipeline/pipeline.h"
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/task_queue.h"
#include "runtime/descriptors.h"
#include "runtime/query_context.h"
#include "runtime/thread_context.h"
#include "util/container_util.hpp"
#include "util/defer_op.h"
#include "util/mem_info.h"
#include "util/runtime_profile.h"

namespace doris {
class RuntimeState;
} // namespace doris

namespace doris::pipeline {

PipelineTask::PipelineTask(
        PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
        PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
        std::map<int,
                 std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
                le_state_map,
        int task_idx)
        : _index(task_id),
          _pipeline(pipeline),
          _opened(false),
          _state(state),
          _fragment_context(fragment_context),
          _parent_profile(parent_profile),
          _operators(pipeline->operators()),
          _source(_operators.front().get()),
          _root(_operators.back().get()),
          _sink(pipeline->sink_shared_pointer()),
          _le_state_map(std::move(le_state_map)),
          _task_idx(task_idx),
          _execution_dep(state->get_query_ctx()->get_execution_dependency()) {
    _pipeline_task_watcher.start();

    auto shared_state = _sink->create_shared_state();
    if (shared_state) {
        _sink_shared_state = shared_state;
    }
}

Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink,
                             QueryContext* query_ctx) {
    DCHECK(_sink);
    _init_profile();
    SCOPED_TIMER(_task_profile->total_time_counter());
    SCOPED_CPU_TIMER(_task_cpu_timer);
    SCOPED_TIMER(_prepare_timer);
    DBUG_EXECUTE_IF("fault_inject::PipelineXTask::prepare", {
        Status status = Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task prepare failed");
        return status;
    });
    {
        // set sink local state
        LocalSinkStateInfo info {_task_idx,
                                 _task_profile.get(),
                                 local_params.sender_id,
                                 get_sink_shared_state().get(),
                                 _le_state_map,
                                 tsink};
        RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
    }

    _scan_ranges = find_with_default(local_params.per_node_scan_ranges,
                                     _operators.front()->node_id(), _scan_ranges);
    auto* parent_profile = _state->get_sink_local_state()->profile();
    query_ctx->register_query_statistics(
            _state->get_sink_local_state()->get_query_statistics_ptr());

    for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
        auto& op = _operators[op_idx];
        LocalStateInfo info {parent_profile, _scan_ranges, get_op_shared_state(op->operator_id()),
                             _le_state_map, _task_idx};
        RETURN_IF_ERROR(op->setup_local_state(_state, info));
        parent_profile = _state->get_local_state(op->operator_id())->profile();
        query_ctx->register_query_statistics(
                _state->get_local_state(op->operator_id())->get_query_statistics_ptr());
    }
    {
        std::vector<Dependency*> filter_dependencies;
        const auto& deps = _state->get_local_state(_source->operator_id())->filter_dependencies();
        std::copy(deps.begin(), deps.end(),
                  std::inserter(filter_dependencies, filter_dependencies.end()));

        std::unique_lock<std::mutex> lc(_dependency_lock);
        filter_dependencies.swap(_filter_dependencies);
    }
    if (query_context()->is_cancelled()) {
        clear_blocking_state();
    }
    return Status::OK();
}
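
// Illustrative note (not part of the original file): the loop above sets up operator local
// states from the root (operators.back(), closest to the sink) down to the source
// (operators.front()), chaining RuntimeProfiles as it goes. For a two-operator pipeline the
// resulting profile tree is roughly:
//
//   PipelineTask (index=N)
//     sink local state profile
//       operators[1] local state profile   // root, set up first
//         operators[0] local state profile // source, set up last
//
// i.e. each operator's profile is presumably parented to the profile of the node that
// consumes its output.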

Status PipelineTask::_extract_dependencies() {
    std::vector<std::vector<Dependency*>> read_dependencies;
    std::vector<Dependency*> write_dependencies;
    std::vector<Dependency*> finish_dependencies;
    read_dependencies.resize(_operators.size());
    size_t i = 0;
    for (auto& op : _operators) {
        auto result = _state->get_local_state_result(op->operator_id());
        if (!result) {
            return result.error();
        }
        auto* local_state = result.value();
        read_dependencies[i] = local_state->dependencies();
        auto* fin_dep = local_state->finishdependency();
        if (fin_dep) {
            finish_dependencies.push_back(fin_dep);
        }
        i++;
    }
    DBUG_EXECUTE_IF("fault_inject::PipelineXTask::_extract_dependencies", {
        Status status = Status::Error<INTERNAL_ERROR>(
                "fault_inject pipeline_task _extract_dependencies failed");
        return status;
    });
    {
        auto* local_state = _state->get_sink_local_state();
        write_dependencies = local_state->dependencies();
        auto* fin_dep = local_state->finishdependency();
        if (fin_dep) {
            finish_dependencies.push_back(fin_dep);
        }
    }
    {
        std::unique_lock<std::mutex> lc(_dependency_lock);
        read_dependencies.swap(_read_dependencies);
        write_dependencies.swap(_write_dependencies);
        finish_dependencies.swap(_finish_dependencies);
    }
    return Status::OK();
}
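
// Illustrative note (not part of the original file): after this call the task caches three
// kinds of dependencies, which the checks further down consult:
//
//   _read_dependencies[i]  - per-operator read deps; checked in _is_blocked() before pulling
//                            a block from operator i
//   _write_dependencies    - sink-side deps; checked in _is_blocked() before sinking
//   _finish_dependencies   - finish deps from operators and sink, collected here and
//                            presumably consulted when the task is finished (not shown here)
//
// A minimal check over one bucket mirrors the pattern used throughout this file:
//
//   for (auto* dep : _write_dependencies) {
//       if (auto* blocked = dep->is_blocked_by(this); blocked != nullptr) {
//           blocked->start_watcher(); // task parks until the dependency wakes it up
//           return true;
//       }
//   }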

void PipelineTask::_init_profile() {
    _task_profile =
            std::make_unique<RuntimeProfile>(fmt::format("PipelineTask (index={})", _index));
    _parent_profile->add_child(_task_profile.get(), true, nullptr);
    _task_cpu_timer = ADD_TIMER(_task_profile, "TaskCpuTime");

    static const char* exec_time = "ExecuteTime";
    _exec_timer = ADD_TIMER(_task_profile, exec_time);
    _prepare_timer = ADD_CHILD_TIMER(_task_profile, "PrepareTime", exec_time);
    _open_timer = ADD_CHILD_TIMER(_task_profile, "OpenTime", exec_time);
    _get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", exec_time);
    _get_block_counter = ADD_COUNTER(_task_profile, "GetBlockCounter", TUnit::UNIT);
    _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
    _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);

    _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");

    _schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT);
    _yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
    _core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", TUnit::UNIT);
}
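
// Illustrative note (not part of the original file): the counters registered above form
// roughly this hierarchy inside the task's RuntimeProfile:
//
//   PipelineTask (index=N)
//     TaskCpuTime
//     ExecuteTime
//       PrepareTime, OpenTime, GetBlockTime, SinkTime, CloseTime   (child timers)
//     GetBlockCounter
//     WaitWorkerTime
//     NumScheduleTimes, NumYieldTimes, CoreChangeTimes             (unit counters)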

void PipelineTask::_fresh_profile_counter() {
    COUNTER_SET(_schedule_counts, (int64_t)_schedule_time);
    COUNTER_SET(_wait_worker_timer, (int64_t)_wait_worker_watcher.elapsed_time());
}

Status PipelineTask::_open() {
    SCOPED_TIMER(_task_profile->total_time_counter());
    SCOPED_CPU_TIMER(_task_cpu_timer);
    SCOPED_TIMER(_open_timer);
    _dry_run = _sink->should_dry_run(_state);
    for (auto& o : _operators) {
        auto* local_state = _state->get_local_state(o->operator_id());
        auto st = local_state->open(_state);
        DCHECK(st.is<ErrorCode::PIP_WAIT_FOR_RF>() ? !_filter_dependencies.empty() : true)
                << debug_string();
        RETURN_IF_ERROR(st);
    }
    RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state));
    RETURN_IF_ERROR(_extract_dependencies());
    _block = doris::vectorized::Block::create_unique();
    DBUG_EXECUTE_IF("fault_inject::PipelineXTask::open", {
        Status status = Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task open failed");
        return status;
    });
    _opened = true;
    return Status::OK();
}

void PipelineTask::set_task_queue(TaskQueue* task_queue) {
    _task_queue = task_queue;
}

bool PipelineTask::_wait_to_start() {
    // Before the task starts, we should make sure that
    // 1. the execution dependency is ready (which is controlled by the FE 2-phase commit), and
    // 2. the runtime filter dependencies are ready.
    _blocked_dep = _execution_dep->is_blocked_by(this);
    if (_blocked_dep != nullptr) {
        static_cast<Dependency*>(_blocked_dep)->start_watcher();
        return true;
    }

    for (auto* op_dep : _filter_dependencies) {
        _blocked_dep = op_dep->is_blocked_by(this);
        if (_blocked_dep != nullptr) {
            _blocked_dep->start_watcher();
            return true;
        }
    }
    return false;
}
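
// Illustrative note (not part of the original file): a `true` return means the task is still
// gated on either the execution dependency (FE 2-phase commit) or a runtime filter
// dependency; execute() then returns early and the blocking dependency later re-queues the
// task via wake_up(). A sketch of that control flow inside execute():
//
//   if (_wait_to_start()) {      // blocked: _blocked_dep remembers which dependency blocked us
//       return Status::OK();     // scheduler drops the task until wake_up() is called
//   }
//   // otherwise both gates are open and the task may open and run its operators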

bool PipelineTask::_is_blocked() {
    Defer defer([this] {
        if (_blocked_dep != nullptr) {
            _task_profile->add_info_string("TaskState", "Blocked");
            _task_profile->add_info_string("BlockedByDependency", _blocked_dep->name());
        }
    });
    // `_dry_run = true` means we do not need data from the source operator.
    if (!_dry_run) {
        for (int i = _read_dependencies.size() - 1; i >= 0; i--) {
            // `_read_dependencies` is organized by operator. For each operator, the running
            // condition is met iff all of its dependencies are ready.
            for (auto* dep : _read_dependencies[i]) {
                _blocked_dep = dep->is_blocked_by(this);
                if (_blocked_dep != nullptr) {
                    _blocked_dep->start_watcher();
                    return true;
                }
            }
            // If all dependencies are ready for this operator, we can execute this task as long
            // as no more data is needed from upstream operators.
            if (!_operators[i]->need_more_input_data(_state)) {
                if (VLOG_DEBUG_IS_ON) {
                    VLOG_DEBUG << "query: " << print_id(_state->query_id())
                               << ", task id: " << _index << ", operator " << i
                               << " not need_more_input_data";
                }
                break;
            }
        }
    }

    for (auto* op_dep : _write_dependencies) {
        _blocked_dep = op_dep->is_blocked_by(this);
        if (_blocked_dep != nullptr) {
            _blocked_dep->start_watcher();
            return true;
        }
    }
    return false;
}
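
// Illustrative note (not part of the original file): the checks above run in this order, and
// the first blocked dependency wins (it is recorded in _blocked_dep and in the
// "BlockedByDependency" profile entry by the Defer at the top):
//
//   1. read dependencies, scanned from the root operator down towards the source; the scan
//      stops early at the first operator that does not need more input data
//   2. write (sink-side) dependencies
//
// Only when every consulted dependency is ready does execute() proceed to pull a block.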

Status PipelineTask::execute(bool* eos) {
    if (_eos) {
        *eos = true;
        return Status::OK();
    }

    SCOPED_TIMER(_task_profile->total_time_counter());
    SCOPED_TIMER(_exec_timer);
    SCOPED_ATTACH_TASK(_state);

    int64_t time_spent = 0;
    DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
        Status status = Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task execute failed");
        return status;
    });
    ThreadCpuStopWatch cpu_time_stop_watch;
    cpu_time_stop_watch.start();
    Defer defer {[&]() {
        if (_task_queue) {
            _task_queue->update_statistics(this, time_spent);
        }
        int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
        _task_cpu_timer->update(delta_cpu_time);
        auto cpu_qs = query_context()->get_cpu_statistics();
        if (cpu_qs) {
            cpu_qs->add_cpu_nanos(delta_cpu_time);
        }
        query_context()->update_cpu_time(delta_cpu_time);
    }};
    if (_wait_to_start()) {
        if (config::enable_prefetch_tablet) {
            RETURN_IF_ERROR(_source->hold_tablets(_state));
        }
        return Status::OK();
    }

    // The status must be runnable
    if (!_opened && !_fragment_context->is_canceled()) {
        DBUG_EXECUTE_IF("PipelineTask::execute.open_sleep", {
            auto required_pipeline_id =
                    DebugPoints::instance()->get_debug_param_or_default<int32_t>(
                            "PipelineTask::execute.open_sleep", "pipeline_id", -1);
            auto required_task_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
                    "PipelineTask::execute.open_sleep", "task_id", -1);
            if (required_pipeline_id == pipeline_id() && required_task_id == task_id()) {
                LOG(WARNING) << "PipelineTask::execute.open_sleep sleep 5s";
                sleep(5);
            }
        });

        SCOPED_RAW_TIMER(&time_spent);
        if (_wake_up_early) {
            *eos = true;
            _eos = true;
            return Status::OK();
        }
        RETURN_IF_ERROR(_open());
    }

    auto set_wake_up_and_dep_ready = [&]() {
        if (wake_up_early()) {
            return;
        }
        set_wake_up_early();
        clear_blocking_state();
    };

    _task_profile->add_info_string("TaskState", "Runnable");
    _task_profile->add_info_string("BlockedByDependency", "");
    while (!_fragment_context->is_canceled()) {
        SCOPED_RAW_TIMER(&time_spent);
        if (_is_blocked()) {
            return Status::OK();
        }

        /// When a task is cancelled, its blocking state is cleared and it transitions to a
        /// ready state (though it is not truly ready). Check for cancellation here to prevent
        /// tasks in a blocking state from being re-executed.
        if (_fragment_context->is_canceled()) {
            break;
        }

        if (time_spent > THREAD_TIME_SLICE) {
            COUNTER_UPDATE(_yield_counts, 1);
            break;
        }
        _block->clear_column_data(_root->row_desc().num_materialized_slots());
        auto* block = _block.get();

        auto sink_revocable_mem_size = _sink->revocable_mem_size(_state);
        if (should_revoke_memory(_state, sink_revocable_mem_size)) {
            RETURN_IF_ERROR(_sink->revoke_memory(_state));
            continue;
        }
        DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
            Status status =
                    Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task executing failed");
            return status;
        });
        // `_sink->is_finished(_state)` means the sink operator should be finished
        if (_sink->is_finished(_state)) {
            set_wake_up_and_dep_ready();
        }

        // `_dry_run` means the sink operator needs no more data
        *eos = wake_up_early() || _dry_run;
        if (!*eos) {
            SCOPED_TIMER(_get_block_timer);
            _get_block_counter->update(1);
            RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
        }

        if (*eos) {
            RETURN_IF_ERROR(close(Status::OK(), false));
        }

        if (_block->rows() != 0 || *eos) {
            SCOPED_TIMER(_sink_timer);
            DBUG_EXECUTE_IF("PipelineTask::execute.sink_eos_sleep", {
                auto required_pipeline_id =
                        DebugPoints::instance()->get_debug_param_or_default<int32_t>(
                                "PipelineTask::execute.sink_eos_sleep", "pipeline_id", -1);
                auto required_task_id =
                        DebugPoints::instance()->get_debug_param_or_default<int32_t>(
                                "PipelineTask::execute.sink_eos_sleep", "task_id", -1);
                if (required_pipeline_id == pipeline_id() && required_task_id == task_id()) {
                    LOG(WARNING) << "PipelineTask::execute.sink_eos_sleep sleep 10s";
                    sleep(10);
                }
            });

            Status status = _sink->sink(_state, block, *eos);

            if (status.is<ErrorCode::END_OF_FILE>()) {
                set_wake_up_and_dep_ready();
            } else if (!status) {
                return status;
            }

            if (*eos) { // just return, the scheduler will do the finish work
                _task_profile->add_info_string("TaskState", "Finished");
                _eos = true;
                return Status::OK();
            }
        }
    }

    RETURN_IF_ERROR(get_task_queue()->push_back(this));
    return Status::OK();
}
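
// Illustrative sketch (not part of the original file): roughly how a worker thread might
// drive this loop. `task` and the requeue-on-block behaviour are assumptions here; in Doris
// the TaskScheduler/TaskQueue own this logic.
//
//   bool eos = false;
//   Status st = task->execute(&eos);   // returns early when blocked, yielded, or at eos
//   if (!st.ok()) {
//       // cancel the fragment; the scheduler eventually closes the task
//   } else if (eos) {
//       // the sink has received its last block; the scheduler performs the finish/close work
//   }
//   // If the task merely blocked, the blocking dependency re-queues it via wake_up(); if it
//   // used up THREAD_TIME_SLICE it re-queued itself above, and execute() resumes later from
//   // the cached state (_opened, _block, ...).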

bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes) {
    auto* query_ctx = state->get_query_ctx();
    auto wg = query_ctx->workload_group();
    if (!wg) {
        LOG_ONCE(INFO) << "no workload group for query " << print_id(state->query_id());
        return false;
    }
    const auto min_revocable_mem_bytes = state->min_revocable_mem();

    if (UNLIKELY(state->enable_force_spill())) {
        if (revocable_mem_bytes >= min_revocable_mem_bytes) {
            LOG_ONCE(INFO) << "spill force, query: " << print_id(state->query_id());
            return true;
        }
    }

    bool is_wg_mem_low_water_mark = false;
    bool is_wg_mem_high_water_mark = false;
    wg->check_mem_used(&is_wg_mem_low_water_mark, &is_wg_mem_high_water_mark);
    if (is_wg_mem_high_water_mark) {
        if (revocable_mem_bytes > min_revocable_mem_bytes) {
            VLOG_DEBUG << "query " << print_id(state->query_id())
                       << " revoke memory, high water mark";
            return true;
        }
        return false;
    } else if (is_wg_mem_low_water_mark) {
        int64_t spill_threshold = query_ctx->spill_threshold();
        int64_t memory_usage = query_ctx->query_mem_tracker->consumption();
        if (spill_threshold == 0 || memory_usage < spill_threshold) {
            return false;
        }
        auto big_memory_operator_num = query_ctx->get_running_big_mem_op_num();
        DCHECK(big_memory_operator_num >= 0);
        int64_t mem_limit_of_op;
        if (0 == big_memory_operator_num) {
            return false;
        } else {
            mem_limit_of_op = spill_threshold / big_memory_operator_num;
        }

        LOG_EVERY_T(INFO, 1) << "query " << print_id(state->query_id())
                             << " revoke memory, low water mark, revocable_mem_bytes: "
                             << PrettyPrinter::print_bytes(revocable_mem_bytes)
                             << ", mem_limit_of_op: " << PrettyPrinter::print_bytes(mem_limit_of_op)
                             << ", min_revocable_mem_bytes: "
                             << PrettyPrinter::print_bytes(min_revocable_mem_bytes)
                             << ", memory_usage: " << PrettyPrinter::print_bytes(memory_usage)
                             << ", spill_threshold: " << PrettyPrinter::print_bytes(spill_threshold)
                             << ", big_memory_operator_num: " << big_memory_operator_num;
        return (revocable_mem_bytes > mem_limit_of_op ||
                revocable_mem_bytes > min_revocable_mem_bytes);
    } else {
        return false;
    }
}
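
// Illustrative worked example (not part of the original file), for the low-water-mark branch,
// using hypothetical numbers: with spill_threshold = 8 GB, memory_usage = 9 GB (so the early
// return is skipped) and big_memory_operator_num = 4,
//
//   mem_limit_of_op = 8 GB / 4 = 2 GB
//
// so the task asks the sink to revoke memory once its revocable bytes exceed either 2 GB or
// min_revocable_mem(), i.e. effectively the smaller of the two thresholds.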

void PipelineTask::finalize() {
    std::unique_lock<std::mutex> lc(_dependency_lock);
    _finalized = true;
    _sink_shared_state.reset();
    _op_shared_states.clear();
    _le_state_map.clear();
}

Status PipelineTask::close(Status exec_status, bool close_sink) {
    int64_t close_ns = 0;
    Status s;
    {
        SCOPED_RAW_TIMER(&close_ns);
        if (close_sink) {
            s = _sink->close(_state, exec_status);
        }
        for (auto& op : _operators) {
            auto tem = op->close(_state);
            if (!tem.ok() && s.ok()) {
                s = tem;
            }
        }
    }
    if (_opened) {
        COUNTER_UPDATE(_close_timer, close_ns);
        COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns);
    }

    if (close_sink && _opened) {
        _task_profile->add_info_string("WakeUpEarly", wake_up_early() ? "true" : "false");
        _fresh_profile_counter();
    }

    if (_task_queue) {
        _task_queue->update_statistics(this, close_ns);
    }
    return s;
}
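
// Illustrative note (not part of the original file): execute() calls
// close(Status::OK(), /*close_sink=*/false) as soon as the source side reaches eos, which
// closes the operators but leaves the sink open for the final sink(..., eos = true) call that
// follows; closing the sink itself (close_sink = true) is presumably driven later by the
// owning fragment context / scheduler. Only that final close refreshes the profile entries
// ("WakeUpEarly", schedule counts, wait-worker time).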

std::string PipelineTask::debug_string() {
    std::unique_lock<std::mutex> lc(_dependency_lock);
    fmt::memory_buffer debug_string_buffer;

    fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id()));
    fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
                   print_id(_state->fragment_instance_id()));

    auto* cur_blocked_dep = _blocked_dep;
    auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
    fmt::format_to(debug_string_buffer,
                   "PipelineTask[this = {}, id = {}, open = {}, eos = {}, finish = {}, dry run = "
                   "{}, elapse time = {}s, _wake_up_early = {}], block dependency = {}, is "
                   "running = {}\noperators: ",
                   (void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed,
                   _wake_up_early.load(),
                   cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
                   is_running());
    for (size_t i = 0; i < _operators.size(); i++) {
        fmt::format_to(debug_string_buffer, "\n{}",
                       _opened && !_finalized ? _operators[i]->debug_string(_state, i)
                                              : _operators[i]->debug_string(i));
    }
    fmt::format_to(debug_string_buffer, "\n{}\n",
                   _opened && !_finalized ? _sink->debug_string(_state, _operators.size())
                                          : _sink->debug_string(_operators.size()));
    if (_finalized) {
        return fmt::to_string(debug_string_buffer);
    }

    size_t i = 0;
    for (; i < _read_dependencies.size(); i++) {
        for (size_t j = 0; j < _read_dependencies[i].size(); j++) {
            fmt::format_to(debug_string_buffer, "{}. {}\n", i,
                           _read_dependencies[i][j]->debug_string(i + 1));
        }
    }

    fmt::format_to(debug_string_buffer, "Write Dependency Information: \n");
    for (size_t j = 0; j < _write_dependencies.size(); j++, i++) {
        fmt::format_to(debug_string_buffer, "{}. {}\n", i,
                       _write_dependencies[j]->debug_string(i + 1));
    }

    fmt::format_to(debug_string_buffer, "\nRuntime Filter Dependency Information: \n");
    for (size_t j = 0; j < _filter_dependencies.size(); j++, i++) {
        fmt::format_to(debug_string_buffer, "{}. {}\n", i,
                       _filter_dependencies[j]->debug_string(i + 1));
    }

    fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
    for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) {
        fmt::format_to(debug_string_buffer, "{}. {}\n", i,
                       _finish_dependencies[j]->debug_string(j + 1));
    }
    return fmt::to_string(debug_string_buffer);
}

void PipelineTask::wake_up() {
    // called by a dependency
    static_cast<void>(get_task_queue()->push_back(this));
}

QueryContext* PipelineTask::query_context() {
    return _fragment_context->get_query_ctx();
}
} // namespace doris::pipeline