be/src/exec/scan/scanner_scheduler.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <algorithm>
#include <atomic>
#include <chrono>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>

#include "common/be_mock_util.h"
#include "common/status.h"
#include "exec/scan/scanner_context.h"
#include "exec/scan/task_executor/listenable_future.h"
#include "exec/scan/task_executor/ticker.h"
#include "exec/scan/task_executor/time_sharing/time_sharing_task_executor.h"
#include "util/threadpool.h"
30 | | |
namespace doris {
// Forward declarations: no complete definitions of these types are needed here.
class Block;
class ExecEnv;
class Scanner;

template <typename T>
class BlockingQueue;
} // namespace doris
40 | | |
41 | | namespace doris { |
// Forward declarations of the scan-related types referenced below.
class ScanTask;
class ScannerContext;
class ScannerDelegate;
class ScannerScheduler;
46 | | |
47 | | struct SimplifiedScanTask { |
48 | | SimplifiedScanTask() = default; |
49 | | SimplifiedScanTask(std::function<bool()> scan_func, |
50 | | std::shared_ptr<ScannerContext> scanner_context, |
51 | 1.01M | std::shared_ptr<ScanTask> scan_task) { |
52 | 1.01M | this->scan_func = scan_func; |
53 | 1.01M | this->scanner_context = scanner_context; |
54 | 1.01M | this->scan_task = scan_task; |
55 | 1.01M | } |
56 | | |
57 | | std::function<bool()> scan_func; |
58 | | std::shared_ptr<ScannerContext> scanner_context = nullptr; |
59 | | std::shared_ptr<ScanTask> scan_task = nullptr; |
60 | | }; |
61 | | |
62 | | class ScannerSplitRunner : public SplitRunner { |
63 | | public: |
64 | | ScannerSplitRunner(std::string name, std::function<bool()> scan_func) |
65 | 1.01M | : _name(std::move(name)), _scan_func(scan_func), _started(false) {} |
66 | | |
67 | 2.02M | Status init() override { return Status::OK(); } |
68 | | |
69 | | Result<SharedListenableFuture<Void>> process_for(std::chrono::nanoseconds) override; |
70 | | |
71 | 1.01M | void close(const Status& status) override {} |
72 | | |
73 | 0 | std::string get_info() const override { return ""; } |
74 | | |
75 | | bool is_finished() override; |
76 | | |
77 | | Status finished_status() override; |
78 | | |
79 | | bool is_started() const; |
80 | | |
81 | | bool is_auto_reschedule() const override; |
82 | | |
83 | | private: |
84 | | std::string _name; |
85 | | std::function<bool()> _scan_func; |
86 | | |
87 | | std::atomic<bool> _started; |
88 | | SharedListenableFuture<Void> _completion_future; |
89 | | }; |
90 | | |
91 | | // Abstract interface for scan scheduler |
92 | | |
93 | | // Responsible for the scheduling and execution of all Scanners of a BE node. |
94 | | // Execution thread pool |
95 | | // When a ScannerContext is launched, it will submit the running scanners to this scheduler. |
96 | | // The scheduling thread will submit the running scanner and its ScannerContext |
97 | | // to the execution thread pool to do the actual scan task. |
98 | | // Each Scanner will act as a producer, read the next block and put it into |
99 | | // the corresponding block queue. |
100 | | // The corresponding ScanNode will act as a consumer to consume blocks from the block queue. |
101 | | // After the block is consumed, the unfinished scanner will resubmit to this scheduler. |
102 | | |
103 | | class ScannerScheduler { |
104 | | public: |
105 | 49 | virtual ~ScannerScheduler() {} |
106 | | |
107 | | Status submit(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> scan_task); |
108 | | |
109 | | static int default_local_scan_thread_num(); |
110 | | |
111 | | static int default_remote_scan_thread_num(); |
112 | | |
113 | | static int get_remote_scan_thread_queue_size(); |
114 | | |
115 | | static int default_min_active_scan_threads(); |
116 | | |
117 | | static int default_min_active_file_scan_threads(); |
118 | | |
119 | | virtual Status start(int max_thread_num, int min_thread_num, int queue_size, |
120 | | int min_active_scan_threads) = 0; |
121 | | virtual void stop() = 0; |
122 | | virtual Status submit_scan_task(SimplifiedScanTask scan_task) = 0; |
123 | | virtual Status submit_scan_task(SimplifiedScanTask scan_task, |
124 | | const std::string& task_id_string) = 0; |
125 | | |
126 | | virtual void reset_thread_num(int new_max_thread_num, int new_min_thread_num, |
127 | | int min_active_scan_threads) = 0; |
128 | 230k | int get_min_active_scan_threads() const { return _min_active_scan_threads; } |
129 | | |
130 | | virtual int get_queue_size() = 0; |
131 | | virtual int get_active_threads() = 0; |
132 | | virtual std::vector<int> thread_debug_info() = 0; |
133 | | |
134 | | virtual Status schedule_scan_task(std::shared_ptr<ScannerContext> scanner_ctx, |
135 | | std::shared_ptr<ScanTask> current_scan_task, |
136 | | std::unique_lock<std::mutex>& transfer_lock) = 0; |
137 | | |
138 | | protected: |
139 | | int _min_active_scan_threads; |
140 | | |
141 | | private: |
142 | | static void _scanner_scan(std::shared_ptr<ScannerContext> ctx, |
143 | | std::shared_ptr<ScanTask> scan_task); |
144 | | |
145 | | static void _make_sure_virtual_col_is_materialized(const std::shared_ptr<Scanner>& scanner, |
146 | | Block* block); |
147 | | }; |
148 | | |
149 | | class ThreadPoolSimplifiedScanScheduler MOCK_REMOVE(final) : public ScannerScheduler { |
150 | | public: |
151 | | ThreadPoolSimplifiedScanScheduler(std::string sched_name, |
152 | | std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl, |
153 | | std::string workload_group = "system") |
154 | 23 | : _is_stop(false), |
155 | 23 | _cgroup_cpu_ctl(cgroup_cpu_ctl), |
156 | 23 | _sched_name(sched_name), |
157 | 23 | _workload_group(workload_group) {} |
158 | | |
159 | 23 | ~ThreadPoolSimplifiedScanScheduler() override { |
160 | 23 | #ifndef BE_TEST |
161 | 23 | stop(); |
162 | 23 | #endif |
163 | 23 | LOG(INFO) << "Scanner sche " << _sched_name << " shutdown"; |
164 | 23 | } |
165 | | |
166 | 0 | void stop() override { |
167 | 0 | _is_stop.store(true); |
168 | 0 | _scan_thread_pool->shutdown(); |
169 | 0 | _scan_thread_pool->wait(); |
170 | 0 | } |
171 | | |
172 | | Status start(int max_thread_num, int min_thread_num, int queue_size, |
173 | 0 | int min_active_scan_threads) override { |
174 | 0 | _min_active_scan_threads = min_active_scan_threads; |
175 | 0 | RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name, _workload_group) |
176 | 0 | .set_min_threads(min_thread_num) |
177 | 0 | .set_max_threads(max_thread_num) |
178 | 0 | .set_max_queue_size(queue_size) |
179 | 0 | .set_cgroup_cpu_ctl(_cgroup_cpu_ctl) |
180 | 0 | .build(&_scan_thread_pool)); |
181 | 0 | return Status::OK(); |
182 | 0 | } |
183 | | |
184 | 0 | Status submit_scan_task(SimplifiedScanTask scan_task) override { |
185 | 0 | if (!_is_stop) { |
186 | 0 | return _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); }); |
187 | 0 | } else { |
188 | 0 | return Status::InternalError<false>("scanner pool {} is shutdown.", _sched_name); |
189 | 0 | } |
190 | 0 | } |
191 | | |
192 | | Status submit_scan_task(SimplifiedScanTask scan_task, |
193 | 0 | const std::string& task_id_string) override { |
194 | 0 | return submit_scan_task(scan_task); |
195 | 0 | } |
196 | | |
197 | | void reset_thread_num(int new_max_thread_num, int new_min_thread_num, |
198 | 0 | int min_active_scan_threads) override { |
199 | 0 | _min_active_scan_threads = min_active_scan_threads; |
200 | 0 | int cur_max_thread_num = _scan_thread_pool->max_threads(); |
201 | 0 | int cur_min_thread_num = _scan_thread_pool->min_threads(); |
202 | 0 | if (cur_max_thread_num == new_max_thread_num && cur_min_thread_num == new_min_thread_num) { |
203 | 0 | return; |
204 | 0 | } |
205 | 0 | if (new_max_thread_num >= cur_max_thread_num) { |
206 | 0 | Status st_max = _scan_thread_pool->set_max_threads(new_max_thread_num); |
207 | 0 | if (!st_max.ok()) { |
208 | 0 | LOG(WARNING) << "Failed to set max threads for scan thread pool: " |
209 | 0 | << st_max.to_string(); |
210 | 0 | } |
211 | 0 | Status st_min = _scan_thread_pool->set_min_threads(new_min_thread_num); |
212 | 0 | if (!st_min.ok()) { |
213 | 0 | LOG(WARNING) << "Failed to set min threads for scan thread pool: " |
214 | 0 | << st_min.to_string(); |
215 | 0 | } |
216 | 0 | } else { |
217 | 0 | Status st_min = _scan_thread_pool->set_min_threads(new_min_thread_num); |
218 | 0 | if (!st_min.ok()) { |
219 | 0 | LOG(WARNING) << "Failed to set min threads for scan thread pool: " |
220 | 0 | << st_min.to_string(); |
221 | 0 | } |
222 | 0 | Status st_max = _scan_thread_pool->set_max_threads(new_max_thread_num); |
223 | 0 | if (!st_max.ok()) { |
224 | 0 | LOG(WARNING) << "Failed to set max threads for scan thread pool: " |
225 | 0 | << st_max.to_string(); |
226 | 0 | } |
227 | 0 | } |
228 | 0 | } |
229 | | |
230 | 0 | int get_queue_size() override { return _scan_thread_pool->get_queue_size(); } |
231 | | |
232 | 0 | int get_active_threads() override { return _scan_thread_pool->num_active_threads(); } |
233 | | |
234 | 0 | std::vector<int> thread_debug_info() override { return _scan_thread_pool->debug_info(); } |
235 | | |
236 | | Status schedule_scan_task(std::shared_ptr<ScannerContext> scanner_ctx, |
237 | | std::shared_ptr<ScanTask> current_scan_task, |
238 | | std::unique_lock<std::mutex>& transfer_lock) override; |
239 | | |
240 | | private: |
241 | | std::unique_ptr<ThreadPool> _scan_thread_pool; |
242 | | std::atomic<bool> _is_stop; |
243 | | std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl; |
244 | | std::string _sched_name; |
245 | | std::string _workload_group; |
246 | | std::shared_mutex _lock; |
247 | | }; |
248 | | |
249 | | class TaskExecutorSimplifiedScanScheduler final : public ScannerScheduler { |
250 | | public: |
251 | | TaskExecutorSimplifiedScanScheduler(std::string sched_name, |
252 | | std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl, |
253 | | std::string workload_group = "system") |
254 | 70 | : _is_stop(false), |
255 | 70 | _cgroup_cpu_ctl(cgroup_cpu_ctl), |
256 | 70 | _sched_name(sched_name), |
257 | 70 | _workload_group(workload_group) {} |
258 | | |
259 | 26 | ~TaskExecutorSimplifiedScanScheduler() override { |
260 | 26 | #ifndef BE_TEST |
261 | 26 | stop(); |
262 | 26 | #endif |
263 | 26 | LOG(INFO) << "Scanner sche " << _sched_name << " shutdown"; |
264 | 26 | } |
265 | | |
266 | 52 | void stop() override { |
267 | 52 | _is_stop.store(true); |
268 | 52 | _task_executor->stop(); |
269 | 52 | _task_executor->wait(); |
270 | 52 | } |
271 | | |
272 | | Status start(int max_thread_num, int min_thread_num, int queue_size, |
273 | 70 | int min_active_scan_threads) override { |
274 | 70 | _min_active_scan_threads = min_active_scan_threads; |
275 | 70 | TimeSharingTaskExecutor::ThreadConfig thread_config; |
276 | 70 | thread_config.thread_name = _sched_name; |
277 | 70 | thread_config.workload_group = _workload_group; |
278 | 70 | thread_config.max_thread_num = max_thread_num; |
279 | 70 | thread_config.min_thread_num = min_thread_num; |
280 | 70 | thread_config.max_queue_size = queue_size; |
281 | 70 | thread_config.cgroup_cpu_ctl = _cgroup_cpu_ctl; |
282 | 70 | _task_executor = TimeSharingTaskExecutor::create_shared( |
283 | 70 | thread_config, max_thread_num * 2, config::task_executor_min_concurrency_per_task, |
284 | 70 | config::task_executor_max_concurrency_per_task > 0 |
285 | 70 | ? config::task_executor_max_concurrency_per_task |
286 | 70 | : std::numeric_limits<int>::max(), |
287 | 70 | std::make_shared<SystemTicker>(), nullptr, false); |
288 | 70 | RETURN_IF_ERROR(_task_executor->init()); |
289 | 70 | RETURN_IF_ERROR(_task_executor->start()); |
290 | 70 | return Status::OK(); |
291 | 70 | } |
292 | | |
293 | 1.01M | Status submit_scan_task(SimplifiedScanTask scan_task) override { |
294 | 1.01M | if (!_is_stop) { |
295 | 1.01M | std::shared_ptr<SplitRunner> split_runner; |
296 | 1.01M | if (scan_task.scan_task->is_first_schedule) { |
297 | 1.01M | split_runner = std::make_shared<ScannerSplitRunner>("scanner_split_runner", |
298 | 1.01M | scan_task.scan_func); |
299 | 1.01M | RETURN_IF_ERROR(split_runner->init()); |
300 | 1.01M | auto result = _task_executor->enqueue_splits( |
301 | 1.01M | scan_task.scanner_context->task_handle(), false, {split_runner}); |
302 | 1.01M | if (!result.has_value()) { |
303 | 0 | LOG(WARNING) << "enqueue_splits failed: " << result.error(); |
304 | 0 | return result.error(); |
305 | 0 | } |
306 | 1.01M | scan_task.scan_task->is_first_schedule = false; |
307 | 1.01M | } else { |
308 | 4.64k | split_runner = scan_task.scan_task->split_runner.lock(); |
309 | 4.64k | if (split_runner == nullptr) { |
310 | 0 | return Status::OK(); |
311 | 0 | } |
312 | 4.64k | RETURN_IF_ERROR(_task_executor->re_enqueue_split( |
313 | 4.64k | scan_task.scanner_context->task_handle(), false, split_runner)); |
314 | 4.64k | } |
315 | 1.01M | scan_task.scan_task->split_runner = split_runner; |
316 | 1.01M | return Status::OK(); |
317 | 1.01M | } else { |
318 | 0 | return Status::InternalError<false>("scanner pool {} is shutdown.", _sched_name); |
319 | 0 | } |
320 | 1.01M | } |
321 | | |
322 | | // A task has only one split. When the split is created, the task is created according to the task_id, |
323 | | // and the task is automatically removed when the split ends. |
324 | | // Now it is only for PInternalService::multiget_data_v2 used by TopN materialization. |
325 | | Status submit_scan_task(SimplifiedScanTask scan_task, |
326 | 956 | const std::string& task_id_string) override { |
327 | 956 | if (!_is_stop) { |
328 | 956 | TaskId task_id(task_id_string); |
329 | 956 | std::shared_ptr<TaskHandle> task_handle = DORIS_TRY(_task_executor->create_task( |
330 | 956 | task_id, []() { return 0.0; }, |
331 | 956 | config::task_executor_initial_max_concurrency_per_task > 0 |
332 | 956 | ? config::task_executor_initial_max_concurrency_per_task |
333 | 956 | : std::max(48, CpuInfo::num_cores() * 2), |
334 | 956 | std::chrono::milliseconds(100), std::nullopt)); |
335 | | |
336 | 956 | auto wrapped_scan_func = [this, task_handle, scan_func = scan_task.scan_func]() { |
337 | 956 | bool result = scan_func(); |
338 | 956 | if (result) { |
339 | 956 | static_cast<void>(_task_executor->remove_task(task_handle)); |
340 | 956 | } |
341 | 956 | return result; |
342 | 956 | }; |
343 | | |
344 | 956 | auto split_runner = |
345 | 956 | std::make_shared<ScannerSplitRunner>("scanner_split_runner", wrapped_scan_func); |
346 | 956 | RETURN_IF_ERROR(split_runner->init()); |
347 | | |
348 | 956 | auto result = _task_executor->enqueue_splits(task_handle, false, {split_runner}); |
349 | 956 | if (!result.has_value()) { |
350 | 0 | LOG(WARNING) << "enqueue_splits failed: " << result.error(); |
351 | 0 | return result.error(); |
352 | 0 | } |
353 | 956 | return Status::OK(); |
354 | 956 | } else { |
355 | 0 | return Status::InternalError<false>("scanner pool {} is shutdown.", _sched_name); |
356 | 0 | } |
357 | 956 | } |
358 | | |
359 | | void reset_thread_num(int new_max_thread_num, int new_min_thread_num, |
360 | 11.6k | int min_active_scan_threads) override { |
361 | 11.6k | _min_active_scan_threads = min_active_scan_threads; |
362 | 11.6k | auto task_executor = |
363 | 11.6k | std::dynamic_pointer_cast<doris::TimeSharingTaskExecutor>(_task_executor); |
364 | 11.6k | int cur_max_thread_num = task_executor->max_threads(); |
365 | 11.6k | int cur_min_thread_num = task_executor->min_threads(); |
366 | 11.6k | if (cur_max_thread_num == new_max_thread_num && cur_min_thread_num == new_min_thread_num) { |
367 | 11.6k | return; |
368 | 11.6k | } |
369 | 3 | if (new_max_thread_num >= cur_max_thread_num) { |
370 | 0 | Status st_max = task_executor->set_max_threads(new_max_thread_num); |
371 | 0 | if (!st_max.ok()) { |
372 | 0 | LOG(WARNING) << "Failed to set max threads for scan thread pool: " |
373 | 0 | << st_max.to_string(); |
374 | 0 | } |
375 | 0 | Status st_min = task_executor->set_min_threads(new_min_thread_num); |
376 | 0 | if (!st_min.ok()) { |
377 | 0 | LOG(WARNING) << "Failed to set min threads for scan thread pool: " |
378 | 0 | << st_min.to_string(); |
379 | 0 | } |
380 | 3 | } else { |
381 | 3 | Status st_min = task_executor->set_min_threads(new_min_thread_num); |
382 | 3 | if (!st_min.ok()) { |
383 | 0 | LOG(WARNING) << "Failed to set min threads for scan thread pool: " |
384 | 0 | << st_min.to_string(); |
385 | 0 | } |
386 | 3 | Status st_max = task_executor->set_max_threads(new_max_thread_num); |
387 | 3 | if (!st_max.ok()) { |
388 | 0 | LOG(WARNING) << "Failed to set max threads for scan thread pool: " |
389 | 0 | << st_max.to_string(); |
390 | 0 | } |
391 | 3 | } |
392 | 3 | } |
393 | | |
394 | 1.24M | int get_queue_size() override { |
395 | 1.24M | auto task_executor = |
396 | 1.24M | std::dynamic_pointer_cast<doris::TimeSharingTaskExecutor>(_task_executor); |
397 | 1.24M | return task_executor->get_queue_size(); |
398 | 1.24M | } |
399 | | |
400 | 1.24M | int get_active_threads() override { |
401 | 1.24M | auto task_executor = |
402 | 1.24M | std::dynamic_pointer_cast<doris::TimeSharingTaskExecutor>(_task_executor); |
403 | 1.24M | return task_executor->num_active_threads(); |
404 | 1.24M | } |
405 | | |
406 | 74 | std::vector<int> thread_debug_info() override { |
407 | 74 | auto task_executor = |
408 | 74 | std::dynamic_pointer_cast<doris::TimeSharingTaskExecutor>(_task_executor); |
409 | 74 | return task_executor->debug_info(); |
410 | 74 | } |
411 | | |
412 | 464k | std::shared_ptr<TaskExecutor> task_executor() const { return _task_executor; } |
413 | | |
414 | | Status schedule_scan_task(std::shared_ptr<ScannerContext> scanner_ctx, |
415 | | std::shared_ptr<ScanTask> current_scan_task, |
416 | | std::unique_lock<std::mutex>& transfer_lock) override; |
417 | | |
418 | | private: |
419 | | std::atomic<bool> _is_stop; |
420 | | std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl; |
421 | | std::string _sched_name; |
422 | | std::string _workload_group; |
423 | | std::shared_mutex _lock; |
424 | | std::shared_ptr<TaskExecutor> _task_executor = nullptr; |
425 | | }; |
426 | | |
427 | | } // namespace doris |