Coverage Report

Created: 2026-06-29 14:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_task.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <atomic>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>

#include "common/status.h"
#include "core/block/block.h"
#include "exec/operator/operator.h"
#include "exec/operator/spill_utils.h"
#include "exec/pipeline/dependency.h"
#include "exec/pipeline/pipeline.h"
#include "runtime/runtime_profile.h"
#include "util/stopwatch.hpp"
34
35
// Forward declarations: this header only needs the names of these types, not their definitions.
namespace doris {
class PipelineFragmentContext;
class QueryContext;
class RuntimeState;
} // namespace doris
40
41
namespace doris {
42
43
// Forward declarations: only the type names are required in this header.
class Dependency;
class MultiCoreTaskQueue;
class PriorityTaskQueue;
46
47
class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
48
public:
49
    PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
50
                 std::shared_ptr<PipelineFragmentContext> fragment_context,
51
                 RuntimeProfile* parent_profile,
52
                 std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
53
                                         std::vector<std::shared_ptr<Dependency>>>>
54
                         shared_state_map,
55
                 int task_idx);
56
57
    virtual ~PipelineTask();
58
59
    Status prepare(const std::vector<TScanRangeParams>& scan_range, const int sender_id,
60
                   const TDataSink& tsink);
61
62
    virtual Status execute(bool* done);
63
64
    // if the pipeline create a bunch of pipeline task
65
    // must be call after all pipeline task is finish to release resource
66
    virtual Status close(Status exec_status, bool close_sink = true);
67
68
7.25M
    virtual std::weak_ptr<PipelineFragmentContext>& fragment_context() { return _fragment_context; }
69
70
14.4M
    int get_thread_id(int num_threads) const {
71
14.4M
        return _thread_id == -1 ? _thread_id : _thread_id % num_threads;
72
14.4M
    }
73
74
7.26M
    virtual PipelineTask& set_thread_id(int thread_id) {
75
7.26M
        if (thread_id != _thread_id) {
76
3.93M
            COUNTER_UPDATE(_core_change_times, 1);
77
3.93M
            _thread_id = thread_id;
78
3.93M
        }
79
7.26M
        return *this;
80
7.26M
    }
81
82
    virtual Status finalize();
83
84
    std::string debug_string();
85
86
324k
    std::shared_ptr<BasicSharedState> get_source_shared_state() {
87
324k
        return _op_shared_states.contains(_source->operator_id())
88
324k
                       ? _op_shared_states[_source->operator_id()]
89
324k
                       : nullptr;
90
324k
    }
91
92
    /**
93
     * Pipeline task is blockable means it will be blocked in the next run. So we should put it into
94
     * the blocking task scheduler.
95
     */
96
    virtual bool is_blockable() const;
97
98
    /**
99
     * `shared_state` is shared by different pipeline tasks. This function aims to establish
100
     * connections across related tasks.
101
     *
102
     * There are 2 kinds of relationships to share state by tasks.
103
     * 1. For regular operators, for example, Aggregation, we use the AggSinkOperator to create a
104
     *    shared state and then inject it into downstream task which contains the corresponding
105
     *    AggSourceOperator.
106
     * 2. For multiple-sink-single-source operator, for example, Set operations, the shared state is
107
     *    created once and shared by multiple sink operators and single source operator. For this
108
     *    case, we use the first sink operator create shared state and then inject into all of other
109
     *    tasks.
110
     */
111
    bool inject_shared_state(std::shared_ptr<BasicSharedState> shared_state);
112
113
2.76M
    std::shared_ptr<BasicSharedState> get_sink_shared_state() { return _sink_shared_state; }
114
115
2.40M
    BasicSharedState* get_op_shared_state(int id) {
116
2.40M
        if (!_op_shared_states.contains(id)) {
117
1.91M
            return nullptr;
118
1.91M
        }
119
485k
        return _op_shared_states[id].get();
120
2.40M
    }
121
122
    void wake_up(Dependency* dep, std::unique_lock<std::mutex>& /* dep_lock */);
123
124
0
    DataSinkOperatorPtr sink() const { return _sink; }
125
126
0
    int task_id() const { return _index; };
127
8.19M
    virtual bool is_finalized() const { return _exec_state == State::FINALIZED; }
128
129
813k
    void set_wake_up_early(PipelineId wake_by = -1) {
130
813k
        _wake_up_early = true;
131
813k
        _wake_by = wake_by;
132
813k
    }
133
134
    // Unblock all dependencies so this task can never be blocked again.
135
    // This is called when the task is woken up early or the fragment is canceled.
136
    //
137
    // NOTE: This does NOT call operator-level terminate() — operator terminate must run
138
    // inside execute() on the worker thread because operator state is not thread-safe.
139
    void unblock_all_dependencies();
140
141
    // 1 used for update priority queue
142
    // note(wb) an ugly implementation, need refactor later
143
    // 1.1 pipeline task
144
7.25M
    void inc_runtime_ns(uint64_t delta_time) { this->_runtime += delta_time; }
145
8.02M
    uint64_t get_runtime_ns() const { return this->_runtime; }
146
147
    // 1.2 priority queue's queue level
148
8.02M
    void update_queue_level(int queue_level) { this->_queue_level = queue_level; }
149
7.25M
    int get_queue_level() const { return this->_queue_level; }
150
151
8.01M
    void put_in_runnable_queue() {
152
8.01M
        _schedule_time++;
153
8.01M
        _wait_worker_watcher.start();
154
8.01M
    }
155
156
8.02M
    void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
157
158
19.8k
    bool is_running() { return _running.load(); }
159
15.2M
    virtual bool set_running(bool running) {
160
15.2M
        bool old_value = !running;
161
15.2M
        _running.compare_exchange_weak(old_value, running);
162
15.2M
        return old_value;
163
15.2M
    }
164
165
3.11M
    virtual RuntimeState* runtime_state() const { return _state; }
166
167
0
    virtual std::string task_name() const {
168
0
        return fmt::format("task{}({})", _index, _pipeline->_name);
169
0
    }
170
171
    [[nodiscard]] Status do_revoke_memory(const std::shared_ptr<SpillContext>& spill_context);
172
173
    // TODO: Maybe we do not need this safe code anymore
174
    void stop_if_finished();
175
176
1.96M
    virtual PipelineId pipeline_id() const { return _pipeline->id(); }
177
    [[nodiscard]] size_t get_revocable_size() const;
178
    [[nodiscard]] Status revoke_memory(const std::shared_ptr<SpillContext>& spill_context);
179
180
5.19M
    Status blocked(Dependency* dependency, std::unique_lock<std::mutex>& /* dep_lock */) {
181
5.19M
        DCHECK_EQ(_blocked_dep, nullptr) << "task: " << debug_string();
182
5.19M
        _blocked_dep = dependency;
183
5.19M
        return _state_transition(PipelineTask::State::BLOCKED);
184
5.19M
    }
185
186
protected:
187
    // Only used for RevokableTask
188
1
    PipelineTask() : _index(0) {}
189
190
private:
191
    friend class HybridTaskScheduler;
192
193
    void _stop_accepting_submit();
194
195
    // Whether this task is blocked before execution (FE 2-phase commit trigger, runtime filters)
196
    bool _wait_to_start();
197
    // Whether this task is blocked during execution (read dependency, write dependency)
198
    bool _is_blocked();
199
    // Whether this task is blocked after execution (pending finish dependency)
200
    bool _is_pending_finish();
201
202
    Status _extract_dependencies();
203
    void _init_profile();
204
    void _fresh_profile_counter();
205
    Status _open();
206
    Status _prepare();
207
208
    // Operator `op` try to reserve memory before executing. Return false if reserve failed
209
    // otherwise return true.
210
    bool _try_to_reserve_memory(const size_t reserve_size, OperatorBase* op);
211
    bool _should_trigger_revoking(const size_t reserve_size) const;
212
    size_t _get_revocable_size() const;
213
214
    const TUniqueId _query_id;
215
    const uint32_t _index;
216
    PipelinePtr _pipeline;
217
    bool _opened;
218
    RuntimeState* _state = nullptr;
219
    int _thread_id = -1;
220
    uint32_t _schedule_time = 0;
221
    std::unique_ptr<Block> _block;
222
223
    std::weak_ptr<PipelineFragmentContext> _fragment_context;
224
225
    // used for priority queue
226
    // it may be visited by different thread but there is no race condition
227
    // so no need to add lock
228
    uint64_t _runtime = 0;
229
    // it's visited in one thread, so no need to thread synchronization
230
    // 1 get task, (set _queue_level/_core_id)
231
    // 2 exe task
232
    // 3 update task statistics(update _queue_level/_core_id)
233
    int _queue_level = 0;
234
235
    RuntimeProfile* _parent_profile = nullptr;
236
    std::unique_ptr<RuntimeProfile> _task_profile;
237
    RuntimeProfile::Counter* _task_cpu_timer = nullptr;
238
    RuntimeProfile::Counter* _prepare_timer = nullptr;
239
    RuntimeProfile::Counter* _open_timer = nullptr;
240
    RuntimeProfile::Counter* _exec_timer = nullptr;
241
    RuntimeProfile::Counter* _get_block_timer = nullptr;
242
    RuntimeProfile::Counter* _get_block_counter = nullptr;
243
    RuntimeProfile::Counter* _sink_timer = nullptr;
244
    RuntimeProfile::Counter* _close_timer = nullptr;
245
    RuntimeProfile::Counter* _schedule_counts = nullptr;
246
    MonotonicStopWatch _wait_worker_watcher;
247
    RuntimeProfile::Counter* _wait_worker_timer = nullptr;
248
    // TODO we should calculate the time between when really runnable and runnable
249
    RuntimeProfile::Counter* _yield_counts = nullptr;
250
    RuntimeProfile::Counter* _core_change_times = nullptr;
251
    RuntimeProfile::Counter* _memory_reserve_times = nullptr;
252
    RuntimeProfile::Counter* _memory_reserve_failed_times = nullptr;
253
254
    Operators _operators; // left is _source, right is _root
255
    OperatorXBase* _source;
256
    OperatorXBase* _root;
257
    DataSinkOperatorPtr _sink;
258
259
    // `_read_dependencies` is stored as same order as `_operators`
260
    std::vector<std::vector<Dependency*>> _read_dependencies;
261
    std::vector<Dependency*> _write_dependencies;
262
    std::vector<Dependency*> _finish_dependencies;
263
    std::vector<Dependency*> _execution_dependencies;
264
265
    // All shared states of this pipeline task.
266
    std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states;
267
    std::shared_ptr<BasicSharedState> _sink_shared_state;
268
    std::vector<TScanRangeParams> _scan_ranges;
269
    std::map<int,
270
             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
271
            _shared_state_map;
272
    int _task_idx;
273
    bool _dry_run = false;
274
    MOCK_REMOVE(const)
275
    unsigned long long _exec_time_slice = config::pipeline_task_exec_time_slice * NANOS_PER_MILLIS;
276
    Dependency* _blocked_dep = nullptr;
277
278
    Dependency* _memory_sufficient_dependency;
279
    // Protects dependency containers and the raw Dependency pointers they contain. It also
280
    // serializes forced dependency unblocking with close()/finalize(): set_ready() may synchronously
281
    // call wake_up() and submit this task, so close()/finalize() must not clear operator/shared
282
    // state until forced unblocking finishes. wake_up() must not take this lock.
283
    std::mutex _dependency_lifecycle_lock;
284
    // Guards _accept_submit and keeps HybridTaskScheduler::submit() from reading _sink/_operators
285
    // in is_blockable() while terminal close/finalize is closing the submit gate.
286
    std::mutex _blockable_check_lock;
287
    bool _accept_submit = true;
288
289
    std::atomic<bool> _running {false};
290
    std::atomic<bool> _eos {false};
291
    std::atomic<bool> _wake_up_early {false};
292
    // PipelineTask maybe hold by TaskQueue
293
    std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
294
295
    /**
296
         * Normal state machine:
297
         *
298
         * INITED -----> RUNNABLE -------------------------+----> FINISHED ---+---> FINALIZED
299
         *                   ^                             |                  |
300
         *                   |                             |                  |
301
         *                   +----------- BLOCKED <--------+------------------+
302
         *
303
         * When _wake_up_early is set by make_all_runnable(), additional transitions are allowed:
304
         *   BLOCKED    → FINISHED : task skips RUNNABLE, terminates directly
305
         *   FINISHED   → RUNNABLE : delayed wake_up() arrives after task already finished,
306
         *                           legal but no-op (state stays FINISHED)
307
         *   FINALIZED  → RUNNABLE : same as above but task already finalized,
308
         *                           legal but no-op (state stays FINALIZED)
309
         */
310
    enum class State : int {
311
        INITED,
312
        RUNNABLE,
313
        BLOCKED,
314
        FINISHED,
315
        FINALIZED,
316
    };
317
    const std::vector<std::set<State>> LEGAL_STATE_TRANSITION = {
318
            {},                                               // Target state is INITED
319
            {State::INITED, State::RUNNABLE, State::BLOCKED}, // Target state is RUNNABLE
320
            {State::RUNNABLE, State::FINISHED},               // Target state is BLOCKED
321
            {State::RUNNABLE},                                // Target state is FINISHED
322
            {State::INITED, State::FINISHED}};                // Target state is FINALIZED
323
324
    // Extended table used when _wake_up_early is true.
325
    const std::vector<std::set<State>> WAKE_UP_EARLY_LEGAL_STATE_TRANSITION = {
326
            {}, // INITED
327
            {State::INITED, State::RUNNABLE, State::BLOCKED, State::FINISHED,
328
             State::FINALIZED},                 // RUNNABLE (+ FINISHED, FINALIZED)
329
            {State::RUNNABLE, State::FINISHED}, // BLOCKED
330
            {State::RUNNABLE, State::BLOCKED},  // FINISHED (+ BLOCKED)
331
            {State::INITED, State::FINISHED}};  // FINALIZED
332
333
16.3M
    std::string _to_string(State state) const {
334
16.3M
        switch (state) {
335
17
        case State::INITED:
336
17
            return "INITED";
337
7.19M
        case State::RUNNABLE:
338
7.19M
            return "RUNNABLE";
339
5.20M
        case State::BLOCKED:
340
5.20M
            return "BLOCKED";
341
1.96M
        case State::FINISHED:
342
1.96M
            return "FINISHED";
343
1.96M
        case State::FINALIZED:
344
1.96M
            return "FINALIZED";
345
0
        default:
346
0
            __builtin_unreachable();
347
16.3M
        }
348
16.3M
    }
349
350
    Status _state_transition(State new_state);
351
    std::atomic<State> _exec_state = State::INITED;
352
    MonotonicStopWatch _state_change_watcher;
353
    std::atomic<bool> _spilling = false;
354
    const std::string _pipeline_name;
355
    std::atomic<int> _wake_by = -1;
356
};
357
358
using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;
359
360
} // namespace doris