Coverage Report

Created: 2025-03-13 18:54

/root/doris/be/src/pipeline/pipeline_task.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <cstdint>
21
#include <memory>
22
#include <string>
23
#include <vector>
24
25
#include "common/status.h"
26
#include "pipeline/dependency.h"
27
#include "pipeline/exec/operator.h"
28
#include "pipeline/pipeline.h"
29
#include "util/runtime_profile.h"
30
#include "util/stopwatch.hpp"
31
#include "vec/core/block.h"
32
33
namespace doris {
34
class QueryContext;
35
class RuntimeState;
36
namespace pipeline {
37
class PipelineFragmentContext;
38
} // namespace pipeline
39
} // namespace doris
40
41
namespace doris::pipeline {
42
43
class MultiCoreTaskQueue;
44
class PriorityTaskQueue;
45
class Dependency;
46
47
class PipelineTask {
48
public:
49
    PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
50
                 PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
51
                 std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
52
                                         std::shared_ptr<Dependency>>>
53
                         le_state_map,
54
                 int task_idx);
55
56
    Status prepare(const std::vector<TScanRangeParams>& scan_range, const int sender_id,
57
                   const TDataSink& tsink, QueryContext* query_ctx);
58
59
    Status execute(bool* eos);
60
61
    // if the pipeline create a bunch of pipeline task
62
    // must be call after all pipeline task is finish to release resource
63
    Status close(Status exec_status, bool close_sink = true);
64
65
0
    PipelineFragmentContext* fragment_context() { return _fragment_context; }
66
67
    QueryContext* query_context();
68
69
44
    int get_core_id() const { return _core_id; }
70
71
0
    void set_core_id(int id) {
72
0
        if (id != _core_id) {
73
0
            if (_core_id != -1) {
74
0
                COUNTER_UPDATE(_core_change_times, 1);
75
0
            }
76
0
            _core_id = id;
77
0
        }
78
0
    }
79
80
    void finalize();
81
82
    std::string debug_string();
83
84
5
    bool is_pending_finish() {
85
5
        for (auto* fin_dep : _finish_dependencies) {
86
5
            _blocked_dep = fin_dep->is_blocked_by(this);
87
5
            if (_blocked_dep != nullptr) {
88
0
                _blocked_dep->start_watcher();
89
0
                return true;
90
0
            }
91
5
        }
92
5
        return false;
93
5
    }
94
95
0
    std::shared_ptr<BasicSharedState> get_source_shared_state() {
96
0
        return _op_shared_states.contains(_source->operator_id())
97
0
                       ? _op_shared_states[_source->operator_id()]
98
0
                       : nullptr;
99
0
    }
100
101
2
    void inject_shared_state(std::shared_ptr<BasicSharedState> shared_state) {
102
2
        if (!shared_state) {
103
0
            return;
104
0
        }
105
        // Shared state is created by upstream task's sink operator and shared by source operator of this task.
106
4
        for (auto& op : _operators) {
107
4
            if (shared_state->related_op_ids.contains(op->operator_id())) {
108
2
                _op_shared_states.insert({op->operator_id(), shared_state});
109
2
                return;
110
2
            }
111
4
        }
112
0
        if (shared_state->related_op_ids.contains(_sink->dests_id().front())) {
113
0
            DCHECK_EQ(_sink_shared_state, nullptr)
114
0
                    << " Sink: " << _sink->get_name() << " dest id: " << _sink->dests_id().front();
115
0
            _sink_shared_state = shared_state;
116
0
        }
117
0
    }
118
119
7
    std::shared_ptr<BasicSharedState> get_sink_shared_state() { return _sink_shared_state; }
120
121
7
    BasicSharedState* get_op_shared_state(int id) {
122
7
        if (!_op_shared_states.contains(id)) {
123
5
            return nullptr;
124
5
        }
125
2
        return _op_shared_states[id].get();
126
7
    }
127
128
    void wake_up();
129
130
0
    DataSinkOperatorPtr sink() const { return _sink; }
131
132
0
    int task_id() const { return _index; };
133
0
    bool is_finalized() const { return _finalized; }
134
135
0
    void set_wake_up_early() { _wake_up_early = true; }
136
137
0
    void clear_blocking_state() {
138
        // We use a lock to assure all dependencies are not deconstructed here.
139
0
        std::unique_lock<std::mutex> lc(_dependency_lock);
140
0
        if (!_finalized) {
141
0
            for (auto* dep : _spill_dependencies) {
142
0
                dep->set_always_ready();
143
0
            }
144
145
0
            for (auto* dep : _filter_dependencies) {
146
0
                dep->set_always_ready();
147
0
            }
148
0
            for (auto& deps : _read_dependencies) {
149
0
                for (auto* dep : deps) {
150
0
                    dep->set_always_ready();
151
0
                }
152
0
            }
153
0
            for (auto* dep : _write_dependencies) {
154
0
                dep->set_always_ready();
155
0
            }
156
0
            for (auto* dep : _finish_dependencies) {
157
0
                dep->set_always_ready();
158
0
            }
159
0
        }
160
0
    }
161
162
5
    void set_task_queue(MultiCoreTaskQueue* task_queue) { _task_queue = task_queue; }
163
17
    MultiCoreTaskQueue* get_task_queue() { return _task_queue; }
164
165
    static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL;
166
167
    // 1 used for update priority queue
168
    // note(wb) an ugly implementation, need refactor later
169
    // 1.1 pipeline task
170
0
    void inc_runtime_ns(uint64_t delta_time) { this->_runtime += delta_time; }
171
17
    uint64_t get_runtime_ns() const { return this->_runtime; }
172
173
    // 1.2 priority queue's queue level
174
10
    void update_queue_level(int queue_level) { this->_queue_level = queue_level; }
175
0
    int get_queue_level() const { return this->_queue_level; }
176
177
17
    void put_in_runnable_queue() {
178
17
        _schedule_time++;
179
17
        _wait_worker_watcher.start();
180
17
    }
181
182
10
    void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
183
184
0
    bool is_running() { return _running.load(); }
185
0
    bool is_revoking() {
186
0
        for (auto* dep : _spill_dependencies) {
187
0
            if (dep->is_blocked_by(nullptr) != nullptr) {
188
0
                return true;
189
0
            }
190
0
        }
191
0
        return false;
192
0
    }
193
0
    bool set_running(bool running) { return _running.exchange(running); }
194
195
0
    bool is_exceed_debug_timeout() {
196
0
        if (_has_exceed_timeout) {
197
0
            return true;
198
0
        }
199
        // If enable_debug_log_timeout_secs <= 0, then disable the log
200
0
        if (_pipeline_task_watcher.elapsed_time() >
201
0
            config::enable_debug_log_timeout_secs * 1000L * 1000L * 1000L) {
202
0
            _has_exceed_timeout = true;
203
0
            return true;
204
0
        }
205
0
        return false;
206
0
    }
207
208
0
    void log_detail_if_need() {
209
0
        if (config::enable_debug_log_timeout_secs < 1) {
210
0
            return;
211
0
        }
212
0
        if (is_exceed_debug_timeout()) {
213
0
            LOG(INFO) << "query id|instanceid " << print_id(_state->query_id()) << "|"
214
0
                      << print_id(_state->fragment_instance_id())
215
0
                      << " current pipeline exceed run time "
216
0
                      << config::enable_debug_log_timeout_secs << " seconds. "
217
0
                      << "/n task detail:" << debug_string();
218
0
        }
219
0
    }
220
221
0
    RuntimeState* runtime_state() const { return _state; }
222
223
0
    RuntimeProfile* get_task_profile() const { return _task_profile.get(); }
224
225
0
    std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); }
226
227
0
    void stop_if_finished() {
228
0
        if (_sink->is_finished(_state)) {
229
0
            clear_blocking_state();
230
0
        }
231
0
    }
232
233
0
    PipelineId pipeline_id() const { return _pipeline->id(); }
234
    [[nodiscard]] size_t get_revocable_size() const;
235
    [[nodiscard]] Status revoke_memory(const std::shared_ptr<SpillContext>& spill_context);
236
237
3
    void add_spill_dependency(Dependency* dependency) {
238
3
        _spill_dependencies.emplace_back(dependency);
239
3
    }
240
241
76
    bool wake_up_early() const { return _wake_up_early; }
242
243
0
    void inc_memory_reserve_failed_times() { COUNTER_UPDATE(_memory_reserve_failed_times, 1); }
244
245
private:
246
    friend class RuntimeFilterDependency;
247
    bool _is_blocked();
248
    bool _wait_to_start();
249
250
    Status _extract_dependencies();
251
    void _init_profile();
252
    void _fresh_profile_counter();
253
    Status _open();
254
255
    uint32_t _index;
256
    PipelinePtr _pipeline;
257
    bool _has_exceed_timeout = false;
258
    bool _opened;
259
    RuntimeState* _state = nullptr;
260
    int _core_id = -1;
261
    uint32_t _schedule_time = 0;
262
    std::unique_ptr<vectorized::Block> _block;
263
264
    PipelineFragmentContext* _fragment_context = nullptr;
265
    MultiCoreTaskQueue* _task_queue = nullptr;
266
267
    // used for priority queue
268
    // it may be visited by different thread but there is no race condition
269
    // so no need to add lock
270
    uint64_t _runtime = 0;
271
    // it's visited in one thread, so no need to thread synchronization
272
    // 1 get task, (set _queue_level/_core_id)
273
    // 2 exe task
274
    // 3 update task statistics(update _queue_level/_core_id)
275
    int _queue_level = 0;
276
277
    RuntimeProfile* _parent_profile = nullptr;
278
    std::unique_ptr<RuntimeProfile> _task_profile;
279
    RuntimeProfile::Counter* _task_cpu_timer = nullptr;
280
    RuntimeProfile::Counter* _prepare_timer = nullptr;
281
    RuntimeProfile::Counter* _open_timer = nullptr;
282
    RuntimeProfile::Counter* _exec_timer = nullptr;
283
    RuntimeProfile::Counter* _get_block_timer = nullptr;
284
    RuntimeProfile::Counter* _get_block_counter = nullptr;
285
    RuntimeProfile::Counter* _sink_timer = nullptr;
286
    RuntimeProfile::Counter* _close_timer = nullptr;
287
    RuntimeProfile::Counter* _schedule_counts = nullptr;
288
    MonotonicStopWatch _wait_worker_watcher;
289
    RuntimeProfile::Counter* _wait_worker_timer = nullptr;
290
    // TODO we should calculate the time between when really runnable and runnable
291
    RuntimeProfile::Counter* _yield_counts = nullptr;
292
    RuntimeProfile::Counter* _core_change_times = nullptr;
293
    RuntimeProfile::Counter* _memory_reserve_times = nullptr;
294
    RuntimeProfile::Counter* _memory_reserve_failed_times = nullptr;
295
296
    MonotonicStopWatch _pipeline_task_watcher;
297
298
    Operators _operators; // left is _source, right is _root
299
    OperatorXBase* _source;
300
    OperatorXBase* _root;
301
    DataSinkOperatorPtr _sink;
302
303
    // `_read_dependencies` is stored as same order as `_operators`
304
    std::vector<std::vector<Dependency*>> _read_dependencies;
305
    std::vector<Dependency*> _spill_dependencies;
306
    std::vector<Dependency*> _write_dependencies;
307
    std::vector<Dependency*> _finish_dependencies;
308
    std::vector<Dependency*> _filter_dependencies;
309
310
    // All shared states of this pipeline task.
311
    std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states;
312
    std::shared_ptr<BasicSharedState> _sink_shared_state;
313
    std::vector<TScanRangeParams> _scan_ranges;
314
    std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
315
            _le_state_map;
316
    int _task_idx;
317
    bool _dry_run = false;
318
319
    Dependency* _blocked_dep = nullptr;
320
321
    Dependency* _execution_dep = nullptr;
322
    Dependency* _memory_sufficient_dependency;
323
324
    std::atomic<bool> _finalized {false};
325
    std::mutex _dependency_lock;
326
327
    std::atomic<bool> _running {false};
328
    std::atomic<bool> _eos {false};
329
    std::atomic<bool> _wake_up_early {false};
330
331
    /**
332
     * State of this pipeline task.
333
     * `NORMAL` means a task executes normally without spilling.
334
     * `PENDING` means the last execute round is blocked by poor free memory.
335
     * `EOS` means the last execute round is blocked by poor free memory and it is the last block.
336
     */
337
    enum class State : int {
338
        NORMAL,
339
        PENDING,
340
        EOS,
341
    };
342
343
    State _exec_state = State::NORMAL;
344
};
345
346
} // namespace doris::pipeline