/root/doris/be/src/pipeline/pipeline_task.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <cstdint> |
21 | | #include <memory> |
22 | | #include <string> |
23 | | #include <string_view> |
24 | | |
25 | | #include "common/config.h" |
26 | | #include "common/status.h" |
27 | | #include "exec/operator.h" |
28 | | #include "pipeline.h" |
29 | | #include "runtime/workload_group/workload_group.h" |
30 | | #include "util/runtime_profile.h" |
31 | | #include "util/stopwatch.hpp" |
32 | | #include "vec/core/block.h" |
33 | | |
34 | | namespace doris { |
35 | | class QueryContext; |
36 | | class RuntimeState; |
37 | | namespace pipeline { |
38 | | class PipelineFragmentContext; |
39 | | } // namespace pipeline |
40 | | } // namespace doris |
41 | | |
42 | | namespace doris::pipeline { |
43 | | |
44 | | /** |
45 | | * PipelineTaskState indicates all possible states of a pipeline task. |
46 | | * A FSM is described as below: |
47 | | * |
48 | | * |-----------------------------------------------------| |
49 | | * |---| transfer 2 transfer 3 | transfer 4 |
50 | | * |-------> BLOCKED ------------| |---------------------------------------> CANCELED |
51 | | * |------| | | transfer 5 transfer 6| |
52 | | * NOT_READY ---| transfer 0 |-----> RUNNABLE ---|---------> PENDING_FINISH ------| |
53 | | * | | ^ | transfer 7| |
54 | | * |------------------------------------| |--------|---------------------------------------> FINISHED |
55 | | * transfer 1 transfer 9 transfer 8 |
 56 | | * BLOCKED includes BLOCKED_FOR_DEPENDENCY, BLOCKED_FOR_SOURCE and BLOCKED_FOR_SINK. |
57 | | * |
58 | | * transfer 0 (NOT_READY -> BLOCKED): this pipeline task has some incomplete dependencies |
59 | | * transfer 1 (NOT_READY -> RUNNABLE): this pipeline task has no incomplete dependencies |
60 | | * transfer 2 (BLOCKED -> RUNNABLE): runnable condition for this pipeline task is met (e.g. get a new block from rpc) |
61 | | * transfer 3 (RUNNABLE -> BLOCKED): runnable condition for this pipeline task is not met (e.g. sink operator send a block by RPC and wait for a response) |
62 | | * transfer 4 (RUNNABLE -> CANCELED): current fragment is cancelled |
63 | | * transfer 5 (RUNNABLE -> PENDING_FINISH): this pipeline task completed but wait for releasing resources hold by itself |
64 | | * transfer 6 (PENDING_FINISH -> CANCELED): current fragment is cancelled |
65 | | * transfer 7 (PENDING_FINISH -> FINISHED): this pipeline task completed and resources hold by itself have been released already |
66 | | * transfer 8 (RUNNABLE -> FINISHED): this pipeline task completed and no resource need to be released |
67 | | * transfer 9 (RUNNABLE -> RUNNABLE): this pipeline task yields CPU and re-enters the runnable queue if it is runnable and has occupied CPU for a max time slice |
68 | | */ |
// States of the pipeline-task FSM described above. Values are serialized into
// profiles/logs, so existing numeric values must not be changed.
enum class PipelineTaskState : uint8_t {
    NOT_READY = 0, // do not prepare
    BLOCKED_FOR_DEPENDENCY = 1, // waiting for upstream pipelines to finish
    BLOCKED_FOR_SOURCE = 2,     // source operator cannot produce data yet
    BLOCKED_FOR_SINK = 3,       // sink operator cannot accept data yet
    RUNNABLE = 4, // can execute
    PENDING_FINISH =
            5, // compute task is over, but still hold resource. like some scan and sink task
    FINISHED = 6, // terminal state: completed successfully
    CANCELED = 7, // terminal state: fragment was cancelled
    BLOCKED_FOR_RF = 8, // waiting for runtime filters to arrive (or time out)
};
81 | | |
82 | 0 | inline const char* get_state_name(PipelineTaskState idx) { |
83 | 0 | switch (idx) { |
84 | 0 | case PipelineTaskState::NOT_READY: |
85 | 0 | return "NOT_READY"; |
86 | 0 | case PipelineTaskState::BLOCKED_FOR_DEPENDENCY: |
87 | 0 | return "BLOCKED_FOR_DEPENDENCY"; |
88 | 0 | case PipelineTaskState::BLOCKED_FOR_SOURCE: |
89 | 0 | return "BLOCKED_FOR_SOURCE"; |
90 | 0 | case PipelineTaskState::BLOCKED_FOR_SINK: |
91 | 0 | return "BLOCKED_FOR_SINK"; |
92 | 0 | case PipelineTaskState::RUNNABLE: |
93 | 0 | return "RUNNABLE"; |
94 | 0 | case PipelineTaskState::PENDING_FINISH: |
95 | 0 | return "PENDING_FINISH"; |
96 | 0 | case PipelineTaskState::FINISHED: |
97 | 0 | return "FINISHED"; |
98 | 0 | case PipelineTaskState::CANCELED: |
99 | 0 | return "CANCELED"; |
100 | 0 | case PipelineTaskState::BLOCKED_FOR_RF: |
101 | 0 | return "BLOCKED_FOR_RF"; |
102 | 0 | } |
103 | 0 | LOG(FATAL) << "__builtin_unreachable"; |
104 | 0 | __builtin_unreachable(); |
105 | 0 | } |
106 | | |
107 | 0 | inline bool is_final_state(PipelineTaskState idx) { |
108 | 0 | switch (idx) { |
109 | 0 | case PipelineTaskState::FINISHED: |
110 | 0 | case PipelineTaskState::CANCELED: |
111 | 0 | return true; |
112 | 0 | default: |
113 | 0 | return false; |
114 | 0 | } |
115 | 0 | } |
116 | | |
117 | | class TaskQueue; |
118 | | class PriorityTaskQueue; |
119 | | |
120 | | // The class do the pipeline task. Minest schdule union by task scheduler |
121 | | class PipelineTask { |
122 | | public: |
123 | | PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, OperatorPtr& sink, |
124 | | PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile); |
125 | | |
126 | | PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, |
127 | | PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile); |
128 | 0 | virtual ~PipelineTask() = default; |
129 | | |
130 | | virtual Status prepare(RuntimeState* state); |
131 | | |
132 | | virtual Status execute(bool* eos); |
133 | | |
134 | | // if the pipeline create a bunch of pipeline task |
135 | | // must be call after all pipeline task is finish to release resource |
136 | | virtual Status close(Status exec_status, bool close_sink = true); |
137 | | |
138 | 0 | void put_in_runnable_queue() { |
139 | 0 | _schedule_time++; |
140 | 0 | _wait_worker_watcher.start(); |
141 | 0 | } |
142 | 0 | void pop_out_runnable_queue() { _wait_worker_watcher.stop(); } |
143 | 0 | PipelineTaskState get_state() const { return _cur_state; } |
144 | | void set_state(PipelineTaskState state); |
145 | | |
146 | 0 | virtual bool is_pending_finish() { |
147 | 0 | bool source_ret = _source->is_pending_finish(); |
148 | 0 | if (source_ret) { |
149 | 0 | return true; |
150 | 0 | } else { |
151 | 0 | this->set_src_pending_finish_time(); |
152 | 0 | } |
153 | | |
154 | 0 | bool sink_ret = _sink->is_pending_finish(); |
155 | 0 | if (sink_ret) { |
156 | 0 | return true; |
157 | 0 | } else { |
158 | 0 | this->set_dst_pending_finish_time(); |
159 | 0 | } |
160 | 0 | return false; |
161 | 0 | } |
162 | | |
163 | 0 | virtual bool source_can_read() { return _source->can_read() || _pipeline->_always_can_read; } |
164 | | |
165 | 0 | virtual bool runtime_filters_are_ready_or_timeout() { |
166 | 0 | return _source->runtime_filters_are_ready_or_timeout(); |
167 | 0 | } |
168 | | |
169 | 0 | virtual bool sink_can_write() { return _sink->can_write() || _pipeline->_always_can_write; } |
170 | | |
171 | 0 | virtual void finalize() {} |
172 | | |
173 | 0 | PipelineFragmentContext* fragment_context() { return _fragment_context; } |
174 | | |
175 | | QueryContext* query_context(); |
176 | | |
177 | 0 | int get_previous_core_id() const { |
178 | 0 | return _previous_schedule_id != -1 ? _previous_schedule_id |
179 | 0 | : _pipeline->_previous_schedule_id; |
180 | 0 | } |
181 | | |
182 | 0 | void set_previous_core_id(int id) { |
183 | 0 | if (id == _previous_schedule_id) { |
184 | 0 | return; |
185 | 0 | } |
186 | 0 | if (_previous_schedule_id != -1) { |
187 | 0 | COUNTER_UPDATE(_core_change_times, 1); |
188 | 0 | } |
189 | 0 | _previous_schedule_id = id; |
190 | 0 | } |
191 | | |
192 | | virtual bool has_dependency(); |
193 | | |
194 | 0 | OperatorPtr get_root() { return _root; } |
195 | | |
196 | | virtual std::string debug_string(); |
197 | | |
198 | | void set_task_queue(TaskQueue* task_queue); |
199 | 0 | TaskQueue* get_task_queue() { return _task_queue; } |
200 | | |
201 | | static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL; |
202 | | |
203 | | // 1 used for update priority queue |
204 | | // note(wb) an ugly implementation, need refactor later |
205 | | // 1.1 pipeline task |
206 | 0 | void inc_runtime_ns(uint64_t delta_time) { this->_runtime += delta_time; } |
207 | 0 | uint64_t get_runtime_ns() const { return this->_runtime; } |
208 | | |
209 | | // 1.2 priority queue's queue level |
210 | 0 | void update_queue_level(int queue_level) { this->_queue_level = queue_level; } |
211 | 0 | int get_queue_level() const { return this->_queue_level; } |
212 | | |
213 | | // 1.3 priority queue's core id |
214 | 0 | void set_core_id(int core_id) { this->_core_id = core_id; } |
215 | 0 | int get_core_id() const { return this->_core_id; } |
216 | | |
217 | 0 | void set_begin_execute_time() { |
218 | 0 | if (!_is_first_time_to_execute) { |
219 | 0 | _begin_execute_time = _pipeline_task_watcher.elapsed_time(); |
220 | 0 | _is_first_time_to_execute = true; |
221 | 0 | } |
222 | 0 | } |
223 | | |
224 | 0 | void set_eos_time() { |
225 | 0 | if (!_is_eos) { |
226 | 0 | _eos_time = _pipeline_task_watcher.elapsed_time(); |
227 | 0 | _is_eos = true; |
228 | 0 | } |
229 | 0 | } |
230 | | |
231 | 0 | void set_src_pending_finish_time() { |
232 | 0 | if (!_is_src_pending_finish_over) { |
233 | 0 | _src_pending_finish_over_time = _pipeline_task_watcher.elapsed_time(); |
234 | 0 | _is_src_pending_finish_over = true; |
235 | 0 | } |
236 | 0 | } |
237 | | |
238 | 0 | void set_dst_pending_finish_time() { |
239 | 0 | if (!_is_dst_pending_finish_over) { |
240 | 0 | _dst_pending_finish_over_time = _pipeline_task_watcher.elapsed_time(); |
241 | 0 | _is_dst_pending_finish_over = true; |
242 | 0 | } |
243 | 0 | } |
244 | | |
245 | 0 | virtual bool is_finished() const { return false; } |
246 | | |
247 | 0 | virtual void set_close_pipeline_time() { |
248 | 0 | if (!_is_close_pipeline) { |
249 | 0 | _close_pipeline_time = _pipeline_task_watcher.elapsed_time(); |
250 | 0 | _is_close_pipeline = true; |
251 | 0 | COUNTER_SET(_close_pipeline_timer, _close_pipeline_time); |
252 | 0 | } |
253 | 0 | } |
254 | | |
255 | 0 | TUniqueId instance_id() const { return _state->fragment_instance_id(); } |
256 | | |
257 | 0 | void set_parent_profile(RuntimeProfile* profile) { _parent_profile = profile; } |
258 | | |
259 | 0 | virtual bool is_pipelineX() const { return false; } |
260 | | |
261 | 0 | bool is_running() { return _running.load(); } |
262 | 0 | void set_running(bool running) { _running = running; } |
263 | | |
264 | 0 | bool is_exceed_debug_timeout() { |
265 | 0 | if (_has_exceed_timeout) { |
266 | 0 | return true; |
267 | 0 | } |
268 | | // If enable_debug_log_timeout_secs <= 0, then disable the log |
269 | 0 | if (_pipeline_task_watcher.elapsed_time() > |
270 | 0 | config::enable_debug_log_timeout_secs * 1000L * 1000L * 1000L) { |
271 | 0 | _has_exceed_timeout = true; |
272 | 0 | return true; |
273 | 0 | } |
274 | 0 | return false; |
275 | 0 | } |
276 | | |
277 | 0 | void log_detail_if_need() { |
278 | 0 | if (config::enable_debug_log_timeout_secs < 1) { |
279 | 0 | return; |
280 | 0 | } |
281 | 0 | if (is_exceed_debug_timeout()) { |
282 | 0 | LOG(INFO) << "query id|instanceid " << print_id(_state->query_id()) << "|" |
283 | 0 | << print_id(_state->fragment_instance_id()) |
284 | 0 | << " current pipeline exceed run time " |
285 | 0 | << config::enable_debug_log_timeout_secs << " seconds. Task state " |
286 | 0 | << get_state_name(get_state()) << "/n task detail:" << debug_string(); |
287 | 0 | } |
288 | 0 | } |
289 | | |
290 | 0 | RuntimeState* runtime_state() const { return _state; } |
291 | | |
292 | 0 | std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); } |
293 | | |
294 | 0 | PipelineId pipeline_id() const { return _pipeline->id(); } |
295 | | |
296 | 0 | virtual void clear_blocking_state() {} |
297 | 0 | virtual void set_wake_up_early() {} |
298 | | |
299 | | protected: |
300 | 0 | void _finish_p_dependency() { |
301 | 0 | for (const auto& p : _pipeline->_parents) { |
302 | 0 | p.second.lock()->finish_one_dependency(p.first, _previous_schedule_id); |
303 | 0 | } |
304 | 0 | } |
305 | | |
306 | | virtual Status _open(); |
307 | | virtual void _init_profile(); |
308 | | virtual void _fresh_profile_counter(); |
309 | | |
310 | | uint32_t _index; |
311 | | PipelinePtr _pipeline; |
312 | | bool _dependency_finish = false; |
313 | | bool _has_exceed_timeout = false; |
314 | | bool _prepared; |
315 | | bool _opened; |
316 | | RuntimeState* _state = nullptr; |
317 | | int _previous_schedule_id = -1; |
318 | | uint32_t _schedule_time = 0; |
319 | | PipelineTaskState _cur_state; |
320 | | SourceState _data_state; |
321 | | std::unique_ptr<doris::vectorized::Block> _block; |
322 | | PipelineFragmentContext* _fragment_context = nullptr; |
323 | | TaskQueue* _task_queue = nullptr; |
324 | | |
325 | | // used for priority queue |
326 | | // it may be visited by different thread but there is no race condition |
327 | | // so no need to add lock |
328 | | uint64_t _runtime = 0; |
329 | | // it's visited in one thread, so no need to thread synchronization |
330 | | // 1 get task, (set _queue_level/_core_id) |
331 | | // 2 exe task |
332 | | // 3 update task statistics(update _queue_level/_core_id) |
333 | | int _queue_level = 0; |
334 | | int _core_id = 0; |
335 | | Status _open_status = Status::OK(); |
336 | | |
337 | | RuntimeProfile* _parent_profile = nullptr; |
338 | | std::unique_ptr<RuntimeProfile> _task_profile; |
339 | | RuntimeProfile::Counter* _task_cpu_timer = nullptr; |
340 | | RuntimeProfile::Counter* _prepare_timer = nullptr; |
341 | | RuntimeProfile::Counter* _open_timer = nullptr; |
342 | | RuntimeProfile::Counter* _exec_timer = nullptr; |
343 | | RuntimeProfile::Counter* _get_block_timer = nullptr; |
344 | | RuntimeProfile::Counter* _get_block_counter = nullptr; |
345 | | RuntimeProfile::Counter* _sink_timer = nullptr; |
346 | | RuntimeProfile::Counter* _close_timer = nullptr; |
347 | | RuntimeProfile::Counter* _block_counts = nullptr; |
348 | | RuntimeProfile::Counter* _block_by_source_counts = nullptr; |
349 | | RuntimeProfile::Counter* _block_by_sink_counts = nullptr; |
350 | | RuntimeProfile::Counter* _schedule_counts = nullptr; |
351 | | MonotonicStopWatch _wait_source_watcher; |
352 | | RuntimeProfile::Counter* _wait_source_timer = nullptr; |
353 | | MonotonicStopWatch _wait_bf_watcher; |
354 | | RuntimeProfile::Counter* _wait_bf_timer = nullptr; |
355 | | RuntimeProfile::Counter* _wait_bf_counts = nullptr; |
356 | | MonotonicStopWatch _wait_sink_watcher; |
357 | | RuntimeProfile::Counter* _wait_sink_timer = nullptr; |
358 | | MonotonicStopWatch _wait_worker_watcher; |
359 | | RuntimeProfile::Counter* _wait_worker_timer = nullptr; |
360 | | RuntimeProfile::Counter* _wait_dependency_counts = nullptr; |
361 | | RuntimeProfile::Counter* _pending_finish_counts = nullptr; |
362 | | // TODO we should calculate the time between when really runnable and runnable |
363 | | RuntimeProfile::Counter* _yield_counts = nullptr; |
364 | | RuntimeProfile::Counter* _core_change_times = nullptr; |
365 | | |
366 | | // The monotonic time of the entire lifecycle of the pipelinetask, almost synchronized with the pipfragmentctx |
367 | | // There are several important time points: |
368 | | // 1 first time pipelinetask to execute |
369 | | // 2 task eos |
370 | | // 3 src pending finish over |
371 | | // 4 dst pending finish over |
372 | | // 5 close pipeline time, we mark this beacause pending finish state may change |
373 | | MonotonicStopWatch _pipeline_task_watcher; |
374 | | // time 1 |
375 | | bool _is_first_time_to_execute = false; |
376 | | RuntimeProfile::Counter* _begin_execute_timer = nullptr; |
377 | | int64_t _begin_execute_time = 0; |
378 | | // time 2 |
379 | | bool _is_eos = false; |
380 | | RuntimeProfile::Counter* _eos_timer = nullptr; |
381 | | int64_t _eos_time = 0; |
382 | | //time 3 |
383 | | bool _is_src_pending_finish_over = false; |
384 | | RuntimeProfile::Counter* _src_pending_finish_over_timer = nullptr; |
385 | | int64_t _src_pending_finish_over_time = 0; |
386 | | // time 4 |
387 | | bool _is_dst_pending_finish_over = false; |
388 | | RuntimeProfile::Counter* _dst_pending_finish_over_timer = nullptr; |
389 | | int64_t _dst_pending_finish_over_time = 0; |
390 | | // time 5 |
391 | | bool _is_close_pipeline = false; |
392 | | RuntimeProfile::Counter* _close_pipeline_timer = nullptr; |
393 | | int64_t _close_pipeline_time = 0; |
394 | | |
395 | | RuntimeProfile::Counter* _pip_task_total_timer = nullptr; |
396 | | |
397 | | private: |
398 | | Operators _operators; // left is _source, right is _root |
399 | | OperatorPtr _source; |
400 | | OperatorPtr _root; |
401 | | OperatorPtr _sink; |
402 | | |
403 | | std::atomic<bool> _running {false}; |
404 | | }; |
405 | | } // namespace doris::pipeline |