Coverage Report

Created: 2026-04-01 07:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner_context.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bthread/types.h>
21
#include <stdint.h>
22
23
#include <atomic>
24
#include <cstdint>
25
#include <list>
26
#include <memory>
27
#include <mutex>
28
#include <stack>
29
#include <string>
30
#include <utility>
31
#include <vector>
32
33
#include "common/config.h"
34
#include "common/factory_creator.h"
35
#include "common/metrics/doris_metrics.h"
36
#include "common/status.h"
37
#include "concurrentqueue.h"
38
#include "core/block/block.h"
39
#include "exec/common/memory.h"
40
#include "exec/scan/scanner.h"
41
#include "exec/scan/task_executor/split_runner.h"
42
#include "runtime/runtime_profile.h"
43
44
namespace doris {
45
46
class RuntimeState;
47
class TupleDescriptor;
48
class WorkloadGroup;
49
50
class ScanLocalStateBase;
51
class Dependency;
52
53
class Scanner;
54
class ScannerDelegate;
55
class ScannerScheduler;
56
class TaskExecutor;
57
class TaskHandle;
58
59
// Adaptive processor for dynamic scanner concurrency adjustment
60
struct ScannerAdaptiveProcessor {
61
1.25M
    ENABLE_FACTORY_CREATOR(ScannerAdaptiveProcessor)
62
1.25M
    ScannerAdaptiveProcessor() = default;
63
1.25M
    ~ScannerAdaptiveProcessor() = default;
64
1.25M
    // Expected scanners in this cycle
65
66
1.25M
    int expected_scanners = 0;
67
1.25M
    // Timing metrics
68
1.25M
    // int64_t context_start_time = 0;
69
1.25M
    // int64_t scanner_total_halt_time = 0;
70
1.25M
    // int64_t scanner_gen_blocks_time = 0;
71
    // std::atomic_int64_t scanner_total_io_time = 0;
72
    // std::atomic_int64_t scanner_total_running_time = 0;
73
    // std::atomic_int64_t scanner_total_scan_bytes = 0;
74
75
    // Timestamps
76
    // std::atomic_int64_t last_scanner_finish_timestamp = 0;
77
    // int64_t check_all_scanners_last_timestamp = 0;
78
    // int64_t last_driver_output_full_timestamp = 0;
79
    int64_t adjust_scanners_last_timestamp = 0;
80
81
    // Adjustment strategy fields
82
    // bool try_add_scanners = false;
83
    // double expected_speedup_ratio = 0;
84
    // double last_scanner_scan_speed = 0;
85
    // int64_t last_scanner_total_scan_bytes = 0;
86
    // int try_add_scanners_fail_count = 0;
87
2.79k
    // int check_slow_io = 0;
88
2.79k
    // int32_t slow_io_latency_ms = 100; // Default from config
89
};
90
0
91
0
class ScanTask {
92
2.79k
public:
93
2.79k
    enum class State : int {
94
1.39k
        PENDING,   // not scheduled yet
95
3.82M
        IN_FLIGHT, // scheduled and running
96
2.53M
        COMPLETED, // finished with result or error, waiting to be collected by scan node
97
1.26M
        EOS,       // finished and no more data, waiting to be collected by scan node
98
    };
99
    ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner) : scanner(delegate_scanner) {
100
        _resource_ctx = thread_context()->resource_ctx();
101
        DorisMetrics::instance()->scanner_task_cnt->increment(1);
102
    }
103
104
    ~ScanTask() {
105
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
106
        DorisMetrics::instance()->scanner_task_cnt->increment(-1);
107
        cached_block.reset();
108
    }
109
110
private:
111
    // whether current scanner is finished
112
    Status status = Status::OK();
113
    std::shared_ptr<ResourceContext> _resource_ctx;
114
    State _state = State::PENDING;
115
116
public:
117
    std::weak_ptr<ScannerDelegate> scanner;
118
    BlockUPtr cached_block = nullptr;
119
    bool is_first_schedule = true;
120
    // Use weak_ptr to avoid circular references and potential memory leaks with SplitRunner.
121
    // ScannerContext only needs to observe the lifetime of SplitRunner without owning it.
122
    // When SplitRunner is destroyed, split_runner.lock() will return nullptr, ensuring safe access.
123
    std::weak_ptr<SplitRunner> split_runner;
124
125
    void set_status(Status _status) {
126
        if (_status.is<ErrorCode::END_OF_FILE>()) {
127
            // set `eos` if `END_OF_FILE`, don't take `END_OF_FILE` as error
128
            _state = State::EOS;
129
        }
130
        status = _status;
131
1.35M
    }
132
    Status get_status() const { return status; }
133
24.1M
    bool status_ok() { return status.ok() || status.is<ErrorCode::END_OF_FILE>(); }
134
    bool is_eos() const { return _state == State::EOS; }
135
    void set_state(State state) {
136
        switch (state) {
137
        case State::PENDING:
138
            DCHECK(_state == State::PENDING || _state == State::IN_FLIGHT) << (int)_state;
139
            DCHECK(cached_block == nullptr);
140
            break;
141
        case State::IN_FLIGHT:
142
            DCHECK(_state == State::COMPLETED || _state == State::PENDING ||
143
                   _state == State::IN_FLIGHT)
144
                    << (int)_state;
145
            DCHECK(cached_block == nullptr);
146
            break;
147
        case State::COMPLETED:
148
            DCHECK(_state == State::IN_FLIGHT) << (int)_state;
149
            DCHECK(cached_block != nullptr);
150
            break;
151
        case State::EOS:
152
            DCHECK(_state == State::IN_FLIGHT || status.is<ErrorCode::END_OF_FILE>())
153
                    << (int)_state;
154
5.22M
            break;
155
        default:
156
            break;
157
        }
158
1.26M
159
        _state = state;
160
0
    }
161
};
162
2.52M
163
// ScannerContext is responsible for recording the execution status
164
// of a group of Scanners corresponding to a ScanNode.
165
// Including how many scanners are being scheduled, and maintaining
166
461k
// a producer-consumer blocks queue between scanners and scan nodes.
167
//
168
// ScannerContext is also the scheduling unit of ScannerScheduler.
169
// ScannerScheduler schedules a ScannerContext at a time,
170
// and submits the Scanners to the scanner thread pool for data scanning.
171
class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
172
                       public HasTaskExecutionCtx {
173
5
    ENABLE_FACTORY_CREATOR(ScannerContext);
174
5
    friend class ScannerScheduler;
175
5
176
public:
177
4
    ScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
178
                   const TupleDescriptor* output_tuple_desc,
179
1
                   const RowDescriptor* output_row_descriptor,
180
                   const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_,
181
                   std::shared_ptr<Dependency> dependency, std::atomic<int64_t>* shared_scan_limit,
182
                   std::shared_ptr<MemShareArbitrator> arb, std::shared_ptr<MemLimiter> limiter,
183
                   int ins_idx, bool enable_adaptive_scan
184
#ifdef BE_TEST
185
                   ,
186
                   int num_parallel_instances
187
0
#endif
188
0
    );
189
0
190
0
    ~ScannerContext() override;
191
    Status init();
192
193
    // TODO(gabriel): we can also consider to return a list of blocks to reduce the scheduling overhead, but it may cause larger memory usage and more complex logic of block management.
194
    BlockUPtr get_free_block(bool force);
195
    void return_free_block(BlockUPtr block);
196
    void clear_free_blocks();
197
    inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; }
198
199
    int64_t block_memory_usage() { return _block_memory_usage; }
200
201
    // Caller should make sure the pipeline task is still running when calling this function
202
    void update_peak_running_scanner(int num);
203
    void reestimated_block_mem_bytes(int64_t num);
204
205
    // Get next block from blocks queue. Called by ScanNode/ScanOperator
206
    // Set eos to true if there is no more data to read.
207
    Status get_block_from_queue(RuntimeState* state, Block* block, bool* eos, int id);
208
209
    [[nodiscard]] Status validate_block_schema(Block* block);
210
211
    // submit the running scanner to thread pool in `ScannerScheduler`
212
    // set the next scanned block to `ScanTask::current_block`
213
    // set the error state to `ScanTask::status`
214
    // set the `eos` to `ScanTask::eos` if there is no more data in current scanner
215
    Status submit_scan_task(std::shared_ptr<ScanTask> scan_task, std::unique_lock<std::mutex>&);
216
217
    // Push back a scan task.
218
    void push_back_scan_task(std::shared_ptr<ScanTask> scan_task);
219
220
    // Return true if this ScannerContext need no more process
221
    bool done() const { return _is_finished || _should_stop; }
222
223
    std::string debug_string();
224
225
    std::shared_ptr<TaskHandle> task_handle() const { return _task_handle; }
226
227
    std::shared_ptr<ResourceContext> resource_ctx() const { return _resource_ctx; }
228
229
1.36M
    RuntimeState* state() { return _state; }
230
231
    void stop_scanners(RuntimeState* state);
232
233
    int batch_size() const { return _batch_size; }
234
235
    // During low memory mode, there will be at most 4 scanners running and every scanner will
236
    // cache at most 1MB data. So that every instance will keep 8MB buffer.
237
    bool low_memory_mode() const;
238
239
    // TODO(yiguolei) add this as session variable
240
    int32_t low_memory_mode_scan_bytes_per_scanner() const {
241
        return 1 * 1024 * 1024; // 1MB
242
    }
243
244
    int32_t low_memory_mode_scanners() const { return 4; }
245
246
    ScanLocalStateBase* local_state() const { return _local_state; }
247
248
    // the unique id of this context
249
    std::string ctx_id;
250
    TUniqueId _query_id;
251
252
    bool _should_reset_thread_name = true;
253
254
    int32_t num_scheduled_scanners() {
255
        std::lock_guard<std::mutex> l(_transfer_lock);
256
        return _in_flight_tasks_num;
257
    }
258
259
    Status schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task,
260
                              std::unique_lock<std::mutex>& transfer_lock,
261
                              std::unique_lock<std::shared_mutex>& scheduler_lock);
262
263
protected:
264
    /// Four criteria to determine whether to increase the parallelism of the scanners
265
    /// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
266
    /// 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get blocks
267
    /// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough memory to scale up
268
    /// 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num`
269
    void _set_scanner_done();
270
271
    RuntimeState* _state = nullptr;
272
    ScanLocalStateBase* _local_state = nullptr;
273
274
    // the comment of same fields in VScanNode
275
    const TupleDescriptor* _output_tuple_desc = nullptr;
276
    const RowDescriptor* _output_row_descriptor = nullptr;
277
278
    Status _process_status = Status::OK();
279
    std::atomic_bool _should_stop = false;
280
    std::atomic_bool _is_finished = false;
281
282
    // Lazy-allocated blocks for all scanners to share, for memory reuse.
283
    moodycamel::ConcurrentQueue<BlockUPtr> _free_blocks;
284
285
    int _batch_size;
286
    // The limit from SQL's limit clause
287
    int64_t limit;
288
    // Points to the shared remaining limit on ScanOperatorX, shared across all
289
    // parallel instances and their scanners. -1 means no limit.
290
    std::atomic<int64_t>* _shared_scan_limit = nullptr;
291
    // Atomically acquire up to `desired` rows. Returns actual granted count (0 = exhausted).
292
    int64_t acquire_limit_quota(int64_t desired);
293
    int64_t remaining_limit() const { return _shared_scan_limit->load(std::memory_order_acquire); }
294
295
    int64_t _max_bytes_in_queue = 0;
296
    // _transfer_lock protects _completed_tasks, _pending_tasks, and all other shared state
297
    // accessed by both the scanner thread pool and the operator (get_block_from_queue).
298
    std::mutex _transfer_lock;
299
300
    // Together, _completed_tasks and _in_flight_tasks_num represent all "occupied" concurrency
301
    // slots.  The scheduler uses their sum as the current concurrency:
302
    //
303
    //   current_concurrency = _completed_tasks.size() + _in_flight_tasks_num
304
    //
305
    // Lifecycle of a ScanTask:
306
    //   _pending_tasks  --(submit_scan_task)--> [thread pool]  --(push_back_scan_task)-->
307
    //   _completed_tasks  --(get_block_from_queue)--> operator
308
    //   After consumption: non-EOS task goes back to _pending_tasks; EOS increments
309
    //   _num_finished_scanners.
310
311
    // Completed scan tasks whose cached_block is ready for the operator to consume.
312
    // Protected by _transfer_lock.  Written by push_back_scan_task() (scanner thread),
313
    // read/popped by get_block_from_queue() (operator thread).
314
    std::list<std::shared_ptr<ScanTask>> _completed_tasks;
315
316
    // Scanners waiting to be submitted to the scheduler thread pool.  Stored as a stack
317
    // (LIFO) so that recently-used scanners are re-scheduled first, which is more likely
318
    // to be cache-friendly.  Protected by _transfer_lock.  Populated in the constructor
319
    // and by schedule_scan_task() when the concurrency limit is reached; drained by
320
    // _pull_next_scan_task() during scheduling.
321
    std::stack<std::shared_ptr<ScanTask>> _pending_tasks;
322
323
    // Number of scan tasks currently submitted to the scanner scheduler thread pool
324
    // (i.e. in-flight).  Incremented by submit_scan_task() before submission and
325
    // decremented by push_back_scan_task() when the thread pool returns the task.
326
    // Declared atomic so it can be read without _transfer_lock in non-critical paths,
327
    // but must be read under _transfer_lock whenever combined with _completed_tasks.size()
328
    // to form a consistent concurrency snapshot.
329
    std::atomic_int _in_flight_tasks_num = 0;
330
    // Scanner that is eos or error.
331
    int32_t _num_finished_scanners = 0;
332
    // weak pointer for _scanners, used in stop function
333
    std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
334
    std::shared_ptr<RuntimeProfile> _scanner_profile;
335
    // This counter refers to scan operator's local state
336
    RuntimeProfile::Counter* _scanner_memory_used_counter = nullptr;
337
    RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
338
    RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
339
    std::shared_ptr<ResourceContext> _resource_ctx;
340
    std::shared_ptr<Dependency> _dependency = nullptr;
341
    std::shared_ptr<doris::TaskHandle> _task_handle;
342
343
    std::atomic<int64_t> _block_memory_usage = 0;
344
345
    // adaptive scan concurrency related
346
347
    ScannerScheduler* _scanner_scheduler = nullptr;
348
    MOCK_REMOVE(const) int32_t _min_scan_concurrency_of_scan_scheduler = 0;
349
    // The overall target of our system is to make full utilization of the resources.
350
    // At the same time, we don't want too many tasks queued by the scheduler; that is not necessary.
351
    // Each scan operator can submit up to _max_scan_concurrency scanners to the scheduler if the scheduler has enough resources.
352
    // This way, a single query is able to make full utilization of the available resources.
353
    int32_t _max_scan_concurrency = 0;
354
    MOCK_REMOVE(const) int32_t _min_scan_concurrency = 1;
355
356
    std::shared_ptr<ScanTask> _pull_next_scan_task(std::shared_ptr<ScanTask> current_scan_task,
357
                                                   int32_t current_concurrency);
358
359
    int32_t _get_margin(std::unique_lock<std::mutex>& transfer_lock,
360
                        std::unique_lock<std::shared_mutex>& scheduler_lock);
361
362
    // Memory-aware adaptive scheduling
363
    std::shared_ptr<MemLimiter> _scanner_mem_limiter = nullptr;
364
    std::shared_ptr<MemShareArbitrator> _mem_share_arb = nullptr;
365
    std::shared_ptr<ScannerAdaptiveProcessor> _adaptive_processor = nullptr;
366
    const int _ins_idx;
367
    const bool _enable_adaptive_scanners = false;
368
369
    // Adjust scan memory limit based on arbitrator feedback
370
    void _adjust_scan_mem_limit(int64_t old_scanner_mem_bytes, int64_t new_scanner_mem_bytes);
371
372
    // Calculate available scanner count for adaptive scheduling
373
    int _available_pickup_scanner_count();
374
375
    // TODO: Add implementation of runtime_info_feed_back
376
    // adaptive scan concurrency related end
377
};
378
} // namespace doris