Coverage Report

Created: 2026-03-13 12:39

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner_scheduler.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/scanner_scheduler.h"
19
20
#include <algorithm>
21
#include <cstdint>
22
#include <functional>
23
#include <list>
24
#include <memory>
25
#include <ostream>
26
#include <string>
27
#include <utility>
28
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/config.h"
31
#include "common/exception.h"
32
#include "common/logging.h"
33
#include "common/status.h"
34
#include "core/block/block.h"
35
#include "exec/pipeline/pipeline_task.h"
36
#include "exec/scan/file_scanner.h"
37
#include "exec/scan/olap_scanner.h" // IWYU pragma: keep
38
#include "exec/scan/scan_node.h"
39
#include "exec/scan/scanner.h"
40
#include "exec/scan/scanner_context.h"
41
#include "runtime/exec_env.h"
42
#include "runtime/runtime_state.h"
43
#include "runtime/thread_context.h"
44
#include "runtime/workload_group/workload_group_manager.h"
45
#include "storage/tablet/tablet.h"
46
#include "util/async_io.h" // IWYU pragma: keep
47
#include "util/cpu_info.h"
48
#include "util/defer_op.h"
49
#include "util/thread.h"
50
#include "util/threadpool.h"
51
52
namespace doris {
53
54
Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
55
977k
                                std::shared_ptr<ScanTask> scan_task) {
56
977k
    if (ctx->done()) {
57
0
        return Status::OK();
58
0
    }
59
977k
    auto task_lock = ctx->task_exec_ctx();
60
977k
    if (task_lock == nullptr) {
61
18
        LOG(INFO) << "could not lock task execution context, query " << ctx->debug_string()
62
18
                  << " maybe finished";
63
18
        return Status::OK();
64
18
    }
65
977k
    std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
66
977k
    if (scanner_delegate == nullptr) {
67
0
        return Status::OK();
68
0
    }
69
70
977k
    scanner_delegate->_scanner->start_wait_worker_timer();
71
977k
    TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
72
977k
    auto sumbit_task = [&]() {
73
977k
        auto work_func = [scanner_ref = scan_task, ctx]() {
74
976k
            auto status = [&] {
75
976k
                RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
76
976k
                return Status::OK();
77
976k
            }();
78
79
976k
            if (!status.ok()) {
80
0
                scanner_ref->set_status(status);
81
0
                ctx->push_back_scan_task(scanner_ref);
82
0
                return true;
83
0
            }
84
976k
            return scanner_ref->is_eos();
85
976k
        };
86
977k
        SimplifiedScanTask simple_scan_task = {work_func, ctx, scan_task};
87
977k
        return this->submit_scan_task(simple_scan_task);
88
977k
    };
89
90
977k
    Status submit_status = sumbit_task();
91
977k
    if (!submit_status.ok()) {
92
        // User will see TooManyTasks error. It looks like a more reasonable error.
93
0
        Status scan_task_status = Status::TooManyTasks(
94
0
                "Failed to submit scanner to scanner pool reason:" +
95
0
                std::string(submit_status.msg()) + "|type:" + std::to_string(type));
96
0
        scan_task->set_status(scan_task_status);
97
0
        return scan_task_status;
98
0
    }
99
100
977k
    return Status::OK();
101
977k
}
102
103
void handle_reserve_memory_failure(RuntimeState* state, std::shared_ptr<ScannerContext> ctx,
104
0
                                   const Status& st, size_t reserve_size) {
105
0
    ctx->clear_free_blocks();
106
0
    auto* local_state = ctx->local_state();
107
108
0
    auto debug_msg = fmt::format(
109
0
            "Query: {} , scanner try to reserve: {}, operator name {}, "
110
0
            "operator "
111
0
            "id: {}, "
112
0
            "task id: "
113
0
            "{}, failed: {}",
114
0
            print_id(state->query_id()), PrettyPrinter::print_bytes(reserve_size),
115
0
            local_state->get_name(), local_state->parent()->node_id(), state->task_id(),
116
0
            st.to_string());
117
    // PROCESS_MEMORY_EXCEEDED error msg alread contains process_mem_log_str
118
0
    if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
119
0
        debug_msg += fmt::format(", debug info: {}", GlobalMemoryArbitrator::process_mem_log_str());
120
0
    }
121
0
    VLOG_DEBUG << debug_msg;
122
123
0
    state->get_query_ctx()->set_low_memory_mode();
124
0
}
125
126
void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
127
977k
                                     std::shared_ptr<ScanTask> scan_task) {
128
977k
    auto task_lock = ctx->task_exec_ctx();
129
977k
    if (task_lock == nullptr) {
130
0
        return;
131
0
    }
132
977k
    SCOPED_ATTACH_TASK(ctx->state());
133
134
977k
    ctx->update_peak_running_scanner(1);
135
977k
    Defer defer([&] { ctx->update_peak_running_scanner(-1); });
136
137
977k
    std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
138
977k
    if (scanner_delegate == nullptr) {
139
1
        return;
140
1
    }
141
142
977k
    ScannerSPtr& scanner = scanner_delegate->_scanner;
143
    // for cpu hard limit, thread name should not be reset
144
977k
    if (ctx->_should_reset_thread_name) {
145
0
        Thread::set_self_name("_scanner_scan");
146
0
    }
147
148
977k
#ifndef __APPLE__
149
    // The configuration item is used to lower the priority of the scanner thread,
150
    // typically employed to ensure CPU scheduling for write operations.
151
977k
    if (config::scan_thread_nice_value != 0 && scanner->get_name() != FileScanner::NAME) {
152
0
        Thread::set_thread_nice_value();
153
0
    }
154
977k
#endif
155
156
    // we set and get counter according below order, to make sure the counter is updated before get_block, and the time of get_block is recorded in the counter.
157
    // 1. update_wait_worker_timer to make sure the time of waiting for worker thread is recorded in the timer
158
    // 2. start_scan_cpu_timer to make sure the cpu timer include the time of open and get_block, which is the real cpu time of scanner
159
    // 3. update_scan_cpu_timer when defer, to make sure the cpu timer include the time of open and get_block, which is the real cpu time of scanner
160
    // 4. start_wait_worker_timer when defer, to make sure the time of waiting for worker thread is recorded in the timer
161
162
977k
    MonotonicStopWatch max_run_time_watch;
163
977k
    max_run_time_watch.start();
164
977k
    scanner->update_wait_worker_timer();
165
977k
    scanner->start_scan_cpu_timer();
166
167
977k
    bool need_update_profile = true;
168
977k
    auto update_scanner_profile = [&]() {
169
969k
        if (need_update_profile) {
170
969k
            scanner->update_scan_cpu_timer();
171
969k
            scanner->update_realtime_counters();
172
969k
            need_update_profile = false;
173
969k
        }
174
969k
    };
175
176
977k
    Status status = Status::OK();
177
977k
    bool eos = false;
178
977k
    Defer defer_scanner([&] {
179
976k
        if (status.ok() && !eos) {
180
            // if status is not ok, it means the scanner is failed, and the counter may be not updated correctly, so no need to update counter again. if eos is true, it means the scanner is finished successfully, and the counter is updated correctly, so no need to update counter again.
181
5.24k
            scanner->start_wait_worker_timer();
182
5.24k
        }
183
976k
    });
184
185
977k
    ASSIGN_STATUS_IF_CATCH_EXCEPTION(
186
977k
            RuntimeState* state = ctx->state(); DCHECK(nullptr != state);
187
            // scanner->open may alloc plenty amount of memory(read blocks of data),
188
            // so better to also check low memory and clear free blocks here.
189
977k
            if (ctx->low_memory_mode()) { ctx->clear_free_blocks(); }
190
191
977k
            if (!scanner->has_prepared()) {
192
977k
                status = scanner->prepare();
193
977k
                if (!status.ok()) {
194
977k
                    eos = true;
195
977k
                }
196
977k
            }
197
198
977k
            if (!eos && !scanner->is_open()) {
199
977k
                status = scanner->open(state);
200
977k
                if (!status.ok()) {
201
977k
                    eos = true;
202
977k
                }
203
977k
                scanner->set_opened();
204
977k
            }
205
206
977k
            Status rf_status = scanner->try_append_late_arrival_runtime_filter();
207
977k
            if (!rf_status.ok()) {
208
977k
                LOG(WARNING) << "Failed to append late arrival runtime filter: "
209
977k
                             << rf_status.to_string();
210
977k
            }
211
212
977k
            size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
213
977k
            if (ctx->low_memory_mode()) {
214
977k
                ctx->clear_free_blocks();
215
977k
                if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
216
977k
                    raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
217
977k
                }
218
977k
            }
219
220
977k
            size_t raw_bytes_read = 0;
221
977k
            bool first_read = true; int64_t limit = scanner->limit();
222
            // If the first block is full, then it is true. Or the first block + second block > batch_size
223
977k
            bool has_first_full_block = false;
224
225
            // During low memory mode, every scan task will return at most 2 block to reduce memory usage.
226
977k
            while (!eos && raw_bytes_read < raw_bytes_threshold &&
227
977k
                   (!ctx->low_memory_mode() || !has_first_full_block) &&
228
977k
                   (!has_first_full_block || doris::thread_context()
229
977k
                                                     ->thread_mem_tracker_mgr->limiter_mem_tracker()
230
977k
                                                     ->check_limit(1))) {
231
977k
                if (UNLIKELY(ctx->done())) {
232
977k
                    eos = true;
233
977k
                    break;
234
977k
                }
235
977k
                if (max_run_time_watch.elapsed_time() >
236
977k
                    config::doris_scanner_max_run_time_ms * 1e6) {
237
977k
                    break;
238
977k
                }
239
977k
                DEFER_RELEASE_RESERVED();
240
977k
                BlockUPtr free_block;
241
977k
                if (first_read) {
242
977k
                    free_block = ctx->get_free_block(first_read);
243
977k
                } else {
244
977k
                    if (state->get_query_ctx()
245
977k
                                ->resource_ctx()
246
977k
                                ->task_controller()
247
977k
                                ->is_enable_reserve_memory()) {
248
977k
                        size_t block_avg_bytes = scanner->get_block_avg_bytes();
249
977k
                        auto st = thread_context()->thread_mem_tracker_mgr->try_reserve(
250
977k
                                block_avg_bytes);
251
977k
                        if (!st.ok()) {
252
977k
                            handle_reserve_memory_failure(state, ctx, st, block_avg_bytes);
253
977k
                            break;
254
977k
                        }
255
977k
                    }
256
977k
                    free_block = ctx->get_free_block(first_read);
257
977k
                }
258
977k
                if (free_block == nullptr) {
259
977k
                    break;
260
977k
                }
261
                // We got a new created block or a reused block.
262
977k
                status = scanner->get_block_after_projects(state, free_block.get(), &eos);
263
977k
                first_read = false;
264
977k
                if (!status.ok()) {
265
977k
                    LOG(WARNING) << "Scan thread read Scanner failed: " << status.to_string();
266
977k
                    break;
267
977k
                }
268
                // Check column type only after block is read successfully.
269
                // Or it may cause a crash when the block is not normal.
270
977k
                _make_sure_virtual_col_is_materialized(scanner, free_block.get());
271
                // Projection will truncate useless columns, makes block size change.
272
977k
                auto free_block_bytes = free_block->allocated_bytes();
273
977k
                raw_bytes_read += free_block_bytes;
274
977k
                if (!scan_task->cached_blocks.empty() &&
275
977k
                    scan_task->cached_blocks.back().first->rows() + free_block->rows() <=
276
977k
                            ctx->batch_size()) {
277
977k
                    size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes();
278
977k
                    MutableBlock mutable_block(scan_task->cached_blocks.back().first.get());
279
977k
                    status = mutable_block.merge(*free_block);
280
977k
                    if (!status.ok()) {
281
977k
                        LOG(WARNING) << "Block merge failed: " << status.to_string();
282
977k
                        break;
283
977k
                    }
284
977k
                    scan_task->cached_blocks.back().second = mutable_block.allocated_bytes();
285
977k
                    scan_task->cached_blocks.back().first.get()->set_columns(
286
977k
                            std::move(mutable_block.mutable_columns()));
287
288
                    // Return block succeed or not, this free_block is not used by this scan task any more.
289
                    // If block can be reused, its memory usage will be added back.
290
977k
                    ctx->return_free_block(std::move(free_block));
291
977k
                    ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
292
977k
                                         block_size);
293
977k
                } else {
294
977k
                    if (!scan_task->cached_blocks.empty()) {
295
977k
                        has_first_full_block = true;
296
977k
                    }
297
977k
                    ctx->inc_block_usage(free_block->allocated_bytes());
298
977k
                    scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
299
977k
                }
300
301
977k
                if (limit > 0 && limit < ctx->batch_size()) {
302
                    // If this scanner has limit, and less than batch size,
303
                    // return immediately and no need to wait raw_bytes_threshold.
304
                    // This can save time that each scanner may only return a small number of rows,
305
                    // but rows are enough from all scanners.
306
                    // If not break, the query like "select * from tbl where id=1 limit 10"
307
                    // may scan a lot data when the "id=1"'s filter ratio is high.
308
                    // If limit is larger than batch size, this rule is skipped,
309
                    // to avoid user specify a large limit and causing too much small blocks.
310
977k
                    break;
311
977k
                }
312
313
977k
                if (scan_task->cached_blocks.back().first->rows() > 0) {
314
977k
                    auto block_avg_bytes = (scan_task->cached_blocks.back().first->bytes() +
315
977k
                                            scan_task->cached_blocks.back().first->rows() - 1) /
316
977k
                                           scan_task->cached_blocks.back().first->rows() *
317
977k
                                           ctx->batch_size();
318
977k
                    scanner->update_block_avg_bytes(block_avg_bytes);
319
977k
                }
320
977k
                if (ctx->low_memory_mode()) {
321
977k
                    ctx->clear_free_blocks();
322
977k
                    if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
323
977k
                        raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
324
977k
                    }
325
977k
                }
326
977k
            } // end for while
327
328
977k
            if (UNLIKELY(!status.ok())) {
329
977k
                scan_task->set_status(status);
330
977k
                eos = true;
331
977k
            },
332
977k
            status);
333
334
974k
    if (UNLIKELY(!status.ok())) {
335
1.28k
        scan_task->set_status(status);
336
1.28k
        eos = true;
337
1.28k
    }
338
339
974k
    if (eos) {
340
        // If eos, scanner will call _collect_profile_before_close to update profile,
341
        // so we need update_scanner_profile here
342
969k
        update_scanner_profile();
343
969k
        scanner->mark_to_need_to_close();
344
969k
    }
345
974k
    scan_task->set_eos(eos);
346
347
18.4E
    VLOG_DEBUG << fmt::format(
348
18.4E
            "Scanner context {} has finished task, cached_block {} current scheduled task is "
349
18.4E
            "{}, eos: {}, status: {}",
350
18.4E
            ctx->ctx_id, scan_task->cached_blocks.size(), ctx->num_scheduled_scanners(), eos,
351
18.4E
            status.to_string());
352
353
974k
    ctx->push_back_scan_task(scan_task);
354
974k
}
355
16.7k
int ScannerScheduler::default_local_scan_thread_num() {
356
16.7k
    return config::doris_scanner_thread_pool_thread_num > 0
357
16.7k
                   ? config::doris_scanner_thread_pool_thread_num
358
16.7k
                   : std::max(48, CpuInfo::num_cores() * 2);
359
16.7k
}
360
11.0k
int ScannerScheduler::default_remote_scan_thread_num() {
361
11.0k
    int num = config::doris_max_remote_scanner_thread_pool_thread_num > 0
362
11.0k
                      ? config::doris_max_remote_scanner_thread_pool_thread_num
363
11.0k
                      : std::max(512, CpuInfo::num_cores() * 10);
364
11.0k
    return std::max(num, default_local_scan_thread_num());
365
11.0k
}
366
367
35
int ScannerScheduler::get_remote_scan_thread_queue_size() {
368
35
    return config::doris_remote_scanner_thread_pool_queue_size;
369
35
}
370
371
5.75k
int ScannerScheduler::default_min_active_scan_threads() {
372
5.75k
    return config::min_active_scan_threads > 0
373
5.75k
                   ? config::min_active_scan_threads
374
5.75k
                   : config::min_active_scan_threads = CpuInfo::num_cores() * 2;
375
5.75k
}
376
377
5.75k
int ScannerScheduler::default_min_active_file_scan_threads() {
378
5.75k
    return config::min_active_file_scan_threads > 0
379
5.75k
                   ? config::min_active_file_scan_threads
380
5.75k
                   : config::min_active_file_scan_threads = CpuInfo::num_cores() * 8;
381
5.75k
}
382
383
void ScannerScheduler::_make_sure_virtual_col_is_materialized(
384
1.01M
        const std::shared_ptr<Scanner>& scanner, Block* free_block) {
385
1.01M
#ifndef NDEBUG
386
    // Currently, virtual column can only be used on olap table.
387
1.01M
    std::shared_ptr<OlapScanner> olap_scanner = std::dynamic_pointer_cast<OlapScanner>(scanner);
388
1.01M
    if (olap_scanner == nullptr) {
389
13.4k
        return;
390
13.4k
    }
391
392
999k
    if (free_block->rows() == 0) {
393
779k
        return;
394
779k
    }
395
396
220k
    size_t idx = 0;
397
691k
    for (const auto& entry : *free_block) {
398
        // Virtual column must be materialized on the end of SegmentIterator's next batch method.
399
691k
        const ColumnNothing* column_nothing =
400
691k
                check_and_get_column<ColumnNothing>(entry.column.get());
401
691k
        if (column_nothing == nullptr) {
402
691k
            idx++;
403
691k
            continue;
404
691k
        }
405
406
18.4E
        std::vector<std::string> vcid_to_idx;
407
408
18.4E
        for (const auto& pair : olap_scanner->_vir_cid_to_idx_in_block) {
409
0
            vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, pair.second));
410
0
        }
411
412
18.4E
        std::string error_msg = fmt::format(
413
18.4E
                "Column in idx {} is nothing, block columns {}, normal_columns "
414
18.4E
                "{}, "
415
18.4E
                "vir_cid_to_idx_in_block_msg {}",
416
18.4E
                idx, free_block->columns(), olap_scanner->_return_columns.size(),
417
18.4E
                fmt::format("_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ",")));
418
18.4E
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, error_msg);
419
691k
    }
420
220k
#endif
421
220k
}
422
423
977k
Result<SharedListenableFuture<Void>> ScannerSplitRunner::process_for(std::chrono::nanoseconds) {
424
977k
    _started = true;
425
977k
    bool is_completed = _scan_func();
426
977k
    if (is_completed) {
427
972k
        _completion_future.set_value(Void {});
428
972k
    }
429
977k
    return SharedListenableFuture<Void>::create_ready(Void {});
430
977k
}
431
432
976k
bool ScannerSplitRunner::is_finished() {
433
976k
    return _completion_future.is_done();
434
976k
}
435
436
971k
Status ScannerSplitRunner::finished_status() {
437
971k
    return _completion_future.get_status();
438
971k
}
439
440
0
bool ScannerSplitRunner::is_started() const {
441
0
    return _started.load();
442
0
}
443
444
5.23k
bool ScannerSplitRunner::is_auto_reschedule() const {
445
5.23k
    return false;
446
5.23k
}
447
448
} // namespace doris