Coverage Report

Created: 2026-05-09 05:26

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/scanner_context.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <glog/logging.h>
23
#include <zconf.h>
24
25
#include <cstdint>
26
#include <ctime>
27
#include <memory>
28
#include <mutex>
29
#include <ostream>
30
#include <shared_mutex>
31
#include <tuple>
32
#include <utility>
33
34
#include "common/config.h"
35
#include "common/exception.h"
36
#include "common/logging.h"
37
#include "common/metrics/doris_metrics.h"
38
#include "common/status.h"
39
#include "core/block/block.h"
40
#include "exec/operator/scan_operator.h"
41
#include "exec/scan/scan_node.h"
42
#include "exec/scan/scanner_scheduler.h"
43
#include "runtime/descriptors.h"
44
#include "runtime/exec_env.h"
45
#include "runtime/runtime_profile.h"
46
#include "runtime/runtime_state.h"
47
#include "storage/tablet/tablet.h"
48
#include "util/time.h"
49
#include "util/uid_util.h"
50
51
namespace doris {
52
53
using namespace std::chrono_literals;
54
55
// ==================== ScannerContext ====================
56
// Builds the scanner context for one scan operator instance.
//
// The output tuple descriptor is taken from `output_row_descriptor` when one is
// supplied (it must then hold exactly one tuple descriptor), otherwise from
// `output_tuple_desc`. Every scanner in `scanners` is wrapped in a ScanTask and
// queued in `_pending_tasks`. In BE_TEST builds the scheduler comes from the
// query context and the max concurrency from `num_parallel_instances`.
ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
                               const TupleDescriptor* output_tuple_desc,
                               const RowDescriptor* output_row_descriptor,
                               const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
                               int64_t limit_, std::shared_ptr<Dependency> dependency,
                               std::atomic<int64_t>* shared_scan_limit,
                               std::shared_ptr<MemShareArbitrator> arb,
                               std::shared_ptr<MemLimiter> limiter, int ins_idx,
                               bool enable_adaptive_scan
#ifdef BE_TEST
                               ,
                               int num_parallel_instances
#endif
                               )
        : HasTaskExecutionCtx(state),
          _state(state),
          _local_state(local_state),
          _output_tuple_desc(output_row_descriptor
                                     ? output_row_descriptor->tuple_descriptors().front()
                                     : output_tuple_desc),
          _output_row_descriptor(output_row_descriptor),
          _batch_size(state->batch_size()),
          limit(limit_),
          _shared_scan_limit(shared_scan_limit),
          _all_scanners(scanners.begin(), scanners.end()),
#ifndef BE_TEST
          _scanner_scheduler(local_state->scan_scheduler(state)),
          _min_scan_concurrency_of_scan_scheduler(
                  _scanner_scheduler->get_min_active_scan_threads()),
          _max_scan_concurrency(std::min(local_state->max_scanners_concurrency(state),
                                         cast_set<int>(scanners.size()))),
#else
          _scanner_scheduler(state->get_query_ctx()->get_scan_scheduler()),
          _min_scan_concurrency_of_scan_scheduler(0),
          _max_scan_concurrency(num_parallel_instances),
#endif
          _min_scan_concurrency(local_state->min_scanners_concurrency(state)),
          _scanner_mem_limiter(limiter),
          _mem_share_arb(arb),
          _ins_idx(ins_idx),
          _enable_adaptive_scanners(enable_adaptive_scan) {
    DCHECK(_state != nullptr);
    DCHECK(_output_row_descriptor == nullptr ||
           _output_row_descriptor->tuple_descriptors().size() == 1);

    // Any negative limit is normalised to -1.
    if (limit < 0) {
        limit = -1;
    }
    _dependency = dependency;

    // Identity of this context: the owning query plus a freshly generated id.
    _query_id = _state->get_query_ctx()->query_id();
    _resource_ctx = _state->get_query_ctx()->resource_ctx();
    ctx_id = UniqueId::gen_uid().to_string();

    // Every scanner starts out as a pending task.
    for (auto& delegate : _all_scanners) {
        _pending_tasks.push(std::make_shared<ScanTask>(delegate));
    }

    // State used by _available_pickup_scanner_count() to adapt concurrency.
    _adaptive_processor = ScannerAdaptiveProcessor::create_shared();

    DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}
114
115
1.83M
void ScannerContext::_adjust_scan_mem_limit(int64_t old_value, int64_t new_value) {
116
1.83M
    if (!_enable_adaptive_scanners) {
117
0
        return;
118
0
    }
119
120
1.83M
    int64_t new_scan_mem_limit = _mem_share_arb->update_mem_bytes(old_value, new_value);
121
1.83M
    _scanner_mem_limiter->update_mem_limit(new_scan_mem_limit);
122
1.83M
    _scanner_mem_limiter->update_arb_mem_bytes(new_value);
123
124
18.4E
    VLOG_DEBUG << fmt::format(
125
18.4E
            "adjust_scan_mem_limit. context = {}, new mem scan limit = {}, scanner mem bytes = {} "
126
18.4E
            "-> {}",
127
18.4E
            debug_string(), new_scan_mem_limit, old_value, new_value);
128
1.83M
}
129
130
1.67M
int ScannerContext::_available_pickup_scanner_count() {
131
1.67M
    if (!_enable_adaptive_scanners) {
132
10.9k
        return _max_scan_concurrency;
133
10.9k
    }
134
135
1.66M
    int min_scanners = std::max(1, _min_scan_concurrency);
136
1.66M
    int max_scanners = _scanner_mem_limiter->available_scanner_count(_ins_idx);
137
1.66M
    max_scanners = std::min(max_scanners, _max_scan_concurrency);
138
1.66M
    min_scanners = std::min(min_scanners, max_scanners);
139
1.66M
    if (_ins_idx == 0) {
140
        // Adjust memory limit via memory share arbitrator
141
1.39M
        _adjust_scan_mem_limit(_scanner_mem_limiter->get_arb_scanner_mem_bytes(),
142
1.39M
                               _scanner_mem_limiter->get_estimated_block_mem_bytes());
143
1.39M
    }
144
145
1.66M
    ScannerAdaptiveProcessor& P = *_adaptive_processor;
146
1.66M
    int& scanners = P.expected_scanners;
147
1.66M
    int64_t now = UnixMillis();
148
    // Avoid frequent adjustment - only adjust every 100ms
149
1.66M
    if (now - P.adjust_scanners_last_timestamp <= config::doris_scanner_dynamic_interval_ms) {
150
1.26M
        return scanners;
151
1.26M
    }
152
404k
    P.adjust_scanners_last_timestamp = now;
153
404k
    auto old_scanners = P.expected_scanners;
154
155
404k
    scanners = std::max(min_scanners, scanners);
156
404k
    scanners = std::min(max_scanners, scanners);
157
18.4E
    VLOG_DEBUG << fmt::format(
158
18.4E
            "_available_pickup_scanner_count. context = {}, old_scanners = {}, scanners = {} "
159
18.4E
            ", min_scanners: {}, max_scanners: {}",
160
18.4E
            debug_string(), old_scanners, scanners, min_scanners, max_scanners);
161
162
    // TODO(gabriel): Scanners are scheduled adaptively based on the memory usage now.
163
404k
    return scanners;
164
1.66M
}
165
166
// After init function call, should not access _parent
167
290k
// Completes the set-up of the context and issues the first scheduling request.
//
// Steps (the first group is skipped in BE_TEST builds):
//   - copy profile/counter handles from the local state;
//   - verify the query context exists and, if the scheduler is a
//     TaskExecutorSimplifiedScanScheduler, register a task handle with it;
//   - size `_max_bytes_in_queue`;
//   - mark the context finished when there are no scanners;
//   - register with the memory limiter when adaptive scanning is enabled;
//   - possibly lower `_max_scan_concurrency` for very wide tables;
//   - ask the scheduler to schedule scan tasks.
//
// Returns an error if the query context is missing, if the task could not be
// created, or if the scheduler rejects the scheduling request.
Status ScannerContext::init() {
#ifndef BE_TEST
    _scanner_profile = _local_state->_scanner_profile;
    _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
    _scanner_memory_used_counter = _local_state->_memory_used_counter;

    if (!_state->get_query_ctx()) {
        return Status::InternalError("Query context of {} is not set",
                                     print_id(_state->query_id()));
    }

    if (_state->get_query_ctx()->get_scan_scheduler()) {
        _should_reset_thread_name = false;
    }

    // Sanity check on the first scanner. The scanner list may be empty (that
    // case is handled further down), and front() on an empty container is
    // undefined behaviour, so only inspect it when there is an element.
    DCHECK(_all_scanners.empty() || _all_scanners.front().lock() != nullptr);

    if (auto* task_executor_scheduler =
                dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
        std::shared_ptr<TaskExecutor> task_executor = task_executor_scheduler->task_executor();
        TaskId task_id(fmt::format("{}-{}", print_id(_state->query_id()), ctx_id));
        _task_handle = DORIS_TRY(task_executor->create_task(
                task_id, []() { return 0.0; },
                config::task_executor_initial_max_concurrency_per_task > 0
                        ? config::task_executor_initial_max_concurrency_per_task
                        : std::max(48, CpuInfo::num_cores() * 2),
                std::chrono::milliseconds(100), std::nullopt));
    }
#endif
    // _max_bytes_in_queue controls the maximum memory that can be used by a single scan operator.
    // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
    // is larger than 10MB.
    _max_bytes_in_queue =
            std::max(_state->scan_queue_mem_limit(), static_cast<int64_t>(1024) * 1024 * 10);

    // Provide more memory for wide tables, increase proportionally by multiples of 300
    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;

    // Nothing to scan: the context is finished right away.
    if (_all_scanners.empty()) {
        _is_finished = true;
        _set_scanner_done();
    }

    // Initialize memory limiter if memory-aware scheduling is enabled
    if (_enable_adaptive_scanners) {
        DCHECK(_scanner_mem_limiter && _mem_share_arb);
        int64_t c = _scanner_mem_limiter->update_open_tasks_count(1);
        // TODO(gabriel): set estimated block size
        _scanner_mem_limiter->reestimated_block_mem_bytes(DEFAULT_SCANNER_MEM_BYTES);
        _scanner_mem_limiter->update_arb_mem_bytes(DEFAULT_SCANNER_MEM_BYTES);
        if (c == 0) {
            // First scanner context to open, adjust scan memory limit
            _adjust_scan_mem_limit(DEFAULT_SCANNER_MEM_BYTES,
                                   _scanner_mem_limiter->get_arb_scanner_mem_bytes());
        }
    }

    // When the user did not pin the scan concurrency to 1 we may downgrade it:
    // in a table with 5k columns the column readers may occupy too much memory.
    // See https://github.com/apache/doris/issues/35340 for details.
    const int32_t max_column_reader_num = _state->max_column_reader_num();

    if (_max_scan_concurrency != 1 && max_column_reader_num > 0) {
        int32_t scan_column_num = cast_set<int32_t>(_output_tuple_desc->slots().size());
        int32_t current_column_num = scan_column_num * _max_scan_concurrency;
        if (current_column_num > max_column_reader_num) {
            // scan_column_num > 0 here, otherwise the product above would be 0.
            int32_t new_max_thread_num = max_column_reader_num / scan_column_num;
            new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num;
            if (new_max_thread_num < _max_scan_concurrency) {
                int32_t origin_max_thread_num = _max_scan_concurrency;
                _max_scan_concurrency = new_max_thread_num;
                LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
                          << " scan's max_thread_num from " << origin_max_thread_num << " to "
                          << _max_scan_concurrency << ",column num: " << scan_column_num
                          << ", max_column_reader_num: " << max_column_reader_num;
            }
        }
    }

    COUNTER_SET(_local_state->_max_scan_concurrency, static_cast<int64_t>(_max_scan_concurrency));
    COUNTER_SET(_local_state->_min_scan_concurrency, static_cast<int64_t>(_min_scan_concurrency));

    // The scheduler expects the transfer lock to be held by the caller.
    std::unique_lock<std::mutex> l(_transfer_lock);
    RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));

    return Status::OK();
}
255
256
290k
// Releases everything the context still holds: completed tasks, cached free
// blocks, its slot in the memory limiter and (if still registered) its task
// handle. Runs under the query's memory tracker.
ScannerContext::~ScannerContext() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
    _completed_tasks.clear();

    {
        // Drain the free-block queue; each dequeue releases the previous block
        // and the last one is released when `drained` leaves scope.
        BlockUPtr drained;
        while (_free_blocks.try_dequeue(drained)) {
            // do nothing
        }
    }
    DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);

    // update_open_tasks_count() is compared against 1 to detect the last open
    // context, which then resets the scan memory limit.
    if (_enable_adaptive_scanners && _scanner_mem_limiter->update_open_tasks_count(-1) == 1) {
        _adjust_scan_mem_limit(_scanner_mem_limiter->get_arb_scanner_mem_bytes(), 0);
    }

    if (_task_handle) {
        auto* executor_scheduler =
                dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler);
        if (executor_scheduler != nullptr) {
            // The result of remove_task is intentionally ignored.
            static_cast<void>(executor_scheduler->task_executor()->remove_task(_task_handle));
        }
        _task_handle = nullptr;
    }
}
282
283
1.38M
// Hands out a block for a scanner to fill.
//
// A cached free block is preferred. If none is available, a new block is
// created as long as the tracked block memory is below `_max_bytes_in_queue`
// or `force` is set. Otherwise nullptr is returned.
BlockUPtr ScannerContext::get_free_block(bool force) {
    BlockUPtr recycled = nullptr;
    if (_free_blocks.try_dequeue(recycled)) {
        DCHECK(recycled->mem_reuse());
        // The reused block leaves the free list, so its bytes are subtracted
        // here; the caller of get_free_block accounts for it again.
        _block_memory_usage -= recycled->allocated_bytes();
        _scanner_memory_used_counter->set(_block_memory_usage);
        return recycled;
    }

    if (_block_memory_usage < _max_bytes_in_queue || force) {
        _newly_create_free_blocks_num->update(1);
        recycled = Block::create_unique(_output_tuple_desc->slots(), 0);
    }
    return recycled;
}
297
298
1.38M
// Gives a block back to the free-block cache.
//
// The block is cached only when the local state is not in low memory mode, the
// block supports memory reuse, and the tracked block memory is still below
// `_max_bytes_in_queue`. In every other case the block is simply destroyed
// when this function returns.
void ScannerContext::return_free_block(BlockUPtr block) {
    // Under low memory mode the free block must not be kept, it would occupy too much memory.
    const bool keep_for_reuse = !_local_state->low_memory_mode() && block->mem_reuse() &&
                                _block_memory_usage < _max_bytes_in_queue;
    if (!keep_for_reuse) {
        return;
    }

    const size_t reclaimed_bytes = block->allocated_bytes();
    _block_memory_usage += reclaimed_bytes;
    _scanner_memory_used_counter->set(_block_memory_usage);
    block->clear_column_data();
    // The free-block cache only improves memory efficiency, so a failed enqueue
    // is harmless and the return value is ignored.
    _free_blocks.enqueue(std::move(block));
}
311
312
// Submits `scan_task` to the scanner scheduler and returns the scheduler's status.
//
// `_in_flight_tasks_num` is incremented before submitting, regardless of the
// outcome; ScannerContext::push_back_scan_task decrements it again when the
// task is handed back. The lock parameter is unused in the body.
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task,
                                        std::unique_lock<std::mutex>& /*transfer_lock*/) {
    ++_in_flight_tasks_num;
    return _scanner_scheduler->submit(shared_from_this(), scan_task);
}
322
323
0
void ScannerContext::clear_free_blocks() {
324
0
    clear_blocks(_free_blocks);
325
0
}
326
327
1.38M
void ScannerContext::push_back_scan_task(std::shared_ptr<ScanTask> scan_task) {
328
1.38M
    if (scan_task->status_ok()) {
329
1.38M
        if (scan_task->cached_block && scan_task->cached_block->rows() > 0) {
330
308k
            Status st = validate_block_schema(scan_task->cached_block.get());
331
308k
            if (!st.ok()) {
332
0
                scan_task->set_status(st);
333
0
            }
334
308k
        }
335
1.38M
    }
336
337
1.38M
    std::lock_guard<std::mutex> l(_transfer_lock);
338
1.38M
    if (!scan_task->status_ok()) {
339
1.38k
        _process_status = scan_task->get_status();
340
1.38k
    }
341
1.38M
    _completed_tasks.push_back(scan_task);
342
1.38M
    _in_flight_tasks_num--;
343
344
1.38M
    _dependency->set_ready();
345
1.38M
}
346
347
1.38M
// Pops one completed scan task (if any) and moves its cached block into `block`.
//
// Returns the cancel reason when `state` is cancelled, or the recorded process
// status when a scanner has failed. After consuming a task it is either counted
// as finished (eos) or re-submitted to the scheduler. `*eos` receives done().
// When no completed task is left, the dependency is blocked.
Status ScannerContext::get_block_from_queue(RuntimeState* state, Block* block, bool* eos, int id) {
    if (state->is_cancelled()) {
        _set_scanner_done();
        return state->cancel_reason();
    }
    std::unique_lock l(_transfer_lock);

    if (!_process_status.ok()) {
        _set_scanner_done();
        return _process_status;
    }

    // Take the oldest completed task, unless the queue is empty or we are done.
    // front() on an empty list is undefined, hence the emptiness check first.
    std::shared_ptr<ScanTask> finished_task = nullptr;
    if (!_completed_tasks.empty() && !done()) {
        finished_task = _completed_tasks.front();
        _completed_tasks.pop_front();
    }

    if (finished_task != nullptr) {
        // A failure may originate from the scanner itself or from the scanner
        // scheduler, such as TooManyTasks.
        if (!finished_task->status_ok()) {
            // TODO: If the scanner status is TooManyTasks, maybe we can retry the scanner after a while.
            _process_status = finished_task->get_status();
            _set_scanner_done();
            return _process_status;
        }

        if (finished_task->cached_block) {
            // Small blocks are not a concern: blocks are merged when they are
            // appended to the cached block.
            auto taken_block = std::move(finished_task->cached_block);
            auto taken_bytes = taken_block->allocated_bytes();
            finished_task->cached_block.reset();
            _block_memory_usage -= taken_bytes;
            // Give the data to the caller and recycle what the caller had.
            block->swap(*taken_block);
            return_free_block(std::move(taken_block));
        }

        VLOG_DEBUG << fmt::format(
                "ScannerContext {} get block from queue, current scan "
                "task remaing cached_block size {}, eos {}, scheduled tasks {}",
                ctx_id, _completed_tasks.size(), finished_task->is_eos(), _in_flight_tasks_num);

        if (finished_task->is_eos()) {
            // The scanner is exhausted: count it and let the scheduler pick new work.
            ++_num_finished_scanners;
            RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));
        } else {
            // More data to read: put the same task back in flight.
            finished_task->set_state(ScanTask::State::IN_FLIGHT);
            RETURN_IF_ERROR(
                    _scanner_scheduler->schedule_scan_task(shared_from_this(), finished_task, l));
        }
    }

    const bool queue_drained = _completed_tasks.empty();
    if (queue_drained) {
        // Finished when every scanner reported eos, or when the shared limit is
        // used up and nothing is in flight any more.
        bool everything_done = _num_finished_scanners == _all_scanners.size();
        if (!everything_done) {
            everything_done = _shared_scan_limit->load(std::memory_order_acquire) == 0 &&
                              _in_flight_tasks_num == 0;
        }
        if (everything_done) {
            _set_scanner_done();
            _is_finished = true;
        }
    }

    *eos = done();

    if (queue_drained) {
        _dependency->block();
    }

    return Status::OK();
}
418
419
308k
// Checks, slot by slot, that the nullability of each column in `block` matches
// both the column's own data type and the corresponding output slot.
// Columns are matched to slots by position. Returns INVALID_SCHEMA on the
// first mismatch, OK otherwise.
Status ScannerContext::validate_block_schema(Block* block) {
    size_t position = 0;
    for (auto& slot : _output_tuple_desc->slots()) {
        auto& entry = block->get_by_position(position);
        ++position;

        const auto column_nullable = entry.column->is_nullable();
        const auto type_nullable = entry.type->is_nullable();
        if (column_nullable != type_nullable) {
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
                    "column(name: {}) nullable({}) does not match type nullable({}), slot(id: "
                    "{}, "
                    "name:{})",
                    entry.name, column_nullable, type_nullable, slot->id(), slot->col_name());
        }

        const auto slot_nullable = slot->is_nullable();
        if (column_nullable != slot_nullable) {
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
                    "column(name: {}) nullable({}) does not match slot(id: {}, name: {}) "
                    "nullable({})",
                    entry.name, column_nullable, slot->id(), slot->col_name(), slot_nullable);
        }
    }
    return Status::OK();
}
442
443
578k
// Stops all scanners of this context. Only the first call does any work.
//
// Under `_transfer_lock`: marks the context stopped, releases the dependency,
// asks every still-alive scanner to stop, drops completed tasks and removes
// the task handle from the task executor. When profiling is enabled, one
// bracketed, comma-separated list per metric is written to the scanner profile.
void ScannerContext::stop_scanners(RuntimeState* state) {
    std::lock_guard<std::mutex> guard(_transfer_lock);
    if (_should_stop) {
        return;
    }
    _should_stop = true;
    _set_scanner_done();

    for (const std::weak_ptr<ScannerDelegate>& weak_scanner : _all_scanners) {
        if (std::shared_ptr<ScannerDelegate> alive = weak_scanner.lock()) {
            alive->_scanner->try_stop();
        }
    }
    _completed_tasks.clear();

    if (_task_handle) {
        auto* executor_scheduler =
                dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler);
        if (executor_scheduler != nullptr) {
            static_cast<void>(executor_scheduler->task_executor()->remove_task(_task_handle));
        }
        _task_handle = nullptr;
    }

    // TODO yiguolei, call mark close to scanners
    if (!state->enable_profile()) {
        return;
    }

    std::stringstream running_time;
    std::stringstream rows_read;
    std::stringstream wait_worker_time;
    std::stringstream projection_time;
    std::stringstream prepare_time;
    std::stringstream open_time;
    running_time << "[";
    rows_read << "[";
    wait_worker_time << "[";
    projection_time << "[";
    prepare_time << "[";
    open_time << "[";

    // Appends one list entry followed by the ", " separator.
    auto append_entry = [](std::stringstream& out, const auto& text) { out << text << ", "; };

    // Scanners can in 3 state
    //  state 1: in scanner context, not scheduled
    //  state 2: in scanner worker pool's queue, scheduled but not running
    //  state 3: scanner is running.
    // Because some of them may still be running, they are not closed here.
    for (auto& scanner_ref : _all_scanners) {
        auto delegate = scanner_ref.lock();
        if (delegate == nullptr) {
            continue;
        }
        // Record the per-scanner figures before the scanners get closed.
        append_entry(running_time, PrettyPrinter::print(delegate->_scanner->get_time_cost_ns(),
                                                        TUnit::TIME_NS));
        append_entry(projection_time, PrettyPrinter::print(delegate->_scanner->projection_time(),
                                                           TUnit::TIME_NS));
        append_entry(rows_read,
                     PrettyPrinter::print(delegate->_scanner->get_rows_read(), TUnit::UNIT));
        append_entry(wait_worker_time,
                     PrettyPrinter::print(delegate->_scanner->get_scanner_wait_worker_timer(),
                                          TUnit::TIME_NS));
        append_entry(prepare_time,
                     PrettyPrinter::print(delegate->_scanner->get_prepare_time_cost_ns(),
                                          TUnit::TIME_NS));
        append_entry(open_time, PrettyPrinter::print(delegate->_scanner->get_open_time_cost_ns(),
                                                     TUnit::TIME_NS));
    }

    running_time << "]";
    rows_read << "]";
    wait_worker_time << "]";
    projection_time << "]";
    prepare_time << "]";
    open_time << "]";

    _scanner_profile->add_info_string("PerScannerRunningTime", running_time.str());
    _scanner_profile->add_info_string("PerScannerRowsRead", rows_read.str());
    _scanner_profile->add_info_string("PerScannerWaitTime", wait_worker_time.str());
    _scanner_profile->add_info_string("PerScannerProjectionTime", projection_time.str());
    _scanner_profile->add_info_string("PerScannerPrepareTime", prepare_time.str());
    _scanner_profile->add_info_string("PerScannerOpenTime", open_time.str());
}
524
525
20
std::string ScannerContext::debug_string() {
526
20
    return fmt::format(
527
20
            "_query_id: {}, id: {}, total scanners: {}, pending tasks: {}, completed tasks: {},"
528
20
            " _should_stop: {}, _is_finished: {}, free blocks: {},"
529
20
            " limit: {}, _in_flight_tasks_num: {}, remaining_limit: {}, _num_running_scanners: {}, "
530
20
            "_max_thread_num: {},"
531
20
            " _max_bytes_in_queue: {}, _ins_idx: {}, _enable_adaptive_scanners: {}, "
532
20
            "_mem_share_arb: {}, _scanner_mem_limiter: {}",
533
20
            print_id(_query_id), ctx_id, _all_scanners.size(), _pending_tasks.size(),
534
20
            _completed_tasks.size(), _should_stop, _is_finished, _free_blocks.size_approx(), limit,
535
20
            _shared_scan_limit->load(std::memory_order_relaxed), _in_flight_tasks_num,
536
20
            _num_finished_scanners, _max_scan_concurrency, _max_bytes_in_queue, _ins_idx,
537
20
            _enable_adaptive_scanners,
538
20
            _enable_adaptive_scanners ? _mem_share_arb->debug_string() : "NULL",
539
20
            _enable_adaptive_scanners ? _scanner_mem_limiter->debug_string() : "NULL");
540
20
}
541
542
579k
void ScannerContext::_set_scanner_done() {
543
579k
    _dependency->set_always_ready();
544
579k
}
545
546
2.78M
void ScannerContext::update_peak_running_scanner(int num) {
547
2.78M
#ifndef BE_TEST
548
2.78M
    _local_state->_peak_running_scanner->add(num);
549
2.78M
#endif
550
2.78M
    if (_enable_adaptive_scanners) {
551
2.76M
        _scanner_mem_limiter->update_running_tasks_count(num);
552
2.76M
    }
553
2.78M
}
554
555
1.38M
void ScannerContext::reestimated_block_mem_bytes(int64_t num) {
556
1.38M
    if (_enable_adaptive_scanners) {
557
1.37M
        _scanner_mem_limiter->reestimated_block_mem_bytes(num);
558
1.37M
    }
559
1.38M
}
560
561
int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock,
562
1.67M
                                    std::unique_lock<std::shared_mutex>& scheduler_lock) {
563
    // Get effective max concurrency considering adaptive scheduling
564
1.67M
    int32_t effective_max_concurrency = _available_pickup_scanner_count();
565
1.67M
    DCHECK_LE(effective_max_concurrency, _max_scan_concurrency);
566
567
    // margin_1 is used to ensure each scan operator could have at least _min_scan_concurrency scan tasks.
568
1.67M
    int32_t margin_1 = _min_scan_concurrency -
569
1.67M
                       (cast_set<int32_t>(_completed_tasks.size()) + _in_flight_tasks_num);
570
571
    // margin_2 is used to ensure the scan scheduler could have at least _min_scan_concurrency_of_scan_scheduler scan tasks.
572
1.67M
    int32_t margin_2 =
573
1.67M
            _min_scan_concurrency_of_scan_scheduler -
574
1.67M
            (_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size());
575
576
    // margin_3 is used to respect adaptive max concurrency limit
577
1.67M
    int32_t margin_3 =
578
1.67M
            std::max(effective_max_concurrency -
579
1.67M
                             (cast_set<int32_t>(_completed_tasks.size()) + _in_flight_tasks_num),
580
1.67M
                     1);
581
582
1.67M
    if (margin_1 <= 0 && margin_2 <= 0) {
583
1
        return 0;
584
1
    }
585
586
1.67M
    int32_t margin = std::max(margin_1, margin_2);
587
1.67M
    if (_enable_adaptive_scanners) {
588
1.66M
        margin = std::min(margin, margin_3); // Cap by adaptive limit
589
1.66M
    }
590
591
1.67M
    if (low_memory_mode()) {
592
        // In low memory mode, we will limit the number of running scanners to `low_memory_mode_scanners()`.
593
        // So that we will not submit too many scan tasks to scheduler.
594
0
        margin = std::min(low_memory_mode_scanners() - _in_flight_tasks_num, margin);
595
0
    }
596
597
1.67M
    VLOG_DEBUG << fmt::format(
598
1
            "[{}|{}] schedule scan task, margin_1: {} = {} - ({} + {}), margin_2: {} = {} - "
599
1
            "({} + {}), margin_3: {} = {} - ({} + {}), margin: {}, adaptive: {}",
600
1
            print_id(_query_id), ctx_id, margin_1, _min_scan_concurrency, _completed_tasks.size(),
601
1
            _in_flight_tasks_num, margin_2, _min_scan_concurrency_of_scan_scheduler,
602
1
            _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(),
603
1
            margin_3, effective_max_concurrency, _completed_tasks.size(), _in_flight_tasks_num,
604
1
            margin, _enable_adaptive_scanners);
605
606
1.67M
    return margin;
607
1.67M
}
608
609
// This function must be called with:
610
// 1. _transfer_lock held.
611
// 2. ScannerScheduler::_lock held.
612
Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task,
613
                                          std::unique_lock<std::mutex>& transfer_lock,
614
1.67M
                                          std::unique_lock<std::shared_mutex>& scheduler_lock) {
615
1.67M
    if (current_scan_task &&
616
1.67M
        (current_scan_task->cached_block != nullptr || current_scan_task->is_eos())) {
617
1
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
618
1
    }
619
620
1.67M
    std::list<std::shared_ptr<ScanTask>> tasks_to_submit;
621
622
1.67M
    int32_t margin = _get_margin(transfer_lock, scheduler_lock);
623
624
    // margin is less than zero. Means this scan operator could not submit any scan task for now.
625
1.67M
    if (margin <= 0) {
626
        // Be careful with current scan task.
627
        // We need to add it back to task queue to make sure it could be resubmitted.
628
0
        if (current_scan_task) {
629
            // This usually happens when we should downgrade the concurrency.
630
0
            current_scan_task->set_state(ScanTask::State::PENDING);
631
0
            _pending_tasks.push(current_scan_task);
632
0
            VLOG_DEBUG << fmt::format(
633
0
                    "{} push back scanner to task queue, because diff <= 0, _completed_tasks size "
634
0
                    "{}, _in_flight_tasks_num {}",
635
0
                    ctx_id, _completed_tasks.size(), _in_flight_tasks_num);
636
0
        }
637
638
0
#ifndef NDEBUG
639
        // This DCHECK is necessary.
640
        // We need to make sure each scan operator could have at least 1 scan tasks.
641
        // Or this scan operator will not be re-scheduled.
642
0
        if (!_pending_tasks.empty() && _in_flight_tasks_num == 0 && _completed_tasks.empty()) {
643
0
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
644
0
        }
645
0
#endif
646
647
0
        return Status::OK();
648
0
    }
649
650
1.67M
    bool first_pull = true;
651
652
3.06M
    while (margin-- > 0) {
653
1.75M
        std::shared_ptr<ScanTask> task_to_run;
654
1.75M
        const int32_t current_concurrency = cast_set<int32_t>(
655
1.75M
                _completed_tasks.size() + _in_flight_tasks_num + tasks_to_submit.size());
656
18.4E
        VLOG_DEBUG << fmt::format("{} currenct concurrency: {} = {} + {} + {}", ctx_id,
657
18.4E
                                  current_concurrency, _completed_tasks.size(),
658
18.4E
                                  _in_flight_tasks_num, tasks_to_submit.size());
659
1.75M
        if (first_pull) {
660
1.67M
            task_to_run = _pull_next_scan_task(current_scan_task, current_concurrency);
661
1.67M
            if (task_to_run == nullptr) {
662
                // In two situations we will get nullptr.
663
                // 1. current_concurrency already reached _max_scan_concurrency.
664
                // 2. all scanners are finished.
665
346k
                if (current_scan_task) {
666
9
                    DCHECK(current_scan_task->cached_block == nullptr);
667
9
                    DCHECK(!current_scan_task->is_eos());
668
9
                    if (current_scan_task->cached_block != nullptr || current_scan_task->is_eos()) {
669
                        // This should not happen.
670
0
                        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
671
0
                                               "Scanner scheduler logical error.");
672
0
                    }
673
                    // Current scan task is not scheduled, we need to add it back to task queue to make sure it could be resubmitted.
674
9
                    current_scan_task->set_state(ScanTask::State::PENDING);
675
9
                    _pending_tasks.push(current_scan_task);
676
9
                }
677
346k
            }
678
1.67M
            first_pull = false;
679
1.67M
        } else {
680
77.0k
            task_to_run = _pull_next_scan_task(nullptr, current_concurrency);
681
77.0k
        }
682
683
1.75M
        if (task_to_run) {
684
1.39M
            tasks_to_submit.push_back(task_to_run);
685
1.39M
        } else {
686
364k
            break;
687
364k
        }
688
1.75M
    }
689
690
1.67M
    if (tasks_to_submit.empty()) {
691
346k
        return Status::OK();
692
346k
    }
693
694
18.4E
    VLOG_DEBUG << fmt::format("[{}:{}] submit {} scan tasks to scheduler, remaining scanner: {}",
695
18.4E
                              print_id(_query_id), ctx_id, tasks_to_submit.size(),
696
18.4E
                              _pending_tasks.size());
697
698
1.39M
    for (auto& scan_task_iter : tasks_to_submit) {
699
1.39M
        Status submit_status = submit_scan_task(scan_task_iter, transfer_lock);
700
1.39M
        if (!submit_status.ok()) {
701
0
            _process_status = submit_status;
702
0
            _set_scanner_done();
703
0
            return _process_status;
704
0
        }
705
1.39M
    }
706
707
1.33M
    return Status::OK();
708
1.33M
}
709
710
std::shared_ptr<ScanTask> ScannerContext::_pull_next_scan_task(
711
1.75M
        std::shared_ptr<ScanTask> current_scan_task, int32_t current_concurrency) {
712
1.75M
    int32_t effective_max_concurrency = _max_scan_concurrency;
713
1.75M
    if (_enable_adaptive_scanners) {
714
1.73M
        effective_max_concurrency = _adaptive_processor->expected_scanners > 0
715
1.73M
                                            ? _adaptive_processor->expected_scanners
716
1.73M
                                            : _max_scan_concurrency;
717
1.73M
    }
718
719
1.75M
    if (current_concurrency >= effective_max_concurrency) {
720
8.37k
        VLOG_DEBUG << fmt::format(
721
0
                "ScannerContext {} current concurrency {} >= effective_max_concurrency {}, skip "
722
0
                "pull",
723
0
                ctx_id, current_concurrency, effective_max_concurrency);
724
8.37k
        return nullptr;
725
8.37k
    }
726
727
1.74M
    if (current_scan_task != nullptr) {
728
97.9k
        if (current_scan_task->cached_block != nullptr || current_scan_task->is_eos()) {
729
            // This should not happen.
730
2
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
731
2
        }
732
97.9k
        return current_scan_task;
733
97.9k
    }
734
735
1.64M
    if (!_pending_tasks.empty()) {
736
        // Skip submitting more pending scanners once the LIMIT budget is
737
        // exhausted; they would only open and immediately EOF.
738
1.29M
        if (_shared_scan_limit->load(std::memory_order_acquire) == 0) {
739
103
            return nullptr;
740
103
        }
741
1.29M
        std::shared_ptr<ScanTask> next_scan_task;
742
1.29M
        next_scan_task = _pending_tasks.top();
743
1.29M
        _pending_tasks.pop();
744
1.29M
        return next_scan_task;
745
1.29M
    } else {
746
355k
        return nullptr;
747
355k
    }
748
1.64M
}
749
750
5.83M
bool ScannerContext::low_memory_mode() const {
751
5.83M
    return _local_state->low_memory_mode();
752
5.83M
}
753
} // namespace doris