be/src/exec/scan/scanner_context.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/scan/scanner_context.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/Metrics_types.h> |
22 | | #include <glog/logging.h> |
23 | | #include <zconf.h> |
24 | | |
25 | | #include <cstdint> |
26 | | #include <ctime> |
27 | | #include <memory> |
28 | | #include <mutex> |
29 | | #include <ostream> |
30 | | #include <shared_mutex> |
31 | | #include <tuple> |
32 | | #include <utility> |
33 | | |
34 | | #include "common/config.h" |
35 | | #include "common/exception.h" |
36 | | #include "common/logging.h" |
37 | | #include "common/metrics/doris_metrics.h" |
38 | | #include "common/status.h" |
39 | | #include "core/block/block.h" |
40 | | #include "exec/operator/scan_operator.h" |
41 | | #include "exec/scan/scan_node.h" |
42 | | #include "exec/scan/scanner_scheduler.h" |
43 | | #include "runtime/descriptors.h" |
44 | | #include "runtime/exec_env.h" |
45 | | #include "runtime/runtime_profile.h" |
46 | | #include "runtime/runtime_state.h" |
47 | | #include "storage/tablet/tablet.h" |
48 | | #include "util/time.h" |
49 | | #include "util/uid_util.h" |
50 | | |
51 | | namespace doris { |
52 | | |
53 | | using namespace std::chrono_literals; |
54 | | |
55 | | // ==================== ScannerContext ==================== |
56 | | ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_state, |
57 | | const TupleDescriptor* output_tuple_desc, |
58 | | const RowDescriptor* output_row_descriptor, |
59 | | const std::list<std::shared_ptr<ScannerDelegate>>& scanners, |
60 | | int64_t limit_, std::shared_ptr<Dependency> dependency, |
61 | | std::atomic<int64_t>* shared_scan_limit, |
62 | | std::shared_ptr<MemShareArbitrator> arb, |
63 | | std::shared_ptr<MemLimiter> limiter, int ins_idx, |
64 | | bool enable_adaptive_scan |
65 | | #ifdef BE_TEST |
66 | | , |
67 | | int num_parallel_instances |
68 | | #endif |
69 | | ) |
70 | 289k | : HasTaskExecutionCtx(state), |
71 | 289k | _state(state), |
72 | 289k | _local_state(local_state), |
73 | 289k | _output_tuple_desc(output_row_descriptor |
74 | 289k | ? output_row_descriptor->tuple_descriptors().front() |
75 | 289k | : output_tuple_desc), |
76 | 289k | _output_row_descriptor(output_row_descriptor), |
77 | 289k | _batch_size(state->batch_size()), |
78 | 289k | limit(limit_), |
79 | 289k | _shared_scan_limit(shared_scan_limit), |
80 | 289k | _all_scanners(scanners.begin(), scanners.end()), |
81 | | #ifndef BE_TEST |
82 | 289k | _scanner_scheduler(local_state->scan_scheduler(state)), |
83 | | _min_scan_concurrency_of_scan_scheduler( |
84 | 289k | _scanner_scheduler->get_min_active_scan_threads()), |
85 | 289k | _max_scan_concurrency(std::min(local_state->max_scanners_concurrency(state), |
86 | 289k | cast_set<int>(scanners.size()))), |
87 | | #else |
88 | | _scanner_scheduler(state->get_query_ctx()->get_scan_scheduler()), |
89 | | _min_scan_concurrency_of_scan_scheduler(0), |
90 | | _max_scan_concurrency(num_parallel_instances), |
91 | | #endif |
92 | 289k | _min_scan_concurrency(local_state->min_scanners_concurrency(state)), |
93 | 289k | _scanner_mem_limiter(limiter), |
94 | 289k | _mem_share_arb(arb), |
95 | 289k | _ins_idx(ins_idx), |
96 | 289k | _enable_adaptive_scanners(enable_adaptive_scan) { |
97 | 289k | DCHECK(_state != nullptr); |
98 | 289k | DCHECK(_output_row_descriptor == nullptr || |
99 | 289k | _output_row_descriptor->tuple_descriptors().size() == 1); |
100 | 289k | _query_id = _state->get_query_ctx()->query_id(); |
101 | 289k | _resource_ctx = _state->get_query_ctx()->resource_ctx(); |
102 | 289k | ctx_id = UniqueId::gen_uid().to_string(); |
103 | 1.29M | for (auto& scanner : _all_scanners) { |
104 | 1.29M | _pending_tasks.push(std::make_shared<ScanTask>(scanner)); |
105 | 1.29M | } |
106 | 289k | if (limit < 0) { |
107 | 288k | limit = -1; |
108 | 288k | } |
109 | 289k | _dependency = dependency; |
110 | | // Initialize adaptive processor |
111 | 289k | _adaptive_processor = ScannerAdaptiveProcessor::create_shared(); |
112 | 289k | DorisMetrics::instance()->scanner_ctx_cnt->increment(1); |
113 | 289k | } |
114 | | |
115 | 1.83M | void ScannerContext::_adjust_scan_mem_limit(int64_t old_value, int64_t new_value) { |
116 | 1.83M | if (!_enable_adaptive_scanners) { |
117 | 0 | return; |
118 | 0 | } |
119 | | |
120 | 1.83M | int64_t new_scan_mem_limit = _mem_share_arb->update_mem_bytes(old_value, new_value); |
121 | 1.83M | _scanner_mem_limiter->update_mem_limit(new_scan_mem_limit); |
122 | 1.83M | _scanner_mem_limiter->update_arb_mem_bytes(new_value); |
123 | | |
124 | 18.4E | VLOG_DEBUG << fmt::format( |
125 | 18.4E | "adjust_scan_mem_limit. context = {}, new mem scan limit = {}, scanner mem bytes = {} " |
126 | 18.4E | "-> {}", |
127 | 18.4E | debug_string(), new_scan_mem_limit, old_value, new_value); |
128 | 1.83M | } |
129 | | |
130 | 1.67M | int ScannerContext::_available_pickup_scanner_count() { |
131 | 1.67M | if (!_enable_adaptive_scanners) { |
132 | 10.9k | return _max_scan_concurrency; |
133 | 10.9k | } |
134 | | |
135 | 1.66M | int min_scanners = std::max(1, _min_scan_concurrency); |
136 | 1.66M | int max_scanners = _scanner_mem_limiter->available_scanner_count(_ins_idx); |
137 | 1.66M | max_scanners = std::min(max_scanners, _max_scan_concurrency); |
138 | 1.66M | min_scanners = std::min(min_scanners, max_scanners); |
139 | 1.66M | if (_ins_idx == 0) { |
140 | | // Adjust memory limit via memory share arbitrator |
141 | 1.39M | _adjust_scan_mem_limit(_scanner_mem_limiter->get_arb_scanner_mem_bytes(), |
142 | 1.39M | _scanner_mem_limiter->get_estimated_block_mem_bytes()); |
143 | 1.39M | } |
144 | | |
145 | 1.66M | ScannerAdaptiveProcessor& P = *_adaptive_processor; |
146 | 1.66M | int& scanners = P.expected_scanners; |
147 | 1.66M | int64_t now = UnixMillis(); |
148 | | // Avoid frequent adjustment - adjust at most once per config::doris_scanner_dynamic_interval_ms |
149 | 1.66M | if (now - P.adjust_scanners_last_timestamp <= config::doris_scanner_dynamic_interval_ms) { |
150 | 1.26M | return scanners; |
151 | 1.26M | } |
152 | 404k | P.adjust_scanners_last_timestamp = now; |
153 | 404k | auto old_scanners = P.expected_scanners; |
154 | | |
155 | 404k | scanners = std::max(min_scanners, scanners); |
156 | 404k | scanners = std::min(max_scanners, scanners); |
157 | 18.4E | VLOG_DEBUG << fmt::format( |
158 | 18.4E | "_available_pickup_scanner_count. context = {}, old_scanners = {}, scanners = {} " |
159 | 18.4E | ", min_scanners: {}, max_scanners: {}", |
160 | 18.4E | debug_string(), old_scanners, scanners, min_scanners, max_scanners); |
161 | | |
162 | | // TODO(gabriel): Scanners are scheduled adaptively based on the memory usage now. |
163 | 404k | return scanners; |
164 | 1.66M | } |
165 | | |
166 | | // After init function call, should not access _parent |
167 | 290k | Status ScannerContext::init() { |
168 | 290k | #ifndef BE_TEST |
169 | 290k | _scanner_profile = _local_state->_scanner_profile; |
170 | 290k | _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num; |
171 | 290k | _scanner_memory_used_counter = _local_state->_memory_used_counter; |
172 | | |
173 | | // 3. get thread token |
174 | 290k | if (!_state->get_query_ctx()) { |
175 | 0 | return Status::InternalError("Query context of {} is not set", |
176 | 0 | print_id(_state->query_id())); |
177 | 0 | } |
178 | | |
179 | 290k | if (_state->get_query_ctx()->get_scan_scheduler()) { |
180 | 290k | _should_reset_thread_name = false; |
181 | 290k | } |
182 | | |
183 | 290k | auto scanner = _all_scanners.front().lock(); |
184 | 290k | DCHECK(scanner != nullptr); |
185 | | |
186 | 290k | if (auto* task_executor_scheduler = |
187 | 290k | dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) { |
188 | 290k | std::shared_ptr<TaskExecutor> task_executor = task_executor_scheduler->task_executor(); |
189 | 290k | TaskId task_id(fmt::format("{}-{}", print_id(_state->query_id()), ctx_id)); |
190 | 290k | _task_handle = DORIS_TRY(task_executor->create_task( |
191 | 290k | task_id, []() { return 0.0; }, |
192 | 290k | config::task_executor_initial_max_concurrency_per_task > 0 |
193 | 290k | ? config::task_executor_initial_max_concurrency_per_task |
194 | 290k | : std::max(48, CpuInfo::num_cores() * 2), |
195 | 290k | std::chrono::milliseconds(100), std::nullopt)); |
196 | 290k | } |
197 | 290k | #endif |
198 | | // _max_bytes_in_queue controls the maximum memory that can be used by a single scan operator. |
199 | | // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value |
200 | | // is larger than 10MB. |
201 | 290k | _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10); |
202 | | |
203 | | // Provide more memory for wide tables, increase proportionally by multiples of 300 |
204 | 290k | _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1; |
205 | | |
206 | 290k | if (_all_scanners.empty()) { |
207 | 0 | _is_finished = true; |
208 | 0 | _set_scanner_done(); |
209 | 0 | } |
210 | | |
211 | | // Initialize memory limiter if memory-aware scheduling is enabled |
212 | 290k | if (_enable_adaptive_scanners) { |
213 | 288k | DCHECK(_scanner_mem_limiter && _mem_share_arb); |
214 | 288k | int64_t c = _scanner_mem_limiter->update_open_tasks_count(1); |
215 | | // TODO(gabriel): set estimated block size |
216 | 288k | _scanner_mem_limiter->reestimated_block_mem_bytes(DEFAULT_SCANNER_MEM_BYTES); |
217 | 288k | _scanner_mem_limiter->update_arb_mem_bytes(DEFAULT_SCANNER_MEM_BYTES); |
218 | 288k | if (c == 0) { |
219 | | // First scanner context to open, adjust scan memory limit |
220 | 218k | _adjust_scan_mem_limit(DEFAULT_SCANNER_MEM_BYTES, |
221 | 218k | _scanner_mem_limiter->get_arb_scanner_mem_bytes()); |
222 | 218k | } |
223 | 288k | } |
224 | | |
225 | | // When the user does not specify scan_thread_num, we can try to downgrade _max_scan_concurrency, |
226 | | // because we found that in a table with 5k columns, column readers may occupy too much memory. |
227 | | // Refer to https://github.com/apache/doris/issues/35340 for details. |
228 | 290k | const int32_t max_column_reader_num = _state->max_column_reader_num(); |
229 | | |
230 | 290k | if (_max_scan_concurrency != 1 && max_column_reader_num > 0) { |
231 | 180k | int32_t scan_column_num = cast_set<int32_t>(_output_tuple_desc->slots().size()); |
232 | 180k | int32_t current_column_num = scan_column_num * _max_scan_concurrency; |
233 | 180k | if (current_column_num > max_column_reader_num) { |
234 | 0 | int32_t new_max_thread_num = max_column_reader_num / scan_column_num; |
235 | 0 | new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num; |
236 | 0 | if (new_max_thread_num < _max_scan_concurrency) { |
237 | 0 | int32_t origin_max_thread_num = _max_scan_concurrency; |
238 | 0 | _max_scan_concurrency = new_max_thread_num; |
239 | 0 | LOG(INFO) << "downgrade query:" << print_id(_state->query_id()) |
240 | 0 | << " scan's max_thread_num from " << origin_max_thread_num << " to " |
241 | 0 | << _max_scan_concurrency << ",column num: " << scan_column_num |
242 | 0 | << ", max_column_reader_num: " << max_column_reader_num; |
243 | 0 | } |
244 | 0 | } |
245 | 180k | } |
246 | | |
247 | 290k | COUNTER_SET(_local_state->_max_scan_concurrency, (int64_t)_max_scan_concurrency); |
248 | 290k | COUNTER_SET(_local_state->_min_scan_concurrency, (int64_t)_min_scan_concurrency); |
249 | | |
250 | 290k | std::unique_lock<std::mutex> l(_transfer_lock); |
251 | 290k | RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l)); |
252 | | |
253 | 290k | return Status::OK(); |
254 | 290k | } |
255 | | |
256 | 290k | ScannerContext::~ScannerContext() { |
257 | 290k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker()); |
258 | 290k | _completed_tasks.clear(); |
259 | 290k | BlockUPtr block; |
260 | 486k | while (_free_blocks.try_dequeue(block)) { |
261 | | // do nothing |
262 | 195k | } |
263 | 290k | block.reset(); |
264 | 290k | DorisMetrics::instance()->scanner_ctx_cnt->increment(-1); |
265 | | |
266 | | // Cleanup memory limiter if last context closing |
267 | 290k | if (_enable_adaptive_scanners) { |
268 | 288k | if (_scanner_mem_limiter->update_open_tasks_count(-1) == 1) { |
269 | | // Last scanner context to close, reset scan memory limit |
270 | 218k | _adjust_scan_mem_limit(_scanner_mem_limiter->get_arb_scanner_mem_bytes(), 0); |
271 | 218k | } |
272 | 288k | } |
273 | | |
274 | 290k | if (_task_handle) { |
275 | 0 | if (auto* task_executor_scheduler = |
276 | 0 | dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) { |
277 | 0 | static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle)); |
278 | 0 | } |
279 | 0 | _task_handle = nullptr; |
280 | 0 | } |
281 | 290k | } |
282 | | |
283 | 1.38M | BlockUPtr ScannerContext::get_free_block(bool force) { |
284 | 1.38M | BlockUPtr block = nullptr; |
285 | 1.38M | if (_free_blocks.try_dequeue(block)) { |
286 | 835k | DCHECK(block->mem_reuse()); |
287 | 835k | _block_memory_usage -= block->allocated_bytes(); |
288 | 835k | _scanner_memory_used_counter->set(_block_memory_usage); |
289 | | // A free block is reused, so the memory usage should be decreased |
290 | | // The caller of get_free_block will increase the memory usage |
291 | 835k | } else if (_block_memory_usage < _max_bytes_in_queue || force) { |
292 | 553k | _newly_create_free_blocks_num->update(1); |
293 | 553k | block = Block::create_unique(_output_tuple_desc->slots(), 0); |
294 | 553k | } |
295 | 1.38M | return block; |
296 | 1.38M | } |
297 | | |
298 | 1.38M | void ScannerContext::return_free_block(BlockUPtr block) { |
299 | | // If under low memory mode, should not return the freeblock, it will occupy too much memory. |
300 | 1.38M | if (!_local_state->low_memory_mode() && block->mem_reuse() && |
301 | 1.38M | _block_memory_usage < _max_bytes_in_queue) { |
302 | 1.03M | size_t block_size_to_reuse = block->allocated_bytes(); |
303 | 1.03M | _block_memory_usage += block_size_to_reuse; |
304 | 1.03M | _scanner_memory_used_counter->set(_block_memory_usage); |
305 | 1.03M | block->clear_column_data(); |
306 | | // Free blocks is used to improve memory efficiency. Failure during pushing back |
307 | | // free block will not incur any bad result so just ignore the return value. |
308 | 1.03M | _free_blocks.enqueue(std::move(block)); |
309 | 1.03M | } |
310 | 1.38M | } |
311 | | |
312 | | Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task, |
313 | 1.39M | std::unique_lock<std::mutex>& /*transfer_lock*/) { |
314 | | // Increase _in_flight_tasks_num no matter whether the scan_task is submitted successfully or not. |
315 | | // If the submit fails, the task will be added back by ScannerContext::push_back_scan_task |
316 | | // and _in_flight_tasks_num will be reduced there. |
317 | | // If the submit succeeds, it will also be added back by ScannerContext::push_back_scan_task, |
318 | | // see ScannerScheduler::_scanner_scan. |
319 | 1.39M | _in_flight_tasks_num++; |
320 | 1.39M | return _scanner_scheduler->submit(shared_from_this(), scan_task); |
321 | 1.39M | } |
322 | | |
323 | 0 | void ScannerContext::clear_free_blocks() { |
324 | 0 | clear_blocks(_free_blocks); |
325 | 0 | } |
326 | | |
327 | 1.38M | void ScannerContext::push_back_scan_task(std::shared_ptr<ScanTask> scan_task) { |
328 | 1.38M | if (scan_task->status_ok()) { |
329 | 1.38M | if (scan_task->cached_block && scan_task->cached_block->rows() > 0) { |
330 | 308k | Status st = validate_block_schema(scan_task->cached_block.get()); |
331 | 308k | if (!st.ok()) { |
332 | 0 | scan_task->set_status(st); |
333 | 0 | } |
334 | 308k | } |
335 | 1.38M | } |
336 | | |
337 | 1.38M | std::lock_guard<std::mutex> l(_transfer_lock); |
338 | 1.38M | if (!scan_task->status_ok()) { |
339 | 1.38k | _process_status = scan_task->get_status(); |
340 | 1.38k | } |
341 | 1.38M | _completed_tasks.push_back(scan_task); |
342 | 1.38M | _in_flight_tasks_num--; |
343 | | |
344 | 1.38M | _dependency->set_ready(); |
345 | 1.38M | } |
346 | | |
347 | 1.38M | Status ScannerContext::get_block_from_queue(RuntimeState* state, Block* block, bool* eos, int id) { |
348 | 1.38M | if (state->is_cancelled()) { |
349 | 1 | _set_scanner_done(); |
350 | 1 | return state->cancel_reason(); |
351 | 1 | } |
352 | 1.38M | std::unique_lock l(_transfer_lock); |
353 | | |
354 | 1.38M | if (!_process_status.ok()) { |
355 | 1.23k | _set_scanner_done(); |
356 | 1.23k | return _process_status; |
357 | 1.23k | } |
358 | | |
359 | 1.38M | std::shared_ptr<ScanTask> scan_task = nullptr; |
360 | | |
361 | 1.38M | if (!_completed_tasks.empty() && !done()) { |
362 | | // https://en.cppreference.com/w/cpp/container/list/front |
363 | | // The behavior is undefined if the list is empty. |
364 | 1.38M | scan_task = _completed_tasks.front(); |
365 | 1.38M | _completed_tasks.pop_front(); |
366 | 1.38M | } |
367 | | |
368 | 1.38M | if (scan_task != nullptr) { |
369 | | // The abnormal status of scanner may come from the execution of the scanner itself, |
370 | | // or come from the scanner scheduler, such as TooManyTasks. |
371 | 1.38M | if (!scan_task->status_ok()) { |
372 | | // TODO: If the scanner status is TooManyTasks, maybe we can retry the scanner after a while. |
373 | 0 | _process_status = scan_task->get_status(); |
374 | 0 | _set_scanner_done(); |
375 | 0 | return _process_status; |
376 | 0 | } |
377 | | |
378 | 1.38M | if (scan_task->cached_block) { |
379 | | // No need to worry about small blocks; blocks are merged together when they are appended to cached_blocks. |
380 | 1.38M | auto current_block = std::move(scan_task->cached_block); |
381 | 1.38M | auto block_size = current_block->allocated_bytes(); |
382 | 1.38M | scan_task->cached_block.reset(); |
383 | 1.38M | _block_memory_usage -= block_size; |
384 | | // consume current block |
385 | 1.38M | block->swap(*current_block); |
386 | 1.38M | return_free_block(std::move(current_block)); |
387 | 1.38M | } |
388 | 18.4E | VLOG_DEBUG << fmt::format( |
389 | 18.4E | "ScannerContext {} get block from queue, current scan " |
390 | 18.4E | "task remaing cached_block size {}, eos {}, scheduled tasks {}", |
391 | 18.4E | ctx_id, _completed_tasks.size(), scan_task->is_eos(), _in_flight_tasks_num); |
392 | 1.38M | if (scan_task->is_eos()) { |
393 | | // 1. if eos, record a finished scanner. |
394 | 1.28M | _num_finished_scanners++; |
395 | 1.28M | RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l)); |
396 | 1.28M | } else { |
397 | 97.7k | scan_task->set_state(ScanTask::State::IN_FLIGHT); |
398 | 97.7k | RETURN_IF_ERROR( |
399 | 97.7k | _scanner_scheduler->schedule_scan_task(shared_from_this(), scan_task, l)); |
400 | 97.7k | } |
401 | 1.38M | } |
402 | | |
403 | 1.38M | if (_completed_tasks.empty() && |
404 | 1.38M | (_num_finished_scanners == _all_scanners.size() || |
405 | 1.33M | (_shared_scan_limit->load(std::memory_order_acquire) == 0 && _in_flight_tasks_num == 0))) { |
406 | 287k | _set_scanner_done(); |
407 | 287k | _is_finished = true; |
408 | 287k | } |
409 | | |
410 | 1.38M | *eos = done(); |
411 | | |
412 | 1.38M | if (_completed_tasks.empty()) { |
413 | 1.33M | _dependency->block(); |
414 | 1.33M | } |
415 | | |
416 | 1.38M | return Status::OK(); |
417 | 1.38M | } |
418 | | |
419 | 308k | Status ScannerContext::validate_block_schema(Block* block) { |
420 | 308k | size_t index = 0; |
421 | 1.16M | for (auto& slot : _output_tuple_desc->slots()) { |
422 | 1.16M | auto& data = block->get_by_position(index++); |
423 | 1.16M | if (data.column->is_nullable() != data.type->is_nullable()) { |
424 | 0 | return Status::Error<ErrorCode::INVALID_SCHEMA>( |
425 | 0 | "column(name: {}) nullable({}) does not match type nullable({}), slot(id: " |
426 | 0 | "{}, " |
427 | 0 | "name:{})", |
428 | 0 | data.name, data.column->is_nullable(), data.type->is_nullable(), slot->id(), |
429 | 0 | slot->col_name()); |
430 | 0 | } |
431 | | |
432 | 1.16M | if (data.column->is_nullable() != slot->is_nullable()) { |
433 | 0 | return Status::Error<ErrorCode::INVALID_SCHEMA>( |
434 | 0 | "column(name: {}) nullable({}) does not match slot(id: {}, name: {}) " |
435 | 0 | "nullable({})", |
436 | 0 | data.name, data.column->is_nullable(), slot->id(), slot->col_name(), |
437 | 0 | slot->is_nullable()); |
438 | 0 | } |
439 | 1.16M | } |
440 | 308k | return Status::OK(); |
441 | 308k | } |
442 | | |
443 | 578k | void ScannerContext::stop_scanners(RuntimeState* state) { |
444 | 578k | std::lock_guard<std::mutex> l(_transfer_lock); |
445 | 578k | if (_should_stop) { |
446 | 287k | return; |
447 | 287k | } |
448 | 291k | _should_stop = true; |
449 | 291k | _set_scanner_done(); |
450 | 1.29M | for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) { |
451 | 1.29M | if (std::shared_ptr<ScannerDelegate> sc = scanner.lock()) { |
452 | 1.29M | sc->_scanner->try_stop(); |
453 | 1.29M | } |
454 | 1.29M | } |
455 | 291k | _completed_tasks.clear(); |
456 | 291k | if (_task_handle) { |
457 | 291k | if (auto* task_executor_scheduler = |
458 | 291k | dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) { |
459 | 291k | static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle)); |
460 | 291k | } |
461 | 291k | _task_handle = nullptr; |
462 | 291k | } |
463 | | // TODO yiguolei, call mark close to scanners |
464 | 291k | if (state->enable_profile()) { |
465 | 1.74k | std::stringstream scanner_statistics; |
466 | 1.74k | std::stringstream scanner_rows_read; |
467 | 1.74k | std::stringstream scanner_wait_worker_time; |
468 | 1.74k | std::stringstream scanner_projection; |
469 | 1.74k | std::stringstream scanner_prepare_time; |
470 | 1.74k | std::stringstream scanner_open_time; |
471 | 1.74k | scanner_statistics << "["; |
472 | 1.74k | scanner_rows_read << "["; |
473 | 1.74k | scanner_wait_worker_time << "["; |
474 | 1.74k | scanner_projection << "["; |
475 | 1.74k | scanner_prepare_time << "["; |
476 | 1.74k | scanner_open_time << "["; |
477 | | // Scanners can be in one of 3 states: |
478 | | // state 1: in scanner context, not scheduled |
479 | | // state 2: in scanner worker pool's queue, scheduled but not running |
480 | | // state 3: scanner is running. |
481 | 2.95k | for (auto& scanner_ref : _all_scanners) { |
482 | 2.95k | auto scanner = scanner_ref.lock(); |
483 | 2.95k | if (scanner == nullptr) { |
484 | 0 | continue; |
485 | 0 | } |
486 | | // Add per scanner running time before close them |
487 | 2.95k | scanner_statistics << PrettyPrinter::print(scanner->_scanner->get_time_cost_ns(), |
488 | 2.95k | TUnit::TIME_NS) |
489 | 2.95k | << ", "; |
490 | 2.95k | scanner_projection << PrettyPrinter::print(scanner->_scanner->projection_time(), |
491 | 2.95k | TUnit::TIME_NS) |
492 | 2.95k | << ", "; |
493 | 2.95k | scanner_rows_read << PrettyPrinter::print(scanner->_scanner->get_rows_read(), |
494 | 2.95k | TUnit::UNIT) |
495 | 2.95k | << ", "; |
496 | 2.95k | scanner_wait_worker_time |
497 | 2.95k | << PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(), |
498 | 2.95k | TUnit::TIME_NS) |
499 | 2.95k | << ", "; |
500 | 2.95k | scanner_prepare_time << PrettyPrinter::print( |
501 | 2.95k | scanner->_scanner->get_prepare_time_cost_ns(), |
502 | 2.95k | TUnit::TIME_NS) |
503 | 2.95k | << ", "; |
504 | 2.95k | scanner_open_time << PrettyPrinter::print(scanner->_scanner->get_open_time_cost_ns(), |
505 | 2.95k | TUnit::TIME_NS) |
506 | 2.95k | << ", "; |
507 | | // This loop covers all scanners and some of them may still be running, so we cannot call |
508 | | // scanner close here. |
509 | 2.95k | } |
510 | 1.74k | scanner_statistics << "]"; |
511 | 1.74k | scanner_rows_read << "]"; |
512 | 1.74k | scanner_wait_worker_time << "]"; |
513 | 1.74k | scanner_projection << "]"; |
514 | 1.74k | scanner_prepare_time << "]"; |
515 | 1.74k | scanner_open_time << "]"; |
516 | 1.74k | _scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str()); |
517 | 1.74k | _scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str()); |
518 | 1.74k | _scanner_profile->add_info_string("PerScannerWaitTime", scanner_wait_worker_time.str()); |
519 | 1.74k | _scanner_profile->add_info_string("PerScannerProjectionTime", scanner_projection.str()); |
520 | 1.74k | _scanner_profile->add_info_string("PerScannerPrepareTime", scanner_prepare_time.str()); |
521 | 1.74k | _scanner_profile->add_info_string("PerScannerOpenTime", scanner_open_time.str()); |
522 | 1.74k | } |
523 | 291k | } |
524 | | |
525 | 20 | std::string ScannerContext::debug_string() { |
526 | 20 | return fmt::format( |
527 | 20 | "_query_id: {}, id: {}, total scanners: {}, pending tasks: {}, completed tasks: {}," |
528 | 20 | " _should_stop: {}, _is_finished: {}, free blocks: {}," |
529 | 20 | " limit: {}, _in_flight_tasks_num: {}, remaining_limit: {}, _num_running_scanners: {}, " |
530 | 20 | "_max_thread_num: {}," |
531 | 20 | " _max_bytes_in_queue: {}, _ins_idx: {}, _enable_adaptive_scanners: {}, " |
532 | 20 | "_mem_share_arb: {}, _scanner_mem_limiter: {}", |
533 | 20 | print_id(_query_id), ctx_id, _all_scanners.size(), _pending_tasks.size(), |
534 | 20 | _completed_tasks.size(), _should_stop, _is_finished, _free_blocks.size_approx(), limit, |
535 | 20 | _shared_scan_limit->load(std::memory_order_relaxed), _in_flight_tasks_num, |
536 | 20 | _num_finished_scanners, _max_scan_concurrency, _max_bytes_in_queue, _ins_idx, |
537 | 20 | _enable_adaptive_scanners, |
538 | 20 | _enable_adaptive_scanners ? _mem_share_arb->debug_string() : "NULL", |
539 | 20 | _enable_adaptive_scanners ? _scanner_mem_limiter->debug_string() : "NULL"); |
540 | 20 | } |
541 | | |
542 | 579k | void ScannerContext::_set_scanner_done() { |
543 | 579k | _dependency->set_always_ready(); |
544 | 579k | } |
545 | | |
546 | 2.78M | void ScannerContext::update_peak_running_scanner(int num) { |
547 | 2.78M | #ifndef BE_TEST |
548 | 2.78M | _local_state->_peak_running_scanner->add(num); |
549 | 2.78M | #endif |
550 | 2.78M | if (_enable_adaptive_scanners) { |
551 | 2.76M | _scanner_mem_limiter->update_running_tasks_count(num); |
552 | 2.76M | } |
553 | 2.78M | } |
554 | | |
555 | 1.38M | void ScannerContext::reestimated_block_mem_bytes(int64_t num) { |
556 | 1.38M | if (_enable_adaptive_scanners) { |
557 | 1.37M | _scanner_mem_limiter->reestimated_block_mem_bytes(num); |
558 | 1.37M | } |
559 | 1.38M | } |
560 | | |
561 | | int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock, |
562 | 1.67M | std::unique_lock<std::shared_mutex>& scheduler_lock) { |
563 | | // Get effective max concurrency considering adaptive scheduling |
564 | 1.67M | int32_t effective_max_concurrency = _available_pickup_scanner_count(); |
565 | 1.67M | DCHECK_LE(effective_max_concurrency, _max_scan_concurrency); |
566 | | |
567 | | // margin_1 is used to ensure each scan operator could have at least _min_scan_concurrency scan tasks. |
568 | 1.67M | int32_t margin_1 = _min_scan_concurrency - |
569 | 1.67M | (cast_set<int32_t>(_completed_tasks.size()) + _in_flight_tasks_num); |
570 | | |
571 | | // margin_2 is used to ensure the scan scheduler could have at least _min_scan_concurrency_of_scan_scheduler scan tasks. |
572 | 1.67M | int32_t margin_2 = |
573 | 1.67M | _min_scan_concurrency_of_scan_scheduler - |
574 | 1.67M | (_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size()); |
575 | | |
576 | | // margin_3 is used to respect adaptive max concurrency limit |
577 | 1.67M | int32_t margin_3 = |
578 | 1.67M | std::max(effective_max_concurrency - |
579 | 1.67M | (cast_set<int32_t>(_completed_tasks.size()) + _in_flight_tasks_num), |
580 | 1.67M | 1); |
581 | | |
582 | 1.67M | if (margin_1 <= 0 && margin_2 <= 0) { |
583 | 1 | return 0; |
584 | 1 | } |
585 | | |
586 | 1.67M | int32_t margin = std::max(margin_1, margin_2); |
587 | 1.67M | if (_enable_adaptive_scanners) { |
588 | 1.66M | margin = std::min(margin, margin_3); // Cap by adaptive limit |
589 | 1.66M | } |
590 | | |
591 | 1.67M | if (low_memory_mode()) { |
592 | | // In low memory mode, we will limit the number of running scanners to `low_memory_mode_scanners()`. |
593 | | // So that we will not submit too many scan tasks to scheduler. |
594 | 0 | margin = std::min(low_memory_mode_scanners() - _in_flight_tasks_num, margin); |
595 | 0 | } |
596 | | |
597 | 1.67M | VLOG_DEBUG << fmt::format( |
598 | 1 | "[{}|{}] schedule scan task, margin_1: {} = {} - ({} + {}), margin_2: {} = {} - " |
599 | 1 | "({} + {}), margin_3: {} = {} - ({} + {}), margin: {}, adaptive: {}", |
600 | 1 | print_id(_query_id), ctx_id, margin_1, _min_scan_concurrency, _completed_tasks.size(), |
601 | 1 | _in_flight_tasks_num, margin_2, _min_scan_concurrency_of_scan_scheduler, |
602 | 1 | _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(), |
603 | 1 | margin_3, effective_max_concurrency, _completed_tasks.size(), _in_flight_tasks_num, |
604 | 1 | margin, _enable_adaptive_scanners); |
605 | | |
606 | 1.67M | return margin; |
607 | 1.67M | } |
608 | | |
609 | | // This function must be called with: |
610 | | // 1. _transfer_lock held. |
611 | | // 2. ScannerScheduler::_lock held. |
612 | | Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task, |
613 | | std::unique_lock<std::mutex>& transfer_lock, |
614 | 1.67M | std::unique_lock<std::shared_mutex>& scheduler_lock) { |
615 | 1.67M | if (current_scan_task && |
616 | 1.67M | (current_scan_task->cached_block != nullptr || current_scan_task->is_eos())) { |
617 | 1 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error."); |
618 | 1 | } |
619 | | |
620 | 1.67M | std::list<std::shared_ptr<ScanTask>> tasks_to_submit; |
621 | | |
622 | 1.67M | int32_t margin = _get_margin(transfer_lock, scheduler_lock); |
623 | | |
624 | | // margin is not positive, which means this scan operator cannot submit any scan task for now. |
625 | 1.67M | if (margin <= 0) { |
626 | | // Be careful with current scan task. |
627 | | // We need to add it back to task queue to make sure it could be resubmitted. |
628 | 0 | if (current_scan_task) { |
629 | | // This usually happens when we should downgrade the concurrency. |
630 | 0 | current_scan_task->set_state(ScanTask::State::PENDING); |
631 | 0 | _pending_tasks.push(current_scan_task); |
632 | 0 | VLOG_DEBUG << fmt::format( |
633 | 0 | "{} push back scanner to task queue, because diff <= 0, _completed_tasks size " |
634 | 0 | "{}, _in_flight_tasks_num {}", |
635 | 0 | ctx_id, _completed_tasks.size(), _in_flight_tasks_num); |
636 | 0 | } |
637 | | |
638 | 0 | #ifndef NDEBUG |
639 | | // This DCHECK is necessary. |
640 | | // We need to make sure each scan operator could have at least 1 scan tasks. |
641 | | // Or this scan operator will not be re-scheduled. |
642 | 0 | if (!_pending_tasks.empty() && _in_flight_tasks_num == 0 && _completed_tasks.empty()) { |
643 | 0 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error."); |
644 | 0 | } |
645 | 0 | #endif |
646 | | |
647 | 0 | return Status::OK(); |
648 | 0 | } |
649 | | |
650 | 1.67M | bool first_pull = true; |
651 | | |
652 | 3.06M | while (margin-- > 0) { |
653 | 1.75M | std::shared_ptr<ScanTask> task_to_run; |
654 | 1.75M | const int32_t current_concurrency = cast_set<int32_t>( |
655 | 1.75M | _completed_tasks.size() + _in_flight_tasks_num + tasks_to_submit.size()); |
656 | 18.4E | VLOG_DEBUG << fmt::format("{} currenct concurrency: {} = {} + {} + {}", ctx_id, |
657 | 18.4E | current_concurrency, _completed_tasks.size(), |
658 | 18.4E | _in_flight_tasks_num, tasks_to_submit.size()); |
659 | 1.75M | if (first_pull) { |
660 | 1.67M | task_to_run = _pull_next_scan_task(current_scan_task, current_concurrency); |
661 | 1.67M | if (task_to_run == nullptr) { |
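662 | | // _pull_next_scan_task returns nullptr in the following situations: |
663 | | // 1. current_concurrency already reached the effective max concurrency. |
664 | | // 2. no pending scan task is left (e.g. all scanners are finished), or the shared scan limit is exhausted. |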
665 | 346k | if (current_scan_task) { |
666 | 9 | DCHECK(current_scan_task->cached_block == nullptr); |
667 | 9 | DCHECK(!current_scan_task->is_eos()); |
668 | 9 | if (current_scan_task->cached_block != nullptr || current_scan_task->is_eos()) { |
669 | | // This should not happen. |
670 | 0 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, |
671 | 0 | "Scanner scheduler logical error."); |
672 | 0 | } |
673 | | // Current scan task is not scheduled; add it back to the task queue so that it can be resubmitted. |
674 | 9 | current_scan_task->set_state(ScanTask::State::PENDING); |
675 | 9 | _pending_tasks.push(current_scan_task); |
676 | 9 | } |
677 | 346k | } |
678 | 1.67M | first_pull = false; |
679 | 1.67M | } else { |
680 | 77.0k | task_to_run = _pull_next_scan_task(nullptr, current_concurrency); |
681 | 77.0k | } |
682 | | |
683 | 1.75M | if (task_to_run) { |
684 | 1.39M | tasks_to_submit.push_back(task_to_run); |
685 | 1.39M | } else { |
686 | 364k | break; |
687 | 364k | } |
688 | 1.75M | } |
689 | | |
690 | 1.67M | if (tasks_to_submit.empty()) { |
691 | 346k | return Status::OK(); |
692 | 346k | } |
693 | | |
694 | 18.4E | VLOG_DEBUG << fmt::format("[{}:{}] submit {} scan tasks to scheduler, remaining scanner: {}", |
695 | 18.4E | print_id(_query_id), ctx_id, tasks_to_submit.size(), |
696 | 18.4E | _pending_tasks.size()); |
697 | | |
698 | 1.39M | for (auto& scan_task_iter : tasks_to_submit) { |
699 | 1.39M | Status submit_status = submit_scan_task(scan_task_iter, transfer_lock); |
700 | 1.39M | if (!submit_status.ok()) { |
701 | 0 | _process_status = submit_status; |
702 | 0 | _set_scanner_done(); |
703 | 0 | return _process_status; |
704 | 0 | } |
705 | 1.39M | } |
706 | | |
707 | 1.33M | return Status::OK(); |
708 | 1.33M | } |
709 | | |
710 | | std::shared_ptr<ScanTask> ScannerContext::_pull_next_scan_task( |
711 | 1.75M | std::shared_ptr<ScanTask> current_scan_task, int32_t current_concurrency) { |
712 | 1.75M | int32_t effective_max_concurrency = _max_scan_concurrency; |
713 | 1.75M | if (_enable_adaptive_scanners) { |
714 | 1.73M | effective_max_concurrency = _adaptive_processor->expected_scanners > 0 |
715 | 1.73M | ? _adaptive_processor->expected_scanners |
716 | 1.73M | : _max_scan_concurrency; |
717 | 1.73M | } |
718 | | |
719 | 1.75M | if (current_concurrency >= effective_max_concurrency) { |
720 | 8.37k | VLOG_DEBUG << fmt::format( |
721 | 0 | "ScannerContext {} current concurrency {} >= effective_max_concurrency {}, skip " |
722 | 0 | "pull", |
723 | 0 | ctx_id, current_concurrency, effective_max_concurrency); |
724 | 8.37k | return nullptr; |
725 | 8.37k | } |
726 | | |
727 | 1.74M | if (current_scan_task != nullptr) { |
728 | 97.9k | if (current_scan_task->cached_block != nullptr || current_scan_task->is_eos()) { |
729 | | // This should not happen. |
730 | 2 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error."); |
731 | 2 | } |
732 | 97.9k | return current_scan_task; |
733 | 97.9k | } |
734 | | |
735 | 1.64M | if (!_pending_tasks.empty()) { |
736 | | // Skip submitting more pending scanners once the LIMIT budget is |
737 | | // exhausted; they would only open and immediately EOF. |
738 | 1.29M | if (_shared_scan_limit->load(std::memory_order_acquire) == 0) { |
739 | 103 | return nullptr; |
740 | 103 | } |
741 | 1.29M | std::shared_ptr<ScanTask> next_scan_task; |
742 | 1.29M | next_scan_task = _pending_tasks.top(); |
743 | 1.29M | _pending_tasks.pop(); |
744 | 1.29M | return next_scan_task; |
745 | 1.29M | } else { |
746 | 355k | return nullptr; |
747 | 355k | } |
748 | 1.64M | } |
749 | | |
750 | 5.83M | bool ScannerContext::low_memory_mode() const { |
751 | 5.83M | return _local_state->low_memory_mode(); |
752 | 5.83M | } |
753 | | } // namespace doris |