be/src/exec/scan/scanner_context.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/scan/scanner_context.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/Metrics_types.h> |
22 | | #include <glog/logging.h> |
23 | | #include <zconf.h> |
24 | | |
25 | | #include <cstdint> |
26 | | #include <ctime> |
27 | | #include <memory> |
28 | | #include <mutex> |
29 | | #include <ostream> |
30 | | #include <shared_mutex> |
31 | | #include <tuple> |
32 | | #include <utility> |
33 | | |
34 | | #include "common/config.h" |
35 | | #include "common/exception.h" |
36 | | #include "common/logging.h" |
37 | | #include "common/metrics/doris_metrics.h" |
38 | | #include "common/status.h" |
39 | | #include "core/block/block.h" |
40 | | #include "exec/operator/scan_operator.h" |
41 | | #include "exec/scan/scan_node.h" |
42 | | #include "exec/scan/scanner_scheduler.h" |
43 | | #include "runtime/descriptors.h" |
44 | | #include "runtime/exec_env.h" |
45 | | #include "runtime/runtime_profile.h" |
46 | | #include "runtime/runtime_state.h" |
47 | | #include "storage/tablet/tablet.h" |
48 | | #include "util/time.h" |
49 | | #include "util/uid_util.h" |
50 | | |
51 | | namespace doris { |
52 | | |
53 | | using namespace std::chrono_literals; |
54 | | #include "common/compile_check_begin.h" |
// Builds the shared context for one scan operator instance.
//
// @param state             Owning runtime state; must outlive this context.
// @param local_state       The scan operator's local state (profiles, limits).
// @param output_tuple_desc Output tuple layout; overridden by the front tuple
//                          of `output_row_descriptor` when that is non-null.
// @param scanners          All scanner delegates this context will drive; each
//                          is wrapped into a pending ScanTask below.
// @param limit_            Row limit from the plan; negative means "no limit"
//                          and is normalized to -1.
// @param dependency        Pipeline dependency used to wake/block the operator.
ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
                               const TupleDescriptor* output_tuple_desc,
                               const RowDescriptor* output_row_descriptor,
                               const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
                               int64_t limit_, std::shared_ptr<Dependency> dependency
#ifdef BE_TEST
                               ,
                               int num_parallel_instances
#endif
                               )
        : HasTaskExecutionCtx(state),
          _state(state),
          _local_state(local_state),
          // When an output row descriptor is supplied, its (single) tuple wins
          // over the raw output tuple descriptor.
          _output_tuple_desc(output_row_descriptor
                                     ? output_row_descriptor->tuple_descriptors().front()
                                     : output_tuple_desc),
          _output_row_descriptor(output_row_descriptor),
          _batch_size(state->batch_size()),
          limit(limit_),
          _all_scanners(scanners.begin(), scanners.end()),
#ifndef BE_TEST
          _scanner_scheduler(local_state->scan_scheduler(state)),
          _min_scan_concurrency_of_scan_scheduler(
                  _scanner_scheduler->get_min_active_scan_threads()),
          // Never run more concurrent scan tasks than there are scanners.
          _max_scan_concurrency(std::min(local_state->max_scanners_concurrency(state),
                                         cast_set<int>(scanners.size()))),
#else
          // Test builds take concurrency directly from the test parameter.
          _scanner_scheduler(state->get_query_ctx()->get_scan_scheduler()),
          _min_scan_concurrency_of_scan_scheduler(0),
          _max_scan_concurrency(num_parallel_instances),
#endif
          _min_scan_concurrency(local_state->min_scanners_concurrency(state)) {
    DCHECK(_state != nullptr);
    // The constructor above only ever reads tuple_descriptors().front().
    DCHECK(_output_row_descriptor == nullptr ||
           _output_row_descriptor->tuple_descriptors().size() == 1);
    _query_id = _state->get_query_ctx()->query_id();
    _resource_ctx = _state->get_query_ctx()->resource_ctx();
    ctx_id = UniqueId::gen_uid().to_string();
    // Every scanner starts life as a pending ScanTask; they are pulled from
    // this stack by _pull_next_scan_task().
    for (auto& scanner : _all_scanners) {
        _pending_scanners.push(std::make_shared<ScanTask>(scanner));
    };
    // Normalize any negative limit to the sentinel -1 ("no limit").
    if (limit < 0) {
        limit = -1;
    }
    _dependency = dependency;
    DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
    // Take a ref on the task execution ctx; released in the destructor.
    if (auto ctx = task_exec_ctx(); ctx) {
        ctx->ref_task_execution_ctx();
    }
}
105 | | |
106 | | // After init function call, should not access _parent |
// Finishes construction: wires up profile counters, registers a task-executor
// task (when the scheduler supports it), sizes the block-queue memory budget,
// optionally downgrades concurrency for very wide tables, and kicks off the
// first round of scan-task scheduling.
// After init function call, should not access _parent
Status ScannerContext::init() {
#ifndef BE_TEST
    _scanner_profile = _local_state->_scanner_profile;
    _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
    _scanner_memory_used_counter = _local_state->_memory_used_counter;

    // 3. get thread token
    if (!_state->get_query_ctx()) {
        return Status::InternalError("Query context of {} is not set",
                                     print_id(_state->query_id()));
    }

    if (_state->get_query_ctx()->get_scan_scheduler()) {
        _should_reset_thread_name = false;
    }

    auto scanner = _all_scanners.front().lock();
    DCHECK(scanner != nullptr);

    // When the scheduler is task-executor based, register one task per context
    // (keyed by query id + ctx id); its handle is cleaned up in stop_scanners()
    // or the destructor.
    if (auto* task_executor_scheduler =
                dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
        std::shared_ptr<TaskExecutor> task_executor = task_executor_scheduler->task_executor();
        TaskId task_id(fmt::format("{}-{}", print_id(_state->query_id()), ctx_id));
        _task_handle = DORIS_TRY(task_executor->create_task(
                task_id, []() { return 0.0; },
                config::task_executor_initial_max_concurrency_per_task > 0
                        ? config::task_executor_initial_max_concurrency_per_task
                        : std::max(48, CpuInfo::num_cores() * 2),
                std::chrono::milliseconds(100), std::nullopt));
    }
#endif
    // _max_bytes_in_queue controls the maximum memory that can be used by a single scan operator.
    // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
    // is larger than 10MB.
    _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10);

    // Provide more memory for wide tables, increase proportionally by multiples of 300
    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;

    // No scanners at all: the context is born finished.
    if (_all_scanners.empty()) {
        _is_finished = true;
        _set_scanner_done();
    }

    // when user not specify scan_thread_num, so we can try downgrade _max_thread_num.
    // becaue we found in a table with 5k columns, column reader may ocuppy too much memory.
    // you can refer https://github.com/apache/doris/issues/35340 for details.
    const int32_t max_column_reader_num = _state->max_column_reader_num();

    if (_max_scan_concurrency != 1 && max_column_reader_num > 0) {
        int32_t scan_column_num = cast_set<int32_t>(_output_tuple_desc->slots().size());
        // Total simultaneously-open column readers = columns * concurrency.
        int32_t current_column_num = scan_column_num * _max_scan_concurrency;
        if (current_column_num > max_column_reader_num) {
            int32_t new_max_thread_num = max_column_reader_num / scan_column_num;
            new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num;
            if (new_max_thread_num < _max_scan_concurrency) {
                int32_t origin_max_thread_num = _max_scan_concurrency;
                _max_scan_concurrency = new_max_thread_num;
                LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
                          << " scan's max_thread_num from " << origin_max_thread_num << " to "
                          << _max_scan_concurrency << ",column num: " << scan_column_num
                          << ", max_column_reader_num: " << max_column_reader_num;
            }
        }
    }

    COUNTER_SET(_local_state->_max_scan_concurrency, (int64_t)_max_scan_concurrency);
    COUNTER_SET(_local_state->_min_scan_concurrency, (int64_t)_min_scan_concurrency);

    // Schedule the first batch of scan tasks under the transfer lock.
    std::unique_lock<std::mutex> l(_transfer_lock);
    RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));

    return Status::OK();
}
181 | | |
// Tears the context down. The thread mem tracker is switched to this query's
// tracker first so the memory released below is attributed to the query.
ScannerContext::~ScannerContext() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
    _tasks_queue.clear();
    BlockUPtr block;
    // Drain the free-block pool so the pooled blocks are destroyed while the
    // mem tracker above is active.
    while (_free_blocks.try_dequeue(block)) {
        // do nothing
    }
    block.reset();
    DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);
    // Fallback cleanup of the task-executor handle; normally stop_scanners()
    // has already removed it (so this branch is usually not taken).
    if (_task_handle) {
        if (auto* task_executor_scheduler =
                    dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
            static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle));
        }
        _task_handle = nullptr;
    }
    // Pairs with ref_task_execution_ctx() taken in the constructor.
    if (auto ctx = task_exec_ctx(); ctx) {
        ctx->unref_task_execution_ctx();
    }
}
202 | | |
203 | 1.06M | BlockUPtr ScannerContext::get_free_block(bool force) { |
204 | 1.06M | BlockUPtr block = nullptr; |
205 | 1.06M | if (_free_blocks.try_dequeue(block)) { |
206 | 601k | DCHECK(block->mem_reuse()); |
207 | 601k | _block_memory_usage -= block->allocated_bytes(); |
208 | 601k | _scanner_memory_used_counter->set(_block_memory_usage); |
209 | | // A free block is reused, so the memory usage should be decreased |
210 | | // The caller of get_free_block will increase the memory usage |
211 | 601k | } else if (_block_memory_usage < _max_bytes_in_queue || force) { |
212 | 461k | _newly_create_free_blocks_num->update(1); |
213 | 461k | block = Block::create_unique(_output_tuple_desc->slots(), 0); |
214 | 461k | } |
215 | 1.06M | return block; |
216 | 1.06M | } |
217 | | |
218 | 1.06M | void ScannerContext::return_free_block(BlockUPtr block) { |
219 | | // If under low memory mode, should not return the freeblock, it will occupy too much memory. |
220 | 1.06M | if (!_local_state->low_memory_mode() && block->mem_reuse() && |
221 | 1.06M | _block_memory_usage < _max_bytes_in_queue) { |
222 | 796k | size_t block_size_to_reuse = block->allocated_bytes(); |
223 | 796k | _block_memory_usage += block_size_to_reuse; |
224 | 796k | _scanner_memory_used_counter->set(_block_memory_usage); |
225 | 796k | block->clear_column_data(); |
226 | | // Free blocks is used to improve memory efficiency. Failure during pushing back |
227 | | // free block will not incur any bad result so just ignore the return value. |
228 | 796k | _free_blocks.enqueue(std::move(block)); |
229 | 796k | } |
230 | 1.06M | } |
231 | | |
// Submits one scan task to the scheduler for asynchronous execution.
// The unused unique_lock parameter documents (and enforces at the call site)
// that _transfer_lock must be held while submitting.
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task,
                                        std::unique_lock<std::mutex>& /*transfer_lock*/) {
    // _num_scheduled_scanners is incremented here regardless of whether the
    // submit below succeeds; it is decremented in push_back_scan_task(), which
    // the scheduler invokes for both successful and failed tasks (see
    // ScannerScheduler::_scanner_scan — TODO confirm the failure path).
    _num_scheduled_scanners++;
    return _scanner_scheduler->submit(shared_from_this(), scan_task);
}
242 | | |
// Drops every block in the free-block pool, releasing their memory
// (used to shed cached memory, e.g. under memory pressure).
void ScannerContext::clear_free_blocks() {
    clear_blocks(_free_blocks);
}
246 | | |
247 | 1.02M | void ScannerContext::push_back_scan_task(std::shared_ptr<ScanTask> scan_task) { |
248 | 1.02M | if (scan_task->status_ok()) { |
249 | 1.03M | for (const auto& [block, _] : scan_task->cached_blocks) { |
250 | 1.03M | if (block->rows() > 0) { |
251 | 224k | Status st = validate_block_schema(block.get()); |
252 | 224k | if (!st.ok()) { |
253 | 0 | scan_task->set_status(st); |
254 | 0 | break; |
255 | 0 | } |
256 | 224k | } |
257 | 1.03M | } |
258 | 1.01M | } |
259 | | |
260 | 1.02M | std::lock_guard<std::mutex> l(_transfer_lock); |
261 | 1.02M | if (!scan_task->status_ok()) { |
262 | 1.28k | _process_status = scan_task->get_status(); |
263 | 1.28k | } |
264 | 1.02M | _tasks_queue.push_back(scan_task); |
265 | 1.02M | _num_scheduled_scanners--; |
266 | | |
267 | 1.02M | _dependency->set_ready(); |
268 | 1.02M | } |
269 | | |
// Consumer side: pops one block from the head scan task in _tasks_queue into
// `block`, reschedules scanners when a task's cached blocks run dry, and
// reports completion via `eos`.
//
// @param state  Runtime state, checked for cancellation first.
// @param block  Output: swapped with the next cached block, if any.
// @param eos    Output: true once all scanners finished and the queue drained,
//               or the context was stopped.
// @param id     Caller's operator instance id (currently unused here).
Status ScannerContext::get_block_from_queue(RuntimeState* state, Block* block, bool* eos, int id) {
    if (state->is_cancelled()) {
        _set_scanner_done();
        return state->cancel_reason();
    }
    std::unique_lock l(_transfer_lock);

    // Surface any error previously recorded by push_back_scan_task().
    if (!_process_status.ok()) {
        _set_scanner_done();
        return _process_status;
    }

    std::shared_ptr<ScanTask> scan_task = nullptr;

    if (!_tasks_queue.empty() && !done()) {
        // https://en.cppreference.com/w/cpp/container/list/front
        // The behavior is undefined if the list is empty.
        scan_task = _tasks_queue.front();
    }

    if (scan_task != nullptr) {
        // The abnormal status of scanner may come from the execution of the scanner itself,
        // or come from the scanner scheduler, such as TooManyTasks.
        if (!scan_task->status_ok()) {
            // TODO: If the scanner status is TooManyTasks, maybe we can retry the scanner after a while.
            _process_status = scan_task->get_status();
            _set_scanner_done();
            return _process_status;
        }

        // No need to worry about small block, block is merged together when they are appended to cached_blocks.
        if (!scan_task->cached_blocks.empty()) {
            auto [current_block, block_size] = std::move(scan_task->cached_blocks.front());
            scan_task->cached_blocks.pop_front();
            _block_memory_usage -= block_size;
            // consume current block
            block->swap(*current_block);
            return_free_block(std::move(current_block));
        }

        VLOG_DEBUG << fmt::format(
                "ScannerContext {} get block from queue, task_queue size {}, current scan "
                "task remaing cached_block size {}, eos {}, scheduled tasks {}",
                ctx_id, _tasks_queue.size(), scan_task->cached_blocks.size(), scan_task->is_eos(),
                _num_scheduled_scanners);

        if (scan_task->cached_blocks.empty()) {
            // All Cached blocks are consumed, pop this task from task_queue.
            if (!_tasks_queue.empty()) {
                _tasks_queue.pop_front();
            }

            if (scan_task->is_eos()) {
                // 1. if eos, record a finished scanner.
                _num_finished_scanners++;
                // Schedule with nullptr: pull fresh tasks from _pending_scanners.
                RETURN_IF_ERROR(
                        _scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));
            } else {
                // Scanner still has data: hand the same task back for resubmission.
                RETURN_IF_ERROR(
                        _scanner_scheduler->schedule_scan_task(shared_from_this(), scan_task, l));
            }
        }
    }

    // All scanners reached eos and every queued block has been consumed.
    if (_num_finished_scanners == _all_scanners.size() && _tasks_queue.empty()) {
        _set_scanner_done();
        _is_finished = true;
    }

    *eos = done();

    // Nothing left to consume right now: block until push_back_scan_task()
    // sets the dependency ready again.
    if (_tasks_queue.empty()) {
        _dependency->block();
    }

    return Status::OK();
}
347 | | |
348 | 224k | Status ScannerContext::validate_block_schema(Block* block) { |
349 | 224k | size_t index = 0; |
350 | 769k | for (auto& slot : _output_tuple_desc->slots()) { |
351 | 769k | auto& data = block->get_by_position(index++); |
352 | 769k | if (data.column->is_nullable() != data.type->is_nullable()) { |
353 | 0 | return Status::Error<ErrorCode::INVALID_SCHEMA>( |
354 | 0 | "column(name: {}) nullable({}) does not match type nullable({}), slot(id: " |
355 | 0 | "{}, " |
356 | 0 | "name:{})", |
357 | 0 | data.name, data.column->is_nullable(), data.type->is_nullable(), slot->id(), |
358 | 0 | slot->col_name()); |
359 | 0 | } |
360 | | |
361 | 769k | if (data.column->is_nullable() != slot->is_nullable()) { |
362 | 0 | return Status::Error<ErrorCode::INVALID_SCHEMA>( |
363 | 0 | "column(name: {}) nullable({}) does not match slot(id: {}, name: {}) " |
364 | 0 | "nullable({})", |
365 | 0 | data.name, data.column->is_nullable(), slot->id(), slot->col_name(), |
366 | 0 | slot->is_nullable()); |
367 | 0 | } |
368 | 769k | } |
369 | 224k | return Status::OK(); |
370 | 224k | } |
371 | | |
// Stops all scanners for this context (idempotent): marks the context stopped,
// asks every live scanner to stop, drops queued tasks, removes the
// task-executor handle, and — when profiling is enabled — records per-scanner
// statistics into the scanner profile before the scanners go away.
void ScannerContext::stop_scanners(RuntimeState* state) {
    std::lock_guard<std::mutex> l(_transfer_lock);
    // Already stopped: nothing to do (makes repeated calls safe).
    if (_should_stop) {
        return;
    }
    _should_stop = true;
    _set_scanner_done();
    // Best-effort stop request to every scanner that is still alive.
    for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
        if (std::shared_ptr<ScannerDelegate> sc = scanner.lock()) {
            sc->_scanner->try_stop();
        }
    }
    _tasks_queue.clear();
    if (_task_handle) {
        if (auto* task_executor_scheduler =
                    dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
            static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle));
        }
        _task_handle = nullptr;
    }
    // TODO yiguolei, call mark close to scanners
    if (state->enable_profile()) {
        std::stringstream scanner_statistics;
        std::stringstream scanner_rows_read;
        std::stringstream scanner_wait_worker_time;
        std::stringstream scanner_projection;
        scanner_statistics << "[";
        scanner_rows_read << "[";
        scanner_wait_worker_time << "[";
        scanner_projection << "[";
        // Scanners can in 3 state
        // state 1: in scanner context, not scheduled
        // state 2: in scanner worker pool's queue, scheduled but not running
        // state 3: scanner is running.
        for (auto& scanner_ref : _all_scanners) {
            auto scanner = scanner_ref.lock();
            if (scanner == nullptr) {
                continue;
            }
            // Add per scanner running time before close them
            scanner_statistics << PrettyPrinter::print(scanner->_scanner->get_time_cost_ns(),
                                                       TUnit::TIME_NS)
                               << ", ";
            scanner_projection << PrettyPrinter::print(scanner->_scanner->projection_time(),
                                                       TUnit::TIME_NS)
                               << ", ";
            scanner_rows_read << PrettyPrinter::print(scanner->_scanner->get_rows_read(),
                                                      TUnit::UNIT)
                              << ", ";
            scanner_wait_worker_time
                    << PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(),
                                            TUnit::TIME_NS)
                    << ", ";
            // since there are all scanners, some scanners is running, so that could not call scanner
            // close here.
        }
        scanner_statistics << "]";
        scanner_rows_read << "]";
        scanner_wait_worker_time << "]";
        scanner_projection << "]";
        _scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str());
        _scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str());
        _scanner_profile->add_info_string("PerScannerWaitTime", scanner_wait_worker_time.str());
        _scanner_profile->add_info_string("PerScannerProjectionTime", scanner_projection.str());
    }
}
438 | | |
// One-line human-readable snapshot of this context for logs/diagnostics.
// NOTE(review): reads shared counters without taking _transfer_lock, so the
// reported values may be slightly stale — acceptable for debugging output.
std::string ScannerContext::debug_string() {
    return fmt::format(
            "id: {}, total scanners: {}, pending tasks: {},"
            " _should_stop: {}, _is_finished: {}, free blocks: {},"
            " limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
            " _max_bytes_in_queue: {}, query_id: {}",
            ctx_id, _all_scanners.size(), _tasks_queue.size(), _should_stop, _is_finished,
            _free_blocks.size_approx(), limit, _num_scheduled_scanners, _max_scan_concurrency,
            _max_bytes_in_queue, print_id(_query_id));
}
449 | | |
// Marks the downstream dependency permanently ready so the scan operator can
// observe completion / cancellation / error instead of blocking forever.
void ScannerContext::_set_scanner_done() {
    _dependency->set_always_ready();
}
453 | | |
// Folds a running-scanner delta into the operator's peak-running-scanner
// counter (callers presumably pass +1/-1 around a scanner's run — TODO confirm).
void ScannerContext::update_peak_running_scanner(int num) {
    _local_state->_peak_running_scanner->add(num);
}
457 | | |
// Computes how many additional scan tasks this operator may submit right now.
// Returns 0 when both minimum-concurrency targets are already satisfied;
// otherwise the larger of the two deficits (possibly capped in low memory
// mode). Both locks must be held by the caller, as the parameters document.
int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock,
                                    std::unique_lock<std::shared_mutex>& scheduler_lock) {
    // margin_1 is used to ensure each scan operator could have at least _min_scan_concurrency scan tasks.
    int32_t margin_1 = _min_scan_concurrency -
                       (cast_set<int32_t>(_tasks_queue.size()) + _num_scheduled_scanners);

    // margin_2 is used to ensure the scan scheduler could have at least _min_scan_concurrency_of_scan_scheduler scan tasks.
    int32_t margin_2 =
            _min_scan_concurrency_of_scan_scheduler -
            (_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size());

    // Both targets already met: nothing to submit.
    if (margin_1 <= 0 && margin_2 <= 0) {
        return 0;
    }

    // Satisfy the larger deficit of the two.
    int32_t margin = std::max(margin_1, margin_2);

    if (low_memory_mode()) {
        // In low memory mode, we will limit the number of running scanners to `low_memory_mode_scanners()`.
        // So that we will not submit too many scan tasks to scheduler.
        margin = std::min(low_memory_mode_scanners() - _num_scheduled_scanners, margin);
    }

    VLOG_DEBUG << fmt::format(
            "[{}|{}] schedule scan task, margin_1: {} = {} - ({} + {}), margin_2: {} = {} - "
            "({} + {}), margin: {}",
            print_id(_query_id), ctx_id, margin_1, _min_scan_concurrency, _tasks_queue.size(),
            _num_scheduled_scanners, margin_2, _min_scan_concurrency_of_scan_scheduler,
            _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(), margin);

    return margin;
}
490 | | |
// Core scheduling step: decides how many scan tasks to (re)submit and submits
// them. `current_scan_task`, when non-null, is a task that just came back from
// the scheduler and still has work to do (no cached blocks, not eos); it gets
// priority for resubmission. If the margin forbids any submission, the current
// task is parked on _pending_scanners so it can be resubmitted later.
//
// This function must be called with:
// 1. _transfer_lock held.
// 2. ScannerScheduler::_lock held.
Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task,
                                          std::unique_lock<std::mutex>& transfer_lock,
                                          std::unique_lock<std::shared_mutex>& scheduler_lock) {
    // A resubmission candidate must have neither unconsumed blocks nor eos.
    if (current_scan_task &&
        (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos())) {
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
    }

    std::list<std::shared_ptr<ScanTask>> tasks_to_submit;

    int32_t margin = _get_margin(transfer_lock, scheduler_lock);

    // margin is less than zero. Means this scan operator could not submit any scan task for now.
    if (margin <= 0) {
        // Be careful with current scan task.
        // We need to add it back to task queue to make sure it could be resubmitted.
        if (current_scan_task) {
            // This usually happens when we should downgrade the concurrency.
            _pending_scanners.push(current_scan_task);
            VLOG_DEBUG << fmt::format(
                    "{} push back scanner to task queue, because diff <= 0, task_queue size "
                    "{}, _num_scheduled_scanners {}",
                    ctx_id, _tasks_queue.size(), _num_scheduled_scanners);
        }

#ifndef NDEBUG
        // This DCHECK is necessary.
        // We need to make sure each scan operator could have at least 1 scan tasks.
        // Or this scan operator will not be re-scheduled.
        if (!_pending_scanners.empty() && _num_scheduled_scanners == 0 && _tasks_queue.empty()) {
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
        }
#endif

        return Status::OK();
    }

    bool first_pull = true;

    // Pull up to `margin` tasks; the current task (if any) is offered only on
    // the first pull so it is preferred over pending scanners.
    while (margin-- > 0) {
        std::shared_ptr<ScanTask> task_to_run;
        const int32_t current_concurrency = cast_set<int32_t>(
                _tasks_queue.size() + _num_scheduled_scanners + tasks_to_submit.size());
        VLOG_DEBUG << fmt::format("{} currenct concurrency: {} = {} + {} + {}", ctx_id,
                                  current_concurrency, _tasks_queue.size(), _num_scheduled_scanners,
                                  tasks_to_submit.size());
        if (first_pull) {
            task_to_run = _pull_next_scan_task(current_scan_task, current_concurrency);
            if (task_to_run == nullptr) {
                // In two situations we will get nullptr.
                // 1. current_concurrency already reached _max_scan_concurrency.
                // 2. all scanners are finished.
                if (current_scan_task) {
                    DCHECK(current_scan_task->cached_blocks.empty());
                    DCHECK(!current_scan_task->is_eos());
                    if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) {
                        // This should not happen.
                        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                                               "Scanner scheduler logical error.");
                    }
                    // Current scan task is not eos, but we can not resubmit it.
                    // Add current_scan_task back to task queue, so that we have chance to resubmit it in the future.
                    _pending_scanners.push(current_scan_task);
                }
            }
            first_pull = false;
        } else {
            task_to_run = _pull_next_scan_task(nullptr, current_concurrency);
        }

        if (task_to_run) {
            tasks_to_submit.push_back(task_to_run);
        } else {
            break;
        }
    }

    if (tasks_to_submit.empty()) {
        return Status::OK();
    }

    VLOG_DEBUG << fmt::format("[{}:{}] submit {} scan tasks to scheduler, remaining scanner: {}",
                              print_id(_query_id), ctx_id, tasks_to_submit.size(),
                              _pending_scanners.size());

    // Submit everything we pulled; a submit failure is recorded as the
    // context-wide process status and ends the scan.
    for (auto& scan_task_iter : tasks_to_submit) {
        Status submit_status = submit_scan_task(scan_task_iter, transfer_lock);
        if (!submit_status.ok()) {
            _process_status = submit_status;
            _set_scanner_done();
            return _process_status;
        }
    }

    return Status::OK();
}
590 | | |
591 | | std::shared_ptr<ScanTask> ScannerContext::_pull_next_scan_task( |
592 | 1.81M | std::shared_ptr<ScanTask> current_scan_task, int32_t current_concurrency) { |
593 | 1.81M | if (current_concurrency >= _max_scan_concurrency) { |
594 | 376k | VLOG_DEBUG << fmt::format( |
595 | 0 | "ScannerContext {} current concurrency {} >= _max_scan_concurrency {}, skip " |
596 | 0 | "pull", |
597 | 0 | ctx_id, current_concurrency, _max_scan_concurrency); |
598 | 376k | return nullptr; |
599 | 376k | } |
600 | | |
601 | 1.43M | if (current_scan_task != nullptr) { |
602 | 3.86k | if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) { |
603 | | // This should not happen. |
604 | 2 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error."); |
605 | 2 | } |
606 | 3.86k | return current_scan_task; |
607 | 3.86k | } |
608 | | |
609 | 1.43M | if (!_pending_scanners.empty()) { |
610 | 1.02M | std::shared_ptr<ScanTask> next_scan_task; |
611 | 1.02M | next_scan_task = _pending_scanners.top(); |
612 | 1.02M | _pending_scanners.pop(); |
613 | 1.02M | return next_scan_task; |
614 | 1.02M | } else { |
615 | 414k | return nullptr; |
616 | 414k | } |
617 | 1.43M | } |
618 | | |
// Whether the owning scan operator is currently in low memory mode;
// simply delegates to the operator's local state.
bool ScannerContext::low_memory_mode() const {
    return _local_state->low_memory_mode();
}
622 | | #include "common/compile_check_end.h" |
623 | | } // namespace doris |