Coverage Report

Created: 2025-04-14 14:25

/root/doris/be/src/pipeline/pipeline_task.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <cstdint>
21
#include <memory>
22
#include <string>
23
#include <vector>
24
25
#include "common/status.h"
26
#include "pipeline/dependency.h"
27
#include "pipeline/exec/operator.h"
28
#include "pipeline/pipeline.h"
29
#include "util/runtime_profile.h"
30
#include "util/stopwatch.hpp"
31
#include "vec/core/block.h"
32
33
namespace doris {
34
class QueryContext;
35
class RuntimeState;
36
namespace pipeline {
37
class PipelineFragmentContext;
38
} // namespace pipeline
39
} // namespace doris
40
41
namespace doris::pipeline {
42
43
class MultiCoreTaskQueue;
44
class PriorityTaskQueue;
45
class Dependency;
46
47
class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
48
public:
49
    PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
50
                 std::shared_ptr<PipelineFragmentContext> fragment_context,
51
                 RuntimeProfile* parent_profile,
52
                 std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
53
                                         std::vector<std::shared_ptr<Dependency>>>>
54
                         shared_state_map,
55
                 int task_idx);
56
57
    Status prepare(const std::vector<TScanRangeParams>& scan_range, const int sender_id,
58
                   const TDataSink& tsink);
59
60
    Status execute(bool* done);
61
62
    // If the pipeline creates a bunch of pipeline tasks, close() must be
63
    // called after all of those tasks have finished, in order to release resources.
64
    Status close(Status exec_status, bool close_sink = true);
65
66
0
    std::weak_ptr<PipelineFragmentContext>& fragment_context() { return _fragment_context; } // Returns a mutable reference to this task's weak_ptr to its PipelineFragmentContext.
67
68
65
    int get_core_id() const { return _core_id; } // Returns the recorded core id; the in-class initial value is -1.
69
70
0
    void set_core_id(int id) { // Records `id` as the current core id; does nothing when it equals the stored value.
71
0
        if (id != _core_id) {
72
0
            if (_core_id != -1) { // -1 is the in-class initial value, so the first assignment is not counted as a core change.
73
0
                COUNTER_UPDATE(_core_change_times, 1);
74
0
            }
75
0
            _core_id = id;
76
0
        }
77
0
    }
78
79
    Status finalize();
80
81
    std::string debug_string();
82
83
0
    std::shared_ptr<BasicSharedState> get_source_shared_state() { // Returns the shared state stored under the source operator's id, or an empty pointer if there is none.
84
0
        return _op_shared_states.contains(_source->operator_id()) // contains() first, so operator[] below never inserts a new entry.
85
0
                       ? _op_shared_states[_source->operator_id()]
86
0
                       : nullptr;
87
0
    }
88
89
2
    void inject_shared_state(std::shared_ptr<BasicSharedState> shared_state) { // Attaches `shared_state` to the first matching operator of this task, otherwise to the sink if its first destination id matches.
90
2
        if (!shared_state) { // A null shared state is ignored.
91
0
            return;
92
0
        }
93
        // Shared state is created by upstream task's sink operator and shared by source operator of this task.
94
4
        for (auto& op : _operators) {
95
4
            if (shared_state->related_op_ids.contains(op->operator_id())) {
96
2
                _op_shared_states.insert({op->operator_id(), shared_state}); // insert() keeps any entry already stored under this operator id.
97
2
                return; // Only the first matching operator receives the state.
98
2
            }
99
4
        }
100
0
        if (shared_state->related_op_ids.contains(_sink->dests_id().front())) { // NOTE(review): front() assumes dests_id() is non-empty - confirm.
101
0
            DCHECK_EQ(_sink_shared_state, nullptr) // The sink shared state is expected to be unset at this point.
102
0
                    << " Sink: " << _sink->get_name() << " dest id: " << _sink->dests_id().front();
103
0
            _sink_shared_state = shared_state;
104
0
        }
105
0
    }
106
107
16
    std::shared_ptr<BasicSharedState> get_sink_shared_state() { return _sink_shared_state; } // Returns a copy of the sink shared-state pointer (may be empty).
108
109
16
    BasicSharedState* get_op_shared_state(int id) { // Returns a raw pointer to the shared state stored under operator id `id`, or nullptr if there is none.
110
16
        if (!_op_shared_states.contains(id)) { // Checked first so operator[] below never inserts a new entry.
111
14
            return nullptr;
112
14
        }
113
2
        return _op_shared_states[id].get(); // The map entry's shared_ptr remains the owner; only a raw pointer is handed out.
114
16
    }
115
116
    Status wake_up(Dependency* dep);
117
118
0
    DataSinkOperatorPtr sink() const { return _sink; } // Returns a copy of the sink operator pointer.
119
120
0
    int task_id() const { return _index; } // Returns _index (declared as uint32_t) converted to int.
121
59.5k
    bool is_finalized() const { return _exec_state == State::FINALIZED; } // True when the atomic execution state currently reads State::FINALIZED.
122
123
4
    void set_wake_up_early() { _wake_up_early = true; } // Sets the atomic early-wake-up flag; this header has no inline code that clears it.
124
125
    // Execution phase should be terminated. This is called if this task is canceled or woken up early.
126
    void terminate();
127
128
11
    void set_task_queue(MultiCoreTaskQueue* task_queue) { _task_queue = task_queue; } // Stores the raw queue pointer as given.
129
23
    MultiCoreTaskQueue* get_task_queue() { return _task_queue; } // Returns the stored queue pointer; nullptr is the in-class initial value.
130
131
#ifdef BE_TEST
132
    unsigned long long THREAD_TIME_SLICE = 100'000'000ULL; // Test builds: a mutable per-instance member (presumably so tests can override it).
133
#else
134
    static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL; // Other builds: a compile-time constant with the same value. NOTE(review): unit looks like nanoseconds - confirm.
135
#endif
136
137
    // 1. Used for updating the priority queue.
138
    // NOTE(wb): an ugly implementation; needs refactoring later.
139
    // 1.1 Pipeline task
140
0
    void inc_runtime_ns(uint64_t delta_time) { this->_runtime += delta_time; } // Adds `delta_time` to the accumulated runtime; _runtime is a plain (non-atomic) uint64_t.
141
23
    uint64_t get_runtime_ns() const { return this->_runtime; } // Returns the runtime accumulated through inc_runtime_ns().
142
143
    // 1.2 priority queue's queue level
144
10
    void update_queue_level(int queue_level) { this->_queue_level = queue_level; } // Overwrites the stored queue level.
145
0
    int get_queue_level() const { return this->_queue_level; } // Returns the stored queue level; the in-class initial value is 0.
146
147
23
    void put_in_runnable_queue() { // Bookkeeping only: bumps the schedule counter and starts the wait-for-worker stopwatch.
148
23
        _schedule_time++;
149
23
        _wait_worker_watcher.start(); // Stopped again by pop_out_runnable_queue().
150
23
    }
151
152
10
    void pop_out_runnable_queue() { _wait_worker_watcher.stop(); } // Stops the wait-for-worker stopwatch started by put_in_runnable_queue().
153
154
19.8k
    bool is_running() { return _running.load(); } // Reads the atomic running flag.
155
0
    bool is_revoking() { // True if at least one spill dependency currently reports a truthy is_blocked_by().
156
0
        for (auto* dep : _spill_dependencies) {
157
0
            if (dep->is_blocked_by()) {
158
0
                return true; // One blocked spill dependency is enough; the rest are not checked.
159
0
            }
160
0
        }
161
0
        return false;
162
0
    }
163
0
    bool set_running(bool running) { return _running.exchange(running); } // Atomically stores `running` and returns the previous value of the flag.
164
165
0
    bool is_exceed_debug_timeout() { // True once the task stopwatch has passed the debug timeout; the result is latched in _has_exceed_timeout.
166
0
        if (_has_exceed_timeout) { // Already latched: skip the stopwatch comparison.
167
0
            return true;
168
0
        }
169
        // Threshold is enable_debug_log_timeout_secs scaled by 1e9 (presumably seconds -> nanoseconds). The "<= 0 disables the log" rule is enforced by the `< 1` guard in log_detail_if_need(), not here.
170
0
        if (_pipeline_task_watcher.elapsed_time() >
171
0
            config::enable_debug_log_timeout_secs * 1000L * 1000L * 1000L) {
172
0
            _has_exceed_timeout = true;
173
0
            return true;
174
0
        }
175
0
        return false;
176
0
    }
177
178
0
    void log_detail_if_need() { // Logs the task's debug string at INFO level once it has exceeded the debug timeout.
179
0
        if (config::enable_debug_log_timeout_secs < 1) { // A timeout below 1 second disables this log entirely.
180
0
            return;
181
0
        }
182
0
        if (is_exceed_debug_timeout()) {
183
0
            LOG(INFO) << "query id|instanceid " << print_id(_state->query_id()) << "|"
184
0
                      << print_id(_state->fragment_instance_id())
185
0
                      << " current pipeline exceed run time "
186
0
                      << config::enable_debug_log_timeout_secs << " seconds. "
187
0
                      << "\n task detail:" << debug_string(); // Newline escape; the previous "/n" printed a literal slash and 'n'.
188
0
        }
189
0
    }
190
191
0
    RuntimeState* runtime_state() const { return _state; } // Returns the stored RuntimeState pointer; nullptr is the in-class initial value.
192
193
0
    std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); } // Builds "task<_index>(<pipeline _name>)".
194
195
    // TODO: Maybe we do not need this safe code anymore
196
    void stop_if_finished();
197
198
0
    PipelineId pipeline_id() const { return _pipeline->id(); } // Forwards to the id() of the pipeline this task holds.
199
    [[nodiscard]] size_t get_revocable_size() const;
200
    [[nodiscard]] Status revoke_memory(const std::shared_ptr<SpillContext>& spill_context);
201
202
22
    Status blocked(Dependency* dependency) { // Records `dependency` as the blocking dependency and requests a transition to State::BLOCKED; returns that transition's Status.
203
22
        DCHECK_EQ(_blocked_dep, nullptr) << "task: " << debug_string(); // No blocking dependency is expected to be recorded already.
204
22
        _blocked_dep = dependency;
205
22
        return _state_transition(PipelineTask::State::BLOCKED);
206
22
    }
207
208
private:
209
    // Whether this task is blocked before execution (FE 2-phase commit trigger, runtime filters)
210
    bool _wait_to_start();
211
    // Whether this task is blocked during execution (read dependency, write dependency)
212
    bool _is_blocked();
213
    // Whether this task is blocked after execution (pending finish dependency)
214
    bool _is_pending_finish();
215
216
    Status _extract_dependencies();
217
    void _init_profile();
218
    void _fresh_profile_counter();
219
    Status _open();
220
221
    const TUniqueId _query_id;
222
    const uint32_t _index;
223
    PipelinePtr _pipeline;
224
    bool _has_exceed_timeout = false;
225
    bool _opened;
226
    RuntimeState* _state = nullptr;
227
    int _core_id = -1;
228
    uint32_t _schedule_time = 0;
229
    std::unique_ptr<vectorized::Block> _block;
230
231
    std::weak_ptr<PipelineFragmentContext> _fragment_context;
232
    MultiCoreTaskQueue* _task_queue = nullptr;
233
234
    // Used by the priority queue.
235
    // It may be accessed by different threads, but there is no race condition,
236
    // so no lock is needed.
237
    uint64_t _runtime = 0;
238
    // It is visited in one thread, so there is no need for thread synchronization:
239
    // 1 get task, (set _queue_level/_core_id)
240
    // 2 exe task
241
    // 3 update task statistics(update _queue_level/_core_id)
242
    int _queue_level = 0;
243
244
    RuntimeProfile* _parent_profile = nullptr;
245
    std::unique_ptr<RuntimeProfile> _task_profile;
246
    RuntimeProfile::Counter* _task_cpu_timer = nullptr;
247
    RuntimeProfile::Counter* _prepare_timer = nullptr;
248
    RuntimeProfile::Counter* _open_timer = nullptr;
249
    RuntimeProfile::Counter* _exec_timer = nullptr;
250
    RuntimeProfile::Counter* _get_block_timer = nullptr;
251
    RuntimeProfile::Counter* _get_block_counter = nullptr;
252
    RuntimeProfile::Counter* _sink_timer = nullptr;
253
    RuntimeProfile::Counter* _close_timer = nullptr;
254
    RuntimeProfile::Counter* _schedule_counts = nullptr;
255
    MonotonicStopWatch _wait_worker_watcher;
256
    RuntimeProfile::Counter* _wait_worker_timer = nullptr;
257
    // TODO we should calculate the time between when really runnable and runnable
258
    RuntimeProfile::Counter* _yield_counts = nullptr;
259
    RuntimeProfile::Counter* _core_change_times = nullptr;
260
    RuntimeProfile::Counter* _memory_reserve_times = nullptr;
261
    RuntimeProfile::Counter* _memory_reserve_failed_times = nullptr;
262
263
    MonotonicStopWatch _pipeline_task_watcher;
264
265
    Operators _operators; // left is _source, right is _root
266
    OperatorXBase* _source;
267
    OperatorXBase* _root;
268
    DataSinkOperatorPtr _sink;
269
270
    // `_read_dependencies` is stored in the same order as `_operators`.
271
    std::vector<std::vector<Dependency*>> _read_dependencies;
272
    std::vector<Dependency*> _spill_dependencies;
273
    std::vector<Dependency*> _write_dependencies;
274
    std::vector<Dependency*> _finish_dependencies;
275
    std::vector<Dependency*> _filter_dependencies;
276
277
    // All shared states of this pipeline task.
278
    std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states;
279
    std::shared_ptr<BasicSharedState> _sink_shared_state;
280
    std::vector<TScanRangeParams> _scan_ranges;
281
    std::map<int,
282
             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
283
            _shared_state_map;
284
    int _task_idx;
285
    bool _dry_run = false;
286
287
    Dependency* _blocked_dep = nullptr;
288
289
    Dependency* _execution_dep = nullptr;
290
    Dependency* _memory_sufficient_dependency;
291
    std::mutex _dependency_lock;
292
293
    std::atomic<bool> _running {false};
294
    std::atomic<bool> _eos {false};
295
    std::atomic<bool> _wake_up_early {false};
296
297
    /**
298
         *
299
         * INITED -----> RUNNABLE -------------------------+----> FINISHED ---+---> FINALIZED
300
         *                   ^                             |                  |
301
         *                   |                             |                  |
302
         *                   +----------- BLOCKED <--------+------------------+
303
         */
304
    enum class State : int { // Execution states; the enumerator order matches the row order of LEGAL_STATE_TRANSITION.
305
        INITED,
306
        RUNNABLE,
307
        BLOCKED,
308
        FINISHED,
309
        FINALIZED,
310
    };
311
    const std::vector<std::set<State>> LEGAL_STATE_TRANSITION = { // One row per target State, in enum order; each set lists the states that target may be entered from. NOTE(review): non-static, so each task holds its own copy.
312
            {},                                               // Target state is INITED
313
            {State::INITED, State::RUNNABLE, State::BLOCKED}, // Target state is RUNNABLE
314
            {State::RUNNABLE, State::FINISHED},               // Target state is BLOCKED
315
            {State::RUNNABLE},                                // Target state is FINISHED
316
            {State::INITED, State::FINISHED}};                // Target state is FINALIZED
317
318
19.9k
    std::string _to_string(State state) const { // Returns the enumerator name of `state` as a string.
319
19.9k
        switch (state) {
320
17
        case State::INITED:
321
17
            return "INITED";
322
19.8k
        case State::RUNNABLE:
323
19.8k
            return "RUNNABLE";
324
38
        case State::BLOCKED:
325
38
            return "BLOCKED";
326
21
        case State::FINISHED:
327
21
            return "FINISHED";
328
19
        case State::FINALIZED:
329
19
            return "FINALIZED";
330
0
        default: // Every enumerator is handled above, so this branch is treated as unreachable.
331
0
            __builtin_unreachable();
332
19.9k
        }
333
19.9k
    }
334
335
    Status _state_transition(State new_state);
336
    std::atomic<State> _exec_state = State::INITED;
337
    MonotonicStopWatch _state_change_watcher;
338
    std::atomic<bool> _spilling = false;
339
    const std::string _pipeline_name;
340
};
341
342
using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;
343
344
} // namespace doris::pipeline