Coverage Report

Created: 2026-03-17 07:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner_scheduler.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/scanner_scheduler.h"
19
20
#include <algorithm>
21
#include <cstdint>
22
#include <functional>
23
#include <list>
24
#include <memory>
25
#include <ostream>
26
#include <string>
27
#include <utility>
28
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/config.h"
31
#include "common/exception.h"
32
#include "common/logging.h"
33
#include "common/status.h"
34
#include "core/block/block.h"
35
#include "exec/pipeline/pipeline_task.h"
36
#include "exec/scan/file_scanner.h"
37
#include "exec/scan/olap_scanner.h" // IWYU pragma: keep
38
#include "exec/scan/scan_node.h"
39
#include "exec/scan/scanner.h"
40
#include "exec/scan/scanner_context.h"
41
#include "runtime/exec_env.h"
42
#include "runtime/runtime_state.h"
43
#include "runtime/thread_context.h"
44
#include "runtime/workload_group/workload_group_manager.h"
45
#include "storage/tablet/tablet.h"
46
#include "util/async_io.h" // IWYU pragma: keep
47
#include "util/cpu_info.h"
48
#include "util/defer_op.h"
49
#include "util/thread.h"
50
#include "util/threadpool.h"
51
52
namespace doris {
53
54
Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
55
423k
                                std::shared_ptr<ScanTask> scan_task) {
56
423k
    if (ctx->done()) {
57
0
        return Status::OK();
58
0
    }
59
423k
    auto task_lock = ctx->task_exec_ctx();
60
423k
    if (task_lock == nullptr) {
61
18
        LOG(INFO) << "could not lock task execution context, query " << ctx->debug_string()
62
18
                  << " maybe finished";
63
18
        return Status::OK();
64
18
    }
65
423k
    std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
66
423k
    if (scanner_delegate == nullptr) {
67
0
        return Status::OK();
68
0
    }
69
70
423k
    scanner_delegate->_scanner->start_wait_worker_timer();
71
423k
    TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
72
423k
    auto sumbit_task = [&]() {
73
423k
        auto work_func = [scanner_ref = scan_task, ctx]() {
74
423k
            auto status = [&] {
75
423k
                RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
76
423k
                return Status::OK();
77
423k
            }();
78
79
423k
            if (!status.ok()) {
80
0
                scanner_ref->set_status(status);
81
0
                ctx->push_back_scan_task(scanner_ref);
82
0
                return true;
83
0
            }
84
423k
            return scanner_ref->is_eos();
85
423k
        };
86
423k
        SimplifiedScanTask simple_scan_task = {work_func, ctx, scan_task};
87
423k
        return this->submit_scan_task(simple_scan_task);
88
423k
    };
89
90
423k
    Status submit_status = sumbit_task();
91
423k
    if (!submit_status.ok()) {
92
        // User will see TooManyTasks error. It looks like a more reasonable error.
93
0
        Status scan_task_status = Status::TooManyTasks(
94
0
                "Failed to submit scanner to scanner pool reason:" +
95
0
                std::string(submit_status.msg()) + "|type:" + std::to_string(type));
96
0
        scan_task->set_status(scan_task_status);
97
0
        return scan_task_status;
98
0
    }
99
100
423k
    return Status::OK();
101
423k
}
102
103
void handle_reserve_memory_failure(RuntimeState* state, std::shared_ptr<ScannerContext> ctx,
104
0
                                   const Status& st, size_t reserve_size) {
105
0
    ctx->clear_free_blocks();
106
0
    auto* local_state = ctx->local_state();
107
108
0
    auto debug_msg = fmt::format(
109
0
            "Query: {} , scanner try to reserve: {}, operator name {}, "
110
0
            "operator "
111
0
            "id: {}, "
112
0
            "task id: "
113
0
            "{}, failed: {}",
114
0
            print_id(state->query_id()), PrettyPrinter::print_bytes(reserve_size),
115
0
            local_state->get_name(), local_state->parent()->node_id(), state->task_id(),
116
0
            st.to_string());
117
    // The PROCESS_MEMORY_EXCEEDED error msg already contains process_mem_log_str
118
0
    if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
119
0
        debug_msg += fmt::format(", debug info: {}", GlobalMemoryArbitrator::process_mem_log_str());
120
0
    }
121
0
    VLOG_DEBUG << debug_msg;
122
123
0
    state->get_query_ctx()->set_low_memory_mode();
124
0
}
125
126
void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
127
423k
                                     std::shared_ptr<ScanTask> scan_task) {
128
423k
    auto task_lock = ctx->task_exec_ctx();
129
423k
    if (task_lock == nullptr) {
130
0
        return;
131
0
    }
132
423k
    SCOPED_ATTACH_TASK(ctx->state());
133
134
423k
    ctx->update_peak_running_scanner(1);
135
423k
    Defer defer([&] { ctx->update_peak_running_scanner(-1); });
136
137
423k
    std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
138
423k
    if (scanner_delegate == nullptr) {
139
0
        return;
140
0
    }
141
142
423k
    ScannerSPtr& scanner = scanner_delegate->_scanner;
143
    // for cpu hard limit, thread name should not be reset
144
423k
    if (ctx->_should_reset_thread_name) {
145
0
        Thread::set_self_name("_scanner_scan");
146
0
    }
147
148
423k
#ifndef __APPLE__
149
    // The configuration item is used to lower the priority of the scanner thread,
150
    // typically employed to ensure CPU scheduling for write operations.
151
423k
    if (config::scan_thread_nice_value != 0 && scanner->get_name() != FileScanner::NAME) {
152
0
        Thread::set_thread_nice_value();
153
0
    }
154
423k
#endif
155
156
    // We set and read the counters in the order below, to make sure each counter is updated before get_block and that the time spent in get_block is recorded in the counter.
157
    // 1. update_wait_worker_timer to make sure the time of waiting for worker thread is recorded in the timer
158
    // 2. start_scan_cpu_timer to make sure the cpu timer include the time of open and get_block, which is the real cpu time of scanner
159
    // 3. update_scan_cpu_timer when defer, to make sure the cpu timer include the time of open and get_block, which is the real cpu time of scanner
160
    // 4. start_wait_worker_timer when defer, to make sure the time of waiting for worker thread is recorded in the timer
161
162
423k
    MonotonicStopWatch max_run_time_watch;
163
423k
    max_run_time_watch.start();
164
423k
    scanner->update_wait_worker_timer();
165
423k
    scanner->start_scan_cpu_timer();
166
167
423k
    bool need_update_profile = true;
168
423k
    auto update_scanner_profile = [&]() {
169
418k
        if (need_update_profile) {
170
418k
            scanner->update_scan_cpu_timer();
171
418k
            scanner->update_realtime_counters();
172
418k
            need_update_profile = false;
173
418k
        }
174
418k
    };
175
176
423k
    Status status = Status::OK();
177
423k
    bool eos = false;
178
423k
    Defer defer_scanner([&] {
179
423k
        if (status.ok() && !eos) {
180
            // If status is not ok, the scanner has failed and the counter may not have been updated correctly, so there is no need to update the counter again. If eos is true, the scanner has finished successfully and the counter has been updated correctly, so there is no need to update the counter again either.
181
4.79k
            scanner->start_wait_worker_timer();
182
4.79k
        }
183
423k
    });
184
185
423k
    ASSIGN_STATUS_IF_CATCH_EXCEPTION(
186
423k
            RuntimeState* state = ctx->state(); DCHECK(nullptr != state);
187
            // scanner->open may alloc plenty amount of memory(read blocks of data),
188
            // so better to also check low memory and clear free blocks here.
189
423k
            if (ctx->low_memory_mode()) { ctx->clear_free_blocks(); }
190
191
423k
            if (!scanner->has_prepared()) {
192
423k
                status = scanner->prepare();
193
423k
                if (!status.ok()) {
194
423k
                    eos = true;
195
423k
                }
196
423k
            }
197
198
423k
            if (!eos && !scanner->is_open()) {
199
423k
                status = scanner->open(state);
200
423k
                if (!status.ok()) {
201
423k
                    eos = true;
202
423k
                }
203
423k
                scanner->set_opened();
204
423k
            }
205
206
423k
            Status rf_status = scanner->try_append_late_arrival_runtime_filter();
207
423k
            if (!rf_status.ok()) {
208
423k
                LOG(WARNING) << "Failed to append late arrival runtime filter: "
209
423k
                             << rf_status.to_string();
210
423k
            }
211
212
423k
            size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
213
423k
            if (ctx->low_memory_mode()) {
214
423k
                ctx->clear_free_blocks();
215
423k
                if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
216
423k
                    raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
217
423k
                }
218
423k
            }
219
220
423k
            size_t raw_bytes_read = 0;
221
423k
            bool first_read = true; int64_t limit = scanner->limit();
222
            // True if the first block is full, or if the first block + second block > batch_size.
223
423k
            bool has_first_full_block = false;
224
225
            // During low memory mode, every scan task will return at most 2 blocks to reduce memory usage.
226
423k
            while (!eos && raw_bytes_read < raw_bytes_threshold &&
227
423k
                   (!ctx->low_memory_mode() || !has_first_full_block) &&
228
423k
                   (!has_first_full_block || doris::thread_context()
229
423k
                                                     ->thread_mem_tracker_mgr->limiter_mem_tracker()
230
423k
                                                     ->check_limit(1))) {
231
423k
                if (UNLIKELY(ctx->done())) {
232
423k
                    eos = true;
233
423k
                    break;
234
423k
                }
235
423k
                if (max_run_time_watch.elapsed_time() >
236
423k
                    config::doris_scanner_max_run_time_ms * 1e6) {
237
423k
                    break;
238
423k
                }
239
423k
                DEFER_RELEASE_RESERVED();
240
423k
                BlockUPtr free_block;
241
423k
                if (first_read) {
242
423k
                    free_block = ctx->get_free_block(first_read);
243
423k
                } else {
244
423k
                    if (state->get_query_ctx()
245
423k
                                ->resource_ctx()
246
423k
                                ->task_controller()
247
423k
                                ->is_enable_reserve_memory()) {
248
423k
                        size_t block_avg_bytes = scanner->get_block_avg_bytes();
249
423k
                        auto st = thread_context()->thread_mem_tracker_mgr->try_reserve(
250
423k
                                block_avg_bytes);
251
423k
                        if (!st.ok()) {
252
423k
                            handle_reserve_memory_failure(state, ctx, st, block_avg_bytes);
253
423k
                            break;
254
423k
                        }
255
423k
                    }
256
423k
                    free_block = ctx->get_free_block(first_read);
257
423k
                }
258
423k
                if (free_block == nullptr) {
259
423k
                    break;
260
423k
                }
261
                // We got a new created block or a reused block.
262
423k
                status = scanner->get_block_after_projects(state, free_block.get(), &eos);
263
423k
                first_read = false;
264
423k
                if (!status.ok()) {
265
423k
                    LOG(WARNING) << "Scan thread read Scanner failed: " << status.to_string();
266
423k
                    break;
267
423k
                }
268
                // Check column type only after block is read successfully.
269
                // Or it may cause a crash when the block is not normal.
270
423k
                _make_sure_virtual_col_is_materialized(scanner, free_block.get());
271
                // Projection will truncate useless columns, makes block size change.
272
423k
                auto free_block_bytes = free_block->allocated_bytes();
273
423k
                raw_bytes_read += free_block_bytes;
274
423k
                if (!scan_task->cached_blocks.empty() &&
275
423k
                    scan_task->cached_blocks.back().first->rows() + free_block->rows() <=
276
423k
                            ctx->batch_size()) {
277
423k
                    size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes();
278
423k
                    MutableBlock mutable_block(scan_task->cached_blocks.back().first.get());
279
423k
                    status = mutable_block.merge(*free_block);
280
423k
                    if (!status.ok()) {
281
423k
                        LOG(WARNING) << "Block merge failed: " << status.to_string();
282
423k
                        break;
283
423k
                    }
284
423k
                    scan_task->cached_blocks.back().second = mutable_block.allocated_bytes();
285
423k
                    scan_task->cached_blocks.back().first.get()->set_columns(
286
423k
                            std::move(mutable_block.mutable_columns()));
287
288
                    // Whether or not returning the block succeeds, this free_block is not used by this scan task any more.
289
                    // If block can be reused, its memory usage will be added back.
290
423k
                    ctx->return_free_block(std::move(free_block));
291
423k
                    ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
292
423k
                                         block_size);
293
423k
                } else {
294
423k
                    if (!scan_task->cached_blocks.empty()) {
295
423k
                        has_first_full_block = true;
296
423k
                    }
297
423k
                    ctx->inc_block_usage(free_block->allocated_bytes());
298
423k
                    scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
299
423k
                }
300
301
423k
                if (limit > 0 && limit < ctx->batch_size()) {
302
                    // If this scanner has limit, and less than batch size,
303
                    // return immediately and no need to wait raw_bytes_threshold.
304
                    // This can save time that each scanner may only return a small number of rows,
305
                    // but rows are enough from all scanners.
306
                    // If not break, the query like "select * from tbl where id=1 limit 10"
307
                    // may scan a lot of data when the "id=1"'s filter ratio is high.
308
                    // If limit is larger than batch size, this rule is skipped,
309
                    // to avoid a large user-specified limit causing too many small blocks.
310
423k
                    break;
311
423k
                }
312
313
423k
                if (scan_task->cached_blocks.back().first->rows() > 0) {
314
423k
                    auto block_avg_bytes = (scan_task->cached_blocks.back().first->bytes() +
315
423k
                                            scan_task->cached_blocks.back().first->rows() - 1) /
316
423k
                                           scan_task->cached_blocks.back().first->rows() *
317
423k
                                           ctx->batch_size();
318
423k
                    scanner->update_block_avg_bytes(block_avg_bytes);
319
423k
                }
320
423k
                if (ctx->low_memory_mode()) {
321
423k
                    ctx->clear_free_blocks();
322
423k
                    if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
323
423k
                        raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
324
423k
                    }
325
423k
                }
326
423k
            } // end for while
327
328
423k
            if (UNLIKELY(!status.ok())) {
329
423k
                scan_task->set_status(status);
330
423k
                eos = true;
331
423k
            },
332
423k
            status);
333
334
423k
    if (UNLIKELY(!status.ok())) {
335
70
        scan_task->set_status(status);
336
70
        eos = true;
337
70
    }
338
339
423k
    if (eos) {
340
        // If eos, scanner will call _collect_profile_before_close to update profile,
341
        // so we need update_scanner_profile here
342
418k
        update_scanner_profile();
343
418k
        scanner->mark_to_need_to_close();
344
418k
    }
345
423k
    scan_task->set_eos(eos);
346
347
423k
    VLOG_DEBUG << fmt::format(
348
286
            "Scanner context {} has finished task, cached_block {} current scheduled task is "
349
286
            "{}, eos: {}, status: {}",
350
286
            ctx->ctx_id, scan_task->cached_blocks.size(), ctx->num_scheduled_scanners(), eos,
351
286
            status.to_string());
352
353
423k
    ctx->push_back_scan_task(scan_task);
354
423k
}
355
73.4k
int ScannerScheduler::default_local_scan_thread_num() {
356
73.4k
    return config::doris_scanner_thread_pool_thread_num > 0
357
73.4k
                   ? config::doris_scanner_thread_pool_thread_num
358
73.4k
                   : std::max(48, CpuInfo::num_cores() * 2);
359
73.4k
}
360
68.1k
int ScannerScheduler::default_remote_scan_thread_num() {
361
68.1k
    int num = config::doris_max_remote_scanner_thread_pool_thread_num > 0
362
68.1k
                      ? config::doris_max_remote_scanner_thread_pool_thread_num
363
68.1k
                      : std::max(512, CpuInfo::num_cores() * 10);
364
68.1k
    return std::max(num, default_local_scan_thread_num());
365
68.1k
}
366
367
24
int ScannerScheduler::get_remote_scan_thread_queue_size() {
368
24
    return config::doris_remote_scanner_thread_pool_queue_size;
369
24
}
370
371
5.28k
int ScannerScheduler::default_min_active_scan_threads() {
372
5.28k
    return config::min_active_scan_threads > 0
373
5.28k
                   ? config::min_active_scan_threads
374
5.28k
                   : config::min_active_scan_threads = CpuInfo::num_cores() * 2;
375
5.28k
}
376
377
5.28k
int ScannerScheduler::default_min_active_file_scan_threads() {
378
5.28k
    return config::min_active_file_scan_threads > 0
379
5.28k
                   ? config::min_active_file_scan_threads
380
5.28k
                   : config::min_active_file_scan_threads = CpuInfo::num_cores() * 8;
381
5.28k
}
382
383
void ScannerScheduler::_make_sure_virtual_col_is_materialized(
384
482k
        const std::shared_ptr<Scanner>& scanner, Block* free_block) {
385
482k
#ifndef NDEBUG
386
    // Currently, virtual columns can only be used on OLAP tables.
387
482k
    std::shared_ptr<OlapScanner> olap_scanner = std::dynamic_pointer_cast<OlapScanner>(scanner);
388
482k
    if (olap_scanner == nullptr) {
389
157k
        return;
390
157k
    }
391
392
325k
    if (free_block->rows() == 0) {
393
306k
        return;
394
306k
    }
395
396
18.8k
    size_t idx = 0;
397
37.7k
    for (const auto& entry : *free_block) {
398
        // Virtual columns must be materialized by the end of SegmentIterator's next batch method.
399
37.7k
        const ColumnNothing* column_nothing =
400
37.7k
                check_and_get_column<ColumnNothing>(entry.column.get());
401
37.7k
        if (column_nothing == nullptr) {
402
37.7k
            idx++;
403
37.7k
            continue;
404
37.7k
        }
405
406
18.4E
        std::vector<std::string> vcid_to_idx;
407
408
18.4E
        for (const auto& pair : olap_scanner->_vir_cid_to_idx_in_block) {
409
0
            vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, pair.second));
410
0
        }
411
412
18.4E
        std::string error_msg = fmt::format(
413
18.4E
                "Column in idx {} is nothing, block columns {}, normal_columns "
414
18.4E
                "{}, "
415
18.4E
                "vir_cid_to_idx_in_block_msg {}",
416
18.4E
                idx, free_block->columns(), olap_scanner->_return_columns.size(),
417
18.4E
                fmt::format("_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ",")));
418
18.4E
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, error_msg);
419
37.7k
    }
420
18.8k
#endif
421
18.8k
}
422
423
429k
Result<SharedListenableFuture<Void>> ScannerSplitRunner::process_for(std::chrono::nanoseconds) {
424
429k
    _started = true;
425
429k
    bool is_completed = _scan_func();
426
429k
    if (is_completed) {
427
425k
        _completion_future.set_value(Void {});
428
425k
    }
429
429k
    return SharedListenableFuture<Void>::create_ready(Void {});
430
429k
}
431
432
429k
bool ScannerSplitRunner::is_finished() {
433
429k
    return _completion_future.is_done();
434
429k
}
435
436
424k
Status ScannerSplitRunner::finished_status() {
437
424k
    return _completion_future.get_status();
438
424k
}
439
440
0
bool ScannerSplitRunner::is_started() const {
441
0
    return _started.load();
442
0
}
443
444
4.79k
bool ScannerSplitRunner::is_auto_reschedule() const {
445
4.79k
    return false;
446
4.79k
}
447
448
} // namespace doris