Coverage Report

Created: 2024-11-21 18:14

/root/doris/be/src/pipeline/pipeline_task.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stdint.h>
21
22
#include <memory>
23
#include <string>
24
#include <vector>
25
26
#include "common/status.h"
27
#include "pipeline/dependency.h"
28
#include "pipeline/exec/operator.h"
29
#include "pipeline/pipeline.h"
30
#include "util/runtime_profile.h"
31
#include "util/stopwatch.hpp"
32
#include "vec/core/block.h"
33
34
namespace doris {
35
class QueryContext;
36
class RuntimeState;
37
namespace pipeline {
38
class PipelineFragmentContext;
39
} // namespace pipeline
40
} // namespace doris
41
42
namespace doris::pipeline {
43
44
class MultiCoreTaskQueue;
45
class PriorityTaskQueue;
46
class Dependency;
47
48
class PipelineTask {
49
public:
50
    PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
51
                 PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
52
                 std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
53
                                         std::shared_ptr<Dependency>>>
54
                         le_state_map,
55
                 int task_idx);
56
57
    Status prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink,
58
                   QueryContext* query_ctx);
59
60
    Status execute(bool* eos);
61
62
    // If the pipeline creates a bunch of pipeline tasks,
63
    // this must be called after all pipeline tasks have finished, to release resources.
64
    Status close(Status exec_status);
65
66
0
    PipelineFragmentContext* fragment_context() { return _fragment_context; }
67
68
    QueryContext* query_context();
69
70
0
    int get_previous_core_id() const {
71
0
        return _previous_schedule_id != -1 ? _previous_schedule_id
72
0
                                           : _pipeline->_previous_schedule_id;
73
0
    }
74
75
0
    void set_previous_core_id(int id) {
76
0
        if (id != _previous_schedule_id) {
77
0
            if (_previous_schedule_id != -1) {
78
0
                COUNTER_UPDATE(_core_change_times, 1);
79
0
            }
80
0
            _previous_schedule_id = id;
81
0
        }
82
0
    }
83
84
    void finalize();
85
86
    std::string debug_string();
87
88
0
    bool is_pending_finish() {
89
0
        for (auto* fin_dep : _finish_dependencies) {
90
0
            _blocked_dep = fin_dep->is_blocked_by(this);
91
0
            if (_blocked_dep != nullptr) {
92
0
                _blocked_dep->start_watcher();
93
0
                return true;
94
0
            }
95
0
        }
96
0
        return false;
97
0
    }
98
99
0
    std::shared_ptr<BasicSharedState> get_source_shared_state() {
100
0
        return _op_shared_states.contains(_source->operator_id())
101
0
                       ? _op_shared_states[_source->operator_id()]
102
0
                       : nullptr;
103
0
    }
104
105
0
    void inject_shared_state(std::shared_ptr<BasicSharedState> shared_state) {
106
0
        if (!shared_state) {
107
0
            return;
108
0
        }
109
        // Shared state is created by upstream task's sink operator and shared by source operator of this task.
110
0
        for (auto& op : _operators) {
111
0
            if (shared_state->related_op_ids.contains(op->operator_id())) {
112
0
                _op_shared_states.insert({op->operator_id(), shared_state});
113
0
                return;
114
0
            }
115
0
        }
116
0
        if (shared_state->related_op_ids.contains(_sink->dests_id().front())) {
117
0
            DCHECK(_sink_shared_state == nullptr);
118
0
            _sink_shared_state = shared_state;
119
0
        }
120
0
    }
121
122
0
    std::shared_ptr<BasicSharedState> get_sink_shared_state() { return _sink_shared_state; }
123
124
0
    BasicSharedState* get_op_shared_state(int id) {
125
0
        if (!_op_shared_states.contains(id)) {
126
0
            return nullptr;
127
0
        }
128
0
        return _op_shared_states[id].get();
129
0
    }
130
131
    void wake_up();
132
133
0
    DataSinkOperatorPtr sink() const { return _sink; }
134
135
0
    int task_id() const { return _index; };
136
0
    bool is_finalized() const { return _finalized; }
137
138
0
    void clear_blocking_state(bool wake_up_by_downstream = false) {
139
0
        _state->get_query_ctx()->get_execution_dependency()->set_always_ready();
140
        // We take a lock to ensure that the dependencies are not destructed here.
141
0
        std::unique_lock<std::mutex> lc(_dependency_lock);
142
0
        _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
143
0
        if (!_finalized) {
144
0
            _execution_dep->set_always_ready();
145
0
            for (auto* dep : _filter_dependencies) {
146
0
                dep->set_always_ready();
147
0
            }
148
0
            for (auto& deps : _read_dependencies) {
149
0
                for (auto* dep : deps) {
150
0
                    dep->set_always_ready();
151
0
                }
152
0
            }
153
0
            for (auto* dep : _write_dependencies) {
154
0
                dep->set_always_ready();
155
0
            }
156
0
            for (auto* dep : _finish_dependencies) {
157
0
                dep->set_always_ready();
158
0
            }
159
0
        }
160
0
    }
161
162
0
    void set_task_queue(MultiCoreTaskQueue* task_queue) { _task_queue = task_queue; }
163
0
    MultiCoreTaskQueue* get_task_queue() { return _task_queue; }
164
165
    static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL;
166
167
    // 1. Used for updating the priority queue.
168
    // NOTE(wb): an ugly implementation; needs refactoring later.
169
    // 1.1 pipeline task
170
0
    void inc_runtime_ns(uint64_t delta_time) { this->_runtime += delta_time; }
171
0
    uint64_t get_runtime_ns() const { return this->_runtime; }
172
173
    // 1.2 priority queue's queue level
174
0
    void update_queue_level(int queue_level) { this->_queue_level = queue_level; }
175
0
    int get_queue_level() const { return this->_queue_level; }
176
177
    // 1.3 priority queue's core id
178
0
    void set_core_id(int core_id) { this->_core_id = core_id; }
179
0
    int get_core_id() const { return this->_core_id; }
180
181
    /**
182
     * Return true if:
183
     * 1. `enable_force_spill` is true which forces this task to spill data.
184
     * 2. Or memory consumption reaches the high water mark of the current workload group (80% of the memory limit by default) and revocable_mem_bytes is bigger than min_revocable_mem_bytes.
185
     * 3. Or memory consumption is higher than the low water mark of the current workload group (50% of the memory limit by default) and `query_weighted_consumption >= query_weighted_limit` and revocable memory is big enough.
186
     */
187
    static bool should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes);
188
189
0
    void put_in_runnable_queue() {
190
0
        _schedule_time++;
191
0
        _wait_worker_watcher.start();
192
0
    }
193
194
0
    void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
195
196
0
    bool is_running() { return _running.load(); }
197
0
    void set_running(bool running) { _running = running; }
198
199
0
    bool is_exceed_debug_timeout() {
200
0
        if (_has_exceed_timeout) {
201
0
            return true;
202
0
        }
203
        // NOTE: enable_debug_log_timeout_secs <= 0 is meant to disable the log, but that check is done in log_detail_if_need(), not here.
204
0
        if (_pipeline_task_watcher.elapsed_time() >
205
0
            config::enable_debug_log_timeout_secs * 1000L * 1000L * 1000L) {
206
0
            _has_exceed_timeout = true;
207
0
            return true;
208
0
        }
209
0
        return false;
210
0
    }
211
212
0
    void log_detail_if_need() {
213
0
        if (config::enable_debug_log_timeout_secs < 1) {
214
0
            return;
215
0
        }
216
0
        if (is_exceed_debug_timeout()) {
217
0
            LOG(INFO) << "query id|instanceid " << print_id(_state->query_id()) << "|"
218
0
                      << print_id(_state->fragment_instance_id())
219
0
                      << " current pipeline exceed run time "
220
0
                      << config::enable_debug_log_timeout_secs << " seconds. "
221
0
                      << "/n task detail:" << debug_string();
222
0
        }
223
0
    }
224
225
0
    RuntimeState* runtime_state() const { return _state; }
226
227
0
    RuntimeProfile* get_task_profile() const { return _task_profile.get(); }
228
229
0
    std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); }
230
231
0
    void stop_if_finished() {
232
0
        if (_sink->is_finished(_state)) {
233
0
            clear_blocking_state();
234
0
        }
235
0
    }
236
237
0
    PipelineId pipeline_id() const { return _pipeline->id(); }
238
239
0
    bool wake_up_by_downstream() const { return _wake_up_by_downstream; }
240
241
private:
242
    friend class RuntimeFilterDependency;
243
    bool _is_blocked();
244
    bool _wait_to_start();
245
246
    Status _extract_dependencies();
247
    void _init_profile();
248
    void _fresh_profile_counter();
249
    Status _open();
250
251
    uint32_t _index;
252
    PipelinePtr _pipeline;
253
    bool _has_exceed_timeout = false;
254
    bool _opened;
255
    RuntimeState* _state = nullptr;
256
    int _previous_schedule_id = -1;
257
    uint32_t _schedule_time = 0;
258
    std::unique_ptr<doris::vectorized::Block> _block;
259
    PipelineFragmentContext* _fragment_context = nullptr;
260
    MultiCoreTaskQueue* _task_queue = nullptr;
261
262
    // used for priority queue
263
    // It may be accessed by different threads, but there is no race condition,
264
    // so there is no need to add a lock.
265
    uint64_t _runtime = 0;
266
    // It is accessed from one thread only, so no thread synchronization is needed:
267
    // 1 get task, (set _queue_level/_core_id)
268
    // 2 exe task
269
    // 3 update task statistics(update _queue_level/_core_id)
270
    int _queue_level = 0;
271
    int _core_id = 0;
272
273
    RuntimeProfile* _parent_profile = nullptr;
274
    std::unique_ptr<RuntimeProfile> _task_profile;
275
    RuntimeProfile::Counter* _task_cpu_timer = nullptr;
276
    RuntimeProfile::Counter* _prepare_timer = nullptr;
277
    RuntimeProfile::Counter* _open_timer = nullptr;
278
    RuntimeProfile::Counter* _exec_timer = nullptr;
279
    RuntimeProfile::Counter* _get_block_timer = nullptr;
280
    RuntimeProfile::Counter* _get_block_counter = nullptr;
281
    RuntimeProfile::Counter* _sink_timer = nullptr;
282
    RuntimeProfile::Counter* _close_timer = nullptr;
283
    RuntimeProfile::Counter* _schedule_counts = nullptr;
284
    MonotonicStopWatch _wait_worker_watcher;
285
    RuntimeProfile::Counter* _wait_worker_timer = nullptr;
286
    // TODO we should calculate the time between when really runnable and runnable
287
    RuntimeProfile::Counter* _yield_counts = nullptr;
288
    RuntimeProfile::Counter* _core_change_times = nullptr;
289
290
    MonotonicStopWatch _pipeline_task_watcher;
291
292
    Operators _operators; // left is _source, right is _root
293
    OperatorXBase* _source;
294
    OperatorXBase* _root;
295
    DataSinkOperatorPtr _sink;
296
297
    // `_read_dependencies` is stored in the same order as `_operators`
298
    std::vector<std::vector<Dependency*>> _read_dependencies;
299
    std::vector<Dependency*> _write_dependencies;
300
    std::vector<Dependency*> _finish_dependencies;
301
    std::vector<Dependency*> _filter_dependencies;
302
303
    // All shared states of this pipeline task.
304
    std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states;
305
    std::shared_ptr<BasicSharedState> _sink_shared_state;
306
    std::vector<TScanRangeParams> _scan_ranges;
307
    std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
308
            _le_state_map;
309
    int _task_idx;
310
    bool _dry_run = false;
311
312
    Dependency* _blocked_dep = nullptr;
313
314
    Dependency* _execution_dep = nullptr;
315
316
    std::atomic<bool> _finalized = false;
317
    std::mutex _dependency_lock;
318
319
    std::atomic<bool> _running = false;
320
    std::atomic<bool> _eos = false;
321
    std::atomic<bool> _wake_up_by_downstream = false;
322
};
323
324
} // namespace doris::pipeline