Coverage Report

Created: 2026-03-12 17:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/scanner_context.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <glog/logging.h>
23
#include <zconf.h>
24
25
#include <cstdint>
26
#include <ctime>
27
#include <memory>
28
#include <mutex>
29
#include <ostream>
30
#include <shared_mutex>
31
#include <tuple>
32
#include <utility>
33
34
#include "common/config.h"
35
#include "common/exception.h"
36
#include "common/logging.h"
37
#include "common/metrics/doris_metrics.h"
38
#include "common/status.h"
39
#include "core/block/block.h"
40
#include "exec/operator/scan_operator.h"
41
#include "exec/scan/scan_node.h"
42
#include "exec/scan/scanner_scheduler.h"
43
#include "runtime/descriptors.h"
44
#include "runtime/exec_env.h"
45
#include "runtime/runtime_profile.h"
46
#include "runtime/runtime_state.h"
47
#include "storage/tablet/tablet.h"
48
#include "util/time.h"
49
#include "util/uid_util.h"
50
51
namespace doris {
52
53
using namespace std::chrono_literals;
54
#include "common/compile_check_begin.h"
55
ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
56
                               const TupleDescriptor* output_tuple_desc,
57
                               const RowDescriptor* output_row_descriptor,
58
                               const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
59
                               int64_t limit_, std::shared_ptr<Dependency> dependency
60
#ifdef BE_TEST
61
                               ,
62
                               int num_parallel_instances
63
#endif
64
                               )
65
231k
        : HasTaskExecutionCtx(state),
66
231k
          _state(state),
67
231k
          _local_state(local_state),
68
231k
          _output_tuple_desc(output_row_descriptor
69
231k
                                     ? output_row_descriptor->tuple_descriptors().front()
70
231k
                                     : output_tuple_desc),
71
231k
          _output_row_descriptor(output_row_descriptor),
72
231k
          _batch_size(state->batch_size()),
73
231k
          limit(limit_),
74
231k
          _all_scanners(scanners.begin(), scanners.end()),
75
#ifndef BE_TEST
76
231k
          _scanner_scheduler(local_state->scan_scheduler(state)),
77
          _min_scan_concurrency_of_scan_scheduler(
78
231k
                  _scanner_scheduler->get_min_active_scan_threads()),
79
231k
          _max_scan_concurrency(std::min(local_state->max_scanners_concurrency(state),
80
231k
                                         cast_set<int>(scanners.size()))),
81
#else
82
          _scanner_scheduler(state->get_query_ctx()->get_scan_scheduler()),
83
          _min_scan_concurrency_of_scan_scheduler(0),
84
          _max_scan_concurrency(num_parallel_instances),
85
#endif
86
231k
          _min_scan_concurrency(local_state->min_scanners_concurrency(state)) {
87
231k
    DCHECK(_state != nullptr);
88
231k
    DCHECK(_output_row_descriptor == nullptr ||
89
231k
           _output_row_descriptor->tuple_descriptors().size() == 1);
90
231k
    _query_id = _state->get_query_ctx()->query_id();
91
231k
    _resource_ctx = _state->get_query_ctx()->resource_ctx();
92
231k
    ctx_id = UniqueId::gen_uid().to_string();
93
1.02M
    for (auto& scanner : _all_scanners) {
94
1.02M
        _pending_scanners.push(std::make_shared<ScanTask>(scanner));
95
1.02M
    };
96
231k
    if (limit < 0) {
97
231k
        limit = -1;
98
231k
    }
99
231k
    _dependency = dependency;
100
231k
    DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
101
232k
    if (auto ctx = task_exec_ctx(); ctx) {
102
232k
        ctx->ref_task_execution_ctx();
103
232k
    }
104
231k
}
105
106
// After init function call, should not access _parent
107
230k
Status ScannerContext::init() {
108
230k
#ifndef BE_TEST
109
230k
    _scanner_profile = _local_state->_scanner_profile;
110
230k
    _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
111
230k
    _scanner_memory_used_counter = _local_state->_memory_used_counter;
112
113
    // 3. get thread token
114
230k
    if (!_state->get_query_ctx()) {
115
0
        return Status::InternalError("Query context of {} is not set",
116
0
                                     print_id(_state->query_id()));
117
0
    }
118
119
231k
    if (_state->get_query_ctx()->get_scan_scheduler()) {
120
231k
        _should_reset_thread_name = false;
121
231k
    }
122
123
230k
    auto scanner = _all_scanners.front().lock();
124
230k
    DCHECK(scanner != nullptr);
125
126
230k
    if (auto* task_executor_scheduler =
127
231k
                dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
128
231k
        std::shared_ptr<TaskExecutor> task_executor = task_executor_scheduler->task_executor();
129
231k
        TaskId task_id(fmt::format("{}-{}", print_id(_state->query_id()), ctx_id));
130
231k
        _task_handle = DORIS_TRY(task_executor->create_task(
131
231k
                task_id, []() { return 0.0; },
132
231k
                config::task_executor_initial_max_concurrency_per_task > 0
133
231k
                        ? config::task_executor_initial_max_concurrency_per_task
134
231k
                        : std::max(48, CpuInfo::num_cores() * 2),
135
231k
                std::chrono::milliseconds(100), std::nullopt));
136
231k
    }
137
230k
#endif
138
    // _max_bytes_in_queue controls the maximum memory that can be used by a single scan operator.
139
    // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
140
    // is larger than 10MB.
141
230k
    _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10);
142
143
    // Provide more memory for wide tables, increase proportionally by multiples of 300
144
230k
    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
145
146
230k
    if (_all_scanners.empty()) {
147
0
        _is_finished = true;
148
0
        _set_scanner_done();
149
0
    }
150
151
    // when user not specify scan_thread_num, so we can try downgrade _max_thread_num.
152
    // becaue we found in a table with 5k columns, column reader may ocuppy too much memory.
153
    // you can refer https://github.com/apache/doris/issues/35340 for details.
154
230k
    const int32_t max_column_reader_num = _state->max_column_reader_num();
155
156
230k
    if (_max_scan_concurrency != 1 && max_column_reader_num > 0) {
157
115k
        int32_t scan_column_num = cast_set<int32_t>(_output_tuple_desc->slots().size());
158
115k
        int32_t current_column_num = scan_column_num * _max_scan_concurrency;
159
115k
        if (current_column_num > max_column_reader_num) {
160
0
            int32_t new_max_thread_num = max_column_reader_num / scan_column_num;
161
0
            new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num;
162
0
            if (new_max_thread_num < _max_scan_concurrency) {
163
0
                int32_t origin_max_thread_num = _max_scan_concurrency;
164
0
                _max_scan_concurrency = new_max_thread_num;
165
0
                LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
166
0
                          << " scan's max_thread_num from " << origin_max_thread_num << " to "
167
0
                          << _max_scan_concurrency << ",column num: " << scan_column_num
168
0
                          << ", max_column_reader_num: " << max_column_reader_num;
169
0
            }
170
0
        }
171
115k
    }
172
173
230k
    COUNTER_SET(_local_state->_max_scan_concurrency, (int64_t)_max_scan_concurrency);
174
230k
    COUNTER_SET(_local_state->_min_scan_concurrency, (int64_t)_min_scan_concurrency);
175
176
230k
    std::unique_lock<std::mutex> l(_transfer_lock);
177
230k
    RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));
178
179
230k
    return Status::OK();
180
230k
}
181
182
232k
ScannerContext::~ScannerContext() {
183
232k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
184
232k
    _tasks_queue.clear();
185
232k
    BlockUPtr block;
186
413k
    while (_free_blocks.try_dequeue(block)) {
187
        // do nothing
188
181k
    }
189
232k
    block.reset();
190
232k
    DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);
191
232k
    if (_task_handle) {
192
0
        if (auto* task_executor_scheduler =
193
0
                    dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
194
0
            static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle));
195
0
        }
196
0
        _task_handle = nullptr;
197
0
    }
198
232k
    if (auto ctx = task_exec_ctx(); ctx) {
199
230k
        ctx->unref_task_execution_ctx();
200
230k
    }
201
232k
}
202
203
1.07M
BlockUPtr ScannerContext::get_free_block(bool force) {
204
1.07M
    BlockUPtr block = nullptr;
205
1.07M
    if (_free_blocks.try_dequeue(block)) {
206
624k
        DCHECK(block->mem_reuse());
207
624k
        _block_memory_usage -= block->allocated_bytes();
208
624k
        _scanner_memory_used_counter->set(_block_memory_usage);
209
        // A free block is reused, so the memory usage should be decreased
210
        // The caller of get_free_block will increase the memory usage
211
624k
    } else if (_block_memory_usage < _max_bytes_in_queue || force) {
212
447k
        _newly_create_free_blocks_num->update(1);
213
447k
        block = Block::create_unique(_output_tuple_desc->slots(), 0);
214
447k
    }
215
1.07M
    return block;
216
1.07M
}
217
218
1.07M
void ScannerContext::return_free_block(BlockUPtr block) {
219
    // If under low memory mode, should not return the freeblock, it will occupy too much memory.
220
1.07M
    if (!_local_state->low_memory_mode() && block->mem_reuse() &&
221
1.07M
        _block_memory_usage < _max_bytes_in_queue) {
222
806k
        size_t block_size_to_reuse = block->allocated_bytes();
223
806k
        _block_memory_usage += block_size_to_reuse;
224
806k
        _scanner_memory_used_counter->set(_block_memory_usage);
225
806k
        block->clear_column_data();
226
        // Free blocks is used to improve memory efficiency. Failure during pushing back
227
        // free block will not incur any bad result so just ignore the return value.
228
806k
        _free_blocks.enqueue(std::move(block));
229
806k
    }
230
1.07M
}
231
232
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task,
233
1.03M
                                        std::unique_lock<std::mutex>& /*transfer_lock*/) {
234
    // increase _num_finished_scanners no matter the scan_task is submitted successfully or not.
235
    // since if submit failed, it will be added back by ScannerContext::push_back_scan_task
236
    // and _num_finished_scanners will be reduced.
237
    // if submit succeed, it will be also added back by ScannerContext::push_back_scan_task
238
    // see ScannerScheduler::_scanner_scan.
239
1.03M
    _num_scheduled_scanners++;
240
1.03M
    return _scanner_scheduler->submit(shared_from_this(), scan_task);
241
1.03M
}
242
243
184
void ScannerContext::clear_free_blocks() {
244
184
    clear_blocks(_free_blocks);
245
184
}
246
247
1.02M
void ScannerContext::push_back_scan_task(std::shared_ptr<ScanTask> scan_task) {
248
1.02M
    if (scan_task->status_ok()) {
249
1.03M
        for (const auto& [block, _] : scan_task->cached_blocks) {
250
1.03M
            if (block->rows() > 0) {
251
226k
                Status st = validate_block_schema(block.get());
252
226k
                if (!st.ok()) {
253
0
                    scan_task->set_status(st);
254
0
                    break;
255
0
                }
256
226k
            }
257
1.03M
        }
258
1.02M
    }
259
260
1.02M
    std::lock_guard<std::mutex> l(_transfer_lock);
261
1.02M
    if (!scan_task->status_ok()) {
262
1.26k
        _process_status = scan_task->get_status();
263
1.26k
    }
264
1.02M
    _tasks_queue.push_back(scan_task);
265
1.02M
    _num_scheduled_scanners--;
266
267
1.02M
    _dependency->set_ready();
268
1.02M
}
269
270
1.04M
Status ScannerContext::get_block_from_queue(RuntimeState* state, Block* block, bool* eos, int id) {
271
1.04M
    if (state->is_cancelled()) {
272
1
        _set_scanner_done();
273
1
        return state->cancel_reason();
274
1
    }
275
1.04M
    std::unique_lock l(_transfer_lock);
276
277
1.04M
    if (!_process_status.ok()) {
278
1.17k
        _set_scanner_done();
279
1.17k
        return _process_status;
280
1.17k
    }
281
282
1.04M
    std::shared_ptr<ScanTask> scan_task = nullptr;
283
284
1.04M
    if (!_tasks_queue.empty() && !done()) {
285
        // https://en.cppreference.com/w/cpp/container/list/front
286
        // The behavior is undefined if the list is empty.
287
1.04M
        scan_task = _tasks_queue.front();
288
1.04M
    }
289
290
1.04M
    if (scan_task != nullptr) {
291
        // The abnormal status of scanner may come from the execution of the scanner itself,
292
        // or come from the scanner scheduler, such as TooManyTasks.
293
1.04M
        if (!scan_task->status_ok()) {
294
            // TODO: If the scanner status is TooManyTasks, maybe we can retry the scanner after a while.
295
0
            _process_status = scan_task->get_status();
296
0
            _set_scanner_done();
297
0
            return _process_status;
298
0
        }
299
300
        // No need to worry about small block, block is merged together when they are appended to cached_blocks.
301
1.04M
        if (!scan_task->cached_blocks.empty()) {
302
1.04M
            auto [current_block, block_size] = std::move(scan_task->cached_blocks.front());
303
1.04M
            scan_task->cached_blocks.pop_front();
304
1.04M
            _block_memory_usage -= block_size;
305
            // consume current block
306
1.04M
            block->swap(*current_block);
307
1.04M
            return_free_block(std::move(current_block));
308
1.04M
        }
309
310
1.04M
        VLOG_DEBUG << fmt::format(
311
310
                "ScannerContext {} get block from queue, task_queue size {}, current scan "
312
310
                "task remaing cached_block size {}, eos {}, scheduled tasks {}",
313
310
                ctx_id, _tasks_queue.size(), scan_task->cached_blocks.size(), scan_task->is_eos(),
314
310
                _num_scheduled_scanners);
315
316
1.04M
        if (scan_task->cached_blocks.empty()) {
317
            // All Cached blocks are consumed, pop this task from task_queue.
318
1.02M
            if (!_tasks_queue.empty()) {
319
1.02M
                _tasks_queue.pop_front();
320
1.02M
            }
321
322
1.02M
            if (scan_task->is_eos()) {
323
                // 1. if eos, record a finished scanner.
324
1.02M
                _num_finished_scanners++;
325
1.02M
                RETURN_IF_ERROR(
326
1.02M
                        _scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));
327
1.02M
            } else {
328
4.58k
                RETURN_IF_ERROR(
329
4.58k
                        _scanner_scheduler->schedule_scan_task(shared_from_this(), scan_task, l));
330
4.58k
            }
331
1.02M
        }
332
1.04M
    }
333
334
1.04M
    if (_num_finished_scanners == _all_scanners.size() && _tasks_queue.empty()) {
335
228k
        _set_scanner_done();
336
228k
        _is_finished = true;
337
228k
    }
338
339
1.04M
    *eos = done();
340
341
1.04M
    if (_tasks_queue.empty()) {
342
910k
        _dependency->block();
343
910k
    }
344
345
1.04M
    return Status::OK();
346
1.04M
}
347
348
226k
Status ScannerContext::validate_block_schema(Block* block) {
349
226k
    size_t index = 0;
350
768k
    for (auto& slot : _output_tuple_desc->slots()) {
351
768k
        auto& data = block->get_by_position(index++);
352
768k
        if (data.column->is_nullable() != data.type->is_nullable()) {
353
0
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
354
0
                    "column(name: {}) nullable({}) does not match type nullable({}), slot(id: "
355
0
                    "{}, "
356
0
                    "name:{})",
357
0
                    data.name, data.column->is_nullable(), data.type->is_nullable(), slot->id(),
358
0
                    slot->col_name());
359
0
        }
360
361
768k
        if (data.column->is_nullable() != slot->is_nullable()) {
362
0
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
363
0
                    "column(name: {}) nullable({}) does not match slot(id: {}, name: {}) "
364
0
                    "nullable({})",
365
0
                    data.name, data.column->is_nullable(), slot->id(), slot->col_name(),
366
0
                    slot->is_nullable());
367
0
        }
368
768k
    }
369
226k
    return Status::OK();
370
226k
}
371
372
460k
void ScannerContext::stop_scanners(RuntimeState* state) {
373
460k
    std::lock_guard<std::mutex> l(_transfer_lock);
374
460k
    if (_should_stop) {
375
228k
        return;
376
228k
    }
377
232k
    _should_stop = true;
378
232k
    _set_scanner_done();
379
1.03M
    for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
380
1.03M
        if (std::shared_ptr<ScannerDelegate> sc = scanner.lock()) {
381
1.03M
            sc->_scanner->try_stop();
382
1.03M
        }
383
1.03M
    }
384
232k
    _tasks_queue.clear();
385
232k
    if (_task_handle) {
386
232k
        if (auto* task_executor_scheduler =
387
232k
                    dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
388
232k
            static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle));
389
232k
        }
390
232k
        _task_handle = nullptr;
391
232k
    }
392
    // TODO yiguolei, call mark close to scanners
393
232k
    if (state->enable_profile()) {
394
1.64k
        std::stringstream scanner_statistics;
395
1.64k
        std::stringstream scanner_rows_read;
396
1.64k
        std::stringstream scanner_wait_worker_time;
397
1.64k
        std::stringstream scanner_projection;
398
1.64k
        scanner_statistics << "[";
399
1.64k
        scanner_rows_read << "[";
400
1.64k
        scanner_wait_worker_time << "[";
401
1.64k
        scanner_projection << "[";
402
        // Scanners can in 3 state
403
        //  state 1: in scanner context, not scheduled
404
        //  state 2: in scanner worker pool's queue, scheduled but not running
405
        //  state 3: scanner is running.
406
3.15k
        for (auto& scanner_ref : _all_scanners) {
407
3.15k
            auto scanner = scanner_ref.lock();
408
3.15k
            if (scanner == nullptr) {
409
0
                continue;
410
0
            }
411
            // Add per scanner running time before close them
412
3.15k
            scanner_statistics << PrettyPrinter::print(scanner->_scanner->get_time_cost_ns(),
413
3.15k
                                                       TUnit::TIME_NS)
414
3.15k
                               << ", ";
415
3.15k
            scanner_projection << PrettyPrinter::print(scanner->_scanner->projection_time(),
416
3.15k
                                                       TUnit::TIME_NS)
417
3.15k
                               << ", ";
418
3.15k
            scanner_rows_read << PrettyPrinter::print(scanner->_scanner->get_rows_read(),
419
3.15k
                                                      TUnit::UNIT)
420
3.15k
                              << ", ";
421
3.15k
            scanner_wait_worker_time
422
3.15k
                    << PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(),
423
3.15k
                                            TUnit::TIME_NS)
424
3.15k
                    << ", ";
425
            // since there are all scanners, some scanners is running, so that could not call scanner
426
            // close here.
427
3.15k
        }
428
1.64k
        scanner_statistics << "]";
429
1.64k
        scanner_rows_read << "]";
430
1.64k
        scanner_wait_worker_time << "]";
431
1.64k
        scanner_projection << "]";
432
1.64k
        _scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str());
433
1.64k
        _scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str());
434
1.64k
        _scanner_profile->add_info_string("PerScannerWaitTime", scanner_wait_worker_time.str());
435
1.64k
        _scanner_profile->add_info_string("PerScannerProjectionTime", scanner_projection.str());
436
1.64k
    }
437
232k
}
438
439
34
std::string ScannerContext::debug_string() {
440
34
    return fmt::format(
441
34
            "id: {}, total scanners: {}, pending tasks: {},"
442
34
            " _should_stop: {}, _is_finished: {}, free blocks: {},"
443
34
            " limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
444
34
            " _max_bytes_in_queue: {}, query_id: {}",
445
34
            ctx_id, _all_scanners.size(), _tasks_queue.size(), _should_stop, _is_finished,
446
34
            _free_blocks.size_approx(), limit, _num_scheduled_scanners, _max_scan_concurrency,
447
34
            _max_bytes_in_queue, print_id(_query_id));
448
34
}
449
450
461k
void ScannerContext::_set_scanner_done() {
451
461k
    _dependency->set_always_ready();
452
461k
}
453
454
2.06M
void ScannerContext::update_peak_running_scanner(int num) {
455
2.06M
    _local_state->_peak_running_scanner->add(num);
456
2.06M
}
457
458
int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock,
459
1.26M
                                    std::unique_lock<std::shared_mutex>& scheduler_lock) {
460
    // margin_1 is used to ensure each scan operator could have at least _min_scan_concurrency scan tasks.
461
1.26M
    int32_t margin_1 = _min_scan_concurrency -
462
1.26M
                       (cast_set<int32_t>(_tasks_queue.size()) + _num_scheduled_scanners);
463
464
    // margin_2 is used to ensure the scan scheduler could have at least _min_scan_concurrency_of_scan_scheduler scan tasks.
465
1.26M
    int32_t margin_2 =
466
1.26M
            _min_scan_concurrency_of_scan_scheduler -
467
1.26M
            (_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size());
468
469
1.26M
    if (margin_1 <= 0 && margin_2 <= 0) {
470
9.33k
        return 0;
471
9.33k
    }
472
473
1.25M
    int32_t margin = std::max(margin_1, margin_2);
474
475
1.25M
    if (low_memory_mode()) {
476
        // In low memory mode, we will limit the number of running scanners to `low_memory_mode_scanners()`.
477
        // So that we will not submit too many scan tasks to scheduler.
478
126
        margin = std::min(low_memory_mode_scanners() - _num_scheduled_scanners, margin);
479
126
    }
480
481
1.25M
    VLOG_DEBUG << fmt::format(
482
0
            "[{}|{}] schedule scan task, margin_1: {} = {} - ({} + {}), margin_2: {} = {} - "
483
0
            "({} + {}), margin: {}",
484
0
            print_id(_query_id), ctx_id, margin_1, _min_scan_concurrency, _tasks_queue.size(),
485
0
            _num_scheduled_scanners, margin_2, _min_scan_concurrency_of_scan_scheduler,
486
0
            _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(), margin);
487
488
1.25M
    return margin;
489
1.26M
}
490
491
// This function must be called with:
492
// 1. _transfer_lock held.
493
// 2. ScannerScheduler::_lock held.
494
Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task,
495
                                          std::unique_lock<std::mutex>& transfer_lock,
496
1.26M
                                          std::unique_lock<std::shared_mutex>& scheduler_lock) {
497
1.26M
    if (current_scan_task &&
498
1.26M
        (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos())) {
499
1
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
500
1
    }
501
502
1.26M
    std::list<std::shared_ptr<ScanTask>> tasks_to_submit;
503
504
1.26M
    int32_t margin = _get_margin(transfer_lock, scheduler_lock);
505
506
    // margin is less than zero. Means this scan operator could not submit any scan task for now.
507
1.26M
    if (margin <= 0) {
508
        // Be careful with current scan task.
509
        // We need to add it back to task queue to make sure it could be resubmitted.
510
9.33k
        if (current_scan_task) {
511
            // This usually happens when we should downgrade the concurrency.
512
181
            _pending_scanners.push(current_scan_task);
513
181
            VLOG_DEBUG << fmt::format(
514
0
                    "{} push back scanner to task queue, because diff <= 0, task_queue size "
515
0
                    "{}, _num_scheduled_scanners {}",
516
0
                    ctx_id, _tasks_queue.size(), _num_scheduled_scanners);
517
181
        }
518
519
9.33k
#ifndef NDEBUG
520
        // This DCHECK is necessary.
521
        // We need to make sure each scan operator could have at least 1 scan tasks.
522
        // Or this scan operator will not be re-scheduled.
523
9.33k
        if (!_pending_scanners.empty() && _num_scheduled_scanners == 0 && _tasks_queue.empty()) {
524
0
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
525
0
        }
526
9.33k
#endif
527
528
9.33k
        return Status::OK();
529
9.33k
    }
530
531
1.25M
    bool first_pull = true;
532
533
2.28M
    while (margin-- > 0) {
534
1.82M
        std::shared_ptr<ScanTask> task_to_run;
535
1.82M
        const int32_t current_concurrency = cast_set<int32_t>(
536
1.82M
                _tasks_queue.size() + _num_scheduled_scanners + tasks_to_submit.size());
537
1.82M
        VLOG_DEBUG << fmt::format("{} currenct concurrency: {} = {} + {} + {}", ctx_id,
538
1
                                  current_concurrency, _tasks_queue.size(), _num_scheduled_scanners,
539
1
                                  tasks_to_submit.size());
540
1.82M
        if (first_pull) {
541
1.25M
            task_to_run = _pull_next_scan_task(current_scan_task, current_concurrency);
542
1.25M
            if (task_to_run == nullptr) {
543
                // In two situations we will get nullptr.
544
                // 1. current_concurrency already reached _max_scan_concurrency.
545
                // 2. all scanners are finished.
546
409k
                if (current_scan_task) {
547
1
                    DCHECK(current_scan_task->cached_blocks.empty());
548
1
                    DCHECK(!current_scan_task->is_eos());
549
1
                    if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) {
550
                        // This should not happen.
551
0
                        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
552
0
                                               "Scanner scheduler logical error.");
553
0
                    }
554
                    // Current scan task is not eos, but we can not resubmit it.
555
                    // Add current_scan_task back to task queue, so that we have chance to resubmit it in the future.
556
1
                    _pending_scanners.push(current_scan_task);
557
1
                }
558
409k
            }
559
1.25M
            first_pull = false;
560
1.25M
        } else {
561
574k
            task_to_run = _pull_next_scan_task(nullptr, current_concurrency);
562
574k
        }
563
564
1.82M
        if (task_to_run) {
565
1.03M
            tasks_to_submit.push_back(task_to_run);
566
1.03M
        } else {
567
792k
            break;
568
792k
        }
569
1.82M
    }
570
571
1.25M
    if (tasks_to_submit.empty()) {
572
409k
        return Status::OK();
573
409k
    }
574
575
18.4E
    VLOG_DEBUG << fmt::format("[{}:{}] submit {} scan tasks to scheduler, remaining scanner: {}",
576
18.4E
                              print_id(_query_id), ctx_id, tasks_to_submit.size(),
577
18.4E
                              _pending_scanners.size());
578
579
1.03M
    for (auto& scan_task_iter : tasks_to_submit) {
580
1.03M
        Status submit_status = submit_scan_task(scan_task_iter, transfer_lock);
581
1.03M
        if (!submit_status.ok()) {
582
0
            _process_status = submit_status;
583
0
            _set_scanner_done();
584
0
            return _process_status;
585
0
        }
586
1.03M
    }
587
588
841k
    return Status::OK();
589
841k
}
590
591
std::shared_ptr<ScanTask> ScannerContext::_pull_next_scan_task(
592
1.82M
        std::shared_ptr<ScanTask> current_scan_task, int32_t current_concurrency) {
593
1.82M
    if (current_concurrency >= _max_scan_concurrency) {
594
381k
        VLOG_DEBUG << fmt::format(
595
0
                "ScannerContext {} current concurrency {} >= _max_scan_concurrency {}, skip "
596
0
                "pull",
597
0
                ctx_id, current_concurrency, _max_scan_concurrency);
598
381k
        return nullptr;
599
381k
    }
600
601
1.44M
    if (current_scan_task != nullptr) {
602
3.94k
        if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) {
603
            // This should not happen.
604
2
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
605
2
        }
606
3.94k
        return current_scan_task;
607
3.94k
    }
608
609
1.43M
    if (!_pending_scanners.empty()) {
610
1.02M
        std::shared_ptr<ScanTask> next_scan_task;
611
1.02M
        next_scan_task = _pending_scanners.top();
612
1.02M
        _pending_scanners.pop();
613
1.02M
        return next_scan_task;
614
1.02M
    } else {
615
411k
        return nullptr;
616
411k
    }
617
1.43M
}
618
619
5.45M
bool ScannerContext::low_memory_mode() const {
620
5.45M
    return _local_state->low_memory_mode();
621
5.45M
}
622
#include "common/compile_check_end.h"
623
} // namespace doris