Coverage Report

Created: 2026-03-19 00:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner_scheduler.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/scanner_scheduler.h"
19
20
#include <algorithm>
21
#include <cstdint>
22
#include <functional>
23
#include <list>
24
#include <memory>
25
#include <ostream>
26
#include <string>
27
#include <utility>
28
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/config.h"
31
#include "common/exception.h"
32
#include "common/logging.h"
33
#include "common/status.h"
34
#include "core/block/block.h"
35
#include "exec/pipeline/pipeline_task.h"
36
#include "exec/scan/file_scanner.h"
37
#include "exec/scan/olap_scanner.h" // IWYU pragma: keep
38
#include "exec/scan/scan_node.h"
39
#include "exec/scan/scanner.h"
40
#include "exec/scan/scanner_context.h"
41
#include "runtime/exec_env.h"
42
#include "runtime/runtime_state.h"
43
#include "runtime/thread_context.h"
44
#include "runtime/workload_group/workload_group_manager.h"
45
#include "storage/tablet/tablet.h"
46
#include "util/async_io.h" // IWYU pragma: keep
47
#include "util/cpu_info.h"
48
#include "util/defer_op.h"
49
#include "util/thread.h"
50
#include "util/threadpool.h"
51
52
namespace doris {
53
54
Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
55
1.46M
                                std::shared_ptr<ScanTask> scan_task) {
56
1.46M
    if (ctx->done()) {
57
0
        return Status::OK();
58
0
    }
59
1.46M
    auto task_lock = ctx->task_exec_ctx();
60
1.46M
    if (task_lock == nullptr) {
61
18
        LOG(INFO) << "could not lock task execution context, query " << ctx->debug_string()
62
18
                  << " maybe finished";
63
18
        return Status::OK();
64
18
    }
65
1.46M
    std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
66
1.46M
    if (scanner_delegate == nullptr) {
67
0
        return Status::OK();
68
0
    }
69
70
1.46M
    scanner_delegate->_scanner->start_wait_worker_timer();
71
1.46M
    TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
72
1.46M
    auto sumbit_task = [&]() {
73
1.46M
        auto work_func = [scanner_ref = scan_task, ctx]() {
74
1.45M
            auto status = [&] {
75
1.45M
                RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
76
1.45M
                return Status::OK();
77
1.45M
            }();
78
79
1.45M
            if (!status.ok()) {
80
0
                scanner_ref->set_status(status);
81
0
                ctx->push_back_scan_task(scanner_ref);
82
0
                return true;
83
0
            }
84
1.45M
            return scanner_ref->is_eos();
85
1.45M
        };
86
1.46M
        SimplifiedScanTask simple_scan_task = {work_func, ctx, scan_task};
87
1.46M
        return this->submit_scan_task(simple_scan_task);
88
1.46M
    };
89
90
1.46M
    Status submit_status = sumbit_task();
91
1.46M
    if (!submit_status.ok()) {
92
        // User will see TooManyTasks error. It looks like a more reasonable error.
93
0
        Status scan_task_status = Status::TooManyTasks(
94
0
                "Failed to submit scanner to scanner pool reason:" +
95
0
                std::string(submit_status.msg()) + "|type:" + std::to_string(type));
96
0
        scan_task->set_status(scan_task_status);
97
0
        return scan_task_status;
98
0
    }
99
100
1.46M
    return Status::OK();
101
1.46M
}
102
103
void handle_reserve_memory_failure(RuntimeState* state, std::shared_ptr<ScannerContext> ctx,
104
0
                                   const Status& st, size_t reserve_size) {
105
0
    ctx->clear_free_blocks();
106
0
    auto* local_state = ctx->local_state();
107
108
0
    auto debug_msg = fmt::format(
109
0
            "Query: {} , scanner try to reserve: {}, operator name {}, "
110
0
            "operator "
111
0
            "id: {}, "
112
0
            "task id: "
113
0
            "{}, failed: {}",
114
0
            print_id(state->query_id()), PrettyPrinter::print_bytes(reserve_size),
115
0
            local_state->get_name(), local_state->parent()->node_id(), state->task_id(),
116
0
            st.to_string());
117
    // PROCESS_MEMORY_EXCEEDED error msg alread contains process_mem_log_str
118
0
    if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
119
0
        debug_msg += fmt::format(", debug info: {}", GlobalMemoryArbitrator::process_mem_log_str());
120
0
    }
121
0
    VLOG_DEBUG << debug_msg;
122
123
0
    state->get_query_ctx()->set_low_memory_mode();
124
0
}
125
126
void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
127
1.45M
                                     std::shared_ptr<ScanTask> scan_task) {
128
1.45M
    auto task_lock = ctx->task_exec_ctx();
129
1.45M
    if (task_lock == nullptr) {
130
0
        return;
131
0
    }
132
1.45M
    SCOPED_ATTACH_TASK(ctx->state());
133
134
1.45M
    ctx->update_peak_running_scanner(1);
135
1.45M
    Defer defer([&] { ctx->update_peak_running_scanner(-1); });
136
137
1.45M
    std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
138
1.45M
    if (scanner_delegate == nullptr) {
139
3
        return;
140
3
    }
141
142
1.45M
    ScannerSPtr& scanner = scanner_delegate->_scanner;
143
    // for cpu hard limit, thread name should not be reset
144
1.45M
    if (ctx->_should_reset_thread_name) {
145
0
        Thread::set_self_name("_scanner_scan");
146
0
    }
147
148
1.45M
#ifndef __APPLE__
149
    // The configuration item is used to lower the priority of the scanner thread,
150
    // typically employed to ensure CPU scheduling for write operations.
151
1.45M
    if (config::scan_thread_nice_value != 0 && scanner->get_name() != FileScanner::NAME) {
152
0
        Thread::set_thread_nice_value();
153
0
    }
154
1.45M
#endif
155
156
    // we set and get counter according below order, to make sure the counter is updated before get_block, and the time of get_block is recorded in the counter.
157
    // 1. update_wait_worker_timer to make sure the time of waiting for worker thread is recorded in the timer
158
    // 2. start_scan_cpu_timer to make sure the cpu timer include the time of open and get_block, which is the real cpu time of scanner
159
    // 3. update_scan_cpu_timer when defer, to make sure the cpu timer include the time of open and get_block, which is the real cpu time of scanner
160
    // 4. start_wait_worker_timer when defer, to make sure the time of waiting for worker thread is recorded in the timer
161
162
1.45M
    MonotonicStopWatch max_run_time_watch;
163
1.45M
    max_run_time_watch.start();
164
1.45M
    scanner->update_wait_worker_timer();
165
1.45M
    scanner->start_scan_cpu_timer();
166
167
1.45M
    bool need_update_profile = true;
168
1.45M
    auto update_scanner_profile = [&]() {
169
1.44M
        if (need_update_profile) {
170
1.44M
            scanner->update_scan_cpu_timer();
171
1.44M
            scanner->update_realtime_counters();
172
1.44M
            need_update_profile = false;
173
1.44M
        }
174
1.44M
    };
175
176
1.45M
    Status status = Status::OK();
177
1.45M
    bool eos = false;
178
1.45M
    Defer defer_scanner([&] {
179
1.45M
        if (status.ok() && !eos) {
180
            // if status is not ok, it means the scanner is failed, and the counter may be not updated correctly, so no need to update counter again. if eos is true, it means the scanner is finished successfully, and the counter is updated correctly, so no need to update counter again.
181
9.17k
            scanner->start_wait_worker_timer();
182
9.17k
        }
183
1.45M
    });
184
185
1.45M
    ASSIGN_STATUS_IF_CATCH_EXCEPTION(
186
1.45M
            RuntimeState* state = ctx->state(); DCHECK(nullptr != state);
187
            // scanner->open may alloc plenty amount of memory(read blocks of data),
188
            // so better to also check low memory and clear free blocks here.
189
1.45M
            if (ctx->low_memory_mode()) { ctx->clear_free_blocks(); }
190
191
1.45M
            if (!scanner->has_prepared()) {
192
1.45M
                status = scanner->prepare();
193
1.45M
                if (!status.ok()) {
194
1.45M
                    eos = true;
195
1.45M
                }
196
1.45M
            }
197
198
1.45M
            if (!eos && !scanner->is_open()) {
199
1.45M
                status = scanner->open(state);
200
1.45M
                if (!status.ok()) {
201
1.45M
                    eos = true;
202
1.45M
                }
203
1.45M
                scanner->set_opened();
204
1.45M
            }
205
206
1.45M
            Status rf_status = scanner->try_append_late_arrival_runtime_filter();
207
1.45M
            if (!rf_status.ok()) {
208
1.45M
                LOG(WARNING) << "Failed to append late arrival runtime filter: "
209
1.45M
                             << rf_status.to_string();
210
1.45M
            }
211
212
1.45M
            size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
213
1.45M
            if (ctx->low_memory_mode()) {
214
1.45M
                ctx->clear_free_blocks();
215
1.45M
                if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
216
1.45M
                    raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
217
1.45M
                }
218
1.45M
            }
219
220
1.45M
            size_t raw_bytes_read = 0;
221
1.45M
            bool first_read = true; int64_t limit = scanner->limit();
222
            // If the first block is full, then it is true. Or the first block + second block > batch_size
223
1.45M
            bool has_first_full_block = false;
224
225
            // During low memory mode, every scan task will return at most 2 block to reduce memory usage.
226
1.45M
            while (!eos && raw_bytes_read < raw_bytes_threshold &&
227
1.45M
                   (!ctx->low_memory_mode() || !has_first_full_block) &&
228
1.45M
                   (!has_first_full_block || doris::thread_context()
229
1.45M
                                                     ->thread_mem_tracker_mgr->limiter_mem_tracker()
230
1.45M
                                                     ->check_limit(1))) {
231
1.45M
                if (UNLIKELY(ctx->done())) {
232
1.45M
                    eos = true;
233
1.45M
                    break;
234
1.45M
                }
235
1.45M
                if (max_run_time_watch.elapsed_time() >
236
1.45M
                    config::doris_scanner_max_run_time_ms * 1e6) {
237
1.45M
                    break;
238
1.45M
                }
239
1.45M
                DEFER_RELEASE_RESERVED();
240
1.45M
                BlockUPtr free_block;
241
1.45M
                if (first_read) {
242
1.45M
                    free_block = ctx->get_free_block(first_read);
243
1.45M
                } else {
244
1.45M
                    if (state->get_query_ctx()
245
1.45M
                                ->resource_ctx()
246
1.45M
                                ->task_controller()
247
1.45M
                                ->is_enable_reserve_memory()) {
248
1.45M
                        size_t block_avg_bytes = scanner->get_block_avg_bytes();
249
1.45M
                        auto st = thread_context()->thread_mem_tracker_mgr->try_reserve(
250
1.45M
                                block_avg_bytes);
251
1.45M
                        if (!st.ok()) {
252
1.45M
                            handle_reserve_memory_failure(state, ctx, st, block_avg_bytes);
253
1.45M
                            break;
254
1.45M
                        }
255
1.45M
                    }
256
1.45M
                    free_block = ctx->get_free_block(first_read);
257
1.45M
                }
258
1.45M
                if (free_block == nullptr) {
259
1.45M
                    break;
260
1.45M
                }
261
                // We got a new created block or a reused block.
262
1.45M
                status = scanner->get_block_after_projects(state, free_block.get(), &eos);
263
1.45M
                first_read = false;
264
1.45M
                if (!status.ok()) {
265
1.45M
                    LOG(WARNING) << "Scan thread read Scanner failed: " << status.to_string();
266
1.45M
                    break;
267
1.45M
                }
268
                // Check column type only after block is read successfully.
269
                // Or it may cause a crash when the block is not normal.
270
1.45M
                _make_sure_virtual_col_is_materialized(scanner, free_block.get());
271
                // Projection will truncate useless columns, makes block size change.
272
1.45M
                auto free_block_bytes = free_block->allocated_bytes();
273
1.45M
                raw_bytes_read += free_block_bytes;
274
1.45M
                if (!scan_task->cached_blocks.empty() &&
275
1.45M
                    scan_task->cached_blocks.back().first->rows() + free_block->rows() <=
276
1.45M
                            ctx->batch_size()) {
277
1.45M
                    size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes();
278
1.45M
                    MutableBlock mutable_block(scan_task->cached_blocks.back().first.get());
279
1.45M
                    status = mutable_block.merge(*free_block);
280
1.45M
                    if (!status.ok()) {
281
1.45M
                        LOG(WARNING) << "Block merge failed: " << status.to_string();
282
1.45M
                        break;
283
1.45M
                    }
284
1.45M
                    scan_task->cached_blocks.back().second = mutable_block.allocated_bytes();
285
1.45M
                    scan_task->cached_blocks.back().first.get()->set_columns(
286
1.45M
                            std::move(mutable_block.mutable_columns()));
287
288
                    // Return block succeed or not, this free_block is not used by this scan task any more.
289
                    // If block can be reused, its memory usage will be added back.
290
1.45M
                    ctx->return_free_block(std::move(free_block));
291
1.45M
                    ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
292
1.45M
                                         block_size);
293
1.45M
                } else {
294
1.45M
                    if (!scan_task->cached_blocks.empty()) {
295
1.45M
                        has_first_full_block = true;
296
1.45M
                    }
297
1.45M
                    ctx->inc_block_usage(free_block->allocated_bytes());
298
1.45M
                    scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
299
1.45M
                }
300
301
1.45M
                if (limit > 0 && limit < ctx->batch_size()) {
302
                    // If this scanner has limit, and less than batch size,
303
                    // return immediately and no need to wait raw_bytes_threshold.
304
                    // This can save time that each scanner may only return a small number of rows,
305
                    // but rows are enough from all scanners.
306
                    // If not break, the query like "select * from tbl where id=1 limit 10"
307
                    // may scan a lot data when the "id=1"'s filter ratio is high.
308
                    // If limit is larger than batch size, this rule is skipped,
309
                    // to avoid user specify a large limit and causing too much small blocks.
310
1.45M
                    break;
311
1.45M
                }
312
313
1.45M
                if (scan_task->cached_blocks.back().first->rows() > 0) {
314
1.45M
                    auto block_avg_bytes = (scan_task->cached_blocks.back().first->bytes() +
315
1.45M
                                            scan_task->cached_blocks.back().first->rows() - 1) /
316
1.45M
                                           scan_task->cached_blocks.back().first->rows() *
317
1.45M
                                           ctx->batch_size();
318
1.45M
                    scanner->update_block_avg_bytes(block_avg_bytes);
319
1.45M
                }
320
1.45M
                if (ctx->low_memory_mode()) {
321
1.45M
                    ctx->clear_free_blocks();
322
1.45M
                    if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
323
1.45M
                        raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
324
1.45M
                    }
325
1.45M
                }
326
1.45M
            } // end for while
327
328
1.45M
            if (UNLIKELY(!status.ok())) {
329
1.45M
                scan_task->set_status(status);
330
1.45M
                eos = true;
331
1.45M
            },
332
1.45M
            status);
333
334
1.45M
    if (UNLIKELY(!status.ok())) {
335
1.40k
        scan_task->set_status(status);
336
1.40k
        eos = true;
337
1.40k
    }
338
339
1.45M
    if (eos) {
340
        // If eos, scanner will call _collect_profile_before_close to update profile,
341
        // so we need update_scanner_profile here
342
1.44M
        update_scanner_profile();
343
1.44M
        scanner->mark_to_need_to_close();
344
1.44M
    }
345
1.45M
    scan_task->set_eos(eos);
346
347
18.4E
    VLOG_DEBUG << fmt::format(
348
18.4E
            "Scanner context {} has finished task, cached_block {} current scheduled task is "
349
18.4E
            "{}, eos: {}, status: {}",
350
18.4E
            ctx->ctx_id, scan_task->cached_blocks.size(), ctx->num_scheduled_scanners(), eos,
351
18.4E
            status.to_string());
352
353
1.45M
    ctx->push_back_scan_task(scan_task);
354
1.45M
}
355
80.6k
int ScannerScheduler::default_local_scan_thread_num() {
356
80.6k
    return config::doris_scanner_thread_pool_thread_num > 0
357
80.6k
                   ? config::doris_scanner_thread_pool_thread_num
358
80.6k
                   : std::max(48, CpuInfo::num_cores() * 2);
359
80.6k
}
360
74.4k
int ScannerScheduler::default_remote_scan_thread_num() {
361
74.4k
    int num = config::doris_max_remote_scanner_thread_pool_thread_num > 0
362
74.4k
                      ? config::doris_max_remote_scanner_thread_pool_thread_num
363
74.4k
                      : std::max(512, CpuInfo::num_cores() * 10);
364
74.4k
    return std::max(num, default_local_scan_thread_num());
365
74.4k
}
366
367
37
int ScannerScheduler::get_remote_scan_thread_queue_size() {
368
37
    return config::doris_remote_scanner_thread_pool_queue_size;
369
37
}
370
371
6.21k
int ScannerScheduler::default_min_active_scan_threads() {
372
6.21k
    return config::min_active_scan_threads > 0
373
6.21k
                   ? config::min_active_scan_threads
374
6.21k
                   : config::min_active_scan_threads = CpuInfo::num_cores() * 2;
375
6.21k
}
376
377
6.21k
int ScannerScheduler::default_min_active_file_scan_threads() {
378
6.21k
    return config::min_active_file_scan_threads > 0
379
6.21k
                   ? config::min_active_file_scan_threads
380
6.21k
                   : config::min_active_file_scan_threads = CpuInfo::num_cores() * 8;
381
6.21k
}
382
383
void ScannerScheduler::_make_sure_virtual_col_is_materialized(
384
1.55M
        const std::shared_ptr<Scanner>& scanner, Block* free_block) {
385
1.55M
#ifndef NDEBUG
386
    // Currently, virtual column can only be used on olap table.
387
1.55M
    std::shared_ptr<OlapScanner> olap_scanner = std::dynamic_pointer_cast<OlapScanner>(scanner);
388
1.55M
    if (olap_scanner == nullptr) {
389
170k
        return;
390
170k
    }
391
392
1.38M
    if (free_block->rows() == 0) {
393
1.15M
        return;
394
1.15M
    }
395
396
234k
    size_t idx = 0;
397
727k
    for (const auto& entry : *free_block) {
398
        // Virtual column must be materialized on the end of SegmentIterator's next batch method.
399
727k
        const ColumnNothing* column_nothing =
400
727k
                check_and_get_column<ColumnNothing>(entry.column.get());
401
727k
        if (column_nothing == nullptr) {
402
727k
            idx++;
403
727k
            continue;
404
727k
        }
405
406
18.4E
        std::vector<std::string> vcid_to_idx;
407
408
18.4E
        for (const auto& pair : olap_scanner->_vir_cid_to_idx_in_block) {
409
0
            vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, pair.second));
410
0
        }
411
412
18.4E
        std::string error_msg = fmt::format(
413
18.4E
                "Column in idx {} is nothing, block columns {}, normal_columns "
414
18.4E
                "{}, "
415
18.4E
                "vir_cid_to_idx_in_block_msg {}",
416
18.4E
                idx, free_block->columns(), olap_scanner->_return_columns.size(),
417
18.4E
                fmt::format("_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ",")));
418
18.4E
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, error_msg);
419
727k
    }
420
234k
#endif
421
234k
}
422
423
1.46M
Result<SharedListenableFuture<Void>> ScannerSplitRunner::process_for(std::chrono::nanoseconds) {
424
1.46M
    _started = true;
425
1.46M
    bool is_completed = _scan_func();
426
1.46M
    if (is_completed) {
427
1.45M
        _completion_future.set_value(Void {});
428
1.45M
    }
429
1.46M
    return SharedListenableFuture<Void>::create_ready(Void {});
430
1.46M
}
431
432
1.46M
bool ScannerSplitRunner::is_finished() {
433
1.46M
    return _completion_future.is_done();
434
1.46M
}
435
436
1.45M
Status ScannerSplitRunner::finished_status() {
437
1.45M
    return _completion_future.get_status();
438
1.45M
}
439
440
0
bool ScannerSplitRunner::is_started() const {
441
0
    return _started.load();
442
0
}
443
444
9.16k
bool ScannerSplitRunner::is_auto_reschedule() const {
445
9.16k
    return false;
446
9.16k
}
447
448
} // namespace doris