Coverage Report

Created: 2025-03-13 11:28

/root/doris/be/src/pipeline/task_scheduler.cpp
Source listing (hit counts omitted below: every executable line in this report has count 0, i.e. the file is completely uncovered)
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "task_scheduler.h"

#include <fmt/format.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <sched.h>

// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <functional>
#include <ostream>
#include <string>
#include <thread>
#include <utility>

#include "common/logging.h"
#include "pipeline/pipeline_task.h"
#include "pipeline/pipeline_x/pipeline_x_task.h"
#include "pipeline/task_queue.h"
#include "pipeline_fragment_context.h"
#include "runtime/exec_env.h"
#include "runtime/query_context.h"
#include "util/debug_util.h"
#include "util/sse_util.hpp"
#include "util/thread.h"
#include "util/threadpool.h"
#include "util/time.h"
#include "util/uid_util.h"
#include "vec/runtime/vdatetime_value.h"

namespace doris::pipeline {

BlockedTaskScheduler::BlockedTaskScheduler(std::string name)
        : _name(std::move(name)), _started(false), _shutdown(false) {}

Status BlockedTaskScheduler::start() {
    LOG(INFO) << "BlockedTaskScheduler start";
    RETURN_IF_ERROR(Thread::create(
            "BlockedTaskScheduler", _name, [this]() { this->_schedule(); }, &_thread));
    while (!this->_started.load()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
    }
    LOG(INFO) << "BlockedTaskScheduler started";
    return Status::OK();
}
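
A minimal sketch of the startup handshake above (spawn the worker, then poll until it publishes liveness), with illustrative names; g_started and worker_loop are not Doris APIs:

#include <atomic>
#include <chrono>
#include <thread>

std::atomic<bool> g_started{false};

void worker_loop() {
    g_started.store(true); // first thing the worker does: publish liveness
    // ... scheduling loop runs until shutdown ...
}

void start_and_wait() {
    std::thread t(worker_loop);
    // Spin-sleep until the worker has entered its loop, mirroring the 5 ms
    // polling loop in BlockedTaskScheduler::start().
    while (!g_started.load()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
    }
    t.detach(); // the real scheduler keeps the handle and joins it in shutdown()
}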
64
65
0
void BlockedTaskScheduler::shutdown() {
66
0
    LOG(INFO) << "Start shutdown BlockedTaskScheduler";
67
0
    if (!this->_shutdown) {
68
0
        this->_shutdown = true;
69
0
        if (_thread) {
70
0
            _task_cond.notify_one();
71
0
            _thread->join();
72
0
        }
73
0
    }
74
0
}
75
76
0
Status BlockedTaskScheduler::add_blocked_task(PipelineTask* task) {
77
0
    if (this->_shutdown) {
78
0
        return Status::InternalError("BlockedTaskScheduler shutdown");
79
0
    }
80
0
    std::unique_lock<std::mutex> lock(_task_mutex);
81
0
    if (task->is_pipelineX()) {
82
        // put this task into current dependency's blocking queue and wait for event notification
83
        // instead of using a separate BlockedTaskScheduler.
84
0
        task->set_running(false);
85
0
        return Status::OK();
86
0
    }
87
0
    _blocked_tasks.push_back(task);
88
0
    _task_cond.notify_one();
89
0
    task->set_running(false);
90
0
    return Status::OK();
91
0
}
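
For the non-pipelineX path, the blocked list plus condition variable forms a simple polled queue. A self-contained sketch of that shape, assuming a hypothetical Task type (BlockedQueue and its members are illustrative, not Doris's classes):

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <list>
#include <mutex>

struct Task; // hypothetical stand-in for PipelineTask

class BlockedQueue {
public:
    void add(Task* t) {
        std::lock_guard<std::mutex> lock(_mu);
        _tasks.push_back(t);
        _cv.notify_one(); // wake the polling scheduler thread
    }

    // Splice everything into the caller's local list so the scheduler can
    // iterate without holding the lock, exactly as _schedule() does.
    void drain_into(std::list<Task*>& local) {
        std::unique_lock<std::mutex> lock(_mu);
        while (!_shutdown.load() && _tasks.empty()) {
            _cv.wait_for(lock, std::chrono::milliseconds(10));
        }
        local.splice(local.end(), _tasks);
    }

    void shutdown() { _shutdown.store(true); }

private:
    std::mutex _mu;
    std::condition_variable _cv;
    std::list<Task*> _tasks;
    std::atomic<bool> _shutdown{false};
};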

void BlockedTaskScheduler::_schedule() {
    _started.store(true);
    std::list<PipelineTask*> local_blocked_tasks;
    int empty_times = 0;

    while (!_shutdown) {
        {
            std::unique_lock<std::mutex> lock(this->_task_mutex);
            local_blocked_tasks.splice(local_blocked_tasks.end(), _blocked_tasks);
            if (local_blocked_tasks.empty()) {
                while (!_shutdown.load() && _blocked_tasks.empty()) {
                    _task_cond.wait_for(lock, std::chrono::milliseconds(10));
                }

                if (_shutdown.load()) {
                    break;
                }

                DCHECK(!_blocked_tasks.empty());
                local_blocked_tasks.splice(local_blocked_tasks.end(), _blocked_tasks);
            }
        }

        auto origin_local_block_tasks_size = local_blocked_tasks.size();
        auto iter = local_blocked_tasks.begin();
        VecDateTimeValue now = VecDateTimeValue::local_time();
        while (iter != local_blocked_tasks.end()) {
            auto* task = *iter;
            auto state = task->get_state();
            task->log_detail_if_need();
            if (state == PipelineTaskState::PENDING_FINISH) {
                // The task should be canceled or finished.
                if (task->is_pending_finish()) {
                    VLOG_DEBUG << "Task pending" << task->debug_string();
                    iter++;
                } else {
                    _make_task_run(local_blocked_tasks, iter, PipelineTaskState::PENDING_FINISH);
                }
            } else if (task->query_context()->is_cancelled()) {
                _make_task_run(local_blocked_tasks, iter);
            } else if (task->query_context()->is_timeout(now)) {
                LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id())
                             << ", instance_id=" << print_id(task->instance_id())
                             << ", task info: " << task->debug_string();

                task->query_context()->cancel("", Status::Cancelled(""));
                _make_task_run(local_blocked_tasks, iter);
            } else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
                if (task->has_dependency()) {
                    iter++;
                } else {
                    _make_task_run(local_blocked_tasks, iter);
                }
            } else if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
                if (task->source_can_read()) {
                    _make_task_run(local_blocked_tasks, iter);
                } else {
                    iter++;
                }
            } else if (state == PipelineTaskState::BLOCKED_FOR_RF) {
                if (task->runtime_filters_are_ready_or_timeout()) {
                    _make_task_run(local_blocked_tasks, iter);
                } else {
                    iter++;
                }
            } else if (state == PipelineTaskState::BLOCKED_FOR_SINK) {
                if (task->sink_can_write()) {
                    _make_task_run(local_blocked_tasks, iter);
                } else {
                    iter++;
                }
            } else {
                // TODO: DCHECK the state
                _make_task_run(local_blocked_tasks, iter);
            }
        }

        if (origin_local_block_tasks_size == 0 ||
            local_blocked_tasks.size() == origin_local_block_tasks_size) {
            empty_times += 1;
        } else {
            empty_times = 0;
        }

        if (empty_times != 0 && (empty_times & (EMPTY_TIMES_TO_YIELD - 1)) == 0) {
#ifdef __x86_64__
            _mm_pause();
#else
            sched_yield();
#endif
        }
        if (empty_times == EMPTY_TIMES_TO_YIELD * 10) {
            empty_times = 0;
            sched_yield();
        }
    }
    LOG(INFO) << "BlockedTaskScheduler schedule thread stop";
}
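
The idle backoff above relies on EMPTY_TIMES_TO_YIELD being a power of two, since `empty_times & (EMPTY_TIMES_TO_YIELD - 1)` only fires periodically under that assumption. A standalone sketch of the same pattern (kEmptyTimesToYield = 64 is an assumed value; the real constant lives in the scheduler's header):

#include <sched.h>
#if defined(__x86_64__)
#include <immintrin.h> // _mm_pause
#endif

// Assumed stand-in for the real constant; it must be a power of two or the
// bitmask test below never fires on the intended period.
constexpr int kEmptyTimesToYield = 64;

// Called once per scheduling round; the caller increments empty_times on idle
// rounds and resets it to zero whenever a task was unblocked.
void idle_backoff(int& empty_times) {
    if (empty_times != 0 && (empty_times & (kEmptyTimesToYield - 1)) == 0) {
#if defined(__x86_64__)
        _mm_pause(); // cheap core stall, keeps the thread runnable
#else
        sched_yield();
#endif
    }
    if (empty_times == kEmptyTimesToYield * 10) {
        empty_times = 0; // reset and hand the CPU to another thread outright
        sched_yield();
    }
}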

void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
                                          std::list<PipelineTask*>::iterator& task_itr,
                                          PipelineTaskState t_state) {
    auto* task = *task_itr;
    task->set_state(t_state);
    local_tasks.erase(task_itr++);
    static_cast<void>(task->get_task_queue()->push_back(task));
}

TaskScheduler::~TaskScheduler() {
    stop();
    LOG(INFO) << "Task scheduler " << _name << " shutdown";
}

Status TaskScheduler::start() {
    int cores = _task_queue->cores();
    RETURN_IF_ERROR(ThreadPoolBuilder(_name)
                            .set_min_threads(cores)
                            .set_max_threads(cores)
                            .set_max_queue_size(0)
                            .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                            .build(&_fix_thread_pool));
    LOG_INFO("TaskScheduler set cores").tag("size", cores);
    _markers.reserve(cores);
    for (size_t i = 0; i < cores; ++i) {
        _markers.push_back(std::make_unique<std::atomic<bool>>(true));
        RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); }));
    }
    return Status::OK();
}

Status TaskScheduler::schedule_task(PipelineTask* task) {
    return _task_queue->push_back(task);
    // TODO: control the number of in-flight tasks.
}

// After _close_task returns, the task may already be destructed.
void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) {
    // Must attach the memory tracker here, because closing the task also releases
    // memory. That memory has to be counted against the query, otherwise the
    // query's memory usage will not decrease when some of its tasks finish.
    SCOPED_ATTACH_TASK(task->runtime_state());
    if (task->is_finished()) {
        task->set_running(false);
        return;
    }
    // close_a_pipeline may delete the fragment context and then crash in deferred
    // code, because the deferred code accesses the fragment context itself.
    auto lock_for_context = task->fragment_context()->shared_from_this();
    // is_pending_finish does not check the status, so the status has to be checked
    // in the close API. For example, an async writer may fail while handling the
    // eos block without returning an error status, so the error must be caught in
    // close. All source and sink APIs have been refactored so that close no longer
    // needs to wait for pending finish; close can therefore be called directly.
    Status status = task->close(exec_status);
    if (!status.ok() && state != PipelineTaskState::CANCELED) {
        if (task->is_pipelineX()) {
            // Cancel via the fragment context, which in turn cancels the query context.
            task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
                                             std::string(status.msg()));
        } else {
            task->query_context()->cancel(status.to_string(),
                                          Status::Cancelled(status.to_string()));
        }
        state = PipelineTaskState::CANCELED;
    }
    task->set_state(state);
    task->set_close_pipeline_time();
    task->finalize();
    task->set_running(false);
    task->fragment_context()->close_a_pipeline(task->pipeline_id());
}
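
The `shared_from_this()` line in _close_task is a keep-alive guard: it pins the fragment context so deferred code cannot observe a freed object. A minimal sketch of the idiom with a hypothetical Context type:

#include <memory>

// Hypothetical context type; the point is only the enable_shared_from_this idiom.
struct Context : std::enable_shared_from_this<Context> {
    void close_a_pipeline() {}
};

void close_with_keep_alive(Context* ctx) {
    // Pin the context for this scope: even if the last external owner drops
    // its reference while we are closing, the object outlives this function.
    std::shared_ptr<Context> keep_alive = ctx->shared_from_this();
    ctx->close_a_pipeline(); // safe: ctx cannot be destroyed underneath us
}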

void TaskScheduler::_do_work(size_t index) {
    const auto& marker = _markers[index];
    while (*marker) {
        auto* task = _task_queue->take(index);
        if (!task) {
            continue;
        }
        if (task->is_pipelineX() && task->is_running()) {
            static_cast<void>(_task_queue->push_back(task, index));
            continue;
        }
        task->log_detail_if_need();
        task->set_running(true);
        task->set_task_queue(_task_queue.get());
        auto* fragment_ctx = task->fragment_context();
        bool canceled = fragment_ctx->is_canceled();

        auto state = task->get_state();
        // If the state is PENDING_FINISH, the task came from the blocked queue and
        // its is_pending_finish must now return false: the task is finished and
        // needs to be closed.
        if (state == PipelineTaskState::PENDING_FINISH) {
            DCHECK(task->is_pipelineX() || !task->is_pending_finish())
                    << "must not pending close " << task->debug_string();
            Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
            _close_task(task, canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
                        exec_status);
            continue;
        }

        DCHECK(state != PipelineTaskState::FINISHED && state != PipelineTaskState::CANCELED)
                << "task already finish: " << task->debug_string();

        if (canceled) {
            // The state may have changed from PENDING_FINISH, or from a BLOCKED
            // state when another task called cancel; both require cancel handling.

            // If the pipeline is canceled, it reports after the pipeline is closed
            // and propagates errors downstream through the exchange, so there is
            // no need to send_report here.
            // fragment_ctx->send_report(true);
            Status cancel_status = fragment_ctx->get_query_ctx()->exec_status();
            _close_task(task, PipelineTaskState::CANCELED, cancel_status);
            continue;
        }

        if (task->is_pipelineX()) {
            task->set_state(PipelineTaskState::RUNNABLE);
        }

        DCHECK(task->is_pipelineX() || task->get_state() == PipelineTaskState::RUNNABLE)
                << "state:" << get_state_name(task->get_state())
                << " task: " << task->debug_string();
        // Execute the task.
        bool eos = false;
        auto status = Status::OK();

        try {
            // This enables the exception-handling logic in allocator.h when a
            // memory allocation fails or system memory is insufficient.
            doris::enable_thread_catch_bad_alloc++;
            Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }};
            // TODO: use a better enclosure to abstract this.
            if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
                TUniqueId query_id = task->query_context()->query_id();
                std::string task_name = task->task_name();
#ifdef __APPLE__
                uint32_t core_id = 0;
#else
                uint32_t core_id = sched_getcpu();
#endif
                std::thread::id tid = std::this_thread::get_id();
                uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
                uint64_t start_time = MonotonicMicros();

                status = task->execute(&eos);

                uint64_t end_time = MonotonicMicros();
                auto state = task->get_state();
                std::string state_name =
                        state == PipelineTaskState::RUNNABLE ? get_state_name(state) : "";
                ExecEnv::GetInstance()->pipeline_tracer_context()->record(
                        {query_id, task_name, core_id, thread_id, start_time, end_time,
                         state_name});
            } else {
                status = task->execute(&eos);
            }
        } catch (const Exception& e) {
            status = e.to_status();
        }

        task->set_previous_core_id(index);

        if (status.is<ErrorCode::END_OF_FILE>()) {
            // The sink operator has finished; close the task now.
            _close_task(task, PipelineTaskState::FINISHED, Status::OK());
            continue;
        } else if (!status.ok()) {
            task->set_eos_time();
            LOG(WARNING) << fmt::format(
                    "Pipeline task failed. query_id: {} reason: {}",
                    PrintInstanceStandardInfo(task->query_context()->query_id(),
                                              task->fragment_context()->get_fragment_instance_id()),
                    status.to_string());
            // Print detailed information when debugging here, e.g.:
            // LOG(WARNING) << "task:\n" << task->debug_string();

            // Execution failed; cancel all fragment instances.
            fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.msg());
            _close_task(task, PipelineTaskState::CANCELED, status);
            continue;
        }
        fragment_ctx->trigger_report_if_necessary();

        if (eos) {
            task->set_eos_time();
            // TODO: parallel pipelines need to wait for the last task to finish
            //  before calling finalize and find_p_dependency.
            VLOG_DEBUG << fmt::format(
                    "Try close task: {}, fragment_ctx->is_canceled(): {}",
                    PrintInstanceStandardInfo(task->query_context()->query_id(),
                                              task->fragment_context()->get_fragment_instance_id()),
                    fragment_ctx->is_canceled());
            if (task->is_pipelineX()) {
                // is_pending_finish adds the task to the dependency's blocking queue;
                // the task is moved back to the running queue once the dependency is
                // ready.
                if (task->is_pending_finish()) {
                    // Eos was reached, so move the task to the PENDING_FINISH state.
                    task->set_state(PipelineTaskState::PENDING_FINISH);
                    task->set_running(false);
                } else {
                    // Close the task directly?
                    Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
                    _close_task(
                            task,
                            canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
                            exec_status);
                }
            } else {
                // Eos was reached, so move the task to the PENDING_FINISH state. This
                // is safe for the legacy pipeline: it checks is_pending_finish and
                // invokes the task once it is ready.
                task->set_state(PipelineTaskState::PENDING_FINISH);
                task->set_running(false);
                // After the task is added to the blocked queue it may be run, and even
                // released, by another thread; touching it afterwards (e.g. calling
                // set_running) would crash, so nothing references the task after this.
                static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
            }
            continue;
        }

        auto pipeline_state = task->get_state();
        switch (pipeline_state) {
        case PipelineTaskState::BLOCKED_FOR_SOURCE:
        case PipelineTaskState::BLOCKED_FOR_SINK:
        case PipelineTaskState::BLOCKED_FOR_RF:
        case PipelineTaskState::BLOCKED_FOR_DEPENDENCY:
            static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
            break;
        case PipelineTaskState::RUNNABLE:
            task->set_running(false);
            static_cast<void>(_task_queue->push_back(task, index));
            break;
        default:
            DCHECK(false) << "error state after run task, " << get_state_name(pipeline_state)
                          << " task: " << task->debug_string();
            break;
        }
    }
}
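
In the tracing branch of _do_work, thread_id is obtained by reinterpreting the bytes of std::thread::id, which assumes the id is a trivially copyable value at least 8 bytes wide. A portable alternative (a sketch, not what the file does) hashes the id instead:

#include <cstdint>
#include <functional>
#include <thread>

uint64_t tracing_thread_id() {
    // std::hash<std::thread::id> is provided by the standard library and does
    // not depend on the id's internal representation.
    return static_cast<uint64_t>(
            std::hash<std::thread::id>{}(std::this_thread::get_id()));
}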

void TaskScheduler::stop() {
    if (!this->_shutdown.load()) {
        if (_task_queue) {
            _task_queue->close();
        }
        if (_fix_thread_pool) {
            for (const auto& marker : _markers) {
                marker->store(false);
            }
            _fix_thread_pool->shutdown();
            _fix_thread_pool->wait();
        }
        // _shutdown must be set at the very end of stop() to guarantee the pool is
        // fully stopped. For example, if two threads call stop() and one were to
        // set _shutdown = true up front, the other would skip the checks above and
        // could free the task scheduler while the pool is still shutting down.
        this->_shutdown.store(true);
    }
}

} // namespace doris::pipeline
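
The ordering argument in TaskScheduler::stop(), publishing _shutdown only after teardown completes, can also be expressed with std::call_once, which blocks concurrent callers until the winning thread's teardown has finished. A sketch under that assumption (Scheduler here is hypothetical, not Doris's class):

#include <mutex>

class Scheduler {
public:
    // Concurrent callers of stop() block inside call_once until the winning
    // thread's teardown has fully completed, so none can observe a
    // half-stopped scheduler or free it early.
    void stop() {
        std::call_once(_stop_once, [this] { _do_stop(); });
    }

private:
    void _do_stop() {
        // ... close the task queue, clear markers, shut down and wait for the pool ...
    }
    std::once_flag _stop_once;
};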