Coverage Report

Created: 2026-03-12 17:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner_context.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bthread/types.h>
#include <stdint.h>

#include <atomic>
#include <cstdint>
#include <list>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <stack>
#include <string>
#include <utility>
#include <vector>

#include "common/config.h"
#include "common/factory_creator.h"
#include "common/metrics/doris_metrics.h"
#include "common/status.h"
#include "concurrentqueue.h"
#include "core/block/block.h"
#include "exec/scan/scanner.h"
#include "exec/scan/task_executor/split_runner.h"
#include "runtime/runtime_profile.h"
42
43
namespace doris {
44
45
// Forward declarations for types used only by pointer/reference in this header.
class RuntimeState;
class TupleDescriptor;
class WorkloadGroup;

class ScanLocalStateBase;
class Dependency;

class Scanner;
class ScannerDelegate;
class ScannerScheduler;
class TaskExecutor;
class TaskHandle;
58
59
class ScanTask {
60
public:
61
405k
    ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner) : scanner(delegate_scanner) {
62
405k
        _resource_ctx = thread_context()->resource_ctx();
63
405k
        DorisMetrics::instance()->scanner_task_cnt->increment(1);
64
405k
    }
65
66
406k
    ~ScanTask() {
67
406k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
68
406k
        cached_blocks.clear();
69
406k
        DorisMetrics::instance()->scanner_task_cnt->increment(-1);
70
406k
    }
71
72
private:
73
    // whether current scanner is finished
74
    bool eos = false;
75
    Status status = Status::OK();
76
    std::shared_ptr<ResourceContext> _resource_ctx;
77
78
public:
79
    std::weak_ptr<ScannerDelegate> scanner;
80
    std::list<std::pair<BlockUPtr, size_t>> cached_blocks;
81
    bool is_first_schedule = true;
82
    // Use weak_ptr to avoid circular references and potential memory leaks with SplitRunner.
83
    // ScannerContext only needs to observe the lifetime of SplitRunner without owning it.
84
    // When SplitRunner is destroyed, split_runner.lock() will return nullptr, ensuring safe access.
85
    std::weak_ptr<SplitRunner> split_runner;
86
87
156
    void set_status(Status _status) {
88
156
        if (_status.is<ErrorCode::END_OF_FILE>()) {
89
            // set `eos` if `END_OF_FILE`, don't take `END_OF_FILE` as error
90
0
            eos = true;
91
0
        }
92
156
        status = _status;
93
156
    }
94
78
    Status get_status() const { return status; }
95
1.27M
    bool status_ok() { return status.ok() || status.is<ErrorCode::END_OF_FILE>(); }
96
837k
    bool is_eos() const { return eos; }
97
412k
    void set_eos(bool _eos) { eos = _eos; }
98
};
99
100
// ScannerContext is responsible for recording the execution status
101
// of a group of Scanners corresponding to a ScanNode.
102
// Including how many scanners are being scheduled, and maintaining
103
// a producer-consumer blocks queue between scanners and scan nodes.
104
//
105
// ScannerContext is also the scheduling unit of ScannerScheduler.
106
// ScannerScheduler schedules a ScannerContext at a time,
107
// and submits the Scanners to the scanner thread pool for data scanning.
108
class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
109
                       public HasTaskExecutionCtx {
110
    ENABLE_FACTORY_CREATOR(ScannerContext);
111
    friend class ScannerScheduler;
112
113
public:
114
    ScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
115
                   const TupleDescriptor* output_tuple_desc,
116
                   const RowDescriptor* output_row_descriptor,
117
                   const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_,
118
                   std::shared_ptr<Dependency> dependency
119
#ifdef BE_TEST
120
                   ,
121
                   int num_parallel_instances
122
#endif
123
    );
124
125
    ~ScannerContext() override;
126
    Status init();
127
128
    BlockUPtr get_free_block(bool force);
129
    void return_free_block(BlockUPtr block);
130
    void clear_free_blocks();
131
475k
    inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; }
132
133
448k
    int64_t block_memory_usage() { return _block_memory_usage; }
134
135
    // Caller should make sure the pipeline task is still running when calling this function
136
    void update_peak_running_scanner(int num);
137
138
    // Get next block from blocks queue. Called by ScanNode/ScanOperator
139
    // Set eos to true if there is no more data to read.
140
    Status get_block_from_queue(RuntimeState* state, Block* block, bool* eos, int id);
141
142
    [[nodiscard]] Status validate_block_schema(Block* block);
143
144
    // submit the running scanner to thread pool in `ScannerScheduler`
145
    // set the next scanned block to `ScanTask::current_block`
146
    // set the error state to `ScanTask::status`
147
    // set the `eos` to `ScanTask::eos` if there is no more data in current scanner
148
    Status submit_scan_task(std::shared_ptr<ScanTask> scan_task, std::unique_lock<std::mutex>&);
149
150
    // Push back a scan task.
151
    void push_back_scan_task(std::shared_ptr<ScanTask> scan_task);
152
153
    // Return true if this ScannerContext need no more process
154
1.78M
    bool done() const { return _is_finished || _should_stop; }
155
156
    std::string debug_string();
157
158
412k
    std::shared_ptr<TaskHandle> task_handle() const { return _task_handle; }
159
160
0
    std::shared_ptr<ResourceContext> resource_ctx() const { return _resource_ctx; }
161
162
824k
    RuntimeState* state() { return _state; }
163
164
    void stop_scanners(RuntimeState* state);
165
166
202k
    int batch_size() const { return _batch_size; }
167
168
    // During low memory mode, there will be at most 4 scanners running and every scanner will
169
    // cache at most 1MB data. So that every instance will keep 8MB buffer.
170
    bool low_memory_mode() const;
171
172
    // TODO(yiguolei) add this as session variable
173
4
    int32_t low_memory_mode_scan_bytes_per_scanner() const {
174
4
        return 1 * 1024 * 1024; // 1MB
175
4
    }
176
177
5
    int32_t low_memory_mode_scanners() const { return 4; }
178
179
0
    ScanLocalStateBase* local_state() const { return _local_state; }
180
181
    // the unique id of this context
182
    std::string ctx_id;
183
    TUniqueId _query_id;
184
185
    bool _should_reset_thread_name = true;
186
187
0
    int32_t num_scheduled_scanners() {
188
0
        std::lock_guard<std::mutex> l(_transfer_lock);
189
0
        return _num_scheduled_scanners;
190
0
    }
191
192
    Status schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task,
193
                              std::unique_lock<std::mutex>& transfer_lock,
194
                              std::unique_lock<std::shared_mutex>& scheduler_lock);
195
196
protected:
197
    /// Four criteria to determine whether to increase the parallelism of the scanners
198
    /// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
199
    /// 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get blocks
200
    /// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough memory to scale up
201
    /// 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num`
202
    void _set_scanner_done();
203
204
    RuntimeState* _state = nullptr;
205
    ScanLocalStateBase* _local_state = nullptr;
206
207
    // the comment of same fields in VScanNode
208
    const TupleDescriptor* _output_tuple_desc = nullptr;
209
    const RowDescriptor* _output_row_descriptor = nullptr;
210
211
    std::mutex _transfer_lock;
212
    std::list<std::shared_ptr<ScanTask>> _tasks_queue;
213
214
    Status _process_status = Status::OK();
215
    std::atomic_bool _should_stop = false;
216
    std::atomic_bool _is_finished = false;
217
218
    // Lazy-allocated blocks for all scanners to share, for memory reuse.
219
    moodycamel::ConcurrentQueue<BlockUPtr> _free_blocks;
220
221
    int _batch_size;
222
    // The limit from SQL's limit clause
223
    int64_t limit;
224
225
    int64_t _max_bytes_in_queue = 0;
226
    // Using stack so that we can resubmit scanner in a LIFO order, maybe more cache friendly
227
    std::stack<std::shared_ptr<ScanTask>> _pending_scanners;
228
    // Scanner that is submitted to the scheduler.
229
    std::atomic_int _num_scheduled_scanners = 0;
230
    // Scanner that is eos or error.
231
    int32_t _num_finished_scanners = 0;
232
    // weak pointer for _scanners, used in stop function
233
    std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
234
    std::shared_ptr<RuntimeProfile> _scanner_profile;
235
    // This counter refers to scan operator's local state
236
    RuntimeProfile::Counter* _scanner_memory_used_counter = nullptr;
237
    RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
238
    RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
239
    std::shared_ptr<ResourceContext> _resource_ctx;
240
    std::shared_ptr<Dependency> _dependency = nullptr;
241
    std::shared_ptr<doris::TaskHandle> _task_handle;
242
243
    std::atomic<int64_t> _block_memory_usage = 0;
244
245
    // adaptive scan concurrency related
246
247
    ScannerScheduler* _scanner_scheduler = nullptr;
248
    MOCK_REMOVE(const) int32_t _min_scan_concurrency_of_scan_scheduler = 0;
249
    // The overall target of our system is to make full utilization of the resources.
250
    // At the same time, we dont want too many tasks are queued by scheduler, that is not necessary.
251
    // Each scan operator can submit _max_scan_concurrency scanner to scheduelr if scheduler has enough resource.
252
    // So that for a single query, we can make sure it could make full utilization of the resource.
253
    int32_t _max_scan_concurrency = 0;
254
    MOCK_REMOVE(const) int32_t _min_scan_concurrency = 1;
255
256
    std::shared_ptr<ScanTask> _pull_next_scan_task(std::shared_ptr<ScanTask> current_scan_task,
257
                                                   int32_t current_concurrency);
258
259
    int32_t _get_margin(std::unique_lock<std::mutex>& transfer_lock,
260
                        std::unique_lock<std::shared_mutex>& scheduler_lock);
261
262
    // TODO: Add implementation of runtime_info_feed_back
263
    // adaptive scan concurrency related end
264
};
265
} // namespace doris