Coverage Report

Created: 2025-04-27 14:25

/root/doris/be/src/pipeline/task_scheduler.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "task_scheduler.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Types_types.h>
22
#include <gen_cpp/types.pb.h>
23
#include <glog/logging.h>
24
#include <sched.h>
25
26
// IWYU pragma: no_include <bits/chrono.h>
27
#include <chrono> // IWYU pragma: keep
28
#include <functional>
29
#include <ostream>
30
#include <string>
31
#include <thread>
32
#include <utility>
33
34
#include "common/logging.h"
35
#include "pipeline/pipeline_task.h"
36
#include "pipeline/task_queue.h"
37
#include "pipeline_fragment_context.h"
38
#include "runtime/exec_env.h"
39
#include "runtime/query_context.h"
40
#include "util/thread.h"
41
#include "util/threadpool.h"
42
#include "util/time.h"
43
#include "util/uid_util.h"
44
#include "vec/runtime/vdatetime_value.h"
45
46
namespace doris::pipeline {
47
48
0
TaskScheduler::~TaskScheduler() {
49
0
    stop();
50
0
    LOG(INFO) << "Task scheduler " << _name << " shutdown";
51
0
}
52
53
0
Status TaskScheduler::start() {
54
0
    int cores = _task_queue->cores();
55
0
    RETURN_IF_ERROR(ThreadPoolBuilder(_name)
56
0
                            .set_min_threads(cores)
57
0
                            .set_max_threads(cores)
58
0
                            .set_max_queue_size(0)
59
0
                            .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
60
0
                            .build(&_fix_thread_pool));
61
0
    LOG_INFO("TaskScheduler set cores").tag("size", cores);
62
0
    _markers.resize(cores, true);
63
0
    for (size_t i = 0; i < cores; ++i) {
64
0
        RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); }));
65
0
    }
66
0
    return Status::OK();
67
0
}
68
69
0
Status TaskScheduler::schedule_task(PipelineTask* task) {
70
0
    return _task_queue->push_back(task);
71
0
}
72
73
// after _close_task, task maybe destructed.
74
0
void _close_task(PipelineTask* task, Status exec_status) {
75
    // Has to attach memory tracker here, because the close task will also release some memory.
76
    // Should count the memory to the query or the query's memory will not decrease when part of
77
    // task finished.
78
0
    SCOPED_ATTACH_TASK(task->runtime_state());
79
    // close_a_pipeline may delete fragment context and will core in some defer
80
    // code, because the defer code will access fragment context it self.
81
0
    auto lock_for_context = task->fragment_context()->shared_from_this();
82
    // is_pending_finish does not check status, so has to check status in close API.
83
    // For example, in async writer, the writer may failed during dealing with eos_block
84
    // but it does not return error status. Has to check the error status in close API.
85
    // We have already refactor all source and sink api, the close API does not need waiting
86
    // for pending finish now. So that could call close directly.
87
0
    Status status = task->close(exec_status);
88
0
    if (!status.ok()) {
89
0
        task->fragment_context()->cancel(status);
90
0
    }
91
0
    task->finalize();
92
0
    task->set_running(false);
93
0
    task->fragment_context()->close_a_pipeline(task->pipeline_id());
94
0
}
95
96
0
void TaskScheduler::_do_work(size_t index) {
97
0
    while (_markers[index]) {
98
0
        auto* task = _task_queue->take(index);
99
0
        if (!task) {
100
0
            continue;
101
0
        }
102
0
        if (task->is_running()) {
103
0
            static_cast<void>(_task_queue->push_back(task, index));
104
0
            continue;
105
0
        }
106
0
        if (task->is_finalized()) {
107
0
            continue;
108
0
        }
109
0
        task->log_detail_if_need();
110
0
        task->set_running(true);
111
0
        task->set_task_queue(_task_queue.get());
112
0
        auto* fragment_ctx = task->fragment_context();
113
0
        bool canceled = fragment_ctx->is_canceled();
114
115
        // If the state is PENDING_FINISH, then the task is come from blocked queue, its is_pending_finish
116
        // has to return false. The task is finished and need to close now.
117
0
        if (canceled) {
118
            // may change from pending FINISH,should called cancel
119
            // also may change form BLOCK, other task called cancel
120
121
            // If pipeline is canceled, it will report after pipeline closed, and will propagate
122
            // errors to downstream through exchange. So, here we needn't send_report.
123
            // fragment_ctx->send_report(true);
124
0
            _close_task(task, fragment_ctx->get_query_ctx()->exec_status());
125
0
            continue;
126
0
        }
127
128
        // task exec
129
0
        bool eos = false;
130
0
        auto status = Status::OK();
131
132
#ifdef __APPLE__
133
        uint32_t core_id = 0;
134
#else
135
0
        uint32_t core_id = sched_getcpu();
136
0
#endif
137
0
        ASSIGN_STATUS_IF_CATCH_EXCEPTION(
138
                //TODO: use a better enclose to abstracting these
139
0
                if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
140
0
                    TUniqueId query_id = task->query_context()->query_id();
141
0
                    std::string task_name = task->task_name();
142
143
0
                    std::thread::id tid = std::this_thread::get_id();
144
0
                    uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
145
0
                    uint64_t start_time = MonotonicMicros();
146
147
0
                    status = task->execute(&eos);
148
149
0
                    uint64_t end_time = MonotonicMicros();
150
0
                    ExecEnv::GetInstance()->pipeline_tracer_context()->record(
151
0
                            {query_id, task_name, core_id, thread_id, start_time, end_time});
152
0
                } else { status = task->execute(&eos); },
153
0
                status);
154
155
0
        task->set_previous_core_id(index);
156
157
0
        if (!status.ok()) {
158
            // Print detail informations below when you debugging here.
159
            //
160
            // LOG(WARNING)<< "task:\n"<<task->debug_string();
161
162
            // exec failed,cancel all fragment instance
163
0
            fragment_ctx->cancel(status);
164
0
            LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
165
0
                                        print_id(task->query_context()->query_id()),
166
0
                                        status.to_string());
167
0
            _close_task(task, status);
168
0
            continue;
169
0
        }
170
0
        fragment_ctx->trigger_report_if_necessary();
171
172
0
        if (eos) {
173
            // is pending finish will add the task to dependency's blocking queue, and then the task will be
174
            // added to running queue when dependency is ready.
175
0
            if (task->is_pending_finish()) {
176
                // Only meet eos, should set task to PENDING_FINISH state
177
0
                task->set_running(false);
178
0
            } else {
179
0
                Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
180
0
                _close_task(task, exec_status);
181
0
            }
182
0
            continue;
183
0
        }
184
185
0
        task->set_running(false);
186
0
    }
187
0
}
188
189
0
void TaskScheduler::stop() {
190
0
    if (!_shutdown) {
191
0
        if (_task_queue) {
192
0
            _task_queue->close();
193
0
        }
194
0
        if (_fix_thread_pool) {
195
0
            for (size_t i = 0; i < _markers.size(); ++i) {
196
0
                _markers[i] = false;
197
0
            }
198
0
            _fix_thread_pool->shutdown();
199
0
            _fix_thread_pool->wait();
200
0
        }
201
        // Should set at the ending of the stop to ensure that the
202
        // pool is stopped. For example, if there are 2 threads call stop
203
        // then if one thread set shutdown = false, then another thread will
204
        // not check it and will free task scheduler.
205
0
        _shutdown = true;
206
0
    }
207
0
}
208
209
} // namespace doris::pipeline