Coverage Report

Created: 2024-11-20 16:51

/root/doris/be/src/pipeline/task_scheduler.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "task_scheduler.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Types_types.h>
22
#include <gen_cpp/types.pb.h>
23
#include <glog/logging.h>
24
#include <sched.h>
25
26
// IWYU pragma: no_include <bits/chrono.h>
27
#include <chrono> // IWYU pragma: keep
28
#include <functional>
29
#include <ostream>
30
#include <string>
31
#include <thread>
32
#include <utility>
33
34
#include "common/logging.h"
35
#include "pipeline/pipeline_task.h"
36
#include "pipeline/task_queue.h"
37
#include "pipeline_fragment_context.h"
38
#include "runtime/exec_env.h"
39
#include "runtime/query_context.h"
40
#include "util/thread.h"
41
#include "util/threadpool.h"
42
#include "util/time.h"
43
#include "util/uid_util.h"
44
#include "vec/runtime/vdatetime_value.h"
45
46
namespace doris::pipeline {
47
#include "common/compile_check_begin.h"
48
0
TaskScheduler::~TaskScheduler() {
49
0
    stop();
50
0
    LOG(INFO) << "Task scheduler " << _name << " shutdown";
51
0
}
52
53
0
Status TaskScheduler::start() {
54
0
    int cores = _task_queue.cores();
55
0
    RETURN_IF_ERROR(ThreadPoolBuilder(_name)
56
0
                            .set_min_threads(cores)
57
0
                            .set_max_threads(cores)
58
0
                            .set_max_queue_size(0)
59
0
                            .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
60
0
                            .build(&_fix_thread_pool));
61
0
    LOG_INFO("TaskScheduler set cores").tag("size", cores);
62
0
    _markers.resize(cores, true);
63
0
    for (int i = 0; i < cores; ++i) {
64
0
        RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); }));
65
0
    }
66
0
    return Status::OK();
67
0
}
68
69
0
Status TaskScheduler::schedule_task(PipelineTask* task) {
70
0
    return _task_queue.push_back(task);
71
0
}
72
73
// after _close_task, task maybe destructed.
74
0
void _close_task(PipelineTask* task, Status exec_status) {
75
    // Has to attach memory tracker here, because the close task will also release some memory.
76
    // Should count the memory to the query or the query's memory will not decrease when part of
77
    // task finished.
78
0
    SCOPED_ATTACH_TASK(task->runtime_state());
79
0
    if (task->is_finalized()) {
80
0
        task->set_running(false);
81
0
        return;
82
0
    }
83
    // close_a_pipeline may delete fragment context and will core in some defer
84
    // code, because the defer code will access fragment context it self.
85
0
    auto lock_for_context = task->fragment_context()->shared_from_this();
86
    // is_pending_finish does not check status, so has to check status in close API.
87
    // For example, in async writer, the writer may failed during dealing with eos_block
88
    // but it does not return error status. Has to check the error status in close API.
89
    // We have already refactor all source and sink api, the close API does not need waiting
90
    // for pending finish now. So that could call close directly.
91
0
    Status status = task->close(exec_status);
92
0
    if (!status.ok()) {
93
0
        task->fragment_context()->cancel(status);
94
0
    }
95
0
    task->finalize();
96
0
    task->set_running(false);
97
0
    task->fragment_context()->close_a_pipeline(task->pipeline_id());
98
0
}
99
100
0
void TaskScheduler::_do_work(int index) {
101
0
    while (_markers[index]) {
102
0
        auto* task = _task_queue.take(index);
103
0
        if (!task) {
104
0
            continue;
105
0
        }
106
0
        if (task->is_running()) {
107
0
            static_cast<void>(_task_queue.push_back(task, index));
108
0
            continue;
109
0
        }
110
0
        task->log_detail_if_need();
111
0
        task->set_running(true);
112
0
        task->set_task_queue(&_task_queue);
113
0
        auto* fragment_ctx = task->fragment_context();
114
0
        bool canceled = fragment_ctx->is_canceled();
115
116
        // If the state is PENDING_FINISH, then the task is come from blocked queue, its is_pending_finish
117
        // has to return false. The task is finished and need to close now.
118
0
        if (canceled) {
119
            // may change from pending FINISH,should called cancel
120
            // also may change form BLOCK, other task called cancel
121
122
            // If pipeline is canceled, it will report after pipeline closed, and will propagate
123
            // errors to downstream through exchange. So, here we needn't send_report.
124
            // fragment_ctx->send_report(true);
125
0
            _close_task(task, fragment_ctx->get_query_ctx()->exec_status());
126
0
            continue;
127
0
        }
128
129
        // task exec
130
0
        bool eos = false;
131
0
        auto status = Status::OK();
132
133
#ifdef __APPLE__
134
        uint32_t core_id = 0;
135
#else
136
0
        uint32_t core_id = sched_getcpu();
137
0
#endif
138
0
        ASSIGN_STATUS_IF_CATCH_EXCEPTION(
139
                //TODO: use a better enclose to abstracting these
140
0
                if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
141
0
                    TUniqueId query_id = task->query_context()->query_id();
142
0
                    std::string task_name = task->task_name();
143
144
0
                    std::thread::id tid = std::this_thread::get_id();
145
0
                    uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
146
0
                    uint64_t start_time = MonotonicMicros();
147
148
0
                    status = task->execute(&eos);
149
150
0
                    uint64_t end_time = MonotonicMicros();
151
0
                    ExecEnv::GetInstance()->pipeline_tracer_context()->record(
152
0
                            {query_id, task_name, core_id, thread_id, start_time, end_time});
153
0
                } else { status = task->execute(&eos); },
154
0
                status);
155
156
0
        task->set_previous_core_id(index);
157
158
0
        if (!status.ok()) {
159
            // Print detail informations below when you debugging here.
160
            //
161
            // LOG(WARNING)<< "task:\n"<<task->debug_string();
162
163
            // exec failed,cancel all fragment instance
164
0
            fragment_ctx->cancel(status);
165
0
            LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
166
0
                                        print_id(task->query_context()->query_id()),
167
0
                                        status.to_string());
168
0
            _close_task(task, status);
169
0
            continue;
170
0
        }
171
0
        fragment_ctx->trigger_report_if_necessary();
172
173
0
        if (eos) {
174
            // is pending finish will add the task to dependency's blocking queue, and then the task will be
175
            // added to running queue when dependency is ready.
176
0
            if (task->is_pending_finish()) {
177
                // Only meet eos, should set task to PENDING_FINISH state
178
0
                task->set_running(false);
179
0
            } else {
180
0
                Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
181
0
                _close_task(task, exec_status);
182
0
            }
183
0
            continue;
184
0
        }
185
186
0
        task->set_running(false);
187
0
    }
188
0
}
189
190
0
void TaskScheduler::stop() {
191
0
    if (!_shutdown) {
192
0
        _task_queue.close();
193
0
        if (_fix_thread_pool) {
194
0
            for (size_t i = 0; i < _markers.size(); ++i) {
195
0
                _markers[i] = false;
196
0
            }
197
0
            _fix_thread_pool->shutdown();
198
0
            _fix_thread_pool->wait();
199
0
        }
200
        // Should set at the ending of the stop to ensure that the
201
        // pool is stopped. For example, if there are 2 threads call stop
202
        // then if one thread set shutdown = false, then another thread will
203
        // not check it and will free task scheduler.
204
0
        _shutdown = true;
205
0
    }
206
0
}
207
208
} // namespace doris::pipeline