Coverage Report

Created: 2026-03-13 08:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner_scheduler.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/scanner_scheduler.h"
19
20
#include <algorithm>
21
#include <cstdint>
22
#include <functional>
23
#include <list>
24
#include <memory>
25
#include <ostream>
26
#include <string>
27
#include <utility>
28
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/config.h"
31
#include "common/exception.h"
32
#include "common/logging.h"
33
#include "common/status.h"
34
#include "core/block/block.h"
35
#include "exec/pipeline/pipeline_task.h"
36
#include "exec/scan/file_scanner.h"
37
#include "exec/scan/olap_scanner.h" // IWYU pragma: keep
38
#include "exec/scan/scan_node.h"
39
#include "exec/scan/scanner.h"
40
#include "exec/scan/scanner_context.h"
41
#include "runtime/exec_env.h"
42
#include "runtime/runtime_state.h"
43
#include "runtime/thread_context.h"
44
#include "runtime/workload_group/workload_group_manager.h"
45
#include "storage/tablet/tablet.h"
46
#include "util/async_io.h" // IWYU pragma: keep
47
#include "util/cpu_info.h"
48
#include "util/defer_op.h"
49
#include "util/thread.h"
50
#include "util/threadpool.h"
51
52
namespace doris {
53
54
// Wraps `scan_task` into a SimplifiedScanTask and hands it to submit_scan_task().
//
// Returns OK without submitting anything when the context is already done, when the
// task execution context cannot be obtained, or when the scanner referenced by the
// task has expired. If submit_scan_task() fails, the failure is converted into a
// TooManyTasks status, recorded on `scan_task`, and returned to the caller.
Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
                                std::shared_ptr<ScanTask> scan_task) {
    if (ctx->done()) {
        return Status::OK();
    }

    // Kept alive until this function returns.
    auto exec_ctx_guard = ctx->task_exec_ctx();
    if (exec_ctx_guard == nullptr) {
        LOG(INFO) << "could not lock task execution context, query " << ctx->debug_string()
                  << " maybe finished";
        return Status::OK();
    }

    std::shared_ptr<ScannerDelegate> delegate = scan_task->scanner.lock();
    if (delegate == nullptr) {
        return Status::OK();
    }

    delegate->_scanner->start_wait_worker_timer();
    TabletStorageType storage_type = delegate->_scanner->get_storage_type();

    // Build the worker closure and submit it. The closure and the SimplifiedScanTask
    // are scoped to this immediately-invoked lambda, so they are released as soon as
    // the submission call has returned.
    Status submit_status = [&]() {
        auto run_scan = [task = scan_task, ctx]() {
            // Exceptions thrown by _scanner_scan are converted into a Status here.
            auto scan_status = [&] {
                RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, task));
                return Status::OK();
            }();

            if (scan_status.ok()) {
                return task->is_eos();
            }
            // Record the failure on the task, give the task back to the context and
            // report the work item as completed.
            task->set_status(scan_status);
            ctx->push_back_scan_task(task);
            return true;
        };
        SimplifiedScanTask simple_scan_task = {run_scan, ctx, scan_task};
        return this->submit_scan_task(simple_scan_task);
    }();

    if (submit_status.ok()) {
        return Status::OK();
    }

    // User will see TooManyTasks error. It looks like a more reasonable error.
    Status too_many_tasks = Status::TooManyTasks(
            "Failed to submit scanner to scanner pool reason:" +
            std::string(submit_status.msg()) + "|type:" + std::to_string(storage_type));
    scan_task->set_status(too_many_tasks);
    return too_many_tasks;
}
102
103
void handle_reserve_memory_failure(RuntimeState* state, std::shared_ptr<ScannerContext> ctx,
104
0
                                   const Status& st, size_t reserve_size) {
105
0
    ctx->clear_free_blocks();
106
0
    auto* local_state = ctx->local_state();
107
108
0
    auto debug_msg = fmt::format(
109
0
            "Query: {} , scanner try to reserve: {}, operator name {}, "
110
0
            "operator "
111
0
            "id: {}, "
112
0
            "task id: "
113
0
            "{}, failed: {}",
114
0
            print_id(state->query_id()), PrettyPrinter::print_bytes(reserve_size),
115
0
            local_state->get_name(), local_state->parent()->node_id(), state->task_id(),
116
0
            st.to_string());
117
    // PROCESS_MEMORY_EXCEEDED error msg alread contains process_mem_log_str
118
0
    if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
119
0
        debug_msg += fmt::format(", debug info: {}", GlobalMemoryArbitrator::process_mem_log_str());
120
0
    }
121
0
    VLOG_DEBUG << debug_msg;
122
123
0
    state->get_query_ctx()->set_low_memory_mode();
124
0
}
125
126
void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
127
971k
                                     std::shared_ptr<ScanTask> scan_task) {
128
971k
    auto task_lock = ctx->task_exec_ctx();
129
971k
    if (task_lock == nullptr) {
130
0
        return;
131
0
    }
132
971k
    SCOPED_ATTACH_TASK(ctx->state());
133
134
971k
    ctx->update_peak_running_scanner(1);
135
971k
    Defer defer([&] { ctx->update_peak_running_scanner(-1); });
136
137
971k
    std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
138
971k
    if (scanner_delegate == nullptr) {
139
6
        return;
140
6
    }
141
142
971k
    ScannerSPtr& scanner = scanner_delegate->_scanner;
143
    // for cpu hard limit, thread name should not be reset
144
971k
    if (ctx->_should_reset_thread_name) {
145
0
        Thread::set_self_name("_scanner_scan");
146
0
    }
147
148
971k
#ifndef __APPLE__
149
    // The configuration item is used to lower the priority of the scanner thread,
150
    // typically employed to ensure CPU scheduling for write operations.
151
971k
    if (config::scan_thread_nice_value != 0 && scanner->get_name() != FileScanner::NAME) {
152
0
        Thread::set_thread_nice_value();
153
0
    }
154
971k
#endif
155
156
    // we set and get counter according below order, to make sure the counter is updated before get_block, and the time of get_block is recorded in the counter.
157
    // 1. update_wait_worker_timer to make sure the time of waiting for worker thread is recorded in the timer
158
    // 2. start_scan_cpu_timer to make sure the cpu timer include the time of open and get_block, which is the real cpu time of scanner
159
    // 3. update_scan_cpu_timer when defer, to make sure the cpu timer include the time of open and get_block, which is the real cpu time of scanner
160
    // 4. start_wait_worker_timer when defer, to make sure the time of waiting for worker thread is recorded in the timer
161
162
971k
    MonotonicStopWatch max_run_time_watch;
163
971k
    max_run_time_watch.start();
164
971k
    scanner->update_wait_worker_timer();
165
971k
    scanner->start_scan_cpu_timer();
166
167
971k
    bool need_update_profile = true;
168
971k
    auto update_scanner_profile = [&]() {
169
963k
        if (need_update_profile) {
170
963k
            scanner->update_scan_cpu_timer();
171
963k
            scanner->update_realtime_counters();
172
963k
            need_update_profile = false;
173
963k
        }
174
963k
    };
175
176
971k
    Status status = Status::OK();
177
971k
    bool eos = false;
178
971k
    Defer defer_scanner([&] {
179
970k
        if (status.ok() && !eos) {
180
            // if status is not ok, it means the scanner is failed, and the counter may be not updated correctly, so no need to update counter again. if eos is true, it means the scanner is finished successfully, and the counter is updated correctly, so no need to update counter again.
181
4.32k
            scanner->start_wait_worker_timer();
182
4.32k
        }
183
970k
    });
184
185
971k
    ASSIGN_STATUS_IF_CATCH_EXCEPTION(
186
971k
            RuntimeState* state = ctx->state(); DCHECK(nullptr != state);
187
            // scanner->open may alloc plenty amount of memory(read blocks of data),
188
            // so better to also check low memory and clear free blocks here.
189
971k
            if (ctx->low_memory_mode()) { ctx->clear_free_blocks(); }
190
191
971k
            if (!scanner->has_prepared()) {
192
971k
                status = scanner->prepare();
193
971k
                if (!status.ok()) {
194
971k
                    eos = true;
195
971k
                }
196
971k
            }
197
198
971k
            if (!eos && !scanner->is_open()) {
199
971k
                status = scanner->open(state);
200
971k
                if (!status.ok()) {
201
971k
                    eos = true;
202
971k
                }
203
971k
                scanner->set_opened();
204
971k
            }
205
206
971k
            Status rf_status = scanner->try_append_late_arrival_runtime_filter();
207
971k
            if (!rf_status.ok()) {
208
971k
                LOG(WARNING) << "Failed to append late arrival runtime filter: "
209
971k
                             << rf_status.to_string();
210
971k
            }
211
212
971k
            size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
213
971k
            if (ctx->low_memory_mode()) {
214
971k
                ctx->clear_free_blocks();
215
971k
                if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
216
971k
                    raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
217
971k
                }
218
971k
            }
219
220
971k
            size_t raw_bytes_read = 0;
221
971k
            bool first_read = true; int64_t limit = scanner->limit();
222
            // If the first block is full, then it is true. Or the first block + second block > batch_size
223
971k
            bool has_first_full_block = false;
224
225
            // During low memory mode, every scan task will return at most 2 block to reduce memory usage.
226
971k
            while (!eos && raw_bytes_read < raw_bytes_threshold &&
227
971k
                   (!ctx->low_memory_mode() || !has_first_full_block) &&
228
971k
                   (!has_first_full_block || doris::thread_context()
229
971k
                                                     ->thread_mem_tracker_mgr->limiter_mem_tracker()
230
971k
                                                     ->check_limit(1))) {
231
971k
                if (UNLIKELY(ctx->done())) {
232
971k
                    eos = true;
233
971k
                    break;
234
971k
                }
235
971k
                if (max_run_time_watch.elapsed_time() >
236
971k
                    config::doris_scanner_max_run_time_ms * 1e6) {
237
971k
                    break;
238
971k
                }
239
971k
                DEFER_RELEASE_RESERVED();
240
971k
                BlockUPtr free_block;
241
971k
                if (first_read) {
242
971k
                    free_block = ctx->get_free_block(first_read);
243
971k
                } else {
244
971k
                    if (state->get_query_ctx()
245
971k
                                ->resource_ctx()
246
971k
                                ->task_controller()
247
971k
                                ->is_enable_reserve_memory()) {
248
971k
                        size_t block_avg_bytes = scanner->get_block_avg_bytes();
249
971k
                        auto st = thread_context()->thread_mem_tracker_mgr->try_reserve(
250
971k
                                block_avg_bytes);
251
971k
                        if (!st.ok()) {
252
971k
                            handle_reserve_memory_failure(state, ctx, st, block_avg_bytes);
253
971k
                            break;
254
971k
                        }
255
971k
                    }
256
971k
                    free_block = ctx->get_free_block(first_read);
257
971k
                }
258
971k
                if (free_block == nullptr) {
259
971k
                    break;
260
971k
                }
261
                // We got a new created block or a reused block.
262
971k
                status = scanner->get_block_after_projects(state, free_block.get(), &eos);
263
971k
                first_read = false;
264
971k
                if (!status.ok()) {
265
971k
                    LOG(WARNING) << "Scan thread read Scanner failed: " << status.to_string();
266
971k
                    break;
267
971k
                }
268
                // Check column type only after block is read successfully.
269
                // Or it may cause a crash when the block is not normal.
270
971k
                _make_sure_virtual_col_is_materialized(scanner, free_block.get());
271
                // Projection will truncate useless columns, makes block size change.
272
971k
                auto free_block_bytes = free_block->allocated_bytes();
273
971k
                raw_bytes_read += free_block_bytes;
274
971k
                if (!scan_task->cached_blocks.empty() &&
275
971k
                    scan_task->cached_blocks.back().first->rows() + free_block->rows() <=
276
971k
                            ctx->batch_size()) {
277
971k
                    size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes();
278
971k
                    MutableBlock mutable_block(scan_task->cached_blocks.back().first.get());
279
971k
                    status = mutable_block.merge(*free_block);
280
971k
                    if (!status.ok()) {
281
971k
                        LOG(WARNING) << "Block merge failed: " << status.to_string();
282
971k
                        break;
283
971k
                    }
284
971k
                    scan_task->cached_blocks.back().second = mutable_block.allocated_bytes();
285
971k
                    scan_task->cached_blocks.back().first.get()->set_columns(
286
971k
                            std::move(mutable_block.mutable_columns()));
287
288
                    // Return block succeed or not, this free_block is not used by this scan task any more.
289
                    // If block can be reused, its memory usage will be added back.
290
971k
                    ctx->return_free_block(std::move(free_block));
291
971k
                    ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
292
971k
                                         block_size);
293
971k
                } else {
294
971k
                    if (!scan_task->cached_blocks.empty()) {
295
971k
                        has_first_full_block = true;
296
971k
                    }
297
971k
                    ctx->inc_block_usage(free_block->allocated_bytes());
298
971k
                    scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
299
971k
                }
300
301
971k
                if (limit > 0 && limit < ctx->batch_size()) {
302
                    // If this scanner has limit, and less than batch size,
303
                    // return immediately and no need to wait raw_bytes_threshold.
304
                    // This can save time that each scanner may only return a small number of rows,
305
                    // but rows are enough from all scanners.
306
                    // If not break, the query like "select * from tbl where id=1 limit 10"
307
                    // may scan a lot data when the "id=1"'s filter ratio is high.
308
                    // If limit is larger than batch size, this rule is skipped,
309
                    // to avoid user specify a large limit and causing too much small blocks.
310
971k
                    break;
311
971k
                }
312
313
971k
                if (scan_task->cached_blocks.back().first->rows() > 0) {
314
971k
                    auto block_avg_bytes = (scan_task->cached_blocks.back().first->bytes() +
315
971k
                                            scan_task->cached_blocks.back().first->rows() - 1) /
316
971k
                                           scan_task->cached_blocks.back().first->rows() *
317
971k
                                           ctx->batch_size();
318
971k
                    scanner->update_block_avg_bytes(block_avg_bytes);
319
971k
                }
320
971k
                if (ctx->low_memory_mode()) {
321
971k
                    ctx->clear_free_blocks();
322
971k
                    if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
323
971k
                        raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
324
971k
                    }
325
971k
                }
326
971k
            } // end for while
327
328
971k
            if (UNLIKELY(!status.ok())) {
329
971k
                scan_task->set_status(status);
330
971k
                eos = true;
331
971k
            },
332
971k
            status);
333
334
967k
    if (UNLIKELY(!status.ok())) {
335
1.29k
        scan_task->set_status(status);
336
1.29k
        eos = true;
337
1.29k
    }
338
339
967k
    if (eos) {
340
        // If eos, scanner will call _collect_profile_before_close to update profile,
341
        // so we need update_scanner_profile here
342
963k
        update_scanner_profile();
343
963k
        scanner->mark_to_need_to_close();
344
963k
    }
345
967k
    scan_task->set_eos(eos);
346
347
18.4E
    VLOG_DEBUG << fmt::format(
348
18.4E
            "Scanner context {} has finished task, cached_block {} current scheduled task is "
349
18.4E
            "{}, eos: {}, status: {}",
350
18.4E
            ctx->ctx_id, scan_task->cached_blocks.size(), ctx->num_scheduled_scanners(), eos,
351
18.4E
            status.to_string());
352
353
967k
    ctx->push_back_scan_task(scan_task);
354
967k
}
355
17.0k
int ScannerScheduler::default_local_scan_thread_num() {
356
17.0k
    return config::doris_scanner_thread_pool_thread_num > 0
357
17.0k
                   ? config::doris_scanner_thread_pool_thread_num
358
17.0k
                   : std::max(48, CpuInfo::num_cores() * 2);
359
17.0k
}
360
11.1k
int ScannerScheduler::default_remote_scan_thread_num() {
361
11.1k
    int num = config::doris_max_remote_scanner_thread_pool_thread_num > 0
362
11.1k
                      ? config::doris_max_remote_scanner_thread_pool_thread_num
363
11.1k
                      : std::max(512, CpuInfo::num_cores() * 10);
364
11.1k
    return std::max(num, default_local_scan_thread_num());
365
11.1k
}
366
367
35
int ScannerScheduler::get_remote_scan_thread_queue_size() {
368
35
    return config::doris_remote_scanner_thread_pool_queue_size;
369
35
}
370
371
5.91k
int ScannerScheduler::default_min_active_scan_threads() {
372
5.91k
    return config::min_active_scan_threads > 0
373
5.91k
                   ? config::min_active_scan_threads
374
5.91k
                   : config::min_active_scan_threads = CpuInfo::num_cores() * 2;
375
5.91k
}
376
377
5.91k
int ScannerScheduler::default_min_active_file_scan_threads() {
378
5.91k
    return config::min_active_file_scan_threads > 0
379
5.91k
                   ? config::min_active_file_scan_threads
380
5.91k
                   : config::min_active_file_scan_threads = CpuInfo::num_cores() * 8;
381
5.91k
}
382
383
void ScannerScheduler::_make_sure_virtual_col_is_materialized(
384
1.00M
        const std::shared_ptr<Scanner>& scanner, Block* free_block) {
385
1.00M
#ifndef NDEBUG
386
    // Currently, virtual column can only be used on olap table.
387
1.00M
    std::shared_ptr<OlapScanner> olap_scanner = std::dynamic_pointer_cast<OlapScanner>(scanner);
388
1.00M
    if (olap_scanner == nullptr) {
389
13.0k
        return;
390
13.0k
    }
391
392
995k
    if (free_block->rows() == 0) {
393
774k
        return;
394
774k
    }
395
396
220k
    size_t idx = 0;
397
690k
    for (const auto& entry : *free_block) {
398
        // Virtual column must be materialized on the end of SegmentIterator's next batch method.
399
690k
        const ColumnNothing* column_nothing =
400
690k
                check_and_get_column<ColumnNothing>(entry.column.get());
401
690k
        if (column_nothing == nullptr) {
402
690k
            idx++;
403
690k
            continue;
404
690k
        }
405
406
18.4E
        std::vector<std::string> vcid_to_idx;
407
408
18.4E
        for (const auto& pair : olap_scanner->_vir_cid_to_idx_in_block) {
409
0
            vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, pair.second));
410
0
        }
411
412
18.4E
        std::string error_msg = fmt::format(
413
18.4E
                "Column in idx {} is nothing, block columns {}, normal_columns "
414
18.4E
                "{}, "
415
18.4E
                "vir_cid_to_idx_in_block_msg {}",
416
18.4E
                idx, free_block->columns(), olap_scanner->_return_columns.size(),
417
18.4E
                fmt::format("_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ",")));
418
18.4E
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, error_msg);
419
690k
    }
420
220k
#endif
421
220k
}
422
423
972k
// Marks the runner as started and invokes the scan function once. When the scan
// function reports completion, the completion future is fulfilled. The duration
// argument is ignored, and an already-ready future is always returned.
Result<SharedListenableFuture<Void>> ScannerSplitRunner::process_for(std::chrono::nanoseconds) {
    _started = true;
    if (const bool scan_completed = _scan_func(); scan_completed) {
        _completion_future.set_value(Void {});
    }
    return SharedListenableFuture<Void>::create_ready(Void {});
}
431
432
969k
bool ScannerSplitRunner::is_finished() {
433
969k
    return _completion_future.is_done();
434
969k
}
435
436
965k
// Returns the status reported by the completion future.
Status ScannerSplitRunner::finished_status() {
    Status completion_status = _completion_future.get_status();
    return completion_status;
}
439
440
0
bool ScannerSplitRunner::is_started() const {
441
0
    return _started.load();
442
0
}
443
444
4.31k
// Always reports false for this runner.
bool ScannerSplitRunner::is_auto_reschedule() const {
    constexpr bool kAutoReschedule = false;
    return kAutoReschedule;
}
447
448
} // namespace doris