/root/doris/be/src/pipeline/task_scheduler.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
#include "task_scheduler.h"

#include <fmt/format.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <sched.h>

// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <cstring>
#include <functional>
#include <ostream>
#include <string>
#include <thread>
#include <utility>

#include "common/logging.h"
#include "pipeline/pipeline_task.h"
#include "pipeline/pipeline_x/pipeline_x_task.h"
#include "pipeline/task_queue.h"
#include "pipeline_fragment_context.h"
#include "runtime/exec_env.h"
#include "runtime/query_context.h"
#include "util/debug_util.h"
#include "util/sse_util.hpp"
#include "util/thread.h"
#include "util/threadpool.h"
#include "util/time.h"
#include "util/uid_util.h"
#include "vec/runtime/vdatetime_value.h"
48 | | |
49 | | namespace doris::pipeline { |
50 | | |
51 | | BlockedTaskScheduler::BlockedTaskScheduler(std::string name) |
52 | 0 | : _name(std::move(name)), _started(false), _shutdown(false) {} |
53 | | |
54 | 0 | Status BlockedTaskScheduler::start() { |
55 | 0 | LOG(INFO) << "BlockedTaskScheduler start"; |
56 | 0 | RETURN_IF_ERROR(Thread::create( |
57 | 0 | "BlockedTaskScheduler", _name, [this]() { this->_schedule(); }, &_thread)); |
58 | 0 | while (!this->_started.load()) { |
59 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(5)); |
60 | 0 | } |
61 | 0 | LOG(INFO) << "BlockedTaskScheduler started"; |
62 | 0 | return Status::OK(); |
63 | 0 | } |
64 | | |
65 | 0 | void BlockedTaskScheduler::shutdown() { |
66 | 0 | LOG(INFO) << "Start shutdown BlockedTaskScheduler"; |
67 | 0 | if (!this->_shutdown) { |
68 | 0 | this->_shutdown = true; |
69 | 0 | if (_thread) { |
70 | 0 | _task_cond.notify_one(); |
71 | 0 | _thread->join(); |
72 | 0 | } |
73 | 0 | } |
74 | 0 | } |
75 | | |
76 | 0 | Status BlockedTaskScheduler::add_blocked_task(PipelineTask* task) { |
77 | 0 | if (this->_shutdown) { |
78 | 0 | return Status::InternalError("BlockedTaskScheduler shutdown"); |
79 | 0 | } |
80 | 0 | std::unique_lock<std::mutex> lock(_task_mutex); |
81 | 0 | if (task->is_pipelineX()) { |
82 | | // put this task into current dependency's blocking queue and wait for event notification |
83 | | // instead of using a separate BlockedTaskScheduler. |
84 | 0 | task->set_running(false); |
85 | 0 | return Status::OK(); |
86 | 0 | } |
87 | 0 | _blocked_tasks.push_back(task); |
88 | 0 | _task_cond.notify_one(); |
89 | 0 | task->set_running(false); |
90 | 0 | return Status::OK(); |
91 | 0 | } |
92 | | |
93 | 0 | void BlockedTaskScheduler::_schedule() { |
94 | 0 | _started.store(true); |
95 | 0 | std::list<PipelineTask*> local_blocked_tasks; |
96 | 0 | int empty_times = 0; |
97 | |
|
98 | 0 | while (!_shutdown) { |
99 | 0 | { |
100 | 0 | std::unique_lock<std::mutex> lock(this->_task_mutex); |
101 | 0 | local_blocked_tasks.splice(local_blocked_tasks.end(), _blocked_tasks); |
102 | 0 | if (local_blocked_tasks.empty()) { |
103 | 0 | while (!_shutdown.load() && _blocked_tasks.empty()) { |
104 | 0 | _task_cond.wait_for(lock, std::chrono::milliseconds(10)); |
105 | 0 | } |
106 | |
|
107 | 0 | if (_shutdown.load()) { |
108 | 0 | break; |
109 | 0 | } |
110 | | |
111 | 0 | DCHECK(!_blocked_tasks.empty()); |
112 | 0 | local_blocked_tasks.splice(local_blocked_tasks.end(), _blocked_tasks); |
113 | 0 | } |
114 | 0 | } |
115 | | |
116 | 0 | auto origin_local_block_tasks_size = local_blocked_tasks.size(); |
117 | 0 | auto iter = local_blocked_tasks.begin(); |
118 | 0 | VecDateTimeValue now = VecDateTimeValue::local_time(); |
119 | 0 | while (iter != local_blocked_tasks.end()) { |
120 | 0 | auto* task = *iter; |
121 | 0 | auto state = task->get_state(); |
122 | 0 | task->log_detail_if_need(); |
123 | 0 | if (state == PipelineTaskState::PENDING_FINISH) { |
124 | | // should cancel or should finish |
125 | 0 | if (task->is_pending_finish()) { |
126 | 0 | VLOG_DEBUG << "Task pending" << task->debug_string(); |
127 | 0 | iter++; |
128 | 0 | } else { |
129 | 0 | _make_task_run(local_blocked_tasks, iter, PipelineTaskState::PENDING_FINISH); |
130 | 0 | } |
131 | 0 | } else if (task->query_context()->is_cancelled()) { |
132 | 0 | _make_task_run(local_blocked_tasks, iter); |
133 | 0 | } else if (task->query_context()->is_timeout(now)) { |
134 | 0 | LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id()) |
135 | 0 | << ", instance_id=" << print_id(task->instance_id()) |
136 | 0 | << ", task info: " << task->debug_string(); |
137 | |
|
138 | 0 | task->query_context()->cancel("", Status::Cancelled("")); |
139 | 0 | _make_task_run(local_blocked_tasks, iter); |
140 | 0 | } else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) { |
141 | 0 | if (task->has_dependency()) { |
142 | 0 | iter++; |
143 | 0 | } else { |
144 | 0 | _make_task_run(local_blocked_tasks, iter); |
145 | 0 | } |
146 | 0 | } else if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) { |
147 | 0 | if (task->source_can_read()) { |
148 | 0 | _make_task_run(local_blocked_tasks, iter); |
149 | 0 | } else { |
150 | 0 | iter++; |
151 | 0 | } |
152 | 0 | } else if (state == PipelineTaskState::BLOCKED_FOR_RF) { |
153 | 0 | if (task->runtime_filters_are_ready_or_timeout()) { |
154 | 0 | _make_task_run(local_blocked_tasks, iter); |
155 | 0 | } else { |
156 | 0 | iter++; |
157 | 0 | } |
158 | 0 | } else if (state == PipelineTaskState::BLOCKED_FOR_SINK) { |
159 | 0 | if (task->sink_can_write()) { |
160 | 0 | _make_task_run(local_blocked_tasks, iter); |
161 | 0 | } else { |
162 | 0 | iter++; |
163 | 0 | } |
164 | 0 | } else { |
165 | | // TODO: DCHECK the state |
166 | 0 | _make_task_run(local_blocked_tasks, iter); |
167 | 0 | } |
168 | 0 | } |
169 | |
|
170 | 0 | if (origin_local_block_tasks_size == 0 || |
171 | 0 | local_blocked_tasks.size() == origin_local_block_tasks_size) { |
172 | 0 | empty_times += 1; |
173 | 0 | } else { |
174 | 0 | empty_times = 0; |
175 | 0 | } |
176 | |
|
177 | 0 | if (empty_times != 0 && (empty_times & (EMPTY_TIMES_TO_YIELD - 1)) == 0) { |
178 | 0 | #ifdef __x86_64__ |
179 | 0 | _mm_pause(); |
180 | | #else |
181 | | sched_yield(); |
182 | | #endif |
183 | 0 | } |
184 | 0 | if (empty_times == EMPTY_TIMES_TO_YIELD * 10) { |
185 | 0 | empty_times = 0; |
186 | 0 | sched_yield(); |
187 | 0 | } |
188 | 0 | } |
189 | 0 | LOG(INFO) << "BlockedTaskScheduler schedule thread stop"; |
190 | 0 | } |
191 | | |
192 | | void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks, |
193 | | std::list<PipelineTask*>::iterator& task_itr, |
194 | 0 | PipelineTaskState t_state) { |
195 | 0 | auto* task = *task_itr; |
196 | 0 | task->set_state(t_state); |
197 | 0 | local_tasks.erase(task_itr++); |
198 | 0 | static_cast<void>(task->get_task_queue()->push_back(task)); |
199 | 0 | } |
200 | | |
201 | 0 | TaskScheduler::~TaskScheduler() { |
202 | 0 | stop(); |
203 | 0 | LOG(INFO) << "Task scheduler " << _name << " shutdown"; |
204 | 0 | } |
205 | | |
206 | 0 | Status TaskScheduler::start() { |
207 | 0 | int cores = _task_queue->cores(); |
208 | 0 | RETURN_IF_ERROR(ThreadPoolBuilder(_name) |
209 | 0 | .set_min_threads(cores) |
210 | 0 | .set_max_threads(cores) |
211 | 0 | .set_max_queue_size(0) |
212 | 0 | .set_cgroup_cpu_ctl(_cgroup_cpu_ctl) |
213 | 0 | .build(&_fix_thread_pool)); |
214 | 0 | LOG_INFO("TaskScheduler set cores").tag("size", cores); |
215 | 0 | _markers.reserve(cores); |
216 | 0 | for (size_t i = 0; i < cores; ++i) { |
217 | 0 | _markers.push_back(std::make_unique<std::atomic<bool>>(true)); |
218 | 0 | RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); })); |
219 | 0 | } |
220 | 0 | return Status::OK(); |
221 | 0 | } |
222 | | |
223 | 0 | Status TaskScheduler::schedule_task(PipelineTask* task) { |
224 | 0 | return _task_queue->push_back(task); |
225 | | // TODO control num of task |
226 | 0 | } |
227 | | |
228 | | // after _close_task, task maybe destructed. |
229 | 0 | void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) { |
230 | | // Has to attach memory tracker here, because the close task will also release some memory. |
231 | | // Should count the memory to the query or the query's memory will not decrease when part of |
232 | | // task finished. |
233 | 0 | SCOPED_ATTACH_TASK(task->runtime_state()); |
234 | 0 | if (task->is_finished()) { |
235 | 0 | task->set_running(false); |
236 | 0 | return; |
237 | 0 | } |
238 | | // close_a_pipeline may delete fragment context and will core in some defer |
239 | | // code, because the defer code will access fragment context it self. |
240 | 0 | auto lock_for_context = task->fragment_context()->shared_from_this(); |
241 | | // is_pending_finish does not check status, so has to check status in close API. |
242 | | // For example, in async writer, the writer may failed during dealing with eos_block |
243 | | // but it does not return error status. Has to check the error status in close API. |
244 | | // We have already refactor all source and sink api, the close API does not need waiting |
245 | | // for pending finish now. So that could call close directly. |
246 | 0 | Status status = task->close(exec_status); |
247 | 0 | if (!status.ok() && state != PipelineTaskState::CANCELED) { |
248 | 0 | if (task->is_pipelineX()) { //should call fragment context cancel, in it will call query context cancel |
249 | 0 | task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, |
250 | 0 | std::string(status.msg())); |
251 | 0 | } else { |
252 | 0 | task->query_context()->cancel(status.to_string(), |
253 | 0 | Status::Cancelled(status.to_string())); |
254 | 0 | } |
255 | 0 | state = PipelineTaskState::CANCELED; |
256 | 0 | } |
257 | 0 | task->set_state(state); |
258 | 0 | task->set_close_pipeline_time(); |
259 | 0 | task->finalize(); |
260 | 0 | task->set_running(false); |
261 | 0 | task->fragment_context()->close_a_pipeline(task->pipeline_id()); |
262 | 0 | } |
263 | | |
264 | 0 | void TaskScheduler::_do_work(size_t index) { |
265 | 0 | const auto& marker = _markers[index]; |
266 | 0 | while (*marker) { |
267 | 0 | auto* task = _task_queue->take(index); |
268 | 0 | if (!task) { |
269 | 0 | continue; |
270 | 0 | } |
271 | 0 | if (task->is_pipelineX() && task->is_running()) { |
272 | 0 | static_cast<void>(_task_queue->push_back(task, index)); |
273 | 0 | continue; |
274 | 0 | } |
275 | 0 | task->log_detail_if_need(); |
276 | 0 | task->set_running(true); |
277 | 0 | task->set_task_queue(_task_queue.get()); |
278 | 0 | auto* fragment_ctx = task->fragment_context(); |
279 | 0 | bool canceled = fragment_ctx->is_canceled(); |
280 | |
|
281 | 0 | auto state = task->get_state(); |
282 | | // If the state is PENDING_FINISH, then the task is come from blocked queue, its is_pending_finish |
283 | | // has to return false. The task is finished and need to close now. |
284 | 0 | if (state == PipelineTaskState::PENDING_FINISH) { |
285 | 0 | DCHECK(task->is_pipelineX() || !task->is_pending_finish()) |
286 | 0 | << "must not pending close " << task->debug_string(); |
287 | 0 | Status exec_status = fragment_ctx->get_query_ctx()->exec_status(); |
288 | 0 | _close_task(task, canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED, |
289 | 0 | exec_status); |
290 | 0 | continue; |
291 | 0 | } |
292 | | |
293 | 0 | DCHECK(state != PipelineTaskState::FINISHED && state != PipelineTaskState::CANCELED) |
294 | 0 | << "task already finish: " << task->debug_string(); |
295 | |
|
296 | 0 | if (canceled) { |
297 | | // may change from pending FINISH,should called cancel |
298 | | // also may change form BLOCK, other task called cancel |
299 | | |
300 | | // If pipeline is canceled, it will report after pipeline closed, and will propagate |
301 | | // errors to downstream through exchange. So, here we needn't send_report. |
302 | | // fragment_ctx->send_report(true); |
303 | 0 | Status cancel_status = fragment_ctx->get_query_ctx()->exec_status(); |
304 | 0 | _close_task(task, PipelineTaskState::CANCELED, cancel_status); |
305 | 0 | continue; |
306 | 0 | } |
307 | | |
308 | 0 | if (task->is_pipelineX()) { |
309 | 0 | task->set_state(PipelineTaskState::RUNNABLE); |
310 | 0 | } |
311 | |
|
312 | 0 | DCHECK(task->is_pipelineX() || task->get_state() == PipelineTaskState::RUNNABLE) |
313 | 0 | << "state:" << get_state_name(task->get_state()) |
314 | 0 | << " task: " << task->debug_string(); |
315 | | // task exec |
316 | 0 | bool eos = false; |
317 | 0 | auto status = Status::OK(); |
318 | |
|
319 | 0 | try { |
320 | | // This will enable exception handling logic in allocator.h when memory allocate |
321 | | // failed or sysem memory is not sufficient. |
322 | 0 | doris::enable_thread_catch_bad_alloc++; |
323 | 0 | Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }}; |
324 | | //TODO: use a better enclose to abstracting these |
325 | 0 | if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) { |
326 | 0 | TUniqueId query_id = task->query_context()->query_id(); |
327 | 0 | std::string task_name = task->task_name(); |
328 | | #ifdef __APPLE__ |
329 | | uint32_t core_id = 0; |
330 | | #else |
331 | 0 | uint32_t core_id = sched_getcpu(); |
332 | 0 | #endif |
333 | 0 | std::thread::id tid = std::this_thread::get_id(); |
334 | 0 | uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid); |
335 | 0 | uint64_t start_time = MonotonicMicros(); |
336 | |
|
337 | 0 | status = task->execute(&eos); |
338 | |
|
339 | 0 | uint64_t end_time = MonotonicMicros(); |
340 | 0 | auto state = task->get_state(); |
341 | 0 | std::string state_name = |
342 | 0 | state == PipelineTaskState::RUNNABLE ? get_state_name(state) : ""; |
343 | 0 | ExecEnv::GetInstance()->pipeline_tracer_context()->record( |
344 | 0 | {query_id, task_name, core_id, thread_id, start_time, end_time, |
345 | 0 | state_name}); |
346 | 0 | } else { |
347 | 0 | status = task->execute(&eos); |
348 | 0 | } |
349 | 0 | } catch (const Exception& e) { |
350 | 0 | status = e.to_status(); |
351 | 0 | } |
352 | |
|
353 | 0 | task->set_previous_core_id(index); |
354 | |
|
355 | 0 | if (status.is<ErrorCode::END_OF_FILE>()) { |
356 | | // Sink operator finished, just close task now. |
357 | 0 | _close_task(task, PipelineTaskState::FINISHED, Status::OK()); |
358 | 0 | continue; |
359 | 0 | } else if (!status.ok()) { |
360 | 0 | task->set_eos_time(); |
361 | 0 | LOG(WARNING) << fmt::format( |
362 | 0 | "Pipeline task failed. query_id: {} reason: {}", |
363 | 0 | PrintInstanceStandardInfo(task->query_context()->query_id(), |
364 | 0 | task->fragment_context()->get_fragment_instance_id()), |
365 | 0 | status.to_string()); |
366 | | // Print detail informations below when you debugging here. |
367 | | // |
368 | | // LOG(WARNING)<< "task:\n"<<task->debug_string(); |
369 | | |
370 | | // exec failed,cancel all fragment instance |
371 | 0 | fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.msg()); |
372 | 0 | _close_task(task, PipelineTaskState::CANCELED, status); |
373 | 0 | continue; |
374 | 0 | } |
375 | 0 | fragment_ctx->trigger_report_if_necessary(); |
376 | |
|
377 | 0 | if (eos) { |
378 | 0 | task->set_eos_time(); |
379 | | // TODO: pipeline parallel need to wait the last task finish to call finalize |
380 | | // and find_p_dependency |
381 | 0 | VLOG_DEBUG << fmt::format( |
382 | 0 | "Try close task: {}, fragment_ctx->is_canceled(): {}", |
383 | 0 | PrintInstanceStandardInfo(task->query_context()->query_id(), |
384 | 0 | task->fragment_context()->get_fragment_instance_id()), |
385 | 0 | fragment_ctx->is_canceled()); |
386 | 0 | if (task->is_pipelineX()) { |
387 | | // is pending finish will add the task to dependency's blocking queue, and then the task will be |
388 | | // added to running queue when dependency is ready. |
389 | 0 | if (task->is_pending_finish()) { |
390 | | // Only meet eos, should set task to PENDING_FINISH state |
391 | 0 | task->set_state(PipelineTaskState::PENDING_FINISH); |
392 | 0 | task->set_running(false); |
393 | 0 | } else { |
394 | | // Close the task directly? |
395 | 0 | Status exec_status = fragment_ctx->get_query_ctx()->exec_status(); |
396 | 0 | _close_task( |
397 | 0 | task, |
398 | 0 | canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED, |
399 | 0 | exec_status); |
400 | 0 | } |
401 | 0 | } else { |
402 | | // Only meet eos, should set task to PENDING_FINISH state |
403 | | // pipeline is ok, because it will check is pending finish, and if it is ready, it will be invoked. |
404 | 0 | task->set_state(PipelineTaskState::PENDING_FINISH); |
405 | 0 | task->set_running(false); |
406 | | // After the task is added to the block queue, it maybe run by another thread |
407 | | // and the task maybe released in the other thread. And will core at |
408 | | // task set running. |
409 | 0 | static_cast<void>(_blocked_task_scheduler->add_blocked_task(task)); |
410 | 0 | } |
411 | 0 | continue; |
412 | 0 | } |
413 | | |
414 | 0 | auto pipeline_state = task->get_state(); |
415 | 0 | switch (pipeline_state) { |
416 | 0 | case PipelineTaskState::BLOCKED_FOR_SOURCE: |
417 | 0 | case PipelineTaskState::BLOCKED_FOR_SINK: |
418 | 0 | case PipelineTaskState::BLOCKED_FOR_RF: |
419 | 0 | case PipelineTaskState::BLOCKED_FOR_DEPENDENCY: |
420 | 0 | static_cast<void>(_blocked_task_scheduler->add_blocked_task(task)); |
421 | 0 | break; |
422 | 0 | case PipelineTaskState::RUNNABLE: |
423 | 0 | task->set_running(false); |
424 | 0 | static_cast<void>(_task_queue->push_back(task, index)); |
425 | 0 | break; |
426 | 0 | default: |
427 | 0 | DCHECK(false) << "error state after run task, " << get_state_name(pipeline_state) |
428 | 0 | << " task: " << task->debug_string(); |
429 | 0 | break; |
430 | 0 | } |
431 | 0 | } |
432 | 0 | } |
433 | | |
434 | 0 | void TaskScheduler::stop() { |
435 | 0 | if (!this->_shutdown.load()) { |
436 | 0 | if (_task_queue) { |
437 | 0 | _task_queue->close(); |
438 | 0 | } |
439 | 0 | if (_fix_thread_pool) { |
440 | 0 | for (const auto& marker : _markers) { |
441 | 0 | marker->store(false); |
442 | 0 | } |
443 | 0 | _fix_thread_pool->shutdown(); |
444 | 0 | _fix_thread_pool->wait(); |
445 | 0 | } |
446 | | // Should set at the ending of the stop to ensure that the |
447 | | // pool is stopped. For example, if there are 2 threads call stop |
448 | | // then if one thread set shutdown = false, then another thread will |
449 | | // not check it and will free task scheduler. |
450 | 0 | this->_shutdown.store(true); |
451 | 0 | } |
452 | 0 | } |
453 | | |
454 | | } // namespace doris::pipeline |