Coverage Report

Created: 2026-03-12 17:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner_scheduler.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <algorithm>
#include <atomic>
#include <chrono>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>

#include "common/be_mock_util.h"
#include "common/status.h"
#include "exec/scan/scanner_context.h"
#include "exec/scan/task_executor/listenable_future.h"
#include "exec/scan/task_executor/ticker.h"
#include "exec/scan/task_executor/time_sharing/time_sharing_task_executor.h"
#include "util/threadpool.h"
30
31
namespace doris {
32
class ExecEnv;
33
34
class Scanner;
35
class Block;
36
37
template <typename T>
38
class BlockingQueue;
39
} // namespace doris
40
41
namespace doris {
42
class ScannerDelegate;
43
class ScanTask;
44
class ScannerContext;
45
class ScannerScheduler;
46
47
struct SimplifiedScanTask {
48
    SimplifiedScanTask() = default;
49
    SimplifiedScanTask(std::function<bool()> scan_func,
50
                       std::shared_ptr<ScannerContext> scanner_context,
51
418k
                       std::shared_ptr<ScanTask> scan_task) {
52
418k
        this->scan_func = scan_func;
53
418k
        this->scanner_context = scanner_context;
54
418k
        this->scan_task = scan_task;
55
418k
    }
56
57
    std::function<bool()> scan_func;
58
    std::shared_ptr<ScannerContext> scanner_context = nullptr;
59
    std::shared_ptr<ScanTask> scan_task = nullptr;
60
};
61
62
class ScannerSplitRunner : public SplitRunner {
63
public:
64
    ScannerSplitRunner(std::string name, std::function<bool()> scan_func)
65
412k
            : _name(std::move(name)), _scan_func(scan_func), _started(false) {}
66
67
825k
    Status init() override { return Status::OK(); }
68
69
    Result<SharedListenableFuture<Void>> process_for(std::chrono::nanoseconds) override;
70
71
412k
    void close(const Status& status) override {}
72
73
0
    std::string get_info() const override { return ""; }
74
75
    bool is_finished() override;
76
77
    Status finished_status() override;
78
79
    bool is_started() const;
80
81
    bool is_auto_reschedule() const override;
82
83
private:
84
    std::string _name;
85
    std::function<bool()> _scan_func;
86
87
    std::atomic<bool> _started;
88
    SharedListenableFuture<Void> _completion_future;
89
};
90
91
// Abstract interface for scan scheduler
92
93
// Responsible for the scheduling and execution of all Scanners of a BE node.
94
// Execution thread pool
95
//     When a ScannerContext is launched, it will submit the running scanners to this scheduler.
96
//     The scheduling thread will submit the running scanner and its ScannerContext
97
//     to the execution thread pool to do the actual scan task.
98
//     Each Scanner will act as a producer, read the next block and put it into
99
//     the corresponding block queue.
100
//     The corresponding ScanNode will act as a consumer to consume blocks from the block queue.
101
//     After the block is consumed, the unfinished scanner will resubmit to this scheduler.
102
103
class ScannerScheduler {
104
public:
105
47
    virtual ~ScannerScheduler() {}
106
107
    Status submit(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> scan_task);
108
109
    static int default_local_scan_thread_num();
110
111
    static int default_remote_scan_thread_num();
112
113
    static int get_remote_scan_thread_queue_size();
114
115
    static int default_min_active_scan_threads();
116
117
    static int default_min_active_file_scan_threads();
118
119
    virtual Status start(int max_thread_num, int min_thread_num, int queue_size,
120
                         int min_active_scan_threads) = 0;
121
    virtual void stop() = 0;
122
    virtual Status submit_scan_task(SimplifiedScanTask scan_task) = 0;
123
    virtual Status submit_scan_task(SimplifiedScanTask scan_task,
124
                                    const std::string& task_id_string) = 0;
125
126
    virtual void reset_thread_num(int new_max_thread_num, int new_min_thread_num,
127
                                  int min_active_scan_threads) = 0;
128
81.9k
    int get_min_active_scan_threads() const { return _min_active_scan_threads; }
129
130
    virtual int get_queue_size() = 0;
131
    virtual int get_active_threads() = 0;
132
    virtual std::vector<int> thread_debug_info() = 0;
133
134
    virtual Status schedule_scan_task(std::shared_ptr<ScannerContext> scanner_ctx,
135
                                      std::shared_ptr<ScanTask> current_scan_task,
136
                                      std::unique_lock<std::mutex>& transfer_lock) = 0;
137
138
protected:
139
    int _min_active_scan_threads;
140
141
private:
142
    static void _scanner_scan(std::shared_ptr<ScannerContext> ctx,
143
                              std::shared_ptr<ScanTask> scan_task);
144
145
    static void _make_sure_virtual_col_is_materialized(const std::shared_ptr<Scanner>& scanner,
146
                                                       Block* block);
147
};
148
149
/// ScannerScheduler that runs each submitted scan task once on a doris::ThreadPool.
///
/// The pool is built by start(). stop() is safe to call at any time, including from
/// the destructor of a scheduler whose pool was never built. The other pool-backed
/// methods assume start() has succeeded.
class ThreadPoolSimplifiedScanScheduler MOCK_REMOVE(final) : public ScannerScheduler {
public:
    ThreadPoolSimplifiedScanScheduler(std::string sched_name,
                                      std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl,
                                      std::string workload_group = "system")
            : _is_stop(false),
              // _cgroup_cpu_ctl is a weak_ptr, which is only constructible from a
              // const shared_ptr&, so there is nothing to gain from moving here.
              _cgroup_cpu_ctl(cgroup_cpu_ctl),
              _sched_name(std::move(sched_name)),
              _workload_group(std::move(workload_group)) {}

    ~ThreadPoolSimplifiedScanScheduler() override {
#ifndef BE_TEST
        stop();
#endif
        LOG(INFO) << "Scanner sche " << _sched_name << " shutdown";
    }

    /// Makes submit_scan_task() reject new work, then calls shutdown() and wait() on
    /// the pool. If start() was never called or build() failed, there is no pool, and
    /// the pool calls are skipped instead of dereferencing a null pointer.
    void stop() override {
        _is_stop.store(true);
        if (_scan_thread_pool == nullptr) {
            return;
        }
        _scan_thread_pool->shutdown();
        _scan_thread_pool->wait();
    }

    /// Builds the thread pool.
    /// @param max_thread_num / min_thread_num  pool thread-count bounds.
    /// @param queue_size                       maximum pool queue size.
    /// @param min_active_scan_threads          value later returned by
    ///                                         get_min_active_scan_threads().
    Status start(int max_thread_num, int min_thread_num, int queue_size,
                 int min_active_scan_threads) override {
        _min_active_scan_threads = min_active_scan_threads;
        RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name, _workload_group)
                                .set_min_threads(min_thread_num)
                                .set_max_threads(max_thread_num)
                                .set_max_queue_size(queue_size)
                                .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                                .build(&_scan_thread_pool));
        return Status::OK();
    }

    /// Submits `scan_task.scan_func` to the pool; its bool result is discarded.
    /// Returns InternalError once stop() has been called.
    Status submit_scan_task(SimplifiedScanTask scan_task) override {
        if (_is_stop) {
            return Status::InternalError<false>("scanner pool {} is shutdown.", _sched_name);
        }
        // scan_task is already our own copy, so move it into the closure instead of
        // copying its std::function and shared_ptrs a second time.
        return _scan_thread_pool->submit_func(
                [task = std::move(scan_task)] { task.scan_func(); });
    }

    /// This scheduler does not use task ids; forwards to the single-argument overload.
    Status submit_scan_task(SimplifiedScanTask scan_task,
                            const std::string& /*task_id_string*/) override {
        return submit_scan_task(std::move(scan_task));
    }

    /// Updates the pool's thread bounds. Does nothing to the pool when both bounds are
    /// unchanged. Failures are logged and otherwise ignored.
    void reset_thread_num(int new_max_thread_num, int new_min_thread_num,
                          int min_active_scan_threads) override {
        _min_active_scan_threads = min_active_scan_threads;
        const int cur_max_thread_num = _scan_thread_pool->max_threads();
        const int cur_min_thread_num = _scan_thread_pool->min_threads();
        if (cur_max_thread_num == new_max_thread_num && cur_min_thread_num == new_min_thread_num) {
            return;
        }

        auto apply_max = [&] {
            Status st_max = _scan_thread_pool->set_max_threads(new_max_thread_num);
            if (!st_max.ok()) {
                LOG(WARNING) << "Failed to set max threads for scan thread pool: "
                             << st_max.to_string();
            }
        };
        auto apply_min = [&] {
            Status st_min = _scan_thread_pool->set_min_threads(new_min_thread_num);
            if (!st_min.ok()) {
                LOG(WARNING) << "Failed to set min threads for scan thread pool: "
                             << st_min.to_string();
            }
        };

        // Growing: raise max before min. Shrinking: lower min before max. For inputs with
        // new_min <= new_max, this keeps min <= max after each individual call.
        if (new_max_thread_num >= cur_max_thread_num) {
            apply_max();
            apply_min();
        } else {
            apply_min();
            apply_max();
        }
    }

    int get_queue_size() override { return _scan_thread_pool->get_queue_size(); }

    int get_active_threads() override { return _scan_thread_pool->num_active_threads(); }

    std::vector<int> thread_debug_info() override { return _scan_thread_pool->debug_info(); }

    Status schedule_scan_task(std::shared_ptr<ScannerContext> scanner_ctx,
                              std::shared_ptr<ScanTask> current_scan_task,
                              std::unique_lock<std::mutex>& transfer_lock) override;

private:
    std::unique_ptr<ThreadPool> _scan_thread_pool;
    std::atomic<bool> _is_stop;
    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
    std::string _sched_name;
    std::string _workload_group;
    std::shared_mutex _lock;
};
248
249
class TaskExecutorSimplifiedScanScheduler final : public ScannerScheduler {
250
public:
251
    TaskExecutorSimplifiedScanScheduler(std::string sched_name,
252
                                        std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl,
253
                                        std::string workload_group = "system")
254
48
            : _is_stop(false),
255
48
              _cgroup_cpu_ctl(cgroup_cpu_ctl),
256
48
              _sched_name(sched_name),
257
48
              _workload_group(workload_group) {}
258
259
24
    ~TaskExecutorSimplifiedScanScheduler() override {
260
24
#ifndef BE_TEST
261
24
        stop();
262
24
#endif
263
24
        LOG(INFO) << "Scanner sche " << _sched_name << " shutdown";
264
24
    }
265
266
48
    void stop() override {
267
48
        _is_stop.store(true);
268
48
        _task_executor->stop();
269
48
        _task_executor->wait();
270
48
    }
271
272
    Status start(int max_thread_num, int min_thread_num, int queue_size,
273
48
                 int min_active_scan_threads) override {
274
48
        _min_active_scan_threads = min_active_scan_threads;
275
48
        TimeSharingTaskExecutor::ThreadConfig thread_config;
276
48
        thread_config.thread_name = _sched_name;
277
48
        thread_config.workload_group = _workload_group;
278
48
        thread_config.max_thread_num = max_thread_num;
279
48
        thread_config.min_thread_num = min_thread_num;
280
48
        thread_config.max_queue_size = queue_size;
281
48
        thread_config.cgroup_cpu_ctl = _cgroup_cpu_ctl;
282
48
        _task_executor = TimeSharingTaskExecutor::create_shared(
283
48
                thread_config, max_thread_num * 2, config::task_executor_min_concurrency_per_task,
284
48
                config::task_executor_max_concurrency_per_task > 0
285
48
                        ? config::task_executor_max_concurrency_per_task
286
48
                        : std::numeric_limits<int>::max(),
287
48
                std::make_shared<SystemTicker>(), nullptr, false);
288
48
        RETURN_IF_ERROR(_task_executor->init());
289
48
        RETURN_IF_ERROR(_task_executor->start());
290
48
        return Status::OK();
291
48
    }
292
293
412k
    Status submit_scan_task(SimplifiedScanTask scan_task) override {
294
412k
        if (!_is_stop) {
295
412k
            std::shared_ptr<SplitRunner> split_runner;
296
412k
            if (scan_task.scan_task->is_first_schedule) {
297
406k
                split_runner = std::make_shared<ScannerSplitRunner>("scanner_split_runner",
298
406k
                                                                    scan_task.scan_func);
299
406k
                RETURN_IF_ERROR(split_runner->init());
300
406k
                auto result = _task_executor->enqueue_splits(
301
406k
                        scan_task.scanner_context->task_handle(), false, {split_runner});
302
406k
                if (!result.has_value()) {
303
0
                    LOG(WARNING) << "enqueue_splits failed: " << result.error();
304
0
                    return result.error();
305
0
                }
306
406k
                scan_task.scan_task->is_first_schedule = false;
307
406k
            } else {
308
6.29k
                split_runner = scan_task.scan_task->split_runner.lock();
309
6.29k
                if (split_runner == nullptr) {
310
0
                    return Status::OK();
311
0
                }
312
6.29k
                RETURN_IF_ERROR(_task_executor->re_enqueue_split(
313
6.29k
                        scan_task.scanner_context->task_handle(), false, split_runner));
314
6.29k
            }
315
412k
            scan_task.scan_task->split_runner = split_runner;
316
412k
            return Status::OK();
317
412k
        } else {
318
0
            return Status::InternalError<false>("scanner pool {} is shutdown.", _sched_name);
319
0
        }
320
412k
    }
321
322
    // A task has only one split. When the split is created, the task is created according to the task_id,
323
    // and the task is automatically removed when the split ends.
324
    // Now it is only for PInternalService::multiget_data_v2 used by TopN materialization.
325
    Status submit_scan_task(SimplifiedScanTask scan_task,
326
6.29k
                            const std::string& task_id_string) override {
327
6.29k
        if (!_is_stop) {
328
6.29k
            TaskId task_id(task_id_string);
329
6.29k
            std::shared_ptr<TaskHandle> task_handle = DORIS_TRY(_task_executor->create_task(
330
6.29k
                    task_id, []() { return 0.0; },
331
6.29k
                    config::task_executor_initial_max_concurrency_per_task > 0
332
6.29k
                            ? config::task_executor_initial_max_concurrency_per_task
333
6.29k
                            : std::max(48, CpuInfo::num_cores() * 2),
334
6.29k
                    std::chrono::milliseconds(100), std::nullopt));
335
336
6.29k
            auto wrapped_scan_func = [this, task_handle, scan_func = scan_task.scan_func]() {
337
6.29k
                bool result = scan_func();
338
6.29k
                if (result) {
339
6.29k
                    static_cast<void>(_task_executor->remove_task(task_handle));
340
6.29k
                }
341
6.29k
                return result;
342
6.29k
            };
343
344
6.29k
            auto split_runner =
345
6.29k
                    std::make_shared<ScannerSplitRunner>("scanner_split_runner", wrapped_scan_func);
346
6.29k
            RETURN_IF_ERROR(split_runner->init());
347
348
6.29k
            auto result = _task_executor->enqueue_splits(task_handle, false, {split_runner});
349
6.29k
            if (!result.has_value()) {
350
0
                LOG(WARNING) << "enqueue_splits failed: " << result.error();
351
0
                return result.error();
352
0
            }
353
6.29k
            return Status::OK();
354
6.29k
        } else {
355
0
            return Status::InternalError<false>("scanner pool {} is shutdown.", _sched_name);
356
0
        }
357
6.29k
    }
358
359
    void reset_thread_num(int new_max_thread_num, int new_min_thread_num,
360
9.96k
                          int min_active_scan_threads) override {
361
9.96k
        _min_active_scan_threads = min_active_scan_threads;
362
9.96k
        auto task_executor =
363
9.96k
                std::dynamic_pointer_cast<doris::TimeSharingTaskExecutor>(_task_executor);
364
9.96k
        int cur_max_thread_num = task_executor->max_threads();
365
9.96k
        int cur_min_thread_num = task_executor->min_threads();
366
9.96k
        if (cur_max_thread_num == new_max_thread_num && cur_min_thread_num == new_min_thread_num) {
367
9.96k
            return;
368
9.96k
        }
369
2
        if (new_max_thread_num >= cur_max_thread_num) {
370
0
            Status st_max = task_executor->set_max_threads(new_max_thread_num);
371
0
            if (!st_max.ok()) {
372
0
                LOG(WARNING) << "Failed to set max threads for scan thread pool: "
373
0
                             << st_max.to_string();
374
0
            }
375
0
            Status st_min = task_executor->set_min_threads(new_min_thread_num);
376
0
            if (!st_min.ok()) {
377
0
                LOG(WARNING) << "Failed to set min threads for scan thread pool: "
378
0
                             << st_min.to_string();
379
0
            }
380
2
        } else {
381
2
            Status st_min = task_executor->set_min_threads(new_min_thread_num);
382
2
            if (!st_min.ok()) {
383
0
                LOG(WARNING) << "Failed to set min threads for scan thread pool: "
384
0
                             << st_min.to_string();
385
0
            }
386
2
            Status st_max = task_executor->set_max_threads(new_max_thread_num);
387
2
            if (!st_max.ok()) {
388
0
                LOG(WARNING) << "Failed to set max threads for scan thread pool: "
389
0
                             << st_max.to_string();
390
0
            }
391
2
        }
392
2
    }
393
394
494k
    int get_queue_size() override {
395
494k
        auto task_executor =
396
494k
                std::dynamic_pointer_cast<doris::TimeSharingTaskExecutor>(_task_executor);
397
494k
        return task_executor->get_queue_size();
398
494k
    }
399
400
494k
    int get_active_threads() override {
401
494k
        auto task_executor =
402
494k
                std::dynamic_pointer_cast<doris::TimeSharingTaskExecutor>(_task_executor);
403
494k
        return task_executor->num_active_threads();
404
494k
    }
405
406
104
    std::vector<int> thread_debug_info() override {
407
104
        auto task_executor =
408
104
                std::dynamic_pointer_cast<doris::TimeSharingTaskExecutor>(_task_executor);
409
104
        return task_executor->debug_info();
410
104
    }
411
412
164k
    std::shared_ptr<TaskExecutor> task_executor() const { return _task_executor; }
413
414
    Status schedule_scan_task(std::shared_ptr<ScannerContext> scanner_ctx,
415
                              std::shared_ptr<ScanTask> current_scan_task,
416
                              std::unique_lock<std::mutex>& transfer_lock) override;
417
418
private:
419
    std::atomic<bool> _is_stop;
420
    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
421
    std::string _sched_name;
422
    std::string _workload_group;
423
    std::shared_mutex _lock;
424
    std::shared_ptr<TaskExecutor> _task_executor = nullptr;
425
};
426
427
} // namespace doris