Coverage Report

Created: 2026-03-21 11:27

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/scanner_context.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <glog/logging.h>
23
#include <zconf.h>
24
25
#include <cstdint>
26
#include <ctime>
27
#include <memory>
28
#include <mutex>
29
#include <ostream>
30
#include <shared_mutex>
31
#include <tuple>
32
#include <utility>
33
34
#include "common/config.h"
35
#include "common/exception.h"
36
#include "common/logging.h"
37
#include "common/metrics/doris_metrics.h"
38
#include "common/status.h"
39
#include "core/block/block.h"
40
#include "exec/operator/scan_operator.h"
41
#include "exec/scan/scan_node.h"
42
#include "exec/scan/scanner_scheduler.h"
43
#include "runtime/descriptors.h"
44
#include "runtime/exec_env.h"
45
#include "runtime/runtime_profile.h"
46
#include "runtime/runtime_state.h"
47
#include "storage/tablet/tablet.h"
48
#include "util/time.h"
49
#include "util/uid_util.h"
50
51
namespace doris {
52
53
using namespace std::chrono_literals;
54
#include "common/compile_check_begin.h"
55
// ==================== MemShareArbitrator ====================
56
static constexpr int64_t DEFAULT_SCANNER_MEM_BYTES = 64 * 1024 * 1024; // 64MB default
57
58
MemShareArbitrator::MemShareArbitrator(const TUniqueId& qid, int64_t query_mem_limit,
59
                                       double max_scan_ratio)
60
122k
        : query_id(qid),
61
122k
          query_mem_limit(query_mem_limit),
62
122k
          mem_limit(std::max<int64_t>(
63
122k
                  1, static_cast<int64_t>(static_cast<double>(query_mem_limit) * max_scan_ratio))) {
64
122k
}
65
66
2
void MemShareArbitrator::register_scan_node() {
67
2
    total_mem_bytes.fetch_add(DEFAULT_SCANNER_MEM_BYTES);
68
2
}
69
70
9
int64_t MemShareArbitrator::update_mem_bytes(int64_t old_value, int64_t new_value) {
71
9
    int64_t diff = new_value - old_value;
72
9
    int64_t total = total_mem_bytes.fetch_add(diff) + diff;
73
9
    if (new_value == 0) return 0;
74
4
    if (total <= 0) return mem_limit;
75
    // Proportional sharing: allocate based on this context's share of total usage
76
4
    double ratio = static_cast<double>(new_value) / static_cast<double>(std::max(total, new_value));
77
4
    return static_cast<int64_t>(static_cast<double>(mem_limit) * ratio);
78
4
}
79
80
// ==================== MemLimiter ====================
81
5
int MemLimiter::available_scanner_count(int ins_idx) const {
82
5
    int64_t mem_limit_value = mem_limit.load();
83
5
    int64_t running_tasks_count_value = running_tasks_count.load();
84
5
    int64_t estimated_block_mem_bytes_value = get_estimated_block_mem_bytes();
85
86
5
    int64_t max_count = std::max(1L, mem_limit_value / estimated_block_mem_bytes_value);
87
5
    int64_t avail_count = max_count;
88
5
    int64_t per_count = avail_count / parallelism;
89
5
    if (serial_operator) {
90
1
        per_count += (avail_count - per_count * parallelism);
91
4
    } else if (ins_idx < avail_count - per_count * parallelism) {
92
2
        per_count += 1;
93
2
    }
94
95
5
    VLOG_DEBUG << "available_scanner_count. max_count=" << max_count << "("
96
0
               << running_tasks_count_value << "/" << estimated_block_mem_bytes_value
97
0
               << "), operator_mem_limit = " << operator_mem_limit
98
0
               << ", running_tasks_count = " << running_tasks_count_value
99
0
               << ", parallelism = " << parallelism << ", avail_count = " << avail_count
100
0
               << ", ins_id = " << ins_idx << ", per_count = " << per_count
101
0
               << " debug_string: " << debug_string();
102
103
5
    return cast_set<int>(per_count);
104
5
}
105
106
9
void MemLimiter::reestimated_block_mem_bytes(int64_t value) {
107
9
    if (value == 0) return;
108
8
    value = std::min(value, operator_mem_limit);
109
110
8
    std::lock_guard<std::mutex> L(lock);
111
8
    auto old_value = estimated_block_mem_bytes.load();
112
8
    int64_t total =
113
8
            get_estimated_block_mem_bytes() * estimated_block_mem_bytes_update_count + value;
114
8
    estimated_block_mem_bytes_update_count += 1;
115
8
    estimated_block_mem_bytes = total / estimated_block_mem_bytes_update_count;
116
8
    VLOG_DEBUG << fmt::format(
117
0
            "reestimated_block_mem_bytes. MemLimiter = {}, estimated_block_mem_bytes = {}, "
118
0
            "old_value = {}, value: {}",
119
0
            debug_string(), estimated_block_mem_bytes, old_value, value);
120
8
}
121
122
// ==================== ScannerContext ====================
123
ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
124
                               const TupleDescriptor* output_tuple_desc,
125
                               const RowDescriptor* output_row_descriptor,
126
                               const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
127
                               int64_t limit_, std::shared_ptr<Dependency> dependency,
128
                               std::shared_ptr<MemShareArbitrator> arb,
129
                               std::shared_ptr<MemLimiter> limiter, int ins_idx,
130
                               bool enable_adaptive_scan
131
#ifdef BE_TEST
132
                               ,
133
                               int num_parallel_instances
134
#endif
135
                               )
136
18
        : HasTaskExecutionCtx(state),
137
18
          _state(state),
138
18
          _local_state(local_state),
139
18
          _output_tuple_desc(output_row_descriptor
140
18
                                     ? output_row_descriptor->tuple_descriptors().front()
141
18
                                     : output_tuple_desc),
142
18
          _output_row_descriptor(output_row_descriptor),
143
18
          _batch_size(state->batch_size()),
144
18
          limit(limit_),
145
18
          _all_scanners(scanners.begin(), scanners.end()),
146
#ifndef BE_TEST
147
          _scanner_scheduler(local_state->scan_scheduler(state)),
148
          _min_scan_concurrency_of_scan_scheduler(
149
                  _scanner_scheduler->get_min_active_scan_threads()),
150
          _max_scan_concurrency(std::min(local_state->max_scanners_concurrency(state),
151
                                         cast_set<int>(scanners.size()))),
152
#else
153
18
          _scanner_scheduler(state->get_query_ctx()->get_scan_scheduler()),
154
18
          _min_scan_concurrency_of_scan_scheduler(0),
155
18
          _max_scan_concurrency(num_parallel_instances),
156
#endif
157
18
          _min_scan_concurrency(local_state->min_scanners_concurrency(state)),
158
18
          _scanner_mem_limiter(limiter),
159
18
          _mem_share_arb(arb),
160
18
          _ins_idx(ins_idx),
161
18
          _enable_adaptive_scanners(enable_adaptive_scan) {
162
18
    DCHECK(_state != nullptr);
163
18
    DCHECK(_output_row_descriptor == nullptr ||
164
18
           _output_row_descriptor->tuple_descriptors().size() == 1);
165
18
    _query_id = _state->get_query_ctx()->query_id();
166
18
    _resource_ctx = _state->get_query_ctx()->resource_ctx();
167
18
    ctx_id = UniqueId::gen_uid().to_string();
168
171
    for (auto& scanner : _all_scanners) {
169
171
        _pending_tasks.push(std::make_shared<ScanTask>(scanner));
170
171
    }
171
18
    if (limit < 0) {
172
0
        limit = -1;
173
0
    }
174
18
    _dependency = dependency;
175
18
    DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
176
18
    if (auto ctx = task_exec_ctx(); ctx) {
177
0
        ctx->ref_task_execution_ctx();
178
0
    }
179
180
    // Initialize adaptive processor
181
18
    _adaptive_processor = ScannerAdaptiveProcessor::create_shared();
182
18
}
183
184
5
void ScannerContext::_adjust_scan_mem_limit(int64_t old_value, int64_t new_value) {
185
5
    if (!_enable_adaptive_scanners) {
186
0
        return;
187
0
    }
188
189
5
    int64_t new_scan_mem_limit = _mem_share_arb->update_mem_bytes(old_value, new_value);
190
5
    _scanner_mem_limiter->update_mem_limit(new_scan_mem_limit);
191
5
    _scanner_mem_limiter->update_arb_mem_bytes(new_value);
192
193
5
    VLOG_DEBUG << fmt::format(
194
0
            "adjust_scan_mem_limit. context = {}, new mem scan limit = {}, scanner mem bytes = {} "
195
0
            "-> {}",
196
0
            debug_string(), new_scan_mem_limit, old_value, new_value);
197
5
}
198
199
12
int ScannerContext::_available_pickup_scanner_count() {
200
12
    if (!_enable_adaptive_scanners) {
201
12
        return _max_scan_concurrency;
202
12
    }
203
204
0
    int min_scanners = std::max(1, _min_scan_concurrency);
205
0
    int max_scanners = _scanner_mem_limiter->available_scanner_count(_ins_idx);
206
0
    max_scanners = std::min(max_scanners, _max_scan_concurrency);
207
0
    min_scanners = std::min(min_scanners, max_scanners);
208
0
    if (_ins_idx == 0) {
209
        // Adjust memory limit via memory share arbitrator
210
0
        _adjust_scan_mem_limit(_scanner_mem_limiter->get_arb_scanner_mem_bytes(),
211
0
                               _scanner_mem_limiter->get_estimated_block_mem_bytes());
212
0
    }
213
214
0
    ScannerAdaptiveProcessor& P = *_adaptive_processor;
215
0
    int& scanners = P.expected_scanners;
216
0
    int64_t now = UnixMillis();
217
    // Avoid frequent adjustment - only adjust every 100ms
218
0
    if (now - P.adjust_scanners_last_timestamp <= config::doris_scanner_dynamic_interval_ms) {
219
0
        return scanners;
220
0
    }
221
0
    P.adjust_scanners_last_timestamp = now;
222
0
    auto old_scanners = P.expected_scanners;
223
224
0
    scanners = std::max(min_scanners, scanners);
225
0
    scanners = std::min(max_scanners, scanners);
226
0
    VLOG_DEBUG << fmt::format(
227
0
            "_available_pickup_scanner_count. context = {}, old_scanners = {}, scanners = {} "
228
0
            ", min_scanners: {}, max_scanners: {}",
229
0
            debug_string(), old_scanners, scanners, min_scanners, max_scanners);
230
231
    // TODO(gabriel): Scanners are scheduled adaptively based on the memory usage now.
232
    // if (_in_flight_tasks_num == 0) {
233
    //     int64_t halt_time = 0;
234
    //     if (P.last_scanner_finish_timestamp != 0) {
235
    //         halt_time = now - P.last_scanner_finish_timestamp;
236
    //     }
237
    //     P.last_scanner_finish_timestamp = now;
238
    //     P.scanner_total_halt_time += halt_time;
239
    // }
240
    // // Calculate performance metrics for adjustment
241
    // P.scanner_gen_blocks_time = (now - P.context_start_time - P.scanner_total_halt_time);
242
    //
243
    // // Blocks per 10ms
244
    // // FIXME:
245
    // double source_speed = static_cast<double>(_tasks_queue.size()) * 1e1 /
246
    //                       static_cast<double>(P.scanner_gen_blocks_time + 1);
247
    // // Scanner scan speed: bytes/ms
248
    // int64_t scanner_total_scan_bytes = P.scanner_total_scan_bytes.load();
249
    // double scanner_scan_speed = static_cast<double>(scanner_total_scan_bytes) /
250
    //                             static_cast<double>(P.scanner_gen_blocks_time + 1);
251
    // // IO latency metrics
252
    // double scanner_total_io_time = static_cast<double>(P.scanner_total_io_time.load()) * 1e-6;
253
    // double scanner_total_scan_bytes_mb =
254
    //         static_cast<double>(scanner_total_scan_bytes) / (1024.0 * 1024.0);
255
    // double io_latency = scanner_total_io_time / (scanner_total_scan_bytes_mb + 1e-3);
256
    //
257
    // // Adjustment routines
258
    // auto try_add_scanners = [&]() {
259
    //     if (!P.try_add_scanners) return true;
260
    //     if (P.last_scanner_total_scan_bytes == scanner_total_scan_bytes) return true;
261
    //     return (scanner_scan_speed > (P.last_scanner_scan_speed * P.expected_speedup_ratio));
262
    // };
263
    //
264
    // auto do_add_scanners = [&]() {
265
    //     P.try_add_scanners = true;
266
    //     const int smooth = 2; // Smoothing factor
267
    //     P.expected_speedup_ratio =
268
    //             static_cast<double>(scanners + 1 + smooth) / static_cast<double>(scanners + smooth);
269
    //     scanners += 1;
270
    // };
271
    //
272
    // auto do_sub_scanners = [&]() {
273
    //     scanners -= 1;
274
    //     P.try_add_scanners = false;
275
    //     P.try_add_scanners_fail_count += 1;
276
    //     if (P.try_add_scanners_fail_count >= 4) {
277
    //         P.try_add_scanners_fail_count = 0;
278
    //         scanners -= 1;
279
    //     }
280
    // };
281
    //
282
    // auto check_slow_io = [&]() {
283
    //     if (((P.check_slow_io++) % 8) != 0) return;
284
    //     if (io_latency >= 2 * P.slow_io_latency_ms) {
285
    //         scanners = std::max(scanners, _max_scan_concurrency / 2);
286
    //     } else if (io_latency >= P.slow_io_latency_ms) {
287
    //         scanners = std::max(scanners, _max_scan_concurrency / 4);
288
    //     }
289
    // };
290
    //
291
    // // Perform adjustment based on feedback
292
    // auto do_adjustment = [&]() {
293
    //     check_slow_io();
294
    //
295
    //     // If source is too slow compared to capacity, add scanners
296
    //     if (source_speed < 0.5) { // Very slow block production
297
    //         do_add_scanners();
298
    //     } else if (try_add_scanners()) {
299
    //         do_add_scanners();
300
    //     } else {
301
    //         do_sub_scanners();
302
    //     }
303
    // };
304
    // P.last_scanner_scan_speed = scanner_scan_speed;
305
    // P.last_scanner_total_scan_bytes = scanner_total_scan_bytes;
306
    //
307
    // do_adjustment();
308
    // scanners = std::min(scanners, max_scanners);
309
    // scanners = std::max(scanners, min_scanners);
310
    //
311
    // VLOG_DEBUG << fmt::format(
312
    //         "available_pickup_scanner_count. ctx_id = {}, scan = {}, source_speed = {}, "
313
    //         "io_latency = {} ms/MB, proposal = {}, current = {}",
314
    //         ctx_id, scanner_scan_speed, source_speed, io_latency, scanners,
315
    //         _in_flight_tasks_num);
316
317
0
    return scanners;
318
0
}
319
320
// After init function call, should not access _parent
321
6
Status ScannerContext::init() {
322
#ifndef BE_TEST
323
    _scanner_profile = _local_state->_scanner_profile;
324
    _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
325
    _scanner_memory_used_counter = _local_state->_memory_used_counter;
326
327
    // 3. get thread token
328
    if (!_state->get_query_ctx()) {
329
        return Status::InternalError("Query context of {} is not set",
330
                                     print_id(_state->query_id()));
331
    }
332
333
    if (_state->get_query_ctx()->get_scan_scheduler()) {
334
        _should_reset_thread_name = false;
335
    }
336
337
    auto scanner = _all_scanners.front().lock();
338
    DCHECK(scanner != nullptr);
339
340
    if (auto* task_executor_scheduler =
341
                dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
342
        std::shared_ptr<TaskExecutor> task_executor = task_executor_scheduler->task_executor();
343
        TaskId task_id(fmt::format("{}-{}", print_id(_state->query_id()), ctx_id));
344
        _task_handle = DORIS_TRY(task_executor->create_task(
345
                task_id, []() { return 0.0; },
346
                config::task_executor_initial_max_concurrency_per_task > 0
347
                        ? config::task_executor_initial_max_concurrency_per_task
348
                        : std::max(48, CpuInfo::num_cores() * 2),
349
                std::chrono::milliseconds(100), std::nullopt));
350
    }
351
#endif
352
    // _max_bytes_in_queue controls the maximum memory that can be used by a single scan operator.
353
    // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
354
    // is larger than 10MB.
355
6
    _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10);
356
357
    // Provide more memory for wide tables, increase proportionally by multiples of 300
358
6
    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
359
360
6
    if (_all_scanners.empty()) {
361
0
        _is_finished = true;
362
0
        _set_scanner_done();
363
0
    }
364
365
    // Initialize memory limiter if memory-aware scheduling is enabled
366
6
    if (_enable_adaptive_scanners) {
367
0
        DCHECK(_scanner_mem_limiter && _mem_share_arb);
368
0
        int64_t c = _scanner_mem_limiter->update_open_tasks_count(1);
369
        // TODO(gabriel): set estimated block size
370
0
        _scanner_mem_limiter->reestimated_block_mem_bytes(DEFAULT_SCANNER_MEM_BYTES);
371
0
        _scanner_mem_limiter->update_arb_mem_bytes(DEFAULT_SCANNER_MEM_BYTES);
372
0
        if (c == 0) {
373
            // First scanner context to open, adjust scan memory limit
374
0
            _adjust_scan_mem_limit(DEFAULT_SCANNER_MEM_BYTES,
375
0
                                   _scanner_mem_limiter->get_arb_scanner_mem_bytes());
376
0
        }
377
0
    }
378
379
    // when the user does not specify scan_thread_num, we can try to downgrade _max_thread_num.
380
    // because we found in a table with 5k columns, column reader may occupy too much memory.
381
    // you can refer https://github.com/apache/doris/issues/35340 for details.
382
6
    const int32_t max_column_reader_num = _state->max_column_reader_num();
383
384
6
    if (_max_scan_concurrency != 1 && max_column_reader_num > 0) {
385
0
        int32_t scan_column_num = cast_set<int32_t>(_output_tuple_desc->slots().size());
386
0
        int32_t current_column_num = scan_column_num * _max_scan_concurrency;
387
0
        if (current_column_num > max_column_reader_num) {
388
0
            int32_t new_max_thread_num = max_column_reader_num / scan_column_num;
389
0
            new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num;
390
0
            if (new_max_thread_num < _max_scan_concurrency) {
391
0
                int32_t origin_max_thread_num = _max_scan_concurrency;
392
0
                _max_scan_concurrency = new_max_thread_num;
393
0
                LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
394
0
                          << " scan's max_thread_num from " << origin_max_thread_num << " to "
395
0
                          << _max_scan_concurrency << ",column num: " << scan_column_num
396
0
                          << ", max_column_reader_num: " << max_column_reader_num;
397
0
            }
398
0
        }
399
0
    }
400
401
6
    COUNTER_SET(_local_state->_max_scan_concurrency, (int64_t)_max_scan_concurrency);
402
6
    COUNTER_SET(_local_state->_min_scan_concurrency, (int64_t)_min_scan_concurrency);
403
404
6
    std::unique_lock<std::mutex> l(_transfer_lock);
405
6
    RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));
406
407
6
    return Status::OK();
408
6
}
409
410
18
ScannerContext::~ScannerContext() {
411
18
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
412
18
    _completed_tasks.clear();
413
18
    BlockUPtr block;
414
19
    while (_free_blocks.try_dequeue(block)) {
415
        // do nothing
416
1
    }
417
18
    block.reset();
418
18
    DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);
419
420
    // Cleanup memory limiter if last context closing
421
18
    if (_enable_adaptive_scanners) {
422
4
        if (_scanner_mem_limiter->update_open_tasks_count(-1) == 1) {
423
            // Last scanner context to close, reset scan memory limit
424
4
            _adjust_scan_mem_limit(_scanner_mem_limiter->get_arb_scanner_mem_bytes(), 0);
425
4
        }
426
4
    }
427
428
18
    if (_task_handle) {
429
0
        if (auto* task_executor_scheduler =
430
0
                    dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
431
0
            static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle));
432
0
        }
433
0
        _task_handle = nullptr;
434
0
    }
435
18
    if (auto ctx = task_exec_ctx(); ctx) {
436
0
        ctx->unref_task_execution_ctx();
437
0
    }
438
18
}
439
440
3
BlockUPtr ScannerContext::get_free_block(bool force) {
441
3
    BlockUPtr block = nullptr;
442
3
    if (_free_blocks.try_dequeue(block)) {
443
1
        DCHECK(block->mem_reuse());
444
1
        _block_memory_usage -= block->allocated_bytes();
445
1
        _scanner_memory_used_counter->set(_block_memory_usage);
446
        // A free block is reused, so the memory usage should be decreased
447
        // The caller of get_free_block will increase the memory usage
448
2
    } else if (_block_memory_usage < _max_bytes_in_queue || force) {
449
2
        _newly_create_free_blocks_num->update(1);
450
2
        block = Block::create_unique(_output_tuple_desc->slots(), 0);
451
2
    }
452
3
    return block;
453
3
}
454
455
1
void ScannerContext::return_free_block(BlockUPtr block) {
456
    // If under low memory mode, should not return the freeblock, it will occupy too much memory.
457
1
    if (!_local_state->low_memory_mode() && block->mem_reuse() &&
458
1
        _block_memory_usage < _max_bytes_in_queue) {
459
1
        size_t block_size_to_reuse = block->allocated_bytes();
460
1
        _block_memory_usage += block_size_to_reuse;
461
1
        _scanner_memory_used_counter->set(_block_memory_usage);
462
1
        block->clear_column_data();
463
        // Free blocks are used to improve memory efficiency. Failure during pushing back
464
        // free block will not incur any bad result so just ignore the return value.
465
1
        _free_blocks.enqueue(std::move(block));
466
1
    }
467
1
}
468
469
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task,
470
18
                                        std::unique_lock<std::mutex>& /*transfer_lock*/) {
471
    // increase _num_finished_scanners no matter the scan_task is submitted successfully or not.
472
    // since if submit failed, it will be added back by ScannerContext::push_back_scan_task
473
    // and _num_finished_scanners will be reduced.
474
    // if submit succeed, it will be also added back by ScannerContext::push_back_scan_task
475
    // see ScannerScheduler::_scanner_scan.
476
18
    _in_flight_tasks_num++;
477
18
    return _scanner_scheduler->submit(shared_from_this(), scan_task);
478
18
}
479
480
0
void ScannerContext::clear_free_blocks() {
481
0
    clear_blocks(_free_blocks);
482
0
}
483
484
5
void ScannerContext::push_back_scan_task(std::shared_ptr<ScanTask> scan_task) {
485
5
    if (scan_task->status_ok()) {
486
5
        if (scan_task->cached_block && scan_task->cached_block->rows() > 0) {
487
0
            Status st = validate_block_schema(scan_task->cached_block.get());
488
0
            if (!st.ok()) {
489
0
                scan_task->set_status(st);
490
0
            }
491
0
        }
492
5
    }
493
494
5
    std::lock_guard<std::mutex> l(_transfer_lock);
495
5
    if (!scan_task->status_ok()) {
496
0
        _process_status = scan_task->get_status();
497
0
    }
498
5
    _completed_tasks.push_back(scan_task);
499
5
    _in_flight_tasks_num--;
500
501
5
    _dependency->set_ready();
502
5
}
503
504
3
Status ScannerContext::get_block_from_queue(RuntimeState* state, Block* block, bool* eos, int id) {
505
3
    if (state->is_cancelled()) {
506
1
        _set_scanner_done();
507
1
        return state->cancel_reason();
508
1
    }
509
2
    std::unique_lock l(_transfer_lock);
510
511
2
    if (!_process_status.ok()) {
512
1
        _set_scanner_done();
513
1
        return _process_status;
514
1
    }
515
516
1
    std::shared_ptr<ScanTask> scan_task = nullptr;
517
518
1
    if (!_completed_tasks.empty() && !done()) {
519
        // https://en.cppreference.com/w/cpp/container/list/front
520
        // The behavior is undefined if the list is empty.
521
1
        scan_task = _completed_tasks.front();
522
1
        _completed_tasks.pop_front();
523
1
    }
524
525
1
    if (scan_task != nullptr) {
526
        // The abnormal status of scanner may come from the execution of the scanner itself,
527
        // or come from the scanner scheduler, such as TooManyTasks.
528
1
        if (!scan_task->status_ok()) {
529
            // TODO: If the scanner status is TooManyTasks, maybe we can retry the scanner after a while.
530
0
            _process_status = scan_task->get_status();
531
0
            _set_scanner_done();
532
0
            return _process_status;
533
0
        }
534
535
1
        if (scan_task->cached_block) {
536
            // No need to worry about small block, block is merged together when they are appended to cached_blocks.
537
0
            auto current_block = std::move(scan_task->cached_block);
538
0
            auto block_size = current_block->allocated_bytes();
539
0
            scan_task->cached_block.reset();
540
0
            _block_memory_usage -= block_size;
541
            // consume current block
542
0
            block->swap(*current_block);
543
0
            return_free_block(std::move(current_block));
544
0
        }
545
1
        VLOG_DEBUG << fmt::format(
546
0
                "ScannerContext {} get block from queue, current scan "
547
0
                "task remaing cached_block size {}, eos {}, scheduled tasks {}",
548
0
                ctx_id, _completed_tasks.size(), scan_task->is_eos(), _in_flight_tasks_num);
549
1
        if (scan_task->is_eos()) {
550
            // 1. if eos, record a finished scanner.
551
1
            _num_finished_scanners++;
552
1
            RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));
553
1
        } else {
554
0
            scan_task->set_state(ScanTask::State::IN_FLIGHT);
555
0
            RETURN_IF_ERROR(
556
0
                    _scanner_scheduler->schedule_scan_task(shared_from_this(), scan_task, l));
557
0
        }
558
1
    }
559
560
1
    if (_num_finished_scanners == _all_scanners.size() && _completed_tasks.empty()) {
561
0
        _set_scanner_done();
562
0
        _is_finished = true;
563
0
    }
564
565
1
    *eos = done();
566
567
1
    if (_completed_tasks.empty()) {
568
1
        _dependency->block();
569
1
    }
570
571
1
    return Status::OK();
572
1
}
573
574
0
Status ScannerContext::validate_block_schema(Block* block) {
575
0
    size_t index = 0;
576
0
    for (auto& slot : _output_tuple_desc->slots()) {
577
0
        auto& data = block->get_by_position(index++);
578
0
        if (data.column->is_nullable() != data.type->is_nullable()) {
579
0
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
580
0
                    "column(name: {}) nullable({}) does not match type nullable({}), slot(id: "
581
0
                    "{}, "
582
0
                    "name:{})",
583
0
                    data.name, data.column->is_nullable(), data.type->is_nullable(), slot->id(),
584
0
                    slot->col_name());
585
0
        }
586
587
0
        if (data.column->is_nullable() != slot->is_nullable()) {
588
0
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
589
0
                    "column(name: {}) nullable({}) does not match slot(id: {}, name: {}) "
590
0
                    "nullable({})",
591
0
                    data.name, data.column->is_nullable(), slot->id(), slot->col_name(),
592
0
                    slot->is_nullable());
593
0
        }
594
0
    }
595
0
    return Status::OK();
596
0
}
597
598
0
void ScannerContext::stop_scanners(RuntimeState* state) {
599
0
    std::lock_guard<std::mutex> l(_transfer_lock);
600
0
    if (_should_stop) {
601
0
        return;
602
0
    }
603
0
    _should_stop = true;
604
0
    _set_scanner_done();
605
0
    for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
606
0
        if (std::shared_ptr<ScannerDelegate> sc = scanner.lock()) {
607
0
            sc->_scanner->try_stop();
608
0
        }
609
0
    }
610
0
    _completed_tasks.clear();
611
0
    if (_task_handle) {
612
0
        if (auto* task_executor_scheduler =
613
0
                    dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
614
0
            static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle));
615
0
        }
616
0
        _task_handle = nullptr;
617
0
    }
618
    // TODO yiguolei, call mark close to scanners
619
0
    if (state->enable_profile()) {
620
0
        std::stringstream scanner_statistics;
621
0
        std::stringstream scanner_rows_read;
622
0
        std::stringstream scanner_wait_worker_time;
623
0
        std::stringstream scanner_projection;
624
0
        scanner_statistics << "[";
625
0
        scanner_rows_read << "[";
626
0
        scanner_wait_worker_time << "[";
627
0
        scanner_projection << "[";
628
        // Scanners can be in 3 states
629
        //  state 1: in scanner context, not scheduled
630
        //  state 2: in scanner worker pool's queue, scheduled but not running
631
        //  state 3: scanner is running.
632
0
        for (auto& scanner_ref : _all_scanners) {
633
0
            auto scanner = scanner_ref.lock();
634
0
            if (scanner == nullptr) {
635
0
                continue;
636
0
            }
637
            // Add per scanner running time before close them
638
0
            scanner_statistics << PrettyPrinter::print(scanner->_scanner->get_time_cost_ns(),
639
0
                                                       TUnit::TIME_NS)
640
0
                               << ", ";
641
0
            scanner_projection << PrettyPrinter::print(scanner->_scanner->projection_time(),
642
0
                                                       TUnit::TIME_NS)
643
0
                               << ", ";
644
0
            scanner_rows_read << PrettyPrinter::print(scanner->_scanner->get_rows_read(),
645
0
                                                      TUnit::UNIT)
646
0
                              << ", ";
647
0
            scanner_wait_worker_time
648
0
                    << PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(),
649
0
                                            TUnit::TIME_NS)
650
0
                    << ", ";
651
            // since these are all scanners, some scanners are running, so we could not call scanner
652
            // close here.
653
0
        }
654
0
        scanner_statistics << "]";
655
0
        scanner_rows_read << "]";
656
0
        scanner_wait_worker_time << "]";
657
0
        scanner_projection << "]";
658
0
        _scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str());
659
0
        _scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str());
660
0
        _scanner_profile->add_info_string("PerScannerWaitTime", scanner_wait_worker_time.str());
661
0
        _scanner_profile->add_info_string("PerScannerProjectionTime", scanner_projection.str());
662
0
    }
663
0
}
664
665
18
std::string ScannerContext::debug_string() {
666
18
    return fmt::format(
667
18
            "_query_id: {}, id: {}, total scanners: {}, pending tasks: {}, completed tasks: {},"
668
18
            " _should_stop: {}, _is_finished: {}, free blocks: {},"
669
18
            " limit: {}, _in_flight_tasks_num: {}, _num_finished_scanners: {}, _max_thread_num: {},"
670
18
            " _max_bytes_in_queue: {}, query_id: {}, _ins_idx: {}, _enable_adaptive_scanners: {}, "
671
18
            "_mem_share_arb: {}, _scanner_mem_limiter: {}",
672
18
            print_id(_query_id), ctx_id, _all_scanners.size(), _pending_tasks.size(),
673
18
            _completed_tasks.size(), _should_stop, _is_finished, _free_blocks.size_approx(), limit,
674
18
            _in_flight_tasks_num, _num_finished_scanners, _max_scan_concurrency,
675
18
            _max_bytes_in_queue, print_id(_query_id), _ins_idx, _enable_adaptive_scanners,
676
18
            _enable_adaptive_scanners ? _mem_share_arb->debug_string() : "NULL",
677
18
            _enable_adaptive_scanners ? _scanner_mem_limiter->debug_string() : "NULL");
678
18
}
679
680
2
// Mark the scan dependency as permanently ready so the downstream pipeline
// task is woken up and can observe this context's terminal state
// (finished or failed — callers invoke this on both paths).
void ScannerContext::_set_scanner_done() {
    _dependency->set_always_ready();
}
683
684
1
// Adjust the running-scanner accounting by `num`.
// NOTE(review): callers appear to pass a delta (positive on start, negative on
// finish) — confirm against call sites; only the delta semantics are visible here.
void ScannerContext::update_peak_running_scanner(int num) {
#ifndef BE_TEST
    // The profile counter lives in the operator-local state, which is not
    // available in BE unit tests.
    _local_state->_peak_running_scanner->add(num);
#endif
    if (_enable_adaptive_scanners) {
        // Keep the adaptive memory limiter's view of active task count in sync.
        _scanner_mem_limiter->update_running_tasks_count(num);
    }
}
692
693
1
void ScannerContext::reestimated_block_mem_bytes(int64_t num) {
694
1
    if (_enable_adaptive_scanners) {
695
1
        _scanner_mem_limiter->reestimated_block_mem_bytes(num);
696
1
    }
697
1
}
698
699
// Compute how many additional scan tasks this operator may submit right now.
// Returns 0 when no submission is allowed; otherwise a positive headroom count.
//
// The two lock parameters document the caller's obligations: both _transfer_lock
// and the scheduler lock must already be held when this is called (the locks
// are not acquired or released here).
int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock,
                                    std::unique_lock<std::shared_mutex>& scheduler_lock) {
    // Get effective max concurrency considering adaptive scheduling
    int32_t effective_max_concurrency = _available_pickup_scanner_count();
    DCHECK_LE(effective_max_concurrency, _max_scan_concurrency);

    // margin_1 is used to ensure each scan operator could have at least _min_scan_concurrency scan tasks.
    int32_t margin_1 = _min_scan_concurrency -
                       (cast_set<int32_t>(_completed_tasks.size()) + _in_flight_tasks_num);

    // margin_2 is used to ensure the scan scheduler could have at least _min_scan_concurrency_of_scan_scheduler scan tasks.
    int32_t margin_2 =
            _min_scan_concurrency_of_scan_scheduler -
            (_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size());

    // margin_3 is used to respect adaptive max concurrency limit
    // (clamped to >= 1 so the adaptive cap can never zero out a margin that
    // margin_1/margin_2 below have already justified).
    int32_t margin_3 =
            std::max(effective_max_concurrency -
                             (cast_set<int32_t>(_completed_tasks.size()) + _in_flight_tasks_num),
                     1);

    // Neither the per-operator minimum nor the scheduler-wide minimum needs
    // more tasks: nothing to submit.
    if (margin_1 <= 0 && margin_2 <= 0) {
        return 0;
    }

    // Take the larger of the two demands, then (in adaptive mode) cap it by
    // the adaptive concurrency headroom.
    int32_t margin = std::max(margin_1, margin_2);
    if (_enable_adaptive_scanners) {
        margin = std::min(margin, margin_3); // Cap by adaptive limit
    }

    if (low_memory_mode()) {
        // In low memory mode, we will limit the number of running scanners to `low_memory_mode_scanners()`.
        // So that we will not submit too many scan tasks to scheduler.
        margin = std::min(low_memory_mode_scanners() - _in_flight_tasks_num, margin);
    }

    VLOG_DEBUG << fmt::format(
            "[{}|{}] schedule scan task, margin_1: {} = {} - ({} + {}), margin_2: {} = {} - "
            "({} + {}), margin_3: {} = {} - ({} + {}), margin: {}, adaptive: {}",
            print_id(_query_id), ctx_id, margin_1, _min_scan_concurrency, _completed_tasks.size(),
            _in_flight_tasks_num, margin_2, _min_scan_concurrency_of_scan_scheduler,
            _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(),
            margin_3, effective_max_concurrency, _completed_tasks.size(), _in_flight_tasks_num,
            margin, _enable_adaptive_scanners);

    return margin;
}
746
747
// This function must be called with:
// 1. _transfer_lock held.
// 2. ScannerScheduler::_lock held.
//
// Schedules up to `margin` scan tasks (see _get_margin) onto the scanner
// scheduler. `current_scan_task` is the task the caller just finished
// consuming, if any; it is either resubmitted first or pushed back onto
// _pending_tasks so it cannot be lost. Returns a non-OK Status (also stored
// in _process_status) when submission to the scheduler fails.
Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task,
                                          std::unique_lock<std::mutex>& transfer_lock,
                                          std::unique_lock<std::shared_mutex>& scheduler_lock) {
    // A task handed back for rescheduling must have neither a cached block
    // (unconsumed data) nor be at EOS — either would indicate a scheduler bug.
    if (current_scan_task &&
        (current_scan_task->cached_block != nullptr || current_scan_task->is_eos())) {
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
    }

    std::list<std::shared_ptr<ScanTask>> tasks_to_submit;

    int32_t margin = _get_margin(transfer_lock, scheduler_lock);

    // margin is less than zero. Means this scan operator could not submit any scan task for now.
    if (margin <= 0) {
        // Be careful with current scan task.
        // We need to add it back to task queue to make sure it could be resubmitted.
        if (current_scan_task) {
            // This usually happens when we should downgrade the concurrency.
            current_scan_task->set_state(ScanTask::State::PENDING);
            _pending_tasks.push(current_scan_task);
            VLOG_DEBUG << fmt::format(
                    "{} push back scanner to task queue, because diff <= 0, _completed_tasks size "
                    "{}, _in_flight_tasks_num {}",
                    ctx_id, _completed_tasks.size(), _in_flight_tasks_num);
        }

#ifndef NDEBUG
        // This DCHECK is necessary.
        // We need to make sure each scan operator could have at least 1 scan tasks.
        // Or this scan operator will not be re-scheduled.
        if (!_pending_tasks.empty() && _in_flight_tasks_num == 0 && _completed_tasks.empty()) {
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
        }
#endif

        return Status::OK();
    }

    bool first_pull = true;

    // Pull up to `margin` tasks. Concurrency is recomputed each iteration
    // because tasks_to_submit grows as tasks are pulled.
    while (margin-- > 0) {
        std::shared_ptr<ScanTask> task_to_run;
        const int32_t current_concurrency = cast_set<int32_t>(
                _completed_tasks.size() + _in_flight_tasks_num + tasks_to_submit.size());
        VLOG_DEBUG << fmt::format("{} currenct concurrency: {} = {} + {} + {}", ctx_id,
                                  current_concurrency, _completed_tasks.size(),
                                  _in_flight_tasks_num, tasks_to_submit.size());
        // Only the first pull may resubmit current_scan_task; later pulls
        // draw exclusively from the pending queue.
        if (first_pull) {
            task_to_run = _pull_next_scan_task(current_scan_task, current_concurrency);
            if (task_to_run == nullptr) {
                // In two situations we will get nullptr.
                // 1. current_concurrency already reached _max_scan_concurrency.
                // 2. all scanners are finished.
                if (current_scan_task) {
                    DCHECK(current_scan_task->cached_block == nullptr);
                    DCHECK(!current_scan_task->is_eos());
                    if (current_scan_task->cached_block != nullptr || current_scan_task->is_eos()) {
                        // This should not happen.
                        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                                               "Scanner scheduler logical error.");
                    }
                    // Current scan task is not scheduled, we need to add it back to task queue to make sure it could be resubmitted.
                    current_scan_task->set_state(ScanTask::State::PENDING);
                    _pending_tasks.push(current_scan_task);
                }
            }
            first_pull = false;
        } else {
            task_to_run = _pull_next_scan_task(nullptr, current_concurrency);
        }

        if (task_to_run) {
            tasks_to_submit.push_back(task_to_run);
        } else {
            // Queue drained or concurrency cap reached — stop pulling.
            break;
        }
    }

    if (tasks_to_submit.empty()) {
        return Status::OK();
    }

    VLOG_DEBUG << fmt::format("[{}:{}] submit {} scan tasks to scheduler, remaining scanner: {}",
                              print_id(_query_id), ctx_id, tasks_to_submit.size(),
                              _pending_tasks.size());

    for (auto& scan_task_iter : tasks_to_submit) {
        Status submit_status = submit_scan_task(scan_task_iter, transfer_lock);
        if (!submit_status.ok()) {
            // Record the failure and wake the downstream so the error is seen.
            _process_status = submit_status;
            _set_scanner_done();
            return _process_status;
        }
    }

    return Status::OK();
}
847
848
std::shared_ptr<ScanTask> ScannerContext::_pull_next_scan_task(
849
31
        std::shared_ptr<ScanTask> current_scan_task, int32_t current_concurrency) {
850
31
    int32_t effective_max_concurrency = _max_scan_concurrency;
851
31
    if (_enable_adaptive_scanners) {
852
0
        effective_max_concurrency = _adaptive_processor->expected_scanners > 0
853
0
                                            ? _adaptive_processor->expected_scanners
854
0
                                            : _max_scan_concurrency;
855
0
    }
856
857
31
    if (current_concurrency >= effective_max_concurrency) {
858
7
        VLOG_DEBUG << fmt::format(
859
0
                "ScannerContext {} current concurrency {} >= effective_max_concurrency {}, skip "
860
0
                "pull",
861
0
                ctx_id, current_concurrency, effective_max_concurrency);
862
7
        return nullptr;
863
7
    }
864
865
24
    if (current_scan_task != nullptr) {
866
3
        if (current_scan_task->cached_block != nullptr || current_scan_task->is_eos()) {
867
            // This should not happen.
868
2
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
869
2
        }
870
1
        return current_scan_task;
871
3
    }
872
873
21
    if (!_pending_tasks.empty()) {
874
19
        std::shared_ptr<ScanTask> next_scan_task;
875
19
        next_scan_task = _pending_tasks.top();
876
19
        _pending_tasks.pop();
877
19
        return next_scan_task;
878
19
    } else {
879
2
        return nullptr;
880
2
    }
881
21
}
882
883
11
// Whether this scan is under memory pressure; delegates to the operator-local
// state, which owns the actual memory-status tracking.
bool ScannerContext::low_memory_mode() const {
    return _local_state->low_memory_mode();
}
886
#include "common/compile_check_end.h"
887
} // namespace doris