be/src/exec/scan/scanner_scheduler.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/scan/scanner_scheduler.h" |
19 | | |
20 | | #include <algorithm> |
21 | | #include <cstdint> |
22 | | #include <functional> |
23 | | #include <list> |
24 | | #include <memory> |
25 | | #include <ostream> |
26 | | #include <string> |
27 | | #include <utility> |
28 | | |
29 | | #include "common/compiler_util.h" // IWYU pragma: keep |
30 | | #include "common/config.h" |
31 | | #include "common/exception.h" |
32 | | #include "common/logging.h" |
33 | | #include "common/status.h" |
34 | | #include "core/block/block.h" |
35 | | #include "exec/pipeline/pipeline_task.h" |
36 | | #include "exec/scan/file_scanner.h" |
37 | | #include "exec/scan/olap_scanner.h" // IWYU pragma: keep |
38 | | #include "exec/scan/scan_node.h" |
39 | | #include "exec/scan/scanner.h" |
40 | | #include "exec/scan/scanner_context.h" |
41 | | #include "runtime/exec_env.h" |
42 | | #include "runtime/runtime_state.h" |
43 | | #include "runtime/thread_context.h" |
44 | | #include "runtime/workload_group/workload_group_manager.h" |
45 | | #include "storage/tablet/tablet.h" |
46 | | #include "util/async_io.h" // IWYU pragma: keep |
47 | | #include "util/cpu_info.h" |
48 | | #include "util/defer_op.h" |
49 | | #include "util/thread.h" |
50 | | #include "util/threadpool.h" |
51 | | |
52 | | namespace doris { |
53 | | |
54 | | Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx, |
55 | 1.41M | std::shared_ptr<ScanTask> scan_task) { |
56 | 1.41M | if (ctx->done()) { |
57 | 0 | return Status::OK(); |
58 | 0 | } |
59 | 1.41M | auto task_lock = ctx->task_exec_ctx(); |
60 | 1.41M | if (task_lock == nullptr) { |
61 | 18 | LOG(INFO) << "could not lock task execution context, query " << ctx->debug_string() |
62 | 18 | << " maybe finished"; |
63 | 18 | return Status::OK(); |
64 | 18 | } |
65 | 1.41M | std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock(); |
66 | 1.41M | if (scanner_delegate == nullptr) { |
67 | 0 | return Status::OK(); |
68 | 0 | } |
69 | | |
70 | 1.41M | scanner_delegate->_scanner->start_wait_worker_timer(); |
71 | 1.41M | TabletStorageType type = scanner_delegate->_scanner->get_storage_type(); |
72 | 1.41M | auto sumbit_task = [&]() { |
73 | 1.41M | auto work_func = [scanner_ref = scan_task, ctx]() { |
74 | 1.41M | auto status = [&] { |
75 | 1.41M | RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref)); |
76 | 1.41M | return Status::OK(); |
77 | 1.41M | }(); |
78 | | |
79 | 1.41M | if (!status.ok()) { |
80 | 0 | scanner_ref->set_status(status); |
81 | 0 | ctx->push_back_scan_task(scanner_ref); |
82 | 0 | return true; |
83 | 0 | } |
84 | 1.41M | return scanner_ref->is_eos(); |
85 | 1.41M | }; |
86 | 1.41M | SimplifiedScanTask simple_scan_task = {work_func, ctx, scan_task}; |
87 | 1.41M | return this->submit_scan_task(simple_scan_task); |
88 | 1.41M | }; |
89 | | |
90 | 1.41M | Status submit_status = sumbit_task(); |
91 | 1.41M | if (!submit_status.ok()) { |
92 | | // User will see TooManyTasks error. It looks like a more reasonable error. |
93 | 0 | Status scan_task_status = Status::TooManyTasks( |
94 | 0 | "Failed to submit scanner to scanner pool reason:" + |
95 | 0 | std::string(submit_status.msg()) + "|type:" + std::to_string(type)); |
96 | 0 | scan_task->set_status(scan_task_status); |
97 | 0 | return scan_task_status; |
98 | 0 | } |
99 | | |
100 | 1.41M | return Status::OK(); |
101 | 1.41M | } |
102 | | |
103 | | void handle_reserve_memory_failure(RuntimeState* state, std::shared_ptr<ScannerContext> ctx, |
104 | 0 | const Status& st, size_t reserve_size) { |
105 | 0 | ctx->clear_free_blocks(); |
106 | 0 | auto* local_state = ctx->local_state(); |
107 | |
|
108 | 0 | auto debug_msg = fmt::format( |
109 | 0 | "Query: {} , scanner try to reserve: {}, operator name {}, " |
110 | 0 | "operator " |
111 | 0 | "id: {}, " |
112 | 0 | "task id: " |
113 | 0 | "{}, failed: {}", |
114 | 0 | print_id(state->query_id()), PrettyPrinter::print_bytes(reserve_size), |
115 | 0 | local_state->get_name(), local_state->parent()->node_id(), state->task_id(), |
116 | 0 | st.to_string()); |
117 | | // PROCESS_MEMORY_EXCEEDED error msg alread contains process_mem_log_str |
118 | 0 | if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) { |
119 | 0 | debug_msg += fmt::format(", debug info: {}", GlobalMemoryArbitrator::process_mem_log_str()); |
120 | 0 | } |
121 | 0 | VLOG_DEBUG << debug_msg; |
122 | |
|
123 | 0 | state->get_query_ctx()->set_low_memory_mode(); |
124 | 0 | } |
125 | | |
// Executes one scheduling slice of a scan task on the calling thread.
//
// The function prepares and opens the scanner if that has not happened yet, then reads
// blocks into scan_task->cached_blocks until one of the stop conditions of the read loop
// is met. Afterwards it records status and eos on the scan task and gives the task back
// to the context through push_back_scan_task().
//
// It returns early, without pushing the scan task back, when the task execution context
// can not be obtained or when the scanner delegate referenced by the scan task expired.
void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                                     std::shared_ptr<ScanTask> scan_task) {
    // NOTE(review): task_lock is held for the whole function, presumably to keep the
    // task execution context alive while scanning -- confirm against task_exec_ctx().
    auto task_lock = ctx->task_exec_ctx();
    if (task_lock == nullptr) {
        return;
    }
    SCOPED_ATTACH_TASK(ctx->state());

    // Count this scanner as running for the duration of the call.
    ctx->update_peak_running_scanner(1);
    Defer defer([&] { ctx->update_peak_running_scanner(-1); });

    std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
    if (scanner_delegate == nullptr) {
        return;
    }

    ScannerSPtr& scanner = scanner_delegate->_scanner;
    // for cpu hard limit, thread name should not be reset
    if (ctx->_should_reset_thread_name) {
        Thread::set_self_name("_scanner_scan");
    }

#ifndef __APPLE__
    // The configuration item is used to lower the priority of the scanner thread,
    // typically employed to ensure CPU scheduling for write operations.
    // Scanners whose name equals FileScanner::NAME are excluded.
    if (config::scan_thread_nice_value != 0 && scanner->get_name() != FileScanner::NAME) {
        Thread::set_thread_nice_value();
    }
#endif

    // The timers are driven in the following order:
    // 1. update_wait_worker_timer: accounts the time spent waiting for a worker thread.
    // 2. start_scan_cpu_timer: started before prepare/open/get_block so that the cpu
    //    timer covers them.
    // 3. update_scan_cpu_timer: called through update_scanner_profile once eos is reached.
    // 4. start_wait_worker_timer: called by defer_scanner when the slice ends with an ok
    //    status and without eos, i.e. when the scanner will wait for a worker again.

    MonotonicStopWatch max_run_time_watch;
    max_run_time_watch.start();
    scanner->update_wait_worker_timer();
    scanner->start_scan_cpu_timer();

    // Updates the cpu timer and the realtime counters at most once per call.
    bool need_update_profile = true;
    auto update_scanner_profile = [&]() {
        if (need_update_profile) {
            scanner->update_scan_cpu_timer();
            scanner->update_realtime_counters();
            need_update_profile = false;
        }
    };

    Status status = Status::OK();
    bool eos = false;
    Defer defer_scanner([&] {
        if (status.ok() && !eos) {
            // Only restart the wait-worker timer when the scanner neither failed nor
            // finished. On failure the counters may not be consistent, and on eos the
            // counters were already updated, so nothing more is done in those cases.
            scanner->start_wait_worker_timer();
        }
    });

    // Everything inside the macro argument runs under an exception guard; per the macro's
    // usage here a caught exception ends up in `status`.
    ASSIGN_STATUS_IF_CATCH_EXCEPTION(
            RuntimeState* state = ctx->state(); DCHECK(nullptr != state);
            // scanner->open may alloc plenty amount of memory(read blocks of data),
            // so better to also check low memory and clear free blocks here.
            if (ctx->low_memory_mode()) { ctx->clear_free_blocks(); }

            // A failed prepare() or open() is treated as eos with a non-ok status.
            if (!scanner->has_prepared()) {
                status = scanner->prepare();
                if (!status.ok()) {
                    eos = true;
                }
            }

            if (!eos && !scanner->is_open()) {
                status = scanner->open(state);
                if (!status.ok()) {
                    eos = true;
                }
                // Marked as opened regardless of the open() result.
                scanner->set_opened();
            }

            // A failure to append late arrival runtime filters is only logged.
            Status rf_status = scanner->try_append_late_arrival_runtime_filter();
            if (!rf_status.ok()) {
                LOG(WARNING) << "Failed to append late arrival runtime filter: "
                             << rf_status.to_string();
            }

            // Upper bound of bytes read in this slice; lowered in low memory mode.
            size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
            if (ctx->low_memory_mode()) {
                ctx->clear_free_blocks();
                if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
                    raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
                }
            }

            size_t raw_bytes_read = 0;
            bool first_read = true; int64_t limit = scanner->limit();
            // If the first block is full, then it is true. Or the first block + second block > batch_size
            bool has_first_full_block = false;

            // During low memory mode, every scan task will return at most 2 block to reduce memory usage.
            // Once a first full block exists, the loop additionally requires the thread's
            // limiter mem tracker check_limit(1) to pass.
            while (!eos && raw_bytes_read < raw_bytes_threshold &&
                   (!ctx->low_memory_mode() || !has_first_full_block) &&
                   (!has_first_full_block || doris::thread_context()
                                                     ->thread_mem_tracker_mgr->limiter_mem_tracker()
                                                     ->check_limit(1))) {
                if (UNLIKELY(ctx->done())) {
                    eos = true;
                    break;
                }
                // Stop this slice when it ran longer than the configured maximum
                // (config value in milliseconds, stop watch value compared after * 1e6).
                if (max_run_time_watch.elapsed_time() >
                    config::doris_scanner_max_run_time_ms * 1e6) {
                    break;
                }
                DEFER_RELEASE_RESERVED();
                BlockUPtr free_block;
                if (first_read) {
                    free_block = ctx->get_free_block(first_read);
                } else {
                    // For every read after the first one, reserve memory for an average
                    // sized block first (when memory reservation is enabled).
                    if (state->get_query_ctx()
                                ->resource_ctx()
                                ->task_controller()
                                ->is_enable_reserve_memory()) {
                        size_t block_avg_bytes = scanner->get_block_avg_bytes();
                        auto st = thread_context()->thread_mem_tracker_mgr->try_reserve(
                                block_avg_bytes);
                        if (!st.ok()) {
                            handle_reserve_memory_failure(state, ctx, st, block_avg_bytes);
                            break;
                        }
                    }
                    free_block = ctx->get_free_block(first_read);
                }
                if (free_block == nullptr) {
                    break;
                }
                // We got a new created block or a reused block.
                status = scanner->get_block_after_projects(state, free_block.get(), &eos);
                first_read = false;
                if (!status.ok()) {
                    LOG(WARNING) << "Scan thread read Scanner failed: " << status.to_string();
                    break;
                }
                // Check column type only after block is read successfully.
                // Or it may cause a crash when the block is not normal.
                _make_sure_virtual_col_is_materialized(scanner, free_block.get());
                // Projection will truncate useless columns, makes block size change.
                auto free_block_bytes = free_block->allocated_bytes();
                raw_bytes_read += free_block_bytes;
                // Merge the new block into the last cached block when both together do
                // not exceed batch_size rows; otherwise cache it as a new block.
                if (!scan_task->cached_blocks.empty() &&
                    scan_task->cached_blocks.back().first->rows() + free_block->rows() <=
                            ctx->batch_size()) {
                    size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes();
                    MutableBlock mutable_block(scan_task->cached_blocks.back().first.get());
                    status = mutable_block.merge(*free_block);
                    if (!status.ok()) {
                        LOG(WARNING) << "Block merge failed: " << status.to_string();
                        break;
                    }
                    scan_task->cached_blocks.back().second = mutable_block.allocated_bytes();
                    scan_task->cached_blocks.back().first.get()->set_columns(
                            std::move(mutable_block.mutable_columns()));

                    // Return block succeed or not, this free_block is not used by this scan task any more.
                    // If block can be reused, its memory usage will be added back.
                    ctx->return_free_block(std::move(free_block));
                    // Account only the growth of the merged block.
                    ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
                                         block_size);
                } else {
                    if (!scan_task->cached_blocks.empty()) {
                        has_first_full_block = true;
                    }
                    ctx->inc_block_usage(free_block->allocated_bytes());
                    scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
                }

                if (limit > 0 && limit < ctx->batch_size()) {
                    // If this scanner has limit, and less than batch size,
                    // return immediately and no need to wait raw_bytes_threshold.
                    // This can save time that each scanner may only return a small number of rows,
                    // but rows are enough from all scanners.
                    // If not break, the query like "select * from tbl where id=1 limit 10"
                    // may scan a lot data when the "id=1"'s filter ratio is high.
                    // If limit is larger than batch size, this rule is skipped,
                    // to avoid user specify a large limit and causing too much small blocks.
                    break;
                }

                // Estimate the bytes of a full batch from the last cached block:
                // ceil(bytes / rows) * batch_size.
                if (scan_task->cached_blocks.back().first->rows() > 0) {
                    auto block_avg_bytes = (scan_task->cached_blocks.back().first->bytes() +
                                            scan_task->cached_blocks.back().first->rows() - 1) /
                                           scan_task->cached_blocks.back().first->rows() *
                                           ctx->batch_size();
                    scanner->update_block_avg_bytes(block_avg_bytes);
                }
                // Low memory mode may have been switched on while reading; re-apply its limits.
                if (ctx->low_memory_mode()) {
                    ctx->clear_free_blocks();
                    if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
                        raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
                    }
                }
            } // end for while

            if (UNLIKELY(!status.ok())) {
                scan_task->set_status(status);
                eos = true;
            },
            status);

    // Checked again outside the macro because `status` may have been assigned by the
    // macro itself after the guarded code was left.
    if (UNLIKELY(!status.ok())) {
        scan_task->set_status(status);
        eos = true;
    }

    if (eos) {
        // If eos, scanner will call _collect_profile_before_close to update profile,
        // so we need update_scanner_profile here
        update_scanner_profile();
        scanner->mark_to_need_to_close();
    }
    scan_task->set_eos(eos);

    VLOG_DEBUG << fmt::format(
            "Scanner context {} has finished task, cached_block {} current scheduled task is "
            "{}, eos: {}, status: {}",
            ctx->ctx_id, scan_task->cached_blocks.size(), ctx->num_scheduled_scanners(), eos,
            status.to_string());

    // Hand the scan task, together with the blocks cached above, back to the context.
    ctx->push_back_scan_task(scan_task);
}
355 | 80.0k | int ScannerScheduler::default_local_scan_thread_num() { |
356 | 80.0k | return config::doris_scanner_thread_pool_thread_num > 0 |
357 | 80.0k | ? config::doris_scanner_thread_pool_thread_num |
358 | 80.0k | : std::max(48, CpuInfo::num_cores() * 2); |
359 | 80.0k | } |
360 | 73.9k | int ScannerScheduler::default_remote_scan_thread_num() { |
361 | 73.9k | int num = config::doris_max_remote_scanner_thread_pool_thread_num > 0 |
362 | 73.9k | ? config::doris_max_remote_scanner_thread_pool_thread_num |
363 | 73.9k | : std::max(512, CpuInfo::num_cores() * 10); |
364 | 73.9k | return std::max(num, default_local_scan_thread_num()); |
365 | 73.9k | } |
366 | | |
// Returns the configured value of config::doris_remote_scanner_thread_pool_queue_size.
int ScannerScheduler::get_remote_scan_thread_queue_size() {
    return config::doris_remote_scanner_thread_pool_queue_size;
}
370 | | |
371 | 6.15k | int ScannerScheduler::default_min_active_scan_threads() { |
372 | 6.15k | return config::min_active_scan_threads > 0 |
373 | 6.15k | ? config::min_active_scan_threads |
374 | 6.15k | : config::min_active_scan_threads = CpuInfo::num_cores() * 2; |
375 | 6.15k | } |
376 | | |
377 | 6.15k | int ScannerScheduler::default_min_active_file_scan_threads() { |
378 | 6.15k | return config::min_active_file_scan_threads > 0 |
379 | 6.15k | ? config::min_active_file_scan_threads |
380 | 6.15k | : config::min_active_file_scan_threads = CpuInfo::num_cores() * 8; |
381 | 6.15k | } |
382 | | |
383 | | void ScannerScheduler::_make_sure_virtual_col_is_materialized( |
384 | 1.51M | const std::shared_ptr<Scanner>& scanner, Block* free_block) { |
385 | 1.51M | #ifndef NDEBUG |
386 | | // Currently, virtual column can only be used on olap table. |
387 | 1.51M | std::shared_ptr<OlapScanner> olap_scanner = std::dynamic_pointer_cast<OlapScanner>(scanner); |
388 | 1.51M | if (olap_scanner == nullptr) { |
389 | 170k | return; |
390 | 170k | } |
391 | | |
392 | 1.34M | if (free_block->rows() == 0) { |
393 | 1.10M | return; |
394 | 1.10M | } |
395 | | |
396 | 234k | size_t idx = 0; |
397 | 715k | for (const auto& entry : *free_block) { |
398 | | // Virtual column must be materialized on the end of SegmentIterator's next batch method. |
399 | 715k | const ColumnNothing* column_nothing = |
400 | 715k | check_and_get_column<ColumnNothing>(entry.column.get()); |
401 | 715k | if (column_nothing == nullptr) { |
402 | 715k | idx++; |
403 | 715k | continue; |
404 | 715k | } |
405 | | |
406 | 18.4E | std::vector<std::string> vcid_to_idx; |
407 | | |
408 | 18.4E | for (const auto& pair : olap_scanner->_vir_cid_to_idx_in_block) { |
409 | 0 | vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, pair.second)); |
410 | 0 | } |
411 | | |
412 | 18.4E | std::string error_msg = fmt::format( |
413 | 18.4E | "Column in idx {} is nothing, block columns {}, normal_columns " |
414 | 18.4E | "{}, " |
415 | 18.4E | "vir_cid_to_idx_in_block_msg {}", |
416 | 18.4E | idx, free_block->columns(), olap_scanner->_return_columns.size(), |
417 | 18.4E | fmt::format("_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ","))); |
418 | 18.4E | throw doris::Exception(ErrorCode::INTERNAL_ERROR, error_msg); |
419 | 715k | } |
420 | 234k | #endif |
421 | 234k | } |
422 | | |
423 | 1.42M | Result<SharedListenableFuture<Void>> ScannerSplitRunner::process_for(std::chrono::nanoseconds) { |
424 | 1.42M | _started = true; |
425 | 1.42M | bool is_completed = _scan_func(); |
426 | 1.42M | if (is_completed) { |
427 | 1.41M | _completion_future.set_value(Void {}); |
428 | 1.41M | } |
429 | 1.42M | return SharedListenableFuture<Void>::create_ready(Void {}); |
430 | 1.42M | } |
431 | | |
// Returns whether `_completion_future` is done.
bool ScannerSplitRunner::is_finished() {
    return _completion_future.is_done();
}
435 | | |
// Returns the status held by `_completion_future`.
Status ScannerSplitRunner::finished_status() {
    return _completion_future.get_status();
}
439 | | |
// Returns the current value of the `_started` flag, which process_for() sets to true.
bool ScannerSplitRunner::is_started() const {
    return _started.load();
}
443 | | |
// Always returns false.
bool ScannerSplitRunner::is_auto_reschedule() const {
    return false;
}
447 | | |
448 | | } // namespace doris |