Coverage Report

Created: 2026-01-04 11:29

/root/doris/be/src/pipeline/task_scheduler.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "task_scheduler.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Types_types.h>
22
#include <gen_cpp/types.pb.h>
23
#include <glog/logging.h>
24
#include <sched.h>
25
26
// IWYU pragma: no_include <bits/chrono.h>
27
#include <chrono> // IWYU pragma: keep
28
#include <functional>
29
#include <ostream>
30
#include <string>
31
#include <thread>
32
#include <utility>
33
34
#include "common/logging.h"
35
#include "pipeline/pipeline_task.h"
36
#include "pipeline/task_queue.h"
37
#include "pipeline_fragment_context.h"
38
#include "runtime/exec_env.h"
39
#include "runtime/query_context.h"
40
#include "util/thread.h"
41
#include "util/threadpool.h"
42
#include "util/time.h"
43
#include "util/uid_util.h"
44
#include "vec/runtime/vdatetime_value.h"
45
46
namespace doris::pipeline {
47
48
0
TaskScheduler::~TaskScheduler() {
49
0
    stop();
50
0
    LOG(INFO) << "Task scheduler " << _name << " shutdown";
51
0
}
52
53
0
Status TaskScheduler::start() {
54
0
    int cores = _task_queue->cores();
55
0
    RETURN_IF_ERROR(ThreadPoolBuilder(_name)
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
56
0
                            .set_min_threads(cores)
57
0
                            .set_max_threads(cores)
58
0
                            .set_max_queue_size(0)
59
0
                            .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
60
0
                            .build(&_fix_thread_pool));
61
0
    LOG_INFO("TaskScheduler set cores").tag("size", cores);
Line
Count
Source
118
0
#define LOG_INFO TaggableLogger(__FILE__, __LINE__, google::GLOG_INFO)
62
0
    _markers.resize(cores, true);
63
0
    for (size_t i = 0; i < cores; ++i) {
  Branch (63:24): [True: 0, False: 0]
64
0
        RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); }));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
65
0
    }
66
0
    return Status::OK();
67
0
}
68
69
0
Status TaskScheduler::schedule_task(PipelineTaskSPtr task) {
70
0
    return _task_queue->push_back(task);
71
0
}
72
73
// After _close_task, the task may be destructed.
74
0
void _close_task(PipelineTask* task, Status exec_status, PipelineFragmentContext* ctx) {
75
    // Has to attach memory tracker here, because the close task will also release some memory.
76
    // Should count the memory to the query or the query's memory will not decrease when part of
77
    // task finished.
78
0
    SCOPED_ATTACH_TASK(task->runtime_state());
Line
Count
Source
74
0
    auto VARNAME_LINENUM(scoped_tls_at) = doris::ScopedInitThreadContext()
79
    // is_pending_finish does not check status, so has to check status in close API.
80
    // For example, in the async writer, the writer may fail while dealing with eos_block
81
    // but it does not return an error status, so the error status has to be checked in the close API.
82
    // We have already refactored all source and sink APIs; the close API does not need to wait
83
    // for pending finish now, so close can be called directly.
84
0
    Status status = task->close(exec_status);
85
0
    if (!status.ok()) {
  Branch (85:9): [True: 0, False: 0]
86
0
        ctx->cancel(status);
87
0
    }
88
0
    task->finalize();
89
0
    task->set_running(false);
90
0
    ctx->close_a_pipeline(task->pipeline_id());
91
0
}
92
93
0
void TaskScheduler::_do_work(size_t index) {
94
0
    while (_markers[index]) {
  Branch (94:12): [True: 0, False: 0]
95
0
        auto task = _task_queue->take(index);
96
0
        if (!task) {
  Branch (96:13): [True: 0, False: 0]
97
0
            continue;
98
0
        }
99
0
        if (task->is_running()) {
  Branch (99:13): [True: 0, False: 0]
100
0
            static_cast<void>(_task_queue->push_back(task, index));
101
0
            continue;
102
0
        }
103
0
        if (task->is_finalized()) {
  Branch (103:13): [True: 0, False: 0]
104
0
            continue;
105
0
        }
106
0
        auto fragment_context = task->fragment_context().lock();
107
0
        if (!fragment_context) {
  Branch (107:13): [True: 0, False: 0]
108
            // Fragment (and its query) has already finished
109
0
            continue;
110
0
        }
111
0
        task->log_detail_if_need();
112
0
        task->set_running(true);
113
0
        task->set_task_queue(_task_queue.get());
114
0
        bool canceled = fragment_context->is_canceled();
115
116
        // If the state is PENDING_FINISH, then the task came from the blocked queue and its is_pending_finish
117
        // has to return false. The task is finished and needs to be closed now.
118
0
        if (canceled) {
  Branch (118:13): [True: 0, False: 0]
119
            // The task may have changed from PENDING_FINISH; cancel should be called.
120
            // It may also have changed from BLOCK because another task called cancel.
121
122
            // If pipeline is canceled, it will report after pipeline closed, and will propagate
123
            // errors to downstream through exchange. So, here we needn't send_report.
124
            // fragment_ctx->send_report(true);
125
0
            _close_task(task.get(), fragment_context->get_query_ctx()->exec_status(),
126
0
                        fragment_context.get());
127
0
            continue;
128
0
        }
129
130
        // task exec
131
0
        bool eos = false;
132
0
        auto status = Status::OK();
133
134
#ifdef __APPLE__
135
        uint32_t core_id = 0;
136
#else
137
0
        uint32_t core_id = sched_getcpu();
138
0
#endif
139
0
        ASSIGN_STATUS_IF_CATCH_EXCEPTION(
Line
Count
Source
117
0
    do {                                                                                         \
118
0
        try {                                                                                    \
119
0
            doris::enable_thread_catch_bad_alloc++;                                              \
120
0
            Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }};                     \
121
0
            { stmt; }                                                                            \
  Branch (121:15): [True: 0, False: 0]
122
0
        } catch (const doris::Exception& e) {                                                    \
123
0
            if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {                                \
  Branch (123:17): [True: 0, False: 0]
124
0
                status_ = Status::MemoryLimitExceeded(fmt::format(                               \
125
0
                        "PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{}, __FUNCTION__:{}", \
126
0
                        e.code(), e.to_string(), __FILE__, __LINE__, __PRETTY_FUNCTION__));      \
127
0
            } else {                                                                             \
128
0
                status_ = e.to_status();                                                         \
129
0
            }                                                                                    \
130
0
        }                                                                                        \
131
0
    } while (0);
  Branch (131:14): [Folded - Ignored]
140
                // TODO: use a better enclosure to abstract these
141
0
                if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
142
0
                    TUniqueId query_id = fragment_context->get_query_id();
143
0
                    std::string task_name = task->task_name();
144
145
0
                    std::thread::id tid = std::this_thread::get_id();
146
0
                    uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
147
0
                    uint64_t start_time = MonotonicMicros();
148
149
0
                    status = task->execute(&eos);
150
151
0
                    uint64_t end_time = MonotonicMicros();
152
0
                    ExecEnv::GetInstance()->pipeline_tracer_context()->record(
153
0
                            {query_id, task_name, core_id, thread_id, start_time, end_time});
154
0
                } else { status = task->execute(&eos); },
155
0
                status);
156
157
0
        task->set_previous_core_id(index);
158
159
0
        if (!status.ok()) {
  Branch (159:13): [True: 0, False: 0]
160
            // Print the detailed information below when debugging here.
161
            //
162
            // LOG(WARNING)<< "task:\n"<<task->debug_string();
163
164
            // exec failed, cancel all fragment instances
165
0
            fragment_context->cancel(status);
166
0
            LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
167
0
                                        print_id(fragment_context->get_query_ctx()->query_id()),
168
0
                                        status.to_string());
169
0
            _close_task(task.get(), status, fragment_context.get());
170
0
            continue;
171
0
        }
172
0
        fragment_context->trigger_report_if_necessary();
173
174
0
        if (eos) {
  Branch (174:13): [True: 0, False: 0]
175
            // is_pending_finish will add the task to the dependency's blocking queue, and then the task will be
176
            // added to the running queue when the dependency is ready.
177
0
            if (task->is_pending_finish()) {
  Branch (177:17): [True: 0, False: 0]
178
                // Only meet eos, should set task to PENDING_FINISH state
179
0
                task->set_running(false);
180
0
            } else {
181
0
                Status exec_status = fragment_context->get_query_ctx()->exec_status();
182
0
                _close_task(task.get(), exec_status, fragment_context.get());
183
0
            }
184
0
            continue;
185
0
        }
186
187
0
        task->set_running(false);
188
0
    }
189
0
}
190
191
0
void TaskScheduler::stop() {
192
0
    if (!_shutdown) {
  Branch (192:9): [True: 0, False: 0]
193
0
        if (_task_queue) {
  Branch (193:13): [True: 0, False: 0]
194
0
            _task_queue->close();
195
0
        }
196
0
        if (_fix_thread_pool) {
  Branch (196:13): [True: 0, False: 0]
197
0
            for (size_t i = 0; i < _markers.size(); ++i) {
  Branch (197:32): [True: 0, False: 0]
198
0
                _markers[i] = false;
199
0
            }
200
0
            _fix_thread_pool->shutdown();
201
0
            _fix_thread_pool->wait();
202
0
        }
203
        // Should be set at the end of stop() to ensure that the
204
        // pool is stopped. For example, if 2 threads call stop
205
        // and one thread set _shutdown = true before stopping the pool, then the other thread would
206
        // skip the stop logic and free the task scheduler.
207
0
        _shutdown = true;
208
0
    }
209
0
}
210
211
} // namespace doris::pipeline