Coverage Report

Created: 2025-10-31 19:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/pipeline/pipeline_task.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <cstdint>
21
#include <memory>
22
#include <string>
23
#include <vector>
24
25
#include "common/status.h"
26
#include "pipeline/dependency.h"
27
#include "pipeline/exec/operator.h"
28
#include "pipeline/exec/spill_utils.h"
29
#include "pipeline/pipeline.h"
30
#include "util/runtime_profile.h"
31
#include "util/stopwatch.hpp"
32
#include "vec/core/block.h"
33
34
namespace doris {
35
class QueryContext;
36
class RuntimeState;
37
namespace pipeline {
38
class PipelineFragmentContext;
39
} // namespace pipeline
40
} // namespace doris
41
42
namespace doris::pipeline {
43
44
class MultiCoreTaskQueue;
45
class PriorityTaskQueue;
46
class Dependency;
47
48
class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
49
public:
50
    PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
51
                 std::shared_ptr<PipelineFragmentContext> fragment_context,
52
                 RuntimeProfile* parent_profile,
53
                 std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
54
                                         std::vector<std::shared_ptr<Dependency>>>>
55
                         shared_state_map,
56
                 int task_idx);
57
58
    virtual ~PipelineTask();
59
60
    Status prepare(const std::vector<TScanRangeParams>& scan_range, const int sender_id,
61
                   const TDataSink& tsink);
62
63
    virtual Status execute(bool* done);
64
65
    // If the pipeline creates a bunch of pipeline tasks,
66
    // this must be called after all pipeline tasks have finished, to release resources.
67
    virtual Status close(Status exec_status, bool close_sink = true);
68
69
0
    // Gives access to the weak reference to the owning fragment context held by this task.
    // The member itself is returned by non-const reference.
    virtual std::weak_ptr<PipelineFragmentContext>& fragment_context() {
        return _fragment_context;
    }
70
71
26
    int get_thread_id(int num_threads) const {
72
26
        return _thread_id == -1 ? _thread_id : _thread_id % num_threads;
73
26
    }
74
75
0
    // Records the id of the thread this task is assigned to and returns *this so that
    // calls can be chained. `_core_change_times` is incremented when the task moves from
    // one recorded thread id to a different one.
    virtual PipelineTask& set_thread_id(int thread_id) {
        // The comparison must happen BEFORE `_thread_id` is overwritten; comparing after
        // the assignment is always false, so the counter would never be updated.
        if (thread_id != _thread_id) {
            // The first assignment (from the -1 sentinel) is not a change of thread, and
            // the counter may not have been created yet, so guard both cases.
            // NOTE(review): confirm that the initial assignment should not be counted.
            if (_thread_id != -1 && _core_change_times != nullptr) {
                COUNTER_UPDATE(_core_change_times, 1);
            }
            _thread_id = thread_id;
        }
        return *this;
    }
82
83
    virtual Status finalize();
84
85
    std::string debug_string();
86
87
0
    std::shared_ptr<BasicSharedState> get_source_shared_state() {
88
0
        return _op_shared_states.contains(_source->operator_id())
89
0
                       ? _op_shared_states[_source->operator_id()]
90
0
                       : nullptr;
91
0
    }
92
93
    /**
94
     * Pipeline task is blockable means it will be blocked in the next run. So we should put it into
95
     * the blocking task scheduler.
96
     */
97
    virtual bool is_blockable() const;
98
99
    /**
100
     * `shared_state` is shared by different pipeline tasks. This function aims to establish
101
     * connections across related tasks.
102
     *
103
     * There are 2 kinds of relationships to share state by tasks.
104
     * 1. For regular operators, for example, Aggregation, we use the AggSinkOperator to create a
105
     *    shared state and then inject it into downstream task which contains the corresponding
106
     *    AggSourceOperator.
107
     * 2. For multiple-sink-single-source operator, for example, Set operations, the shared state is
108
     *    created once and shared by multiple sink operators and single source operator. For this
109
     *    case, we use the first sink operator create shared state and then inject into all of other
110
     *    tasks.
111
     */
112
    bool inject_shared_state(std::shared_ptr<BasicSharedState> shared_state);
113
114
17
    // Returns a copy of the shared pointer stored in `_sink_shared_state`.
    std::shared_ptr<BasicSharedState> get_sink_shared_state() {
        return _sink_shared_state;
    }
115
116
17
    // Returns a non-owning pointer to the shared state stored in `_op_shared_states`
    // under key `id`, or nullptr when no entry exists for that key.
    BasicSharedState* get_op_shared_state(int id) {
        const auto entry = _op_shared_states.find(id);
        return entry == _op_shared_states.end() ? nullptr : entry->second.get();
    }
122
123
    Status wake_up(Dependency* dep, std::unique_lock<std::mutex>& /* dep_lock */);
124
125
0
    // Returns `_sink`, the sink operator of this task, by value.
    DataSinkOperatorPtr sink() const {
        return _sink;
    }
126
127
0
    // Returns the index of this task (`_index`). The member is a uint32_t while the
    // return type is int, so the narrowing is made explicit.
    int task_id() const { return static_cast<int>(_index); }
128
33.3k
    // True once the execution state of this task has reached State::FINALIZED.
    virtual bool is_finalized() const {
        return _exec_state.load() == State::FINALIZED;
    }
129
130
4
    // Raises the `_wake_up_early` flag and records in `_wake_by` the pipeline id passed
    // by the caller (-1 when none is given).
    void set_wake_up_early(PipelineId wake_by = -1) {
        _wake_up_early.store(true);
        _wake_by = wake_by;
    }
134
135
    // Execution phase should be terminated. This is called if this task is canceled or woken up early.
136
    void terminate();
137
138
    // 1 used for update priority queue
139
    // note(wb) an ugly implementation, need refactor later
140
    // 1.1 pipeline task
141
0
    void inc_runtime_ns(uint64_t delta_time) { this->_runtime += delta_time; }
142
26
    uint64_t get_runtime_ns() const { return this->_runtime; }
143
144
    // 1.2 priority queue's queue level
145
10
    void update_queue_level(int queue_level) { this->_queue_level = queue_level; }
146
0
    int get_queue_level() const { return this->_queue_level; }
147
148
26
    void put_in_runnable_queue() {
149
26
        _schedule_time++;
150
26
        _wait_worker_watcher.start();
151
26
    }
152
153
10
    // Bookkeeping for leaving the runnable queue: stops the wait-for-worker stopwatch.
    void pop_out_runnable_queue() {
        _wait_worker_watcher.stop();
    }
154
155
11.1k
    // Returns the current value of the atomic `_running` flag.
    bool is_running() {
        const bool running_now = _running.load();
        return running_now;
    }
156
0
    virtual bool set_running(bool running) {
157
0
        bool old_value = !running;
158
0
        _running.compare_exchange_weak(old_value, running);
159
0
        return old_value;
160
0
    }
161
162
0
    // Returns the non-owning `_state` pointer of this task (nullptr if none was set).
    virtual RuntimeState* runtime_state() const {
        return _state;
    }
163
164
0
    virtual std::string task_name() const {
165
0
        return fmt::format("task{}({})", _index, _pipeline->_name);
166
0
    }
167
168
    [[nodiscard]] Status do_revoke_memory(const std::shared_ptr<SpillContext>& spill_context);
169
170
    // TODO: Maybe we do not need this safe code anymore
171
    void stop_if_finished();
172
173
0
    // Returns the id of the pipeline this task was created from.
    virtual PipelineId pipeline_id() const {
        return _pipeline->id();
    }
174
    [[nodiscard]] size_t get_revocable_size() const;
175
    [[nodiscard]] Status revoke_memory(const std::shared_ptr<SpillContext>& spill_context);
176
177
23
    // Marks this task as blocked on `dependency`: remembers the dependency in
    // `_blocked_dep` and moves the task to State::BLOCKED, returning the status of that
    // transition. The lock parameter is unnamed and unused in the body.
    // NOTE(review): presumably it exists so callers must hold the dependency lock - confirm.
    // Debug builds assert that no blocking dependency is recorded yet.
    Status blocked(Dependency* dependency, std::unique_lock<std::mutex>& /* dep_lock */) {
        DCHECK_EQ(_blocked_dep, nullptr) << "task: " << debug_string();
        _blocked_dep = dependency;
        const auto target_state = State::BLOCKED;
        return _state_transition(target_state);
    }
182
183
protected:
184
    // Only used for RevokableTask
185
0
    // Default constructor for subclasses. Besides `_index`, it explicitly initializes
    // every member that has no in-class initializer, so that no bool/int/pointer member
    // is left with an indeterminate value. Initializers follow declaration order.
    PipelineTask()
            : _index(0),
              _opened(false),
              _source(nullptr),
              _root(nullptr),
              _task_idx(0),
              _memory_sufficient_dependency(nullptr) {}
186
187
private:
188
    // Whether this task is blocked before execution (FE 2-phase commit trigger, runtime filters)
189
    bool _wait_to_start();
190
    // Whether this task is blocked during execution (read dependency, write dependency)
191
    bool _is_blocked();
192
    // Whether this task is blocked after execution (pending finish dependency)
193
    bool _is_pending_finish();
194
195
    Status _extract_dependencies();
196
    void _init_profile();
197
    void _fresh_profile_counter();
198
    Status _open();
199
    Status _prepare();
200
201
    // Operator `op` try to reserve memory before executing. Return false if reserve failed
202
    // otherwise return true.
203
    bool _try_to_reserve_memory(const size_t reserve_size, OperatorBase* op);
204
205
    const TUniqueId _query_id;
206
    const uint32_t _index;
207
    PipelinePtr _pipeline;
208
    bool _opened;
209
    RuntimeState* _state = nullptr;
210
    int _thread_id = -1;
211
    uint32_t _schedule_time = 0;
212
    std::unique_ptr<vectorized::Block> _block;
213
214
    std::weak_ptr<PipelineFragmentContext> _fragment_context;
215
216
    // used for priority queue
217
    // it may be visited by different thread but there is no race condition
218
    // so no need to add lock
219
    uint64_t _runtime = 0;
220
    // it's visited in one thread, so no need to thread synchronization
221
    // 1 get task, (set _queue_level/_core_id)
222
    // 2 exe task
223
    // 3 update task statistics(update _queue_level/_core_id)
224
    int _queue_level = 0;
225
226
    RuntimeProfile* _parent_profile = nullptr;
227
    std::unique_ptr<RuntimeProfile> _task_profile;
228
    RuntimeProfile::Counter* _task_cpu_timer = nullptr;
229
    RuntimeProfile::Counter* _prepare_timer = nullptr;
230
    RuntimeProfile::Counter* _open_timer = nullptr;
231
    RuntimeProfile::Counter* _exec_timer = nullptr;
232
    RuntimeProfile::Counter* _get_block_timer = nullptr;
233
    RuntimeProfile::Counter* _get_block_counter = nullptr;
234
    RuntimeProfile::Counter* _sink_timer = nullptr;
235
    RuntimeProfile::Counter* _close_timer = nullptr;
236
    RuntimeProfile::Counter* _schedule_counts = nullptr;
237
    MonotonicStopWatch _wait_worker_watcher;
238
    RuntimeProfile::Counter* _wait_worker_timer = nullptr;
239
    // TODO we should calculate the time between when really runnable and runnable
240
    RuntimeProfile::Counter* _yield_counts = nullptr;
241
    RuntimeProfile::Counter* _core_change_times = nullptr;
242
    RuntimeProfile::Counter* _memory_reserve_times = nullptr;
243
    RuntimeProfile::Counter* _memory_reserve_failed_times = nullptr;
244
245
    Operators _operators; // left is _source, right is _root
246
    OperatorXBase* _source;
247
    OperatorXBase* _root;
248
    DataSinkOperatorPtr _sink;
249
250
    // `_read_dependencies` is stored as same order as `_operators`
251
    std::vector<std::vector<Dependency*>> _read_dependencies;
252
    std::vector<Dependency*> _write_dependencies;
253
    std::vector<Dependency*> _finish_dependencies;
254
    std::vector<Dependency*> _execution_dependencies;
255
256
    // All shared states of this pipeline task.
257
    std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states;
258
    std::shared_ptr<BasicSharedState> _sink_shared_state;
259
    std::vector<TScanRangeParams> _scan_ranges;
260
    std::map<int,
261
             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
262
            _shared_state_map;
263
    int _task_idx;
264
    bool _dry_run = false;
265
    MOCK_REMOVE(const)
266
    unsigned long long _exec_time_slice = config::pipeline_task_exec_time_slice * NANOS_PER_MILLIS;
267
    Dependency* _blocked_dep = nullptr;
268
269
    Dependency* _memory_sufficient_dependency;
270
    std::mutex _dependency_lock;
271
272
    std::atomic<bool> _running {false};
273
    std::atomic<bool> _eos {false};
274
    std::atomic<bool> _wake_up_early {false};
275
    // PipelineTask may be held by TaskQueue
276
    std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
277
278
    /**
279
         *
280
         * INITED -----> RUNNABLE -------------------------+----> FINISHED ---+---> FINALIZED
281
         *                   ^                             |                  |
282
         *                   |                             |                  |
283
         *                   +----------- BLOCKED <--------+------------------+
284
         */
285
    // Execution states of a task. The numeric values are spelled out because they are
    // the row indices of LEGAL_STATE_TRANSITION.
    enum class State : int {
        INITED = 0,
        RUNNABLE = 1,
        BLOCKED = 2,
        FINISHED = 3,
        FINALIZED = 4,
    };
292
    const std::vector<std::set<State>> LEGAL_STATE_TRANSITION = {
293
            {},                                               // Target state is INITED
294
            {State::INITED, State::RUNNABLE, State::BLOCKED}, // Target state is RUNNABLE
295
            {State::RUNNABLE, State::FINISHED},               // Target state is BLOCKED
296
            {State::RUNNABLE},                                // Target state is FINISHED
297
            {State::INITED, State::FINISHED}};                // Target state is FINALIZED
298
299
11.2k
    // Returns the enumerator name of `state` as text. Values outside the enumeration are
    // declared unreachable.
    std::string _to_string(State state) const {
        const char* label = nullptr;
        switch (state) {
        case State::INITED:
            label = "INITED";
            break;
        case State::RUNNABLE:
            label = "RUNNABLE";
            break;
        case State::BLOCKED:
            label = "BLOCKED";
            break;
        case State::FINISHED:
            label = "FINISHED";
            break;
        case State::FINALIZED:
            label = "FINALIZED";
            break;
        default:
            __builtin_unreachable();
        }
        return label;
    }
315
316
    Status _state_transition(State new_state);
317
    std::atomic<State> _exec_state = State::INITED;
318
    MonotonicStopWatch _state_change_watcher;
319
    std::atomic<bool> _spilling = false;
320
    const std::string _pipeline_name;
321
    int _wake_by = -1;
322
};
323
324
using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;
325
326
} // namespace doris::pipeline