be/src/exec/scan/scanner_context.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <bthread/types.h>
#include <stdint.h>

#include <atomic>
#include <cstdint>
#include <list>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <stack>
#include <string>
#include <utility>
#include <vector>

#include "common/config.h"
#include "common/factory_creator.h"
#include "common/metrics/doris_metrics.h"
#include "common/status.h"
#include "concurrentqueue.h"
#include "core/block/block.h"
#include "exec/common/memory.h"
#include "exec/scan/scanner.h"
#include "exec/scan/task_executor/split_runner.h"
#include "runtime/runtime_profile.h"
43 | | |
44 | | namespace doris { |
45 | | |
46 | | class RuntimeState; |
47 | | class TupleDescriptor; |
48 | | class WorkloadGroup; |
49 | | |
50 | | class ScanLocalStateBase; |
51 | | class Dependency; |
52 | | |
53 | | class Scanner; |
54 | | class ScannerDelegate; |
55 | | class ScannerScheduler; |
56 | | class TaskExecutor; |
57 | | class TaskHandle; |
58 | | |
59 | | // Adaptive processor for dynamic scanner concurrency adjustment |
60 | | struct ScannerAdaptiveProcessor { |
61 | 1.25M | ENABLE_FACTORY_CREATOR(ScannerAdaptiveProcessor) |
62 | 1.25M | ScannerAdaptiveProcessor() = default; |
63 | 1.25M | ~ScannerAdaptiveProcessor() = default; |
64 | 1.25M | // Expected scanners in this cycle |
65 | | |
66 | 1.25M | int expected_scanners = 0; |
67 | 1.25M | // Timing metrics |
68 | 1.25M | // int64_t context_start_time = 0; |
69 | 1.25M | // int64_t scanner_total_halt_time = 0; |
70 | 1.25M | // int64_t scanner_gen_blocks_time = 0; |
71 | | // std::atomic_int64_t scanner_total_io_time = 0; |
72 | | // std::atomic_int64_t scanner_total_running_time = 0; |
73 | | // std::atomic_int64_t scanner_total_scan_bytes = 0; |
74 | | |
75 | | // Timestamps |
76 | | // std::atomic_int64_t last_scanner_finish_timestamp = 0; |
77 | | // int64_t check_all_scanners_last_timestamp = 0; |
78 | | // int64_t last_driver_output_full_timestamp = 0; |
79 | | int64_t adjust_scanners_last_timestamp = 0; |
80 | | |
81 | | // Adjustment strategy fields |
82 | | // bool try_add_scanners = false; |
83 | | // double expected_speedup_ratio = 0; |
84 | | // double last_scanner_scan_speed = 0; |
85 | | // int64_t last_scanner_total_scan_bytes = 0; |
86 | | // int try_add_scanners_fail_count = 0; |
87 | 2.79k | // int check_slow_io = 0; |
88 | 2.79k | // int32_t slow_io_latency_ms = 100; // Default from config |
89 | | }; |
90 | 0 |
|
91 | 0 | class ScanTask { |
92 | 2.79k | public: |
93 | 2.79k | enum class State : int { |
94 | 1.39k | PENDING, // not scheduled yet |
95 | 3.82M | IN_FLIGHT, // scheduled and running |
96 | 2.53M | COMPLETED, // finished with result or error, waiting to be collected by scan node |
97 | 1.26M | EOS, // finished and no more data, waiting to be collected by scan node |
98 | | }; |
99 | | ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner) : scanner(delegate_scanner) { |
100 | | _resource_ctx = thread_context()->resource_ctx(); |
101 | | DorisMetrics::instance()->scanner_task_cnt->increment(1); |
102 | | } |
103 | | |
104 | | ~ScanTask() { |
105 | | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker()); |
106 | | DorisMetrics::instance()->scanner_task_cnt->increment(-1); |
107 | | cached_block.reset(); |
108 | | } |
109 | | |
110 | | private: |
111 | | // whether current scanner is finished |
112 | | Status status = Status::OK(); |
113 | | std::shared_ptr<ResourceContext> _resource_ctx; |
114 | | State _state = State::PENDING; |
115 | | |
116 | | public: |
117 | | std::weak_ptr<ScannerDelegate> scanner; |
118 | | BlockUPtr cached_block = nullptr; |
119 | | bool is_first_schedule = true; |
120 | | // Use weak_ptr to avoid circular references and potential memory leaks with SplitRunner. |
121 | | // ScannerContext only needs to observe the lifetime of SplitRunner without owning it. |
122 | | // When SplitRunner is destroyed, split_runner.lock() will return nullptr, ensuring safe access. |
123 | | std::weak_ptr<SplitRunner> split_runner; |
124 | | |
125 | | void set_status(Status _status) { |
126 | | if (_status.is<ErrorCode::END_OF_FILE>()) { |
127 | | // set `eos` if `END_OF_FILE`, don't take `END_OF_FILE` as error |
128 | | _state = State::EOS; |
129 | | } |
130 | | status = _status; |
131 | 1.35M | } |
132 | | Status get_status() const { return status; } |
133 | 24.1M | bool status_ok() { return status.ok() || status.is<ErrorCode::END_OF_FILE>(); } |
134 | | bool is_eos() const { return _state == State::EOS; } |
135 | | void set_state(State state) { |
136 | | switch (state) { |
137 | | case State::PENDING: |
138 | | DCHECK(_state == State::PENDING || _state == State::IN_FLIGHT) << (int)_state; |
139 | | DCHECK(cached_block == nullptr); |
140 | | break; |
141 | | case State::IN_FLIGHT: |
142 | | DCHECK(_state == State::COMPLETED || _state == State::PENDING || |
143 | | _state == State::IN_FLIGHT) |
144 | | << (int)_state; |
145 | | DCHECK(cached_block == nullptr); |
146 | | break; |
147 | | case State::COMPLETED: |
148 | | DCHECK(_state == State::IN_FLIGHT) << (int)_state; |
149 | | DCHECK(cached_block != nullptr); |
150 | | break; |
151 | | case State::EOS: |
152 | | DCHECK(_state == State::IN_FLIGHT || status.is<ErrorCode::END_OF_FILE>()) |
153 | | << (int)_state; |
154 | 5.22M | break; |
155 | | default: |
156 | | break; |
157 | | } |
158 | 1.26M | |
159 | | _state = state; |
160 | 0 | } |
161 | | }; |
162 | 2.52M | |
163 | | // ScannerContext is responsible for recording the execution status |
164 | | // of a group of Scanners corresponding to a ScanNode. |
165 | | // Including how many scanners are being scheduled, and maintaining |
166 | 461k | // a producer-consumer blocks queue between scanners and scan nodes. |
167 | | // |
168 | | // ScannerContext is also the scheduling unit of ScannerScheduler. |
169 | | // ScannerScheduler schedules a ScannerContext at a time, |
170 | | // and submits the Scanners to the scanner thread pool for data scanning. |
171 | | class ScannerContext : public std::enable_shared_from_this<ScannerContext>, |
172 | | public HasTaskExecutionCtx { |
173 | 5 | ENABLE_FACTORY_CREATOR(ScannerContext); |
174 | 5 | friend class ScannerScheduler; |
175 | 5 | |
176 | | public: |
177 | 4 | ScannerContext(RuntimeState* state, ScanLocalStateBase* local_state, |
178 | | const TupleDescriptor* output_tuple_desc, |
179 | 1 | const RowDescriptor* output_row_descriptor, |
180 | | const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_, |
181 | | std::shared_ptr<Dependency> dependency, std::atomic<int64_t>* shared_scan_limit, |
182 | | std::shared_ptr<MemShareArbitrator> arb, std::shared_ptr<MemLimiter> limiter, |
183 | | int ins_idx, bool enable_adaptive_scan |
184 | | #ifdef BE_TEST |
185 | | , |
186 | | int num_parallel_instances |
187 | 0 | #endif |
188 | 0 | ); |
189 | 0 |
|
190 | 0 | ~ScannerContext() override; |
191 | | Status init(); |
192 | | |
193 | | // TODO(gabriel): we can also consider to return a list of blocks to reduce the scheduling overhead, but it may cause larger memory usage and more complex logic of block management. |
194 | | BlockUPtr get_free_block(bool force); |
195 | | void return_free_block(BlockUPtr block); |
196 | | void clear_free_blocks(); |
197 | | inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; } |
198 | | |
199 | | int64_t block_memory_usage() { return _block_memory_usage; } |
200 | | |
201 | | // Caller should make sure the pipeline task is still running when calling this function |
202 | | void update_peak_running_scanner(int num); |
203 | | void reestimated_block_mem_bytes(int64_t num); |
204 | | |
205 | | // Get next block from blocks queue. Called by ScanNode/ScanOperator |
206 | | // Set eos to true if there is no more data to read. |
207 | | Status get_block_from_queue(RuntimeState* state, Block* block, bool* eos, int id); |
208 | | |
209 | | [[nodiscard]] Status validate_block_schema(Block* block); |
210 | | |
211 | | // submit the running scanner to thread pool in `ScannerScheduler` |
212 | | // set the next scanned block to `ScanTask::current_block` |
213 | | // set the error state to `ScanTask::status` |
214 | | // set the `eos` to `ScanTask::eos` if there is no more data in current scanner |
215 | | Status submit_scan_task(std::shared_ptr<ScanTask> scan_task, std::unique_lock<std::mutex>&); |
216 | | |
217 | | // Push back a scan task. |
218 | | void push_back_scan_task(std::shared_ptr<ScanTask> scan_task); |
219 | | |
220 | | // Return true if this ScannerContext need no more process |
221 | | bool done() const { return _is_finished || _should_stop; } |
222 | | |
223 | | std::string debug_string(); |
224 | | |
225 | | std::shared_ptr<TaskHandle> task_handle() const { return _task_handle; } |
226 | | |
227 | | std::shared_ptr<ResourceContext> resource_ctx() const { return _resource_ctx; } |
228 | | |
229 | 1.36M | RuntimeState* state() { return _state; } |
230 | | |
231 | | void stop_scanners(RuntimeState* state); |
232 | | |
233 | | int batch_size() const { return _batch_size; } |
234 | | |
235 | | // During low memory mode, there will be at most 4 scanners running and every scanner will |
236 | | // cache at most 1MB data. So that every instance will keep 8MB buffer. |
237 | | bool low_memory_mode() const; |
238 | | |
239 | | // TODO(yiguolei) add this as session variable |
240 | | int32_t low_memory_mode_scan_bytes_per_scanner() const { |
241 | | return 1 * 1024 * 1024; // 1MB |
242 | | } |
243 | | |
244 | | int32_t low_memory_mode_scanners() const { return 4; } |
245 | | |
246 | | ScanLocalStateBase* local_state() const { return _local_state; } |
247 | | |
248 | | // the unique id of this context |
249 | | std::string ctx_id; |
250 | | TUniqueId _query_id; |
251 | | |
252 | | bool _should_reset_thread_name = true; |
253 | | |
254 | | int32_t num_scheduled_scanners() { |
255 | | std::lock_guard<std::mutex> l(_transfer_lock); |
256 | | return _in_flight_tasks_num; |
257 | | } |
258 | | |
259 | | Status schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task, |
260 | | std::unique_lock<std::mutex>& transfer_lock, |
261 | | std::unique_lock<std::shared_mutex>& scheduler_lock); |
262 | | |
263 | | protected: |
264 | | /// Four criteria to determine whether to increase the parallelism of the scanners |
265 | | /// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up |
266 | | /// 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get blocks |
267 | | /// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough memory to scale up |
268 | | /// 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num` |
269 | | void _set_scanner_done(); |
270 | | |
271 | | RuntimeState* _state = nullptr; |
272 | | ScanLocalStateBase* _local_state = nullptr; |
273 | | |
274 | | // the comment of same fields in VScanNode |
275 | | const TupleDescriptor* _output_tuple_desc = nullptr; |
276 | | const RowDescriptor* _output_row_descriptor = nullptr; |
277 | | |
278 | | Status _process_status = Status::OK(); |
279 | | std::atomic_bool _should_stop = false; |
280 | | std::atomic_bool _is_finished = false; |
281 | | |
282 | | // Lazy-allocated blocks for all scanners to share, for memory reuse. |
283 | | moodycamel::ConcurrentQueue<BlockUPtr> _free_blocks; |
284 | | |
285 | | int _batch_size; |
286 | | // The limit from SQL's limit clause |
287 | | int64_t limit; |
288 | | // Points to the shared remaining limit on ScanOperatorX, shared across all |
289 | | // parallel instances and their scanners. -1 means no limit. |
290 | | std::atomic<int64_t>* _shared_scan_limit = nullptr; |
291 | | // Atomically acquire up to `desired` rows. Returns actual granted count (0 = exhausted). |
292 | | int64_t acquire_limit_quota(int64_t desired); |
293 | | int64_t remaining_limit() const { return _shared_scan_limit->load(std::memory_order_acquire); } |
294 | | |
295 | | int64_t _max_bytes_in_queue = 0; |
296 | | // _transfer_lock protects _completed_tasks, _pending_tasks, and all other shared state |
297 | | // accessed by both the scanner thread pool and the operator (get_block_from_queue). |
298 | | std::mutex _transfer_lock; |
299 | | |
300 | | // Together, _completed_tasks and _in_flight_tasks_num represent all "occupied" concurrency |
301 | | // slots. The scheduler uses their sum as the current concurrency: |
302 | | // |
303 | | // current_concurrency = _completed_tasks.size() + _in_flight_tasks_num |
304 | | // |
305 | | // Lifecycle of a ScanTask: |
306 | | // _pending_tasks --(submit_scan_task)--> [thread pool] --(push_back_scan_task)--> |
307 | | // _completed_tasks --(get_block_from_queue)--> operator |
308 | | // After consumption: non-EOS task goes back to _pending_tasks; EOS increments |
309 | | // _num_finished_scanners. |
310 | | |
311 | | // Completed scan tasks whose cached_block is ready for the operator to consume. |
312 | | // Protected by _transfer_lock. Written by push_back_scan_task() (scanner thread), |
313 | | // read/popped by get_block_from_queue() (operator thread). |
314 | | std::list<std::shared_ptr<ScanTask>> _completed_tasks; |
315 | | |
316 | | // Scanners waiting to be submitted to the scheduler thread pool. Stored as a stack |
317 | | // (LIFO) so that recently-used scanners are re-scheduled first, which is more likely |
318 | | // to be cache-friendly. Protected by _transfer_lock. Populated in the constructor |
319 | | // and by schedule_scan_task() when the concurrency limit is reached; drained by |
320 | | // _pull_next_scan_task() during scheduling. |
321 | | std::stack<std::shared_ptr<ScanTask>> _pending_tasks; |
322 | | |
323 | | // Number of scan tasks currently submitted to the scanner scheduler thread pool |
324 | | // (i.e. in-flight). Incremented by submit_scan_task() before submission and |
325 | | // decremented by push_back_scan_task() when the thread pool returns the task. |
326 | | // Declared atomic so it can be read without _transfer_lock in non-critical paths, |
327 | | // but must be read under _transfer_lock whenever combined with _completed_tasks.size() |
328 | | // to form a consistent concurrency snapshot. |
329 | | std::atomic_int _in_flight_tasks_num = 0; |
330 | | // Scanner that is eos or error. |
331 | | int32_t _num_finished_scanners = 0; |
332 | | // weak pointer for _scanners, used in stop function |
333 | | std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners; |
334 | | std::shared_ptr<RuntimeProfile> _scanner_profile; |
335 | | // This counter refers to scan operator's local state |
336 | | RuntimeProfile::Counter* _scanner_memory_used_counter = nullptr; |
337 | | RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr; |
338 | | RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr; |
339 | | std::shared_ptr<ResourceContext> _resource_ctx; |
340 | | std::shared_ptr<Dependency> _dependency = nullptr; |
341 | | std::shared_ptr<doris::TaskHandle> _task_handle; |
342 | | |
343 | | std::atomic<int64_t> _block_memory_usage = 0; |
344 | | |
345 | | // adaptive scan concurrency related |
346 | | |
347 | | ScannerScheduler* _scanner_scheduler = nullptr; |
348 | | MOCK_REMOVE(const) int32_t _min_scan_concurrency_of_scan_scheduler = 0; |
349 | | // The overall target of our system is to make full utilization of the resources. |
350 | | // At the same time, we dont want too many tasks are queued by scheduler, that is not necessary. |
351 | | // Each scan operator can submit _max_scan_concurrency scanner to scheduelr if scheduler has enough resource. |
352 | | // So that for a single query, we can make sure it could make full utilization of the resource. |
353 | | int32_t _max_scan_concurrency = 0; |
354 | | MOCK_REMOVE(const) int32_t _min_scan_concurrency = 1; |
355 | | |
356 | | std::shared_ptr<ScanTask> _pull_next_scan_task(std::shared_ptr<ScanTask> current_scan_task, |
357 | | int32_t current_concurrency); |
358 | | |
359 | | int32_t _get_margin(std::unique_lock<std::mutex>& transfer_lock, |
360 | | std::unique_lock<std::shared_mutex>& scheduler_lock); |
361 | | |
362 | | // Memory-aware adaptive scheduling |
363 | | std::shared_ptr<MemLimiter> _scanner_mem_limiter = nullptr; |
364 | | std::shared_ptr<MemShareArbitrator> _mem_share_arb = nullptr; |
365 | | std::shared_ptr<ScannerAdaptiveProcessor> _adaptive_processor = nullptr; |
366 | | const int _ins_idx; |
367 | | const bool _enable_adaptive_scanners = false; |
368 | | |
369 | | // Adjust scan memory limit based on arbitrator feedback |
370 | | void _adjust_scan_mem_limit(int64_t old_scanner_mem_bytes, int64_t new_scanner_mem_bytes); |
371 | | |
372 | | // Calculate available scanner count for adaptive scheduling |
373 | | int _available_pickup_scanner_count(); |
374 | | |
375 | | // TODO: Add implementation of runtime_info_feed_back |
376 | | // adaptive scan concurrency related end |
377 | | }; |
378 | | } // namespace doris |