be/src/exec/scan/file_scanner_v2.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/scan/file_scanner_v2.h" |
19 | | |
20 | | #include <gen_cpp/Exprs_types.h> |
21 | | #include <gen_cpp/PlanNodes_types.h> |
22 | | |
23 | | #include <algorithm> |
24 | | #include <map> |
25 | | #include <memory> |
26 | | #include <optional> |
27 | | #include <string> |
28 | | #include <utility> |
29 | | |
30 | | #include "common/cast_set.h" |
31 | | #include "common/config.h" |
32 | | #include "common/consts.h" |
33 | | #include "common/status.h" |
34 | | #include "core/assert_cast.h" |
35 | | #include "core/block/column_with_type_and_name.h" |
36 | | #include "core/column/column.h" |
37 | | #include "core/data_type/data_type.h" |
38 | | #include "core/data_type/data_type_nullable.h" |
39 | | #include "core/data_type_serde/data_type_serde.h" |
40 | | #include "core/string_ref.h" |
41 | | #include "exec/common/util.hpp" |
42 | | #include "exec/operator/scan_operator.h" |
43 | | #include "exec/scan/access_path_parser.h" |
44 | | #include "exprs/runtime_filter_expr.h" |
45 | | #include "exprs/vexpr.h" |
46 | | #include "exprs/vexpr_context.h" |
47 | | #include "exprs/vslot_ref.h" |
48 | | #include "format/format_common.h" |
49 | | #include "format_v2/column_mapper.h" |
50 | | #include "format_v2/jni/iceberg_sys_table_reader.h" |
51 | | #include "format_v2/jni/jdbc_reader.h" |
52 | | #include "format_v2/jni/max_compute_jni_reader.h" |
53 | | #include "format_v2/jni/trino_connector_jni_reader.h" |
54 | | #include "format_v2/table/hive_reader.h" |
55 | | #include "format_v2/table/hudi_reader.h" |
56 | | #include "format_v2/table/iceberg_reader.h" |
57 | | #include "format_v2/table/paimon_reader.h" |
58 | | #include "format_v2/table/remote_doris_reader.h" |
59 | | #include "format_v2/table_reader.h" |
60 | | #include "io/fs/file_meta_cache.h" |
61 | | #include "io/io_common.h" |
62 | | #include "runtime/descriptors.h" |
63 | | #include "runtime/exec_env.h" |
64 | | #include "runtime/runtime_state.h" |
65 | | #include "service/backend_options.h" |
66 | | #include "storage/id_manager.h" |
67 | | |
68 | | namespace doris { |
69 | | namespace { |
70 | | |
71 | 971 | std::string table_format_name(const TFileRangeDesc& range) { |
72 | 971 | return range.__isset.table_format_params ? range.table_format_params.table_format_type |
73 | 971 | : "NotSet"; |
74 | 971 | } |
75 | | |
76 | | TFileFormatType::type get_range_format_type(const TFileScanRangeParams& params, |
77 | 2.00k | const TFileRangeDesc& range) { |
78 | 2.00k | return range.__isset.format_type ? range.format_type : params.format_type; |
79 | 2.00k | } |
80 | | |
81 | 497 | bool is_supported_table_format(const TFileRangeDesc& range) { |
82 | 497 | const auto table_format = table_format_name(range); |
83 | 497 | if (table_format == "hudi" && range.__isset.table_format_params && |
84 | 497 | range.table_format_params.__isset.hudi_params && |
85 | 497 | range.table_format_params.hudi_params.__isset.delta_logs && |
86 | 497 | !range.table_format_params.hudi_params.delta_logs.empty()) { |
87 | | // Hudi MOR splits need log-file merge semantics and must stay on the existing JNI path. |
88 | | // FileScannerV2 currently supports native Parquet data files only. |
89 | 1 | return false; |
90 | 1 | } |
91 | 496 | return table_format == "NotSet" || table_format == "tvf" || table_format == "hive" || |
92 | 496 | table_format == "iceberg" || table_format == "paimon" || table_format == "hudi"; |
93 | 497 | } |
94 | | |
95 | 3 | bool is_supported_arrow_table_format(const TFileRangeDesc& range) { |
96 | 3 | return table_format_name(range) == "remote_doris"; |
97 | 3 | } |
98 | | |
99 | 5 | bool is_supported_jni_table_format(const TFileRangeDesc& range) { |
100 | 5 | const auto table_format = table_format_name(range); |
101 | 5 | if (table_format == "paimon") { |
102 | 0 | return range.__isset.table_format_params && |
103 | 0 | range.table_format_params.__isset.paimon_params && |
104 | 0 | range.table_format_params.paimon_params.__isset.reader_type && |
105 | 0 | range.table_format_params.paimon_params.reader_type == TPaimonReaderType::PAIMON_JNI; |
106 | 0 | } |
107 | 5 | return table_format == "jdbc" || table_format == "iceberg" || table_format == "hudi" || |
108 | 5 | table_format == "max_compute" || table_format == "trino_connector"; |
109 | 5 | } |
110 | | |
111 | 472 | bool is_csv_format(TFileFormatType::type format_type) { |
112 | 472 | switch (format_type) { |
113 | 338 | case TFileFormatType::FORMAT_CSV_PLAIN: |
114 | 339 | case TFileFormatType::FORMAT_CSV_GZ: |
115 | 340 | case TFileFormatType::FORMAT_CSV_BZ2: |
116 | 341 | case TFileFormatType::FORMAT_CSV_LZ4FRAME: |
117 | 342 | case TFileFormatType::FORMAT_CSV_LZ4BLOCK: |
118 | 343 | case TFileFormatType::FORMAT_CSV_LZOP: |
119 | 344 | case TFileFormatType::FORMAT_CSV_DEFLATE: |
120 | 345 | case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: |
121 | 346 | case TFileFormatType::FORMAT_PROTO: |
122 | 346 | return true; |
123 | 126 | default: |
124 | 126 | return false; |
125 | 472 | } |
126 | 472 | } |
127 | | |
128 | 126 | bool is_text_format(TFileFormatType::type format_type) { |
129 | 126 | return format_type == TFileFormatType::FORMAT_TEXT; |
130 | 126 | } |
131 | | |
132 | 124 | bool is_json_format(TFileFormatType::type format_type) { |
133 | 124 | return format_type == TFileFormatType::FORMAT_JSON; |
134 | 124 | } |
135 | | |
136 | 104 | bool is_native_format(TFileFormatType::type format_type) { |
137 | 104 | return format_type == TFileFormatType::FORMAT_NATIVE; |
138 | 104 | } |
139 | | |
140 | 3.07k | bool is_partition_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) { |
141 | 3.07k | if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) || |
142 | 3.07k | column_name == BeConsts::ICEBERG_ROWID_COL) { |
143 | 11 | return false; |
144 | 11 | } |
145 | 3.05k | return slot_info.__isset.category ? slot_info.category == TColumnCategory::PARTITION_KEY |
146 | 3.05k | : !slot_info.is_file_slot; |
147 | 3.07k | } |
148 | | |
149 | 3.07k | bool is_data_file_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) { |
150 | 3.07k | if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) || |
151 | 3.07k | column_name == BeConsts::ICEBERG_ROWID_COL) { |
152 | 11 | return false; |
153 | 11 | } |
154 | | // CSV and other non-self-describing formats need FE slot descriptors for only the columns that |
155 | | // are physically read from the file. Partition/default/virtual columns stay in TableReader's |
156 | | // mapping layer and are materialized after the file-local block is read. New FE provides an |
157 | | // explicit category; old FE falls back to `is_file_slot`. |
158 | 3.06k | if (slot_info.__isset.category) { |
159 | 3.05k | return slot_info.category == TColumnCategory::REGULAR || |
160 | 3.05k | slot_info.category == TColumnCategory::GENERATED; |
161 | 3.05k | } |
162 | 2 | return slot_info.is_file_slot; |
163 | 3.06k | } |
164 | | |
165 | | Status rewrite_slot_refs_to_global_index( |
166 | | VExprSPtr* expr, |
167 | 29 | const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index) { |
168 | 29 | DORIS_CHECK(expr != nullptr); |
169 | 29 | if (*expr == nullptr) { |
170 | 0 | return Status::OK(); |
171 | 0 | } |
172 | 29 | if (auto* runtime_filter = dynamic_cast<RuntimeFilterExpr*>(expr->get()); |
173 | 29 | runtime_filter != nullptr) { |
174 | 1 | auto impl = runtime_filter->get_impl(); |
175 | 1 | DORIS_CHECK(impl != nullptr); |
176 | 1 | RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&impl, slot_id_to_global_index)); |
177 | 1 | runtime_filter->set_impl(std::move(impl)); |
178 | 1 | return Status::OK(); |
179 | 1 | } |
180 | 28 | if ((*expr)->is_slot_ref()) { |
181 | 13 | const auto* slot_ref = assert_cast<const VSlotRef*>(expr->get()); |
182 | 13 | const auto global_index_it = slot_id_to_global_index.find(slot_ref->slot_id()); |
183 | 13 | if (global_index_it == slot_id_to_global_index.end()) { |
184 | 1 | DORIS_CHECK(slot_ref->slot_id() >= 0); |
185 | 1 | const auto global_index = format::GlobalIndex(cast_set<size_t>(slot_ref->slot_id())); |
186 | 1 | *expr = VSlotRef::create_shared(cast_set<int>(global_index.value()), |
187 | 1 | cast_set<int>(global_index.value()), -1, |
188 | 1 | slot_ref->data_type(), slot_ref->column_name()); |
189 | 1 | RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr)); |
190 | 1 | return Status::OK(); |
191 | 1 | } |
192 | 12 | const auto global_index = global_index_it->second; |
193 | 12 | *expr = VSlotRef::create_shared(cast_set<int>(global_index.value()), |
194 | 12 | cast_set<int>(global_index.value()), -1, |
195 | 12 | slot_ref->data_type(), slot_ref->column_name()); |
196 | 12 | RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr)); |
197 | 12 | return Status::OK(); |
198 | 12 | } |
199 | 15 | auto children = (*expr)->children(); |
200 | 15 | for (auto& child : children) { |
201 | 15 | if (child == nullptr) { |
202 | 0 | continue; |
203 | 0 | } |
204 | 15 | RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&child, slot_id_to_global_index)); |
205 | 15 | } |
206 | 15 | (*expr)->set_children(std::move(children)); |
207 | 15 | return Status::OK(); |
208 | 15 | } |
209 | | |
210 | | } // namespace |
211 | | |
#ifdef BE_TEST
// Unit-test entry points. These are thin forwarders to FileScannerV2::_to_file_format and to
// helpers that live in this translation unit's anonymous namespace.
Status FileScannerV2::TEST_to_file_format(TFileFormatType::type format_type,
                                          format::FileFormat* file_format) {
    Status status = _to_file_format(format_type, file_format);
    return status;
}

bool FileScannerV2::TEST_is_partition_slot(const TFileScanSlotInfo& slot_info,
                                           const std::string& column_name) {
    const bool partition_slot = is_partition_slot(slot_info, column_name);
    return partition_slot;
}

bool FileScannerV2::TEST_is_data_file_slot(const TFileScanSlotInfo& slot_info,
                                           const std::string& column_name) {
    const bool data_file_slot = is_data_file_slot(slot_info, column_name);
    return data_file_slot;
}

Status FileScannerV2::TEST_rewrite_slot_refs_to_global_index(
        VExprSPtr* expr,
        const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index) {
    Status status = rewrite_slot_refs_to_global_index(expr, slot_id_to_global_index);
    return status;
}
#endif
234 | | |
235 | 607 | bool FileScannerV2::is_supported(const TFileScanRangeParams& params, const TFileRangeDesc& range) { |
236 | 607 | const auto format_type = get_range_format_type(params, range); |
237 | 607 | if (format_type == TFileFormatType::FORMAT_PARQUET) { |
238 | 126 | return is_supported_table_format(range); |
239 | 481 | } else if (format_type == TFileFormatType::FORMAT_ARROW) { |
240 | 3 | return is_supported_arrow_table_format(range); |
241 | 478 | } else if (format_type == TFileFormatType::FORMAT_JNI) { |
242 | 5 | return is_supported_jni_table_format(range); |
243 | 473 | } else if (is_csv_format(format_type) || is_text_format(format_type) || |
244 | 473 | is_json_format(format_type) || is_native_format(format_type)) { |
245 | 371 | return is_supported_table_format(range); |
246 | 371 | } else { |
247 | 102 | LOG(WARNING) << "Unsupported file format type " << format_type << " for file scanner v2"; |
248 | 102 | return false; |
249 | 102 | } |
250 | 607 | } |
251 | | |
252 | | FileScannerV2::FileScannerV2(RuntimeState* state, FileScanLocalState* local_state, int64_t limit, |
253 | | std::shared_ptr<SplitSourceConnector> split_source, |
254 | | RuntimeProfile* profile, ShardedKVCache* kv_cache, |
255 | | const std::unordered_map<std::string, int>* colname_to_slot_id) |
256 | 469 | : Scanner(state, local_state, limit, profile), |
257 | 469 | _split_source(std::move(split_source)), |
258 | 469 | _kv_cache(kv_cache) { |
259 | 469 | (void)colname_to_slot_id; |
260 | 469 | if (state->get_query_ctx() != nullptr && |
261 | 469 | state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) { |
262 | 468 | _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]); |
263 | 468 | } else { |
264 | 1 | _params = _split_source->get_params(); |
265 | 1 | } |
266 | 469 | } |
267 | | |
268 | 468 | Status FileScannerV2::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) { |
269 | 468 | RETURN_IF_ERROR(Scanner::init(state, conjuncts)); |
270 | 468 | _get_block_timer = |
271 | 468 | ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerV2GetBlockTime", 1); |
272 | 468 | _file_counter = |
273 | 468 | ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1); |
274 | 468 | _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), |
275 | 468 | "FileReadBytes", TUnit::BYTES, 1); |
276 | 468 | _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), |
277 | 468 | "FileReadCalls", TUnit::UNIT, 1); |
278 | 468 | _file_read_time_counter = |
279 | 468 | ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileReadTime", 1); |
280 | 468 | _adaptive_batch_predicted_rows_counter = ADD_COUNTER_WITH_LEVEL( |
281 | 468 | _local_state->scanner_profile(), "AdaptiveBatchPredictedRows", TUnit::UNIT, 1); |
282 | 468 | _adaptive_batch_actual_bytes_counter = ADD_COUNTER_WITH_LEVEL( |
283 | 468 | _local_state->scanner_profile(), "AdaptiveBatchActualBytes", TUnit::BYTES, 1); |
284 | 468 | _adaptive_batch_probe_count_counter = ADD_COUNTER_WITH_LEVEL( |
285 | 468 | _local_state->scanner_profile(), "AdaptiveBatchProbeCount", TUnit::UNIT, 1); |
286 | 468 | _file_cache_statistics = std::make_unique<io::FileCacheStatistics>(); |
287 | 468 | _file_reader_stats = std::make_unique<io::FileReaderStats>(); |
288 | 468 | RETURN_IF_ERROR(_init_io_ctx()); |
289 | 468 | _io_ctx->file_cache_stats = _file_cache_statistics.get(); |
290 | 468 | _io_ctx->file_reader_stats = _file_reader_stats.get(); |
291 | 468 | _io_ctx->is_disposable = _state->query_options().disable_file_cache; |
292 | 468 | return Status::OK(); |
293 | 468 | } |
294 | | |
295 | 469 | Status FileScannerV2::_open_impl(RuntimeState* state) { |
296 | 469 | RETURN_IF_CANCELLED(state); |
297 | 469 | RETURN_IF_ERROR(Scanner::_open_impl(state)); |
298 | 469 | RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range)); |
299 | 469 | if (_first_scan_range) { |
300 | 467 | RETURN_IF_ERROR(_create_table_reader_for_format(_current_range, &_table_reader)); |
301 | 467 | DORIS_CHECK(_table_reader != nullptr); |
302 | 467 | RETURN_IF_ERROR(_init_expr_ctxes()); |
303 | 467 | RETURN_IF_ERROR(_init_table_reader(_current_range)); |
304 | 467 | } |
305 | 469 | return Status::OK(); |
306 | 469 | } |
307 | | |
308 | 1.12k | Status FileScannerV2::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { |
309 | 1.59k | while (true) { |
310 | 1.59k | RETURN_IF_CANCELLED(state); |
311 | 1.59k | if (!_has_prepared_split) { |
312 | 938 | RETURN_IF_ERROR(_prepare_next_split(eof)); |
313 | 938 | if (*eof) { |
314 | 469 | return Status::OK(); |
315 | 469 | } |
316 | 938 | } |
317 | | |
318 | 1.12k | { |
319 | 1.12k | SCOPED_TIMER(_get_block_timer); |
320 | 1.12k | if (_should_run_adaptive_batch_size()) { |
321 | 1.11k | _table_reader->set_batch_size(_predict_reader_batch_rows()); |
322 | 1.11k | } |
323 | 1.12k | RETURN_IF_ERROR(_table_reader->get_block(block, eof)); |
324 | 1.12k | } |
325 | 1.12k | if (*eof) { |
326 | 469 | _state->update_num_finished_scan_range(1); |
327 | 469 | _has_prepared_split = false; |
328 | 469 | *eof = false; |
329 | 469 | continue; |
330 | 469 | } |
331 | 657 | _update_adaptive_batch_size(*block); |
332 | 657 | return Status::OK(); |
333 | 1.12k | } |
334 | 1.12k | } |
335 | | |
336 | 938 | Status FileScannerV2::_prepare_next_split(bool* eos) { |
337 | 938 | bool has_next = _first_scan_range; |
338 | 938 | if (!_first_scan_range) { |
339 | 471 | RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range)); |
340 | 471 | } |
341 | 938 | _first_scan_range = false; |
342 | 938 | if (!has_next || _should_stop) { |
343 | 469 | *eos = true; |
344 | 469 | return Status::OK(); |
345 | 469 | } |
346 | 469 | DORIS_CHECK(_table_reader != nullptr); |
347 | 469 | _current_range_path = _current_range.path; |
348 | 469 | _init_adaptive_batch_size_state(get_range_format_type(*_params, _current_range)); |
349 | 469 | RETURN_IF_ERROR(_prepare_table_reader_split(_current_range)); |
350 | 469 | COUNTER_UPDATE(_file_counter, 1); |
351 | 469 | _has_prepared_split = true; |
352 | 469 | *eos = false; |
353 | 469 | return Status::OK(); |
354 | 469 | } |
355 | | |
356 | 467 | Status FileScannerV2::_init_table_reader(const TFileRangeDesc& range) { |
357 | 467 | const auto format_type = get_range_format_type(*_params, range); |
358 | 467 | format::FileFormat file_format; |
359 | 467 | RETURN_IF_ERROR(_to_file_format(format_type, &file_format)); |
360 | 467 | DORIS_CHECK(_table_reader != nullptr); |
361 | | |
362 | 467 | format::TableColumnPredicates table_column_predicates; |
363 | 467 | RETURN_IF_ERROR(_build_table_column_predicates(&table_column_predicates)); |
364 | 467 | VExprContextSPtrs table_conjuncts; |
365 | 467 | RETURN_IF_ERROR(_build_table_conjuncts(&table_conjuncts)); |
366 | 467 | RETURN_IF_ERROR(_table_reader->init({ |
367 | 467 | .projected_columns = _projected_columns, |
368 | 467 | .column_predicates = std::move(table_column_predicates), |
369 | 467 | .conjuncts = std::move(table_conjuncts), |
370 | 467 | .format = file_format, |
371 | 467 | .scan_params = const_cast<TFileScanRangeParams*>(_params), |
372 | 467 | .io_ctx = _io_ctx, |
373 | 467 | .runtime_state = _state, |
374 | 467 | .scanner_profile = _local_state->scanner_profile(), |
375 | 467 | .file_slot_descs = &_file_slot_descs, |
376 | 467 | .push_down_agg_type = _local_state->get_push_down_agg_type(), |
377 | 467 | .condition_cache_digest = _local_state->get_condition_cache_digest(), |
378 | 467 | })); |
379 | 467 | return Status::OK(); |
380 | 467 | } |
381 | | |
382 | | Status FileScannerV2::_create_table_reader_for_format( |
383 | 467 | const TFileRangeDesc& range, std::unique_ptr<format::TableReader>* reader) const { |
384 | 467 | DORIS_CHECK(reader != nullptr); |
385 | 467 | const auto table_format = table_format_name(range); |
386 | 467 | if (table_format == "NotSet" || table_format == "tvf") { |
387 | 465 | *reader = std::make_unique<format::TableReader>(); |
388 | 465 | } else if (table_format == "hive") { |
389 | 0 | *reader = format::hive::HiveReader::create_unique(); |
390 | 2 | } else if (table_format == "iceberg") { |
391 | 0 | if (get_range_format_type(*_params, range) == TFileFormatType::FORMAT_JNI) { |
392 | 0 | *reader = std::make_unique<format::iceberg::IcebergSysTableJniReader>(); |
393 | 0 | } else { |
394 | 0 | *reader = std::make_unique<format::iceberg::IcebergTableReader>(); |
395 | 0 | } |
396 | 2 | } else if (table_format == "paimon") { |
397 | 0 | *reader = std::make_unique<format::paimon::PaimonHybridReader>(); |
398 | 2 | } else if (table_format == "hudi") { |
399 | 0 | *reader = std::make_unique<format::hudi::HudiHybridReader>(); |
400 | 2 | } else if (table_format == "jdbc") { |
401 | 2 | *reader = std::make_unique<format::jdbc::JdbcJniReader>(); |
402 | 2 | } else if (table_format == "max_compute") { |
403 | 0 | const auto* mc_desc = |
404 | 0 | static_cast<const MaxComputeTableDescriptor*>(_output_tuple_desc->table_desc()); |
405 | 0 | RETURN_IF_ERROR(mc_desc->init_status()); |
406 | 0 | *reader = std::make_unique<format::max_compute::MaxComputeJniReader>(mc_desc); |
407 | 0 | } else if (table_format == "trino_connector") { |
408 | 0 | *reader = std::make_unique<format::trino_connector::TrinoConnectorJniReader>(); |
409 | 0 | } else if (table_format == "remote_doris") { |
410 | 0 | *reader = std::make_unique<format::remote_doris::RemoteDorisReader>(); |
411 | 0 | } else { |
412 | 0 | return Status::NotSupported("FileScannerV2 does not support table format {}", table_format); |
413 | 0 | } |
414 | 467 | return Status::OK(); |
415 | 467 | } |
416 | | |
417 | 466 | Status FileScannerV2::_prepare_table_reader_split(const TFileRangeDesc& range) { |
418 | 466 | std::map<std::string, Field> partition_values; |
419 | 466 | RETURN_IF_ERROR(_generate_partition_values(range, &partition_values)); |
420 | 466 | format::FileFormat current_split_format; |
421 | 466 | RETURN_IF_ERROR(_to_file_format(get_range_format_type(*_params, range), ¤t_split_format)); |
422 | 466 | RETURN_IF_ERROR(_table_reader->prepare_split({ |
423 | 466 | .partition_values = std::move(partition_values), |
424 | 466 | .cache = _kv_cache, |
425 | 466 | .current_range = range, |
426 | 466 | .current_split_format = current_split_format, |
427 | 466 | .global_rowid_context = _create_global_rowid_context(range), |
428 | 466 | })); |
429 | 466 | return Status::OK(); |
430 | 466 | } |
431 | | |
432 | 9 | bool FileScannerV2::_should_enable_file_meta_cache() const { |
433 | 9 | return ExecEnv::GetInstance()->file_meta_cache()->enabled() && |
434 | 9 | _split_source->num_scan_ranges() < config::max_external_file_meta_cache_num / 3; |
435 | 9 | } |
436 | | |
437 | | std::optional<format::GlobalRowIdContext> FileScannerV2::_create_global_rowid_context( |
438 | 467 | const TFileRangeDesc& range) const { |
439 | 467 | if (!_need_global_rowid_column) { |
440 | 459 | return std::nullopt; |
441 | 459 | } |
442 | 8 | auto& id_file_map = _state->get_id_file_map(); |
443 | 8 | DORIS_CHECK(id_file_map != nullptr); |
444 | 8 | const auto file_id = id_file_map->get_file_mapping_id( |
445 | 8 | std::make_shared<FileMapping>(_local_state->cast<FileScanLocalState>().parent_id(), |
446 | 8 | range, _should_enable_file_meta_cache())); |
447 | 8 | return format::GlobalRowIdContext { |
448 | 8 | .version = IdManager::ID_VERSION, |
449 | 8 | .backend_id = BackendOptions::get_backend_id(), |
450 | 8 | .file_id = file_id, |
451 | 8 | }; |
452 | 467 | } |
453 | | |
454 | | Status FileScannerV2::_generate_partition_values( |
455 | 467 | const TFileRangeDesc& range, std::map<std::string, Field>* partition_values) const { |
456 | 467 | DORIS_CHECK(partition_values != nullptr); |
457 | 467 | partition_values->clear(); |
458 | 467 | if (!range.__isset.columns_from_path_keys || !range.__isset.columns_from_path) { |
459 | 467 | return Status::OK(); |
460 | 467 | } |
461 | 0 | DORIS_CHECK(range.columns_from_path_keys.size() == range.columns_from_path.size()); |
462 | 0 | for (size_t idx = 0; idx < range.columns_from_path_keys.size(); ++idx) { |
463 | 0 | const auto& key = range.columns_from_path_keys[idx]; |
464 | 0 | const auto it = _partition_slot_descs.find(key); |
465 | 0 | if (it == _partition_slot_descs.end()) { |
466 | 0 | continue; |
467 | 0 | } |
468 | 0 | const auto& value = range.columns_from_path[idx]; |
469 | 0 | const bool is_null = range.__isset.columns_from_path_is_null && |
470 | 0 | idx < range.columns_from_path_is_null.size() && |
471 | 0 | range.columns_from_path_is_null[idx]; |
472 | 0 | Field field; |
473 | 0 | DORIS_CHECK(it->second.slot_desc != nullptr); |
474 | 0 | RETURN_IF_ERROR(_parse_partition_value(it->second.slot_desc, value, is_null, &field)); |
475 | 0 | partition_values->emplace(it->second.canonical_name, std::move(field)); |
476 | 0 | } |
477 | 0 | return Status::OK(); |
478 | 0 | } |
479 | | |
480 | | Status FileScannerV2::_parse_partition_value(const SlotDescriptor* slot_desc, |
481 | | const std::string& value, bool is_null, |
482 | 0 | Field* field) const { |
483 | 0 | DORIS_CHECK(slot_desc != nullptr); |
484 | 0 | DORIS_CHECK(field != nullptr); |
485 | 0 | if (is_null) { |
486 | 0 | *field = Field::create_field<TYPE_NULL>(Null()); |
487 | 0 | return Status::OK(); |
488 | 0 | } |
489 | 0 | const auto data_type = remove_nullable(slot_desc->get_data_type_ptr()); |
490 | 0 | auto column = data_type->create_column(); |
491 | 0 | auto serde = data_type->get_serde(); |
492 | 0 | DataTypeSerDe::FormatOptions options; |
493 | 0 | options.converted_from_string = true; |
494 | 0 | StringRef ref(value.data(), value.size()); |
495 | 0 | RETURN_IF_ERROR(serde->from_string(ref, *column, options)); |
496 | 0 | DORIS_CHECK(column->size() == 1); |
497 | 0 | *field = (*column)[0]; |
498 | 0 | return Status::OK(); |
499 | 0 | } |
500 | | |
501 | 467 | Status FileScannerV2::_init_expr_ctxes() { |
502 | 467 | _slot_id_to_desc.clear(); |
503 | 467 | _slot_id_to_global_index.clear(); |
504 | 467 | _partition_slot_descs.clear(); |
505 | 467 | _file_slot_descs.clear(); |
506 | 3.06k | for (const auto* slot_desc : _output_tuple_desc->slots()) { |
507 | 3.06k | _slot_id_to_desc.emplace(slot_desc->id(), slot_desc); |
508 | 3.06k | } |
509 | 467 | DORIS_CHECK(_table_reader != nullptr); |
510 | 467 | RETURN_IF_ERROR(_build_projected_columns(*_table_reader)); |
511 | 467 | return Status::OK(); |
512 | 467 | } |
513 | | |
514 | 467 | Status FileScannerV2::_build_projected_columns(const format::TableReader& table_reader) { |
515 | 467 | _projected_columns.clear(); |
516 | 467 | _projected_columns.reserve(_params->required_slots.size()); |
517 | 467 | _need_global_rowid_column = false; |
518 | 467 | format::ProjectedColumnBuildContext build_context { |
519 | 467 | .scan_params = _params, |
520 | 467 | .range = &_current_range, |
521 | 467 | .runtime_state = _state, |
522 | 467 | }; |
523 | | |
524 | 3.53k | for (size_t slot_idx = 0; slot_idx < _params->required_slots.size(); ++slot_idx) { |
525 | 3.06k | const auto& slot_info = _params->required_slots[slot_idx]; |
526 | 3.06k | const auto it = _slot_id_to_desc.find(slot_info.slot_id); |
527 | 3.06k | if (it == _slot_id_to_desc.end()) { |
528 | 0 | return Status::InternalError("Unknown source slot descriptor, slot_id={}", |
529 | 0 | slot_info.slot_id); |
530 | 0 | } |
531 | 3.06k | auto column = _build_table_column(it->second); |
532 | 3.06k | if (column.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) { |
533 | 9 | _need_global_rowid_column = true; |
534 | 9 | } |
535 | 3.06k | RETURN_IF_ERROR(_build_default_expr(slot_info, &column.default_expr)); |
536 | 3.06k | build_context.schema_column.reset(); |
537 | 3.06k | RETURN_IF_ERROR(table_reader.annotate_projected_column(slot_info, &build_context, &column)); |
538 | | // Build nested children from access paths generated by the slot's access-path |
539 | | // expressions. A projected column can therefore contain only a subset of the schema |
540 | | // column's nested children. |
541 | 3.06k | RETURN_IF_ERROR(AccessPathParser::build_nested_children( |
542 | 3.06k | &column, it->second, |
543 | 3.06k | build_context.schema_column.has_value() ? &*build_context.schema_column : nullptr)); |
544 | 3.06k | if (is_partition_slot(slot_info, column.name)) { |
545 | 0 | column.is_partition_key = true; |
546 | 0 | _partition_slot_descs.emplace( |
547 | 0 | column.name, |
548 | 0 | PartitionSlotInfo {.slot_desc = it->second, .canonical_name = column.name}); |
549 | 0 | for (const auto& alias : column.name_mapping) { |
550 | 0 | _partition_slot_descs.emplace( |
551 | 0 | alias, |
552 | 0 | PartitionSlotInfo {.slot_desc = it->second, .canonical_name = column.name}); |
553 | 0 | } |
554 | 3.06k | } else if (is_data_file_slot(slot_info, column.name)) { |
555 | 3.05k | _file_slot_descs.push_back(const_cast<SlotDescriptor*>(it->second)); |
556 | 3.05k | } |
557 | 3.06k | const auto global_index = format::GlobalIndex(slot_idx); |
558 | 3.06k | _slot_id_to_global_index.emplace(slot_info.slot_id, global_index); |
559 | 3.06k | _projected_columns.push_back(std::move(column)); |
560 | 3.06k | } |
561 | 467 | RETURN_IF_ERROR(table_reader.validate_projected_columns(build_context)); |
562 | 467 | return Status::OK(); |
563 | 467 | } |
564 | | |
565 | | Status FileScannerV2::_build_default_expr(const TFileScanSlotInfo& slot_info, |
566 | 3.06k | VExprContextSPtr* ctx) const { |
567 | 3.06k | DORIS_CHECK(ctx != nullptr); |
568 | 3.06k | if (slot_info.__isset.default_value_expr && !slot_info.default_value_expr.nodes.empty()) { |
569 | 3.05k | return VExpr::create_expr_tree(slot_info.default_value_expr, *ctx); |
570 | 3.05k | } |
571 | | |
572 | 10 | if (_params->__isset.default_value_of_src_slot) { |
573 | 10 | const auto it = _params->default_value_of_src_slot.find(slot_info.slot_id); |
574 | 10 | if (it != _params->default_value_of_src_slot.end() && !it->second.nodes.empty()) { |
575 | 0 | return VExpr::create_expr_tree(it->second, *ctx); |
576 | 0 | } |
577 | 10 | } |
578 | 10 | return Status::OK(); |
579 | 10 | } |
580 | | |
581 | 3.06k | format::ColumnDefinition FileScannerV2::_build_table_column(const SlotDescriptor* slot_desc) { |
582 | 3.06k | DORIS_CHECK(slot_desc != nullptr); |
583 | 3.06k | format::ColumnDefinition column; |
584 | | // TODO(gabriel): why always BY_NAME here? |
585 | 3.06k | column.identifier = Field::create_field<TYPE_STRING>(slot_desc->col_name()); |
586 | 3.06k | column.name = slot_desc->col_name(); |
587 | 3.06k | column.type = slot_desc->get_data_type_ptr(); |
588 | 3.06k | return column; |
589 | 3.06k | } |
590 | | |
591 | | Status FileScannerV2::_build_table_column_predicates( |
592 | 467 | format::TableColumnPredicates* predicates) const { |
593 | 467 | DORIS_CHECK(predicates != nullptr); |
594 | 467 | predicates->clear(); |
595 | 467 | const auto& slot_predicates = _local_state->cast<FileScanLocalState>()._slot_id_to_predicates; |
596 | 3.06k | for (const auto& [slot_id, slot_predicate_list] : slot_predicates) { |
597 | 3.06k | const auto it = _slot_id_to_desc.find(slot_id); |
598 | 3.06k | if (it == _slot_id_to_desc.end()) { |
599 | 0 | continue; |
600 | 0 | } |
601 | 3.06k | const auto global_index_it = _slot_id_to_global_index.find(slot_id); |
602 | 3.06k | if (global_index_it == _slot_id_to_global_index.end()) { |
603 | 0 | continue; |
604 | 0 | } |
605 | 3.06k | (*predicates)[global_index_it->second] = slot_predicate_list; |
606 | 3.06k | } |
607 | 467 | return Status::OK(); |
608 | 467 | } |
609 | | |
610 | 467 | Status FileScannerV2::_build_table_conjuncts(VExprContextSPtrs* conjuncts) const { |
611 | 467 | DORIS_CHECK(conjuncts != nullptr); |
612 | 467 | conjuncts->clear(); |
613 | 467 | conjuncts->reserve(_conjuncts.size()); |
614 | 467 | for (const auto& conjunct : _conjuncts) { |
615 | 9 | VExprSPtr root; |
616 | 9 | RETURN_IF_ERROR(format::clone_table_expr_tree(conjunct->root(), &root)); |
617 | 9 | RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&root, _slot_id_to_global_index)); |
618 | 9 | conjuncts->push_back(VExprContext::create_shared(std::move(root))); |
619 | 9 | } |
620 | 467 | return Status::OK(); |
621 | 467 | } |
622 | | |
623 | 0 | TFileFormatType::type FileScannerV2::_get_current_format_type() const { |
624 | 0 | return get_range_format_type(*_params, _current_range); |
625 | 0 | } |
626 | | |
627 | | Status FileScannerV2::_to_file_format(TFileFormatType::type format_type, |
628 | 950 | format::FileFormat* file_format) { |
629 | 950 | DORIS_CHECK(file_format != nullptr); |
630 | 950 | switch (format_type) { |
631 | 221 | case TFileFormatType::FORMAT_PARQUET: |
632 | 221 | *file_format = format::FileFormat::PARQUET; |
633 | 221 | return Status::OK(); |
634 | 5 | case TFileFormatType::FORMAT_JNI: |
635 | 5 | *file_format = format::FileFormat::JNI; |
636 | 5 | return Status::OK(); |
637 | 670 | case TFileFormatType::FORMAT_CSV_PLAIN: |
638 | 671 | case TFileFormatType::FORMAT_CSV_GZ: |
639 | 672 | case TFileFormatType::FORMAT_CSV_BZ2: |
640 | 673 | case TFileFormatType::FORMAT_CSV_LZ4FRAME: |
641 | 674 | case TFileFormatType::FORMAT_CSV_LZ4BLOCK: |
642 | 675 | case TFileFormatType::FORMAT_CSV_LZOP: |
643 | 676 | case TFileFormatType::FORMAT_CSV_DEFLATE: |
644 | 677 | case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: |
645 | 678 | case TFileFormatType::FORMAT_PROTO: |
646 | 678 | *file_format = format::FileFormat::CSV; |
647 | 678 | return Status::OK(); |
648 | 1 | case TFileFormatType::FORMAT_TEXT: |
649 | 1 | *file_format = format::FileFormat::TEXT; |
650 | 1 | return Status::OK(); |
651 | 37 | case TFileFormatType::FORMAT_JSON: |
652 | 37 | *file_format = format::FileFormat::JSON; |
653 | 37 | return Status::OK(); |
654 | 5 | case TFileFormatType::FORMAT_NATIVE: |
655 | 5 | *file_format = format::FileFormat::NATIVE; |
656 | 5 | return Status::OK(); |
657 | 1 | case TFileFormatType::FORMAT_ARROW: |
658 | 1 | *file_format = format::FileFormat::ARROW; |
659 | 1 | return Status::OK(); |
660 | 1 | default: |
661 | 1 | return Status::NotSupported("FileScannerV2 does not support file format {}", |
662 | 1 | to_string(format_type)); |
663 | 950 | } |
664 | 950 | } |
665 | | |
666 | 469 | Status FileScannerV2::_init_io_ctx() { |
667 | 469 | _io_ctx = std::make_shared<io::IOContext>(); |
668 | 469 | _io_ctx->query_id = &_state->query_id(); |
669 | 469 | return Status::OK(); |
670 | 469 | } |
671 | | |
672 | 469 | void FileScannerV2::_reset_adaptive_batch_size_state() { |
673 | 469 | _block_size_predictor.reset(); |
674 | 469 | COUNTER_SET(_adaptive_batch_predicted_rows_counter, int64_t(0)); |
675 | 469 | COUNTER_SET(_adaptive_batch_actual_bytes_counter, int64_t(0)); |
676 | 469 | } |
677 | | |
678 | 468 | void FileScannerV2::_init_adaptive_batch_size_state(TFileFormatType::type format_type) { |
679 | 468 | _reset_adaptive_batch_size_state(); |
680 | 468 | if (!_should_enable_adaptive_batch_size(format_type)) { |
681 | 2 | return; |
682 | 2 | } |
683 | | |
684 | | // V2 native file readers do not have reliable row-width hints before the first batch. Start |
685 | | // every split with a small probe, then learn bytes-per-row from the materialized table block |
686 | | // and keep later batches close to RuntimeState::preferred_block_size_bytes(). |
687 | 466 | _block_size_predictor = std::make_unique<AdaptiveBlockSizePredictor>( |
688 | 466 | _state->preferred_block_size_bytes(), 0.0, ADAPTIVE_BATCH_INITIAL_PROBE_ROWS, |
689 | 466 | _state->batch_size()); |
690 | 466 | } |
691 | | |
692 | 468 | bool FileScannerV2::_should_enable_adaptive_batch_size(TFileFormatType::type format_type) const { |
693 | 468 | if (!config::enable_adaptive_batch_size) { |
694 | 0 | return false; |
695 | 0 | } |
696 | 468 | switch (format_type) { |
697 | 110 | case TFileFormatType::FORMAT_PARQUET: |
698 | 110 | case TFileFormatType::FORMAT_ORC: |
699 | 445 | case TFileFormatType::FORMAT_CSV_PLAIN: |
700 | 445 | case TFileFormatType::FORMAT_CSV_GZ: |
701 | 445 | case TFileFormatType::FORMAT_CSV_BZ2: |
702 | 445 | case TFileFormatType::FORMAT_CSV_LZ4FRAME: |
703 | 445 | case TFileFormatType::FORMAT_CSV_LZ4BLOCK: |
704 | 445 | case TFileFormatType::FORMAT_CSV_LZOP: |
705 | 445 | case TFileFormatType::FORMAT_CSV_DEFLATE: |
706 | 445 | case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: |
707 | 445 | case TFileFormatType::FORMAT_PROTO: |
708 | 445 | case TFileFormatType::FORMAT_TEXT: |
709 | 463 | case TFileFormatType::FORMAT_JSON: |
710 | 465 | case TFileFormatType::FORMAT_JNI: |
711 | 465 | return true; |
712 | 2 | default: |
713 | 2 | return false; |
714 | 468 | } |
715 | 468 | } |
716 | | |
717 | 1.78k | bool FileScannerV2::_should_run_adaptive_batch_size() const { |
718 | | // COUNT pushdown emits synthetic rows from file metadata and does not materialize file columns, |
719 | | // so there is no useful row-width sample to learn from. |
720 | 1.78k | return _block_size_predictor != nullptr && |
721 | 1.78k | _local_state->get_push_down_agg_type() != TPushAggOp::type::COUNT; |
722 | 1.78k | } |
723 | | |
724 | 1.11k | size_t FileScannerV2::_predict_reader_batch_rows() { |
725 | 1.11k | DORIS_CHECK(_block_size_predictor != nullptr); |
726 | | // Before history exists this returns the probe row count; after update(), it returns roughly |
727 | | // preferred_block_size_bytes / EWMA(bytes_per_row), capped by RuntimeState::batch_size(). |
728 | 1.11k | const size_t predicted_rows = _block_size_predictor->predict_next_rows(); |
729 | 1.11k | COUNTER_SET(_adaptive_batch_predicted_rows_counter, static_cast<int64_t>(predicted_rows)); |
730 | 1.11k | return predicted_rows; |
731 | 1.11k | } |
732 | | |
733 | 657 | void FileScannerV2::_update_adaptive_batch_size(const Block& block) { |
734 | 657 | if (!_should_run_adaptive_batch_size()) { |
735 | 7 | return; |
736 | 7 | } |
737 | 650 | COUNTER_SET(_adaptive_batch_actual_bytes_counter, static_cast<int64_t>(block.bytes())); |
738 | 650 | if (block.rows() == 0) { |
739 | 0 | return; |
740 | 0 | } |
741 | | // The sample is taken after TableReader has finalized file-local columns to table columns. |
742 | | // This matches the memory shape seen by upstream operators and catches very wide nested |
743 | | // columns, such as map/string payloads, after the first probe batch. |
744 | 650 | if (!_block_size_predictor->has_history()) { |
745 | 467 | COUNTER_UPDATE(_adaptive_batch_probe_count_counter, 1); |
746 | 467 | } |
747 | 650 | _block_size_predictor->update(block); |
748 | 650 | } |
749 | | |
750 | 469 | Status FileScannerV2::close(RuntimeState* state) { |
751 | 469 | if (!_try_close()) { |
752 | 0 | return Status::OK(); |
753 | 0 | } |
754 | 469 | if (_table_reader != nullptr) { |
755 | 467 | RETURN_IF_ERROR(_table_reader->close()); |
756 | 467 | _report_condition_cache_profile(); |
757 | 467 | _table_reader.reset(); |
758 | 467 | } |
759 | 469 | return Scanner::close(state); |
760 | 469 | } |
761 | | |
762 | 469 | void FileScannerV2::try_stop() { |
763 | 469 | Scanner::try_stop(); |
764 | 469 | if (_io_ctx) { |
765 | 469 | _io_ctx->should_stop = true; |
766 | 469 | } |
767 | 469 | } |
768 | | |
769 | 886 | void FileScannerV2::update_realtime_counters() { |
770 | 886 | if (_file_reader_stats == nullptr) { |
771 | 0 | return; |
772 | 0 | } |
773 | 886 | const int64_t bytes_read = _file_reader_stats->read_bytes; |
774 | 886 | COUNTER_SET(_file_read_bytes_counter, bytes_read); |
775 | 886 | COUNTER_SET(_file_read_calls_counter, cast_set<int64_t>(_file_reader_stats->read_calls)); |
776 | 886 | COUNTER_SET(_file_read_time_counter, cast_set<int64_t>(_file_reader_stats->read_time_ns)); |
777 | 886 | } |
778 | | |
779 | 469 | void FileScannerV2::_collect_profile_before_close() { |
780 | 469 | _report_file_reader_predicate_filtered_rows(); |
781 | 469 | Scanner::_collect_profile_before_close(); |
782 | 469 | if (_file_reader_stats != nullptr) { |
783 | 469 | COUNTER_SET(_file_read_bytes_counter, cast_set<int64_t>(_file_reader_stats->read_bytes)); |
784 | 469 | COUNTER_SET(_file_read_calls_counter, cast_set<int64_t>(_file_reader_stats->read_calls)); |
785 | 469 | COUNTER_SET(_file_read_time_counter, cast_set<int64_t>(_file_reader_stats->read_time_ns)); |
786 | 469 | } |
787 | | // Query profiles can be collected before Scanner::close() runs. Publish condition-cache |
788 | | // counters here as well, using deltas so this method and close() cannot double count. |
789 | 469 | _report_condition_cache_profile(); |
790 | 469 | } |
791 | | |
792 | 469 | bool FileScannerV2::_should_update_load_counters() const { |
793 | 469 | if (_is_load) { |
794 | 0 | return true; |
795 | 0 | } |
796 | | // TVF based loads (e.g. http_stream, group commit relay) plan the load source as a |
797 | | // tvf query scan without src tuple desc, so _is_load is false. But rows filtered by |
798 | | // the load's WHERE clause still need to be reported as unselected rows. FILE_STREAM |
799 | | // is only reachable from such load entries, never from normal queries, so use it to |
800 | | // identify these scanners. |
801 | 469 | return (_params != nullptr && _params->__isset.file_type && |
802 | 469 | _params->file_type == TFileType::FILE_STREAM) || |
803 | 469 | (_current_range.__isset.file_type && _current_range.file_type == TFileType::FILE_STREAM); |
804 | 469 | } |
805 | | |
806 | 469 | void FileScannerV2::_report_file_reader_predicate_filtered_rows() { |
807 | 469 | const int64_t filtered_rows = _io_ctx != nullptr ? _io_ctx->predicate_filtered_rows : 0; |
808 | 469 | const int64_t filtered_delta = filtered_rows - _reported_predicate_filtered_rows; |
809 | 469 | if (filtered_delta > 0) { |
810 | | // File readers can evaluate localized conjuncts before a block reaches Scanner. Count |
811 | | // those rows as scanner-level unselected rows so load statistics stay identical no matter |
812 | | // whether a predicate is pushed down or evaluated by Scanner::_filter_output_block(). |
813 | 9 | _counter.num_rows_unselected += filtered_delta; |
814 | 9 | _reported_predicate_filtered_rows = filtered_rows; |
815 | 9 | } |
816 | 469 | } |
817 | | |
818 | 936 | void FileScannerV2::_report_condition_cache_profile() { |
819 | 936 | auto* local_state = static_cast<FileScanLocalState*>(_local_state); |
820 | 936 | const int64_t hit_count = |
821 | 936 | _table_reader != nullptr ? _table_reader->condition_cache_hit_count() : 0; |
822 | 936 | const int64_t hit_delta = hit_count - _reported_condition_cache_hit_count; |
823 | 936 | if (hit_delta > 0) { |
824 | 0 | COUNTER_UPDATE(local_state->_condition_cache_hit_counter, hit_delta); |
825 | 0 | _reported_condition_cache_hit_count = hit_count; |
826 | 0 | } |
827 | 936 | const int64_t filtered_rows = _io_ctx != nullptr ? _io_ctx->condition_cache_filtered_rows : 0; |
828 | 936 | const int64_t filtered_delta = filtered_rows - _reported_condition_cache_filtered_rows; |
829 | 936 | if (filtered_delta > 0) { |
830 | 0 | COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter, filtered_delta); |
831 | 0 | _reported_condition_cache_filtered_rows = filtered_rows; |
832 | 0 | } |
833 | 936 | } |
834 | | |
835 | | } // namespace doris |