be/src/exec/scan/scanner_context.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/scan/scanner_context.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/Metrics_types.h> |
22 | | #include <glog/logging.h> |
23 | | #include <zconf.h> |
24 | | |
25 | | #include <cstdint> |
26 | | #include <ctime> |
27 | | #include <memory> |
28 | | #include <mutex> |
29 | | #include <ostream> |
30 | | #include <shared_mutex> |
31 | | #include <tuple> |
32 | | #include <utility> |
33 | | |
34 | | #include "common/config.h" |
35 | | #include "common/exception.h" |
36 | | #include "common/logging.h" |
37 | | #include "common/metrics/doris_metrics.h" |
38 | | #include "common/status.h" |
39 | | #include "core/block/block.h" |
40 | | #include "exec/operator/scan_operator.h" |
41 | | #include "exec/scan/scan_node.h" |
42 | | #include "exec/scan/scanner_scheduler.h" |
43 | | #include "runtime/descriptors.h" |
44 | | #include "runtime/exec_env.h" |
45 | | #include "runtime/runtime_profile.h" |
46 | | #include "runtime/runtime_state.h" |
47 | | #include "storage/tablet/tablet.h" |
48 | | #include "util/time.h" |
49 | | #include "util/uid_util.h" |
50 | | |
51 | | namespace doris { |
52 | | |
53 | | using namespace std::chrono_literals; |
54 | | #include "common/compile_check_begin.h" |
55 | | ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_state, |
56 | | const TupleDescriptor* output_tuple_desc, |
57 | | const RowDescriptor* output_row_descriptor, |
58 | | const std::list<std::shared_ptr<ScannerDelegate>>& scanners, |
59 | | int64_t limit_, std::shared_ptr<Dependency> dependency, |
60 | | std::atomic<int64_t>* shared_scan_limit |
61 | | #ifdef BE_TEST |
62 | | , |
63 | | int num_parallel_instances |
64 | | #endif |
65 | | ) |
66 | 14 | : HasTaskExecutionCtx(state), |
67 | 14 | _state(state), |
68 | 14 | _local_state(local_state), |
69 | 14 | _output_tuple_desc(output_row_descriptor |
70 | 14 | ? output_row_descriptor->tuple_descriptors().front() |
71 | 14 | : output_tuple_desc), |
72 | 14 | _output_row_descriptor(output_row_descriptor), |
73 | 14 | _batch_size(state->batch_size()), |
74 | 14 | limit(limit_), |
75 | 14 | _shared_scan_limit(shared_scan_limit), |
76 | 14 | _all_scanners(scanners.begin(), scanners.end()), |
77 | | #ifndef BE_TEST |
78 | | _scanner_scheduler(local_state->scan_scheduler(state)), |
79 | | _min_scan_concurrency_of_scan_scheduler( |
80 | | _scanner_scheduler->get_min_active_scan_threads()), |
81 | | _max_scan_concurrency(std::min(local_state->max_scanners_concurrency(state), |
82 | | cast_set<int>(scanners.size()))), |
83 | | #else |
84 | 14 | _scanner_scheduler(state->get_query_ctx()->get_scan_scheduler()), |
85 | 14 | _min_scan_concurrency_of_scan_scheduler(0), |
86 | 14 | _max_scan_concurrency(num_parallel_instances), |
87 | | #endif |
88 | 14 | _min_scan_concurrency(local_state->min_scanners_concurrency(state)) { |
89 | 14 | DCHECK(_state != nullptr); |
90 | 14 | DCHECK(_output_row_descriptor == nullptr || |
91 | 14 | _output_row_descriptor->tuple_descriptors().size() == 1); |
92 | 14 | _query_id = _state->get_query_ctx()->query_id(); |
93 | 14 | _resource_ctx = _state->get_query_ctx()->resource_ctx(); |
94 | 14 | ctx_id = UniqueId::gen_uid().to_string(); |
95 | 151 | for (auto& scanner : _all_scanners) { |
96 | 151 | _pending_scanners.push(std::make_shared<ScanTask>(scanner)); |
97 | 151 | }; |
98 | 14 | if (limit < 0) { |
99 | 0 | limit = -1; |
100 | 0 | } |
101 | 14 | _dependency = dependency; |
102 | 14 | DorisMetrics::instance()->scanner_ctx_cnt->increment(1); |
103 | 14 | if (auto ctx = task_exec_ctx(); ctx) { |
104 | 0 | ctx->ref_task_execution_ctx(); |
105 | 0 | } |
106 | 14 | } |
107 | | |
108 | 0 | int64_t ScannerContext::acquire_limit_quota(int64_t desired) { |
109 | 0 | DCHECK(desired > 0); |
110 | 0 | int64_t remaining = _shared_scan_limit->load(std::memory_order_acquire); |
111 | 0 | while (true) { |
112 | 0 | if (remaining < 0) { |
113 | | // No limit set, grant all desired rows. |
114 | 0 | return desired; |
115 | 0 | } |
116 | 0 | if (remaining == 0) { |
117 | 0 | return 0; |
118 | 0 | } |
119 | 0 | int64_t granted = std::min(desired, remaining); |
120 | 0 | if (_shared_scan_limit->compare_exchange_weak(remaining, remaining - granted, |
121 | 0 | std::memory_order_acq_rel, |
122 | 0 | std::memory_order_acquire)) { |
123 | 0 | return granted; |
124 | 0 | } |
125 | | // CAS failed, `remaining` is updated to current value, retry. |
126 | 0 | } |
127 | 0 | } |
128 | | |
// After init function call, should not access _parent
// Finishes construction: wires profile counters, registers with the task
// executor (non-test builds), sizes the block-queue memory budget, possibly
// downgrades max concurrency for very wide tables, and schedules the first
// round of scan tasks.
Status ScannerContext::init() {
#ifndef BE_TEST
    // Borrow the profile/counters owned by the local state so this context can
    // report block allocations and memory usage.
    _scanner_profile = _local_state->_scanner_profile;
    _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
    _scanner_memory_used_counter = _local_state->_memory_used_counter;

    // 3. get thread token
    if (!_state->get_query_ctx()) {
        return Status::InternalError("Query context of {} is not set",
                                     print_id(_state->query_id()));
    }

    if (_state->get_query_ctx()->get_scan_scheduler()) {
        _should_reset_thread_name = false;
    }

    auto scanner = _all_scanners.front().lock();
    DCHECK(scanner != nullptr);

    // When running on a task-executor based scheduler, register one executor
    // task per context so its scan workload is scheduled as a unit.
    if (auto* task_executor_scheduler =
                dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
        std::shared_ptr<TaskExecutor> task_executor = task_executor_scheduler->task_executor();
        TaskId task_id(fmt::format("{}-{}", print_id(_state->query_id()), ctx_id));
        _task_handle = DORIS_TRY(task_executor->create_task(
                task_id, []() { return 0.0; },
                config::task_executor_initial_max_concurrency_per_task > 0
                        ? config::task_executor_initial_max_concurrency_per_task
                        : std::max(48, CpuInfo::num_cores() * 2),
                std::chrono::milliseconds(100), std::nullopt));
    }
#endif
    // _max_bytes_in_queue controls the maximum memory that can be used by a single scan operator.
    // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
    // is larger than 10MB.
    _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10);

    // Provide more memory for wide tables, increase proportionally by multiples of 300
    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;

    if (_all_scanners.empty()) {
        // Nothing to scan: mark finished so the operator immediately reports EOS.
        _is_finished = true;
        _set_scanner_done();
    }

    // when user did not specify scan_thread_num, we can try to downgrade _max_thread_num.
    // because we found in a table with 5k columns, column readers may occupy too much memory.
    // you can refer https://github.com/apache/doris/issues/35340 for details.
    const int32_t max_column_reader_num = _state->max_column_reader_num();

    if (_max_scan_concurrency != 1 && max_column_reader_num > 0) {
        int32_t scan_column_num = cast_set<int32_t>(_output_tuple_desc->slots().size());
        // Total readers if every concurrent scanner opens every output column.
        int32_t current_column_num = scan_column_num * _max_scan_concurrency;
        if (current_column_num > max_column_reader_num) {
            int32_t new_max_thread_num = max_column_reader_num / scan_column_num;
            new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num;
            if (new_max_thread_num < _max_scan_concurrency) {
                int32_t origin_max_thread_num = _max_scan_concurrency;
                _max_scan_concurrency = new_max_thread_num;
                LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
                          << " scan's max_thread_num from " << origin_max_thread_num << " to "
                          << _max_scan_concurrency << ",column num: " << scan_column_num
                          << ", max_column_reader_num: " << max_column_reader_num;
            }
        }
    }

    COUNTER_SET(_local_state->_max_scan_concurrency, (int64_t)_max_scan_concurrency);
    COUNTER_SET(_local_state->_min_scan_concurrency, (int64_t)_min_scan_concurrency);

    // Kick off the first scheduling round; schedule_scan_task requires
    // _transfer_lock to be held.
    std::unique_lock<std::mutex> l(_transfer_lock);
    RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));

    return Status::OK();
}
204 | | |
ScannerContext::~ScannerContext() {
    // Release cached memory under the query's mem tracker so the tracker that
    // charged these blocks is the one credited on release.
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
    _tasks_queue.clear();
    BlockUPtr block;
    // Drain the free-block pool; each dequeued block is destroyed when `block`
    // is overwritten by the next dequeue (or by reset() below).
    while (_free_blocks.try_dequeue(block)) {
        // do nothing
    }
    block.reset();
    DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);
    // Deregister the per-context task from the task-executor scheduler, if one
    // was registered in init().
    if (_task_handle) {
        if (auto* task_executor_scheduler =
                    dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
            static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle));
        }
        _task_handle = nullptr;
    }
    // Pairs with ref_task_execution_ctx() in the constructor.
    if (auto ctx = task_exec_ctx(); ctx) {
        ctx->unref_task_execution_ctx();
    }
}
225 | | |
226 | 3 | BlockUPtr ScannerContext::get_free_block(bool force) { |
227 | 3 | BlockUPtr block = nullptr; |
228 | 3 | if (_free_blocks.try_dequeue(block)) { |
229 | 1 | DCHECK(block->mem_reuse()); |
230 | 1 | _block_memory_usage -= block->allocated_bytes(); |
231 | 1 | _scanner_memory_used_counter->set(_block_memory_usage); |
232 | | // A free block is reused, so the memory usage should be decreased |
233 | | // The caller of get_free_block will increase the memory usage |
234 | 2 | } else if (_block_memory_usage < _max_bytes_in_queue || force) { |
235 | 2 | _newly_create_free_blocks_num->update(1); |
236 | 2 | block = Block::create_unique(_output_tuple_desc->slots(), 0); |
237 | 2 | } |
238 | 3 | return block; |
239 | 3 | } |
240 | | |
241 | 1 | void ScannerContext::return_free_block(BlockUPtr block) { |
242 | | // If under low memory mode, should not return the freeblock, it will occupy too much memory. |
243 | 1 | if (!_local_state->low_memory_mode() && block->mem_reuse() && |
244 | 1 | _block_memory_usage < _max_bytes_in_queue) { |
245 | 1 | size_t block_size_to_reuse = block->allocated_bytes(); |
246 | 1 | _block_memory_usage += block_size_to_reuse; |
247 | 1 | _scanner_memory_used_counter->set(_block_memory_usage); |
248 | 1 | block->clear_column_data(); |
249 | | // Free blocks is used to improve memory efficiency. Failure during pushing back |
250 | | // free block will not incur any bad result so just ignore the return value. |
251 | 1 | _free_blocks.enqueue(std::move(block)); |
252 | 1 | } |
253 | 1 | } |
254 | | |
// Hands one scan task to the scheduler. Must be called with _transfer_lock
// held (the unused parameter documents that requirement).
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task,
                                        std::unique_lock<std::mutex>& /*transfer_lock*/) {
    // Increment _num_scheduled_scanners *before* submitting, no matter whether
    // the submission ultimately succeeds:
    // - if submit fails, the task is added back via
    //   ScannerContext::push_back_scan_task, which decrements the count again;
    // - if submit succeeds, it is likewise returned through
    //   push_back_scan_task when the scan runs (see ScannerScheduler::_scanner_scan).
    _num_scheduled_scanners++;
    return _scanner_scheduler->submit(shared_from_this(), scan_task);
}
265 | | |
// Drops every pooled free block, e.g. to release memory under pressure.
void ScannerContext::clear_free_blocks() {
    clear_blocks(_free_blocks);
}
269 | | |
270 | 5 | void ScannerContext::push_back_scan_task(std::shared_ptr<ScanTask> scan_task) { |
271 | 5 | if (scan_task->status_ok()) { |
272 | 5 | for (const auto& [block, _] : scan_task->cached_blocks) { |
273 | 0 | if (block->rows() > 0) { |
274 | 0 | Status st = validate_block_schema(block.get()); |
275 | 0 | if (!st.ok()) { |
276 | 0 | scan_task->set_status(st); |
277 | 0 | break; |
278 | 0 | } |
279 | 0 | } |
280 | 0 | } |
281 | 5 | } |
282 | | |
283 | 5 | std::lock_guard<std::mutex> l(_transfer_lock); |
284 | 5 | if (!scan_task->status_ok()) { |
285 | 0 | _process_status = scan_task->get_status(); |
286 | 0 | } |
287 | 5 | _tasks_queue.push_back(scan_task); |
288 | 5 | _num_scheduled_scanners--; |
289 | | |
290 | 5 | _dependency->set_ready(); |
291 | 5 | } |
292 | | |
293 | 3 | Status ScannerContext::get_block_from_queue(RuntimeState* state, Block* block, bool* eos, int id) { |
294 | 3 | if (state->is_cancelled()) { |
295 | 1 | _set_scanner_done(); |
296 | 1 | return state->cancel_reason(); |
297 | 1 | } |
298 | 2 | std::unique_lock l(_transfer_lock); |
299 | | |
300 | 2 | if (!_process_status.ok()) { |
301 | 1 | _set_scanner_done(); |
302 | 1 | return _process_status; |
303 | 1 | } |
304 | | |
305 | 1 | std::shared_ptr<ScanTask> scan_task = nullptr; |
306 | | |
307 | 1 | if (!_tasks_queue.empty() && !done()) { |
308 | | // https://en.cppreference.com/w/cpp/container/list/front |
309 | | // The behavior is undefined if the list is empty. |
310 | 1 | scan_task = _tasks_queue.front(); |
311 | 1 | } |
312 | | |
313 | 1 | if (scan_task != nullptr) { |
314 | | // The abnormal status of scanner may come from the execution of the scanner itself, |
315 | | // or come from the scanner scheduler, such as TooManyTasks. |
316 | 1 | if (!scan_task->status_ok()) { |
317 | | // TODO: If the scanner status is TooManyTasks, maybe we can retry the scanner after a while. |
318 | 0 | _process_status = scan_task->get_status(); |
319 | 0 | _set_scanner_done(); |
320 | 0 | return _process_status; |
321 | 0 | } |
322 | | |
323 | | // No need to worry about small block, block is merged together when they are appended to cached_blocks. |
324 | 1 | if (!scan_task->cached_blocks.empty()) { |
325 | 0 | auto [current_block, block_size] = std::move(scan_task->cached_blocks.front()); |
326 | 0 | scan_task->cached_blocks.pop_front(); |
327 | 0 | _block_memory_usage -= block_size; |
328 | | // consume current block |
329 | 0 | block->swap(*current_block); |
330 | 0 | return_free_block(std::move(current_block)); |
331 | 0 | } |
332 | | |
333 | 1 | VLOG_DEBUG << fmt::format( |
334 | 0 | "ScannerContext {} get block from queue, task_queue size {}, current scan " |
335 | 0 | "task remaing cached_block size {}, eos {}, scheduled tasks {}", |
336 | 0 | ctx_id, _tasks_queue.size(), scan_task->cached_blocks.size(), scan_task->is_eos(), |
337 | 0 | _num_scheduled_scanners); |
338 | | |
339 | 1 | if (scan_task->cached_blocks.empty()) { |
340 | | // All Cached blocks are consumed, pop this task from task_queue. |
341 | 1 | if (!_tasks_queue.empty()) { |
342 | 1 | _tasks_queue.pop_front(); |
343 | 1 | } |
344 | | |
345 | 1 | if (scan_task->is_eos()) { |
346 | | // 1. if eos, record a finished scanner. |
347 | 1 | _num_finished_scanners++; |
348 | 1 | RETURN_IF_ERROR( |
349 | 1 | _scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l)); |
350 | 1 | } else { |
351 | 0 | RETURN_IF_ERROR( |
352 | 0 | _scanner_scheduler->schedule_scan_task(shared_from_this(), scan_task, l)); |
353 | 0 | } |
354 | 1 | } |
355 | 1 | } |
356 | | |
357 | | // Mark finished when either: |
358 | | // (1) all scanners completed normally, or |
359 | | // (2) shared limit exhausted and no scanners are still running. |
360 | 1 | if (_tasks_queue.empty() && (_num_finished_scanners == _all_scanners.size() || |
361 | 1 | (_shared_scan_limit->load(std::memory_order_acquire) == 0 && |
362 | 1 | _num_scheduled_scanners == 0))) { |
363 | 0 | _set_scanner_done(); |
364 | 0 | _is_finished = true; |
365 | 0 | } |
366 | | |
367 | 1 | *eos = done(); |
368 | | |
369 | 1 | if (_tasks_queue.empty()) { |
370 | 1 | _dependency->block(); |
371 | 1 | } |
372 | | |
373 | 1 | return Status::OK(); |
374 | 1 | } |
375 | | |
376 | 0 | Status ScannerContext::validate_block_schema(Block* block) { |
377 | 0 | size_t index = 0; |
378 | 0 | for (auto& slot : _output_tuple_desc->slots()) { |
379 | 0 | auto& data = block->get_by_position(index++); |
380 | 0 | if (data.column->is_nullable() != data.type->is_nullable()) { |
381 | 0 | return Status::Error<ErrorCode::INVALID_SCHEMA>( |
382 | 0 | "column(name: {}) nullable({}) does not match type nullable({}), slot(id: " |
383 | 0 | "{}, " |
384 | 0 | "name:{})", |
385 | 0 | data.name, data.column->is_nullable(), data.type->is_nullable(), slot->id(), |
386 | 0 | slot->col_name()); |
387 | 0 | } |
388 | | |
389 | 0 | if (data.column->is_nullable() != slot->is_nullable()) { |
390 | 0 | return Status::Error<ErrorCode::INVALID_SCHEMA>( |
391 | 0 | "column(name: {}) nullable({}) does not match slot(id: {}, name: {}) " |
392 | 0 | "nullable({})", |
393 | 0 | data.name, data.column->is_nullable(), slot->id(), slot->col_name(), |
394 | 0 | slot->is_nullable()); |
395 | 0 | } |
396 | 0 | } |
397 | 0 | return Status::OK(); |
398 | 0 | } |
399 | | |
// Stops all scanners (idempotent): marks the context stopped, asks every live
// scanner to stop, drops queued tasks, deregisters the executor task, and —
// when profiling is enabled — records per-scanner statistics.
void ScannerContext::stop_scanners(RuntimeState* state) {
    std::lock_guard<std::mutex> l(_transfer_lock);
    if (_should_stop) {
        // Already stopped; nothing left to do.
        return;
    }
    _should_stop = true;
    _set_scanner_done();
    // Cooperative stop: scanners check this flag; running ones finish on their own.
    for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
        if (std::shared_ptr<ScannerDelegate> sc = scanner.lock()) {
            sc->_scanner->try_stop();
        }
    }
    _tasks_queue.clear();
    // Deregister the per-context task from the task-executor scheduler, if any.
    if (_task_handle) {
        if (auto* task_executor_scheduler =
                    dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
            static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle));
        }
        _task_handle = nullptr;
    }
    // TODO yiguolei, call mark close to scanners
    if (state->enable_profile()) {
        // Collect per-scanner stats into bracketed, comma-separated lists.
        std::stringstream scanner_statistics;
        std::stringstream scanner_rows_read;
        std::stringstream scanner_wait_worker_time;
        std::stringstream scanner_projection;
        scanner_statistics << "[";
        scanner_rows_read << "[";
        scanner_wait_worker_time << "[";
        scanner_projection << "[";
        // Scanners can in 3 state
        // state 1: in scanner context, not scheduled
        // state 2: in scanner worker pool's queue, scheduled but not running
        // state 3: scanner is running.
        for (auto& scanner_ref : _all_scanners) {
            auto scanner = scanner_ref.lock();
            if (scanner == nullptr) {
                // Scanner already destroyed; skip it.
                continue;
            }
            // Add per scanner running time before close them
            scanner_statistics << PrettyPrinter::print(scanner->_scanner->get_time_cost_ns(),
                                                       TUnit::TIME_NS)
                               << ", ";
            scanner_projection << PrettyPrinter::print(scanner->_scanner->projection_time(),
                                                       TUnit::TIME_NS)
                               << ", ";
            scanner_rows_read << PrettyPrinter::print(scanner->_scanner->get_rows_read(),
                                                      TUnit::UNIT)
                              << ", ";
            scanner_wait_worker_time
                    << PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(),
                                            TUnit::TIME_NS)
                    << ", ";
            // since there are all scanners, some scanners is running, so that could not call scanner
            // close here.
        }
        scanner_statistics << "]";
        scanner_rows_read << "]";
        scanner_wait_worker_time << "]";
        scanner_projection << "]";
        _scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str());
        _scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str());
        _scanner_profile->add_info_string("PerScannerWaitTime", scanner_wait_worker_time.str());
        _scanner_profile->add_info_string("PerScannerProjectionTime", scanner_projection.str());
    }
}
466 | | |
467 | 18 | std::string ScannerContext::debug_string() { |
468 | 18 | return fmt::format( |
469 | 18 | "id: {}, total scanners: {}, pending tasks: {}," |
470 | 18 | " _should_stop: {}, _is_finished: {}, free blocks: {}," |
471 | 18 | " limit: {}, remaining_limit: {}, _num_running_scanners: {}, _max_thread_num: {}," |
472 | 18 | " _max_bytes_in_queue: {}, query_id: {}", |
473 | 18 | ctx_id, _all_scanners.size(), _tasks_queue.size(), _should_stop, _is_finished, |
474 | 18 | _free_blocks.size_approx(), limit, _shared_scan_limit->load(std::memory_order_relaxed), |
475 | 18 | _num_scheduled_scanners, _max_scan_concurrency, _max_bytes_in_queue, |
476 | 18 | print_id(_query_id)); |
477 | 18 | } |
478 | | |
// Makes the dependency permanently ready so the scan operator never blocks
// again waiting for data (used on finish, cancel, and error paths).
void ScannerContext::_set_scanner_done() {
    _dependency->set_always_ready();
}
482 | | |
// Adjusts the running-scanner counter by `num` (positive when a scanner
// starts, negative when it stops); the counter presumably tracks the peak
// value internally — see _peak_running_scanner's definition.
void ScannerContext::update_peak_running_scanner(int num) {
    _local_state->_peak_running_scanner->add(num);
}
486 | | |
487 | | int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock, |
488 | 12 | std::unique_lock<std::shared_mutex>& scheduler_lock) { |
489 | | // margin_1 is used to ensure each scan operator could have at least _min_scan_concurrency scan tasks. |
490 | 12 | int32_t margin_1 = _min_scan_concurrency - |
491 | 12 | (cast_set<int32_t>(_tasks_queue.size()) + _num_scheduled_scanners); |
492 | | |
493 | | // margin_2 is used to ensure the scan scheduler could have at least _min_scan_concurrency_of_scan_scheduler scan tasks. |
494 | 12 | int32_t margin_2 = |
495 | 12 | _min_scan_concurrency_of_scan_scheduler - |
496 | 12 | (_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size()); |
497 | | |
498 | 12 | if (margin_1 <= 0 && margin_2 <= 0) { |
499 | 1 | return 0; |
500 | 1 | } |
501 | | |
502 | 11 | int32_t margin = std::max(margin_1, margin_2); |
503 | | |
504 | 11 | if (low_memory_mode()) { |
505 | | // In low memory mode, we will limit the number of running scanners to `low_memory_mode_scanners()`. |
506 | | // So that we will not submit too many scan tasks to scheduler. |
507 | 0 | margin = std::min(low_memory_mode_scanners() - _num_scheduled_scanners, margin); |
508 | 0 | } |
509 | | |
510 | 11 | VLOG_DEBUG << fmt::format( |
511 | 0 | "[{}|{}] schedule scan task, margin_1: {} = {} - ({} + {}), margin_2: {} = {} - " |
512 | 0 | "({} + {}), margin: {}", |
513 | 0 | print_id(_query_id), ctx_id, margin_1, _min_scan_concurrency, _tasks_queue.size(), |
514 | 0 | _num_scheduled_scanners, margin_2, _min_scan_concurrency_of_scan_scheduler, |
515 | 0 | _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(), margin); |
516 | | |
517 | 11 | return margin; |
518 | 12 | } |
519 | | |
520 | | // This function must be called with: |
521 | | // 1. _transfer_lock held. |
522 | | // 2. ScannerScheduler::_lock held. |
523 | | Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task, |
524 | | std::unique_lock<std::mutex>& transfer_lock, |
525 | 7 | std::unique_lock<std::shared_mutex>& scheduler_lock) { |
526 | 7 | if (current_scan_task && |
527 | 7 | (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos())) { |
528 | 1 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error."); |
529 | 1 | } |
530 | | |
531 | 6 | std::list<std::shared_ptr<ScanTask>> tasks_to_submit; |
532 | | |
533 | 6 | int32_t margin = _get_margin(transfer_lock, scheduler_lock); |
534 | | |
535 | | // margin is less than zero. Means this scan operator could not submit any scan task for now. |
536 | 6 | if (margin <= 0) { |
537 | | // Be careful with current scan task. |
538 | | // We need to add it back to task queue to make sure it could be resubmitted. |
539 | 0 | if (current_scan_task) { |
540 | | // This usually happens when we should downgrade the concurrency. |
541 | 0 | _pending_scanners.push(current_scan_task); |
542 | 0 | VLOG_DEBUG << fmt::format( |
543 | 0 | "{} push back scanner to task queue, because diff <= 0, task_queue size " |
544 | 0 | "{}, _num_scheduled_scanners {}", |
545 | 0 | ctx_id, _tasks_queue.size(), _num_scheduled_scanners); |
546 | 0 | } |
547 | |
|
548 | 0 | #ifndef NDEBUG |
549 | | // This DCHECK is necessary. |
550 | | // We need to make sure each scan operator could have at least 1 scan tasks. |
551 | | // Or this scan operator will not be re-scheduled. |
552 | 0 | if (!_pending_scanners.empty() && _num_scheduled_scanners == 0 && _tasks_queue.empty()) { |
553 | 0 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error."); |
554 | 0 | } |
555 | 0 | #endif |
556 | | |
557 | 0 | return Status::OK(); |
558 | 0 | } |
559 | | |
560 | 6 | bool first_pull = true; |
561 | | |
562 | 24 | while (margin-- > 0) { |
563 | 24 | std::shared_ptr<ScanTask> task_to_run; |
564 | 24 | const int32_t current_concurrency = cast_set<int32_t>( |
565 | 24 | _tasks_queue.size() + _num_scheduled_scanners + tasks_to_submit.size()); |
566 | 24 | VLOG_DEBUG << fmt::format("{} currenct concurrency: {} = {} + {} + {}", ctx_id, |
567 | 0 | current_concurrency, _tasks_queue.size(), _num_scheduled_scanners, |
568 | 0 | tasks_to_submit.size()); |
569 | 24 | if (first_pull) { |
570 | 6 | task_to_run = _pull_next_scan_task(current_scan_task, current_concurrency); |
571 | 6 | if (task_to_run == nullptr) { |
572 | | // In two situations we will get nullptr. |
573 | | // 1. current_concurrency already reached _max_scan_concurrency. |
574 | | // 2. all scanners are finished. |
575 | 2 | if (current_scan_task) { |
576 | 1 | DCHECK(current_scan_task->cached_blocks.empty()); |
577 | 1 | DCHECK(!current_scan_task->is_eos()); |
578 | 1 | if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) { |
579 | | // This should not happen. |
580 | 0 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, |
581 | 0 | "Scanner scheduler logical error."); |
582 | 0 | } |
583 | | // Current scan task is not eos, but we can not resubmit it. |
584 | | // Add current_scan_task back to task queue, so that we have chance to resubmit it in the future. |
585 | 1 | _pending_scanners.push(current_scan_task); |
586 | 1 | } |
587 | 2 | } |
588 | 6 | first_pull = false; |
589 | 18 | } else { |
590 | 18 | task_to_run = _pull_next_scan_task(nullptr, current_concurrency); |
591 | 18 | } |
592 | | |
593 | 24 | if (task_to_run) { |
594 | 18 | tasks_to_submit.push_back(task_to_run); |
595 | 18 | } else { |
596 | 6 | break; |
597 | 6 | } |
598 | 24 | } |
599 | | |
600 | 6 | if (tasks_to_submit.empty()) { |
601 | 2 | return Status::OK(); |
602 | 2 | } |
603 | | |
604 | 4 | VLOG_DEBUG << fmt::format("[{}:{}] submit {} scan tasks to scheduler, remaining scanner: {}", |
605 | 0 | print_id(_query_id), ctx_id, tasks_to_submit.size(), |
606 | 0 | _pending_scanners.size()); |
607 | | |
608 | 18 | for (auto& scan_task_iter : tasks_to_submit) { |
609 | 18 | Status submit_status = submit_scan_task(scan_task_iter, transfer_lock); |
610 | 18 | if (!submit_status.ok()) { |
611 | 0 | _process_status = submit_status; |
612 | 0 | _set_scanner_done(); |
613 | 0 | return _process_status; |
614 | 0 | } |
615 | 18 | } |
616 | | |
617 | 4 | return Status::OK(); |
618 | 4 | } |
619 | | |
620 | | std::shared_ptr<ScanTask> ScannerContext::_pull_next_scan_task( |
621 | 31 | std::shared_ptr<ScanTask> current_scan_task, int32_t current_concurrency) { |
622 | 31 | if (current_concurrency >= _max_scan_concurrency) { |
623 | 7 | VLOG_DEBUG << fmt::format( |
624 | 0 | "ScannerContext {} current concurrency {} >= _max_scan_concurrency {}, skip " |
625 | 0 | "pull", |
626 | 0 | ctx_id, current_concurrency, _max_scan_concurrency); |
627 | 7 | return nullptr; |
628 | 7 | } |
629 | | |
630 | 24 | if (current_scan_task != nullptr) { |
631 | 3 | if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) { |
632 | | // This should not happen. |
633 | 2 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error."); |
634 | 2 | } |
635 | 1 | return current_scan_task; |
636 | 3 | } |
637 | | |
638 | 21 | if (!_pending_scanners.empty()) { |
639 | | // If shared limit quota is exhausted, do not submit new scanners from pending queue. |
640 | 19 | int64_t remaining = _shared_scan_limit->load(std::memory_order_acquire); |
641 | 19 | if (remaining == 0) { |
642 | 0 | return nullptr; |
643 | 0 | } |
644 | 19 | std::shared_ptr<ScanTask> next_scan_task; |
645 | 19 | next_scan_task = _pending_scanners.top(); |
646 | 19 | _pending_scanners.pop(); |
647 | 19 | return next_scan_task; |
648 | 19 | } else { |
649 | 2 | return nullptr; |
650 | 2 | } |
651 | 21 | } |
652 | | |
// Whether the query is under memory pressure; delegates to the local state,
// which owns that decision.
bool ScannerContext::low_memory_mode() const {
    return _local_state->low_memory_mode();
}
656 | | #include "common/compile_check_end.h" |
657 | | } // namespace doris |