Coverage Report

Created: 2026-03-13 03:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner_scheduler.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <algorithm>
#include <atomic>
#include <chrono>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>

#include "common/be_mock_util.h"
#include "common/status.h"
#include "exec/scan/scanner_context.h"
#include "exec/scan/task_executor/listenable_future.h"
#include "exec/scan/task_executor/ticker.h"
#include "exec/scan/task_executor/time_sharing/time_sharing_task_executor.h"
#include "util/threadpool.h"
30
31
namespace doris {
// Forward declarations so this header does not need the full definitions
// of these types.
class Block;
class ExecEnv;
class Scanner;

template <typename T>
class BlockingQueue;
} // namespace doris
40
41
namespace doris {
42
// Forward declarations of the scan-side types referenced by the schedulers below.
class ScanTask;
class ScannerContext;
class ScannerDelegate;
class ScannerScheduler;
47
struct SimplifiedScanTask {
48
    SimplifiedScanTask() = default;
49
    SimplifiedScanTask(std::function<bool()> scan_func,
50
                       std::shared_ptr<ScannerContext> scanner_context,
51
1.01M
                       std::shared_ptr<ScanTask> scan_task) {
52
1.01M
        this->scan_func = scan_func;
53
1.01M
        this->scanner_context = scanner_context;
54
1.01M
        this->scan_task = scan_task;
55
1.01M
    }
56
57
    std::function<bool()> scan_func;
58
    std::shared_ptr<ScannerContext> scanner_context = nullptr;
59
    std::shared_ptr<ScanTask> scan_task = nullptr;
60
};
61
62
class ScannerSplitRunner : public SplitRunner {
63
public:
64
    ScannerSplitRunner(std::string name, std::function<bool()> scan_func)
65
1.01M
            : _name(std::move(name)), _scan_func(scan_func), _started(false) {}
66
67
2.02M
    Status init() override { return Status::OK(); }
68
69
    Result<SharedListenableFuture<Void>> process_for(std::chrono::nanoseconds) override;
70
71
1.01M
    void close(const Status& status) override {}
72
73
0
    std::string get_info() const override { return ""; }
74
75
    bool is_finished() override;
76
77
    Status finished_status() override;
78
79
    bool is_started() const;
80
81
    bool is_auto_reschedule() const override;
82
83
private:
84
    std::string _name;
85
    std::function<bool()> _scan_func;
86
87
    std::atomic<bool> _started;
88
    SharedListenableFuture<Void> _completion_future;
89
};
90
91
// Abstract interface for scan scheduler
92
93
// Responsible for the scheduling and execution of all Scanners of a BE node.
94
// Execution thread pool
95
//     When a ScannerContext is launched, it will submit the running scanners to this scheduler.
96
//     The scheduling thread will submit the running scanner and its ScannerContext
97
//     to the execution thread pool to do the actual scan task.
98
//     Each Scanner will act as a producer, read the next block and put it into
99
//     the corresponding block queue.
100
//     The corresponding ScanNode will act as a consumer to consume blocks from the block queue.
101
//     After the block is consumed, the unfinished scanner will resubmit to this scheduler.
102
103
class ScannerScheduler {
104
public:
105
49
    virtual ~ScannerScheduler() {}
106
107
    Status submit(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> scan_task);
108
109
    static int default_local_scan_thread_num();
110
111
    static int default_remote_scan_thread_num();
112
113
    static int get_remote_scan_thread_queue_size();
114
115
    static int default_min_active_scan_threads();
116
117
    static int default_min_active_file_scan_threads();
118
119
    virtual Status start(int max_thread_num, int min_thread_num, int queue_size,
120
                         int min_active_scan_threads) = 0;
121
    virtual void stop() = 0;
122
    virtual Status submit_scan_task(SimplifiedScanTask scan_task) = 0;
123
    virtual Status submit_scan_task(SimplifiedScanTask scan_task,
124
                                    const std::string& task_id_string) = 0;
125
126
    virtual void reset_thread_num(int new_max_thread_num, int new_min_thread_num,
127
                                  int min_active_scan_threads) = 0;
128
230k
    int get_min_active_scan_threads() const { return _min_active_scan_threads; }
129
130
    virtual int get_queue_size() = 0;
131
    virtual int get_active_threads() = 0;
132
    virtual std::vector<int> thread_debug_info() = 0;
133
134
    virtual Status schedule_scan_task(std::shared_ptr<ScannerContext> scanner_ctx,
135
                                      std::shared_ptr<ScanTask> current_scan_task,
136
                                      std::unique_lock<std::mutex>& transfer_lock) = 0;
137
138
protected:
139
    int _min_active_scan_threads;
140
141
private:
142
    static void _scanner_scan(std::shared_ptr<ScannerContext> ctx,
143
                              std::shared_ptr<ScanTask> scan_task);
144
145
    static void _make_sure_virtual_col_is_materialized(const std::shared_ptr<Scanner>& scanner,
146
                                                       Block* block);
147
};
148
149
// ScannerScheduler backed by a doris::ThreadPool: each submitted scan task's function is
// run once on the pool (its bool result is discarded).
class ThreadPoolSimplifiedScanScheduler MOCK_REMOVE(final) : public ScannerScheduler {
public:
    // @param sched_name      name passed to the thread pool builder.
    // @param cgroup_cpu_ctl  CPU cgroup controller passed to the builder; only a weak
    //                        reference is kept by this scheduler.
    // @param workload_group  workload group name passed to the builder.
    ThreadPoolSimplifiedScanScheduler(std::string sched_name,
                                      std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl,
                                      std::string workload_group = "system")
            : _is_stop(false),
              _cgroup_cpu_ctl(cgroup_cpu_ctl),
              _sched_name(std::move(sched_name)),
              _workload_group(std::move(workload_group)) {}

    ~ThreadPoolSimplifiedScanScheduler() override {
#ifndef BE_TEST
        stop();
#endif
        LOG(INFO) << "Scanner sche " << _sched_name << " shutdown";
    }

    // Rejects further submissions, then shuts the pool down and waits for it.
    // Safe to call when start() never built the pool (e.g. from the destructor
    // after a failed or skipped start()).
    void stop() override {
        _is_stop.store(true);
        if (_scan_thread_pool == nullptr) {
            return;
        }
        _scan_thread_pool->shutdown();
        _scan_thread_pool->wait();
    }

    Status start(int max_thread_num, int min_thread_num, int queue_size,
                 int min_active_scan_threads) override {
        _min_active_scan_threads = min_active_scan_threads;
        RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name, _workload_group)
                                .set_min_threads(min_thread_num)
                                .set_max_threads(max_thread_num)
                                .set_max_queue_size(queue_size)
                                .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                                .build(&_scan_thread_pool));
        return Status::OK();
    }

    // Runs scan_task.scan_func once on the pool. Fails if the scheduler was stopped
    // or the pool was never built.
    Status submit_scan_task(SimplifiedScanTask scan_task) override {
        if (_is_stop) {
            return Status::InternalError<false>("scanner pool {} is shutdown.", _sched_name);
        }
        if (_scan_thread_pool == nullptr) {
            return Status::InternalError<false>("scanner pool {} is not started.", _sched_name);
        }
        return _scan_thread_pool->submit_func(
                [task = std::move(scan_task)] { task.scan_func(); });
    }

    // The task id is not used by this implementation.
    Status submit_scan_task(SimplifiedScanTask scan_task,
                            const std::string& /*task_id_string*/) override {
        return submit_scan_task(std::move(scan_task));
    }

    void reset_thread_num(int new_max_thread_num, int new_min_thread_num,
                          int min_active_scan_threads) override {
        _min_active_scan_threads = min_active_scan_threads;
        if (_scan_thread_pool == nullptr) {
            LOG(WARNING) << "scanner pool " << _sched_name
                         << " is not started, skip resetting thread num";
            return;
        }
        const int cur_max_thread_num = _scan_thread_pool->max_threads();
        const int cur_min_thread_num = _scan_thread_pool->min_threads();
        if (cur_max_thread_num == new_max_thread_num && cur_min_thread_num == new_min_thread_num) {
            return;
        }
        // Growing: raise max before min; shrinking: lower min before max. Presumably
        // this keeps every intermediate (min, max) pair valid for the pool's setters.
        if (new_max_thread_num >= cur_max_thread_num) {
            _warn_if_failed(_scan_thread_pool->set_max_threads(new_max_thread_num), "max");
            _warn_if_failed(_scan_thread_pool->set_min_threads(new_min_thread_num), "min");
        } else {
            _warn_if_failed(_scan_thread_pool->set_min_threads(new_min_thread_num), "min");
            _warn_if_failed(_scan_thread_pool->set_max_threads(new_max_thread_num), "max");
        }
    }

    int get_queue_size() override { return _scan_thread_pool->get_queue_size(); }

    int get_active_threads() override { return _scan_thread_pool->num_active_threads(); }

    std::vector<int> thread_debug_info() override { return _scan_thread_pool->debug_info(); }

    Status schedule_scan_task(std::shared_ptr<ScannerContext> scanner_ctx,
                              std::shared_ptr<ScanTask> current_scan_task,
                              std::unique_lock<std::mutex>& transfer_lock) override;

private:
    // Logs a warning if setting the `which` ("max"/"min") thread bound failed.
    static void _warn_if_failed(const Status& st, const char* which) {
        if (!st.ok()) {
            LOG(WARNING) << "Failed to set " << which << " threads for scan thread pool: "
                         << st.to_string();
        }
    }

    std::unique_ptr<ThreadPool> _scan_thread_pool;
    std::atomic<bool> _is_stop;
    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
    std::string _sched_name;
    std::string _workload_group;
    std::shared_mutex _lock;
};
248
249
class TaskExecutorSimplifiedScanScheduler final : public ScannerScheduler {
250
public:
251
    TaskExecutorSimplifiedScanScheduler(std::string sched_name,
252
                                        std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl,
253
                                        std::string workload_group = "system")
254
70
            : _is_stop(false),
255
70
              _cgroup_cpu_ctl(cgroup_cpu_ctl),
256
70
              _sched_name(sched_name),
257
70
              _workload_group(workload_group) {}
258
259
26
    ~TaskExecutorSimplifiedScanScheduler() override {
260
26
#ifndef BE_TEST
261
26
        stop();
262
26
#endif
263
26
        LOG(INFO) << "Scanner sche " << _sched_name << " shutdown";
264
26
    }
265
266
52
    void stop() override {
267
52
        _is_stop.store(true);
268
52
        _task_executor->stop();
269
52
        _task_executor->wait();
270
52
    }
271
272
    Status start(int max_thread_num, int min_thread_num, int queue_size,
273
70
                 int min_active_scan_threads) override {
274
70
        _min_active_scan_threads = min_active_scan_threads;
275
70
        TimeSharingTaskExecutor::ThreadConfig thread_config;
276
70
        thread_config.thread_name = _sched_name;
277
70
        thread_config.workload_group = _workload_group;
278
70
        thread_config.max_thread_num = max_thread_num;
279
70
        thread_config.min_thread_num = min_thread_num;
280
70
        thread_config.max_queue_size = queue_size;
281
70
        thread_config.cgroup_cpu_ctl = _cgroup_cpu_ctl;
282
70
        _task_executor = TimeSharingTaskExecutor::create_shared(
283
70
                thread_config, max_thread_num * 2, config::task_executor_min_concurrency_per_task,
284
70
                config::task_executor_max_concurrency_per_task > 0
285
70
                        ? config::task_executor_max_concurrency_per_task
286
70
                        : std::numeric_limits<int>::max(),
287
70
                std::make_shared<SystemTicker>(), nullptr, false);
288
70
        RETURN_IF_ERROR(_task_executor->init());
289
70
        RETURN_IF_ERROR(_task_executor->start());
290
70
        return Status::OK();
291
70
    }
292
293
1.01M
    Status submit_scan_task(SimplifiedScanTask scan_task) override {
294
1.01M
        if (!_is_stop) {
295
1.01M
            std::shared_ptr<SplitRunner> split_runner;
296
1.01M
            if (scan_task.scan_task->is_first_schedule) {
297
1.01M
                split_runner = std::make_shared<ScannerSplitRunner>("scanner_split_runner",
298
1.01M
                                                                    scan_task.scan_func);
299
1.01M
                RETURN_IF_ERROR(split_runner->init());
300
1.01M
                auto result = _task_executor->enqueue_splits(
301
1.01M
                        scan_task.scanner_context->task_handle(), false, {split_runner});
302
1.01M
                if (!result.has_value()) {
303
0
                    LOG(WARNING) << "enqueue_splits failed: " << result.error();
304
0
                    return result.error();
305
0
                }
306
1.01M
                scan_task.scan_task->is_first_schedule = false;
307
1.01M
            } else {
308
4.64k
                split_runner = scan_task.scan_task->split_runner.lock();
309
4.64k
                if (split_runner == nullptr) {
310
0
                    return Status::OK();
311
0
                }
312
4.64k
                RETURN_IF_ERROR(_task_executor->re_enqueue_split(
313
4.64k
                        scan_task.scanner_context->task_handle(), false, split_runner));
314
4.64k
            }
315
1.01M
            scan_task.scan_task->split_runner = split_runner;
316
1.01M
            return Status::OK();
317
1.01M
        } else {
318
0
            return Status::InternalError<false>("scanner pool {} is shutdown.", _sched_name);
319
0
        }
320
1.01M
    }
321
322
    // A task has only one split. When the split is created, the task is created according to the task_id,
323
    // and the task is automatically removed when the split ends.
324
    // Now it is only for PInternalService::multiget_data_v2 used by TopN materialization.
325
    Status submit_scan_task(SimplifiedScanTask scan_task,
326
956
                            const std::string& task_id_string) override {
327
956
        if (!_is_stop) {
328
956
            TaskId task_id(task_id_string);
329
956
            std::shared_ptr<TaskHandle> task_handle = DORIS_TRY(_task_executor->create_task(
330
956
                    task_id, []() { return 0.0; },
331
956
                    config::task_executor_initial_max_concurrency_per_task > 0
332
956
                            ? config::task_executor_initial_max_concurrency_per_task
333
956
                            : std::max(48, CpuInfo::num_cores() * 2),
334
956
                    std::chrono::milliseconds(100), std::nullopt));
335
336
956
            auto wrapped_scan_func = [this, task_handle, scan_func = scan_task.scan_func]() {
337
956
                bool result = scan_func();
338
956
                if (result) {
339
956
                    static_cast<void>(_task_executor->remove_task(task_handle));
340
956
                }
341
956
                return result;
342
956
            };
343
344
956
            auto split_runner =
345
956
                    std::make_shared<ScannerSplitRunner>("scanner_split_runner", wrapped_scan_func);
346
956
            RETURN_IF_ERROR(split_runner->init());
347
348
956
            auto result = _task_executor->enqueue_splits(task_handle, false, {split_runner});
349
956
            if (!result.has_value()) {
350
0
                LOG(WARNING) << "enqueue_splits failed: " << result.error();
351
0
                return result.error();
352
0
            }
353
956
            return Status::OK();
354
956
        } else {
355
0
            return Status::InternalError<false>("scanner pool {} is shutdown.", _sched_name);
356
0
        }
357
956
    }
358
359
    void reset_thread_num(int new_max_thread_num, int new_min_thread_num,
360
11.6k
                          int min_active_scan_threads) override {
361
11.6k
        _min_active_scan_threads = min_active_scan_threads;
362
11.6k
        auto task_executor =
363
11.6k
                std::dynamic_pointer_cast<doris::TimeSharingTaskExecutor>(_task_executor);
364
11.6k
        int cur_max_thread_num = task_executor->max_threads();
365
11.6k
        int cur_min_thread_num = task_executor->min_threads();
366
11.6k
        if (cur_max_thread_num == new_max_thread_num && cur_min_thread_num == new_min_thread_num) {
367
11.6k
            return;
368
11.6k
        }
369
3
        if (new_max_thread_num >= cur_max_thread_num) {
370
0
            Status st_max = task_executor->set_max_threads(new_max_thread_num);
371
0
            if (!st_max.ok()) {
372
0
                LOG(WARNING) << "Failed to set max threads for scan thread pool: "
373
0
                             << st_max.to_string();
374
0
            }
375
0
            Status st_min = task_executor->set_min_threads(new_min_thread_num);
376
0
            if (!st_min.ok()) {
377
0
                LOG(WARNING) << "Failed to set min threads for scan thread pool: "
378
0
                             << st_min.to_string();
379
0
            }
380
3
        } else {
381
3
            Status st_min = task_executor->set_min_threads(new_min_thread_num);
382
3
            if (!st_min.ok()) {
383
0
                LOG(WARNING) << "Failed to set min threads for scan thread pool: "
384
0
                             << st_min.to_string();
385
0
            }
386
3
            Status st_max = task_executor->set_max_threads(new_max_thread_num);
387
3
            if (!st_max.ok()) {
388
0
                LOG(WARNING) << "Failed to set max threads for scan thread pool: "
389
0
                             << st_max.to_string();
390
0
            }
391
3
        }
392
3
    }
393
394
1.24M
    int get_queue_size() override {
395
1.24M
        auto task_executor =
396
1.24M
                std::dynamic_pointer_cast<doris::TimeSharingTaskExecutor>(_task_executor);
397
1.24M
        return task_executor->get_queue_size();
398
1.24M
    }
399
400
1.24M
    int get_active_threads() override {
401
1.24M
        auto task_executor =
402
1.24M
                std::dynamic_pointer_cast<doris::TimeSharingTaskExecutor>(_task_executor);
403
1.24M
        return task_executor->num_active_threads();
404
1.24M
    }
405
406
74
    std::vector<int> thread_debug_info() override {
407
74
        auto task_executor =
408
74
                std::dynamic_pointer_cast<doris::TimeSharingTaskExecutor>(_task_executor);
409
74
        return task_executor->debug_info();
410
74
    }
411
412
464k
    std::shared_ptr<TaskExecutor> task_executor() const { return _task_executor; }
413
414
    Status schedule_scan_task(std::shared_ptr<ScannerContext> scanner_ctx,
415
                              std::shared_ptr<ScanTask> current_scan_task,
416
                              std::unique_lock<std::mutex>& transfer_lock) override;
417
418
private:
419
    std::atomic<bool> _is_stop;
420
    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
421
    std::string _sched_name;
422
    std::string _workload_group;
423
    std::shared_mutex _lock;
424
    std::shared_ptr<TaskExecutor> _task_executor = nullptr;
425
};
426
427
} // namespace doris