Coverage Report

Created: 2026-03-31 15:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner_scheduler.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/scanner_scheduler.h"
19
20
#include <algorithm>
21
#include <cstdint>
22
#include <functional>
23
#include <list>
24
#include <memory>
25
#include <ostream>
26
#include <string>
27
#include <utility>
28
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/config.h"
31
#include "common/exception.h"
32
#include "common/logging.h"
33
#include "common/status.h"
34
#include "core/block/block.h"
35
#include "exec/pipeline/pipeline_task.h"
36
#include "exec/scan/file_scanner.h"
37
#include "exec/scan/olap_scanner.h" // IWYU pragma: keep
38
#include "exec/scan/scan_node.h"
39
#include "exec/scan/scanner.h"
40
#include "exec/scan/scanner_context.h"
41
#include "runtime/exec_env.h"
42
#include "runtime/runtime_state.h"
43
#include "runtime/thread_context.h"
44
#include "runtime/workload_group/workload_group_manager.h"
45
#include "storage/tablet/tablet.h"
46
#include "util/async_io.h" // IWYU pragma: keep
47
#include "util/cpu_info.h"
48
#include "util/defer_op.h"
49
#include "util/thread.h"
50
#include "util/threadpool.h"
51
52
namespace doris {
53
54
Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
55
1.26M
                                std::shared_ptr<ScanTask> scan_task) {
56
1.26M
    if (ctx->done()) {
57
0
        return Status::OK();
58
0
    }
59
1.26M
    auto task_lock = ctx->task_exec_ctx();
60
1.26M
    if (task_lock == nullptr) {
61
18
        LOG(INFO) << "could not lock task execution context, query " << ctx->debug_string()
62
18
                  << " maybe finished";
63
18
        return Status::OK();
64
18
    }
65
1.26M
    std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
66
1.26M
    if (scanner_delegate == nullptr) {
67
0
        return Status::OK();
68
0
    }
69
70
1.26M
    scanner_delegate->_scanner->start_wait_worker_timer();
71
1.26M
    TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
72
1.26M
    auto sumbit_task = [&]() {
73
1.26M
        auto work_func = [scanner_ref = scan_task, ctx]() {
74
1.26M
            auto status = [&] {
75
1.26M
                RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
76
1.26M
                return Status::OK();
77
1.26M
            }();
78
79
1.26M
            if (!status.ok()) {
80
0
                scanner_ref->set_status(status);
81
0
                ctx->push_back_scan_task(scanner_ref);
82
0
                return true;
83
0
            }
84
1.26M
            return scanner_ref->is_eos();
85
1.26M
        };
86
1.26M
        SimplifiedScanTask simple_scan_task = {work_func, ctx, scan_task};
87
1.26M
        return this->submit_scan_task(simple_scan_task);
88
1.26M
    };
89
90
1.26M
    Status submit_status = sumbit_task();
91
1.26M
    if (!submit_status.ok()) {
92
        // User will see TooManyTasks error. It looks like a more reasonable error.
93
0
        Status scan_task_status = Status::TooManyTasks(
94
0
                "Failed to submit scanner to scanner pool reason:" +
95
0
                std::string(submit_status.msg()) + "|type:" + std::to_string(type));
96
0
        scan_task->set_status(scan_task_status);
97
0
        return scan_task_status;
98
0
    }
99
100
1.26M
    return Status::OK();
101
1.26M
}
102
103
void handle_reserve_memory_failure(RuntimeState* state, std::shared_ptr<ScannerContext> ctx,
104
0
                                   const Status& st, size_t reserve_size) {
105
0
    ctx->clear_free_blocks();
106
0
    auto* local_state = ctx->local_state();
107
108
0
    auto debug_msg = fmt::format(
109
0
            "Query: {} , scanner try to reserve: {}, operator name {}, "
110
0
            "operator "
111
0
            "id: {}, "
112
0
            "task id: "
113
0
            "{}, failed: {}",
114
0
            print_id(state->query_id()), PrettyPrinter::print_bytes(reserve_size),
115
0
            local_state->get_name(), local_state->parent()->node_id(), state->task_id(),
116
0
            st.to_string());
117
    // PROCESS_MEMORY_EXCEEDED error msg alread contains process_mem_log_str
118
0
    if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
119
0
        debug_msg += fmt::format(", debug info: {}", GlobalMemoryArbitrator::process_mem_log_str());
120
0
    }
121
0
    VLOG_DEBUG << debug_msg;
122
123
0
    state->get_query_ctx()->set_low_memory_mode();
124
0
}
125
126
void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
127
1.26M
                                     std::shared_ptr<ScanTask> scan_task) {
128
1.26M
    auto task_lock = ctx->task_exec_ctx();
129
1.26M
    if (task_lock == nullptr) {
130
0
        return;
131
0
    }
132
1.26M
    SCOPED_ATTACH_TASK(ctx->state());
133
134
1.26M
    ctx->update_peak_running_scanner(1);
135
1.26M
    Defer defer([&] { ctx->update_peak_running_scanner(-1); });
136
137
1.26M
    std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
138
1.26M
    if (scanner_delegate == nullptr) {
139
0
        return;
140
0
    }
141
142
1.26M
    ScannerSPtr& scanner = scanner_delegate->_scanner;
143
    // for cpu hard limit, thread name should not be reset
144
1.26M
    if (ctx->_should_reset_thread_name) {
145
0
        Thread::set_self_name("_scanner_scan");
146
0
    }
147
148
1.26M
#ifndef __APPLE__
149
    // The configuration item is used to lower the priority of the scanner thread,
150
    // typically employed to ensure CPU scheduling for write operations.
151
1.26M
    if (config::scan_thread_nice_value != 0 && scanner->get_name() != FileScanner::NAME) {
152
0
        Thread::set_thread_nice_value();
153
0
    }
154
1.26M
#endif
155
156
    // we set and get counter according below order, to make sure the counter is updated before get_block, and the time of get_block is recorded in the counter.
157
    // 1. update_wait_worker_timer to make sure the time of waiting for worker thread is recorded in the timer
158
    // 2. start_scan_cpu_timer to make sure the cpu timer include the time of open and get_block, which is the real cpu time of scanner
159
    // 3. update_scan_cpu_timer when defer, to make sure the cpu timer include the time of open and get_block, which is the real cpu time of scanner
160
    // 4. start_wait_worker_timer when defer, to make sure the time of waiting for worker thread is recorded in the timer
161
162
1.26M
    MonotonicStopWatch max_run_time_watch;
163
1.26M
    max_run_time_watch.start();
164
1.26M
    scanner->update_wait_worker_timer();
165
1.26M
    scanner->start_scan_cpu_timer();
166
167
1.26M
    bool need_update_profile = true;
168
1.26M
    auto update_scanner_profile = [&]() {
169
1.25M
        if (need_update_profile) {
170
1.25M
            scanner->update_scan_cpu_timer();
171
1.25M
            scanner->update_realtime_counters();
172
1.25M
            need_update_profile = false;
173
1.25M
        }
174
1.25M
    };
175
176
1.26M
    Status status = Status::OK();
177
1.26M
    bool eos = false;
178
1.26M
    Defer defer_scanner([&] {
179
1.26M
        if (status.ok() && !eos) {
180
            // if status is not ok, it means the scanner is failed, and the counter may be not updated correctly, so no need to update counter again. if eos is true, it means the scanner is finished successfully, and the counter is updated correctly, so no need to update counter again.
181
8.93k
            scanner->start_wait_worker_timer();
182
8.93k
        }
183
1.26M
    });
184
185
1.26M
    ASSIGN_STATUS_IF_CATCH_EXCEPTION(
186
1.26M
            RuntimeState* state = ctx->state(); DCHECK(nullptr != state);
187
            // scanner->open may alloc plenty amount of memory(read blocks of data),
188
            // so better to also check low memory and clear free blocks here.
189
1.26M
            if (ctx->low_memory_mode()) { ctx->clear_free_blocks(); }
190
191
1.26M
            if (!scanner->has_prepared()) {
192
1.26M
                status = scanner->prepare();
193
1.26M
                if (!status.ok()) {
194
1.26M
                    eos = true;
195
1.26M
                }
196
1.26M
            }
197
198
1.26M
            if (!eos && !scanner->is_open()) {
199
1.26M
                status = scanner->open(state);
200
1.26M
                if (!status.ok()) {
201
1.26M
                    eos = true;
202
1.26M
                }
203
1.26M
                scanner->set_opened();
204
1.26M
            }
205
206
1.26M
            Status rf_status = scanner->try_append_late_arrival_runtime_filter();
207
1.26M
            if (!rf_status.ok()) {
208
1.26M
                LOG(WARNING) << "Failed to append late arrival runtime filter: "
209
1.26M
                             << rf_status.to_string();
210
1.26M
            }
211
212
1.26M
            size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
213
1.26M
            if (ctx->low_memory_mode()) {
214
1.26M
                ctx->clear_free_blocks();
215
1.26M
                if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
216
1.26M
                    raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
217
1.26M
                }
218
1.26M
            }
219
220
1.26M
            size_t raw_bytes_read = 0;
221
1.26M
            bool first_read = true; int64_t limit = scanner->limit();
222
            // If the first block is full, then it is true. Or the first block + second block > batch_size
223
1.26M
            bool has_first_full_block = false;
224
225
            // During low memory mode, every scan task will return at most 2 block to reduce memory usage.
226
1.26M
            while (!eos && raw_bytes_read < raw_bytes_threshold &&
227
1.26M
                   (!ctx->low_memory_mode() || !has_first_full_block) &&
228
1.26M
                   (!has_first_full_block || doris::thread_context()
229
1.26M
                                                     ->thread_mem_tracker_mgr->limiter_mem_tracker()
230
1.26M
                                                     ->check_limit(1))) {
231
1.26M
                if (UNLIKELY(ctx->done())) {
232
1.26M
                    eos = true;
233
1.26M
                    break;
234
1.26M
                }
235
                // If shared limit quota is exhausted, stop scanning.
236
1.26M
                if (ctx->remaining_limit() == 0) {
237
1.26M
                    eos = true;
238
1.26M
                    break;
239
1.26M
                }
240
1.26M
                if (max_run_time_watch.elapsed_time() >
241
1.26M
                    config::doris_scanner_max_run_time_ms * 1e6) {
242
1.26M
                    break;
243
1.26M
                }
244
1.26M
                DEFER_RELEASE_RESERVED();
245
1.26M
                BlockUPtr free_block;
246
1.26M
                if (first_read) {
247
1.26M
                    free_block = ctx->get_free_block(first_read);
248
1.26M
                } else {
249
1.26M
                    if (state->get_query_ctx()
250
1.26M
                                ->resource_ctx()
251
1.26M
                                ->task_controller()
252
1.26M
                                ->is_enable_reserve_memory()) {
253
1.26M
                        size_t block_avg_bytes = scanner->get_block_avg_bytes();
254
1.26M
                        auto st = thread_context()->thread_mem_tracker_mgr->try_reserve(
255
1.26M
                                block_avg_bytes);
256
1.26M
                        if (!st.ok()) {
257
1.26M
                            handle_reserve_memory_failure(state, ctx, st, block_avg_bytes);
258
1.26M
                            break;
259
1.26M
                        }
260
1.26M
                    }
261
1.26M
                    free_block = ctx->get_free_block(first_read);
262
1.26M
                }
263
1.26M
                if (free_block == nullptr) {
264
1.26M
                    break;
265
1.26M
                }
266
                // We got a new created block or a reused block.
267
1.26M
                status = scanner->get_block_after_projects(state, free_block.get(), &eos);
268
1.26M
                first_read = false;
269
1.26M
                if (!status.ok()) {
270
1.26M
                    LOG(WARNING) << "Scan thread read Scanner failed: " << status.to_string();
271
1.26M
                    break;
272
1.26M
                }
273
                // Check column type only after block is read successfully.
274
                // Or it may cause a crash when the block is not normal.
275
1.26M
                _make_sure_virtual_col_is_materialized(scanner, free_block.get());
276
277
                // Shared limit quota: acquire rows from the context's shared pool.
278
                // Discard or truncate the block if quota is exhausted.
279
1.26M
                if (free_block->rows() > 0) {
280
1.26M
                    int64_t block_rows = free_block->rows();
281
1.26M
                    int64_t granted = ctx->acquire_limit_quota(block_rows);
282
1.26M
                    if (granted == 0) {
283
                        // No quota remaining, discard this block and mark eos.
284
1.26M
                        ctx->return_free_block(std::move(free_block));
285
1.26M
                        eos = true;
286
1.26M
                        break;
287
1.26M
                    } else if (granted < block_rows) {
288
                        // Partial quota: truncate block to granted rows and mark eos.
289
1.26M
                        free_block->set_num_rows(granted);
290
1.26M
                        eos = true;
291
1.26M
                    }
292
1.26M
                }
293
                // Projection will truncate useless columns, makes block size change.
294
1.26M
                auto free_block_bytes = free_block->allocated_bytes();
295
1.26M
                raw_bytes_read += free_block_bytes;
296
1.26M
                if (!scan_task->cached_blocks.empty() &&
297
1.26M
                    scan_task->cached_blocks.back().first->rows() + free_block->rows() <=
298
1.26M
                            ctx->batch_size()) {
299
1.26M
                    size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes();
300
1.26M
                    MutableBlock mutable_block(scan_task->cached_blocks.back().first.get());
301
1.26M
                    status = mutable_block.merge(*free_block);
302
1.26M
                    if (!status.ok()) {
303
1.26M
                        LOG(WARNING) << "Block merge failed: " << status.to_string();
304
1.26M
                        break;
305
1.26M
                    }
306
1.26M
                    scan_task->cached_blocks.back().second = mutable_block.allocated_bytes();
307
1.26M
                    scan_task->cached_blocks.back().first.get()->set_columns(
308
1.26M
                            std::move(mutable_block.mutable_columns()));
309
310
                    // Return block succeed or not, this free_block is not used by this scan task any more.
311
                    // If block can be reused, its memory usage will be added back.
312
1.26M
                    ctx->return_free_block(std::move(free_block));
313
1.26M
                    ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
314
1.26M
                                         block_size);
315
1.26M
                } else {
316
1.26M
                    if (!scan_task->cached_blocks.empty()) {
317
1.26M
                        has_first_full_block = true;
318
1.26M
                    }
319
1.26M
                    ctx->inc_block_usage(free_block->allocated_bytes());
320
1.26M
                    scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
321
1.26M
                }
322
323
                // Per-scanner small-limit optimization: if limit is small (< batch_size),
324
                // return immediately instead of accumulating to raw_bytes_threshold.
325
1.26M
                if (limit > 0 && limit < ctx->batch_size()) {
326
1.26M
                    break;
327
1.26M
                }
328
329
1.26M
                if (scan_task->cached_blocks.back().first->rows() > 0) {
330
1.26M
                    auto block_avg_bytes = (scan_task->cached_blocks.back().first->bytes() +
331
1.26M
                                            scan_task->cached_blocks.back().first->rows() - 1) /
332
1.26M
                                           scan_task->cached_blocks.back().first->rows() *
333
1.26M
                                           ctx->batch_size();
334
1.26M
                    scanner->update_block_avg_bytes(block_avg_bytes);
335
1.26M
                }
336
1.26M
                if (ctx->low_memory_mode()) {
337
1.26M
                    ctx->clear_free_blocks();
338
1.26M
                    if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
339
1.26M
                        raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
340
1.26M
                    }
341
1.26M
                }
342
1.26M
            } // end for while
343
344
1.26M
            if (UNLIKELY(!status.ok())) {
345
1.26M
                scan_task->set_status(status);
346
1.26M
                eos = true;
347
1.26M
            },
348
1.26M
            status);
349
350
1.26M
    if (UNLIKELY(!status.ok())) {
351
1.49k
        scan_task->set_status(status);
352
1.49k
        eos = true;
353
1.49k
    }
354
355
1.26M
    if (eos) {
356
        // If eos, scanner will call _collect_profile_before_close to update profile,
357
        // so we need update_scanner_profile here
358
1.25M
        update_scanner_profile();
359
1.25M
        scanner->mark_to_need_to_close();
360
1.25M
    }
361
1.26M
    scan_task->set_eos(eos);
362
363
18.4E
    VLOG_DEBUG << fmt::format(
364
18.4E
            "Scanner context {} has finished task, cached_block {} current scheduled task is "
365
18.4E
            "{}, eos: {}, status: {}",
366
18.4E
            ctx->ctx_id, scan_task->cached_blocks.size(), ctx->num_scheduled_scanners(), eos,
367
18.4E
            status.to_string());
368
369
1.26M
    ctx->push_back_scan_task(scan_task);
370
1.26M
}
371
73.2k
int ScannerScheduler::default_local_scan_thread_num() {
372
73.2k
    return config::doris_scanner_thread_pool_thread_num > 0
373
73.2k
                   ? config::doris_scanner_thread_pool_thread_num
374
73.2k
                   : std::max(48, CpuInfo::num_cores() * 2);
375
73.2k
}
376
66.6k
int ScannerScheduler::default_remote_scan_thread_num() {
377
66.6k
    int num = config::doris_max_remote_scanner_thread_pool_thread_num > 0
378
66.6k
                      ? config::doris_max_remote_scanner_thread_pool_thread_num
379
66.6k
                      : std::max(512, CpuInfo::num_cores() * 10);
380
66.6k
    return std::max(num, default_local_scan_thread_num());
381
66.6k
}
382
383
35
int ScannerScheduler::get_remote_scan_thread_queue_size() {
384
35
    return config::doris_remote_scanner_thread_pool_queue_size;
385
35
}
386
387
6.56k
int ScannerScheduler::default_min_active_scan_threads() {
388
6.56k
    return config::min_active_scan_threads > 0
389
6.56k
                   ? config::min_active_scan_threads
390
6.56k
                   : config::min_active_scan_threads = CpuInfo::num_cores() * 2;
391
6.56k
}
392
393
6.56k
int ScannerScheduler::default_min_active_file_scan_threads() {
394
6.56k
    return config::min_active_file_scan_threads > 0
395
6.56k
                   ? config::min_active_file_scan_threads
396
6.56k
                   : config::min_active_file_scan_threads = CpuInfo::num_cores() * 8;
397
6.56k
}
398
399
void ScannerScheduler::_make_sure_virtual_col_is_materialized(
400
1.36M
        const std::shared_ptr<Scanner>& scanner, Block* free_block) {
401
1.36M
#ifndef NDEBUG
402
    // Currently, virtual column can only be used on olap table.
403
1.36M
    std::shared_ptr<OlapScanner> olap_scanner = std::dynamic_pointer_cast<OlapScanner>(scanner);
404
1.36M
    if (olap_scanner == nullptr) {
405
152k
        return;
406
152k
    }
407
408
1.20M
    if (free_block->rows() == 0) {
409
973k
        return;
410
973k
    }
411
412
235k
    size_t idx = 0;
413
725k
    for (const auto& entry : *free_block) {
414
        // Virtual column must be materialized on the end of SegmentIterator's next batch method.
415
725k
        const ColumnNothing* column_nothing =
416
725k
                check_and_get_column<ColumnNothing>(entry.column.get());
417
725k
        if (column_nothing == nullptr) {
418
725k
            idx++;
419
725k
            continue;
420
725k
        }
421
422
18.4E
        std::vector<std::string> vcid_to_idx;
423
424
18.4E
        for (const auto& pair : olap_scanner->_vir_cid_to_idx_in_block) {
425
0
            vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, pair.second));
426
0
        }
427
428
18.4E
        std::string error_msg = fmt::format(
429
18.4E
                "Column in idx {} is nothing, block columns {}, normal_columns "
430
18.4E
                "{}, "
431
18.4E
                "vir_cid_to_idx_in_block_msg {}",
432
18.4E
                idx, free_block->columns(), olap_scanner->_return_columns.size(),
433
18.4E
                fmt::format("_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ",")));
434
18.4E
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, error_msg);
435
725k
    }
436
235k
#endif
437
235k
}
438
439
1.27M
Result<SharedListenableFuture<Void>> ScannerSplitRunner::process_for(std::chrono::nanoseconds) {
440
1.27M
    _started = true;
441
1.27M
    bool is_completed = _scan_func();
442
1.27M
    if (is_completed) {
443
1.26M
        _completion_future.set_value(Void {});
444
1.26M
    }
445
1.27M
    return SharedListenableFuture<Void>::create_ready(Void {});
446
1.27M
}
447
448
1.27M
bool ScannerSplitRunner::is_finished() {
449
1.27M
    return _completion_future.is_done();
450
1.27M
}
451
452
1.26M
Status ScannerSplitRunner::finished_status() {
453
1.26M
    return _completion_future.get_status();
454
1.26M
}
455
456
0
bool ScannerSplitRunner::is_started() const {
457
0
    return _started.load();
458
0
}
459
460
8.91k
bool ScannerSplitRunner::is_auto_reschedule() const {
461
8.91k
    return false;
462
8.91k
}
463
464
} // namespace doris