be/src/exec/scan/file_scanner_v2.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/scan/file_scanner_v2.h" |
19 | | |
20 | | #include <gen_cpp/Exprs_types.h> |
21 | | #include <gen_cpp/PlanNodes_types.h> |
22 | | |
23 | | #include <algorithm> |
24 | | #include <map> |
25 | | #include <memory> |
26 | | #include <optional> |
27 | | #include <string> |
28 | | #include <utility> |
29 | | |
30 | | #include "common/cast_set.h" |
31 | | #include "common/config.h" |
32 | | #include "common/consts.h" |
33 | | #include "common/status.h" |
34 | | #include "core/assert_cast.h" |
35 | | #include "core/block/column_with_type_and_name.h" |
36 | | #include "core/column/column.h" |
37 | | #include "core/data_type/data_type.h" |
38 | | #include "core/data_type/data_type_nullable.h" |
39 | | #include "core/data_type_serde/data_type_serde.h" |
40 | | #include "core/string_ref.h" |
41 | | #include "exec/common/util.hpp" |
42 | | #include "exec/operator/scan_operator.h" |
43 | | #include "exec/scan/access_path_parser.h" |
44 | | #include "exprs/vexpr.h" |
45 | | #include "exprs/vexpr_context.h" |
46 | | #include "exprs/vslot_ref.h" |
47 | | #include "format/format_common.h" |
48 | | #include "format_v2/column_mapper.h" |
49 | | #include "format_v2/jni/iceberg_sys_table_reader.h" |
50 | | #include "format_v2/jni/jdbc_reader.h" |
51 | | #include "format_v2/table/hive_reader.h" |
52 | | #include "format_v2/table/hudi_reader.h" |
53 | | #include "format_v2/table/iceberg_reader.h" |
54 | | #include "format_v2/table/paimon_reader.h" |
55 | | #include "format_v2/table_reader.h" |
56 | | #include "io/fs/file_meta_cache.h" |
57 | | #include "io/io_common.h" |
58 | | #include "runtime/descriptors.h" |
59 | | #include "runtime/exec_env.h" |
60 | | #include "runtime/runtime_state.h" |
61 | | #include "service/backend_options.h" |
62 | | #include "storage/id_manager.h" |
63 | | |
64 | | namespace doris { |
65 | | namespace { |
66 | | |
67 | 31 | std::string table_format_name(const TFileRangeDesc& range) { |
68 | 31 | return range.__isset.table_format_params ? range.table_format_params.table_format_type |
69 | 31 | : "NotSet"; |
70 | 31 | } |
71 | | |
72 | | TFileFormatType::type get_range_format_type(const TFileScanRangeParams& params, |
73 | 34 | const TFileRangeDesc& range) { |
74 | 34 | return range.__isset.format_type ? range.format_type : params.format_type; |
75 | 34 | } |
76 | | |
77 | 28 | bool is_supported_table_format(const TFileRangeDesc& range) { |
78 | 28 | const auto table_format = table_format_name(range); |
79 | 28 | if (table_format == "hudi" && range.__isset.table_format_params && |
80 | 28 | range.table_format_params.__isset.hudi_params && |
81 | 28 | range.table_format_params.hudi_params.__isset.delta_logs && |
82 | 28 | !range.table_format_params.hudi_params.delta_logs.empty()) { |
83 | | // Hudi MOR splits need log-file merge semantics and must stay on the existing JNI path. |
84 | | // FileScannerV2 currently supports native Parquet data files only. |
85 | 1 | return false; |
86 | 1 | } |
87 | 27 | return table_format == "NotSet" || table_format == "tvf" || table_format == "hive" || |
88 | 27 | table_format == "iceberg" || table_format == "paimon" || table_format == "hudi"; |
89 | 28 | } |
90 | | |
91 | 3 | bool is_supported_jni_table_format(const TFileRangeDesc& range) { |
92 | 3 | const auto table_format = table_format_name(range); |
93 | 3 | if (table_format == "paimon") { |
94 | 0 | return range.__isset.table_format_params && |
95 | 0 | range.table_format_params.__isset.paimon_params && |
96 | 0 | range.table_format_params.paimon_params.__isset.reader_type && |
97 | 0 | range.table_format_params.paimon_params.reader_type == TPaimonReaderType::PAIMON_JNI; |
98 | 0 | } |
99 | 3 | return table_format == "jdbc" || table_format == "iceberg"; |
100 | 3 | } |
101 | | |
102 | 15 | bool is_csv_format(TFileFormatType::type format_type) { |
103 | 15 | switch (format_type) { |
104 | 2 | case TFileFormatType::FORMAT_CSV_PLAIN: |
105 | 3 | case TFileFormatType::FORMAT_CSV_GZ: |
106 | 4 | case TFileFormatType::FORMAT_CSV_BZ2: |
107 | 5 | case TFileFormatType::FORMAT_CSV_LZ4FRAME: |
108 | 6 | case TFileFormatType::FORMAT_CSV_LZ4BLOCK: |
109 | 7 | case TFileFormatType::FORMAT_CSV_LZOP: |
110 | 8 | case TFileFormatType::FORMAT_CSV_DEFLATE: |
111 | 9 | case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: |
112 | 10 | case TFileFormatType::FORMAT_PROTO: |
113 | 10 | return true; |
114 | 5 | default: |
115 | 5 | return false; |
116 | 15 | } |
117 | 15 | } |
118 | | |
119 | 5 | bool is_text_format(TFileFormatType::type format_type) { |
120 | 5 | return format_type == TFileFormatType::FORMAT_TEXT; |
121 | 5 | } |
122 | | |
123 | 6 | bool is_partition_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) { |
124 | 6 | if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) || |
125 | 6 | column_name == BeConsts::ICEBERG_ROWID_COL) { |
126 | 2 | return false; |
127 | 2 | } |
128 | 4 | return slot_info.__isset.category ? slot_info.category == TColumnCategory::PARTITION_KEY |
129 | 4 | : !slot_info.is_file_slot; |
130 | 6 | } |
131 | | |
132 | 8 | bool is_data_file_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) { |
133 | 8 | if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) || |
134 | 8 | column_name == BeConsts::ICEBERG_ROWID_COL) { |
135 | 2 | return false; |
136 | 2 | } |
137 | | // CSV and other non-self-describing formats need FE slot descriptors for only the columns that |
138 | | // are physically read from the file. Partition/default/virtual columns stay in TableReader's |
139 | | // mapping layer and are materialized after the file-local block is read. New FE provides an |
140 | | // explicit category; old FE falls back to `is_file_slot`. |
141 | 6 | if (slot_info.__isset.category) { |
142 | 4 | return slot_info.category == TColumnCategory::REGULAR || |
143 | 4 | slot_info.category == TColumnCategory::GENERATED; |
144 | 4 | } |
145 | 2 | return slot_info.is_file_slot; |
146 | 6 | } |
147 | | |
148 | | Status rewrite_slot_refs_to_global_index( |
149 | | VExprSPtr* expr, |
150 | 4 | const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index) { |
151 | 4 | DORIS_CHECK(expr != nullptr); |
152 | 4 | if (*expr == nullptr) { |
153 | 0 | return Status::OK(); |
154 | 0 | } |
155 | 4 | if ((*expr)->is_slot_ref()) { |
156 | 3 | const auto* slot_ref = assert_cast<const VSlotRef*>(expr->get()); |
157 | 3 | const auto global_index_it = slot_id_to_global_index.find(slot_ref->slot_id()); |
158 | 3 | if (global_index_it == slot_id_to_global_index.end()) { |
159 | 1 | DORIS_CHECK(slot_ref->slot_id() >= 0); |
160 | 1 | const auto global_index = format::GlobalIndex(cast_set<size_t>(slot_ref->slot_id())); |
161 | 1 | *expr = VSlotRef::create_shared(cast_set<int>(global_index.value()), |
162 | 1 | cast_set<int>(global_index.value()), -1, |
163 | 1 | slot_ref->data_type(), slot_ref->column_name()); |
164 | 1 | RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr)); |
165 | 1 | return Status::OK(); |
166 | 1 | } |
167 | 2 | const auto global_index = global_index_it->second; |
168 | 2 | *expr = VSlotRef::create_shared(cast_set<int>(global_index.value()), |
169 | 2 | cast_set<int>(global_index.value()), -1, |
170 | 2 | slot_ref->data_type(), slot_ref->column_name()); |
171 | 2 | RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr)); |
172 | 2 | return Status::OK(); |
173 | 2 | } |
174 | 1 | auto children = (*expr)->children(); |
175 | 1 | for (auto& child : children) { |
176 | 1 | if (child == nullptr) { |
177 | 0 | continue; |
178 | 0 | } |
179 | 1 | RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&child, slot_id_to_global_index)); |
180 | 1 | } |
181 | 1 | (*expr)->set_children(std::move(children)); |
182 | 1 | return Status::OK(); |
183 | 1 | } |
184 | | |
185 | | } // namespace |
186 | | |
187 | | #ifdef BE_TEST |
188 | | Status FileScannerV2::TEST_to_file_format(TFileFormatType::type format_type, |
189 | 13 | format::FileFormat* file_format) { |
190 | 13 | return _to_file_format(format_type, file_format); |
191 | 13 | } |
192 | | |
193 | | bool FileScannerV2::TEST_is_partition_slot(const TFileScanSlotInfo& slot_info, |
194 | 6 | const std::string& column_name) { |
195 | 6 | return is_partition_slot(slot_info, column_name); |
196 | 6 | } |
197 | | |
198 | | bool FileScannerV2::TEST_is_data_file_slot(const TFileScanSlotInfo& slot_info, |
199 | 8 | const std::string& column_name) { |
200 | 8 | return is_data_file_slot(slot_info, column_name); |
201 | 8 | } |
202 | | |
203 | | Status FileScannerV2::TEST_rewrite_slot_refs_to_global_index( |
204 | | VExprSPtr* expr, |
205 | 3 | const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index) { |
206 | 3 | return rewrite_slot_refs_to_global_index(expr, slot_id_to_global_index); |
207 | 3 | } |
208 | | #endif |
209 | | |
210 | 34 | bool FileScannerV2::is_supported(const TFileScanRangeParams& params, const TFileRangeDesc& range) { |
211 | 34 | const auto format_type = get_range_format_type(params, range); |
212 | 34 | if (format_type == TFileFormatType::FORMAT_PARQUET) { |
213 | 16 | return is_supported_table_format(range); |
214 | 18 | } else if (format_type == TFileFormatType::FORMAT_JNI) { |
215 | 3 | return is_supported_jni_table_format(range); |
216 | 15 | } else if (is_csv_format(format_type) || is_text_format(format_type)) { |
217 | 12 | return is_supported_table_format(range); |
218 | 12 | } else { |
219 | 3 | LOG(WARNING) << "Unsupported file format type " << format_type << " for file scanner v2"; |
220 | 3 | return false; |
221 | 3 | } |
222 | 34 | } |
223 | | |
224 | | FileScannerV2::FileScannerV2(RuntimeState* state, FileScanLocalState* local_state, int64_t limit, |
225 | | std::shared_ptr<SplitSourceConnector> split_source, |
226 | | RuntimeProfile* profile, ShardedKVCache* kv_cache, |
227 | | const std::unordered_map<std::string, int>* colname_to_slot_id) |
228 | 0 | : Scanner(state, local_state, limit, profile), |
229 | 0 | _split_source(std::move(split_source)), |
230 | 0 | _kv_cache(kv_cache) { |
231 | 0 | (void)colname_to_slot_id; |
232 | 0 | if (state->get_query_ctx() != nullptr && |
233 | 0 | state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) { |
234 | 0 | _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]); |
235 | 0 | } else { |
236 | 0 | _params = _split_source->get_params(); |
237 | 0 | } |
238 | 0 | } |
239 | | |
240 | 0 | Status FileScannerV2::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) { |
241 | 0 | RETURN_IF_ERROR(Scanner::init(state, conjuncts)); |
242 | 0 | _get_block_timer = |
243 | 0 | ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerV2GetBlockTime", 1); |
244 | 0 | _file_counter = |
245 | 0 | ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1); |
246 | 0 | _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), |
247 | 0 | "FileReadBytes", TUnit::BYTES, 1); |
248 | 0 | _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), |
249 | 0 | "FileReadCalls", TUnit::UNIT, 1); |
250 | 0 | _file_read_time_counter = |
251 | 0 | ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileReadTime", 1); |
252 | 0 | _file_cache_statistics = std::make_unique<io::FileCacheStatistics>(); |
253 | 0 | _file_reader_stats = std::make_unique<io::FileReaderStats>(); |
254 | 0 | RETURN_IF_ERROR(_init_io_ctx()); |
255 | 0 | _io_ctx->file_cache_stats = _file_cache_statistics.get(); |
256 | 0 | _io_ctx->file_reader_stats = _file_reader_stats.get(); |
257 | 0 | _io_ctx->is_disposable = _state->query_options().disable_file_cache; |
258 | 0 | return Status::OK(); |
259 | 0 | } |
260 | | |
261 | 0 | Status FileScannerV2::_open_impl(RuntimeState* state) { |
262 | 0 | RETURN_IF_CANCELLED(state); |
263 | 0 | RETURN_IF_ERROR(Scanner::_open_impl(state)); |
264 | 0 | RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range)); |
265 | 0 | if (_first_scan_range) { |
266 | 0 | RETURN_IF_ERROR(_create_table_reader_for_format(_current_range, &_table_reader)); |
267 | 0 | DORIS_CHECK(_table_reader != nullptr); |
268 | 0 | RETURN_IF_ERROR(_init_expr_ctxes()); |
269 | 0 | RETURN_IF_ERROR(_init_table_reader(_current_range)); |
270 | 0 | } |
271 | 0 | return Status::OK(); |
272 | 0 | } |
273 | | |
274 | 0 | Status FileScannerV2::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { |
275 | 0 | while (true) { |
276 | 0 | RETURN_IF_CANCELLED(state); |
277 | 0 | if (!_has_prepared_split) { |
278 | 0 | RETURN_IF_ERROR(_prepare_next_split(eof)); |
279 | 0 | if (*eof) { |
280 | 0 | return Status::OK(); |
281 | 0 | } |
282 | 0 | } |
283 | | |
284 | 0 | { |
285 | 0 | SCOPED_TIMER(_get_block_timer); |
286 | 0 | RETURN_IF_ERROR(_table_reader->get_block(block, eof)); |
287 | 0 | } |
288 | 0 | if (*eof) { |
289 | 0 | _state->update_num_finished_scan_range(1); |
290 | 0 | _has_prepared_split = false; |
291 | 0 | *eof = false; |
292 | 0 | continue; |
293 | 0 | } |
294 | 0 | return Status::OK(); |
295 | 0 | } |
296 | 0 | } |
297 | | |
298 | 0 | Status FileScannerV2::_prepare_next_split(bool* eos) { |
299 | 0 | bool has_next = _first_scan_range; |
300 | 0 | if (!_first_scan_range) { |
301 | 0 | RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range)); |
302 | 0 | } |
303 | 0 | _first_scan_range = false; |
304 | 0 | if (!has_next || _should_stop) { |
305 | 0 | *eos = true; |
306 | 0 | return Status::OK(); |
307 | 0 | } |
308 | 0 | DORIS_CHECK(_table_reader != nullptr); |
309 | 0 | _current_range_path = _current_range.path; |
310 | 0 | RETURN_IF_ERROR(_prepare_table_reader_split(_current_range)); |
311 | 0 | COUNTER_UPDATE(_file_counter, 1); |
312 | 0 | _has_prepared_split = true; |
313 | 0 | *eos = false; |
314 | 0 | return Status::OK(); |
315 | 0 | } |
316 | | |
317 | 0 | Status FileScannerV2::_init_table_reader(const TFileRangeDesc& range) { |
318 | 0 | const auto format_type = get_range_format_type(*_params, range); |
319 | 0 | format::FileFormat file_format; |
320 | 0 | RETURN_IF_ERROR(_to_file_format(format_type, &file_format)); |
321 | 0 | DORIS_CHECK(_table_reader != nullptr); |
322 | |
|
323 | 0 | format::TableColumnPredicates table_column_predicates; |
324 | 0 | RETURN_IF_ERROR(_build_table_column_predicates(&table_column_predicates)); |
325 | 0 | VExprContextSPtrs table_conjuncts; |
326 | 0 | RETURN_IF_ERROR(_build_table_conjuncts(&table_conjuncts)); |
327 | 0 | RETURN_IF_ERROR(_table_reader->init({ |
328 | 0 | .projected_columns = _projected_columns, |
329 | 0 | .column_predicates = std::move(table_column_predicates), |
330 | 0 | .conjuncts = std::move(table_conjuncts), |
331 | 0 | .format = file_format, |
332 | 0 | .scan_params = const_cast<TFileScanRangeParams*>(_params), |
333 | 0 | .io_ctx = _io_ctx, |
334 | 0 | .runtime_state = _state, |
335 | 0 | .scanner_profile = _local_state->scanner_profile(), |
336 | 0 | .file_slot_descs = &_file_slot_descs, |
337 | 0 | .push_down_agg_type = _local_state->get_push_down_agg_type(), |
338 | 0 | .condition_cache_digest = _local_state->get_condition_cache_digest(), |
339 | 0 | })); |
340 | 0 | return Status::OK(); |
341 | 0 | } |
342 | | |
343 | | Status FileScannerV2::_create_table_reader_for_format( |
344 | 0 | const TFileRangeDesc& range, std::unique_ptr<format::TableReader>* reader) const { |
345 | 0 | DORIS_CHECK(reader != nullptr); |
346 | 0 | const auto table_format = table_format_name(range); |
347 | 0 | if (table_format == "NotSet" || table_format == "tvf") { |
348 | 0 | *reader = std::make_unique<format::TableReader>(); |
349 | 0 | } else if (table_format == "hive") { |
350 | 0 | *reader = format::hive::HiveReader::create_unique(); |
351 | 0 | } else if (table_format == "iceberg") { |
352 | 0 | if (get_range_format_type(*_params, range) == TFileFormatType::FORMAT_JNI) { |
353 | 0 | *reader = std::make_unique<format::iceberg::IcebergSysTableJniReader>(); |
354 | 0 | } else { |
355 | 0 | *reader = std::make_unique<format::iceberg::IcebergTableReader>(); |
356 | 0 | } |
357 | 0 | } else if (table_format == "paimon") { |
358 | 0 | *reader = std::make_unique<format::paimon::PaimonHybridReader>(); |
359 | 0 | } else if (table_format == "hudi") { |
360 | 0 | *reader = format::hudi::HudiReader::create_unique(); |
361 | 0 | } else if (table_format == "jdbc") { |
362 | 0 | *reader = std::make_unique<format::jdbc::JdbcJniReader>(); |
363 | 0 | } else { |
364 | 0 | return Status::NotSupported("FileScannerV2 does not support table format {}", table_format); |
365 | 0 | } |
366 | 0 | return Status::OK(); |
367 | 0 | } |
368 | | |
369 | 0 | Status FileScannerV2::_prepare_table_reader_split(const TFileRangeDesc& range) { |
370 | 0 | std::map<std::string, Field> partition_values; |
371 | 0 | RETURN_IF_ERROR(_generate_partition_values(range, &partition_values)); |
372 | 0 | RETURN_IF_ERROR(_table_reader->prepare_split({ |
373 | 0 | .partition_values = std::move(partition_values), |
374 | 0 | .cache = _kv_cache, |
375 | 0 | .current_range = range, |
376 | 0 | .global_rowid_context = _create_global_rowid_context(range), |
377 | 0 | })); |
378 | 0 | return Status::OK(); |
379 | 0 | } |
380 | | |
381 | 0 | bool FileScannerV2::_should_enable_file_meta_cache() const { |
382 | 0 | return ExecEnv::GetInstance()->file_meta_cache()->enabled() && |
383 | 0 | _split_source->num_scan_ranges() < config::max_external_file_meta_cache_num / 3; |
384 | 0 | } |
385 | | |
386 | | std::optional<format::GlobalRowIdContext> FileScannerV2::_create_global_rowid_context( |
387 | 0 | const TFileRangeDesc& range) const { |
388 | 0 | if (!_need_global_rowid_column) { |
389 | 0 | return std::nullopt; |
390 | 0 | } |
391 | 0 | auto& id_file_map = _state->get_id_file_map(); |
392 | 0 | DORIS_CHECK(id_file_map != nullptr); |
393 | 0 | const auto file_id = id_file_map->get_file_mapping_id( |
394 | 0 | std::make_shared<FileMapping>(_local_state->cast<FileScanLocalState>().parent_id(), |
395 | 0 | range, _should_enable_file_meta_cache())); |
396 | 0 | return format::GlobalRowIdContext { |
397 | 0 | .version = IdManager::ID_VERSION, |
398 | 0 | .backend_id = BackendOptions::get_backend_id(), |
399 | 0 | .file_id = file_id, |
400 | 0 | }; |
401 | 0 | } |
402 | | |
403 | | Status FileScannerV2::_generate_partition_values( |
404 | 0 | const TFileRangeDesc& range, std::map<std::string, Field>* partition_values) const { |
405 | 0 | DORIS_CHECK(partition_values != nullptr); |
406 | 0 | partition_values->clear(); |
407 | 0 | if (!range.__isset.columns_from_path_keys || !range.__isset.columns_from_path) { |
408 | 0 | return Status::OK(); |
409 | 0 | } |
410 | 0 | DORIS_CHECK(range.columns_from_path_keys.size() == range.columns_from_path.size()); |
411 | 0 | for (size_t idx = 0; idx < range.columns_from_path_keys.size(); ++idx) { |
412 | 0 | const auto& key = range.columns_from_path_keys[idx]; |
413 | 0 | const auto it = _partition_slot_descs.find(key); |
414 | 0 | if (it == _partition_slot_descs.end()) { |
415 | 0 | continue; |
416 | 0 | } |
417 | 0 | const auto& value = range.columns_from_path[idx]; |
418 | 0 | const bool is_null = range.__isset.columns_from_path_is_null && |
419 | 0 | idx < range.columns_from_path_is_null.size() && |
420 | 0 | range.columns_from_path_is_null[idx]; |
421 | 0 | Field field; |
422 | 0 | DORIS_CHECK(it->second.slot_desc != nullptr); |
423 | 0 | RETURN_IF_ERROR(_parse_partition_value(it->second.slot_desc, value, is_null, &field)); |
424 | 0 | partition_values->emplace(it->second.canonical_name, std::move(field)); |
425 | 0 | } |
426 | 0 | return Status::OK(); |
427 | 0 | } |
428 | | |
429 | | Status FileScannerV2::_parse_partition_value(const SlotDescriptor* slot_desc, |
430 | | const std::string& value, bool is_null, |
431 | 0 | Field* field) const { |
432 | 0 | DORIS_CHECK(slot_desc != nullptr); |
433 | 0 | DORIS_CHECK(field != nullptr); |
434 | 0 | if (is_null) { |
435 | 0 | *field = Field::create_field<TYPE_NULL>(Null()); |
436 | 0 | return Status::OK(); |
437 | 0 | } |
438 | 0 | const auto data_type = remove_nullable(slot_desc->get_data_type_ptr()); |
439 | 0 | auto column = data_type->create_column(); |
440 | 0 | auto serde = data_type->get_serde(); |
441 | 0 | DataTypeSerDe::FormatOptions options; |
442 | 0 | options.converted_from_string = true; |
443 | 0 | StringRef ref(value.data(), value.size()); |
444 | 0 | RETURN_IF_ERROR(serde->from_string(ref, *column, options)); |
445 | 0 | DORIS_CHECK(column->size() == 1); |
446 | 0 | *field = (*column)[0]; |
447 | 0 | return Status::OK(); |
448 | 0 | } |
449 | | |
450 | 0 | Status FileScannerV2::_init_expr_ctxes() { |
451 | 0 | _slot_id_to_desc.clear(); |
452 | 0 | _slot_id_to_global_index.clear(); |
453 | 0 | _partition_slot_descs.clear(); |
454 | 0 | _file_slot_descs.clear(); |
455 | 0 | for (const auto* slot_desc : _output_tuple_desc->slots()) { |
456 | 0 | _slot_id_to_desc.emplace(slot_desc->id(), slot_desc); |
457 | 0 | } |
458 | 0 | DORIS_CHECK(_table_reader != nullptr); |
459 | 0 | RETURN_IF_ERROR(_build_projected_columns(*_table_reader)); |
460 | 0 | return Status::OK(); |
461 | 0 | } |
462 | | |
463 | 0 | Status FileScannerV2::_build_projected_columns(const format::TableReader& table_reader) { |
464 | 0 | _projected_columns.clear(); |
465 | 0 | _projected_columns.reserve(_params->required_slots.size()); |
466 | 0 | _need_global_rowid_column = false; |
467 | 0 | format::ProjectedColumnBuildContext build_context { |
468 | 0 | .scan_params = _params, |
469 | 0 | .range = &_current_range, |
470 | 0 | .runtime_state = _state, |
471 | 0 | }; |
472 | |
|
473 | 0 | for (size_t slot_idx = 0; slot_idx < _params->required_slots.size(); ++slot_idx) { |
474 | 0 | const auto& slot_info = _params->required_slots[slot_idx]; |
475 | 0 | const auto it = _slot_id_to_desc.find(slot_info.slot_id); |
476 | 0 | if (it == _slot_id_to_desc.end()) { |
477 | 0 | return Status::InternalError("Unknown source slot descriptor, slot_id={}", |
478 | 0 | slot_info.slot_id); |
479 | 0 | } |
480 | 0 | auto column = _build_table_column(it->second); |
481 | 0 | if (column.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) { |
482 | 0 | _need_global_rowid_column = true; |
483 | 0 | } |
484 | 0 | RETURN_IF_ERROR(_build_default_expr(slot_info, &column.default_expr)); |
485 | 0 | build_context.schema_column.reset(); |
486 | 0 | RETURN_IF_ERROR(table_reader.annotate_projected_column(slot_info, &build_context, &column)); |
487 | | // Build nested children from access paths generated by the slot's access-path |
488 | | // expressions. A projected column can therefore contain only a subset of the schema |
489 | | // column's nested children. |
490 | 0 | RETURN_IF_ERROR(AccessPathParser::build_nested_children( |
491 | 0 | &column, it->second, |
492 | 0 | build_context.schema_column.has_value() ? &*build_context.schema_column : nullptr)); |
493 | 0 | if (is_partition_slot(slot_info, column.name)) { |
494 | 0 | column.is_partition_key = true; |
495 | 0 | _partition_slot_descs.emplace( |
496 | 0 | column.name, |
497 | 0 | PartitionSlotInfo {.slot_desc = it->second, .canonical_name = column.name}); |
498 | 0 | for (const auto& alias : column.name_mapping) { |
499 | 0 | _partition_slot_descs.emplace( |
500 | 0 | alias, |
501 | 0 | PartitionSlotInfo {.slot_desc = it->second, .canonical_name = column.name}); |
502 | 0 | } |
503 | 0 | } else if (is_data_file_slot(slot_info, column.name)) { |
504 | 0 | _file_slot_descs.push_back(const_cast<SlotDescriptor*>(it->second)); |
505 | 0 | } |
506 | 0 | const auto global_index = format::GlobalIndex(slot_idx); |
507 | 0 | _slot_id_to_global_index.emplace(slot_info.slot_id, global_index); |
508 | 0 | _projected_columns.push_back(std::move(column)); |
509 | 0 | } |
510 | 0 | RETURN_IF_ERROR(table_reader.validate_projected_columns(build_context)); |
511 | 0 | return Status::OK(); |
512 | 0 | } |
513 | | |
514 | | Status FileScannerV2::_build_default_expr(const TFileScanSlotInfo& slot_info, |
515 | 0 | VExprContextSPtr* ctx) const { |
516 | 0 | DORIS_CHECK(ctx != nullptr); |
517 | 0 | if (slot_info.__isset.default_value_expr && !slot_info.default_value_expr.nodes.empty()) { |
518 | 0 | return VExpr::create_expr_tree(slot_info.default_value_expr, *ctx); |
519 | 0 | } |
520 | | |
521 | 0 | if (_params->__isset.default_value_of_src_slot) { |
522 | 0 | const auto it = _params->default_value_of_src_slot.find(slot_info.slot_id); |
523 | 0 | if (it != _params->default_value_of_src_slot.end() && !it->second.nodes.empty()) { |
524 | 0 | return VExpr::create_expr_tree(it->second, *ctx); |
525 | 0 | } |
526 | 0 | } |
527 | 0 | return Status::OK(); |
528 | 0 | } |
529 | | |
530 | 0 | format::ColumnDefinition FileScannerV2::_build_table_column(const SlotDescriptor* slot_desc) { |
531 | 0 | DORIS_CHECK(slot_desc != nullptr); |
532 | 0 | format::ColumnDefinition column; |
533 | | // TODO(gabriel): why always BY_NAME here? |
534 | 0 | column.identifier = Field::create_field<TYPE_STRING>(slot_desc->col_name()); |
535 | 0 | column.name = slot_desc->col_name(); |
536 | 0 | column.type = slot_desc->get_data_type_ptr(); |
537 | 0 | return column; |
538 | 0 | } |
539 | | |
540 | | Status FileScannerV2::_build_table_column_predicates( |
541 | 0 | format::TableColumnPredicates* predicates) const { |
542 | 0 | DORIS_CHECK(predicates != nullptr); |
543 | 0 | predicates->clear(); |
544 | 0 | const auto& slot_predicates = _local_state->cast<FileScanLocalState>()._slot_id_to_predicates; |
545 | 0 | for (const auto& [slot_id, slot_predicate_list] : slot_predicates) { |
546 | 0 | const auto it = _slot_id_to_desc.find(slot_id); |
547 | 0 | if (it == _slot_id_to_desc.end()) { |
548 | 0 | continue; |
549 | 0 | } |
550 | 0 | const auto global_index_it = _slot_id_to_global_index.find(slot_id); |
551 | 0 | if (global_index_it == _slot_id_to_global_index.end()) { |
552 | 0 | continue; |
553 | 0 | } |
554 | 0 | (*predicates)[global_index_it->second] = slot_predicate_list; |
555 | 0 | } |
556 | 0 | return Status::OK(); |
557 | 0 | } |
558 | | |
559 | 0 | Status FileScannerV2::_build_table_conjuncts(VExprContextSPtrs* conjuncts) const { |
560 | 0 | DORIS_CHECK(conjuncts != nullptr); |
561 | 0 | conjuncts->clear(); |
562 | 0 | conjuncts->reserve(_conjuncts.size()); |
563 | 0 | for (const auto& conjunct : _conjuncts) { |
564 | 0 | VExprSPtr root; |
565 | 0 | RETURN_IF_ERROR(format::clone_table_expr_tree(conjunct->root(), &root)); |
566 | 0 | RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&root, _slot_id_to_global_index)); |
567 | 0 | conjuncts->push_back(VExprContext::create_shared(std::move(root))); |
568 | 0 | } |
569 | 0 | return Status::OK(); |
570 | 0 | } |
571 | | |
572 | 0 | TFileFormatType::type FileScannerV2::_get_current_format_type() const { |
573 | 0 | return get_range_format_type(*_params, _current_range); |
574 | 0 | } |
575 | | |
576 | | Status FileScannerV2::_to_file_format(TFileFormatType::type format_type, |
577 | 13 | format::FileFormat* file_format) { |
578 | 13 | DORIS_CHECK(file_format != nullptr); |
579 | 13 | switch (format_type) { |
580 | 1 | case TFileFormatType::FORMAT_PARQUET: |
581 | 1 | *file_format = format::FileFormat::PARQUET; |
582 | 1 | return Status::OK(); |
583 | 1 | case TFileFormatType::FORMAT_JNI: |
584 | 1 | *file_format = format::FileFormat::JNI; |
585 | 1 | return Status::OK(); |
586 | 1 | case TFileFormatType::FORMAT_CSV_PLAIN: |
587 | 2 | case TFileFormatType::FORMAT_CSV_GZ: |
588 | 3 | case TFileFormatType::FORMAT_CSV_BZ2: |
589 | 4 | case TFileFormatType::FORMAT_CSV_LZ4FRAME: |
590 | 5 | case TFileFormatType::FORMAT_CSV_LZ4BLOCK: |
591 | 6 | case TFileFormatType::FORMAT_CSV_LZOP: |
592 | 7 | case TFileFormatType::FORMAT_CSV_DEFLATE: |
593 | 8 | case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: |
594 | 9 | case TFileFormatType::FORMAT_PROTO: |
595 | 9 | *file_format = format::FileFormat::CSV; |
596 | 9 | return Status::OK(); |
597 | 1 | case TFileFormatType::FORMAT_TEXT: |
598 | 1 | *file_format = format::FileFormat::TEXT; |
599 | 1 | return Status::OK(); |
600 | 1 | default: |
601 | 1 | return Status::NotSupported("FileScannerV2 does not support file format {}", |
602 | 1 | to_string(format_type)); |
603 | 13 | } |
604 | 13 | } |
605 | | |
606 | 0 | Status FileScannerV2::_init_io_ctx() { |
607 | 0 | _io_ctx = std::make_shared<io::IOContext>(); |
608 | 0 | _io_ctx->query_id = &_state->query_id(); |
609 | 0 | return Status::OK(); |
610 | 0 | } |
611 | | |
612 | 0 | Status FileScannerV2::close(RuntimeState* state) { |
613 | 0 | if (!_try_close()) { |
614 | 0 | return Status::OK(); |
615 | 0 | } |
616 | 0 | if (_table_reader != nullptr) { |
617 | 0 | RETURN_IF_ERROR(_table_reader->close()); |
618 | 0 | _report_condition_cache_profile(); |
619 | 0 | _table_reader.reset(); |
620 | 0 | } |
621 | 0 | return Scanner::close(state); |
622 | 0 | } |
623 | | |
624 | 0 | void FileScannerV2::try_stop() { |
625 | 0 | Scanner::try_stop(); |
626 | 0 | if (_io_ctx) { |
627 | 0 | _io_ctx->should_stop = true; |
628 | 0 | } |
629 | 0 | } |
630 | | |
631 | 0 | void FileScannerV2::update_realtime_counters() { |
632 | 0 | if (_file_reader_stats == nullptr) { |
633 | 0 | return; |
634 | 0 | } |
635 | 0 | const int64_t bytes_read = _file_reader_stats->read_bytes; |
636 | 0 | COUNTER_SET(_file_read_bytes_counter, bytes_read); |
637 | 0 | COUNTER_SET(_file_read_calls_counter, cast_set<int64_t>(_file_reader_stats->read_calls)); |
638 | 0 | COUNTER_SET(_file_read_time_counter, cast_set<int64_t>(_file_reader_stats->read_time_ns)); |
639 | 0 | } |
640 | | |
641 | 0 | void FileScannerV2::_collect_profile_before_close() { |
642 | 0 | _report_file_reader_predicate_filtered_rows(); |
643 | 0 | Scanner::_collect_profile_before_close(); |
644 | 0 | if (_file_reader_stats != nullptr) { |
645 | 0 | COUNTER_SET(_file_read_bytes_counter, cast_set<int64_t>(_file_reader_stats->read_bytes)); |
646 | 0 | COUNTER_SET(_file_read_calls_counter, cast_set<int64_t>(_file_reader_stats->read_calls)); |
647 | 0 | COUNTER_SET(_file_read_time_counter, cast_set<int64_t>(_file_reader_stats->read_time_ns)); |
648 | 0 | } |
649 | | // Query profiles can be collected before Scanner::close() runs. Publish condition-cache |
650 | | // counters here as well, using deltas so this method and close() cannot double count. |
651 | 0 | _report_condition_cache_profile(); |
652 | 0 | } |
653 | | |
654 | 0 | void FileScannerV2::_report_file_reader_predicate_filtered_rows() { |
655 | 0 | const int64_t filtered_rows = _io_ctx != nullptr ? _io_ctx->predicate_filtered_rows : 0; |
656 | 0 | const int64_t filtered_delta = filtered_rows - _reported_predicate_filtered_rows; |
657 | 0 | if (filtered_delta > 0) { |
658 | | // File readers can evaluate localized conjuncts before a block reaches Scanner. Count |
659 | | // those rows as scanner-level unselected rows so load statistics stay identical no matter |
660 | | // whether a predicate is pushed down or evaluated by Scanner::_filter_output_block(). |
661 | 0 | _counter.num_rows_unselected += filtered_delta; |
662 | 0 | _reported_predicate_filtered_rows = filtered_rows; |
663 | 0 | } |
664 | 0 | } |
665 | | |
666 | 0 | void FileScannerV2::_report_condition_cache_profile() { |
667 | 0 | auto* local_state = static_cast<FileScanLocalState*>(_local_state); |
668 | 0 | const int64_t hit_count = |
669 | 0 | _table_reader != nullptr ? _table_reader->condition_cache_hit_count() : 0; |
670 | 0 | const int64_t hit_delta = hit_count - _reported_condition_cache_hit_count; |
671 | 0 | if (hit_delta > 0) { |
672 | 0 | COUNTER_UPDATE(local_state->_condition_cache_hit_counter, hit_delta); |
673 | 0 | _reported_condition_cache_hit_count = hit_count; |
674 | 0 | } |
675 | 0 | const int64_t filtered_rows = _io_ctx != nullptr ? _io_ctx->condition_cache_filtered_rows : 0; |
676 | 0 | const int64_t filtered_delta = filtered_rows - _reported_condition_cache_filtered_rows; |
677 | 0 | if (filtered_delta > 0) { |
678 | 0 | COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter, filtered_delta); |
679 | 0 | _reported_condition_cache_filtered_rows = filtered_rows; |
680 | 0 | } |
681 | 0 | } |
682 | | |
683 | | } // namespace doris |