Coverage Report

Created: 2026-03-30 23:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/scanner_context.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <glog/logging.h>
23
#include <zconf.h>
24
25
#include <cstdint>
26
#include <ctime>
27
#include <memory>
28
#include <mutex>
29
#include <ostream>
30
#include <shared_mutex>
31
#include <tuple>
32
#include <utility>
33
34
#include "common/config.h"
35
#include "common/exception.h"
36
#include "common/logging.h"
37
#include "common/metrics/doris_metrics.h"
38
#include "common/status.h"
39
#include "core/block/block.h"
40
#include "exec/operator/scan_operator.h"
41
#include "exec/scan/scan_node.h"
42
#include "exec/scan/scanner_scheduler.h"
43
#include "runtime/descriptors.h"
44
#include "runtime/exec_env.h"
45
#include "runtime/runtime_profile.h"
46
#include "runtime/runtime_state.h"
47
#include "storage/tablet/tablet.h"
48
#include "util/time.h"
49
#include "util/uid_util.h"
50
51
namespace doris {
52
53
using namespace std::chrono_literals;
54
#include "common/compile_check_begin.h"
55
/// Builds a scanner context for one scan operator instance.
/// @param state          runtime state of the fragment instance (not owned)
/// @param local_state    scan operator local state providing concurrency knobs and profile
/// @param output_tuple_desc  output tuple layout; overridden by output_row_descriptor's
///                           first tuple when a row descriptor is supplied
/// @param scanners       the full set of scanner delegates this context will drive
/// @param limit_         row limit of this operator; negative means "no limit"
/// @param dependency     downstream pipeline dependency signalled when blocks are ready
/// @param shared_scan_limit  shared remaining-row quota across parallel instances
///                           (negative = unlimited, see acquire_limit_quota)
ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
                               const TupleDescriptor* output_tuple_desc,
                               const RowDescriptor* output_row_descriptor,
                               const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
                               int64_t limit_, std::shared_ptr<Dependency> dependency,
                               std::atomic<int64_t>* shared_scan_limit
#ifdef BE_TEST
                               ,
                               int num_parallel_instances
#endif
                               )
        : HasTaskExecutionCtx(state),
          _state(state),
          _local_state(local_state),
          _output_tuple_desc(output_row_descriptor
                                     ? output_row_descriptor->tuple_descriptors().front()
                                     : output_tuple_desc),
          _output_row_descriptor(output_row_descriptor),
          _batch_size(state->batch_size()),
          limit(limit_),
          _shared_scan_limit(shared_scan_limit),
          _all_scanners(scanners.begin(), scanners.end()),
#ifndef BE_TEST
          _scanner_scheduler(local_state->scan_scheduler(state)),
          _min_scan_concurrency_of_scan_scheduler(
                  _scanner_scheduler->get_min_active_scan_threads()),
          // Never ask for more concurrency than there are scanners.
          _max_scan_concurrency(std::min(local_state->max_scanners_concurrency(state),
                                         cast_set<int>(scanners.size()))),
#else
          _scanner_scheduler(state->get_query_ctx()->get_scan_scheduler()),
          _min_scan_concurrency_of_scan_scheduler(0),
          _max_scan_concurrency(num_parallel_instances),
#endif
          _min_scan_concurrency(local_state->min_scanners_concurrency(state)) {
    DCHECK(_state != nullptr);
    DCHECK(_output_row_descriptor == nullptr ||
           _output_row_descriptor->tuple_descriptors().size() == 1);
    _query_id = _state->get_query_ctx()->query_id();
    _resource_ctx = _state->get_query_ctx()->resource_ctx();
    ctx_id = UniqueId::gen_uid().to_string();
    // Wrap every scanner into a ScanTask; they start out pending and are
    // submitted to the scheduler on demand (see schedule_scan_task).
    for (auto& scanner : _all_scanners) {
        _pending_scanners.push(std::make_shared<ScanTask>(scanner));
    }
    // Normalize any negative limit to the canonical "no limit" value -1.
    if (limit < 0) {
        limit = -1;
    }
    // Take ownership of the dependency handle; move avoids a refcount bump.
    _dependency = std::move(dependency);
    DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}
104
105
0
int64_t ScannerContext::acquire_limit_quota(int64_t desired) {
106
0
    DCHECK(desired > 0);
107
0
    int64_t remaining = _shared_scan_limit->load(std::memory_order_acquire);
108
0
    while (true) {
109
0
        if (remaining < 0) {
110
            // No limit set, grant all desired rows.
111
0
            return desired;
112
0
        }
113
0
        if (remaining == 0) {
114
0
            return 0;
115
0
        }
116
0
        int64_t granted = std::min(desired, remaining);
117
0
        if (_shared_scan_limit->compare_exchange_weak(remaining, remaining - granted,
118
0
                                                      std::memory_order_acq_rel,
119
0
                                                      std::memory_order_acquire)) {
120
0
            return granted;
121
0
        }
122
        // CAS failed, `remaining` is updated to current value, retry.
123
0
    }
124
0
}
125
126
// After init function call, should not access _parent
//
// Performs one-time setup after construction: wires profile counters, sizes the
// block queue memory budget, optionally downgrades concurrency for very wide
// tables, and kicks off the first round of scan-task scheduling.
// Must be called exactly once before blocks are consumed from this context.
Status ScannerContext::init() {
#ifndef BE_TEST
    // Borrow profile counters from the operator's local state; they outlive us.
    _scanner_profile = _local_state->_scanner_profile;
    _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
    _scanner_memory_used_counter = _local_state->_memory_used_counter;

    // 3. get thread token
    if (!_state->get_query_ctx()) {
        return Status::InternalError("Query context of {} is not set",
                                     print_id(_state->query_id()));
    }

    if (_state->get_query_ctx()->get_scan_scheduler()) {
        _should_reset_thread_name = false;
    }

    auto scanner = _all_scanners.front().lock();
    DCHECK(scanner != nullptr);

    // When the scheduler is task-executor based, register a task handle so the
    // executor can track this context's work. The handle is removed in the
    // destructor and in stop_scanners().
    if (auto* task_executor_scheduler =
                dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
        std::shared_ptr<TaskExecutor> task_executor = task_executor_scheduler->task_executor();
        TaskId task_id(fmt::format("{}-{}", print_id(_state->query_id()), ctx_id));
        _task_handle = DORIS_TRY(task_executor->create_task(
                task_id, []() { return 0.0; },
                config::task_executor_initial_max_concurrency_per_task > 0
                        ? config::task_executor_initial_max_concurrency_per_task
                        : std::max(48, CpuInfo::num_cores() * 2),
                std::chrono::milliseconds(100), std::nullopt));
    }
#endif
    // _max_bytes_in_queue controls the maximum memory that can be used by a single scan operator.
    // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
    // is larger than 10MB.
    _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10);

    // Provide more memory for wide tables, increase proportionally by multiples of 300
    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;

    // Nothing to scan at all: mark finished immediately and unblock downstream.
    if (_all_scanners.empty()) {
        _is_finished = true;
        _set_scanner_done();
    }

    // when user not specify scan_thread_num, so we can try downgrade _max_thread_num.
    // because we found in a table with 5k columns, column reader may occupy too much memory.
    // you can refer https://github.com/apache/doris/issues/35340 for details.
    const int32_t max_column_reader_num = _state->max_column_reader_num();

    if (_max_scan_concurrency != 1 && max_column_reader_num > 0) {
        int32_t scan_column_num = cast_set<int32_t>(_output_tuple_desc->slots().size());
        int32_t current_column_num = scan_column_num * _max_scan_concurrency;
        if (current_column_num > max_column_reader_num) {
            // Cap concurrency so that (columns * concurrency) stays within the
            // configured reader budget; never go below 1.
            int32_t new_max_thread_num = max_column_reader_num / scan_column_num;
            new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num;
            if (new_max_thread_num < _max_scan_concurrency) {
                int32_t origin_max_thread_num = _max_scan_concurrency;
                _max_scan_concurrency = new_max_thread_num;
                LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
                          << " scan's max_thread_num from " << origin_max_thread_num << " to "
                          << _max_scan_concurrency << ",column num: " << scan_column_num
                          << ", max_column_reader_num: " << max_column_reader_num;
            }
        }
    }

    COUNTER_SET(_local_state->_max_scan_concurrency, (int64_t)_max_scan_concurrency);
    COUNTER_SET(_local_state->_min_scan_concurrency, (int64_t)_min_scan_concurrency);

    // Kick off the initial batch of scan tasks under _transfer_lock, as
    // required by schedule_scan_task's locking contract.
    std::unique_lock<std::mutex> l(_transfer_lock);
    RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));

    return Status::OK();
}
201
202
14
// Releases everything still owned by the context: queued tasks, cached free
// blocks, the metrics count, and (when present) the task-executor handle.
// Block memory is freed under the query's memory tracker so accounting stays
// consistent with where it was allocated.
ScannerContext::~ScannerContext() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
    _tasks_queue.clear();
    BlockUPtr block;
    // Drain the free-block cache; each dequeued block is destroyed when the
    // local unique_ptr is overwritten on the next iteration.
    while (_free_blocks.try_dequeue(block)) {
        // do nothing
    }
    block.reset();
    DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);
    // Deregister from the task executor if init() registered a handle and
    // stop_scanners() has not already removed it.
    if (_task_handle) {
        if (auto* task_executor_scheduler =
                    dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
            static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle));
        }
        _task_handle = nullptr;
    }
}
219
220
3
/// Hands out a block for a scanner to fill: first tries to reuse one from the
/// free-block cache, otherwise allocates a new one while the memory budget
/// allows (or unconditionally when `force` is set). May return nullptr when
/// the budget is exceeded and `force` is false.
BlockUPtr ScannerContext::get_free_block(bool force) {
    BlockUPtr reused = nullptr;
    if (_free_blocks.try_dequeue(reused)) {
        DCHECK(reused->mem_reuse());
        // A cached block leaves the free list, so its bytes no longer count
        // against the queue budget; the caller re-adds usage when it fills it.
        _block_memory_usage -= reused->allocated_bytes();
        _scanner_memory_used_counter->set(_block_memory_usage);
        return reused;
    }
    if (force || _block_memory_usage < _max_bytes_in_queue) {
        _newly_create_free_blocks_num->update(1);
        return Block::create_unique(_output_tuple_desc->slots(), 0);
    }
    return nullptr;
}
234
235
1
/// Returns a consumed block to the free-block cache for later reuse.
/// The block is silently dropped (freed) when caching is not worthwhile:
/// low memory mode, non-reusable block, or the memory budget is already full.
void ScannerContext::return_free_block(BlockUPtr block) {
    // If under low memory mode, should not return the freeblock, it will occupy too much memory.
    if (_local_state->low_memory_mode() || !block->mem_reuse() ||
        _block_memory_usage >= _max_bytes_in_queue) {
        return;
    }
    const size_t reusable_bytes = block->allocated_bytes();
    _block_memory_usage += reusable_bytes;
    _scanner_memory_used_counter->set(_block_memory_usage);
    block->clear_column_data();
    // Free blocks is used to improve memory efficiency. Failure during pushing back
    // free block will not incur any bad result so just ignore the return value.
    _free_blocks.enqueue(std::move(block));
}
248
249
// Hands one scan task to the scanner scheduler for asynchronous execution.
// The unused unique_lock parameter documents the locking contract: the caller
// must hold _transfer_lock when invoking this function.
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task,
                                        std::unique_lock<std::mutex>& /*transfer_lock*/) {
    // Count the task as scheduled regardless of whether submit() succeeds:
    // _num_scheduled_scanners is decremented again in
    // ScannerContext::push_back_scan_task, which (per the scheduler's contract,
    // see ScannerScheduler::_scanner_scan) is reached on both the success and
    // the failure path, so the counter stays balanced either way.
    _num_scheduled_scanners++;
    return _scanner_scheduler->submit(shared_from_this(), scan_task);
}
259
260
0
// Drains the free-block cache, dropping all cached reusable blocks
// (used to shed memory; `clear_blocks` is the project helper that empties the queue).
void ScannerContext::clear_free_blocks() {
    clear_blocks(_free_blocks);
}
263
264
5
void ScannerContext::push_back_scan_task(std::shared_ptr<ScanTask> scan_task) {
265
5
    if (scan_task->status_ok()) {
266
5
        for (const auto& [block, _] : scan_task->cached_blocks) {
267
0
            if (block->rows() > 0) {
268
0
                Status st = validate_block_schema(block.get());
269
0
                if (!st.ok()) {
270
0
                    scan_task->set_status(st);
271
0
                    break;
272
0
                }
273
0
            }
274
0
        }
275
5
    }
276
277
5
    std::lock_guard<std::mutex> l(_transfer_lock);
278
5
    if (!scan_task->status_ok()) {
279
0
        _process_status = scan_task->get_status();
280
0
    }
281
5
    _tasks_queue.push_back(scan_task);
282
5
    _num_scheduled_scanners--;
283
284
5
    _dependency->set_ready();
285
5
}
286
287
3
/// Pulls at most one block from the head task of _tasks_queue into `block`,
/// returning any pending error first (cancellation, _process_status, or the
/// head task's own status). When the head task runs out of cached blocks it
/// is popped and the scheduler is re-driven — with nullptr on eos (the
/// scanner is finished) or with the task itself for resubmission. Sets *eos
/// when the whole context is done; blocks the dependency when the queue is
/// empty so the operator waits for the next push_back_scan_task.
/// `id` is currently unused by this implementation.
Status ScannerContext::get_block_from_queue(RuntimeState* state, Block* block, bool* eos, int id) {
    if (state->is_cancelled()) {
        _set_scanner_done();
        return state->cancel_reason();
    }
    std::unique_lock l(_transfer_lock);

    if (!_process_status.ok()) {
        _set_scanner_done();
        return _process_status;
    }

    std::shared_ptr<ScanTask> scan_task = nullptr;

    if (!_tasks_queue.empty() && !done()) {
        // https://en.cppreference.com/w/cpp/container/list/front
        // The behavior is undefined if the list is empty.
        scan_task = _tasks_queue.front();
    }

    if (scan_task != nullptr) {
        // The abnormal status of scanner may come from the execution of the scanner itself,
        // or come from the scanner scheduler, such as TooManyTasks.
        if (!scan_task->status_ok()) {
            // TODO: If the scanner status is TooManyTasks, maybe we can retry the scanner after a while.
            _process_status = scan_task->get_status();
            _set_scanner_done();
            return _process_status;
        }

        // No need to worry about small block, block is merged together when they are appended to cached_blocks.
        if (!scan_task->cached_blocks.empty()) {
            auto [current_block, block_size] = std::move(scan_task->cached_blocks.front());
            scan_task->cached_blocks.pop_front();
            _block_memory_usage -= block_size;
            // consume current block
            block->swap(*current_block);
            return_free_block(std::move(current_block));
        }

        VLOG_DEBUG << fmt::format(
                "ScannerContext {} get block from queue, task_queue size {}, current scan "
                "task remaining cached_block size {}, eos {}, scheduled tasks {}",
                ctx_id, _tasks_queue.size(), scan_task->cached_blocks.size(), scan_task->is_eos(),
                _num_scheduled_scanners);

        if (scan_task->cached_blocks.empty()) {
            // All Cached blocks are consumed, pop this task from task_queue.
            if (!_tasks_queue.empty()) {
                _tasks_queue.pop_front();
            }

            if (scan_task->is_eos()) {
                // 1. if eos, record a finished scanner.
                _num_finished_scanners++;
                RETURN_IF_ERROR(
                        _scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));
            } else {
                // Not eos: hand the task back so the scheduler can resubmit it.
                RETURN_IF_ERROR(
                        _scanner_scheduler->schedule_scan_task(shared_from_this(), scan_task, l));
            }
        }
    }

    // Mark finished when either:
    // (1) all scanners completed normally, or
    // (2) shared limit exhausted and no scanners are still running.
    if (_tasks_queue.empty() && (_num_finished_scanners == _all_scanners.size() ||
                                 (_shared_scan_limit->load(std::memory_order_acquire) == 0 &&
                                  _num_scheduled_scanners == 0))) {
        _set_scanner_done();
        _is_finished = true;
    }

    *eos = done();

    if (_tasks_queue.empty()) {
        _dependency->block();
    }

    return Status::OK();
}
369
370
0
/// Checks that each column of `block` (by position) is nullability-consistent:
/// the column's nullability must match both its declared type and the
/// corresponding output slot. Returns INVALID_SCHEMA on the first mismatch.
Status ScannerContext::validate_block_schema(Block* block) {
    size_t pos = 0;
    for (const auto& slot : _output_tuple_desc->slots()) {
        const auto& col_entry = block->get_by_position(pos);
        ++pos;
        const bool column_nullable = col_entry.column->is_nullable();
        if (column_nullable != col_entry.type->is_nullable()) {
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
                    "column(name: {}) nullable({}) does not match type nullable({}), slot(id: "
                    "{}, "
                    "name:{})",
                    col_entry.name, col_entry.column->is_nullable(), col_entry.type->is_nullable(),
                    slot->id(), slot->col_name());
        }

        if (column_nullable != slot->is_nullable()) {
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
                    "column(name: {}) nullable({}) does not match slot(id: {}, name: {}) "
                    "nullable({})",
                    col_entry.name, col_entry.column->is_nullable(), slot->id(), slot->col_name(),
                    slot->is_nullable());
        }
    }
    return Status::OK();
}
393
394
0
// Best-effort stop of all scanners belonging to this context, e.g. when the
// operator is closed early. Idempotent: guarded by _should_stop so only the
// first call does any work. Also drops queued tasks, removes the
// task-executor handle, and (when profiling is on) publishes per-scanner
// timing/row statistics to the scanner profile.
void ScannerContext::stop_scanners(RuntimeState* state) {
    std::lock_guard<std::mutex> l(_transfer_lock);
    if (_should_stop) {
        return;
    }
    _should_stop = true;
    // Unblock downstream so the operator does not wait on a stopped context.
    _set_scanner_done();
    for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
        // A scanner that has already been destroyed simply fails to lock; skip it.
        if (std::shared_ptr<ScannerDelegate> sc = scanner.lock()) {
            sc->_scanner->try_stop();
        }
    }
    _tasks_queue.clear();
    // Mirror of the cleanup in the destructor: deregister from the task
    // executor if init() registered a handle.
    if (_task_handle) {
        if (auto* task_executor_scheduler =
                    dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
            static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle));
        }
        _task_handle = nullptr;
    }
    // TODO yiguolei, call mark close to scanners
    if (state->enable_profile()) {
        std::stringstream scanner_statistics;
        std::stringstream scanner_rows_read;
        std::stringstream scanner_wait_worker_time;
        std::stringstream scanner_projection;
        scanner_statistics << "[";
        scanner_rows_read << "[";
        scanner_wait_worker_time << "[";
        scanner_projection << "[";
        // Scanners can in 3 state
        //  state 1: in scanner context, not scheduled
        //  state 2: in scanner worker pool's queue, scheduled but not running
        //  state 3: scanner is running.
        for (auto& scanner_ref : _all_scanners) {
            auto scanner = scanner_ref.lock();
            if (scanner == nullptr) {
                continue;
            }
            // Add per scanner running time before close them
            scanner_statistics << PrettyPrinter::print(scanner->_scanner->get_time_cost_ns(),
                                                       TUnit::TIME_NS)
                               << ", ";
            scanner_projection << PrettyPrinter::print(scanner->_scanner->projection_time(),
                                                       TUnit::TIME_NS)
                               << ", ";
            scanner_rows_read << PrettyPrinter::print(scanner->_scanner->get_rows_read(),
                                                      TUnit::UNIT)
                              << ", ";
            scanner_wait_worker_time
                    << PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(),
                                            TUnit::TIME_NS)
                    << ", ";
            // since there are all scanners, some scanners is running, so that could not call scanner
            // close here.
        }
        scanner_statistics << "]";
        scanner_rows_read << "]";
        scanner_wait_worker_time << "]";
        scanner_projection << "]";
        _scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str());
        _scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str());
        _scanner_profile->add_info_string("PerScannerWaitTime", scanner_wait_worker_time.str());
        _scanner_profile->add_info_string("PerScannerProjectionTime", scanner_projection.str());
    }
}
460
461
18
std::string ScannerContext::debug_string() {
462
18
    return fmt::format(
463
18
            "id: {}, total scanners: {}, pending tasks: {},"
464
18
            " _should_stop: {}, _is_finished: {}, free blocks: {},"
465
18
            " limit: {}, remaining_limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
466
18
            " _max_bytes_in_queue: {}, query_id: {}",
467
18
            ctx_id, _all_scanners.size(), _tasks_queue.size(), _should_stop, _is_finished,
468
18
            _free_blocks.size_approx(), limit, _shared_scan_limit->load(std::memory_order_relaxed),
469
18
            _num_scheduled_scanners, _max_scan_concurrency, _max_bytes_in_queue,
470
18
            print_id(_query_id));
471
18
}
472
473
2
// Marks the downstream dependency permanently ready so the scan operator is
// never blocked waiting on this context again. Used on the finished,
// cancelled, and error paths throughout this file.
void ScannerContext::_set_scanner_done() {
    _dependency->set_always_ready();
}
476
477
0
// Adjusts the peak-running-scanner profile counter by `num` (presumably
// positive when a scanner starts running and negative when it stops — callers
// are not visible in this file).
void ScannerContext::update_peak_running_scanner(int num) {
    _local_state->_peak_running_scanner->add(num);
}
480
481
// Computes how many additional scan tasks this operator may submit right now.
// Returns 0 when both minimum-concurrency targets are already met; the locks
// in the signature document that both _transfer_lock and the scheduler's lock
// must be held by the caller.
int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock,
                                    std::unique_lock<std::shared_mutex>& scheduler_lock) {
    // margin_1 is used to ensure each scan operator could have at least _min_scan_concurrency scan tasks.
    int32_t margin_1 = _min_scan_concurrency -
                       (cast_set<int32_t>(_tasks_queue.size()) + _num_scheduled_scanners);

    // margin_2 is used to ensure the scan scheduler could have at least _min_scan_concurrency_of_scan_scheduler scan tasks.
    int32_t margin_2 =
            _min_scan_concurrency_of_scan_scheduler -
            (_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size());

    if (margin_1 <= 0 && margin_2 <= 0) {
        return 0;
    }

    // Take the larger deficit; either target alone justifies submitting work.
    int32_t margin = std::max(margin_1, margin_2);

    if (low_memory_mode()) {
        // In low memory mode, we will limit the number of running scanners to `low_memory_mode_scanners()`.
        // So that we will not submit too many scan tasks to scheduler.
        margin = std::min(low_memory_mode_scanners() - _num_scheduled_scanners, margin);
    }

    VLOG_DEBUG << fmt::format(
            "[{}|{}] schedule scan task, margin_1: {} = {} - ({} + {}), margin_2: {} = {} - "
            "({} + {}), margin: {}",
            print_id(_query_id), ctx_id, margin_1, _min_scan_concurrency, _tasks_queue.size(),
            _num_scheduled_scanners, margin_2, _min_scan_concurrency_of_scan_scheduler,
            _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(), margin);

    return margin;
}
513
514
// This function must be called with:
515
// 1. _transfer_lock held.
516
// 2. ScannerScheduler::_lock held.
517
Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task,
518
                                          std::unique_lock<std::mutex>& transfer_lock,
519
7
                                          std::unique_lock<std::shared_mutex>& scheduler_lock) {
520
7
    if (current_scan_task &&
521
7
        (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos())) {
522
1
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
523
1
    }
524
525
6
    std::list<std::shared_ptr<ScanTask>> tasks_to_submit;
526
527
6
    int32_t margin = _get_margin(transfer_lock, scheduler_lock);
528
529
    // margin is less than zero. Means this scan operator could not submit any scan task for now.
530
6
    if (margin <= 0) {
531
        // Be careful with current scan task.
532
        // We need to add it back to task queue to make sure it could be resubmitted.
533
0
        if (current_scan_task) {
534
            // This usually happens when we should downgrade the concurrency.
535
0
            _pending_scanners.push(current_scan_task);
536
0
            VLOG_DEBUG << fmt::format(
537
0
                    "{} push back scanner to task queue, because diff <= 0, task_queue size "
538
0
                    "{}, _num_scheduled_scanners {}",
539
0
                    ctx_id, _tasks_queue.size(), _num_scheduled_scanners);
540
0
        }
541
542
0
#ifndef NDEBUG
543
        // This DCHECK is necessary.
544
        // We need to make sure each scan operator could have at least 1 scan tasks.
545
        // Or this scan operator will not be re-scheduled.
546
0
        if (!_pending_scanners.empty() && _num_scheduled_scanners == 0 && _tasks_queue.empty()) {
547
0
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
548
0
        }
549
0
#endif
550
551
0
        return Status::OK();
552
0
    }
553
554
6
    bool first_pull = true;
555
556
24
    while (margin-- > 0) {
557
24
        std::shared_ptr<ScanTask> task_to_run;
558
24
        const int32_t current_concurrency = cast_set<int32_t>(
559
24
                _tasks_queue.size() + _num_scheduled_scanners + tasks_to_submit.size());
560
24
        VLOG_DEBUG << fmt::format("{} currenct concurrency: {} = {} + {} + {}", ctx_id,
561
0
                                  current_concurrency, _tasks_queue.size(), _num_scheduled_scanners,
562
0
                                  tasks_to_submit.size());
563
24
        if (first_pull) {
564
6
            task_to_run = _pull_next_scan_task(current_scan_task, current_concurrency);
565
6
            if (task_to_run == nullptr) {
566
                // In two situations we will get nullptr.
567
                // 1. current_concurrency already reached _max_scan_concurrency.
568
                // 2. all scanners are finished.
569
2
                if (current_scan_task) {
570
1
                    DCHECK(current_scan_task->cached_blocks.empty());
571
1
                    DCHECK(!current_scan_task->is_eos());
572
1
                    if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) {
573
                        // This should not happen.
574
0
                        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
575
0
                                               "Scanner scheduler logical error.");
576
0
                    }
577
                    // Current scan task is not eos, but we can not resubmit it.
578
                    // Add current_scan_task back to task queue, so that we have chance to resubmit it in the future.
579
1
                    _pending_scanners.push(current_scan_task);
580
1
                }
581
2
            }
582
6
            first_pull = false;
583
18
        } else {
584
18
            task_to_run = _pull_next_scan_task(nullptr, current_concurrency);
585
18
        }
586
587
24
        if (task_to_run) {
588
18
            tasks_to_submit.push_back(task_to_run);
589
18
        } else {
590
6
            break;
591
6
        }
592
24
    }
593
594
6
    if (tasks_to_submit.empty()) {
595
2
        return Status::OK();
596
2
    }
597
598
4
    VLOG_DEBUG << fmt::format("[{}:{}] submit {} scan tasks to scheduler, remaining scanner: {}",
599
0
                              print_id(_query_id), ctx_id, tasks_to_submit.size(),
600
0
                              _pending_scanners.size());
601
602
18
    for (auto& scan_task_iter : tasks_to_submit) {
603
18
        Status submit_status = submit_scan_task(scan_task_iter, transfer_lock);
604
18
        if (!submit_status.ok()) {
605
0
            _process_status = submit_status;
606
0
            _set_scanner_done();
607
0
            return _process_status;
608
0
        }
609
18
    }
610
611
4
    return Status::OK();
612
4
}
613
614
// Picks the next scan task to submit. Priority order:
// 1. nullptr when `current_concurrency` has already reached _max_scan_concurrency;
// 2. `current_scan_task` when supplied (must be resubmittable — having cached
//    blocks or being eos is an invariant violation and throws);
// 3. the top pending scanner, unless the shared limit quota is exhausted
//    (remaining == 0) or no pending scanners remain, in which case nullptr.
std::shared_ptr<ScanTask> ScannerContext::_pull_next_scan_task(
        std::shared_ptr<ScanTask> current_scan_task, int32_t current_concurrency) {
    if (current_concurrency >= _max_scan_concurrency) {
        VLOG_DEBUG << fmt::format(
                "ScannerContext {} current concurrency {} >= _max_scan_concurrency {}, skip "
                "pull",
                ctx_id, current_concurrency, _max_scan_concurrency);
        return nullptr;
    }

    if (current_scan_task != nullptr) {
        if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) {
            // This should not happen.
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
        }
        return current_scan_task;
    }

    if (!_pending_scanners.empty()) {
        // If shared limit quota is exhausted, do not submit new scanners from pending queue.
        int64_t remaining = _shared_scan_limit->load(std::memory_order_acquire);
        if (remaining == 0) {
            return nullptr;
        }
        std::shared_ptr<ScanTask> next_scan_task;
        next_scan_task = _pending_scanners.top();
        _pending_scanners.pop();
        return next_scan_task;
    } else {
        return nullptr;
    }
}
646
647
11
bool ScannerContext::low_memory_mode() const {
648
11
    return _local_state->low_memory_mode();
649
11
}
650
#include "common/compile_check_end.h"
651
} // namespace doris