be/src/exec/scan/scanner_context.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <bthread/types.h>
#include <stdint.h>

#include <atomic>
#include <cstdint>
#include <list>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <stack>
#include <string>
#include <utility>
#include <vector>

#include "common/config.h"
#include "common/factory_creator.h"
#include "common/metrics/doris_metrics.h"
#include "common/status.h"
#include "concurrentqueue.h"
#include "core/block/block.h"
#include "exec/scan/scanner.h"
#include "exec/scan/task_executor/split_runner.h"
#include "runtime/runtime_profile.h"
42 | | |
43 | | namespace doris { |
44 | | |
// Forward declarations (keep this header light; full definitions live elsewhere).
class RuntimeState;
class TupleDescriptor;
class WorkloadGroup;

class ScanLocalStateBase;
class Dependency;

class Scanner;
class ScannerDelegate;
class ScannerScheduler;
class TaskExecutor;
class TaskHandle;
58 | | |
59 | | class ScanTask { |
60 | | public: |
61 | 405k | ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner) : scanner(delegate_scanner) { |
62 | 405k | _resource_ctx = thread_context()->resource_ctx(); |
63 | 405k | DorisMetrics::instance()->scanner_task_cnt->increment(1); |
64 | 405k | } |
65 | | |
66 | 406k | ~ScanTask() { |
67 | 406k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker()); |
68 | 406k | cached_blocks.clear(); |
69 | 406k | DorisMetrics::instance()->scanner_task_cnt->increment(-1); |
70 | 406k | } |
71 | | |
72 | | private: |
73 | | // whether current scanner is finished |
74 | | bool eos = false; |
75 | | Status status = Status::OK(); |
76 | | std::shared_ptr<ResourceContext> _resource_ctx; |
77 | | |
78 | | public: |
79 | | std::weak_ptr<ScannerDelegate> scanner; |
80 | | std::list<std::pair<BlockUPtr, size_t>> cached_blocks; |
81 | | bool is_first_schedule = true; |
82 | | // Use weak_ptr to avoid circular references and potential memory leaks with SplitRunner. |
83 | | // ScannerContext only needs to observe the lifetime of SplitRunner without owning it. |
84 | | // When SplitRunner is destroyed, split_runner.lock() will return nullptr, ensuring safe access. |
85 | | std::weak_ptr<SplitRunner> split_runner; |
86 | | |
87 | 156 | void set_status(Status _status) { |
88 | 156 | if (_status.is<ErrorCode::END_OF_FILE>()) { |
89 | | // set `eos` if `END_OF_FILE`, don't take `END_OF_FILE` as error |
90 | 0 | eos = true; |
91 | 0 | } |
92 | 156 | status = _status; |
93 | 156 | } |
94 | 78 | Status get_status() const { return status; } |
95 | 1.27M | bool status_ok() { return status.ok() || status.is<ErrorCode::END_OF_FILE>(); } |
96 | 837k | bool is_eos() const { return eos; } |
97 | 412k | void set_eos(bool _eos) { eos = _eos; } |
98 | | }; |
99 | | |
100 | | // ScannerContext is responsible for recording the execution status |
101 | | // of a group of Scanners corresponding to a ScanNode. |
102 | | // Including how many scanners are being scheduled, and maintaining |
103 | | // a producer-consumer blocks queue between scanners and scan nodes. |
104 | | // |
105 | | // ScannerContext is also the scheduling unit of ScannerScheduler. |
106 | | // ScannerScheduler schedules a ScannerContext at a time, |
107 | | // and submits the Scanners to the scanner thread pool for data scanning. |
108 | | class ScannerContext : public std::enable_shared_from_this<ScannerContext>, |
109 | | public HasTaskExecutionCtx { |
110 | | ENABLE_FACTORY_CREATOR(ScannerContext); |
111 | | friend class ScannerScheduler; |
112 | | |
113 | | public: |
114 | | ScannerContext(RuntimeState* state, ScanLocalStateBase* local_state, |
115 | | const TupleDescriptor* output_tuple_desc, |
116 | | const RowDescriptor* output_row_descriptor, |
117 | | const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_, |
118 | | std::shared_ptr<Dependency> dependency |
119 | | #ifdef BE_TEST |
120 | | , |
121 | | int num_parallel_instances |
122 | | #endif |
123 | | ); |
124 | | |
125 | | ~ScannerContext() override; |
126 | | Status init(); |
127 | | |
128 | | BlockUPtr get_free_block(bool force); |
129 | | void return_free_block(BlockUPtr block); |
130 | | void clear_free_blocks(); |
131 | 475k | inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; } |
132 | | |
133 | 448k | int64_t block_memory_usage() { return _block_memory_usage; } |
134 | | |
135 | | // Caller should make sure the pipeline task is still running when calling this function |
136 | | void update_peak_running_scanner(int num); |
137 | | |
138 | | // Get next block from blocks queue. Called by ScanNode/ScanOperator |
139 | | // Set eos to true if there is no more data to read. |
140 | | Status get_block_from_queue(RuntimeState* state, Block* block, bool* eos, int id); |
141 | | |
142 | | [[nodiscard]] Status validate_block_schema(Block* block); |
143 | | |
144 | | // submit the running scanner to thread pool in `ScannerScheduler` |
145 | | // set the next scanned block to `ScanTask::current_block` |
146 | | // set the error state to `ScanTask::status` |
147 | | // set the `eos` to `ScanTask::eos` if there is no more data in current scanner |
148 | | Status submit_scan_task(std::shared_ptr<ScanTask> scan_task, std::unique_lock<std::mutex>&); |
149 | | |
150 | | // Push back a scan task. |
151 | | void push_back_scan_task(std::shared_ptr<ScanTask> scan_task); |
152 | | |
153 | | // Return true if this ScannerContext need no more process |
154 | 1.78M | bool done() const { return _is_finished || _should_stop; } |
155 | | |
156 | | std::string debug_string(); |
157 | | |
158 | 412k | std::shared_ptr<TaskHandle> task_handle() const { return _task_handle; } |
159 | | |
160 | 0 | std::shared_ptr<ResourceContext> resource_ctx() const { return _resource_ctx; } |
161 | | |
162 | 824k | RuntimeState* state() { return _state; } |
163 | | |
164 | | void stop_scanners(RuntimeState* state); |
165 | | |
166 | 202k | int batch_size() const { return _batch_size; } |
167 | | |
168 | | // During low memory mode, there will be at most 4 scanners running and every scanner will |
169 | | // cache at most 1MB data. So that every instance will keep 8MB buffer. |
170 | | bool low_memory_mode() const; |
171 | | |
172 | | // TODO(yiguolei) add this as session variable |
173 | 4 | int32_t low_memory_mode_scan_bytes_per_scanner() const { |
174 | 4 | return 1 * 1024 * 1024; // 1MB |
175 | 4 | } |
176 | | |
177 | 5 | int32_t low_memory_mode_scanners() const { return 4; } |
178 | | |
179 | 0 | ScanLocalStateBase* local_state() const { return _local_state; } |
180 | | |
181 | | // the unique id of this context |
182 | | std::string ctx_id; |
183 | | TUniqueId _query_id; |
184 | | |
185 | | bool _should_reset_thread_name = true; |
186 | | |
187 | 0 | int32_t num_scheduled_scanners() { |
188 | 0 | std::lock_guard<std::mutex> l(_transfer_lock); |
189 | 0 | return _num_scheduled_scanners; |
190 | 0 | } |
191 | | |
192 | | Status schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task, |
193 | | std::unique_lock<std::mutex>& transfer_lock, |
194 | | std::unique_lock<std::shared_mutex>& scheduler_lock); |
195 | | |
196 | | protected: |
197 | | /// Four criteria to determine whether to increase the parallelism of the scanners |
198 | | /// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up |
199 | | /// 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get blocks |
200 | | /// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough memory to scale up |
201 | | /// 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num` |
202 | | void _set_scanner_done(); |
203 | | |
204 | | RuntimeState* _state = nullptr; |
205 | | ScanLocalStateBase* _local_state = nullptr; |
206 | | |
207 | | // the comment of same fields in VScanNode |
208 | | const TupleDescriptor* _output_tuple_desc = nullptr; |
209 | | const RowDescriptor* _output_row_descriptor = nullptr; |
210 | | |
211 | | std::mutex _transfer_lock; |
212 | | std::list<std::shared_ptr<ScanTask>> _tasks_queue; |
213 | | |
214 | | Status _process_status = Status::OK(); |
215 | | std::atomic_bool _should_stop = false; |
216 | | std::atomic_bool _is_finished = false; |
217 | | |
218 | | // Lazy-allocated blocks for all scanners to share, for memory reuse. |
219 | | moodycamel::ConcurrentQueue<BlockUPtr> _free_blocks; |
220 | | |
221 | | int _batch_size; |
222 | | // The limit from SQL's limit clause |
223 | | int64_t limit; |
224 | | |
225 | | int64_t _max_bytes_in_queue = 0; |
226 | | // Using stack so that we can resubmit scanner in a LIFO order, maybe more cache friendly |
227 | | std::stack<std::shared_ptr<ScanTask>> _pending_scanners; |
228 | | // Scanner that is submitted to the scheduler. |
229 | | std::atomic_int _num_scheduled_scanners = 0; |
230 | | // Scanner that is eos or error. |
231 | | int32_t _num_finished_scanners = 0; |
232 | | // weak pointer for _scanners, used in stop function |
233 | | std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners; |
234 | | std::shared_ptr<RuntimeProfile> _scanner_profile; |
235 | | // This counter refers to scan operator's local state |
236 | | RuntimeProfile::Counter* _scanner_memory_used_counter = nullptr; |
237 | | RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr; |
238 | | RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr; |
239 | | std::shared_ptr<ResourceContext> _resource_ctx; |
240 | | std::shared_ptr<Dependency> _dependency = nullptr; |
241 | | std::shared_ptr<doris::TaskHandle> _task_handle; |
242 | | |
243 | | std::atomic<int64_t> _block_memory_usage = 0; |
244 | | |
245 | | // adaptive scan concurrency related |
246 | | |
247 | | ScannerScheduler* _scanner_scheduler = nullptr; |
248 | | MOCK_REMOVE(const) int32_t _min_scan_concurrency_of_scan_scheduler = 0; |
249 | | // The overall target of our system is to make full utilization of the resources. |
250 | | // At the same time, we dont want too many tasks are queued by scheduler, that is not necessary. |
251 | | // Each scan operator can submit _max_scan_concurrency scanner to scheduelr if scheduler has enough resource. |
252 | | // So that for a single query, we can make sure it could make full utilization of the resource. |
253 | | int32_t _max_scan_concurrency = 0; |
254 | | MOCK_REMOVE(const) int32_t _min_scan_concurrency = 1; |
255 | | |
256 | | std::shared_ptr<ScanTask> _pull_next_scan_task(std::shared_ptr<ScanTask> current_scan_task, |
257 | | int32_t current_concurrency); |
258 | | |
259 | | int32_t _get_margin(std::unique_lock<std::mutex>& transfer_lock, |
260 | | std::unique_lock<std::shared_mutex>& scheduler_lock); |
261 | | |
262 | | // TODO: Add implementation of runtime_info_feed_back |
263 | | // adaptive scan concurrency related end |
264 | | }; |
265 | | } // namespace doris |