be/src/exec/scan/olap_scanner.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/scan/olap_scanner.h" |
19 | | |
20 | | #include <gen_cpp/Descriptors_types.h> |
21 | | #include <gen_cpp/PlanNodes_types.h> |
22 | | #include <gen_cpp/Types_types.h> |
23 | | #include <glog/logging.h> |
24 | | #include <stdlib.h> |
25 | | #include <thrift/protocol/TDebugProtocol.h> |
26 | | |
27 | | #include <algorithm> |
28 | | #include <atomic> |
29 | | #include <iterator> |
30 | | #include <ostream> |
31 | | #include <set> |
32 | | |
33 | | #include "cloud/cloud_storage_engine.h" |
34 | | #include "cloud/cloud_tablet_hotspot.h" |
35 | | #include "cloud/config.h" |
36 | | #include "common/config.h" |
37 | | #include "common/consts.h" |
38 | | #include "common/logging.h" |
39 | | #include "common/metrics/doris_metrics.h" |
40 | | #include "core/block/block.h" |
41 | | #include "core/data_type/data_type_number.h" |
42 | | #include "exec/common/variant_util.h" |
43 | | #include "exec/operator/olap_scan_operator.h" |
44 | | #include "exec/scan/scan_node.h" |
45 | | #include "exprs/function_filter.h" |
46 | | #include "exprs/vexpr.h" |
47 | | #include "exprs/vexpr_context.h" |
48 | | #include "io/cache/block_file_cache_profile.h" |
49 | | #include "io/io_common.h" |
50 | | #include "runtime/descriptors.h" |
51 | | #include "runtime/exec_env.h" |
52 | | #include "runtime/runtime_profile.h" |
53 | | #include "runtime/runtime_state.h" |
54 | | #include "service/backend_options.h" |
55 | | #include "storage/binlog.h" |
56 | | #include "storage/id_manager.h" |
57 | | #include "storage/index/inverted/inverted_index_profile.h" |
58 | | #include "storage/iterator/block_reader.h" |
59 | | #include "storage/olap_common.h" |
60 | | #include "storage/olap_tuple.h" |
61 | | #include "storage/olap_utils.h" |
62 | | #include "storage/predicate/predicate_creator.h" |
63 | | #include "storage/storage_engine.h" |
64 | | #include "storage/tablet/tablet_schema.h" |
65 | | #ifndef NDEBUG |
66 | | #include "util/debug_points.h" |
67 | | #endif |
68 | | #include "util/json/path_in_data.h" |
69 | | |
70 | | namespace doris { |
71 | | #include "common/compile_check_avoid_begin.h" |
72 | | |
73 | | using ReadSource = TabletReadSource; |
74 | | |
75 | | OlapScanner::OlapScanner(ScanLocalStateBase* parent, OlapScanner::Params&& params) |
76 | 1.03M | : Scanner(params.state, parent, params.limit, params.profile), |
77 | 1.03M | _key_ranges(std::move(params.key_ranges)), |
78 | 1.03M | _tablet_reader_params({.tablet = std::move(params.tablet), |
79 | 1.03M | .tablet_schema {}, |
80 | 1.03M | .reader_type = params.read_row_binlog ? ReaderType::READER_BINLOG |
81 | 1.03M | : ReaderType::READER_QUERY, |
82 | 1.03M | .aggregation = params.aggregation, |
83 | 1.03M | .version = {0, params.version}, |
84 | 1.03M | .start_key {}, |
85 | 1.03M | .end_key {}, |
86 | 1.03M | .predicates {}, |
87 | 1.03M | .function_filters {}, |
88 | 1.03M | .delete_predicates {}, |
89 | 1.03M | .target_cast_type_for_variants {}, |
90 | 1.03M | .all_access_paths {}, |
91 | 1.03M | .predicate_access_paths {}, |
92 | 1.03M | .rs_splits {}, |
93 | 1.03M | .return_columns {}, |
94 | 1.03M | .output_columns {}, |
95 | 1.03M | .filled_columns {}, |
96 | 1.03M | .common_expr_ctxs_push_down {}, |
97 | 1.03M | .topn_filter_source_node_ids {}, |
98 | 1.03M | .key_group_cluster_key_idxes {}, |
99 | 1.03M | .virtual_column_exprs {}, |
100 | 1.03M | .score_runtime {}, |
101 | 1.03M | .collection_statistics {}, |
102 | 1.03M | .ann_topn_runtime {}, |
103 | 1.03M | .condition_cache_digest = parent->get_condition_cache_digest(), |
104 | 1.03M | .binlog_scan_type = params.binlog_scan_type}), |
105 | 1.03M | _start_tso(params.start_tso), |
106 | 1.03M | _end_tso(params.end_tso) { |
107 | 1.03M | _tablet_reader_params.set_read_source(std::move(params.read_source), |
108 | 1.03M | _state->skip_delete_bitmap()); |
109 | 1.03M | _has_prepared = false; |
110 | 1.03M | _vector_search_params = params.state->get_vector_search_params(); |
111 | 1.03M | } |
112 | | |
113 | | static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema, |
114 | 2.86k | const std::vector<uint32_t>& read_columns) { |
115 | | // avoid too long for one line, |
116 | | // it is hard to display in `show profile` stmt if one line is too long. |
117 | 2.86k | const int col_per_line = 10; |
118 | 2.86k | int i = 0; |
119 | 2.86k | std::string read_columns_string; |
120 | 2.86k | read_columns_string += "["; |
121 | 15.1k | for (auto it = read_columns.cbegin(); it != read_columns.cend(); it++) { |
122 | 12.3k | if (it != read_columns.cbegin()) { |
123 | 9.56k | read_columns_string += ", "; |
124 | 9.56k | } |
125 | 12.3k | read_columns_string += tablet_schema->columns().at(*it)->name(); |
126 | 12.3k | if (i >= col_per_line) { |
127 | 13 | read_columns_string += "\n"; |
128 | 13 | i = 0; |
129 | 12.3k | } else { |
130 | 12.3k | ++i; |
131 | 12.3k | } |
132 | 12.3k | } |
133 | 2.86k | read_columns_string += "]"; |
134 | 2.86k | return read_columns_string; |
135 | 2.86k | } |
136 | | |
137 | 2.12M | static bool has_file_cache_statistics(const io::FileCacheStatistics& stats) { |
138 | 2.12M | return stats.num_local_io_total != 0 || stats.num_remote_io_total != 0 || |
139 | 2.12M | stats.num_peer_io_total != 0 || stats.local_io_timer != 0 || |
140 | 2.12M | stats.bytes_read_from_local != 0 || stats.bytes_read_from_remote != 0 || |
141 | 2.12M | stats.bytes_read_from_peer != 0 || stats.remote_io_timer != 0 || |
142 | 2.12M | stats.peer_io_timer != 0 || stats.remote_wait_timer != 0 || |
143 | 2.12M | stats.write_cache_io_timer != 0 || stats.bytes_write_into_cache != 0 || |
144 | 2.12M | stats.num_skip_cache_io_total != 0 || stats.read_cache_file_directly_timer != 0 || |
145 | 2.12M | stats.cache_get_or_set_timer != 0 || stats.lock_wait_timer != 0 || |
146 | 2.12M | stats.get_timer != 0 || stats.set_timer != 0 || |
147 | 2.12M | stats.inverted_index_num_local_io_total != 0 || |
148 | 2.12M | stats.inverted_index_num_remote_io_total != 0 || |
149 | 2.12M | stats.inverted_index_num_peer_io_total != 0 || |
150 | 2.12M | stats.inverted_index_bytes_read_from_local != 0 || |
151 | 2.12M | stats.inverted_index_bytes_read_from_remote != 0 || |
152 | 2.12M | stats.inverted_index_bytes_read_from_peer != 0 || |
153 | 2.12M | stats.inverted_index_local_io_timer != 0 || stats.inverted_index_remote_io_timer != 0 || |
154 | 2.12M | stats.inverted_index_peer_io_timer != 0 || stats.inverted_index_io_timer != 0; |
155 | 2.12M | } |
156 | | |
157 | 1.03M | Status OlapScanner::_prepare_impl() { |
158 | 1.03M | auto* local_state = static_cast<OlapScanLocalState*>(_local_state); |
159 | 1.03M | auto& tablet = _tablet_reader_params.tablet; |
160 | 1.03M | auto& tablet_schema = _tablet_reader_params.tablet_schema; |
161 | 1.03M | DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", { |
162 | 1.03M | LOG_WARNING("CloudTablet.capture_rs_readers.return e-230 init") |
163 | 1.03M | .tag("tablet_id", tablet->tablet_id()); |
164 | 1.03M | return Status::Error<false>(-230, "injected error"); |
165 | 1.03M | }); |
166 | | |
167 | 1.03M | for (auto& ctx : local_state->_common_expr_ctxs_push_down) { |
168 | 23.1k | VExprContextSPtr context; |
169 | 23.1k | RETURN_IF_ERROR(ctx->clone(_state, context)); |
170 | 23.1k | _common_expr_ctxs_push_down.emplace_back(context); |
171 | 23.1k | context->prepare_ann_range_search(_vector_search_params); |
172 | 23.1k | } |
173 | | |
174 | 1.03M | for (auto pair : local_state->_slot_id_to_virtual_column_expr) { |
175 | | // Scanner will be executed in a different thread, so we need to clone the context. |
176 | 386 | VExprContextSPtr context; |
177 | 386 | RETURN_IF_ERROR(pair.second->clone(_state, context)); |
178 | 386 | _slot_id_to_virtual_column_expr[pair.first] = context; |
179 | 386 | } |
180 | | |
181 | 1.03M | _score_runtime = local_state->_score_runtime; |
182 | | // All scanners share the same ann_topn_runtime. |
183 | 1.03M | _ann_topn_runtime = local_state->_ann_topn_runtime; |
184 | | |
185 | | // set limit to reduce end of rowset and segment mem use |
186 | 1.03M | _tablet_reader = std::make_unique<BlockReader>(); |
187 | | // batch size is passed down to segment iterator, use _state->batch_size() |
188 | | // instead of _parent->limit(), because if _parent->limit() is a very small |
189 | | // value (e.g. select a from t where a .. and b ... limit 1), |
190 | | // it will be very slow when reading data in segment iterator |
191 | 1.03M | _tablet_reader->set_batch_size(_state->batch_size()); |
192 | | // Adaptive batch size: pass byte-budget settings to the storage reader. |
193 | | // The reader still uses batch_size() as the row ceiling. |
194 | 1.03M | _tablet_reader->set_preferred_block_size_bytes(_state->preferred_block_size_bytes()); |
195 | 1.03M | { |
196 | 1.03M | TOlapScanNode& olap_scan_node = local_state->olap_scan_node(); |
197 | 1.03M | TabletSchemaSPtr source_tablet_schema = |
198 | 1.03M | _tablet_reader_params.reader_type == ReaderType::READER_BINLOG |
199 | 1.03M | ? tablet->row_binlog_tablet_schema() |
200 | 1.03M | : tablet->tablet_schema(); |
201 | | |
202 | 1.03M | tablet_schema = std::make_shared<TabletSchema>(); |
203 | 1.03M | tablet_schema->copy_from(*source_tablet_schema); |
204 | 1.03M | if (olap_scan_node.__isset.columns_desc && !olap_scan_node.columns_desc.empty() && |
205 | 1.03M | olap_scan_node.columns_desc[0].col_unique_id >= 0) { |
206 | 1.03M | tablet_schema->clear_columns(); |
207 | 16.2M | for (const auto& column_desc : olap_scan_node.columns_desc) { |
208 | 16.2M | tablet_schema->append_column(TabletColumn(column_desc)); |
209 | 16.2M | } |
210 | 1.03M | if (olap_scan_node.__isset.schema_version) { |
211 | 1.03M | tablet_schema->set_schema_version(olap_scan_node.schema_version); |
212 | 1.03M | } |
213 | 1.03M | } |
214 | 1.03M | if (olap_scan_node.__isset.indexes_desc) { |
215 | 1.03M | tablet_schema->update_indexes_from_thrift(olap_scan_node.indexes_desc); |
216 | 1.03M | } |
217 | | |
218 | 1.03M | if (_tablet_reader_params.rs_splits.empty()) { |
219 | | // Non-pipeline mode, Tablet : Scanner = 1 : 1 |
220 | | // acquire tablet rowset readers at the beginning of the scan node |
221 | | // to prevent this case: when there are lots of olap scanners to run for example 10000 |
222 | | // the rowsets maybe compacted when the last olap scanner starts |
223 | 0 | ReadSource read_source; |
224 | |
|
225 | 0 | if (config::is_cloud_mode()) { |
226 | | // FIXME(plat1ko): Avoid pointer cast |
227 | 0 | ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet); |
228 | 0 | } |
229 | |
|
230 | 0 | auto maybe_read_source = tablet->capture_read_source( |
231 | 0 | _tablet_reader_params.version, |
232 | 0 | { |
233 | 0 | .skip_missing_versions = _state->skip_missing_version(), |
234 | 0 | .enable_fetch_rowsets_from_peers = |
235 | 0 | config::enable_fetch_rowsets_from_peer_replicas, |
236 | 0 | .capture_row_binlog = |
237 | 0 | _tablet_reader_params.reader_type == ReaderType::READER_BINLOG, |
238 | 0 | .enable_prefer_cached_rowset = |
239 | 0 | config::is_cloud_mode() ? _state->enable_prefer_cached_rowset() |
240 | 0 | : false, |
241 | 0 | .query_freshness_tolerance_ms = |
242 | 0 | config::is_cloud_mode() ? _state->query_freshness_tolerance_ms() |
243 | 0 | : -1, |
244 | 0 | }); |
245 | 0 | if (!maybe_read_source) { |
246 | 0 | LOG(WARNING) << "fail to init reader. res=" << maybe_read_source.error(); |
247 | 0 | return maybe_read_source.error(); |
248 | 0 | } |
249 | 0 | read_source = std::move(maybe_read_source.value()); |
250 | |
|
251 | 0 | if (config::enable_mow_verbose_log && tablet->enable_unique_key_merge_on_write()) { |
252 | 0 | LOG_INFO("finish capture_rs_readers for tablet={}, query_id={}", |
253 | 0 | tablet->tablet_id(), print_id(_state->query_id())); |
254 | 0 | } |
255 | |
|
256 | 0 | if (!_state->skip_delete_predicate()) { |
257 | 0 | read_source.fill_delete_predicates(); |
258 | 0 | } |
259 | 0 | _tablet_reader_params.set_read_source(std::move(read_source)); |
260 | 0 | } |
261 | | |
262 | | // Initialize tablet_reader_params |
263 | 1.03M | RETURN_IF_ERROR(_init_tablet_reader_params( |
264 | 1.03M | local_state->_parent->cast<OlapScanOperatorX>()._slot_id_to_slot_desc, _key_ranges, |
265 | 1.03M | local_state->_slot_id_to_predicates, local_state->_push_down_functions)); |
266 | 1.03M | } |
267 | | |
268 | | // add read columns in profile |
269 | 1.03M | if (_state->enable_profile()) { |
270 | 2.83k | _profile->add_info_string("ReadColumns", |
271 | 2.83k | read_columns_to_string(tablet_schema, _return_columns)); |
272 | 2.83k | } |
273 | | |
274 | 1.03M | if (_tablet_reader_params.score_runtime) { |
275 | 14 | SCOPED_TIMER(local_state->_statistics_collect_timer); |
276 | 14 | _tablet_reader_params.collection_statistics = std::make_shared<CollectionStatistics>(); |
277 | | |
278 | 14 | io::IOContext io_ctx { |
279 | 14 | .reader_type = _tablet_reader_params.reader_type, |
280 | 14 | .expiration_time = tablet->ttl_seconds(), |
281 | 14 | .query_id = &_state->query_id(), |
282 | 14 | .file_cache_stats = &_tablet_reader->mutable_stats()->file_cache_stats, |
283 | 14 | .is_inverted_index = true, |
284 | 14 | }; |
285 | | |
286 | 14 | RETURN_IF_ERROR(_tablet_reader_params.collection_statistics->collect( |
287 | 14 | _state, _tablet_reader_params.rs_splits, _tablet_reader_params.tablet_schema, |
288 | 14 | _tablet_reader_params.common_expr_ctxs_push_down, &io_ctx)); |
289 | 14 | } |
290 | | |
291 | 1.03M | _has_prepared = true; |
292 | 1.03M | return Status::OK(); |
293 | 1.03M | } |
294 | | |
295 | 1.03M | Status OlapScanner::_open_impl(RuntimeState* state) { |
296 | 1.03M | RETURN_IF_ERROR(Scanner::_open_impl(state)); |
297 | 1.03M | SCOPED_TIMER(_local_state->cast<OlapScanLocalState>()._reader_init_timer); |
298 | | |
299 | 1.03M | auto res = _tablet_reader->init(_tablet_reader_params); |
300 | 1.03M | if (!res.ok()) { |
301 | 49 | res.append("failed to initialize storage reader. tablet=" + |
302 | 49 | std::to_string(_tablet_reader_params.tablet->tablet_id()) + |
303 | 49 | ", backend=" + BackendOptions::get_localhost()); |
304 | 49 | return res; |
305 | 49 | } |
306 | | |
307 | | // Do not hold rs_splits any more to release memory. |
308 | 1.03M | _tablet_reader_params.rs_splits.clear(); |
309 | | |
310 | 1.03M | return Status::OK(); |
311 | 1.03M | } |
312 | | |
313 | | // For binlog partition-based incremental read. Pushes down [start_tso, end_tso] range |
314 | | // predicates onto the binlog timestamp column for row-binlog scans. Also ensures the |
315 | | // timestamp column is part of return_columns so the predicates can be evaluated by the |
316 | | // storage layer. |
317 | 1.03M | Status OlapScanner::_init_row_binlog_tso_predicates() { |
318 | 1.03M | if (_tablet_reader_params.reader_type != ReaderType::READER_BINLOG) { |
319 | 1.03M | return Status::OK(); |
320 | 1.03M | } |
321 | | |
322 | 18.4E | if (!_start_tso.has_value() && !_end_tso.has_value()) { |
323 | 0 | return Status::OK(); |
324 | 0 | } |
325 | | |
326 | 18.4E | auto& tablet_schema = _tablet_reader_params.tablet_schema; |
327 | 18.4E | int32_t tso_index = tablet_schema->binlog_timestamp_col_idx(); |
328 | 18.4E | if (tso_index < 0) { |
329 | 0 | return Status::InternalError("Column {} not found in tablet schema after append", |
330 | 0 | BINLOG_TIMESTAMP_COL); |
331 | 0 | } |
332 | | |
333 | 18.4E | auto data_type = std::make_shared<DataTypeInt64>(); |
334 | 18.4E | if (_start_tso.has_value()) { |
335 | 0 | Field start_value = Field::create_field<TYPE_BIGINT>(*_start_tso); |
336 | 0 | _tablet_reader_params.predicates.push_back(create_comparison_predicate<PredicateType::GT>( |
337 | 0 | tso_index, std::string(kRowBinlogTimestampColName), data_type, start_value, false)); |
338 | 0 | } |
339 | 18.4E | if (_end_tso.has_value()) { |
340 | 0 | Field end_value = Field::create_field<TYPE_BIGINT>(*_end_tso); |
341 | 0 | _tablet_reader_params.predicates.push_back(create_comparison_predicate<PredicateType::LE>( |
342 | 0 | tso_index, std::string(kRowBinlogTimestampColName), data_type, end_value, false)); |
343 | 0 | } |
344 | | |
345 | 18.4E | if (std::find(_tablet_reader_params.return_columns.begin(), |
346 | 18.4E | _tablet_reader_params.return_columns.end(), |
347 | 18.4E | tso_index) == _tablet_reader_params.return_columns.end()) { |
348 | 0 | _tablet_reader_params.return_columns.push_back(tso_index); |
349 | 0 | } |
350 | | |
351 | 18.4E | return Status::OK(); |
352 | 18.4E | } |
353 | | |
354 | | // it will be called under tablet read lock because capture rs readers need |
355 | | Status OlapScanner::_init_tablet_reader_params( |
356 | | const phmap::flat_hash_map<int, SlotDescriptor*>& slot_id_to_slot_desc, |
357 | | const std::vector<OlapScanRange*>& key_ranges, |
358 | | const phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>& |
359 | | slot_to_predicates, |
360 | 1.03M | const std::vector<FunctionFilter>& function_filters) { |
361 | | // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty |
362 | 1.03M | const bool single_version = _tablet_reader_params.has_single_version(); |
363 | | |
364 | 1.03M | auto* olap_local_state = static_cast<OlapScanLocalState*>(_local_state); |
365 | 1.03M | bool read_mor_as_dup = olap_local_state->olap_scan_node().__isset.read_mor_as_dup && |
366 | 1.03M | olap_local_state->olap_scan_node().read_mor_as_dup; |
367 | 1.03M | if (_state->skip_storage_engine_merge() || read_mor_as_dup) { |
368 | 49 | _tablet_reader_params.direct_mode = true; |
369 | 49 | _tablet_reader_params.aggregation = true; |
370 | 1.03M | } else { |
371 | 1.03M | auto push_down_agg_type = _local_state->get_push_down_agg_type(); |
372 | 1.03M | _tablet_reader_params.direct_mode = _tablet_reader_params.aggregation || single_version || |
373 | 1.03M | (push_down_agg_type != TPushAggOp::NONE && |
374 | 10.8k | push_down_agg_type != TPushAggOp::COUNT_ON_INDEX); |
375 | 1.03M | } |
376 | | |
377 | 1.03M | RETURN_IF_ERROR(_init_variant_columns()); |
378 | 1.03M | RETURN_IF_ERROR(_init_return_columns()); |
379 | | |
380 | 1.03M | _tablet_reader_params.push_down_agg_type_opt = _local_state->get_push_down_agg_type(); |
381 | | |
382 | | // Binlog DETAIL/MIN_DELTA scans widen `return_columns` with key/op/lsn/before |
383 | | // columns to drive the row-level merge in BlockReader. The storage-layer |
384 | | // statistics fast path (VStatisticsIterator, picked when push_down_agg_type |
385 | | // is COUNT/MINMAX) bypasses SegmentIterator entirely, returning raw segment |
386 | | // row counts without binlog op filtering and with a schema that does not |
387 | | // match the widened read schema. The result is both wrong (raw segment |
388 | | // count != binlog row count) and unsafe (column-count DCHECK fires inside |
389 | | // VStatisticsIterator::next_batch). Disable the fast path for these scans. |
390 | 1.03M | if (_tablet_reader_params.binlog_scan_type == TBinlogScanType::DETAIL || |
391 | 1.03M | _tablet_reader_params.binlog_scan_type == TBinlogScanType::MIN_DELTA) { |
392 | 0 | _tablet_reader_params.push_down_agg_type_opt = TPushAggOp::NONE; |
393 | 0 | } |
394 | | |
395 | 1.03M | _tablet_reader_params.common_expr_ctxs_push_down = _common_expr_ctxs_push_down; |
396 | 1.03M | _tablet_reader_params.virtual_column_exprs = _virtual_column_exprs; |
397 | 1.03M | _tablet_reader_params.score_runtime = _score_runtime; |
398 | 1.03M | _tablet_reader_params.output_columns = ((OlapScanLocalState*)_local_state)->_output_column_ids; |
399 | 1.03M | _tablet_reader_params.ann_topn_runtime = _ann_topn_runtime; |
400 | 1.03M | for (const auto& ele : ((OlapScanLocalState*)_local_state)->_cast_types_for_variants) { |
401 | 2.19k | _tablet_reader_params.target_cast_type_for_variants[ele.first] = ele.second; |
402 | 2.19k | }; |
403 | 1.03M | auto& tablet_schema = _tablet_reader_params.tablet_schema; |
404 | 7.87M | for (auto& predicates : slot_to_predicates) { |
405 | 7.87M | const int sid = predicates.first; |
406 | 7.87M | DCHECK(slot_id_to_slot_desc.contains(sid)); |
407 | 7.87M | int32_t index = |
408 | 7.87M | tablet_schema->field_index(slot_id_to_slot_desc.find(sid)->second->col_name()); |
409 | 7.87M | if (index < 0) { |
410 | 0 | throw Exception( |
411 | 0 | Status::InternalError("Column {} not found in tablet schema", |
412 | 0 | slot_id_to_slot_desc.find(sid)->second->col_name())); |
413 | 0 | } |
414 | 7.87M | for (auto& predicate : predicates.second) { |
415 | 816k | _tablet_reader_params.predicates.push_back(predicate->clone(index)); |
416 | 816k | } |
417 | 7.87M | } |
418 | | |
419 | 1.03M | std::copy(function_filters.cbegin(), function_filters.cend(), |
420 | 1.03M | std::inserter(_tablet_reader_params.function_filters, |
421 | 1.03M | _tablet_reader_params.function_filters.begin())); |
422 | | |
423 | | // Merge the columns in delete predicate that not in latest schema in to current tablet schema |
424 | 1.03M | for (auto& del_pred : _tablet_reader_params.delete_predicates) { |
425 | 6.46k | tablet_schema->merge_dropped_columns(*del_pred->tablet_schema()); |
426 | 6.46k | } |
427 | | |
428 | | // Push key ranges to the tablet reader. |
429 | | // Skip the "full scan" placeholder (has_lower_bound == false) — when no key |
430 | | // predicates exist, start_key/end_key remain empty and the reader does a full scan. |
431 | 1.70M | for (auto* key_range : key_ranges) { |
432 | 1.70M | if (!key_range->has_lower_bound) { |
433 | 143k | continue; |
434 | 143k | } |
435 | | |
436 | 1.56M | _tablet_reader_params.start_key_include = key_range->begin_include; |
437 | 1.56M | _tablet_reader_params.end_key_include = key_range->end_include; |
438 | | |
439 | 1.56M | _tablet_reader_params.start_key.push_back(key_range->begin_scan_range); |
440 | 1.56M | _tablet_reader_params.end_key.push_back(key_range->end_scan_range); |
441 | 1.56M | } |
442 | | |
443 | 1.03M | _tablet_reader_params.profile = _local_state->custom_profile(); |
444 | 1.03M | _tablet_reader_params.runtime_state = _state; |
445 | | |
446 | 1.03M | _tablet_reader_params.origin_return_columns = &_return_columns; |
447 | 1.03M | _tablet_reader_params.tablet_columns_convert_to_null_set = &_tablet_columns_convert_to_null_set; |
448 | | |
449 | 1.03M | auto add_return_column_if_absent = [&](uint32_t cid) { |
450 | 0 | if (std::find(_tablet_reader_params.return_columns.begin(), |
451 | 0 | _tablet_reader_params.return_columns.end(), |
452 | 0 | cid) == _tablet_reader_params.return_columns.end()) { |
453 | 0 | _tablet_reader_params.return_columns.push_back(cid); |
454 | 0 | } |
455 | 0 | }; |
456 | | |
457 | | // For row-binlog scans that emit BEFORE/AFTER pairs (MIN_DELTA / DETAIL), we must read |
458 | | // every key column, every requested value column, the binlog meta columns (op / lsn / |
459 | | // tso) and their __BEFORE__ mirrors, so the BlockReader can reconstruct change rows. |
460 | 1.03M | const bool need_before_columns = |
461 | 1.03M | _tablet_reader_params.binlog_scan_type == TBinlogScanType::MIN_DELTA || |
462 | 1.03M | _tablet_reader_params.binlog_scan_type == TBinlogScanType::DETAIL; |
463 | 1.03M | if (need_before_columns) { |
464 | 0 | for (size_t i = 0; i < tablet_schema->num_key_columns(); ++i) { |
465 | 0 | add_return_column_if_absent(static_cast<uint32_t>(i)); |
466 | 0 | } |
467 | 0 | for (auto cid : _return_columns) { |
468 | 0 | add_return_column_if_absent(cid); |
469 | 0 | } |
470 | |
|
471 | 0 | if (int32_t op_idx = tablet_schema->field_index(std::string(kRowBinlogOpColName)); |
472 | 0 | op_idx >= 0) { |
473 | 0 | add_return_column_if_absent(static_cast<uint32_t>(op_idx)); |
474 | 0 | } |
475 | 0 | if (int32_t lsn_idx = tablet_schema->binlog_lsn_col_idx(); lsn_idx >= 0) { |
476 | 0 | add_return_column_if_absent(static_cast<uint32_t>(lsn_idx)); |
477 | 0 | } |
478 | |
|
479 | 0 | for (auto cid : _return_columns) { |
480 | 0 | if (cid >= tablet_schema->num_key_columns()) { |
481 | 0 | const auto& col_name = tablet_schema->column(cid).name(); |
482 | 0 | std::string before_col_name; |
483 | 0 | before_col_name.append("__BEFORE__"); |
484 | 0 | before_col_name.append(col_name); |
485 | 0 | before_col_name.append("__"); |
486 | 0 | if (int32_t before_idx = tablet_schema->field_index(before_col_name); |
487 | 0 | before_idx >= 0) { |
488 | 0 | add_return_column_if_absent(static_cast<uint32_t>(before_idx)); |
489 | 0 | } |
490 | 0 | } |
491 | 0 | } |
492 | 1.03M | } else if (_tablet_reader_params.direct_mode) { |
493 | 1.02M | _tablet_reader_params.return_columns = _return_columns; |
494 | 1.02M | } else { |
495 | | // we need to fetch all key columns to do the right aggregation on storage engine side. |
496 | 41.5k | for (size_t i = 0; i < tablet_schema->num_key_columns(); ++i) { |
497 | 29.2k | _tablet_reader_params.return_columns.push_back(i); |
498 | 29.2k | } |
499 | 52.0k | for (auto index : _return_columns) { |
500 | 52.0k | if (tablet_schema->column(index).is_key()) { |
501 | 20.1k | continue; |
502 | 20.1k | } |
503 | 31.9k | _tablet_reader_params.return_columns.push_back(index); |
504 | 31.9k | } |
505 | | // expand the sequence column |
506 | 12.3k | if (tablet_schema->has_sequence_col() || tablet_schema->has_seq_map()) { |
507 | 40 | bool has_replace_col = false; |
508 | 90 | for (auto col : _return_columns) { |
509 | 90 | if (tablet_schema->column(col).aggregation() == |
510 | 90 | FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) { |
511 | 40 | has_replace_col = true; |
512 | 40 | break; |
513 | 40 | } |
514 | 90 | } |
515 | 40 | if (auto sequence_col_idx = tablet_schema->sequence_col_idx(); |
516 | 40 | has_replace_col && tablet_schema->has_sequence_col() && |
517 | 40 | std::find(_return_columns.begin(), _return_columns.end(), sequence_col_idx) == |
518 | 28 | _return_columns.end()) { |
519 | 16 | _tablet_reader_params.return_columns.push_back(sequence_col_idx); |
520 | 16 | } |
521 | 40 | if (has_replace_col) { |
522 | 40 | const auto& val_to_seq = tablet_schema->value_col_idx_to_seq_col_idx(); |
523 | 40 | std::set<uint32_t> return_seq_columns; |
524 | | |
525 | 242 | for (auto col : _tablet_reader_params.return_columns) { |
526 | | // we need to add the necessary sequence column in _return_columns, and |
527 | | // Avoid adding the same seq column twice |
528 | 242 | const auto val_iter = val_to_seq.find(col); |
529 | 242 | if (val_iter != val_to_seq.end()) { |
530 | 42 | auto seq = val_iter->second; |
531 | 42 | if (std::find(_tablet_reader_params.return_columns.begin(), |
532 | 42 | _tablet_reader_params.return_columns.end(), |
533 | 42 | seq) == _tablet_reader_params.return_columns.end()) { |
534 | 4 | return_seq_columns.insert(seq); |
535 | 4 | } |
536 | 42 | } |
537 | 242 | } |
538 | 40 | _tablet_reader_params.return_columns.insert( |
539 | 40 | std::end(_tablet_reader_params.return_columns), |
540 | 40 | std::begin(return_seq_columns), std::end(return_seq_columns)); |
541 | 40 | } |
542 | 40 | } |
543 | 12.3k | } |
544 | | |
545 | 1.03M | RETURN_IF_ERROR(_init_row_binlog_tso_predicates()); |
546 | | |
547 | | // For any row-binlog scan, force the storage layer to deliver rows strictly in primary-key |
548 | | // order so the BlockReader can group consecutive same-key changes (MIN_DELTA) or emit |
549 | | // BEFORE/AFTER pairs in deterministic order (DETAIL). Disable ORDER BY / TopN pushdowns |
550 | | // and reset their related params, since they would otherwise re-order the stream. |
551 | 1.03M | if (_tablet_reader_params.binlog_scan_type != TBinlogScanType::NONE) { |
552 | 0 | _tablet_reader_params.read_orderby_key = true; |
553 | 0 | _tablet_reader_params.read_orderby_key_reverse = false; |
554 | 0 | _tablet_reader_params.read_orderby_key_num_prefix_columns = 0; |
555 | 0 | _tablet_reader_params.read_orderby_key_limit = 0; |
556 | 0 | _tablet_reader_params.force_key_ordered_read = true; |
557 | 0 | _tablet_reader_params.topn_filter_source_node_ids.clear(); |
558 | 0 | } |
559 | | |
560 | 1.03M | _tablet_reader_params.use_page_cache = _state->enable_page_cache(); |
561 | | |
562 | 1.03M | DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block", DBUG_BLOCK); |
563 | | |
564 | 1.03M | if (!_state->skip_storage_engine_merge()) { |
565 | 1.03M | auto* olap_scan_local_state = (OlapScanLocalState*)_local_state; |
566 | 1.03M | TOlapScanNode& olap_scan_node = olap_scan_local_state->olap_scan_node(); |
567 | | |
568 | | // Set MOR value predicate pushdown flag |
569 | 1.03M | if (olap_scan_node.__isset.enable_mor_value_predicate_pushdown && |
570 | 1.03M | olap_scan_node.enable_mor_value_predicate_pushdown) { |
571 | 24 | _tablet_reader_params.enable_mor_value_predicate_pushdown = true; |
572 | 24 | } |
573 | | |
574 | 1.03M | const bool has_key_topn = |
575 | 1.03M | olap_scan_node.__isset.sort_info && !olap_scan_node.sort_info.is_asc_order.empty(); |
576 | 1.03M | if (has_key_topn) { |
577 | 2.38k | _limit = _local_state->limit_per_scanner(); |
578 | 2.38k | } |
579 | | |
580 | 1.03M | const bool no_runtime_filters = _total_rf_num == 0; |
581 | 1.03M | const bool segment_limit_enabled = _state->enable_segment_limit_pushdown(); |
582 | 1.03M | const bool storage_no_merge = olap_scan_local_state->_storage_no_merge(); |
583 | | |
584 | 1.03M | if (_limit > 0 && no_runtime_filters && segment_limit_enabled && storage_no_merge) { |
585 | 2.26k | for (const auto& conjunct : _conjuncts) { |
586 | 0 | DORIS_CHECK(!olap_scan_local_state->_check_expr_storage_filter( |
587 | 0 | conjunct->root(), OlapScanLocalState::ExprStorageFilterCheckMode:: |
588 | 0 | HAS_SEGMENT_EVALUABLE_EXPR)); |
589 | 0 | } |
590 | 2.26k | } |
591 | | |
592 | | // Segment LIMIT has only two legal states: completely disabled, or enabled after every |
593 | | // row-filtering conjunct has become a storage predicate or SegmentIterator common expr. |
594 | 1.03M | const bool can_push_down_segment_limit = _limit > 0 && no_runtime_filters && |
595 | 1.03M | _conjuncts.empty() && segment_limit_enabled && |
596 | 1.03M | storage_no_merge; |
597 | 1.03M | if (can_push_down_segment_limit) { |
598 | 2.25k | if (has_key_topn) { |
599 | 1.77k | _tablet_reader_params.read_orderby_key = true; |
600 | 1.77k | if (!olap_scan_node.sort_info.is_asc_order[0]) { |
601 | 147 | _tablet_reader_params.read_orderby_key_reverse = true; |
602 | 147 | } |
603 | 1.77k | _tablet_reader_params.read_orderby_key_num_prefix_columns = |
604 | 1.77k | olap_scan_node.sort_info.is_asc_order.size(); |
605 | 1.77k | _tablet_reader_params.read_orderby_key_limit = _limit; |
606 | 1.77k | } else { |
607 | 481 | _tablet_reader_params.general_read_limit = _limit; |
608 | 481 | } |
609 | 2.25k | } |
610 | | |
611 | 1.03M | if (_tablet_reader_params.read_orderby_key_limit > 0 || |
612 | 1.03M | _tablet_reader_params.general_read_limit > 0) { |
613 | 2.26k | DORIS_CHECK(can_push_down_segment_limit); |
614 | 2.26k | DORIS_CHECK(_conjuncts.empty()); |
615 | 2.26k | } |
616 | | |
617 | | // A key TopN scan cannot share the plain LIMIT early-stop counter. If |
618 | | // storage TopN is pushed down, each scanner must produce its full local |
619 | | // candidates. If it is not pushed down for any reason, the upper TopN |
620 | | // still needs all rows from the scan. |
621 | 1.03M | if (has_key_topn) { |
622 | 2.38k | _shared_scan_limit = nullptr; |
623 | 2.38k | if (_tablet_reader_params.read_orderby_key_limit == 0) { |
624 | 599 | _limit = -1; |
625 | 599 | } |
626 | 2.38k | } |
627 | | // Note: _shared_scan_limit is intentionally not pushed into the |
628 | | // storage layer. SegmentIterator's _process_eof() is irreversible, |
629 | | // so a concurrently-decremented atomic could reach 0 while a segment |
630 | | // still has data needed by other scanners. |
631 | | |
632 | | // set push down topn filter |
633 | 1.03M | _tablet_reader_params.topn_filter_source_node_ids = |
634 | 1.03M | olap_scan_local_state->get_topn_filter_source_node_ids(_state, true); |
635 | 1.03M | if (!_tablet_reader_params.topn_filter_source_node_ids.empty()) { |
636 | 4.88k | _tablet_reader_params.topn_filter_target_node_id = |
637 | 4.88k | olap_scan_local_state->parent()->node_id(); |
638 | 4.88k | } |
639 | 1.03M | } |
640 | | |
641 | | // If this is a two-phase read query, we need to delay the release of each Rowset |
642 | | // via rowset->update_delayed_expired_timestamp(). This extends the lifespan of the Rowset. |
643 | 1.03M | if (tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) { |
644 | 0 | constexpr static int delayed_s = 60; |
645 | 0 | for (auto rs_reader : _tablet_reader_params.rs_splits) { |
646 | 0 | uint64_t delayed_expired_timestamp = |
647 | 0 | UnixSeconds() + _tablet_reader_params.runtime_state->execution_timeout() + |
648 | 0 | delayed_s; |
649 | 0 | rs_reader.rs_reader->rowset()->update_delayed_expired_timestamp( |
650 | 0 | delayed_expired_timestamp); |
651 | 0 | ExecEnv::GetInstance()->storage_engine().add_quering_rowset( |
652 | 0 | rs_reader.rs_reader->rowset()); |
653 | 0 | } |
654 | 0 | } |
655 | | |
656 | 1.03M | if (tablet_schema->has_global_row_id()) { |
657 | 7.42k | auto& id_file_map = _state->get_id_file_map(); |
658 | 14.9k | for (auto rs_reader : _tablet_reader_params.rs_splits) { |
659 | 14.9k | id_file_map->add_temp_rowset(rs_reader.rs_reader->rowset()); |
660 | 14.9k | } |
661 | 7.42k | } |
662 | | |
663 | 1.03M | return Status::OK(); |
664 | 1.03M | } |
665 | | |
// Registers the materialized variant sub-columns referenced by the output tuple in the
// reader's tablet schema.
//
// Returns Status::OK() immediately when the schema has no variant columns. Otherwise, for
// every output slot of type TYPE_VARIANT, a materialized variant column is built from the
// parent column's lower-case name, the slot's column paths and unique id, and the variant
// type's max-subcolumns count / doc-mode flag. The column is appended to the schema only
// when its path is not already indexed. Finally variant_util::inherit_column_attributes()
// is invoked on the schema. This function always returns Status::OK().
666 | 1.03M | Status OlapScanner::_init_variant_columns() { |
667 | 1.03M | auto& tablet_schema = _tablet_reader_params.tablet_schema; |
668 | 1.03M | if (tablet_schema->num_variant_columns() == 0) { |
669 | 1.03M | return Status::OK(); |
670 | 1.03M | } |
671 | | // Each parent column carries path info that distinguishes it from the others |
672 | 18.5k | for (auto* slot : _output_tuple_desc->slots()) { |
673 | 18.5k | if (slot->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) { |
674 | | // Such columns do not exist in the frontend schema info, so we need to |
675 | | // add them to tablet_schema for later column indexing. |
676 | 9.36k | const auto& dt_variant = |
677 | 9.36k | assert_cast<const DataTypeVariant&>(*remove_nullable(slot->type())); |
678 | 9.36k | TabletColumn subcol = TabletColumn::create_materialized_variant_column( |
679 | 9.36k | tablet_schema->column_by_uid(slot->col_unique_id()).name_lower_case(), |
680 | 9.36k | slot->column_paths(), slot->col_unique_id(), |
681 | 9.36k | dt_variant.variant_max_subcolumns_count(), dt_variant.enable_doc_mode()); |
// Skip the append when a column with the same path is already present in the schema.
682 | 9.36k | if (tablet_schema->field_index(*subcol.path_info_ptr()) < 0) { |
683 | 7.29k | tablet_schema->append_column(subcol, TabletSchema::ColumnType::VARIANT); |
684 | 7.29k | } |
685 | 9.36k | } |
686 | 18.5k | } |
687 | 6.73k | variant_util::inherit_column_attributes(tablet_schema); |
688 | 6.73k | return Status::OK(); |
689 | 1.03M | } |
690 | | |
// Resolves every slot of _output_tuple_desc to a column index in the reader's tablet schema
// and appends that index to _return_columns.
//
// While walking the slots it also records, per slot:
//   - the virtual column expression (into _virtual_column_exprs),
//   - filled key columns (into _tablet_reader_params.filled_columns, direct mode only),
//   - all/predicate access paths keyed by column unique id,
//   - the pruned data type for STRUCT/MAP/ARRAY slots that have access paths,
//   - columns whose slot is nullable while the schema column is not.
//
// Returns InternalError when a slot cannot be resolved to a schema column or when no column
// was collected at all, and an INVALID_SCHEMA error when a slot is non-nullable but the
// schema column is nullable. Returns Status::OK() otherwise.
691 | 1.03M | Status OlapScanner::_init_return_columns() { |
692 | | // For OLAP scan, _output_tuple_desc is the storage-aligned scan tuple |
693 | | // descriptor. filled_key_column_slot_ids marks key slots that are present |
694 | | // only for scan-schema alignment. For example, on an AGG table with keys |
695 | | // (k1, k2), a query returning only k2 may still scan (k1, k2); k1 is a |
696 | | // filled column and can be removed by the projection output tuple. |
697 | 10.3M | for (auto* slot : _output_tuple_desc->slots()) { |
698 | | // A variant column is located in the schema by its path |
699 | 10.3M | int32_t index = 0; |
700 | 10.3M | auto& tablet_schema = _tablet_reader_params.tablet_schema; |
701 | 10.3M | if (slot->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) { |
702 | 9.36k | index = tablet_schema->field_index(PathInData( |
703 | 9.36k | tablet_schema->column_by_uid(slot->col_unique_id()).name_lower_case(), |
704 | 9.36k | slot->column_paths())); |
705 | 10.3M | } else { |
// Prefer the lookup by unique id; fall back to the column name when the slot's
// unique id is negative.
706 | 10.3M | index = slot->col_unique_id() >= 0 ? tablet_schema->field_index(slot->col_unique_id()) |
707 | 18.4E | : tablet_schema->field_index(slot->col_name()); |
708 | 10.3M | } |
709 | | |
// A negative index means the slot has no matching column in the tablet schema.
710 | 10.3M | if (index < 0) { |
711 | 0 | return Status::InternalError( |
712 | 0 | "field name is invalid. field={}, field_name_to_index={}, col_unique_id={}", |
713 | 0 | slot->col_name(), tablet_schema->get_all_field_names(), slot->col_unique_id()); |
714 | 0 | } |
715 | | |
// Virtual columns: remember the expression registered for this slot id under the
// resolved column index.
716 | 10.3M | if (slot->get_virtual_column_expr()) { |
717 | 381 | _virtual_column_exprs[index] = _slot_id_to_virtual_column_expr[slot->id()]; |
718 | | |
719 | 18.4E | VLOG_DEBUG << fmt::format("Virtual column, slot id: {}, cid {}, type: {}", slot->id(), |
720 | 18.4E | index, slot->get_data_type_ptr()->get_name()); |
721 | 381 | } |
722 | | |
723 | 10.3M | const auto& column = tablet_schema->column(index); |
724 | 10.3M | auto* olap_local_state = static_cast<OlapScanLocalState*>(_local_state); |
725 | 10.3M | const auto& olap_scan_node = olap_local_state->olap_scan_node(); |
// Slots listed in filled_key_column_slot_ids must map to key columns; they are
// recorded as filled columns only when the reader runs in direct mode.
726 | 10.3M | if (olap_scan_node.__isset.filled_key_column_slot_ids && |
727 | 10.3M | olap_scan_node.filled_key_column_slot_ids.contains(slot->id())) { |
728 | 0 | DORIS_CHECK(column.is_key()); |
729 | 0 | if (_tablet_reader_params.direct_mode) { |
730 | 0 | _tablet_reader_params.filled_columns.insert(index); |
731 | 0 | } |
732 | 0 | } |
// Access paths are keyed by the column's own unique id, or by its parent's unique id
// when the column's id is negative (presumably extracted sub-columns — TODO confirm).
733 | 10.3M | int32_t unique_id = |
734 | 10.3M | column.unique_id() >= 0 ? column.unique_id() : column.parent_unique_id(); |
735 | 10.3M | if (!slot->all_access_paths().empty()) { |
736 | 66.8k | _tablet_reader_params.all_access_paths.insert({unique_id, slot->all_access_paths()}); |
737 | 66.8k | } |
738 | | |
739 | 10.3M | if (!slot->predicate_access_paths().empty()) { |
740 | 9.51k | _tablet_reader_params.predicate_access_paths.insert( |
741 | 9.51k | {unique_id, slot->predicate_access_paths()}); |
742 | 9.51k | } |
743 | | |
// Complex-typed slots (STRUCT/MAP/ARRAY) with access paths register the slot's type as
// the pruned data type of the column.
// NOTE(review): this call uses column.unique_id() rather than the `unique_id` fallback
// computed above — confirm that is intended.
744 | 10.3M | if ((slot->type()->get_primitive_type() == PrimitiveType::TYPE_STRUCT || |
745 | 10.3M | slot->type()->get_primitive_type() == PrimitiveType::TYPE_MAP || |
746 | 10.3M | slot->type()->get_primitive_type() == PrimitiveType::TYPE_ARRAY) && |
747 | 10.3M | !slot->all_access_paths().empty()) { |
748 | 51.9k | tablet_schema->add_pruned_columns_data_type(column.unique_id(), slot->type()); |
749 | 51.9k | } |
750 | | |
751 | 10.3M | _return_columns.push_back(index); |
// Nullability mismatch: a nullable slot over a non-nullable column is recorded in
// _tablet_columns_convert_to_null_set; the opposite mismatch is rejected as an error.
752 | 10.3M | if (slot->is_nullable() && !tablet_schema->column(index).is_nullable()) { |
753 | 0 | _tablet_columns_convert_to_null_set.emplace(index); |
754 | 10.3M | } else if (!slot->is_nullable() && tablet_schema->column(index).is_nullable()) { |
755 | 0 | return Status::Error<ErrorCode::INVALID_SCHEMA>( |
756 | 0 | "slot(id: {}, name: {})'s nullable does not match " |
757 | 0 | "column(tablet id: {}, index: {}, name: {}) ", |
758 | 0 | slot->id(), slot->col_name(), tablet_schema->table_id(), index, |
759 | 0 | tablet_schema->column(index).name()); |
760 | 0 | } |
761 | 10.3M | } |
762 | | |
763 | 1.03M | if (_return_columns.empty()) { |
764 | 0 | return Status::InternalError("failed to build storage scanner, no materialized slot!"); |
765 | 0 | } |
766 | | |
767 | 1.03M | return Status::OK(); |
768 | 1.03M | } |
769 | | |
// Returns whether the local state reports the partition of this scanner's tablet as pruned.
// Returns false when the scanner has no local state.
770 | 2.16M | bool OlapScanner::check_partition_pruned() const { |
771 | 2.16M | if (!_local_state) { |
772 | 0 | return false; |
773 | 0 | } |
774 | 2.16M | return _local_state->is_partition_pruned(_tablet_reader_params.tablet->partition_id()); |
775 | 2.16M | } |
776 | | |
// Classifies the storage of the rowsets this scanner reads as local, remote, or a mix.
//
// In cloud mode the result is always STORAGE_TYPE_LOCAL. Otherwise the rowsets in
// _tablet_reader_params.rs_splits are inspected with is_local():
//   - all local             -> STORAGE_TYPE_LOCAL
//   - none local            -> STORAGE_TYPE_REMOTE
//   - some local, some not  -> STORAGE_TYPE_REMOTE_AND_LOCAL
// Note that an empty rs_splits compares 0 == 0 and is therefore reported as LOCAL.
777 | 1.08M | doris::TabletStorageType OlapScanner::get_storage_type() { |
778 | 1.08M | if (config::is_cloud_mode()) { |
779 | | // we don't have cold storage in cloud mode, all storage is treated as local |
780 | 899k | return doris::TabletStorageType::STORAGE_TYPE_LOCAL; |
781 | 899k | } |
// The result of is_local() is added as an integer, so local_reader ends up as the number
// of local rowsets (assuming is_local() returns bool — TODO confirm).
782 | 186k | int local_reader = 0; |
783 | 334k | for (const auto& reader : _tablet_reader_params.rs_splits) { |
784 | 334k | local_reader += reader.rs_reader->rowset()->is_local(); |
785 | 334k | } |
786 | 186k | int total_reader = _tablet_reader_params.rs_splits.size(); |
787 | | |
788 | 186k | if (local_reader == total_reader) { |
789 | 186k | return doris::TabletStorageType::STORAGE_TYPE_LOCAL; |
790 | 18.4E | } else if (local_reader == 0) { |
791 | 0 | return doris::TabletStorageType::STORAGE_TYPE_REMOTE; |
792 | 0 | } |
793 | 18.4E | return doris::TabletStorageType::STORAGE_TYPE_REMOTE_AND_LOCAL; |
794 | 186k | } |
795 | | |
// Reads the next block from the tablet reader into `block` and reports end-of-data via `eof`.
//
// Any error from next_block_with_aggregation() is returned as-is. When the block contains
// rows, the tablet's read_block_count is incremented and *eof is forced to false, so eof is
// only ever reported together with an empty block. In builds without NDEBUG the ANN cache-hit
// debug points are evaluated as well and may turn the call into an error.
// The `state` parameter is not used in this function body.
796 | 1.32M | Status OlapScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { |
797 | | // Read one block from block reader |
798 | | // ATTN: Here we need to let the _get_block_impl method guarantee the semantics of the interface, |
799 | | // that is, eof can be set to true only when the returned block is empty. |
800 | 1.32M | RETURN_IF_ERROR(_tablet_reader->next_block_with_aggregation(block, eof)); |
801 | 1.32M | if (block->rows() > 0) { |
802 | 290k | _tablet_reader_params.tablet->read_block_count.fetch_add(1, std::memory_order_relaxed); |
803 | 290k | *eof = false; |
804 | 290k | } |
805 | 1.32M | #ifndef NDEBUG |
806 | 1.32M | RETURN_IF_ERROR(_check_ann_cache_hit_debug_points(_tablet_reader->stats())); |
807 | 1.32M | #endif |
808 | 1.32M | return Status::OK(); |
809 | 1.32M | } |
810 | | |
// Closes the scanner. When _try_close() returns false the call is a no-op that returns
// Status::OK() (presumably because the scanner was already closed — TODO confirm);
// otherwise it delegates to Scanner::close() and propagates its error, if any.
811 | 1.04M | Status OlapScanner::close(RuntimeState* state) { |
812 | 1.04M | if (!_try_close()) { |
813 | 150 | return Status::OK(); |
814 | 150 | } |
815 | 1.04M | RETURN_IF_ERROR(Scanner::close(state)); |
816 | 1.04M | return Status::OK(); |
817 | 1.04M | } |
818 | | |
// Flushes the tablet reader's incremental byte/row statistics into the scan operator's
// profile counters, the query's IO context and DorisMetrics, then zeroes those statistics
// in the reader so that the same amounts are not reported again by a later call.
//
// Does nothing when the scanner has not been prepared successfully.
819 | 1.08M | void OlapScanner::update_realtime_counters() { |
820 | 1.08M | if (!_has_prepared) { |
821 | | // Counters can only be updated after a successful prepare, otherwise this may crash. For example, the olap scanner |
822 | | // opens the tablet reader during prepare; if prepare did not succeed, the tablet reader is nullptr. |
823 | 0 | return; |
824 | 0 | } |
825 | 1.08M | OlapScanLocalState* local_state = static_cast<OlapScanLocalState*>(_local_state); |
826 | 1.08M | const OlapReaderStatistics& stats = _tablet_reader->stats(); |
827 | 1.08M | COUNTER_UPDATE(local_state->_read_compressed_counter, stats.compressed_bytes_read); |
828 | 1.08M | COUNTER_UPDATE(local_state->_read_uncompressed_counter, stats.uncompressed_bytes_read); |
829 | 1.08M | COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read); |
830 | 1.08M | COUNTER_UPDATE(local_state->_scan_rows, stats.raw_rows_read); |
831 | | |
832 | | // Make sure the scan bytes and scan rows counter in audit log is the same as the counter in |
833 | | // doris metrics. |
834 | | // ScanBytes is the uncompressed bytes read from local + remote |
835 | | // bytes_read_from_local is the compressed bytes read from local |
836 | | // bytes_read_from_remote is the compressed bytes read from remote |
837 | | // scan bytes > bytes_read_from_local + bytes_read_from_remote |
838 | 1.08M | _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(stats.raw_rows_read); |
839 | 1.08M | _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes( |
840 | 1.08M | stats.uncompressed_bytes_read); |
841 | | |
842 | | // In case of no cache, we still need to update the IO stats. uncompressed bytes read == local + remote |
// NOTE(review): the comment above mentions uncompressed bytes, but the no-cache branch
// below reports stats.compressed_bytes_read as local bytes — confirm which is intended.
843 | 1.08M | if (stats.file_cache_stats.bytes_read_from_local == 0 && |
844 | 1.08M | stats.file_cache_stats.bytes_read_from_remote == 0) { |
845 | 991k | _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage( |
846 | 991k | stats.compressed_bytes_read); |
847 | 991k | DorisMetrics::instance()->query_scan_bytes_from_local->increment( |
848 | 991k | stats.compressed_bytes_read); |
849 | 991k | } else { |
// File cache stats are available: report the local and remote byte counts separately,
// both to the query IO context and to DorisMetrics.
850 | 91.6k | _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage( |
851 | 91.6k | stats.file_cache_stats.bytes_read_from_local); |
852 | 91.6k | _state->get_query_ctx() |
853 | 91.6k | ->resource_ctx() |
854 | 91.6k | ->io_context() |
855 | 91.6k | ->update_scan_bytes_from_remote_storage( |
856 | 91.6k | stats.file_cache_stats.bytes_read_from_remote); |
857 | | |
858 | 91.6k | DorisMetrics::instance()->query_scan_bytes_from_local->increment( |
859 | 91.6k | stats.file_cache_stats.bytes_read_from_local); |
860 | 91.6k | DorisMetrics::instance()->query_scan_bytes_from_remote->increment( |
861 | 91.6k | stats.file_cache_stats.bytes_read_from_remote); |
862 | 91.6k | } |
863 | | |
864 | 1.08M | if (has_file_cache_statistics(stats.file_cache_stats)) { |
865 | 93.8k | io::FileCacheProfileReporter cache_profile(local_state->_segment_profile.get()); |
866 | 93.8k | cache_profile.update(&stats.file_cache_stats); |
867 | 93.8k | _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache( |
868 | 93.8k | stats.file_cache_stats.bytes_write_into_cache); |
869 | 93.8k | } |
870 | | |
// Reset everything that was just reported so the next call only sees new activity.
871 | 1.08M | _tablet_reader->mutable_stats()->compressed_bytes_read = 0; |
872 | 1.08M | _tablet_reader->mutable_stats()->uncompressed_bytes_read = 0; |
873 | 1.08M | _tablet_reader->mutable_stats()->raw_rows_read = 0; |
874 | 1.08M | _tablet_reader->mutable_stats()->file_cache_stats = {}; |
875 | 1.08M | } |
876 | | |
// Publishes the tablet reader's statistics into the scan operator's profile counters and
// into the process-level and tablet-level metrics.
//
// The body runs at most once per scanner: it is guarded by _has_updated_counter, which is
// set on the first call. It first lets the tablet reader update _profile, then calls
// Scanner::_collect_profile_before_close(), and finally copies each field of the reader's
// OlapReaderStatistics into the matching counter on OlapScanLocalState.
877 | 1.03M | void OlapScanner::_collect_profile_before_close() { |
878 | | // Please don't directly enable the profile here, we need to set QueryStatistics using the counter inside. |
879 | 1.03M | if (_has_updated_counter) { |
880 | 0 | return; |
881 | 0 | } |
882 | 1.03M | _has_updated_counter = true; |
883 | 1.03M | _tablet_reader->update_profile(_profile); |
884 | | |
885 | 1.03M | Scanner::_collect_profile_before_close(); |
886 | | |
887 | | // Update counters for OlapScanner |
888 | | // Update counters from tablet reader's stats |
889 | 1.03M | auto& stats = _tablet_reader->stats(); |
890 | 1.03M | auto* local_state = (OlapScanLocalState*)_local_state; |
// IO, decompression, block loading and predicate evaluation timers/counters.
891 | 1.03M | COUNTER_UPDATE(local_state->_io_timer, stats.io_ns); |
892 | 1.03M | COUNTER_UPDATE(local_state->_read_compressed_counter, stats.compressed_bytes_read); |
893 | 1.03M | COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read); |
894 | 1.03M | COUNTER_UPDATE(local_state->_decompressor_timer, stats.decompress_ns); |
895 | 1.03M | COUNTER_UPDATE(local_state->_read_uncompressed_counter, stats.uncompressed_bytes_read); |
896 | 1.03M | COUNTER_UPDATE(local_state->_block_load_timer, stats.block_load_ns); |
897 | 1.03M | COUNTER_UPDATE(local_state->_block_load_counter, stats.blocks_load); |
898 | 1.03M | COUNTER_UPDATE(local_state->_block_fetch_timer, stats.block_fetch_ns); |
899 | 1.03M | COUNTER_UPDATE(local_state->_delete_bitmap_get_agg_timer, stats.delete_bitmap_get_agg_ns); |
900 | 1.03M | COUNTER_UPDATE(local_state->_scan_rows, stats.raw_rows_read); |
901 | 1.03M | COUNTER_UPDATE(local_state->_vec_cond_timer, stats.vec_cond_ns); |
902 | 1.03M | COUNTER_UPDATE(local_state->_short_cond_timer, stats.short_cond_ns); |
903 | 1.03M | COUNTER_UPDATE(local_state->_expr_filter_timer, stats.expr_filter_ns); |
904 | 1.03M | COUNTER_UPDATE(local_state->_block_init_timer, stats.block_init_ns); |
905 | 1.03M | COUNTER_UPDATE(local_state->_block_init_seek_timer, stats.block_init_seek_ns); |
906 | 1.03M | COUNTER_UPDATE(local_state->_block_init_seek_counter, stats.block_init_seek_num); |
907 | 1.03M | COUNTER_UPDATE(local_state->_segment_generate_row_range_by_keys_timer, |
908 | 1.03M | stats.generate_row_ranges_by_keys_ns); |
909 | 1.03M | COUNTER_UPDATE(local_state->_segment_generate_row_range_by_column_conditions_timer, |
910 | 1.03M | stats.generate_row_ranges_by_column_conditions_ns); |
911 | 1.03M | COUNTER_UPDATE(local_state->_segment_generate_row_range_by_bf_timer, |
912 | 1.03M | stats.generate_row_ranges_by_bf_ns); |
913 | 1.03M | COUNTER_UPDATE(local_state->_collect_iterator_merge_next_timer, |
914 | 1.03M | stats.collect_iterator_merge_next_timer); |
915 | 1.03M | COUNTER_UPDATE(local_state->_segment_generate_row_range_by_zonemap_timer, |
916 | 1.03M | stats.generate_row_ranges_by_zonemap_ns); |
917 | 1.03M | COUNTER_UPDATE(local_state->_segment_generate_row_range_by_dict_timer, |
918 | 1.03M | stats.generate_row_ranges_by_dict_ns); |
919 | 1.03M | COUNTER_UPDATE(local_state->_predicate_column_read_timer, stats.predicate_column_read_ns); |
920 | 1.03M | COUNTER_UPDATE(local_state->_non_predicate_column_read_timer, stats.non_predicate_read_ns); |
921 | 1.03M | COUNTER_UPDATE(local_state->_predicate_column_read_seek_timer, |
922 | 1.03M | stats.predicate_column_read_seek_ns); |
923 | 1.03M | COUNTER_UPDATE(local_state->_predicate_column_read_seek_counter, |
924 | 1.03M | stats.predicate_column_read_seek_num); |
925 | 1.03M | COUNTER_UPDATE(local_state->_lazy_read_timer, stats.lazy_read_ns); |
926 | 1.03M | COUNTER_UPDATE(local_state->_lazy_read_seek_timer, stats.block_lazy_read_seek_ns); |
927 | 1.03M | COUNTER_UPDATE(local_state->_lazy_read_seek_counter, stats.block_lazy_read_seek_num); |
928 | 1.03M | COUNTER_UPDATE(local_state->_output_col_timer, stats.output_col_ns); |
// Row-filtering counters (rows filtered / rows fed into each kind of condition).
929 | 1.03M | COUNTER_UPDATE(local_state->_rows_vec_cond_filtered_counter, stats.rows_vec_cond_filtered); |
930 | 1.03M | COUNTER_UPDATE(local_state->_rows_short_circuit_cond_filtered_counter, |
931 | 1.03M | stats.rows_short_circuit_cond_filtered); |
932 | 1.03M | COUNTER_UPDATE(local_state->_rows_expr_cond_filtered_counter, stats.rows_expr_cond_filtered); |
933 | 1.03M | COUNTER_UPDATE(local_state->_rows_vec_cond_input_counter, stats.vec_cond_input_rows); |
934 | 1.03M | COUNTER_UPDATE(local_state->_rows_short_circuit_cond_input_counter, |
935 | 1.03M | stats.short_circuit_cond_input_rows); |
936 | 1.03M | COUNTER_UPDATE(local_state->_rows_expr_cond_input_counter, stats.expr_cond_input_rows); |
937 | 1.03M | COUNTER_UPDATE(local_state->_stats_filtered_counter, stats.rows_stats_filtered); |
938 | 1.03M | COUNTER_UPDATE(local_state->_stats_rp_filtered_counter, stats.rows_stats_rp_filtered); |
939 | 1.03M | COUNTER_UPDATE(local_state->_dict_filtered_counter, stats.segment_dict_filtered); |
940 | 1.03M | COUNTER_UPDATE(local_state->_bf_filtered_counter, stats.rows_bf_filtered); |
// Three separate deletion-related statistics are all accumulated into the single
// _del_filtered_counter.
941 | 1.03M | COUNTER_UPDATE(local_state->_del_filtered_counter, stats.rows_del_filtered); |
942 | 1.03M | COUNTER_UPDATE(local_state->_del_filtered_counter, stats.rows_del_by_bitmap); |
943 | 1.03M | COUNTER_UPDATE(local_state->_del_filtered_counter, stats.rows_vec_del_cond_filtered); |
944 | 1.03M | COUNTER_UPDATE(local_state->_conditions_filtered_counter, stats.rows_conditions_filtered); |
945 | 1.03M | COUNTER_UPDATE(local_state->_key_range_filtered_counter, stats.rows_key_range_filtered); |
946 | 1.03M | COUNTER_UPDATE(local_state->_total_pages_num_counter, stats.total_pages_num); |
947 | 1.03M | COUNTER_UPDATE(local_state->_cached_pages_num_counter, stats.cached_pages_num); |
// Inverted index counters and timers.
948 | 1.03M | COUNTER_UPDATE(local_state->_inverted_index_filter_counter, stats.rows_inverted_index_filtered); |
949 | 1.03M | COUNTER_UPDATE(local_state->_inverted_index_filter_timer, stats.inverted_index_filter_timer); |
950 | 1.03M | COUNTER_UPDATE(local_state->_inverted_index_query_cache_hit_counter, |
951 | 1.03M | stats.inverted_index_query_cache_hit); |
952 | 1.03M | COUNTER_UPDATE(local_state->_inverted_index_query_cache_miss_counter, |
953 | 1.03M | stats.inverted_index_query_cache_miss); |
954 | 1.03M | COUNTER_UPDATE(local_state->_inverted_index_query_timer, stats.inverted_index_query_timer); |
955 | 1.03M | COUNTER_UPDATE(local_state->_inverted_index_query_null_bitmap_timer, |
956 | 1.03M | stats.inverted_index_query_null_bitmap_timer); |
957 | 1.03M | COUNTER_UPDATE(local_state->_inverted_index_query_bitmap_copy_timer, |
958 | 1.03M | stats.inverted_index_query_bitmap_copy_timer); |
959 | 1.03M | COUNTER_UPDATE(local_state->_inverted_index_searcher_open_timer, |
960 | 1.03M | stats.inverted_index_searcher_open_timer); |
961 | 1.03M | COUNTER_UPDATE(local_state->_inverted_index_searcher_search_timer, |
962 | 1.03M | stats.inverted_index_searcher_search_timer); |
963 | 1.03M | COUNTER_UPDATE(local_state->_inverted_index_searcher_search_init_timer, |
964 | 1.03M | stats.inverted_index_searcher_search_init_timer); |
965 | 1.03M | COUNTER_UPDATE(local_state->_inverted_index_searcher_search_exec_timer, |
966 | 1.03M | stats.inverted_index_searcher_search_exec_timer); |
967 | 1.03M | COUNTER_UPDATE(local_state->_inverted_index_searcher_cache_hit_counter, |
968 | 1.03M | stats.inverted_index_searcher_cache_hit); |
969 | 1.03M | COUNTER_UPDATE(local_state->_inverted_index_searcher_cache_miss_counter, |
970 | 1.03M | stats.inverted_index_searcher_cache_miss); |
971 | 1.03M | COUNTER_UPDATE(local_state->_inverted_index_downgrade_count_counter, |
972 | 1.03M | stats.inverted_index_downgrade_count); |
973 | 1.03M | COUNTER_UPDATE(local_state->_inverted_index_analyzer_timer, |
974 | 1.03M | stats.inverted_index_analyzer_timer); |
975 | 1.03M | COUNTER_UPDATE(local_state->_inverted_index_lookup_timer, stats.inverted_index_lookup_timer); |
// Variant column counters and timers.
976 | 1.03M | COUNTER_UPDATE(local_state->_variant_scan_sparse_column_timer, |
977 | 1.03M | stats.variant_scan_sparse_column_timer_ns); |
978 | 1.03M | COUNTER_UPDATE(local_state->_variant_scan_sparse_column_bytes, |
979 | 1.03M | stats.variant_scan_sparse_column_bytes); |
980 | 1.03M | COUNTER_UPDATE(local_state->_variant_fill_path_from_sparse_column_timer, |
981 | 1.03M | stats.variant_fill_path_from_sparse_column_timer_ns); |
982 | 1.03M | COUNTER_UPDATE(local_state->_variant_subtree_default_iter_count, |
983 | 1.03M | stats.variant_subtree_default_iter_count); |
984 | 1.03M | COUNTER_UPDATE(local_state->_variant_subtree_leaf_iter_count, |
985 | 1.03M | stats.variant_subtree_leaf_iter_count); |
986 | 1.03M | COUNTER_UPDATE(local_state->_variant_subtree_hierarchical_iter_count, |
987 | 1.03M | stats.variant_subtree_hierarchical_iter_count); |
988 | 1.03M | COUNTER_UPDATE(local_state->_variant_subtree_sparse_iter_count, |
989 | 1.03M | stats.variant_subtree_sparse_iter_count); |
990 | 1.03M | COUNTER_UPDATE(local_state->_variant_doc_value_column_iter_count, |
991 | 1.03M | stats.variant_doc_value_column_iter_count); |
992 | | |
// The adaptive batch size prediction counters are overwritten (set, not accumulated), and
// only when a positive max-rows prediction was recorded.
993 | 1.03M | if (stats.adaptive_batch_size_predict_max_rows > 0) { |
994 | 777k | local_state->_adaptive_batch_predict_min_rows_counter->set( |
995 | 777k | stats.adaptive_batch_size_predict_min_rows); |
996 | 777k | local_state->_adaptive_batch_predict_max_rows_counter->set( |
997 | 777k | stats.adaptive_batch_size_predict_max_rows); |
998 | 777k | } |
999 | | |
1000 | 1.03M | InvertedIndexProfileReporter inverted_index_profile; |
1001 | 1.03M | inverted_index_profile.update(local_state->_index_filter_profile.get(), |
1002 | 1.03M | &stats.inverted_index_stats); |
1003 | | |
1004 | 1.03M | if (has_file_cache_statistics(stats.file_cache_stats)) { |
1005 | 0 | io::FileCacheProfileReporter cache_profile(local_state->_segment_profile.get()); |
1006 | 0 | cache_profile.update(&stats.file_cache_stats); |
1007 | 0 | _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache( |
1008 | 0 | stats.file_cache_stats.bytes_write_into_cache); |
1009 | 0 | } |
1010 | 1.03M | COUNTER_UPDATE(local_state->_output_index_result_column_timer, |
1011 | 1.03M | stats.output_index_result_column_timer); |
1012 | 1.03M | COUNTER_UPDATE(local_state->_filtered_segment_counter, stats.filtered_segment_number); |
1013 | 1.03M | COUNTER_UPDATE(local_state->_total_segment_counter, stats.total_segment_number); |
1014 | 1.03M | COUNTER_UPDATE(local_state->_condition_cache_hit_counter, stats.condition_cache_hit_seg_nums); |
1015 | 1.03M | COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter, |
1016 | 1.03M | stats.condition_cache_filtered_rows); |
1017 | | |
// Initialization timers: tablet reader, block reader, rowset reader, segment iterator and
// segment.
1018 | 1.03M | COUNTER_UPDATE(local_state->_tablet_reader_init_timer, stats.tablet_reader_init_timer_ns); |
1019 | 1.03M | COUNTER_UPDATE(local_state->_tablet_reader_capture_rs_readers_timer, |
1020 | 1.03M | stats.tablet_reader_capture_rs_readers_timer_ns); |
1021 | 1.03M | COUNTER_UPDATE(local_state->_tablet_reader_init_return_columns_timer, |
1022 | 1.03M | stats.tablet_reader_init_return_columns_timer_ns); |
1023 | 1.03M | COUNTER_UPDATE(local_state->_tablet_reader_init_keys_param_timer, |
1024 | 1.03M | stats.tablet_reader_init_keys_param_timer_ns); |
1025 | 1.03M | COUNTER_UPDATE(local_state->_tablet_reader_init_orderby_keys_param_timer, |
1026 | 1.03M | stats.tablet_reader_init_orderby_keys_param_timer_ns); |
1027 | 1.03M | COUNTER_UPDATE(local_state->_tablet_reader_init_conditions_param_timer, |
1028 | 1.03M | stats.tablet_reader_init_conditions_param_timer_ns); |
1029 | 1.03M | COUNTER_UPDATE(local_state->_tablet_reader_init_delete_condition_param_timer, |
1030 | 1.03M | stats.tablet_reader_init_delete_condition_param_timer_ns); |
1031 | 1.03M | COUNTER_UPDATE(local_state->_block_reader_vcollect_iter_init_timer, |
1032 | 1.03M | stats.block_reader_vcollect_iter_init_timer_ns); |
1033 | 1.03M | COUNTER_UPDATE(local_state->_block_reader_rs_readers_init_timer, |
1034 | 1.03M | stats.block_reader_rs_readers_init_timer_ns); |
1035 | 1.03M | COUNTER_UPDATE(local_state->_block_reader_build_heap_init_timer, |
1036 | 1.03M | stats.block_reader_build_heap_init_timer_ns); |
1037 | | |
1038 | 1.03M | COUNTER_UPDATE(local_state->_rowset_reader_get_segment_iterators_timer, |
1039 | 1.03M | stats.rowset_reader_get_segment_iterators_timer_ns); |
1040 | 1.03M | COUNTER_UPDATE(local_state->_rowset_reader_create_iterators_timer, |
1041 | 1.03M | stats.rowset_reader_create_iterators_timer_ns); |
1042 | 1.03M | COUNTER_UPDATE(local_state->_rowset_reader_init_iterators_timer, |
1043 | 1.03M | stats.rowset_reader_init_iterators_timer_ns); |
1044 | 1.03M | COUNTER_UPDATE(local_state->_rowset_reader_load_segments_timer, |
1045 | 1.03M | stats.rowset_reader_load_segments_timer_ns); |
1046 | | |
1047 | 1.03M | COUNTER_UPDATE(local_state->_segment_iterator_init_timer, stats.segment_iterator_init_timer_ns); |
1048 | 1.03M | COUNTER_UPDATE(local_state->_segment_iterator_init_return_column_iterators_timer, |
1049 | 1.03M | stats.segment_iterator_init_return_column_iterators_timer_ns); |
1050 | 1.03M | COUNTER_UPDATE(local_state->_segment_iterator_init_index_iterators_timer, |
1051 | 1.03M | stats.segment_iterator_init_index_iterators_timer_ns); |
1052 | 1.03M | COUNTER_UPDATE(local_state->_segment_iterator_init_segment_prefetchers_timer, |
1053 | 1.03M | stats.segment_iterator_init_segment_prefetchers_timer_ns); |
1054 | | |
1055 | 1.03M | COUNTER_UPDATE(local_state->_segment_create_column_readers_timer, |
1056 | 1.03M | stats.segment_create_column_readers_timer_ns); |
1057 | 1.03M | COUNTER_UPDATE(local_state->_segment_load_index_timer, stats.segment_load_index_timer_ns); |
1058 | | |
1059 | | // Update metrics |
// NOTE(review): the metrics below are incremented with the counters' current value(), not
// with this scanner's own stats. If the local_state counters are shared by several
// scanners this could count the same bytes/rows more than once — confirm.
1060 | 1.03M | DorisMetrics::instance()->query_scan_bytes->increment( |
1061 | 1.03M | local_state->_read_uncompressed_counter->value()); |
1062 | 1.03M | DorisMetrics::instance()->query_scan_rows->increment(local_state->_scan_rows->value()); |
1063 | 1.03M | auto& tablet = _tablet_reader_params.tablet; |
1064 | 1.03M | tablet->query_scan_bytes->increment(local_state->_read_uncompressed_counter->value()); |
1065 | 1.03M | tablet->query_scan_rows->increment(local_state->_scan_rows->value()); |
1066 | 1.03M | tablet->query_scan_count->increment(1); |
1067 | | |
// ANN index counters: range search first, then TopN search.
1068 | 1.03M | COUNTER_UPDATE(local_state->_ann_range_search_filter_counter, |
1069 | 1.03M | stats.rows_ann_index_range_filtered); |
1070 | 1.03M | COUNTER_UPDATE(local_state->_ann_topn_filter_counter, stats.rows_ann_index_topn_filtered); |
1071 | 1.03M | COUNTER_UPDATE(local_state->_ann_index_load_costs, stats.ann_index_load_ns); |
1072 | 1.03M | COUNTER_UPDATE(local_state->_ann_ivf_on_disk_load_costs, stats.ann_ivf_on_disk_load_ns); |
1073 | 1.03M | COUNTER_UPDATE(local_state->_ann_ivf_on_disk_cache_hit_cnt, |
1074 | 1.03M | stats.ann_ivf_on_disk_cache_hit_cnt); |
1075 | 1.03M | COUNTER_UPDATE(local_state->_ann_ivf_on_disk_cache_miss_cnt, |
1076 | 1.03M | stats.ann_ivf_on_disk_cache_miss_cnt); |
1077 | 1.03M | COUNTER_UPDATE(local_state->_ann_range_search_costs, stats.ann_index_range_search_ns); |
1078 | 1.03M | COUNTER_UPDATE(local_state->_ann_range_search_cnt, stats.ann_index_range_search_cnt); |
1079 | 1.03M | COUNTER_UPDATE(local_state->_ann_range_engine_search_costs, stats.ann_range_engine_search_ns); |
1080 | | // Engine prepare before search |
1081 | 1.03M | COUNTER_UPDATE(local_state->_ann_range_pre_process_costs, stats.ann_range_pre_process_ns); |
1082 | | // Post process parent: Doris result process + engine convert |
1083 | 1.03M | COUNTER_UPDATE(local_state->_ann_range_post_process_costs, |
1084 | 1.03M | stats.ann_range_result_convert_ns + stats.ann_range_engine_convert_ns); |
1085 | | // Engine convert (child under post-process) |
1086 | 1.03M | COUNTER_UPDATE(local_state->_ann_range_engine_convert_costs, stats.ann_range_engine_convert_ns); |
1087 | | // Doris-side result convert (child under post-process) |
1088 | 1.03M | COUNTER_UPDATE(local_state->_ann_range_result_convert_costs, stats.ann_range_result_convert_ns); |
1089 | | |
1090 | 1.03M | COUNTER_UPDATE(local_state->_ann_topn_search_costs, stats.ann_topn_search_ns); |
1091 | 1.03M | COUNTER_UPDATE(local_state->_ann_topn_search_cnt, stats.ann_index_topn_search_cnt); |
1092 | 1.03M | COUNTER_UPDATE(local_state->_ann_cache_hit_cnt, stats.ann_index_cache_hits); |
1093 | 1.03M | COUNTER_UPDATE(local_state->_ann_range_cache_hit_cnt, stats.ann_index_range_cache_hits); |
1094 | | |
1095 | | // Detailed ANN timers |
1096 | | // ANN TopN timers with hierarchy |
1097 | | // Engine search time (FAISS) |
1098 | 1.03M | COUNTER_UPDATE(local_state->_ann_topn_engine_search_costs, |
1099 | 1.03M | stats.ann_index_topn_engine_search_ns); |
1100 | | // Engine prepare time (allocations/buffer setup before search) |
1101 | 1.03M | COUNTER_UPDATE(local_state->_ann_topn_pre_process_costs, |
1102 | 1.03M | stats.ann_index_topn_engine_prepare_ns); |
1103 | | // Post process parent includes Doris result processing + engine convert |
1104 | 1.03M | COUNTER_UPDATE(local_state->_ann_topn_post_process_costs, |
1105 | 1.03M | stats.ann_index_topn_result_process_ns + stats.ann_index_topn_engine_convert_ns); |
1106 | | // Engine-side conversion time inside FAISS wrappers (child under post-process) |
1107 | 1.03M | COUNTER_UPDATE(local_state->_ann_topn_engine_convert_costs, |
1108 | 1.03M | stats.ann_index_topn_engine_convert_ns); |
1109 | | |
1110 | | // Doris-side result convert costs (show separately as another child counter); use pure process time |
1111 | 1.03M | COUNTER_UPDATE(local_state->_ann_topn_result_convert_costs, |
1112 | 1.03M | stats.ann_index_topn_result_process_ns); |
1113 | | |
1114 | 1.03M | COUNTER_UPDATE(local_state->_ann_fallback_brute_force_cnt, stats.ann_fall_back_brute_force_cnt); |
1115 | 1.03M | COUNTER_UPDATE(local_state->_ann_topn_fallback_by_small_candidate_cnt, |
1116 | 1.03M | stats.ann_topn_fallback_by_small_candidate_cnt); |
1117 | 1.03M | COUNTER_UPDATE(local_state->_ann_topn_fallback_small_candidate_rows, |
1118 | 1.03M | stats.ann_topn_fallback_small_candidate_rows); |
1119 | 1.03M | COUNTER_UPDATE(local_state->_ann_range_fallback_by_small_candidate_cnt, |
1120 | 1.03M | stats.ann_range_fallback_by_small_candidate_cnt); |
1121 | 1.03M | COUNTER_UPDATE(local_state->_ann_range_fallback_small_candidate_rows, |
1122 | 1.03M | stats.ann_range_fallback_small_candidate_rows); |
1123 | | |
1124 | | // Overhead counter removed; precise instrumentation is reported via engine_prepare above. |
1125 | 1.03M | } |
1126 | | |
1127 | | #ifndef NDEBUG |
1128 | 1.32M | Status OlapScanner::_check_ann_cache_hit_debug_points(const OlapReaderStatistics& stats) { |
1129 | 1.32M | DBUG_EXECUTE_IF("olap_scanner.ann_topn_cache_hits", { |
1130 | 1.32M | auto expected_hits = dp->param<int32_t>("expected_hits", -1); |
1131 | 1.32M | auto min_hits = dp->param<int32_t>("min_hits", -1); |
1132 | 1.32M | if (expected_hits >= 0 && stats.ann_index_cache_hits != expected_hits) { |
1133 | 1.32M | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
1134 | 1.32M | "ann_index_cache_hits: {} not equal to expected: {}", |
1135 | 1.32M | stats.ann_index_cache_hits, expected_hits); |
1136 | 1.32M | } |
1137 | 1.32M | if (min_hits >= 0 && stats.ann_index_cache_hits < min_hits) { |
1138 | 1.32M | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
1139 | 1.32M | "ann_index_cache_hits: {} less than expected min: {}", |
1140 | 1.32M | stats.ann_index_cache_hits, min_hits); |
1141 | 1.32M | } |
1142 | 1.32M | }) |
1143 | 1.32M | DBUG_EXECUTE_IF("olap_scanner.ann_range_cache_hits", { |
1144 | 1.32M | auto expected_hits = dp->param<int32_t>("expected_hits", -1); |
1145 | 1.32M | auto min_hits = dp->param<int32_t>("min_hits", -1); |
1146 | 1.32M | if (expected_hits >= 0 && stats.ann_index_range_cache_hits != expected_hits) { |
1147 | 1.32M | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
1148 | 1.32M | "ann_index_range_cache_hits: {} not equal to expected: {}", |
1149 | 1.32M | stats.ann_index_range_cache_hits, expected_hits); |
1150 | 1.32M | } |
1151 | 1.32M | if (min_hits >= 0 && stats.ann_index_range_cache_hits < min_hits) { |
1152 | 1.32M | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
1153 | 1.32M | "ann_index_range_cache_hits: {} less than expected min: {}", |
1154 | 1.32M | stats.ann_index_range_cache_hits, min_hits); |
1155 | 1.32M | } |
1156 | 1.32M | }) |
1157 | 1.32M | return Status::OK(); |
1158 | 1.32M | } |
1159 | | #endif |
1160 | | |
1161 | | #include "common/compile_check_avoid_end.h" |
1162 | | } // namespace doris |