Coverage Report

Created: 2026-04-02 16:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/scanner_context.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <glog/logging.h>
23
#include <zconf.h>
24
25
#include <cstdint>
26
#include <ctime>
27
#include <memory>
28
#include <mutex>
29
#include <ostream>
30
#include <shared_mutex>
31
#include <tuple>
32
#include <utility>
33
34
#include "common/config.h"
35
#include "common/exception.h"
36
#include "common/logging.h"
37
#include "common/metrics/doris_metrics.h"
38
#include "common/status.h"
39
#include "core/block/block.h"
40
#include "exec/operator/scan_operator.h"
41
#include "exec/scan/scan_node.h"
42
#include "exec/scan/scanner_scheduler.h"
43
#include "runtime/descriptors.h"
44
#include "runtime/exec_env.h"
45
#include "runtime/runtime_profile.h"
46
#include "runtime/runtime_state.h"
47
#include "storage/tablet/tablet.h"
48
#include "util/time.h"
49
#include "util/uid_util.h"
50
51
namespace doris {
52
53
using namespace std::chrono_literals;
54
#include "common/compile_check_begin.h"
55
56
// ==================== ScannerContext ====================
57
// Builds the context that drives all scanners of one scan operator instance.
//
// state                 - runtime state, must not be null (DCHECKed); supplies the batch size,
//                         the query id and the resource context.
// local_state           - scan operator local state; supplies the scanner concurrency bounds
//                         (and, outside BE_TEST, the scan scheduler).
// output_tuple_desc     - output tuple descriptor, used only when output_row_descriptor is null.
// output_row_descriptor - optional; when set it must contain exactly one tuple descriptor,
//                         which then becomes the output tuple descriptor.
// scanners              - scanners to drive; one pending ScanTask is queued per scanner.
// limit_                - row limit; every negative value is normalized to -1.
// dependency            - dependency that is readied/blocked as completed tasks come and go.
// shared_scan_limit     - shared remaining-row counter (a negative value means "no limit",
//                         see acquire_limit_quota()).
// arb, limiter          - memory share arbitrator / memory limiter used by adaptive scanning.
// ins_idx               - index of this instance.
// enable_adaptive_scan  - turns on memory-aware scanner scheduling.
// num_parallel_instances (BE_TEST only) - used directly as the max scan concurrency.
ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
                               const TupleDescriptor* output_tuple_desc,
                               const RowDescriptor* output_row_descriptor,
                               const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
                               int64_t limit_, std::shared_ptr<Dependency> dependency,
                               std::atomic<int64_t>* shared_scan_limit,
                               std::shared_ptr<MemShareArbitrator> arb,
                               std::shared_ptr<MemLimiter> limiter, int ins_idx,
                               bool enable_adaptive_scan
#ifdef BE_TEST
                               ,
                               int num_parallel_instances
#endif
                               )
        : HasTaskExecutionCtx(state),
          _state(state),
          _local_state(local_state),
          _output_tuple_desc(output_row_descriptor
                                     ? output_row_descriptor->tuple_descriptors().front()
                                     : output_tuple_desc),
          _output_row_descriptor(output_row_descriptor),
          _batch_size(state->batch_size()),
          limit(limit_),
          _shared_scan_limit(shared_scan_limit),
          _all_scanners(scanners.begin(), scanners.end()),
#ifndef BE_TEST
          _scanner_scheduler(local_state->scan_scheduler(state)),
          _min_scan_concurrency_of_scan_scheduler(
                  _scanner_scheduler->get_min_active_scan_threads()),
          _max_scan_concurrency(std::min(local_state->max_scanners_concurrency(state),
                                         cast_set<int>(scanners.size()))),
#else
          _scanner_scheduler(state->get_query_ctx()->get_scan_scheduler()),
          _min_scan_concurrency_of_scan_scheduler(0),
          _max_scan_concurrency(num_parallel_instances),
#endif
          _min_scan_concurrency(local_state->min_scanners_concurrency(state)),
          // The shared_ptr parameters are sinks taken by value: move them into the members
          // instead of copying to avoid a needless atomic refcount round trip.
          _scanner_mem_limiter(std::move(limiter)),
          _mem_share_arb(std::move(arb)),
          _ins_idx(ins_idx),
          _enable_adaptive_scanners(enable_adaptive_scan) {
    DCHECK(_state != nullptr);
    DCHECK(_output_row_descriptor == nullptr ||
           _output_row_descriptor->tuple_descriptors().size() == 1);
    _query_id = _state->get_query_ctx()->query_id();
    _resource_ctx = _state->get_query_ctx()->resource_ctx();
    ctx_id = UniqueId::gen_uid().to_string();
    // Every scanner starts out as a pending task waiting to be scheduled.
    for (auto& scanner : _all_scanners) {
        _pending_tasks.push(std::make_shared<ScanTask>(scanner));
    }
    // Normalize every negative limit to the single sentinel value -1.
    if (limit < 0) {
        limit = -1;
    }
    _dependency = std::move(dependency);
    // Initialize adaptive processor
    _adaptive_processor = ScannerAdaptiveProcessor::create_shared();
    DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}
115
116
0
int64_t ScannerContext::acquire_limit_quota(int64_t desired) {
117
0
    DCHECK(desired > 0);
118
0
    int64_t remaining = _shared_scan_limit->load(std::memory_order_acquire);
119
0
    while (true) {
120
0
        if (remaining < 0) {
121
            // No limit set, grant all desired rows.
122
0
            return desired;
123
0
        }
124
0
        if (remaining == 0) {
125
0
            return 0;
126
0
        }
127
0
        int64_t granted = std::min(desired, remaining);
128
0
        if (_shared_scan_limit->compare_exchange_weak(remaining, remaining - granted,
129
0
                                                      std::memory_order_acq_rel,
130
0
                                                      std::memory_order_acquire)) {
131
0
            return granted;
132
0
        }
133
        // CAS failed, `remaining` is updated to current value, retry.
134
0
    }
135
0
}
136
137
5
void ScannerContext::_adjust_scan_mem_limit(int64_t old_value, int64_t new_value) {
138
5
    if (!_enable_adaptive_scanners) {
139
0
        return;
140
0
    }
141
142
5
    int64_t new_scan_mem_limit = _mem_share_arb->update_mem_bytes(old_value, new_value);
143
5
    _scanner_mem_limiter->update_mem_limit(new_scan_mem_limit);
144
5
    _scanner_mem_limiter->update_arb_mem_bytes(new_value);
145
146
5
    VLOG_DEBUG << fmt::format(
147
0
            "adjust_scan_mem_limit. context = {}, new mem scan limit = {}, scanner mem bytes = {} "
148
0
            "-> {}",
149
0
            debug_string(), new_scan_mem_limit, old_value, new_value);
150
5
}
151
152
12
int ScannerContext::_available_pickup_scanner_count() {
153
12
    if (!_enable_adaptive_scanners) {
154
12
        return _max_scan_concurrency;
155
12
    }
156
157
0
    int min_scanners = std::max(1, _min_scan_concurrency);
158
0
    int max_scanners = _scanner_mem_limiter->available_scanner_count(_ins_idx);
159
0
    max_scanners = std::min(max_scanners, _max_scan_concurrency);
160
0
    min_scanners = std::min(min_scanners, max_scanners);
161
0
    if (_ins_idx == 0) {
162
        // Adjust memory limit via memory share arbitrator
163
0
        _adjust_scan_mem_limit(_scanner_mem_limiter->get_arb_scanner_mem_bytes(),
164
0
                               _scanner_mem_limiter->get_estimated_block_mem_bytes());
165
0
    }
166
167
0
    ScannerAdaptiveProcessor& P = *_adaptive_processor;
168
0
    int& scanners = P.expected_scanners;
169
0
    int64_t now = UnixMillis();
170
    // Avoid frequent adjustment - only adjust every 100ms
171
0
    if (now - P.adjust_scanners_last_timestamp <= config::doris_scanner_dynamic_interval_ms) {
172
0
        return scanners;
173
0
    }
174
0
    P.adjust_scanners_last_timestamp = now;
175
0
    auto old_scanners = P.expected_scanners;
176
177
0
    scanners = std::max(min_scanners, scanners);
178
0
    scanners = std::min(max_scanners, scanners);
179
0
    VLOG_DEBUG << fmt::format(
180
0
            "_available_pickup_scanner_count. context = {}, old_scanners = {}, scanners = {} "
181
0
            ", min_scanners: {}, max_scanners: {}",
182
0
            debug_string(), old_scanners, scanners, min_scanners, max_scanners);
183
184
    // TODO(gabriel): Scanners are scheduled adaptively based on the memory usage now.
185
0
    return scanners;
186
0
}
187
188
// After init function call, should not access _parent
189
6
// Prepares the context for execution and schedules the first scan tasks.
//
// Steps (the first group is compiled out under BE_TEST):
//   * copy profile/counter handles from the local state and, when the scheduler is a
//     TaskExecutorSimplifiedScanScheduler, register a task handle with its task executor;
//   * size the block queue budget (_max_bytes_in_queue);
//   * mark the context finished right away when there are no scanners;
//   * register with the memory limiter when adaptive scanning is enabled;
//   * possibly lower _max_scan_concurrency for very wide tables;
//   * ask the scheduler to schedule scan tasks while holding _transfer_lock.
//
// Returns an InternalError when the query context is missing, or any error produced while
// creating the executor task or scheduling; Status::OK() otherwise.
Status ScannerContext::init() {
#ifndef BE_TEST
    _scanner_profile = _local_state->_scanner_profile;
    _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
    _scanner_memory_used_counter = _local_state->_memory_used_counter;

    if (!_state->get_query_ctx()) {
        return Status::InternalError("Query context of {} is not set",
                                     print_id(_state->query_id()));
    }

    if (_state->get_query_ctx()->get_scan_scheduler()) {
        _should_reset_thread_name = false;
    }

    // Calling front() on an empty container is undefined behaviour, and the empty case is a
    // supported state (handled further below), so only inspect the first scanner when there
    // is one. The value is needed for this sanity check only.
    DCHECK(_all_scanners.empty() || _all_scanners.front().lock() != nullptr);

    if (auto* task_executor_scheduler =
                dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
        std::shared_ptr<TaskExecutor> task_executor = task_executor_scheduler->task_executor();
        TaskId task_id(fmt::format("{}-{}", print_id(_state->query_id()), ctx_id));
        _task_handle = DORIS_TRY(task_executor->create_task(
                task_id, []() { return 0.0; },
                config::task_executor_initial_max_concurrency_per_task > 0
                        ? config::task_executor_initial_max_concurrency_per_task
                        : std::max(48, CpuInfo::num_cores() * 2),
                std::chrono::milliseconds(100), std::nullopt));
    }
#endif
    // _max_bytes_in_queue controls the maximum memory that can be used by a single scan operator.
    // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
    // is at least 10MB.
    _max_bytes_in_queue =
            std::max(_state->scan_queue_mem_limit(), static_cast<int64_t>(1024) * 1024 * 10);

    // Provide more memory for wide tables: one extra multiple of the budget per 300 output slots.
    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;

    // Nothing to scan: the context is finished before it starts.
    if (_all_scanners.empty()) {
        _is_finished = true;
        _set_scanner_done();
    }

    // Initialize memory limiter if memory-aware scheduling is enabled
    if (_enable_adaptive_scanners) {
        DCHECK(_scanner_mem_limiter && _mem_share_arb);
        int64_t c = _scanner_mem_limiter->update_open_tasks_count(1);
        // TODO(gabriel): set estimated block size
        _scanner_mem_limiter->reestimated_block_mem_bytes(DEFAULT_SCANNER_MEM_BYTES);
        _scanner_mem_limiter->update_arb_mem_bytes(DEFAULT_SCANNER_MEM_BYTES);
        if (c == 0) {
            // First scanner context to open, adjust scan memory limit
            _adjust_scan_mem_limit(DEFAULT_SCANNER_MEM_BYTES,
                                   _scanner_mem_limiter->get_arb_scanner_mem_bytes());
        }
    }

    // When max_column_reader_num is set, try to downgrade the max scan concurrency, because
    // in a table with 5k columns the column readers may occupy too much memory.
    // See https://github.com/apache/doris/issues/35340 for details.
    const int32_t max_column_reader_num = _state->max_column_reader_num();

    if (_max_scan_concurrency != 1 && max_column_reader_num > 0) {
        int32_t scan_column_num = cast_set<int32_t>(_output_tuple_desc->slots().size());
        int32_t current_column_num = scan_column_num * _max_scan_concurrency;
        if (current_column_num > max_column_reader_num) {
            int32_t new_max_thread_num = max_column_reader_num / scan_column_num;
            // Always keep at least one scanner thread.
            new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num;
            if (new_max_thread_num < _max_scan_concurrency) {
                int32_t origin_max_thread_num = _max_scan_concurrency;
                _max_scan_concurrency = new_max_thread_num;
                LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
                          << " scan's max_thread_num from " << origin_max_thread_num << " to "
                          << _max_scan_concurrency << ",column num: " << scan_column_num
                          << ", max_column_reader_num: " << max_column_reader_num;
            }
        }
    }

    COUNTER_SET(_local_state->_max_scan_concurrency, static_cast<int64_t>(_max_scan_concurrency));
    COUNTER_SET(_local_state->_min_scan_concurrency, static_cast<int64_t>(_min_scan_concurrency));

    // The scheduler is handed the held transfer lock along with this context.
    std::unique_lock<std::mutex> l(_transfer_lock);
    RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));

    return Status::OK();
}
277
278
18
// Releases everything the context still holds: completed tasks, pooled free blocks, the
// adaptive-scan registration with the memory limiter and the task-executor handle.
ScannerContext::~ScannerContext() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
    _completed_tasks.clear();

    // Drain the free-block pool; each dequeued block is released when `drained` is
    // overwritten by the next one or goes out of scope.
    {
        BlockUPtr drained;
        while (_free_blocks.try_dequeue(drained)) {
            // nothing to do, dequeuing is enough
        }
    }
    DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);

    // A previous open-task count of 1 means this was the last open context: hand the
    // arbitrated scanner memory back by resetting it to 0.
    if (_enable_adaptive_scanners && _scanner_mem_limiter->update_open_tasks_count(-1) == 1) {
        _adjust_scan_mem_limit(_scanner_mem_limiter->get_arb_scanner_mem_bytes(), 0);
    }

    if (_task_handle) {
        auto* executor_scheduler =
                dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler);
        if (executor_scheduler != nullptr) {
            // Best effort: the result of remove_task is deliberately ignored.
            static_cast<void>(executor_scheduler->task_executor()->remove_task(_task_handle));
        }
        _task_handle = nullptr;
    }
}
304
305
3
// Hands out a block for a scanner to fill.
//
// A pooled free block is preferred. If the pool is empty, a new block is created only while
// the tracked block memory is below _max_bytes_in_queue, or unconditionally when `force` is
// true. Returns a null pointer when neither applies.
BlockUPtr ScannerContext::get_free_block(bool force) {
    BlockUPtr block = nullptr;
    if (_free_blocks.try_dequeue(block)) {
        DCHECK(block->mem_reuse());
        // The pooled block leaves the pool, so its bytes are removed from the accounting;
        // the caller of get_free_block accounts for it again once it is in use.
        _block_memory_usage -= block->allocated_bytes();
        _scanner_memory_used_counter->set(_block_memory_usage);
        return block;
    }

    const bool may_allocate = _block_memory_usage < _max_bytes_in_queue || force;
    if (may_allocate) {
        _newly_create_free_blocks_num->update(1);
        block = Block::create_unique(_output_tuple_desc->slots(), 0);
    }
    return block;
}
319
320
1
// Gives a block back to the free-block pool for later reuse.
//
// The block is pooled only if the operator is not in low memory mode (pooled blocks would
// occupy too much memory there), the block supports memory reuse, and the tracked block
// memory is still below _max_bytes_in_queue. Otherwise the block is simply dropped.
void ScannerContext::return_free_block(BlockUPtr block) {
    const bool poolable = !_local_state->low_memory_mode() && block->mem_reuse() &&
                          _block_memory_usage < _max_bytes_in_queue;
    if (!poolable) {
        return;
    }

    _block_memory_usage += block->allocated_bytes();
    _scanner_memory_used_counter->set(_block_memory_usage);
    block->clear_column_data();
    // The pool is purely a memory-efficiency optimization, so a failed enqueue is harmless
    // and its return value is intentionally ignored.
    _free_blocks.enqueue(std::move(block));
}
333
334
// Submits `scan_task` to the scanner scheduler and returns the scheduler's status.
//
// _in_flight_tasks_num is incremented regardless of whether the submission succeeds; the
// matching decrement happens in ScannerContext::push_back_scan_task. The lock parameter is
// unused here.
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task,
                                        std::unique_lock<std::mutex>& /*transfer_lock*/) {
    // Count the task as in flight before handing it over, see ScannerScheduler::_scanner_scan.
    ++_in_flight_tasks_num;
    return _scanner_scheduler->submit(shared_from_this(), scan_task);
}
344
345
0
void ScannerContext::clear_free_blocks() {
346
0
    clear_blocks(_free_blocks);
347
0
}
348
349
5
void ScannerContext::push_back_scan_task(std::shared_ptr<ScanTask> scan_task) {
350
5
    if (scan_task->status_ok()) {
351
5
        if (scan_task->cached_block && scan_task->cached_block->rows() > 0) {
352
0
            Status st = validate_block_schema(scan_task->cached_block.get());
353
0
            if (!st.ok()) {
354
0
                scan_task->set_status(st);
355
0
            }
356
0
        }
357
5
    }
358
359
5
    std::lock_guard<std::mutex> l(_transfer_lock);
360
5
    if (!scan_task->status_ok()) {
361
0
        _process_status = scan_task->get_status();
362
0
    }
363
5
    _completed_tasks.push_back(scan_task);
364
5
    _in_flight_tasks_num--;
365
366
5
    _dependency->set_ready();
367
5
}
368
369
3
// Pops at most one completed scan task and moves its cached block (if any) into `block`.
//
// state - runtime state; when cancelled, the cancel reason is returned immediately.
// block - output; swapped with the completed task's cached block when there is one.
// eos   - output; set to done() on the successful path.
// id    - not used by this function.
//
// Returns the cancel reason, a previously recorded or newly observed task error, a
// scheduling error, or Status::OK(). After a task is consumed it is either counted as
// finished (eos) or handed back to the scheduler for another round.
Status ScannerContext::get_block_from_queue(RuntimeState* state, Block* block, bool* eos, int id) {
    if (state->is_cancelled()) {
        _set_scanner_done();
        return state->cancel_reason();
    }

    std::unique_lock l(_transfer_lock);

    if (!_process_status.ok()) {
        _set_scanner_done();
        return _process_status;
    }

    // Take the oldest completed task, if any. front() on an empty list is undefined
    // behaviour (https://en.cppreference.com/w/cpp/container/list/front), hence the check.
    std::shared_ptr<ScanTask> finished_task = nullptr;
    if (!_completed_tasks.empty() && !done()) {
        finished_task = _completed_tasks.front();
        _completed_tasks.pop_front();
    }

    if (finished_task != nullptr) {
        // A bad status may originate from the scanner itself or from the scanner scheduler
        // (for example TooManyTasks).
        if (!finished_task->status_ok()) {
            // TODO: If the scanner status is TooManyTasks, maybe we can retry the scanner after a while.
            _process_status = finished_task->get_status();
            _set_scanner_done();
            return _process_status;
        }

        if (finished_task->cached_block) {
            // Small blocks are not a concern here: blocks are merged when they are appended
            // to the task's cache.
            auto ready_block = std::move(finished_task->cached_block);
            auto ready_bytes = ready_block->allocated_bytes();
            finished_task->cached_block.reset();
            _block_memory_usage -= ready_bytes;
            // Hand the data to the caller, then recycle the now-swapped-out block.
            block->swap(*ready_block);
            return_free_block(std::move(ready_block));
        }
        VLOG_DEBUG << fmt::format(
                "ScannerContext {} get block from queue, current scan "
                "task remaing cached_block size {}, eos {}, scheduled tasks {}",
                ctx_id, _completed_tasks.size(), finished_task->is_eos(), _in_flight_tasks_num);
        if (!finished_task->is_eos()) {
            // Not exhausted yet: put the same task back in flight.
            finished_task->set_state(ScanTask::State::IN_FLIGHT);
            RETURN_IF_ERROR(
                    _scanner_scheduler->schedule_scan_task(shared_from_this(), finished_task, l));
        } else {
            // Exhausted: record a finished scanner and let the scheduler pick new work.
            _num_finished_scanners++;
            RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));
        }
    }

    // Mark finished when nothing is queued and either
    // (1) all scanners completed normally, or
    // (2) the shared limit is exhausted and no scanners are still running.
    if (_completed_tasks.empty()) {
        const bool all_scanners_finished = _num_finished_scanners == _all_scanners.size();
        if (all_scanners_finished ||
            (_shared_scan_limit->load(std::memory_order_acquire) == 0 &&
             _in_flight_tasks_num == 0)) {
            _set_scanner_done();
            _is_finished = true;
        }
    }

    *eos = done();

    // Nothing left to consume right now: block the dependency until a task completes.
    if (_completed_tasks.empty()) {
        _dependency->block();
    }

    return Status::OK();
}
443
444
0
// Checks that the nullability of every column in `block` is consistent with the output
// tuple descriptor.
//
// Columns are matched to slots by position. For each pair, the column's nullability must
// equal both its own data type's nullability and the slot's nullability; the first mismatch
// is reported as an INVALID_SCHEMA error. Returns Status::OK() when all slots match.
Status ScannerContext::validate_block_schema(Block* block) {
    size_t position = 0;
    for (auto& slot : _output_tuple_desc->slots()) {
        auto& data = block->get_by_position(position);
        ++position;

        const bool column_nullable = data.column->is_nullable();
        const bool type_nullable = data.type->is_nullable();
        if (column_nullable != type_nullable) {
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
                    "column(name: {}) nullable({}) does not match type nullable({}), slot(id: "
                    "{}, "
                    "name:{})",
                    data.name, column_nullable, type_nullable, slot->id(), slot->col_name());
        }

        const bool slot_nullable = slot->is_nullable();
        if (column_nullable != slot_nullable) {
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
                    "column(name: {}) nullable({}) does not match slot(id: {}, name: {}) "
                    "nullable({})",
                    data.name, column_nullable, slot->id(), slot->col_name(), slot_nullable);
        }
    }
    return Status::OK();
}
467
468
0
// Stops all scanners of this context; only the first call has any effect.
//
// Under _transfer_lock: sets the stop flag, marks the dependency always-ready, asks every
// still-alive scanner to stop, drops the completed tasks and removes the task-executor
// handle. When profiling is enabled, per-scanner statistics are additionally written to the
// scanner profile as bracketed, comma-separated lists.
void ScannerContext::stop_scanners(RuntimeState* state) {
    std::lock_guard<std::mutex> guard(_transfer_lock);
    if (_should_stop) {
        return;
    }
    _should_stop = true;
    _set_scanner_done();

    for (const std::weak_ptr<ScannerDelegate>& weak_scanner : _all_scanners) {
        std::shared_ptr<ScannerDelegate> alive = weak_scanner.lock();
        if (alive) {
            alive->_scanner->try_stop();
        }
    }
    _completed_tasks.clear();

    if (_task_handle) {
        auto* executor_scheduler =
                dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler);
        if (executor_scheduler != nullptr) {
            // Best effort: the result of remove_task is deliberately ignored.
            static_cast<void>(executor_scheduler->task_executor()->remove_task(_task_handle));
        }
        _task_handle = nullptr;
    }

    // TODO yiguolei, call mark close to scanners
    if (!state->enable_profile()) {
        return;
    }

    std::stringstream running_time_list;
    std::stringstream rows_read_list;
    std::stringstream wait_time_list;
    std::stringstream projection_time_list;
    running_time_list << "[";
    rows_read_list << "[";
    wait_time_list << "[";
    projection_time_list << "[";

    // Appends one pretty-printed value followed by the ", " separator.
    const auto append_entry = [](std::stringstream& out, auto value, auto unit) {
        out << PrettyPrinter::print(value, unit) << ", ";
    };

    // A scanner can be in one of three states:
    //  state 1: in scanner context, not scheduled
    //  state 2: in scanner worker pool's queue, scheduled but not running
    //  state 3: scanner is running.
    for (auto& weak_scanner : _all_scanners) {
        auto scanner = weak_scanner.lock();
        if (scanner == nullptr) {
            continue;
        }
        // Record the per-scanner statistics. Scanners are not closed here because this
        // loop covers all of them and some may still be running.
        append_entry(running_time_list, scanner->_scanner->get_time_cost_ns(), TUnit::TIME_NS);
        append_entry(projection_time_list, scanner->_scanner->projection_time(),
                     TUnit::TIME_NS);
        append_entry(rows_read_list, scanner->_scanner->get_rows_read(), TUnit::UNIT);
        append_entry(wait_time_list, scanner->_scanner->get_scanner_wait_worker_timer(),
                     TUnit::TIME_NS);
    }

    running_time_list << "]";
    rows_read_list << "]";
    wait_time_list << "]";
    projection_time_list << "]";
    _scanner_profile->add_info_string("PerScannerRunningTime", running_time_list.str());
    _scanner_profile->add_info_string("PerScannerRowsRead", rows_read_list.str());
    _scanner_profile->add_info_string("PerScannerWaitTime", wait_time_list.str());
    _scanner_profile->add_info_string("PerScannerProjectionTime", projection_time_list.str());
}
534
535
18
std::string ScannerContext::debug_string() {
536
18
    return fmt::format(
537
18
            "_query_id: {}, id: {}, total scanners: {}, pending tasks: {}, completed tasks: {},"
538
18
            " _should_stop: {}, _is_finished: {}, free blocks: {},"
539
18
            " limit: {}, _in_flight_tasks_num: {}, remaining_limit: {}, _num_running_scanners: {}, "
540
18
            "_max_thread_num: {},"
541
18
            " _max_bytes_in_queue: {}, _ins_idx: {}, _enable_adaptive_scanners: {}, "
542
18
            "_mem_share_arb: {}, _scanner_mem_limiter: {}",
543
18
            print_id(_query_id), ctx_id, _all_scanners.size(), _pending_tasks.size(),
544
18
            _completed_tasks.size(), _should_stop, _is_finished, _free_blocks.size_approx(), limit,
545
18
            _shared_scan_limit->load(std::memory_order_relaxed), _in_flight_tasks_num,
546
18
            _num_finished_scanners, _max_scan_concurrency, _max_bytes_in_queue, _ins_idx,
547
18
            _enable_adaptive_scanners,
548
18
            _enable_adaptive_scanners ? _mem_share_arb->debug_string() : "NULL",
549
18
            _enable_adaptive_scanners ? _scanner_mem_limiter->debug_string() : "NULL");
550
18
}
551
552
2
void ScannerContext::_set_scanner_done() {
553
2
    _dependency->set_always_ready();
554
2
}
555
556
1
void ScannerContext::update_peak_running_scanner(int num) {
557
#ifndef BE_TEST
558
    _local_state->_peak_running_scanner->add(num);
559
#endif
560
1
    if (_enable_adaptive_scanners) {
561
1
        _scanner_mem_limiter->update_running_tasks_count(num);
562
1
    }
563
1
}
564
565
1
void ScannerContext::reestimated_block_mem_bytes(int64_t num) {
566
1
    if (_enable_adaptive_scanners) {
567
1
        _scanner_mem_limiter->reestimated_block_mem_bytes(num);
568
1
    }
569
1
}
570
571
int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock,
572
12
                                    std::unique_lock<std::shared_mutex>& scheduler_lock) {
573
    // Get effective max concurrency considering adaptive scheduling
574
12
    int32_t effective_max_concurrency = _available_pickup_scanner_count();
575
12
    DCHECK_LE(effective_max_concurrency, _max_scan_concurrency);
576
577
    // margin_1 is used to ensure each scan operator could have at least _min_scan_concurrency scan tasks.
578
12
    int32_t margin_1 = _min_scan_concurrency -
579
12
                       (cast_set<int32_t>(_completed_tasks.size()) + _in_flight_tasks_num);
580
581
    // margin_2 is used to ensure the scan scheduler could have at least _min_scan_concurrency_of_scan_scheduler scan tasks.
582
12
    int32_t margin_2 =
583
12
            _min_scan_concurrency_of_scan_scheduler -
584
12
            (_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size());
585
586
    // margin_3 is used to respect adaptive max concurrency limit
587
12
    int32_t margin_3 =
588
12
            std::max(effective_max_concurrency -
589
12
                             (cast_set<int32_t>(_completed_tasks.size()) + _in_flight_tasks_num),
590
12
                     1);
591
592
12
    if (margin_1 <= 0 && margin_2 <= 0) {
593
1
        return 0;
594
1
    }
595
596
11
    int32_t margin = std::max(margin_1, margin_2);
597
11
    if (_enable_adaptive_scanners) {
598
0
        margin = std::min(margin, margin_3); // Cap by adaptive limit
599
0
    }
600
601
11
    if (low_memory_mode()) {
602
        // In low memory mode, we will limit the number of running scanners to `low_memory_mode_scanners()`.
603
        // So that we will not submit too many scan tasks to scheduler.
604
0
        margin = std::min(low_memory_mode_scanners() - _in_flight_tasks_num, margin);
605
0
    }
606
607
11
    VLOG_DEBUG << fmt::format(
608
0
            "[{}|{}] schedule scan task, margin_1: {} = {} - ({} + {}), margin_2: {} = {} - "
609
0
            "({} + {}), margin_3: {} = {} - ({} + {}), margin: {}, adaptive: {}",
610
0
            print_id(_query_id), ctx_id, margin_1, _min_scan_concurrency, _completed_tasks.size(),
611
0
            _in_flight_tasks_num, margin_2, _min_scan_concurrency_of_scan_scheduler,
612
0
            _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(),
613
0
            margin_3, effective_max_concurrency, _completed_tasks.size(), _in_flight_tasks_num,
614
0
            margin, _enable_adaptive_scanners);
615
616
11
    return margin;
617
12
}
618
619
// This function must be called with:
620
// 1. _transfer_lock held.
621
// 2. ScannerScheduler::_lock held.
622
// Picks the scan tasks that may run now and hands them to submit_scan_task().
//
// The number of tasks that may be submitted comes from _get_margin(). The first
// slot is offered to `current_scan_task` (when given); the remaining slots are
// filled through _pull_next_scan_task(), which draws from _pending_tasks.
// If `current_scan_task` cannot be scheduled it is put back into _pending_tasks
// in PENDING state so it is not lost.
//
// Throws doris::Exception(INTERNAL_ERROR) when `current_scan_task` still holds a
// cached block or has already reached eos.
// Returns Status::OK() when nothing had to be (or could be) submitted; if a
// submission fails, the failing status is stored in _process_status,
// _set_scanner_done() is called and that status is returned.
Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task,
                                          std::unique_lock<std::mutex>& transfer_lock,
                                          std::unique_lock<std::shared_mutex>& scheduler_lock) {
    // A task that is handed back for re-scheduling must be "clean".
    if (current_scan_task) {
        const bool has_leftover =
                current_scan_task->cached_block != nullptr || current_scan_task->is_eos();
        if (has_leftover) {
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
        }
    }

    std::list<std::shared_ptr<ScanTask>> tasks_to_submit;

    const int32_t margin = _get_margin(transfer_lock, scheduler_lock);

    // A non-positive margin means this scan operator may not submit anything right now.
    if (margin <= 0) {
        // Do not drop the current task: re-queue it so that it can be resubmitted later.
        // This usually happens when the concurrency has to be downgraded.
        if (current_scan_task) {
            current_scan_task->set_state(ScanTask::State::PENDING);
            _pending_tasks.push(current_scan_task);
            VLOG_DEBUG << fmt::format(
                    "{} push back scanner to task queue, because diff <= 0, _completed_tasks size "
                    "{}, _in_flight_tasks_num {}",
                    ctx_id, _completed_tasks.size(), _in_flight_tasks_num);
        }

#ifndef NDEBUG
        // This check is necessary.
        // Each scan operator must keep at least one scan task running or completed,
        // otherwise this scan operator will not be re-scheduled.
        if (!_pending_tasks.empty() && _in_flight_tasks_num == 0 && _completed_tasks.empty()) {
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
        }
#endif

        return Status::OK();
    }

    // Fill at most `margin` slots; stop early as soon as no task can be pulled.
    for (int32_t round = 0; round < margin; ++round) {
        const int32_t current_concurrency = cast_set<int32_t>(
                _completed_tasks.size() + _in_flight_tasks_num + tasks_to_submit.size());
        VLOG_DEBUG << fmt::format("{} currenct concurrency: {} = {} + {} + {}", ctx_id,
                                  current_concurrency, _completed_tasks.size(),
                                  _in_flight_tasks_num, tasks_to_submit.size());

        // Only the very first slot is offered to the current scan task.
        const bool is_first_round = (round == 0);
        std::shared_ptr<ScanTask> picked;
        if (is_first_round) {
            picked = _pull_next_scan_task(current_scan_task, current_concurrency);
        } else {
            picked = _pull_next_scan_task(nullptr, current_concurrency);
        }

        // nullptr on the first pull means one of:
        // 1. current_concurrency already reached _max_scan_concurrency.
        // 2. all scanners are finished.
        if (is_first_round && picked == nullptr && current_scan_task) {
            DCHECK(current_scan_task->cached_block == nullptr);
            DCHECK(!current_scan_task->is_eos());
            if (current_scan_task->cached_block != nullptr || current_scan_task->is_eos()) {
                // This should not happen.
                throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                                       "Scanner scheduler logical error.");
            }
            // The current scan task was not scheduled; re-queue it so it can be resubmitted.
            current_scan_task->set_state(ScanTask::State::PENDING);
            _pending_tasks.push(current_scan_task);
        }

        if (!picked) {
            break;
        }
        tasks_to_submit.push_back(picked);
    }

    if (tasks_to_submit.empty()) {
        return Status::OK();
    }

    VLOG_DEBUG << fmt::format("[{}:{}] submit {} scan tasks to scheduler, remaining scanner: {}",
                              print_id(_query_id), ctx_id, tasks_to_submit.size(),
                              _pending_tasks.size());

    for (auto& runnable_task : tasks_to_submit) {
        if (Status submit_status = submit_scan_task(runnable_task, transfer_lock);
            !submit_status.ok()) {
            // First failure aborts the whole batch and marks the scanner as done.
            _process_status = submit_status;
            _set_scanner_done();
            return _process_status;
        }
    }

    return Status::OK();
}
719
720
std::shared_ptr<ScanTask> ScannerContext::_pull_next_scan_task(
721
31
        std::shared_ptr<ScanTask> current_scan_task, int32_t current_concurrency) {
722
31
    int32_t effective_max_concurrency = _max_scan_concurrency;
723
31
    if (_enable_adaptive_scanners) {
724
0
        effective_max_concurrency = _adaptive_processor->expected_scanners > 0
725
0
                                            ? _adaptive_processor->expected_scanners
726
0
                                            : _max_scan_concurrency;
727
0
    }
728
729
31
    if (current_concurrency >= effective_max_concurrency) {
730
7
        VLOG_DEBUG << fmt::format(
731
0
                "ScannerContext {} current concurrency {} >= effective_max_concurrency {}, skip "
732
0
                "pull",
733
0
                ctx_id, current_concurrency, effective_max_concurrency);
734
7
        return nullptr;
735
7
    }
736
737
24
    if (current_scan_task != nullptr) {
738
3
        if (current_scan_task->cached_block != nullptr || current_scan_task->is_eos()) {
739
            // This should not happen.
740
2
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
741
2
        }
742
1
        return current_scan_task;
743
3
    }
744
745
21
    if (!_pending_tasks.empty()) {
746
        // If shared limit quota is exhausted, do not submit new scanners from pending queue.
747
19
        int64_t remaining = _shared_scan_limit->load(std::memory_order_acquire);
748
19
        if (remaining == 0) {
749
0
            return nullptr;
750
0
        }
751
19
        std::shared_ptr<ScanTask> next_scan_task;
752
19
        next_scan_task = _pending_tasks.top();
753
19
        _pending_tasks.pop();
754
19
        return next_scan_task;
755
19
    } else {
756
2
        return nullptr;
757
2
    }
758
21
}
759
760
11
bool ScannerContext::low_memory_mode() const {
761
11
    return _local_state->low_memory_mode();
762
11
}
763
#include "common/compile_check_end.h"
764
} // namespace doris