be/src/exec/scan/scanner_context.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/scan/scanner_context.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/Metrics_types.h> |
22 | | #include <glog/logging.h> |
23 | | #include <zconf.h> |
24 | | |
25 | | #include <cstdint> |
26 | | #include <ctime> |
27 | | #include <memory> |
28 | | #include <mutex> |
29 | | #include <ostream> |
30 | | #include <shared_mutex> |
31 | | #include <tuple> |
32 | | #include <utility> |
33 | | |
34 | | #include "common/config.h" |
35 | | #include "common/exception.h" |
36 | | #include "common/logging.h" |
37 | | #include "common/metrics/doris_metrics.h" |
38 | | #include "common/status.h" |
39 | | #include "core/block/block.h" |
40 | | #include "exec/operator/scan_operator.h" |
41 | | #include "exec/scan/scan_node.h" |
42 | | #include "exec/scan/scanner_scheduler.h" |
43 | | #include "runtime/descriptors.h" |
44 | | #include "runtime/exec_env.h" |
45 | | #include "runtime/runtime_profile.h" |
46 | | #include "runtime/runtime_state.h" |
47 | | #include "storage/tablet/tablet.h" |
48 | | #include "util/time.h" |
49 | | #include "util/uid_util.h" |
50 | | |
51 | | namespace doris { |
52 | | |
53 | | using namespace std::chrono_literals; |
54 | | #include "common/compile_check_begin.h" |
// Builds one scanner context per scan operator instance. It coordinates the
// lifecycle of `scanners`: each is wrapped in a ScanTask and starts out in the
// pending queue. `shared_scan_limit` is a row quota shared across parallel
// instances (a negative value means "no limit" -- see acquire_limit_quota).
ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
                               const TupleDescriptor* output_tuple_desc,
                               const RowDescriptor* output_row_descriptor,
                               const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
                               int64_t limit_, std::shared_ptr<Dependency> dependency,
                               std::atomic<int64_t>* shared_scan_limit
#ifdef BE_TEST
                               ,
                               int num_parallel_instances
#endif
                               )
        : HasTaskExecutionCtx(state),
          _state(state),
          _local_state(local_state),
          // If an output row descriptor is provided, its single tuple describes
          // the blocks this context produces; otherwise use the scan tuple.
          _output_tuple_desc(output_row_descriptor
                                     ? output_row_descriptor->tuple_descriptors().front()
                                     : output_tuple_desc),
          _output_row_descriptor(output_row_descriptor),
          _batch_size(state->batch_size()),
          limit(limit_),
          _shared_scan_limit(shared_scan_limit),
          // Keep only weak references; scanner lifetime is owned elsewhere.
          _all_scanners(scanners.begin(), scanners.end()),
#ifndef BE_TEST
          _scanner_scheduler(local_state->scan_scheduler(state)),
          _min_scan_concurrency_of_scan_scheduler(
                  _scanner_scheduler->get_min_active_scan_threads()),
          // Never allow more concurrent scan tasks than there are scanners.
          _max_scan_concurrency(std::min(local_state->max_scanners_concurrency(state),
                                         cast_set<int>(scanners.size()))),
#else
          _scanner_scheduler(state->get_query_ctx()->get_scan_scheduler()),
          _min_scan_concurrency_of_scan_scheduler(0),
          _max_scan_concurrency(num_parallel_instances),
#endif
          _min_scan_concurrency(local_state->min_scanners_concurrency(state)) {
    DCHECK(_state != nullptr);
    DCHECK(_output_row_descriptor == nullptr ||
           _output_row_descriptor->tuple_descriptors().size() == 1);
    _query_id = _state->get_query_ctx()->query_id();
    _resource_ctx = _state->get_query_ctx()->resource_ctx();
    ctx_id = UniqueId::gen_uid().to_string();
    // Every scanner begins as a pending (not yet scheduled) scan task.
    for (auto& scanner : _all_scanners) {
        _pending_scanners.push(std::make_shared<ScanTask>(scanner));
    };
    // Normalize any negative limit to the canonical "no limit" sentinel -1.
    if (limit < 0) {
        limit = -1;
    }
    _dependency = dependency;
    DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}
104 | | |
105 | 235k | int64_t ScannerContext::acquire_limit_quota(int64_t desired) { |
106 | 235k | DCHECK(desired > 0); |
107 | 235k | int64_t remaining = _shared_scan_limit->load(std::memory_order_acquire); |
108 | 235k | while (true) { |
109 | 235k | if (remaining < 0) { |
110 | | // No limit set, grant all desired rows. |
111 | 235k | return desired; |
112 | 235k | } |
113 | 429 | if (remaining == 0) { |
114 | 9 | return 0; |
115 | 9 | } |
116 | 420 | int64_t granted = std::min(desired, remaining); |
117 | 420 | if (_shared_scan_limit->compare_exchange_weak(remaining, remaining - granted, |
118 | 420 | std::memory_order_acq_rel, |
119 | 437 | std::memory_order_acquire)) { |
120 | 437 | return granted; |
121 | 437 | } |
122 | | // CAS failed, `remaining` is updated to current value, retry. |
123 | 420 | } |
124 | 235k | } |
125 | | |
// After init function call, should not access _parent
// Wires up profile counters, registers a task-executor task (non-test builds),
// computes the block-queue memory budget, possibly downgrades concurrency for
// very wide tables, and schedules the first batch of scan tasks.
Status ScannerContext::init() {
#ifndef BE_TEST
    _scanner_profile = _local_state->_scanner_profile;
    _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
    _scanner_memory_used_counter = _local_state->_memory_used_counter;

    // 3. get thread token
    if (!_state->get_query_ctx()) {
        return Status::InternalError("Query context of {} is not set",
                                     print_id(_state->query_id()));
    }

    if (_state->get_query_ctx()->get_scan_scheduler()) {
        _should_reset_thread_name = false;
    }

    auto scanner = _all_scanners.front().lock();
    DCHECK(scanner != nullptr);

    // Register one executor task per context. Its initial concurrency is the
    // configured value, or max(48, 2 * cores) when the config is unset.
    if (auto* task_executor_scheduler =
                dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
        std::shared_ptr<TaskExecutor> task_executor = task_executor_scheduler->task_executor();
        TaskId task_id(fmt::format("{}-{}", print_id(_state->query_id()), ctx_id));
        _task_handle = DORIS_TRY(task_executor->create_task(
                task_id, []() { return 0.0; },
                config::task_executor_initial_max_concurrency_per_task > 0
                        ? config::task_executor_initial_max_concurrency_per_task
                        : std::max(48, CpuInfo::num_cores() * 2),
                std::chrono::milliseconds(100), std::nullopt));
    }
#endif
    // _max_bytes_in_queue controls the maximum memory that can be used by a single scan operator.
    // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
    // is larger than 10MB.
    _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10);

    // Provide more memory for wide tables, increase proportionally by multiples of 300
    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;

    // With no scanners there is nothing to schedule; finish immediately.
    if (_all_scanners.empty()) {
        _is_finished = true;
        _set_scanner_done();
    }

    // when user not specify scan_thread_num, so we can try downgrade _max_thread_num.
    // becaue we found in a table with 5k columns, column reader may ocuppy too much memory.
    // you can refer https://github.com/apache/doris/issues/35340 for details.
    const int32_t max_column_reader_num = _state->max_column_reader_num();

    if (_max_scan_concurrency != 1 && max_column_reader_num > 0) {
        int32_t scan_column_num = cast_set<int32_t>(_output_tuple_desc->slots().size());
        int32_t current_column_num = scan_column_num * _max_scan_concurrency;
        if (current_column_num > max_column_reader_num) {
            int32_t new_max_thread_num = max_column_reader_num / scan_column_num;
            new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num;
            if (new_max_thread_num < _max_scan_concurrency) {
                int32_t origin_max_thread_num = _max_scan_concurrency;
                _max_scan_concurrency = new_max_thread_num;
                LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
                          << " scan's max_thread_num from " << origin_max_thread_num << " to "
                          << _max_scan_concurrency << ",column num: " << scan_column_num
                          << ", max_column_reader_num: " << max_column_reader_num;
            }
        }
    }

    COUNTER_SET(_local_state->_max_scan_concurrency, (int64_t)_max_scan_concurrency);
    COUNTER_SET(_local_state->_min_scan_concurrency, (int64_t)_min_scan_concurrency);

    // Kick off the initial scan tasks; schedule_scan_task requires the
    // transfer lock to be held.
    std::unique_lock<std::mutex> l(_transfer_lock);
    RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));

    return Status::OK();
}
201 | | |
// Releases queued tasks and pooled blocks while the query's memory tracker is
// active, and removes the task-executor handle if it is still registered
// (normally stop_scanners() already removed it).
ScannerContext::~ScannerContext() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
    _tasks_queue.clear();
    BlockUPtr block;
    // Drain the free-block pool so each pooled block is destroyed under the
    // tracker switched in above.
    while (_free_blocks.try_dequeue(block)) {
        // do nothing
    }
    block.reset();
    DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);
    if (_task_handle) {
        if (auto* task_executor_scheduler =
                    dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
            static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle));
        }
        _task_handle = nullptr;
    }
}
219 | | |
220 | 1.01M | BlockUPtr ScannerContext::get_free_block(bool force) { |
221 | 1.01M | BlockUPtr block = nullptr; |
222 | 1.01M | if (_free_blocks.try_dequeue(block)) { |
223 | 571k | DCHECK(block->mem_reuse()); |
224 | 571k | _block_memory_usage -= block->allocated_bytes(); |
225 | 571k | _scanner_memory_used_counter->set(_block_memory_usage); |
226 | | // A free block is reused, so the memory usage should be decreased |
227 | | // The caller of get_free_block will increase the memory usage |
228 | 571k | } else if (_block_memory_usage < _max_bytes_in_queue || force) { |
229 | 450k | _newly_create_free_blocks_num->update(1); |
230 | 450k | block = Block::create_unique(_output_tuple_desc->slots(), 0); |
231 | 450k | } |
232 | 1.01M | return block; |
233 | 1.01M | } |
234 | | |
235 | 1.01M | void ScannerContext::return_free_block(BlockUPtr block) { |
236 | | // If under low memory mode, should not return the freeblock, it will occupy too much memory. |
237 | 1.01M | if (!_local_state->low_memory_mode() && block->mem_reuse() && |
238 | 1.01M | _block_memory_usage < _max_bytes_in_queue) { |
239 | 748k | size_t block_size_to_reuse = block->allocated_bytes(); |
240 | 748k | _block_memory_usage += block_size_to_reuse; |
241 | 748k | _scanner_memory_used_counter->set(_block_memory_usage); |
242 | 748k | block->clear_column_data(); |
243 | | // Free blocks is used to improve memory efficiency. Failure during pushing back |
244 | | // free block will not incur any bad result so just ignore the return value. |
245 | 748k | _free_blocks.enqueue(std::move(block)); |
246 | 748k | } |
247 | 1.01M | } |
248 | | |
// Hand one scan task to the scheduler.
// _num_scheduled_scanners is increased no matter whether the scan_task is
// submitted successfully or not: in either case the task comes back through
// ScannerContext::push_back_scan_task (see ScannerScheduler::_scanner_scan),
// which decrements the counter again.
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task,
                                        std::unique_lock<std::mutex>& /*transfer_lock*/) {
    _num_scheduled_scanners++;
    return _scanner_scheduler->submit(shared_from_this(), scan_task);
}
259 | | |
// Drop all pooled free blocks to release their memory (used under memory
// pressure).
void ScannerContext::clear_free_blocks() {
    clear_blocks(_free_blocks);
}
263 | | |
264 | 976k | void ScannerContext::push_back_scan_task(std::shared_ptr<ScanTask> scan_task) { |
265 | 976k | if (scan_task->status_ok()) { |
266 | 986k | for (const auto& [block, _] : scan_task->cached_blocks) { |
267 | 986k | if (block->rows() > 0) { |
268 | 230k | Status st = validate_block_schema(block.get()); |
269 | 230k | if (!st.ok()) { |
270 | 0 | scan_task->set_status(st); |
271 | 0 | break; |
272 | 0 | } |
273 | 230k | } |
274 | 986k | } |
275 | 974k | } |
276 | | |
277 | 976k | std::lock_guard<std::mutex> l(_transfer_lock); |
278 | 976k | if (!scan_task->status_ok()) { |
279 | 1.35k | _process_status = scan_task->get_status(); |
280 | 1.35k | } |
281 | 976k | _tasks_queue.push_back(scan_task); |
282 | 976k | _num_scheduled_scanners--; |
283 | | |
284 | 976k | _dependency->set_ready(); |
285 | 976k | } |
286 | | |
// Called by the scan operator to fetch one block of output.
// Under _transfer_lock it: surfaces any stored error, consumes one cached
// block from the front task, re-schedules that task (or records it finished
// on eos), updates the finished/eos state, and blocks the dependency when the
// queue drains. `id` is unused here (kept for the caller's interface).
Status ScannerContext::get_block_from_queue(RuntimeState* state, Block* block, bool* eos, int id) {
    if (state->is_cancelled()) {
        _set_scanner_done();
        return state->cancel_reason();
    }
    std::unique_lock l(_transfer_lock);

    // A previously recorded failure terminates the whole scan.
    if (!_process_status.ok()) {
        _set_scanner_done();
        return _process_status;
    }

    std::shared_ptr<ScanTask> scan_task = nullptr;

    if (!_tasks_queue.empty() && !done()) {
        // https://en.cppreference.com/w/cpp/container/list/front
        // The behavior is undefined if the list is empty.
        scan_task = _tasks_queue.front();
    }

    if (scan_task != nullptr) {
        // The abnormal status of scanner may come from the execution of the scanner itself,
        // or come from the scanner scheduler, such as TooManyTasks.
        if (!scan_task->status_ok()) {
            // TODO: If the scanner status is TooManyTasks, maybe we can retry the scanner after a while.
            _process_status = scan_task->get_status();
            _set_scanner_done();
            return _process_status;
        }

        // No need to worry about small block, block is merged together when they are appended to cached_blocks.
        if (!scan_task->cached_blocks.empty()) {
            auto [current_block, block_size] = std::move(scan_task->cached_blocks.front());
            scan_task->cached_blocks.pop_front();
            _block_memory_usage -= block_size;
            // consume current block
            block->swap(*current_block);
            return_free_block(std::move(current_block));
        }

        VLOG_DEBUG << fmt::format(
                "ScannerContext {} get block from queue, task_queue size {}, current scan "
                "task remaing cached_block size {}, eos {}, scheduled tasks {}",
                ctx_id, _tasks_queue.size(), scan_task->cached_blocks.size(), scan_task->is_eos(),
                _num_scheduled_scanners);

        if (scan_task->cached_blocks.empty()) {
            // All Cached blocks are consumed, pop this task from task_queue.
            if (!_tasks_queue.empty()) {
                _tasks_queue.pop_front();
            }

            if (scan_task->is_eos()) {
                // 1. if eos, record a finished scanner.
                _num_finished_scanners++;
                // Give other pending scanners a chance to run.
                RETURN_IF_ERROR(
                        _scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));
            } else {
                // Not eos: hand the same task back for another scan round.
                RETURN_IF_ERROR(
                        _scanner_scheduler->schedule_scan_task(shared_from_this(), scan_task, l));
            }
        }
    }

    // Mark finished when either:
    // (1) all scanners completed normally, or
    // (2) shared limit exhausted and no scanners are still running.
    if (_tasks_queue.empty() && (_num_finished_scanners == _all_scanners.size() ||
                                 (_shared_scan_limit->load(std::memory_order_acquire) == 0 &&
                                  _num_scheduled_scanners == 0))) {
        _set_scanner_done();
        _is_finished = true;
    }

    *eos = done();

    // Nothing left to consume right now: block until push_back_scan_task
    // marks the dependency ready again.
    if (_tasks_queue.empty()) {
        _dependency->block();
    }

    return Status::OK();
}
369 | | |
370 | 230k | Status ScannerContext::validate_block_schema(Block* block) { |
371 | 230k | size_t index = 0; |
372 | 784k | for (auto& slot : _output_tuple_desc->slots()) { |
373 | 784k | auto& data = block->get_by_position(index++); |
374 | 784k | if (data.column->is_nullable() != data.type->is_nullable()) { |
375 | 0 | return Status::Error<ErrorCode::INVALID_SCHEMA>( |
376 | 0 | "column(name: {}) nullable({}) does not match type nullable({}), slot(id: " |
377 | 0 | "{}, " |
378 | 0 | "name:{})", |
379 | 0 | data.name, data.column->is_nullable(), data.type->is_nullable(), slot->id(), |
380 | 0 | slot->col_name()); |
381 | 0 | } |
382 | | |
383 | 784k | if (data.column->is_nullable() != slot->is_nullable()) { |
384 | 0 | return Status::Error<ErrorCode::INVALID_SCHEMA>( |
385 | 0 | "column(name: {}) nullable({}) does not match slot(id: {}, name: {}) " |
386 | 0 | "nullable({})", |
387 | 0 | data.name, data.column->is_nullable(), slot->id(), slot->col_name(), |
388 | 0 | slot->is_nullable()); |
389 | 0 | } |
390 | 784k | } |
391 | 230k | return Status::OK(); |
392 | 230k | } |
393 | | |
// Stop every scanner of this context (cancel, limit reached, or operator
// close). Idempotent: only the first call under _transfer_lock does the work.
// Also dumps per-scanner statistics into the profile when profiling is on.
void ScannerContext::stop_scanners(RuntimeState* state) {
    std::lock_guard<std::mutex> l(_transfer_lock);
    if (_should_stop) {
        return;
    }
    _should_stop = true;
    _set_scanner_done();
    // Ask every still-alive scanner to stop cooperatively.
    for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
        if (std::shared_ptr<ScannerDelegate> sc = scanner.lock()) {
            sc->_scanner->try_stop();
        }
    }
    _tasks_queue.clear();
    // Deregister from the task executor so no further rounds are dispatched.
    if (_task_handle) {
        if (auto* task_executor_scheduler =
                    dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
            static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle));
        }
        _task_handle = nullptr;
    }
    // TODO yiguolei, call mark close to scanners
    if (state->enable_profile()) {
        std::stringstream scanner_statistics;
        std::stringstream scanner_rows_read;
        std::stringstream scanner_wait_worker_time;
        std::stringstream scanner_projection;
        scanner_statistics << "[";
        scanner_rows_read << "[";
        scanner_wait_worker_time << "[";
        scanner_projection << "[";
        // Scanners can in 3 state
        // state 1: in scanner context, not scheduled
        // state 2: in scanner worker pool's queue, scheduled but not running
        // state 3: scanner is running.
        for (auto& scanner_ref : _all_scanners) {
            auto scanner = scanner_ref.lock();
            if (scanner == nullptr) {
                continue;
            }
            // Add per scanner running time before close them
            scanner_statistics << PrettyPrinter::print(scanner->_scanner->get_time_cost_ns(),
                                                       TUnit::TIME_NS)
                               << ", ";
            scanner_projection << PrettyPrinter::print(scanner->_scanner->projection_time(),
                                                       TUnit::TIME_NS)
                               << ", ";
            scanner_rows_read << PrettyPrinter::print(scanner->_scanner->get_rows_read(),
                                                      TUnit::UNIT)
                              << ", ";
            scanner_wait_worker_time
                    << PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(),
                                            TUnit::TIME_NS)
                    << ", ";
            // since there are all scanners, some scanners is running, so that could not call scanner
            // close here.
        }
        scanner_statistics << "]";
        scanner_rows_read << "]";
        scanner_wait_worker_time << "]";
        scanner_projection << "]";
        _scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str());
        _scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str());
        _scanner_profile->add_info_string("PerScannerWaitTime", scanner_wait_worker_time.str());
        _scanner_profile->add_info_string("PerScannerProjectionTime", scanner_projection.str());
    }
}
460 | | |
// Human-readable snapshot of context state for logs. Reads shared counters
// without taking _transfer_lock, so values may be slightly stale.
std::string ScannerContext::debug_string() {
    return fmt::format(
            "id: {}, total scanners: {}, pending tasks: {},"
            " _should_stop: {}, _is_finished: {}, free blocks: {},"
            " limit: {}, remaining_limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
            " _max_bytes_in_queue: {}, query_id: {}",
            ctx_id, _all_scanners.size(), _tasks_queue.size(), _should_stop, _is_finished,
            _free_blocks.size_approx(), limit, _shared_scan_limit->load(std::memory_order_relaxed),
            _num_scheduled_scanners, _max_scan_concurrency, _max_bytes_in_queue,
            print_id(_query_id));
}
472 | | |
// Permanently unblock the downstream dependency so the operator can observe
// the eos / error state.
void ScannerContext::_set_scanner_done() {
    _dependency->set_always_ready();
}
476 | | |
// Adjust the peak-running-scanner counter; `num` may be negative when a
// scanner stops running.
void ScannerContext::update_peak_running_scanner(int num) {
    _local_state->_peak_running_scanner->add(num);
}
480 | | |
481 | | int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock, |
482 | 1.21M | std::unique_lock<std::shared_mutex>& scheduler_lock) { |
483 | | // margin_1 is used to ensure each scan operator could have at least _min_scan_concurrency scan tasks. |
484 | 1.21M | int32_t margin_1 = _min_scan_concurrency - |
485 | 1.21M | (cast_set<int32_t>(_tasks_queue.size()) + _num_scheduled_scanners); |
486 | | |
487 | | // margin_2 is used to ensure the scan scheduler could have at least _min_scan_concurrency_of_scan_scheduler scan tasks. |
488 | 1.21M | int32_t margin_2 = |
489 | 1.21M | _min_scan_concurrency_of_scan_scheduler - |
490 | 1.21M | (_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size()); |
491 | | |
492 | 1.21M | if (margin_1 <= 0 && margin_2 <= 0) { |
493 | 8.43k | return 0; |
494 | 8.43k | } |
495 | | |
496 | 1.20M | int32_t margin = std::max(margin_1, margin_2); |
497 | | |
498 | 1.20M | if (low_memory_mode()) { |
499 | | // In low memory mode, we will limit the number of running scanners to `low_memory_mode_scanners()`. |
500 | | // So that we will not submit too many scan tasks to scheduler. |
501 | 84 | margin = std::min(low_memory_mode_scanners() - _num_scheduled_scanners, margin); |
502 | 84 | } |
503 | | |
504 | 1.20M | VLOG_DEBUG << fmt::format( |
505 | 1 | "[{}|{}] schedule scan task, margin_1: {} = {} - ({} + {}), margin_2: {} = {} - " |
506 | 1 | "({} + {}), margin: {}", |
507 | 1 | print_id(_query_id), ctx_id, margin_1, _min_scan_concurrency, _tasks_queue.size(), |
508 | 1 | _num_scheduled_scanners, margin_2, _min_scan_concurrency_of_scan_scheduler, |
509 | 1 | _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(), margin); |
510 | | |
511 | 1.20M | return margin; |
512 | 1.21M | } |
513 | | |
// This function must be called with:
// 1. _transfer_lock held.
// 2. ScannerScheduler::_lock held.
// Decides how many new scan tasks to submit (the "margin") and submits them.
// `current_scan_task`, when non-null, is a task that just finished a scan
// round (neither eos nor holding cached blocks) and gets resubmit priority.
Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task,
                                          std::unique_lock<std::mutex>& transfer_lock,
                                          std::unique_lock<std::shared_mutex>& scheduler_lock) {
    if (current_scan_task &&
        (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos())) {
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
    }

    std::list<std::shared_ptr<ScanTask>> tasks_to_submit;

    int32_t margin = _get_margin(transfer_lock, scheduler_lock);

    // margin is less than zero. Means this scan operator could not submit any scan task for now.
    if (margin <= 0) {
        // Be careful with current scan task.
        // We need to add it back to task queue to make sure it could be resubmitted.
        if (current_scan_task) {
            // This usually happens when we should downgrade the concurrency.
            _pending_scanners.push(current_scan_task);
            VLOG_DEBUG << fmt::format(
                    "{} push back scanner to task queue, because diff <= 0, task_queue size "
                    "{}, _num_scheduled_scanners {}",
                    ctx_id, _tasks_queue.size(), _num_scheduled_scanners);
        }

#ifndef NDEBUG
        // This DCHECK is necessary.
        // We need to make sure each scan operator could have at least 1 scan tasks.
        // Or this scan operator will not be re-scheduled.
        if (!_pending_scanners.empty() && _num_scheduled_scanners == 0 && _tasks_queue.empty()) {
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
        }
#endif

        return Status::OK();
    }

    bool first_pull = true;

    // Pull up to `margin` tasks. Only the first pull may hand back
    // current_scan_task; later pulls draw from the pending queue.
    while (margin-- > 0) {
        std::shared_ptr<ScanTask> task_to_run;
        const int32_t current_concurrency = cast_set<int32_t>(
                _tasks_queue.size() + _num_scheduled_scanners + tasks_to_submit.size());
        VLOG_DEBUG << fmt::format("{} currenct concurrency: {} = {} + {} + {}", ctx_id,
                                  current_concurrency, _tasks_queue.size(), _num_scheduled_scanners,
                                  tasks_to_submit.size());
        if (first_pull) {
            task_to_run = _pull_next_scan_task(current_scan_task, current_concurrency);
            if (task_to_run == nullptr) {
                // In two situations we will get nullptr.
                // 1. current_concurrency already reached _max_scan_concurrency.
                // 2. all scanners are finished.
                if (current_scan_task) {
                    DCHECK(current_scan_task->cached_blocks.empty());
                    DCHECK(!current_scan_task->is_eos());
                    if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) {
                        // This should not happen.
                        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                                               "Scanner scheduler logical error.");
                    }
                    // Current scan task is not eos, but we can not resubmit it.
                    // Add current_scan_task back to task queue, so that we have chance to resubmit it in the future.
                    _pending_scanners.push(current_scan_task);
                }
            }
            first_pull = false;
        } else {
            task_to_run = _pull_next_scan_task(nullptr, current_concurrency);
        }

        if (task_to_run) {
            tasks_to_submit.push_back(task_to_run);
        } else {
            break;
        }
    }

    if (tasks_to_submit.empty()) {
        return Status::OK();
    }

    VLOG_DEBUG << fmt::format("[{}:{}] submit {} scan tasks to scheduler, remaining scanner: {}",
                              print_id(_query_id), ctx_id, tasks_to_submit.size(),
                              _pending_scanners.size());

    // Submit outside the pull loop; the first failure is recorded and ends
    // the whole scan.
    for (auto& scan_task_iter : tasks_to_submit) {
        Status submit_status = submit_scan_task(scan_task_iter, transfer_lock);
        if (!submit_status.ok()) {
            _process_status = submit_status;
            _set_scanner_done();
            return _process_status;
        }
    }

    return Status::OK();
}
613 | | |
614 | | std::shared_ptr<ScanTask> ScannerContext::_pull_next_scan_task( |
615 | 1.78M | std::shared_ptr<ScanTask> current_scan_task, int32_t current_concurrency) { |
616 | 1.78M | if (current_concurrency >= _max_scan_concurrency) { |
617 | 381k | VLOG_DEBUG << fmt::format( |
618 | 0 | "ScannerContext {} current concurrency {} >= _max_scan_concurrency {}, skip " |
619 | 0 | "pull", |
620 | 0 | ctx_id, current_concurrency, _max_scan_concurrency); |
621 | 381k | return nullptr; |
622 | 381k | } |
623 | | |
624 | 1.40M | if (current_scan_task != nullptr) { |
625 | 4.58k | if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) { |
626 | | // This should not happen. |
627 | 2 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error."); |
628 | 2 | } |
629 | 4.58k | return current_scan_task; |
630 | 4.58k | } |
631 | | |
632 | 1.40M | if (!_pending_scanners.empty()) { |
633 | | // If shared limit quota is exhausted, do not submit new scanners from pending queue. |
634 | 974k | int64_t remaining = _shared_scan_limit->load(std::memory_order_acquire); |
635 | 974k | if (remaining == 0) { |
636 | 79 | return nullptr; |
637 | 79 | } |
638 | 974k | std::shared_ptr<ScanTask> next_scan_task; |
639 | 974k | next_scan_task = _pending_scanners.top(); |
640 | 974k | _pending_scanners.pop(); |
641 | 974k | return next_scan_task; |
642 | 974k | } else { |
643 | 427k | return nullptr; |
644 | 427k | } |
645 | 1.40M | } |
646 | | |
// Delegates to the operator's local state; true when this query should shed
// scanner memory.
bool ScannerContext::low_memory_mode() const {
    return _local_state->low_memory_mode();
}
650 | | #include "common/compile_check_end.h" |
651 | | } // namespace doris |