Coverage Report

Created: 2026-03-27 12:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner_scheduler.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/scanner_scheduler.h"
19
20
#include <algorithm>
21
#include <cstdint>
22
#include <functional>
23
#include <list>
24
#include <memory>
25
#include <ostream>
26
#include <string>
27
#include <utility>
28
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/config.h"
31
#include "common/exception.h"
32
#include "common/logging.h"
33
#include "common/status.h"
34
#include "core/block/block.h"
35
#include "exec/pipeline/pipeline_task.h"
36
#include "exec/scan/file_scanner.h"
37
#include "exec/scan/olap_scanner.h" // IWYU pragma: keep
38
#include "exec/scan/scan_node.h"
39
#include "exec/scan/scanner.h"
40
#include "exec/scan/scanner_context.h"
41
#include "runtime/exec_env.h"
42
#include "runtime/runtime_state.h"
43
#include "runtime/thread_context.h"
44
#include "runtime/workload_group/workload_group_manager.h"
45
#include "storage/tablet/tablet.h"
46
#include "util/async_io.h" // IWYU pragma: keep
47
#include "util/cpu_info.h"
48
#include "util/defer_op.h"
49
#include "util/thread.h"
50
#include "util/threadpool.h"
51
52
namespace doris {
53
54
// Hands `scan_task` over to the scanner pool on behalf of `ctx`.
//
// Returns Status::OK() without scheduling anything when the context is already done,
// when its task execution context cannot be obtained, or when the task's scanner
// delegate has expired. When the pool rejects the task, the failure is reported as a
// TooManyTasks status that is stored on `scan_task` and returned to the caller.
Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
                                std::shared_ptr<ScanTask> scan_task) {
    if (ctx->done()) {
        return Status::OK();
    }

    // A null task execution context is treated as "query may already be finished".
    auto task_lock = ctx->task_exec_ctx();
    if (task_lock == nullptr) {
        LOG(INFO) << "could not lock task execution context, query " << ctx->debug_string()
                  << " maybe finished";
        return Status::OK();
    }

    std::shared_ptr<ScannerDelegate> delegate = scan_task->scanner.lock();
    if (delegate == nullptr) {
        return Status::OK();
    }

    // The wait-for-worker timer starts before the task is handed to the pool.
    delegate->_scanner->start_wait_worker_timer();
    const TabletStorageType storage_type = delegate->_scanner->get_storage_type();

    const auto enqueue = [&]() {
        // Body executed by the pool. Its bool result is true when the scan failed
        // (the task is then pushed back to the context here) or when it reached eos.
        auto run_once = [task = scan_task, ctx]() {
            auto scan_status = [&] {
                RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, task));
                return Status::OK();
            }();

            if (scan_status.ok()) {
                return task->is_eos();
            }
            task->set_status(scan_status);
            ctx->push_back_scan_task(task);
            return true;
        };
        SimplifiedScanTask pool_task = {run_once, ctx, scan_task};
        return this->submit_scan_task(pool_task);
    };

    Status submit_status = enqueue();
    if (submit_status.ok()) {
        return Status::OK();
    }

    // User will see TooManyTasks error. It looks like a more reasonable error.
    const std::string reason = "Failed to submit scanner to scanner pool reason:" +
                               std::string(submit_status.msg()) +
                               "|type:" + std::to_string(storage_type);
    Status scan_task_status = Status::TooManyTasks(reason);
    scan_task->set_status(scan_task_status);
    return scan_task_status;
}
102
103
void handle_reserve_memory_failure(RuntimeState* state, std::shared_ptr<ScannerContext> ctx,
104
0
                                   const Status& st, size_t reserve_size) {
105
0
    ctx->clear_free_blocks();
106
0
    auto* local_state = ctx->local_state();
107
108
0
    auto debug_msg = fmt::format(
109
0
            "Query: {} , scanner try to reserve: {}, operator name {}, "
110
0
            "operator "
111
0
            "id: {}, "
112
0
            "task id: "
113
0
            "{}, failed: {}",
114
0
            print_id(state->query_id()), PrettyPrinter::print_bytes(reserve_size),
115
0
            local_state->get_name(), local_state->parent()->node_id(), state->task_id(),
116
0
            st.to_string());
117
    // PROCESS_MEMORY_EXCEEDED error msg alread contains process_mem_log_str
118
0
    if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
119
0
        debug_msg += fmt::format(", debug info: {}", GlobalMemoryArbitrator::process_mem_log_str());
120
0
    }
121
0
    VLOG_DEBUG << debug_msg;
122
123
0
    state->get_query_ctx()->set_low_memory_mode();
124
0
}
125
126
// Runs one scheduling slice of `scan_task` on the calling thread.
//
// Obtains the task execution context and the scanner delegate (returning without
// touching `scan_task` if either is null), prepares/opens the scanner if that has not
// happened yet, then reads blocks into scan_task->cached_blocks until one of the loop's
// stop conditions is hit. The outcome is recorded on `scan_task` through set_status()
// and set_eos(), and the task is handed back to `ctx` via push_back_scan_task().
//
// NOTE(review): the main body is wrapped in ASSIGN_STATUS_IF_CATCH_EXCEPTION, whose
// definition is not visible here; presumably it stores a caught exception into `status`.
void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
127
0
                                     std::shared_ptr<ScanTask> scan_task) {
128
0
    auto task_lock = ctx->task_exec_ctx();
129
0
    if (task_lock == nullptr) {
130
0
        return;
131
0
    }
132
0
    SCOPED_ATTACH_TASK(ctx->state());
133
    // Count this scanner as running for the lifetime of this call.
134
0
    ctx->update_peak_running_scanner(1);
135
0
    Defer defer([&] { ctx->update_peak_running_scanner(-1); });
136
137
0
    std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
138
0
    if (scanner_delegate == nullptr) {
139
0
        return;
140
0
    }
141
142
0
    ScannerSPtr& scanner = scanner_delegate->_scanner;
143
    // For cpu hard limit, the thread name should not be reset.
144
0
    if (ctx->_should_reset_thread_name) {
145
0
        Thread::set_self_name("_scanner_scan");
146
0
    }
147
148
0
#ifndef __APPLE__
149
    // The configuration item is used to lower the priority of the scanner thread,
150
    // typically employed to ensure CPU scheduling for write operations.
151
0
    if (config::scan_thread_nice_value != 0 && scanner->get_name() != FileScanner::NAME) {
152
0
        Thread::set_thread_nice_value();
153
0
    }
154
0
#endif
155
156
    // Timers are updated in the order below so that they are up to date before get_block and so that the time spent in get_block is recorded:
157
    // 1. update_wait_worker_timer: records the time spent waiting for a worker thread.
158
    // 2. start_scan_cpu_timer: the cpu timer then covers open and get_block, i.e. the real cpu time of the scanner.
159
    // 3. update_scan_cpu_timer: in this function it is called from update_scanner_profile(), which is only invoked on eos. NOTE(review): the earlier wording said "when defer" - confirm which is intended.
160
    // 4. start_wait_worker_timer in the deferred cleanup, so that the next wait for a worker thread is recorded.
161
162
0
    MonotonicStopWatch max_run_time_watch;
163
0
    max_run_time_watch.start();
164
0
    scanner->update_wait_worker_timer();
165
0
    scanner->start_scan_cpu_timer();
166
    // The flag makes update_scanner_profile() a no-op after its first call.
167
0
    bool need_update_profile = true;
168
0
    auto update_scanner_profile = [&]() {
169
0
        if (need_update_profile) {
170
0
            scanner->update_scan_cpu_timer();
171
0
            scanner->update_realtime_counters();
172
0
            need_update_profile = false;
173
0
        }
174
0
    };
175
176
0
    Status status = Status::OK();
177
0
    bool eos = false;
    // Runs when the function exits; it captures `status` and `eos` by reference and
    // therefore sees their final values.
178
0
    Defer defer_scanner([&] {
179
0
        if (status.ok() && !eos) {
180
            // The wait-worker timer is restarted only when the scan succeeded and is not finished yet. On failure the counters may not have been updated correctly, and on eos they were already updated, so neither case updates them again.
181
0
            scanner->start_wait_worker_timer();
182
0
        }
183
0
    });
184
185
0
    ASSIGN_STATUS_IF_CATCH_EXCEPTION(
186
0
            RuntimeState* state = ctx->state(); DCHECK(nullptr != state);
187
            // scanner->open may alloc plenty amount of memory(read blocks of data),
188
            // so better to also check low memory and clear free blocks here.
189
0
            if (ctx->low_memory_mode()) { ctx->clear_free_blocks(); }
190
            // A failed prepare() marks eos, which skips open() and the read loop below.
191
0
            if (!scanner->has_prepared()) {
192
0
                status = scanner->prepare();
193
0
                if (!status.ok()) {
194
0
                    eos = true;
195
0
                }
196
0
            }
197
            // Note: set_opened() is called whether or not open() succeeded.
198
0
            if (!eos && !scanner->is_open()) {
199
0
                status = scanner->open(state);
200
0
                if (!status.ok()) {
201
0
                    eos = true;
202
0
                }
203
0
                scanner->set_opened();
204
0
            }
205
            // A failure here is only logged; it does not change `status` or `eos`.
206
0
            Status rf_status = scanner->try_append_late_arrival_runtime_filter();
207
0
            if (!rf_status.ok()) {
208
0
                LOG(WARNING) << "Failed to append late arrival runtime filter: "
209
0
                             << rf_status.to_string();
210
0
            }
211
            // Byte budget for this slice; capped further while in low memory mode.
212
0
            size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
213
0
            if (ctx->low_memory_mode()) {
214
0
                ctx->clear_free_blocks();
215
0
                if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
216
0
                    raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
217
0
                }
218
0
            }
219
220
0
            size_t raw_bytes_read = 0;
221
0
            bool first_read = true; int64_t limit = scanner->limit();
222
            // Set to true once a block is appended while cached_blocks already holds one, i.e. the previous block could not absorb the new one within batch_size.
223
0
            bool has_first_full_block = false;
224
225
            // During low memory mode, every scan task will return at most 2 block to reduce memory usage.
            // The loop also stops once the byte budget is used up, and - after the first full
            // block - as soon as the memory tracker's check_limit(1) no longer holds.
226
0
            while (!eos && raw_bytes_read < raw_bytes_threshold &&
227
0
                   (!ctx->low_memory_mode() || !has_first_full_block) &&
228
0
                   (!has_first_full_block || doris::thread_context()
229
0
                                                     ->thread_mem_tracker_mgr->limiter_mem_tracker()
230
0
                                                     ->check_limit(1))) {
231
0
                if (UNLIKELY(ctx->done())) {
232
0
                    eos = true;
233
0
                    break;
234
0
                }
235
                // If shared limit quota is exhausted, stop scanning.
236
0
                if (ctx->remaining_limit() == 0) {
237
0
                    eos = true;
238
0
                    break;
239
0
                }
                // Yield after the configured run time without marking eos.
                // NOTE(review): the * 1e6 suggests elapsed_time() is in nanoseconds - confirm.
240
0
                if (max_run_time_watch.elapsed_time() >
241
0
                    config::doris_scanner_max_run_time_ms * 1e6) {
242
0
                    break;
243
0
                }
244
0
                DEFER_RELEASE_RESERVED();
245
0
                BlockUPtr free_block;
                // Only reads after the first one try to reserve memory up front.
246
0
                if (first_read) {
247
0
                    free_block = ctx->get_free_block(first_read);
248
0
                } else {
249
0
                    if (state->get_query_ctx()
250
0
                                ->resource_ctx()
251
0
                                ->task_controller()
252
0
                                ->is_enable_reserve_memory()) {
253
0
                        size_t block_avg_bytes = scanner->get_block_avg_bytes();
254
0
                        auto st = thread_context()->thread_mem_tracker_mgr->try_reserve(
255
0
                                block_avg_bytes);
256
0
                        if (!st.ok()) {
257
0
                            handle_reserve_memory_failure(state, ctx, st, block_avg_bytes);
258
0
                            break;
259
0
                        }
260
0
                    }
261
0
                    free_block = ctx->get_free_block(first_read);
262
0
                }
263
0
                if (free_block == nullptr) {
264
0
                    break;
265
0
                }
266
                // We got a new created block or a reused block.
267
0
                status = scanner->get_block_after_projects(state, free_block.get(), &eos);
268
0
                first_read = false;
269
0
                if (!status.ok()) {
270
0
                    LOG(WARNING) << "Scan thread read Scanner failed: " << status.to_string();
271
0
                    break;
272
0
                }
273
                // Check column type only after block is read successfully.
274
                // Or it may cause a crash when the block is not normal.
275
0
                _make_sure_virtual_col_is_materialized(scanner, free_block.get());
276
277
                // Shared limit quota: acquire rows from the context's shared pool.
278
                // Discard or truncate the block if quota is exhausted.
279
0
                if (free_block->rows() > 0) {
280
0
                    int64_t block_rows = free_block->rows();
281
0
                    int64_t granted = ctx->acquire_limit_quota(block_rows);
282
0
                    if (granted == 0) {
283
                        // No quota remaining, discard this block and mark eos.
284
0
                        ctx->return_free_block(std::move(free_block));
285
0
                        eos = true;
286
0
                        break;
287
0
                    } else if (granted < block_rows) {
288
                        // Partial quota: truncate block to granted rows and mark eos.
289
0
                        free_block->set_num_rows(granted);
290
0
                        eos = true;
291
0
                    }
292
0
                }
293
                // Projection will truncate useless columns, makes block size change.
294
0
                auto free_block_bytes = free_block->allocated_bytes();
295
0
                raw_bytes_read += free_block_bytes;
                // Merge the new block into the last cached block when their combined row
                // count still fits into batch_size; otherwise append it as a new entry.
296
0
                if (!scan_task->cached_blocks.empty() &&
297
0
                    scan_task->cached_blocks.back().first->rows() + free_block->rows() <=
298
0
                            ctx->batch_size()) {
299
0
                    size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes();
300
0
                    MutableBlock mutable_block(scan_task->cached_blocks.back().first.get());
301
0
                    status = mutable_block.merge(*free_block);
302
0
                    if (!status.ok()) {
303
0
                        LOG(WARNING) << "Block merge failed: " << status.to_string();
304
0
                        break;
305
0
                    }
306
0
                    scan_task->cached_blocks.back().second = mutable_block.allocated_bytes();
307
0
                    scan_task->cached_blocks.back().first.get()->set_columns(
308
0
                            std::move(mutable_block.mutable_columns()));
309
310
                    // Return block succeed or not, this free_block is not used by this scan task any more.
311
                    // If block can be reused, its memory usage will be added back.
312
0
                    ctx->return_free_block(std::move(free_block));
                    // Account only for the growth of the merged block.
313
0
                    ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
314
0
                                         block_size);
315
0
                } else {
316
0
                    if (!scan_task->cached_blocks.empty()) {
317
0
                        has_first_full_block = true;
318
0
                    }
319
0
                    ctx->inc_block_usage(free_block->allocated_bytes());
320
0
                    scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
321
0
                }
322
323
                // Per-scanner small-limit optimization: if limit is small (< batch_size),
324
                // return immediately instead of accumulating to raw_bytes_threshold.
325
0
                if (limit > 0 && limit < ctx->batch_size()) {
326
0
                    break;
327
0
                }
328
                // Estimate the bytes of a full batch: ceil(bytes / rows) * batch_size.
329
0
                if (scan_task->cached_blocks.back().first->rows() > 0) {
330
0
                    auto block_avg_bytes = (scan_task->cached_blocks.back().first->bytes() +
331
0
                                            scan_task->cached_blocks.back().first->rows() - 1) /
332
0
                                           scan_task->cached_blocks.back().first->rows() *
333
0
                                           ctx->batch_size();
334
0
                    scanner->update_block_avg_bytes(block_avg_bytes);
335
0
                }
                // Low memory mode is re-checked on every iteration; when it is on, the
                // byte budget is capped again.
336
0
                if (ctx->low_memory_mode()) {
337
0
                    ctx->clear_free_blocks();
338
0
                    if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
339
0
                        raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
340
0
                    }
341
0
                }
342
0
            } // end of while loop
343
344
0
            if (UNLIKELY(!status.ok())) {
345
0
                scan_task->set_status(status);
346
0
                eos = true;
347
0
            },
348
0
            status);
349
    // Same check as inside the macro body, repeated after it. Presumably this covers a
    // `status` assigned by the macro itself (e.g. from a caught exception) - confirm.
350
0
    if (UNLIKELY(!status.ok())) {
351
0
        scan_task->set_status(status);
352
0
        eos = true;
353
0
    }
354
355
0
    if (eos) {
356
        // If eos, scanner will call _collect_profile_before_close to update profile,
357
        // so we need update_scanner_profile here
358
0
        update_scanner_profile();
359
0
        scanner->mark_to_need_to_close();
360
0
    }
361
0
    scan_task->set_eos(eos);
362
363
0
    VLOG_DEBUG << fmt::format(
364
0
            "Scanner context {} has finished task, cached_block {} current scheduled task is "
365
0
            "{}, eos: {}, status: {}",
366
0
            ctx->ctx_id, scan_task->cached_blocks.size(), ctx->num_scheduled_scanners(), eos,
367
0
            status.to_string());
368
    // Hand the task (with its cached blocks, status and eos flag) back to the context.
369
0
    ctx->push_back_scan_task(scan_task);
370
0
}
371
12
int ScannerScheduler::default_local_scan_thread_num() {
372
12
    return config::doris_scanner_thread_pool_thread_num > 0
373
12
                   ? config::doris_scanner_thread_pool_thread_num
374
12
                   : std::max(48, CpuInfo::num_cores() * 2);
375
12
}
376
7
int ScannerScheduler::default_remote_scan_thread_num() {
377
7
    int num = config::doris_max_remote_scanner_thread_pool_thread_num > 0
378
7
                      ? config::doris_max_remote_scanner_thread_pool_thread_num
379
7
                      : std::max(512, CpuInfo::num_cores() * 10);
380
7
    return std::max(num, default_local_scan_thread_num());
381
7
}
382
383
0
int ScannerScheduler::get_remote_scan_thread_queue_size() {
384
0
    return config::doris_remote_scanner_thread_pool_queue_size;
385
0
}
386
387
0
int ScannerScheduler::default_min_active_scan_threads() {
388
0
    return config::min_active_scan_threads > 0
389
0
                   ? config::min_active_scan_threads
390
0
                   : config::min_active_scan_threads = CpuInfo::num_cores() * 2;
391
0
}
392
393
0
int ScannerScheduler::default_min_active_file_scan_threads() {
394
0
    return config::min_active_file_scan_threads > 0
395
0
                   ? config::min_active_file_scan_threads
396
0
                   : config::min_active_file_scan_threads = CpuInfo::num_cores() * 8;
397
0
}
398
399
void ScannerScheduler::_make_sure_virtual_col_is_materialized(
400
0
        const std::shared_ptr<Scanner>& scanner, Block* free_block) {
401
0
#ifndef NDEBUG
402
    // Currently, virtual column can only be used on olap table.
403
0
    std::shared_ptr<OlapScanner> olap_scanner = std::dynamic_pointer_cast<OlapScanner>(scanner);
404
0
    if (olap_scanner == nullptr) {
405
0
        return;
406
0
    }
407
408
0
    if (free_block->rows() == 0) {
409
0
        return;
410
0
    }
411
412
0
    size_t idx = 0;
413
0
    for (const auto& entry : *free_block) {
414
        // Virtual column must be materialized on the end of SegmentIterator's next batch method.
415
0
        const ColumnNothing* column_nothing =
416
0
                check_and_get_column<ColumnNothing>(entry.column.get());
417
0
        if (column_nothing == nullptr) {
418
0
            idx++;
419
0
            continue;
420
0
        }
421
422
0
        std::vector<std::string> vcid_to_idx;
423
424
0
        for (const auto& pair : olap_scanner->_vir_cid_to_idx_in_block) {
425
0
            vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, pair.second));
426
0
        }
427
428
0
        std::string error_msg = fmt::format(
429
0
                "Column in idx {} is nothing, block columns {}, normal_columns "
430
0
                "{}, "
431
0
                "vir_cid_to_idx_in_block_msg {}",
432
0
                idx, free_block->columns(), olap_scanner->_return_columns.size(),
433
0
                fmt::format("_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ",")));
434
0
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, error_msg);
435
0
    }
436
0
#endif
437
0
}
438
439
0
// Marks the runner as started and invokes the scan function once. The duration
// argument is unnamed and not used. When the scan function returns true, the
// completion future is given a value. The returned future is always the one produced
// by SharedListenableFuture<Void>::create_ready().
Result<SharedListenableFuture<Void>> ScannerSplitRunner::process_for(std::chrono::nanoseconds) {
    _started = true;
    if (_scan_func()) {
        _completion_future.set_value(Void {});
    }
    return SharedListenableFuture<Void>::create_ready(Void {});
}
447
448
0
bool ScannerSplitRunner::is_finished() {
449
0
    return _completion_future.is_done();
450
0
}
451
452
0
// Returns the status held by the completion future.
Status ScannerSplitRunner::finished_status() {
    Status completion_status = _completion_future.get_status();
    return completion_status;
}
455
456
0
bool ScannerSplitRunner::is_started() const {
457
0
    return _started.load();
458
0
}
459
460
0
// This runner never asks to be rescheduled automatically: always returns false.
bool ScannerSplitRunner::is_auto_reschedule() const {
    constexpr bool kAutoReschedule = false;
    return kAutoReschedule;
}
463
464
} // namespace doris