be/src/exec/scan/scanner_context.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/scan/scanner_context.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/Metrics_types.h> |
22 | | #include <glog/logging.h> |
23 | | #include <zconf.h> |
24 | | |
25 | | #include <cstdint> |
26 | | #include <ctime> |
27 | | #include <memory> |
28 | | #include <mutex> |
29 | | #include <ostream> |
30 | | #include <shared_mutex> |
31 | | #include <tuple> |
32 | | #include <utility> |
33 | | |
34 | | #include "common/config.h" |
35 | | #include "common/exception.h" |
36 | | #include "common/logging.h" |
37 | | #include "common/metrics/doris_metrics.h" |
38 | | #include "common/status.h" |
39 | | #include "core/block/block.h" |
40 | | #include "exec/operator/scan_operator.h" |
41 | | #include "exec/scan/scan_node.h" |
42 | | #include "exec/scan/scanner_scheduler.h" |
43 | | #include "runtime/descriptors.h" |
44 | | #include "runtime/exec_env.h" |
45 | | #include "runtime/runtime_profile.h" |
46 | | #include "runtime/runtime_state.h" |
47 | | #include "storage/tablet/tablet.h" |
48 | | #include "util/time.h" |
49 | | #include "util/uid_util.h" |
50 | | |
51 | | namespace doris { |
52 | | |
53 | | using namespace std::chrono_literals; |
54 | | #include "common/compile_check_begin.h" |
// ==================== MemShareArbitrator ====================
// Initial per-scan-node memory reservation charged against the arbitrator
// before any real block-size estimate is available.
static constexpr int64_t DEFAULT_SCANNER_MEM_BYTES = 64 * 1024 * 1024; // 64MB default
57 | | |
// Arbitrates the scan-memory budget of one query. `mem_limit` is the slice of
// the query's memory limit that all scan nodes may share; it is clamped to at
// least 1 byte so later ratio computations stay well-defined.
MemShareArbitrator::MemShareArbitrator(const TUniqueId& qid, int64_t query_mem_limit,
                                       double max_scan_ratio)
        : query_id(qid),
          query_mem_limit(query_mem_limit),
          mem_limit(std::max<int64_t>(
                  1, static_cast<int64_t>(static_cast<double>(query_mem_limit) * max_scan_ratio))) {
}
65 | | |
66 | 2 | void MemShareArbitrator::register_scan_node() { |
67 | 2 | total_mem_bytes.fetch_add(DEFAULT_SCANNER_MEM_BYTES); |
68 | 2 | } |
69 | | |
70 | 9 | int64_t MemShareArbitrator::update_mem_bytes(int64_t old_value, int64_t new_value) { |
71 | 9 | int64_t diff = new_value - old_value; |
72 | 9 | int64_t total = total_mem_bytes.fetch_add(diff) + diff; |
73 | 9 | if (new_value == 0) return 0; |
74 | 4 | if (total <= 0) return mem_limit; |
75 | | // Proportional sharing: allocate based on this context's share of total usage |
76 | 4 | double ratio = static_cast<double>(new_value) / static_cast<double>(std::max(total, new_value)); |
77 | 4 | return static_cast<int64_t>(static_cast<double>(mem_limit) * ratio); |
78 | 4 | } |
79 | | |
80 | | // ==================== MemLimiter ==================== |
81 | 5 | int MemLimiter::available_scanner_count(int ins_idx) const { |
82 | 5 | int64_t mem_limit_value = mem_limit.load(); |
83 | 5 | int64_t running_tasks_count_value = running_tasks_count.load(); |
84 | 5 | int64_t estimated_block_mem_bytes_value = get_estimated_block_mem_bytes(); |
85 | | |
86 | 5 | int64_t max_count = std::max(1L, mem_limit_value / estimated_block_mem_bytes_value); |
87 | 5 | int64_t avail_count = max_count; |
88 | 5 | int64_t per_count = avail_count / parallelism; |
89 | 5 | if (serial_operator) { |
90 | 1 | per_count += (avail_count - per_count * parallelism); |
91 | 4 | } else if (ins_idx < avail_count - per_count * parallelism) { |
92 | 2 | per_count += 1; |
93 | 2 | } |
94 | | |
95 | 5 | VLOG_DEBUG << "available_scanner_count. max_count=" << max_count << "(" |
96 | 0 | << running_tasks_count_value << "/" << estimated_block_mem_bytes_value |
97 | 0 | << "), operator_mem_limit = " << operator_mem_limit |
98 | 0 | << ", running_tasks_count = " << running_tasks_count_value |
99 | 0 | << ", parallelism = " << parallelism << ", avail_count = " << avail_count |
100 | 0 | << ", ins_id = " << ins_idx << ", per_count = " << per_count |
101 | 0 | << " debug_string: " << debug_string(); |
102 | | |
103 | 5 | return cast_set<int>(per_count); |
104 | 5 | } |
105 | | |
106 | 9 | void MemLimiter::reestimated_block_mem_bytes(int64_t value) { |
107 | 9 | if (value == 0) return; |
108 | 8 | value = std::min(value, operator_mem_limit); |
109 | | |
110 | 8 | std::lock_guard<std::mutex> L(lock); |
111 | 8 | auto old_value = estimated_block_mem_bytes.load(); |
112 | 8 | int64_t total = |
113 | 8 | get_estimated_block_mem_bytes() * estimated_block_mem_bytes_update_count + value; |
114 | 8 | estimated_block_mem_bytes_update_count += 1; |
115 | 8 | estimated_block_mem_bytes = total / estimated_block_mem_bytes_update_count; |
116 | 8 | VLOG_DEBUG << fmt::format( |
117 | 0 | "reestimated_block_mem_bytes. MemLimiter = {}, estimated_block_mem_bytes = {}, " |
118 | 0 | "old_value = {}, value: {}", |
119 | 0 | debug_string(), estimated_block_mem_bytes, old_value, value); |
120 | 8 | } |
121 | | |
122 | | // ==================== ScannerContext ==================== |
// Constructs a scan context over a fixed set of scanners.
// When `output_row_descriptor` is provided it overrides `output_tuple_desc`
// (its single tuple becomes the output schema). Under BE_TEST the scheduler
// and max concurrency come from the query context / `num_parallel_instances`
// instead of the local state.
ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
                               const TupleDescriptor* output_tuple_desc,
                               const RowDescriptor* output_row_descriptor,
                               const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
                               int64_t limit_, std::shared_ptr<Dependency> dependency,
                               std::shared_ptr<MemShareArbitrator> arb,
                               std::shared_ptr<MemLimiter> limiter, int ins_idx,
                               bool enable_adaptive_scan
#ifdef BE_TEST
                               ,
                               int num_parallel_instances
#endif
                               )
        : HasTaskExecutionCtx(state),
          _state(state),
          _local_state(local_state),
          _output_tuple_desc(output_row_descriptor
                                     ? output_row_descriptor->tuple_descriptors().front()
                                     : output_tuple_desc),
          _output_row_descriptor(output_row_descriptor),
          _batch_size(state->batch_size()),
          limit(limit_),
          _all_scanners(scanners.begin(), scanners.end()),
#ifndef BE_TEST
          _scanner_scheduler(local_state->scan_scheduler(state)),
          _min_scan_concurrency_of_scan_scheduler(
                  _scanner_scheduler->get_min_active_scan_threads()),
          // Never allow more concurrent scanners than there are scanners.
          _max_scan_concurrency(std::min(local_state->max_scanners_concurrency(state),
                                         cast_set<int>(scanners.size()))),
#else
          _scanner_scheduler(state->get_query_ctx()->get_scan_scheduler()),
          _min_scan_concurrency_of_scan_scheduler(0),
          _max_scan_concurrency(num_parallel_instances),
#endif
          _min_scan_concurrency(local_state->min_scanners_concurrency(state)),
          _scanner_mem_limiter(limiter),
          _mem_share_arb(arb),
          _ins_idx(ins_idx),
          _enable_adaptive_scanners(enable_adaptive_scan) {
    DCHECK(_state != nullptr);
    // A projected output descriptor must carry exactly one tuple.
    DCHECK(_output_row_descriptor == nullptr ||
           _output_row_descriptor->tuple_descriptors().size() == 1);
    _query_id = _state->get_query_ctx()->query_id();
    _resource_ctx = _state->get_query_ctx()->resource_ctx();
    ctx_id = UniqueId::gen_uid().to_string();
    // Every scanner starts life as a pending scan task.
    for (auto& scanner : _all_scanners) {
        _pending_tasks.push(std::make_shared<ScanTask>(scanner));
    }
    // Normalize any negative limit to -1 ("no limit").
    if (limit < 0) {
        limit = -1;
    }
    _dependency = dependency;
    DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
    // Keep the task execution context alive for the lifetime of this context
    // (released in the destructor).
    if (auto ctx = task_exec_ctx(); ctx) {
        ctx->ref_task_execution_ctx();
    }

    // Initialize adaptive processor
    _adaptive_processor = ScannerAdaptiveProcessor::create_shared();
}
183 | | |
184 | 5 | void ScannerContext::_adjust_scan_mem_limit(int64_t old_value, int64_t new_value) { |
185 | 5 | if (!_enable_adaptive_scanners) { |
186 | 0 | return; |
187 | 0 | } |
188 | | |
189 | 5 | int64_t new_scan_mem_limit = _mem_share_arb->update_mem_bytes(old_value, new_value); |
190 | 5 | _scanner_mem_limiter->update_mem_limit(new_scan_mem_limit); |
191 | 5 | _scanner_mem_limiter->update_arb_mem_bytes(new_value); |
192 | | |
193 | 5 | VLOG_DEBUG << fmt::format( |
194 | 0 | "adjust_scan_mem_limit. context = {}, new mem scan limit = {}, scanner mem bytes = {} " |
195 | 0 | "-> {}", |
196 | 0 | debug_string(), new_scan_mem_limit, old_value, new_value); |
197 | 5 | } |
198 | | |
// Returns how many scanners this context may currently have picked up.
// Non-adaptive mode: the static maximum. Adaptive mode: the processor's
// expected scanner count, clamped into [min_scanners, max_scanners] where
// max_scanners is memory-driven; instance 0 additionally re-arbitrates the
// scan memory limit. Re-clamping happens at most once per
// doris_scanner_dynamic_interval_ms to avoid thrashing.
int ScannerContext::_available_pickup_scanner_count() {
    if (!_enable_adaptive_scanners) {
        return _max_scan_concurrency;
    }

    int min_scanners = std::max(1, _min_scan_concurrency);
    int max_scanners = _scanner_mem_limiter->available_scanner_count(_ins_idx);
    max_scanners = std::min(max_scanners, _max_scan_concurrency);
    min_scanners = std::min(min_scanners, max_scanners);
    if (_ins_idx == 0) {
        // Adjust memory limit via memory share arbitrator
        _adjust_scan_mem_limit(_scanner_mem_limiter->get_arb_scanner_mem_bytes(),
                               _scanner_mem_limiter->get_estimated_block_mem_bytes());
    }

    ScannerAdaptiveProcessor& P = *_adaptive_processor;
    // `scanners` aliases the processor's persistent expected count.
    int& scanners = P.expected_scanners;
    int64_t now = UnixMillis();
    // Avoid frequent adjustment - only adjust every 100ms
    if (now - P.adjust_scanners_last_timestamp <= config::doris_scanner_dynamic_interval_ms) {
        return scanners;
    }
    P.adjust_scanners_last_timestamp = now;
    auto old_scanners = P.expected_scanners;

    scanners = std::max(min_scanners, scanners);
    scanners = std::min(max_scanners, scanners);
    VLOG_DEBUG << fmt::format(
            "_available_pickup_scanner_count. context = {}, old_scanners = {}, scanners = {} "
            ", min_scanners: {}, max_scanners: {}",
            debug_string(), old_scanners, scanners, min_scanners, max_scanners);

    // TODO(gabriel): Scanners are scheduled adaptively based on the memory usage now.
    //    if (_in_flight_tasks_num == 0) {
    //        int64_t halt_time = 0;
    //        if (P.last_scanner_finish_timestamp != 0) {
    //            halt_time = now - P.last_scanner_finish_timestamp;
    //        }
    //        P.last_scanner_finish_timestamp = now;
    //        P.scanner_total_halt_time += halt_time;
    //    }
    //    // Calculate performance metrics for adjustment
    //    P.scanner_gen_blocks_time = (now - P.context_start_time - P.scanner_total_halt_time);
    //
    //    // Blocks per 10ms
    //    // FIXME:
    //    double source_speed = static_cast<double>(_tasks_queue.size()) * 1e1 /
    //                          static_cast<double>(P.scanner_gen_blocks_time + 1);
    //    // Scanner scan speed: bytes/ms
    //    int64_t scanner_total_scan_bytes = P.scanner_total_scan_bytes.load();
    //    double scanner_scan_speed = static_cast<double>(scanner_total_scan_bytes) /
    //                                static_cast<double>(P.scanner_gen_blocks_time + 1);
    //    // IO latency metrics
    //    double scanner_total_io_time = static_cast<double>(P.scanner_total_io_time.load()) * 1e-6;
    //    double scanner_total_scan_bytes_mb =
    //            static_cast<double>(scanner_total_scan_bytes) / (1024.0 * 1024.0);
    //    double io_latency = scanner_total_io_time / (scanner_total_scan_bytes_mb + 1e-3);
    //
    //    // Adjustment routines
    //    auto try_add_scanners = [&]() {
    //        if (!P.try_add_scanners) return true;
    //        if (P.last_scanner_total_scan_bytes == scanner_total_scan_bytes) return true;
    //        return (scanner_scan_speed > (P.last_scanner_scan_speed * P.expected_speedup_ratio));
    //    };
    //
    //    auto do_add_scanners = [&]() {
    //        P.try_add_scanners = true;
    //        const int smooth = 2; // Smoothing factor
    //        P.expected_speedup_ratio =
    //                static_cast<double>(scanners + 1 + smooth) / static_cast<double>(scanners + smooth);
    //        scanners += 1;
    //    };
    //
    //    auto do_sub_scanners = [&]() {
    //        scanners -= 1;
    //        P.try_add_scanners = false;
    //        P.try_add_scanners_fail_count += 1;
    //        if (P.try_add_scanners_fail_count >= 4) {
    //            P.try_add_scanners_fail_count = 0;
    //            scanners -= 1;
    //        }
    //    };
    //
    //    auto check_slow_io = [&]() {
    //        if (((P.check_slow_io++) % 8) != 0) return;
    //        if (io_latency >= 2 * P.slow_io_latency_ms) {
    //            scanners = std::max(scanners, _max_scan_concurrency / 2);
    //        } else if (io_latency >= P.slow_io_latency_ms) {
    //            scanners = std::max(scanners, _max_scan_concurrency / 4);
    //        }
    //    };
    //
    //    // Perform adjustment based on feedback
    //    auto do_adjustment = [&]() {
    //        check_slow_io();
    //
    //        // If source is too slow compared to capacity, add scanners
    //        if (source_speed < 0.5) { // Very slow block production
    //            do_add_scanners();
    //        } else if (try_add_scanners()) {
    //            do_add_scanners();
    //        } else {
    //            do_sub_scanners();
    //        }
    //    };
    //    P.last_scanner_scan_speed = scanner_scan_speed;
    //    P.last_scanner_total_scan_bytes = scanner_total_scan_bytes;
    //
    //    do_adjustment();
    //    scanners = std::min(scanners, max_scanners);
    //    scanners = std::max(scanners, min_scanners);
    //
    //    VLOG_DEBUG << fmt::format(
    //            "available_pickup_scanner_count. ctx_id = {}, scan = {}, source_speed = {}, "
    //            "io_latency = {} ms/MB, proposal = {}, current = {}",
    //            ctx_id, scanner_scan_speed, source_speed, io_latency, scanners,
    //            _in_flight_tasks_num);

    return scanners;
}
319 | | |
// After init function call, should not access _parent
// Wires profile counters, sizes the block queue, registers with the memory
// limiter (adaptive mode), possibly downgrades concurrency for very wide
// tables, and schedules the first scan task.
Status ScannerContext::init() {
#ifndef BE_TEST
    _scanner_profile = _local_state->_scanner_profile;
    _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
    _scanner_memory_used_counter = _local_state->_memory_used_counter;

    // 3. get thread token
    if (!_state->get_query_ctx()) {
        return Status::InternalError("Query context of {} is not set",
                                     print_id(_state->query_id()));
    }

    if (_state->get_query_ctx()->get_scan_scheduler()) {
        _should_reset_thread_name = false;
    }

    auto scanner = _all_scanners.front().lock();
    DCHECK(scanner != nullptr);

    // Register a task with the task-executor-based scheduler, if that is the
    // scheduler flavor in use.
    if (auto* task_executor_scheduler =
                dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
        std::shared_ptr<TaskExecutor> task_executor = task_executor_scheduler->task_executor();
        TaskId task_id(fmt::format("{}-{}", print_id(_state->query_id()), ctx_id));
        _task_handle = DORIS_TRY(task_executor->create_task(
                task_id, []() { return 0.0; },
                config::task_executor_initial_max_concurrency_per_task > 0
                        ? config::task_executor_initial_max_concurrency_per_task
                        : std::max(48, CpuInfo::num_cores() * 2),
                std::chrono::milliseconds(100), std::nullopt));
    }
#endif
    // _max_bytes_in_queue controls the maximum memory that can be used by a single scan operator.
    // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
    // is larger than 10MB.
    _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10);

    // Provide more memory for wide tables, increase proportionally by multiples of 300
    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;

    // No scanners at all: this context is finished before it starts.
    if (_all_scanners.empty()) {
        _is_finished = true;
        _set_scanner_done();
    }

    // Initialize memory limiter if memory-aware scheduling is enabled
    if (_enable_adaptive_scanners) {
        DCHECK(_scanner_mem_limiter && _mem_share_arb);
        int64_t c = _scanner_mem_limiter->update_open_tasks_count(1);
        // TODO(gabriel): set estimated block size
        _scanner_mem_limiter->reestimated_block_mem_bytes(DEFAULT_SCANNER_MEM_BYTES);
        _scanner_mem_limiter->update_arb_mem_bytes(DEFAULT_SCANNER_MEM_BYTES);
        if (c == 0) {
            // First scanner context to open, adjust scan memory limit
            _adjust_scan_mem_limit(DEFAULT_SCANNER_MEM_BYTES,
                                   _scanner_mem_limiter->get_arb_scanner_mem_bytes());
        }
    }

    // when user does not specify scan_thread_num, we can try to downgrade _max_thread_num,
    // because we found that in a table with 5k columns, column readers may occupy too much memory.
    // you can refer https://github.com/apache/doris/issues/35340 for details.
    const int32_t max_column_reader_num = _state->max_column_reader_num();

    if (_max_scan_concurrency != 1 && max_column_reader_num > 0) {
        int32_t scan_column_num = cast_set<int32_t>(_output_tuple_desc->slots().size());
        int32_t current_column_num = scan_column_num * _max_scan_concurrency;
        if (current_column_num > max_column_reader_num) {
            int32_t new_max_thread_num = max_column_reader_num / scan_column_num;
            new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num;
            if (new_max_thread_num < _max_scan_concurrency) {
                int32_t origin_max_thread_num = _max_scan_concurrency;
                _max_scan_concurrency = new_max_thread_num;
                LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
                          << " scan's max_thread_num from " << origin_max_thread_num << " to "
                          << _max_scan_concurrency << ",column num: " << scan_column_num
                          << ", max_column_reader_num: " << max_column_reader_num;
            }
        }
    }

    COUNTER_SET(_local_state->_max_scan_concurrency, (int64_t)_max_scan_concurrency);
    COUNTER_SET(_local_state->_min_scan_concurrency, (int64_t)_min_scan_concurrency);

    // Kick off scheduling of the first pending scan task.
    std::unique_lock<std::mutex> l(_transfer_lock);
    RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));

    return Status::OK();
}
409 | | |
// Releases everything this context holds: drains the free-block pool under
// the query's memory tracker, de-registers from the memory limiter, removes
// the task-executor handle, and drops the task-execution-context reference
// taken in the constructor.
ScannerContext::~ScannerContext() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
    _completed_tasks.clear();
    // Drain the pool so each block is destroyed (and its memory accounted)
    // inside the scoped tracker above.
    BlockUPtr block;
    while (_free_blocks.try_dequeue(block)) {
        // do nothing
    }
    block.reset();
    DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);

    // Cleanup memory limiter if last context closing
    if (_enable_adaptive_scanners) {
        if (_scanner_mem_limiter->update_open_tasks_count(-1) == 1) {
            // Last scanner context to close, reset scan memory limit
            _adjust_scan_mem_limit(_scanner_mem_limiter->get_arb_scanner_mem_bytes(), 0);
        }
    }

    // May still be set if stop_scanners() was never called.
    if (_task_handle) {
        if (auto* task_executor_scheduler =
                    dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
            static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle));
        }
        _task_handle = nullptr;
    }
    if (auto ctx = task_exec_ctx(); ctx) {
        ctx->unref_task_execution_ctx();
    }
}
439 | | |
440 | 3 | BlockUPtr ScannerContext::get_free_block(bool force) { |
441 | 3 | BlockUPtr block = nullptr; |
442 | 3 | if (_free_blocks.try_dequeue(block)) { |
443 | 1 | DCHECK(block->mem_reuse()); |
444 | 1 | _block_memory_usage -= block->allocated_bytes(); |
445 | 1 | _scanner_memory_used_counter->set(_block_memory_usage); |
446 | | // A free block is reused, so the memory usage should be decreased |
447 | | // The caller of get_free_block will increase the memory usage |
448 | 2 | } else if (_block_memory_usage < _max_bytes_in_queue || force) { |
449 | 2 | _newly_create_free_blocks_num->update(1); |
450 | 2 | block = Block::create_unique(_output_tuple_desc->slots(), 0); |
451 | 2 | } |
452 | 3 | return block; |
453 | 3 | } |
454 | | |
455 | 1 | void ScannerContext::return_free_block(BlockUPtr block) { |
456 | | // If under low memory mode, should not return the freeblock, it will occupy too much memory. |
457 | 1 | if (!_local_state->low_memory_mode() && block->mem_reuse() && |
458 | 1 | _block_memory_usage < _max_bytes_in_queue) { |
459 | 1 | size_t block_size_to_reuse = block->allocated_bytes(); |
460 | 1 | _block_memory_usage += block_size_to_reuse; |
461 | 1 | _scanner_memory_used_counter->set(_block_memory_usage); |
462 | 1 | block->clear_column_data(); |
463 | | // Free blocks is used to improve memory efficiency. Failure during pushing back |
464 | | // free block will not incur any bad result so just ignore the return value. |
465 | 1 | _free_blocks.enqueue(std::move(block)); |
466 | 1 | } |
467 | 1 | } |
468 | | |
// Submits one scan task to the scheduler. The caller must already hold
// _transfer_lock; the unused parameter only documents that contract.
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task,
                                        std::unique_lock<std::mutex>& /*transfer_lock*/) {
    // increase _num_finished_scanners no matter the scan_task is submitted successfully or not.
    // since if submit failed, it will be added back by ScannerContext::push_back_scan_task
    // and _num_finished_scanners will be reduced.
    // if submit succeed, it will be also added back by ScannerContext::push_back_scan_task
    // see ScannerScheduler::_scanner_scan.
    // NOTE(review): the comment above mentions _num_finished_scanners, but the
    // code increments _in_flight_tasks_num (decremented in
    // push_back_scan_task) — the counter name in the comment looks stale.
    _in_flight_tasks_num++;
    return _scanner_scheduler->submit(shared_from_this(), scan_task);
}
479 | | |
// Drops every pooled free block (e.g. under memory pressure); blocks are
// recreated on demand by get_free_block().
void ScannerContext::clear_free_blocks() {
    clear_blocks(_free_blocks);
}
483 | | |
// Called by the scheduler when a scan task finished one round of work.
// Validates the produced block's schema (outside the lock), records any
// failure status, queues the task as completed, and wakes the scan operator
// via the dependency.
void ScannerContext::push_back_scan_task(std::shared_ptr<ScanTask> scan_task) {
    if (scan_task->status_ok()) {
        if (scan_task->cached_block && scan_task->cached_block->rows() > 0) {
            Status st = validate_block_schema(scan_task->cached_block.get());
            if (!st.ok()) {
                scan_task->set_status(st);
            }
        }
    }

    std::lock_guard<std::mutex> l(_transfer_lock);
    // First failure wins; get_block_from_queue surfaces _process_status.
    if (!scan_task->status_ok()) {
        _process_status = scan_task->get_status();
    }
    _completed_tasks.push_back(scan_task);
    // Balances the increment done in submit_scan_task.
    _in_flight_tasks_num--;

    _dependency->set_ready();
}
503 | | |
// Hands the next completed block to the scan operator. Under _transfer_lock:
// surfaces any recorded error, pops one completed task, swaps its cached block
// into `block`, then either counts the scanner finished (EOS) or puts the task
// back in flight; finally blocks the dependency again when nothing is ready.
// `id` is unused in this implementation.
Status ScannerContext::get_block_from_queue(RuntimeState* state, Block* block, bool* eos, int id) {
    if (state->is_cancelled()) {
        _set_scanner_done();
        return state->cancel_reason();
    }
    std::unique_lock l(_transfer_lock);

    if (!_process_status.ok()) {
        _set_scanner_done();
        return _process_status;
    }

    std::shared_ptr<ScanTask> scan_task = nullptr;

    if (!_completed_tasks.empty() && !done()) {
        // https://en.cppreference.com/w/cpp/container/list/front
        // The behavior is undefined if the list is empty.
        scan_task = _completed_tasks.front();
        _completed_tasks.pop_front();
    }

    if (scan_task != nullptr) {
        // The abnormal status of scanner may come from the execution of the scanner itself,
        // or come from the scanner scheduler, such as TooManyTasks.
        if (!scan_task->status_ok()) {
            // TODO: If the scanner status is TooManyTasks, maybe we can retry the scanner after a while.
            _process_status = scan_task->get_status();
            _set_scanner_done();
            return _process_status;
        }

        if (scan_task->cached_block) {
            // No need to worry about small block, block is merged together when they are appended to cached_blocks.
            auto current_block = std::move(scan_task->cached_block);
            auto block_size = current_block->allocated_bytes();
            scan_task->cached_block.reset();
            _block_memory_usage -= block_size;
            // consume current block
            block->swap(*current_block);
            // The emptied block goes back to the reuse pool.
            return_free_block(std::move(current_block));
        }
        VLOG_DEBUG << fmt::format(
                "ScannerContext {} get block from queue, current scan "
                "task remaing cached_block size {}, eos {}, scheduled tasks {}",
                ctx_id, _completed_tasks.size(), scan_task->is_eos(), _in_flight_tasks_num);
        if (scan_task->is_eos()) {
            // 1. if eos, record a finished scanner.
            _num_finished_scanners++;
            RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));
        } else {
            // Not finished yet: resubmit the same task to produce more blocks.
            scan_task->set_state(ScanTask::State::IN_FLIGHT);
            RETURN_IF_ERROR(
                    _scanner_scheduler->schedule_scan_task(shared_from_this(), scan_task, l));
        }
    }

    // Everything scanned and every completed block consumed: we are done.
    if (_num_finished_scanners == _all_scanners.size() && _completed_tasks.empty()) {
        _set_scanner_done();
        _is_finished = true;
    }

    *eos = done();

    // Nothing ready: park the operator until push_back_scan_task wakes it.
    if (_completed_tasks.empty()) {
        _dependency->block();
    }

    return Status::OK();
}
573 | | |
574 | 0 | Status ScannerContext::validate_block_schema(Block* block) { |
575 | 0 | size_t index = 0; |
576 | 0 | for (auto& slot : _output_tuple_desc->slots()) { |
577 | 0 | auto& data = block->get_by_position(index++); |
578 | 0 | if (data.column->is_nullable() != data.type->is_nullable()) { |
579 | 0 | return Status::Error<ErrorCode::INVALID_SCHEMA>( |
580 | 0 | "column(name: {}) nullable({}) does not match type nullable({}), slot(id: " |
581 | 0 | "{}, " |
582 | 0 | "name:{})", |
583 | 0 | data.name, data.column->is_nullable(), data.type->is_nullable(), slot->id(), |
584 | 0 | slot->col_name()); |
585 | 0 | } |
586 | | |
587 | 0 | if (data.column->is_nullable() != slot->is_nullable()) { |
588 | 0 | return Status::Error<ErrorCode::INVALID_SCHEMA>( |
589 | 0 | "column(name: {}) nullable({}) does not match slot(id: {}, name: {}) " |
590 | 0 | "nullable({})", |
591 | 0 | data.name, data.column->is_nullable(), slot->id(), slot->col_name(), |
592 | 0 | slot->is_nullable()); |
593 | 0 | } |
594 | 0 | } |
595 | 0 | return Status::OK(); |
596 | 0 | } |
597 | | |
// Stops all scanners (idempotent via _should_stop): asks every live scanner
// to stop, drops completed tasks, removes the task-executor handle, and, when
// profiling is enabled, dumps per-scanner timing/row statistics into the
// scanner profile.
void ScannerContext::stop_scanners(RuntimeState* state) {
    std::lock_guard<std::mutex> l(_transfer_lock);
    if (_should_stop) {
        return;
    }
    _should_stop = true;
    _set_scanner_done();
    for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
        if (std::shared_ptr<ScannerDelegate> sc = scanner.lock()) {
            sc->_scanner->try_stop();
        }
    }
    _completed_tasks.clear();
    if (_task_handle) {
        if (auto* task_executor_scheduler =
                    dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
            static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle));
        }
        _task_handle = nullptr;
    }
    // TODO yiguolei, call mark close to scanners
    if (state->enable_profile()) {
        std::stringstream scanner_statistics;
        std::stringstream scanner_rows_read;
        std::stringstream scanner_wait_worker_time;
        std::stringstream scanner_projection;
        scanner_statistics << "[";
        scanner_rows_read << "[";
        scanner_wait_worker_time << "[";
        scanner_projection << "[";
        // Scanners can in 3 state
        // state 1: in scanner context, not scheduled
        // state 2: in scanner worker pool's queue, scheduled but not running
        // state 3: scanner is running.
        for (auto& scanner_ref : _all_scanners) {
            auto scanner = scanner_ref.lock();
            if (scanner == nullptr) {
                continue;
            }
            // Add per scanner running time before close them
            scanner_statistics << PrettyPrinter::print(scanner->_scanner->get_time_cost_ns(),
                                                       TUnit::TIME_NS)
                               << ", ";
            scanner_projection << PrettyPrinter::print(scanner->_scanner->projection_time(),
                                                       TUnit::TIME_NS)
                               << ", ";
            scanner_rows_read << PrettyPrinter::print(scanner->_scanner->get_rows_read(),
                                                      TUnit::UNIT)
                              << ", ";
            scanner_wait_worker_time
                    << PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(),
                                            TUnit::TIME_NS)
                    << ", ";
            // since there are all scanners, some scanners is running, so that could not call scanner
            // close here.
        }
        scanner_statistics << "]";
        scanner_rows_read << "]";
        scanner_wait_worker_time << "]";
        scanner_projection << "]";
        _scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str());
        _scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str());
        _scanner_profile->add_info_string("PerScannerWaitTime", scanner_wait_worker_time.str());
        _scanner_profile->add_info_string("PerScannerProjectionTime", scanner_projection.str());
    }
}
664 | | |
// Human-readable snapshot of the context's state for logging.
// Note: _query_id is printed twice (both the "_query_id" and "query_id"
// fields use it), and "_max_thread_num" reports _max_scan_concurrency.
std::string ScannerContext::debug_string() {
    return fmt::format(
            "_query_id: {}, id: {}, total scanners: {}, pending tasks: {}, completed tasks: {},"
            " _should_stop: {}, _is_finished: {}, free blocks: {},"
            " limit: {}, _in_flight_tasks_num: {}, _num_finished_scanners: {}, _max_thread_num: {},"
            " _max_bytes_in_queue: {}, query_id: {}, _ins_idx: {}, _enable_adaptive_scanners: {}, "
            "_mem_share_arb: {}, _scanner_mem_limiter: {}",
            print_id(_query_id), ctx_id, _all_scanners.size(), _pending_tasks.size(),
            _completed_tasks.size(), _should_stop, _is_finished, _free_blocks.size_approx(), limit,
            _in_flight_tasks_num, _num_finished_scanners, _max_scan_concurrency,
            _max_bytes_in_queue, print_id(_query_id), _ins_idx, _enable_adaptive_scanners,
            _enable_adaptive_scanners ? _mem_share_arb->debug_string() : "NULL",
            _enable_adaptive_scanners ? _scanner_mem_limiter->debug_string() : "NULL");
}
679 | | |
// Mark the scan dependency as permanently ready so the downstream pipeline
// task is woken up and will not block on this context again (used when the
// scan finishes or fails — see schedule_scan_task's error path).
void ScannerContext::_set_scanner_done() {
    _dependency->set_always_ready();
}
683 | | |
684 | 1 | void ScannerContext::update_peak_running_scanner(int num) { |
685 | | #ifndef BE_TEST |
686 | | _local_state->_peak_running_scanner->add(num); |
687 | | #endif |
688 | 1 | if (_enable_adaptive_scanners) { |
689 | 1 | _scanner_mem_limiter->update_running_tasks_count(num); |
690 | 1 | } |
691 | 1 | } |
692 | | |
693 | 1 | void ScannerContext::reestimated_block_mem_bytes(int64_t num) { |
694 | 1 | if (_enable_adaptive_scanners) { |
695 | 1 | _scanner_mem_limiter->reestimated_block_mem_bytes(num); |
696 | 1 | } |
697 | 1 | } |
698 | | |
699 | | int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock, |
700 | 12 | std::unique_lock<std::shared_mutex>& scheduler_lock) { |
701 | | // Get effective max concurrency considering adaptive scheduling |
702 | 12 | int32_t effective_max_concurrency = _available_pickup_scanner_count(); |
703 | 12 | DCHECK_LE(effective_max_concurrency, _max_scan_concurrency); |
704 | | |
705 | | // margin_1 is used to ensure each scan operator could have at least _min_scan_concurrency scan tasks. |
706 | 12 | int32_t margin_1 = _min_scan_concurrency - |
707 | 12 | (cast_set<int32_t>(_completed_tasks.size()) + _in_flight_tasks_num); |
708 | | |
709 | | // margin_2 is used to ensure the scan scheduler could have at least _min_scan_concurrency_of_scan_scheduler scan tasks. |
710 | 12 | int32_t margin_2 = |
711 | 12 | _min_scan_concurrency_of_scan_scheduler - |
712 | 12 | (_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size()); |
713 | | |
714 | | // margin_3 is used to respect adaptive max concurrency limit |
715 | 12 | int32_t margin_3 = |
716 | 12 | std::max(effective_max_concurrency - |
717 | 12 | (cast_set<int32_t>(_completed_tasks.size()) + _in_flight_tasks_num), |
718 | 12 | 1); |
719 | | |
720 | 12 | if (margin_1 <= 0 && margin_2 <= 0) { |
721 | 1 | return 0; |
722 | 1 | } |
723 | | |
724 | 11 | int32_t margin = std::max(margin_1, margin_2); |
725 | 11 | if (_enable_adaptive_scanners) { |
726 | 0 | margin = std::min(margin, margin_3); // Cap by adaptive limit |
727 | 0 | } |
728 | | |
729 | 11 | if (low_memory_mode()) { |
730 | | // In low memory mode, we will limit the number of running scanners to `low_memory_mode_scanners()`. |
731 | | // So that we will not submit too many scan tasks to scheduler. |
732 | 0 | margin = std::min(low_memory_mode_scanners() - _in_flight_tasks_num, margin); |
733 | 0 | } |
734 | | |
735 | 11 | VLOG_DEBUG << fmt::format( |
736 | 0 | "[{}|{}] schedule scan task, margin_1: {} = {} - ({} + {}), margin_2: {} = {} - " |
737 | 0 | "({} + {}), margin_3: {} = {} - ({} + {}), margin: {}, adaptive: {}", |
738 | 0 | print_id(_query_id), ctx_id, margin_1, _min_scan_concurrency, _completed_tasks.size(), |
739 | 0 | _in_flight_tasks_num, margin_2, _min_scan_concurrency_of_scan_scheduler, |
740 | 0 | _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(), |
741 | 0 | margin_3, effective_max_concurrency, _completed_tasks.size(), _in_flight_tasks_num, |
742 | 0 | margin, _enable_adaptive_scanners); |
743 | | |
744 | 11 | return margin; |
745 | 12 | } |
746 | | |
// Submit up to `margin` scan tasks to the scanner scheduler, where `margin`
// comes from _get_margin(). `current_scan_task` (may be null) is a task the
// caller just finished consuming and wants rescheduled; it must have neither
// a cached block nor eos, otherwise the scheduler state is corrupted and we
// throw. Invariant maintained on every non-submit path: the current task is
// pushed back onto _pending_tasks so it can be resubmitted later — dropping
// it would leave the scan operator unschedulable.
//
// This function must be called with:
// 1. _transfer_lock held.
// 2. ScannerScheduler::_lock held.
Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task,
                                          std::unique_lock<std::mutex>& transfer_lock,
                                          std::unique_lock<std::shared_mutex>& scheduler_lock) {
    // A task handed back for rescheduling must be fully drained and not eos.
    if (current_scan_task &&
        (current_scan_task->cached_block != nullptr || current_scan_task->is_eos())) {
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
    }

    // Tasks collected this round; submitted in one batch at the end.
    std::list<std::shared_ptr<ScanTask>> tasks_to_submit;

    int32_t margin = _get_margin(transfer_lock, scheduler_lock);

    // margin is less than zero. Means this scan operator could not submit any scan task for now.
    if (margin <= 0) {
        // Be careful with current scan task.
        // We need to add it back to task queue to make sure it could be resubmitted.
        if (current_scan_task) {
            // This usually happens when we should downgrade the concurrency.
            current_scan_task->set_state(ScanTask::State::PENDING);
            _pending_tasks.push(current_scan_task);
            VLOG_DEBUG << fmt::format(
                    "{} push back scanner to task queue, because diff <= 0, _completed_tasks size "
                    "{}, _in_flight_tasks_num {}",
                    ctx_id, _completed_tasks.size(), _in_flight_tasks_num);
        }

#ifndef NDEBUG
        // This DCHECK is necessary.
        // We need to make sure each scan operator could have at least 1 scan tasks.
        // Or this scan operator will not be re-scheduled.
        if (!_pending_tasks.empty() && _in_flight_tasks_num == 0 && _completed_tasks.empty()) {
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
        }
#endif

        return Status::OK();
    }

    // On the first pull, `current_scan_task` gets priority over queued tasks.
    bool first_pull = true;

    while (margin-- > 0) {
        std::shared_ptr<ScanTask> task_to_run;
        // Concurrency if everything gathered so far were already submitted.
        const int32_t current_concurrency = cast_set<int32_t>(
                _completed_tasks.size() + _in_flight_tasks_num + tasks_to_submit.size());
        VLOG_DEBUG << fmt::format("{} currenct concurrency: {} = {} + {} + {}", ctx_id,
                                  current_concurrency, _completed_tasks.size(),
                                  _in_flight_tasks_num, tasks_to_submit.size());
        if (first_pull) {
            task_to_run = _pull_next_scan_task(current_scan_task, current_concurrency);
            if (task_to_run == nullptr) {
                // In two situations we will get nullptr.
                // 1. current_concurrency already reached _max_scan_concurrency.
                // 2. all scanners are finished.
                if (current_scan_task) {
                    DCHECK(current_scan_task->cached_block == nullptr);
                    DCHECK(!current_scan_task->is_eos());
                    if (current_scan_task->cached_block != nullptr || current_scan_task->is_eos()) {
                        // This should not happen.
                        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                                               "Scanner scheduler logical error.");
                    }
                    // Current scan task is not scheduled, we need to add it back to task queue to make sure it could be resubmitted.
                    current_scan_task->set_state(ScanTask::State::PENDING);
                    _pending_tasks.push(current_scan_task);
                }
            }
            first_pull = false;
        } else {
            task_to_run = _pull_next_scan_task(nullptr, current_concurrency);
        }

        if (task_to_run) {
            tasks_to_submit.push_back(task_to_run);
        } else {
            // No more runnable tasks; stop gathering even if margin remains.
            break;
        }
    }

    if (tasks_to_submit.empty()) {
        return Status::OK();
    }

    VLOG_DEBUG << fmt::format("[{}:{}] submit {} scan tasks to scheduler, remaining scanner: {}",
                              print_id(_query_id), ctx_id, tasks_to_submit.size(),
                              _pending_tasks.size());

    for (auto& scan_task_iter : tasks_to_submit) {
        Status submit_status = submit_scan_task(scan_task_iter, transfer_lock);
        if (!submit_status.ok()) {
            // Record the failure and wake the downstream operator so the error
            // is observed instead of the pipeline waiting forever.
            _process_status = submit_status;
            _set_scanner_done();
            return _process_status;
        }
    }

    return Status::OK();
}
847 | | |
848 | | std::shared_ptr<ScanTask> ScannerContext::_pull_next_scan_task( |
849 | 31 | std::shared_ptr<ScanTask> current_scan_task, int32_t current_concurrency) { |
850 | 31 | int32_t effective_max_concurrency = _max_scan_concurrency; |
851 | 31 | if (_enable_adaptive_scanners) { |
852 | 0 | effective_max_concurrency = _adaptive_processor->expected_scanners > 0 |
853 | 0 | ? _adaptive_processor->expected_scanners |
854 | 0 | : _max_scan_concurrency; |
855 | 0 | } |
856 | | |
857 | 31 | if (current_concurrency >= effective_max_concurrency) { |
858 | 7 | VLOG_DEBUG << fmt::format( |
859 | 0 | "ScannerContext {} current concurrency {} >= effective_max_concurrency {}, skip " |
860 | 0 | "pull", |
861 | 0 | ctx_id, current_concurrency, effective_max_concurrency); |
862 | 7 | return nullptr; |
863 | 7 | } |
864 | | |
865 | 24 | if (current_scan_task != nullptr) { |
866 | 3 | if (current_scan_task->cached_block != nullptr || current_scan_task->is_eos()) { |
867 | | // This should not happen. |
868 | 2 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error."); |
869 | 2 | } |
870 | 1 | return current_scan_task; |
871 | 3 | } |
872 | | |
873 | 21 | if (!_pending_tasks.empty()) { |
874 | 19 | std::shared_ptr<ScanTask> next_scan_task; |
875 | 19 | next_scan_task = _pending_tasks.top(); |
876 | 19 | _pending_tasks.pop(); |
877 | 19 | return next_scan_task; |
878 | 19 | } else { |
879 | 2 | return nullptr; |
880 | 2 | } |
881 | 21 | } |
882 | | |
// True when the local state reports memory pressure; _get_margin() then caps
// the number of in-flight scan tasks to low_memory_mode_scanners().
bool ScannerContext::low_memory_mode() const {
    return _local_state->low_memory_mode();
}
886 | | #include "common/compile_check_end.h" |
887 | | } // namespace doris |