Coverage Report

Created: 2025-03-13 11:28

/root/doris/be/src/pipeline/pipeline_task.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <cstdint>
21
#include <memory>
22
#include <string>
23
#include <string_view>
24
25
#include "common/config.h"
26
#include "common/status.h"
27
#include "exec/operator.h"
28
#include "pipeline.h"
29
#include "runtime/workload_group/workload_group.h"
30
#include "util/runtime_profile.h"
31
#include "util/stopwatch.hpp"
32
#include "vec/core/block.h"
33
34
namespace doris {
35
class QueryContext;
36
class RuntimeState;
37
namespace pipeline {
38
class PipelineFragmentContext;
39
} // namespace pipeline
40
} // namespace doris
41
42
namespace doris::pipeline {
43
44
/**
45
 * PipelineTaskState indicates all possible states of a pipeline task.
46
 * A FSM is described as below:
47
 *
48
 *                 |-----------------------------------------------------|
49
 *                 |---|                  transfer 2    transfer 3       |   transfer 4
50
 *                     |-------> BLOCKED ------------|                   |---------------------------------------> CANCELED
51
 *              |------|                             |                   | transfer 5           transfer 6|
52
 * NOT_READY ---| transfer 0                         |-----> RUNNABLE ---|---------> PENDING_FINISH ------|
53
 *              |                                    |          ^        |                      transfer 7|
54
 *              |------------------------------------|          |--------|---------------------------------------> FINISHED
55
 *                transfer 1                                   transfer 9          transfer 8
56
 * BLOCKED includes BLOCKED_FOR_DEPENDENCY, BLOCKED_FOR_SOURCE and BLOCKED_FOR_SINK.
57
 *
58
 * transfer 0 (NOT_READY -> BLOCKED): this pipeline task has some incomplete dependencies
59
 * transfer 1 (NOT_READY -> RUNNABLE): this pipeline task has no incomplete dependencies
60
 * transfer 2 (BLOCKED -> RUNNABLE): runnable condition for this pipeline task is met (e.g. get a new block from rpc)
61
 * transfer 3 (RUNNABLE -> BLOCKED): runnable condition for this pipeline task is not met (e.g. sink operator send a block by RPC and wait for a response)
62
 * transfer 4 (RUNNABLE -> CANCELED): current fragment is cancelled
63
 * transfer 5 (RUNNABLE -> PENDING_FINISH): this pipeline task has completed but is still waiting for the resources it holds to be released
64
 * transfer 6 (PENDING_FINISH -> CANCELED): current fragment is cancelled
65
 * transfer 7 (PENDING_FINISH -> FINISHED): this pipeline task has completed and the resources it held have already been released
66
 * transfer 8 (RUNNABLE -> FINISHED): this pipeline task completed and no resource need to be released
67
 * transfer 9 (RUNNABLE -> RUNNABLE): this pipeline task yields the CPU and re-enters the runnable queue if it is still runnable and has occupied the CPU for a full time slice
68
 */
69
enum class PipelineTaskState : uint8_t {
    // Initial state: the task has not been prepared yet.
    NOT_READY = 0,

    // Blocked states: the task cannot make progress until the named condition is met.
    BLOCKED_FOR_DEPENDENCY = 1,
    BLOCKED_FOR_SOURCE = 2,
    BLOCKED_FOR_SINK = 3,

    // The task can be executed.
    RUNNABLE = 4,

    // The compute work is over, but the task still holds resources
    // (like some scan and sink tasks).
    PENDING_FINISH = 5,

    // Terminal states; see is_final_state().
    FINISHED = 6,
    CANCELED = 7,

    // Presumably blocked waiting for runtime filters
    // (cf. PipelineTask::runtime_filters_are_ready_or_timeout()) — TODO confirm.
    BLOCKED_FOR_RF = 8,
};
81
82
0
// Returns the printable name of a PipelineTaskState (used e.g. by
// PipelineTask::log_detail_if_need()).
// The switch intentionally has no `default:` label, so that -Wswitch reports any
// enumerator added later without a matching name. Falling out of the switch means
// `idx` holds a value outside the enumerators, which is treated as a fatal error.
inline const char* get_state_name(PipelineTaskState idx) {
    using S = PipelineTaskState;
    switch (idx) {
    case S::NOT_READY:
        return "NOT_READY";
    case S::BLOCKED_FOR_DEPENDENCY:
        return "BLOCKED_FOR_DEPENDENCY";
    case S::BLOCKED_FOR_SOURCE:
        return "BLOCKED_FOR_SOURCE";
    case S::BLOCKED_FOR_SINK:
        return "BLOCKED_FOR_SINK";
    case S::RUNNABLE:
        return "RUNNABLE";
    case S::PENDING_FINISH:
        return "PENDING_FINISH";
    case S::FINISHED:
        return "FINISHED";
    case S::CANCELED:
        return "CANCELED";
    case S::BLOCKED_FOR_RF:
        return "BLOCKED_FOR_RF";
    }
    LOG(FATAL) << "__builtin_unreachable";
    __builtin_unreachable();
}
106
107
0
// Returns true iff `idx` is a terminal state of the task FSM (FINISHED or CANCELED);
// every other state may still transition.
inline bool is_final_state(PipelineTaskState idx) {
    const bool finished = (idx == PipelineTaskState::FINISHED);
    const bool canceled = (idx == PipelineTaskState::CANCELED);
    return finished || canceled;
}
116
117
class TaskQueue;
118
class PriorityTaskQueue;
119
120
// The class do the pipeline task. Minest schdule union by task scheduler
121
class PipelineTask {
122
public:
123
    PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, OperatorPtr& sink,
124
                 PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile);
125
126
    PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
127
                 PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile);
128
0
    virtual ~PipelineTask() = default;
129
130
    virtual Status prepare(RuntimeState* state);
131
132
    virtual Status execute(bool* eos);
133
134
    // if the pipeline create a bunch of pipeline task
135
    // must be call after all pipeline task is finish to release resource
136
    virtual Status close(Status exec_status, bool close_sink = true);
137
138
0
    void put_in_runnable_queue() {
139
0
        _schedule_time++;
140
0
        _wait_worker_watcher.start();
141
0
    }
142
0
    void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
143
0
    PipelineTaskState get_state() const { return _cur_state; }
144
    void set_state(PipelineTaskState state);
145
146
0
    virtual bool is_pending_finish() {
147
0
        bool source_ret = _source->is_pending_finish();
148
0
        if (source_ret) {
149
0
            return true;
150
0
        } else {
151
0
            this->set_src_pending_finish_time();
152
0
        }
153
154
0
        bool sink_ret = _sink->is_pending_finish();
155
0
        if (sink_ret) {
156
0
            return true;
157
0
        } else {
158
0
            this->set_dst_pending_finish_time();
159
0
        }
160
0
        return false;
161
0
    }
162
163
0
    virtual bool source_can_read() { return _source->can_read() || _pipeline->_always_can_read; }
164
165
0
    virtual bool runtime_filters_are_ready_or_timeout() {
166
0
        return _source->runtime_filters_are_ready_or_timeout();
167
0
    }
168
169
0
    virtual bool sink_can_write() { return _sink->can_write() || _pipeline->_always_can_write; }
170
171
0
    virtual void finalize() {}
172
173
0
    PipelineFragmentContext* fragment_context() { return _fragment_context; }
174
175
    QueryContext* query_context();
176
177
0
    int get_previous_core_id() const {
178
0
        return _previous_schedule_id != -1 ? _previous_schedule_id
179
0
                                           : _pipeline->_previous_schedule_id;
180
0
    }
181
182
0
    void set_previous_core_id(int id) {
183
0
        if (id == _previous_schedule_id) {
184
0
            return;
185
0
        }
186
0
        if (_previous_schedule_id != -1) {
187
0
            COUNTER_UPDATE(_core_change_times, 1);
188
0
        }
189
0
        _previous_schedule_id = id;
190
0
    }
191
192
    virtual bool has_dependency();
193
194
0
    OperatorPtr get_root() { return _root; }
195
196
    virtual std::string debug_string();
197
198
    void set_task_queue(TaskQueue* task_queue);
199
0
    TaskQueue* get_task_queue() { return _task_queue; }
200
201
    static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL;
202
203
    // 1 used for update priority queue
204
    // note(wb) an ugly implementation, need refactor later
205
    // 1.1 pipeline task
206
0
    void inc_runtime_ns(uint64_t delta_time) { this->_runtime += delta_time; }
207
0
    uint64_t get_runtime_ns() const { return this->_runtime; }
208
209
    // 1.2 priority queue's queue level
210
0
    void update_queue_level(int queue_level) { this->_queue_level = queue_level; }
211
0
    int get_queue_level() const { return this->_queue_level; }
212
213
    // 1.3 priority queue's core id
214
0
    void set_core_id(int core_id) { this->_core_id = core_id; }
215
0
    int get_core_id() const { return this->_core_id; }
216
217
0
    void set_begin_execute_time() {
218
0
        if (!_is_first_time_to_execute) {
219
0
            _begin_execute_time = _pipeline_task_watcher.elapsed_time();
220
0
            _is_first_time_to_execute = true;
221
0
        }
222
0
    }
223
224
0
    void set_eos_time() {
225
0
        if (!_is_eos) {
226
0
            _eos_time = _pipeline_task_watcher.elapsed_time();
227
0
            _is_eos = true;
228
0
        }
229
0
    }
230
231
0
    void set_src_pending_finish_time() {
232
0
        if (!_is_src_pending_finish_over) {
233
0
            _src_pending_finish_over_time = _pipeline_task_watcher.elapsed_time();
234
0
            _is_src_pending_finish_over = true;
235
0
        }
236
0
    }
237
238
0
    void set_dst_pending_finish_time() {
239
0
        if (!_is_dst_pending_finish_over) {
240
0
            _dst_pending_finish_over_time = _pipeline_task_watcher.elapsed_time();
241
0
            _is_dst_pending_finish_over = true;
242
0
        }
243
0
    }
244
245
0
    virtual bool is_finished() const { return false; }
246
247
0
    virtual void set_close_pipeline_time() {
248
0
        if (!_is_close_pipeline) {
249
0
            _close_pipeline_time = _pipeline_task_watcher.elapsed_time();
250
0
            _is_close_pipeline = true;
251
0
            COUNTER_SET(_close_pipeline_timer, _close_pipeline_time);
252
0
        }
253
0
    }
254
255
0
    TUniqueId instance_id() const { return _state->fragment_instance_id(); }
256
257
0
    void set_parent_profile(RuntimeProfile* profile) { _parent_profile = profile; }
258
259
0
    virtual bool is_pipelineX() const { return false; }
260
261
0
    bool is_running() { return _running.load(); }
262
0
    void set_running(bool running) { _running = running; }
263
264
0
    bool is_exceed_debug_timeout() {
265
0
        if (_has_exceed_timeout) {
266
0
            return true;
267
0
        }
268
        // If enable_debug_log_timeout_secs <= 0, then disable the log
269
0
        if (_pipeline_task_watcher.elapsed_time() >
270
0
            config::enable_debug_log_timeout_secs * 1000L * 1000L * 1000L) {
271
0
            _has_exceed_timeout = true;
272
0
            return true;
273
0
        }
274
0
        return false;
275
0
    }
276
277
0
    void log_detail_if_need() {
278
0
        if (config::enable_debug_log_timeout_secs < 1) {
279
0
            return;
280
0
        }
281
0
        if (is_exceed_debug_timeout()) {
282
0
            LOG(INFO) << "query id|instanceid " << print_id(_state->query_id()) << "|"
283
0
                      << print_id(_state->fragment_instance_id())
284
0
                      << " current pipeline exceed run time "
285
0
                      << config::enable_debug_log_timeout_secs << " seconds. Task state "
286
0
                      << get_state_name(get_state()) << "/n task detail:" << debug_string();
287
0
        }
288
0
    }
289
290
0
    RuntimeState* runtime_state() const { return _state; }
291
292
0
    std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); }
293
294
0
    PipelineId pipeline_id() const { return _pipeline->id(); }
295
296
0
    virtual void clear_blocking_state() {}
297
0
    virtual void set_wake_up_early() {}
298
299
protected:
300
0
    void _finish_p_dependency() {
301
0
        for (const auto& p : _pipeline->_parents) {
302
0
            p.second.lock()->finish_one_dependency(p.first, _previous_schedule_id);
303
0
        }
304
0
    }
305
306
    virtual Status _open();
307
    virtual void _init_profile();
308
    virtual void _fresh_profile_counter();
309
310
    uint32_t _index;
311
    PipelinePtr _pipeline;
312
    bool _dependency_finish = false;
313
    bool _has_exceed_timeout = false;
314
    bool _prepared;
315
    bool _opened;
316
    RuntimeState* _state = nullptr;
317
    int _previous_schedule_id = -1;
318
    uint32_t _schedule_time = 0;
319
    PipelineTaskState _cur_state;
320
    SourceState _data_state;
321
    std::unique_ptr<doris::vectorized::Block> _block;
322
    PipelineFragmentContext* _fragment_context = nullptr;
323
    TaskQueue* _task_queue = nullptr;
324
325
    // used for priority queue
326
    // it may be visited by different thread but there is no race condition
327
    // so no need to add lock
328
    uint64_t _runtime = 0;
329
    // it's visited in one thread, so no need to thread synchronization
330
    // 1 get task, (set _queue_level/_core_id)
331
    // 2 exe task
332
    // 3 update task statistics(update _queue_level/_core_id)
333
    int _queue_level = 0;
334
    int _core_id = 0;
335
    Status _open_status = Status::OK();
336
337
    RuntimeProfile* _parent_profile = nullptr;
338
    std::unique_ptr<RuntimeProfile> _task_profile;
339
    RuntimeProfile::Counter* _task_cpu_timer = nullptr;
340
    RuntimeProfile::Counter* _prepare_timer = nullptr;
341
    RuntimeProfile::Counter* _open_timer = nullptr;
342
    RuntimeProfile::Counter* _exec_timer = nullptr;
343
    RuntimeProfile::Counter* _get_block_timer = nullptr;
344
    RuntimeProfile::Counter* _get_block_counter = nullptr;
345
    RuntimeProfile::Counter* _sink_timer = nullptr;
346
    RuntimeProfile::Counter* _close_timer = nullptr;
347
    RuntimeProfile::Counter* _block_counts = nullptr;
348
    RuntimeProfile::Counter* _block_by_source_counts = nullptr;
349
    RuntimeProfile::Counter* _block_by_sink_counts = nullptr;
350
    RuntimeProfile::Counter* _schedule_counts = nullptr;
351
    MonotonicStopWatch _wait_source_watcher;
352
    RuntimeProfile::Counter* _wait_source_timer = nullptr;
353
    MonotonicStopWatch _wait_bf_watcher;
354
    RuntimeProfile::Counter* _wait_bf_timer = nullptr;
355
    RuntimeProfile::Counter* _wait_bf_counts = nullptr;
356
    MonotonicStopWatch _wait_sink_watcher;
357
    RuntimeProfile::Counter* _wait_sink_timer = nullptr;
358
    MonotonicStopWatch _wait_worker_watcher;
359
    RuntimeProfile::Counter* _wait_worker_timer = nullptr;
360
    RuntimeProfile::Counter* _wait_dependency_counts = nullptr;
361
    RuntimeProfile::Counter* _pending_finish_counts = nullptr;
362
    // TODO we should calculate the time between when really runnable and runnable
363
    RuntimeProfile::Counter* _yield_counts = nullptr;
364
    RuntimeProfile::Counter* _core_change_times = nullptr;
365
366
    // The monotonic time of the entire lifecycle of the pipelinetask, almost synchronized with the pipfragmentctx
367
    // There are several important time points:
368
    // 1 first time pipelinetask to execute
369
    // 2 task eos
370
    // 3 src pending finish over
371
    // 4 dst pending finish over
372
    // 5 close pipeline time, we mark this beacause pending finish state may change
373
    MonotonicStopWatch _pipeline_task_watcher;
374
    // time 1
375
    bool _is_first_time_to_execute = false;
376
    RuntimeProfile::Counter* _begin_execute_timer = nullptr;
377
    int64_t _begin_execute_time = 0;
378
    // time 2
379
    bool _is_eos = false;
380
    RuntimeProfile::Counter* _eos_timer = nullptr;
381
    int64_t _eos_time = 0;
382
    //time 3
383
    bool _is_src_pending_finish_over = false;
384
    RuntimeProfile::Counter* _src_pending_finish_over_timer = nullptr;
385
    int64_t _src_pending_finish_over_time = 0;
386
    // time 4
387
    bool _is_dst_pending_finish_over = false;
388
    RuntimeProfile::Counter* _dst_pending_finish_over_timer = nullptr;
389
    int64_t _dst_pending_finish_over_time = 0;
390
    // time 5
391
    bool _is_close_pipeline = false;
392
    RuntimeProfile::Counter* _close_pipeline_timer = nullptr;
393
    int64_t _close_pipeline_time = 0;
394
395
    RuntimeProfile::Counter* _pip_task_total_timer = nullptr;
396
397
private:
398
    Operators _operators; // left is _source, right is _root
399
    OperatorPtr _source;
400
    OperatorPtr _root;
401
    OperatorPtr _sink;
402
403
    std::atomic<bool> _running {false};
404
};
405
} // namespace doris::pipeline