be/src/format/table/iceberg_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format/table/iceberg_reader.h" |
19 | | |
20 | | #include <gen_cpp/Descriptors_types.h> |
21 | | #include <gen_cpp/Metrics_types.h> |
22 | | #include <gen_cpp/PlanNodes_types.h> |
23 | | #include <gen_cpp/parquet_types.h> |
24 | | #include <glog/logging.h> |
25 | | #include <parallel_hashmap/phmap.h> |
26 | | #include <rapidjson/document.h> |
27 | | |
28 | | #include <algorithm> |
29 | | #include <cstring> |
30 | | #include <functional> |
31 | | #include <memory> |
32 | | |
33 | | #include "common/compiler_util.h" // IWYU pragma: keep |
34 | | #include "common/consts.h" |
35 | | #include "common/status.h" |
36 | | #include "core/assert_cast.h" |
37 | | #include "core/block/block.h" |
38 | | #include "core/block/column_with_type_and_name.h" |
39 | | #include "core/column/column.h" |
40 | | #include "core/column/column_string.h" |
41 | | #include "core/column/column_vector.h" |
42 | | #include "core/data_type/data_type_factory.hpp" |
43 | | #include "core/data_type/define_primitive_type.h" |
44 | | #include "core/data_type/primitive_type.h" |
45 | | #include "core/string_ref.h" |
46 | | #include "exprs/aggregate/aggregate_function.h" |
47 | | #include "format/format_common.h" |
48 | | #include "format/generic_reader.h" |
49 | | #include "format/orc/vorc_reader.h" |
50 | | #include "format/parquet/schema_desc.h" |
51 | | #include "format/parquet/vparquet_column_chunk_reader.h" |
52 | | #include "format/table/deletion_vector_reader.h" |
53 | | #include "format/table/iceberg/iceberg_orc_nested_column_utils.h" |
54 | | #include "format/table/iceberg/iceberg_parquet_nested_column_utils.h" |
55 | | #include "format/table/nested_column_access_helper.h" |
56 | | #include "format/table/table_schema_change_helper.h" |
57 | | #include "runtime/runtime_state.h" |
58 | | #include "util/coding.h" |
59 | | |
60 | | namespace cctz { |
61 | | class time_zone; |
62 | | } // namespace cctz |
63 | | namespace doris { |
64 | | class RowDescriptor; |
65 | | class SlotDescriptor; |
66 | | class TupleDescriptor; |
67 | | |
68 | | namespace io { |
69 | | struct IOContext; |
70 | | } // namespace io |
71 | | class VExprContext; |
72 | | } // namespace doris |
73 | | |
74 | | namespace doris { |
// Key of the ORC type attribute that carries the Iceberg field id ("iceberg.id").
// Used to match file columns to table columns by field id across schema changes.
const std::string IcebergOrcReader::ICEBERG_ORC_ATTRIBUTE = "iceberg.id";
76 | | |
77 | | bool IcebergTableReader::_is_fully_dictionary_encoded( |
78 | 6 | const tparquet::ColumnMetaData& column_metadata) { |
79 | 12 | const auto is_dictionary_encoding = [](tparquet::Encoding::type encoding) { |
80 | 12 | return encoding == tparquet::Encoding::PLAIN_DICTIONARY || |
81 | 12 | encoding == tparquet::Encoding::RLE_DICTIONARY; |
82 | 12 | }; |
83 | 8 | const auto is_data_page = [](tparquet::PageType::type page_type) { |
84 | 8 | return page_type == tparquet::PageType::DATA_PAGE || |
85 | 8 | page_type == tparquet::PageType::DATA_PAGE_V2; |
86 | 8 | }; |
87 | 6 | const auto is_level_encoding = [](tparquet::Encoding::type encoding) { |
88 | 2 | return encoding == tparquet::Encoding::RLE || encoding == tparquet::Encoding::BIT_PACKED; |
89 | 2 | }; |
90 | | |
91 | | // A column chunk may have a dictionary page but still contain plain-encoded data pages. |
92 | | // Only treat it as dictionary-coded when all data pages are dictionary encoded. |
93 | 6 | if (column_metadata.__isset.encoding_stats) { |
94 | 5 | bool has_data_page_stats = false; |
95 | 8 | for (const tparquet::PageEncodingStats& enc_stat : column_metadata.encoding_stats) { |
96 | 8 | if (is_data_page(enc_stat.page_type) && enc_stat.count > 0) { |
97 | 6 | has_data_page_stats = true; |
98 | 6 | if (!is_dictionary_encoding(enc_stat.encoding)) { |
99 | 2 | return false; |
100 | 2 | } |
101 | 6 | } |
102 | 8 | } |
103 | 3 | if (has_data_page_stats) { |
104 | 2 | return true; |
105 | 2 | } |
106 | 3 | } |
107 | | |
108 | 2 | bool has_dict_encoding = false; |
109 | 2 | bool has_nondict_encoding = false; |
110 | 3 | for (const tparquet::Encoding::type& encoding : column_metadata.encodings) { |
111 | 3 | if (is_dictionary_encoding(encoding)) { |
112 | 1 | has_dict_encoding = true; |
113 | 1 | } |
114 | | |
115 | 3 | if (!is_dictionary_encoding(encoding) && !is_level_encoding(encoding)) { |
116 | 2 | has_nondict_encoding = true; |
117 | 2 | break; |
118 | 2 | } |
119 | 3 | } |
120 | 2 | if (!has_dict_encoding || has_nondict_encoding) { |
121 | 2 | return false; |
122 | 2 | } |
123 | | |
124 | 0 | return true; |
125 | 2 | } |
126 | | |
127 | | // ============================================================================ |
128 | | // IcebergParquetReader: on_before_init_reader (Parquet-specific schema matching) |
129 | | // ============================================================================ |
// Parquet-specific pre-init hook, invoked after the file has been opened but
// before the underlying ParquetReader is initialized. Steps, in order:
//   1) build the table->file schema mapping (by Iceberg field id when history
//      schema info is present, otherwise by column name),
//   2) classify requested columns (SYNTHESIZED / REGULAR / GENERATED) and
//      register fill handlers for synthetic/generated ones,
//   3) compute the physical column ids to read,
//   4) load delete files and rewire equality-delete "expand" columns.
// Returns the first non-OK status from any step.
Status IcebergParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
    _column_descs = ctx->column_descs;
    _fill_col_name_to_block_idx = ctx->col_name_to_block_idx;
    _file_format = Fileformat::PARQUET;

    // Get file metadata schema first (available because _open_file() already ran)
    const FieldDescriptor* field_desc = nullptr;
    RETURN_IF_ERROR(this->get_file_metadata_schema(&field_desc));
    DCHECK(field_desc != nullptr);

    // Build table_info_node by field_id or name matching.
    // This must happen BEFORE column classification so we can use children_column_exists
    // to check if a column exists in the file (by field ID, not name).
    if (!get_scan_params().__isset.history_schema_info ||
        get_scan_params().history_schema_info.empty()) [[unlikely]] {
        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(ctx->tuple_descriptor, *field_desc,
                                                            ctx->table_info_node));
    } else {
        bool exist_field_id = true;
        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id(
                get_scan_params().history_schema_info.front().root_field, *field_desc,
                ctx->table_info_node, exist_field_id));
        // Files written without field ids fall back to name matching.
        if (!exist_field_id) {
            RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(ctx->tuple_descriptor, *field_desc,
                                                                ctx->table_info_node));
        }
    }

    std::unordered_set<std::string> partition_col_names;
    if (ctx->range->__isset.columns_from_path_keys) {
        partition_col_names.insert(ctx->range->columns_from_path_keys.begin(),
                                   ctx->range->columns_from_path_keys.end());
    }

    // Single pass: classify columns, detect $row_id, handle partition fallback.
    bool has_partition_from_path = false;
    for (auto& desc : *ctx->column_descs) {
        if (desc.category == ColumnCategory::SYNTHESIZED) {
            if (desc.name == BeConsts::ICEBERG_ROWID_COL) {
                this->register_synthesized_column_handler(
                        BeConsts::ICEBERG_ROWID_COL, [this](Block* block, size_t rows) -> Status {
                            return _fill_iceberg_row_id(block, rows);
                        });
                continue;
            } else if (desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
                auto topn_row_id_column_iter = _create_topn_row_id_column_iterator();
                // NOTE(review): the stored handler captures `desc` by reference;
                // this relies on *ctx->column_descs (saved into _column_descs
                // above) outliving the handler — confirm that lifetime holds.
                this->register_synthesized_column_handler(
                        desc.name,
                        [iter = std::move(topn_row_id_column_iter), this, &desc](
                                Block* block, size_t rows) -> Status {
                            return fill_topn_row_id(iter, desc.name, block, rows);
                        });
                continue;
            }
        } else if (desc.category == ColumnCategory::REGULAR) {
            // Partition fallback: if column is a partition key and NOT in the file
            // (checked via field ID matching in table_info_node), read from path instead.
            if (partition_col_names.contains(desc.name) &&
                !ctx->table_info_node->children_column_exists(desc.name)) {
                if (config::enable_iceberg_partition_column_fallback) {
                    desc.category = ColumnCategory::PARTITION_KEY;
                    has_partition_from_path = true;
                    continue;
                }
            }
            ctx->column_names.push_back(desc.name);
        } else if (desc.category == ColumnCategory::GENERATED) {
            // Row-lineage columns are read from the file when present and
            // synthesized otherwise; the registered handlers do the fill.
            _init_row_lineage_columns();
            if (desc.name == ROW_LINEAGE_ROW_ID) {
                ctx->column_names.push_back(desc.name);
                this->register_generated_column_handler(
                        ROW_LINEAGE_ROW_ID, [this](Block* block, size_t rows) -> Status {
                            return _fill_row_lineage_row_id(block, rows);
                        });
                continue;
            } else if (desc.name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
                ctx->column_names.push_back(desc.name);
                this->register_generated_column_handler(
                        ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER,
                        [this](Block* block, size_t rows) -> Status {
                            return _fill_row_lineage_last_updated_sequence_number(block, rows);
                        });
                continue;
            }
        }
    }

    // Set up partition value extraction if any partition columns need filling from path
    if (has_partition_from_path) {
        RETURN_IF_ERROR(_extract_partition_values(*ctx->range, ctx->tuple_descriptor,
                                                  _fill_partition_values));
    }

    _all_required_col_names = ctx->column_names;

    // Create column IDs from field descriptor
    auto column_id_result = _create_column_ids(field_desc, ctx->tuple_descriptor);
    ctx->column_ids = std::move(column_id_result.column_ids);
    ctx->filter_column_ids = std::move(column_id_result.filter_column_ids);

    // Build field_id -> block_column_name mapping for equality delete filtering.
    // This was previously done in init_reader() column matching (pre-CRTP refactoring).
    for (const auto* slot : ctx->tuple_descriptor->slots()) {
        _id_to_block_column_name.emplace(slot->col_unique_id(), slot->col_name());
    }

    // Process delete files (must happen before _do_init_reader so expand col IDs are included)
    RETURN_IF_ERROR(_init_row_filters());

    // Add expand column IDs for equality delete and remap expand column names
    // to match master's behavior:
    // - Use field_id to find the actual file column name in Parquet schema
    // - Prefix with __equality_delete_column__ to avoid name conflicts
    // - Correctly map table_col_name → file_col_name in table_info_node
    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";
    std::unordered_map<int, std::string> field_id_to_file_col_name;
    for (int i = 0; i < field_desc->size(); ++i) {
        auto field_schema = field_desc->get_column(i);
        if (field_schema) {
            field_id_to_file_col_name[field_schema->field_id] = field_schema->name;
        }
    }

    // Rebuild _expand_col_names with proper file-column-based names
    std::vector<std::string> new_expand_col_names;
    for (size_t i = 0; i < _expand_col_names.size(); ++i) {
        const auto& old_name = _expand_col_names[i];
        // Reverse lookup: find the field_id whose block column name matches.
        int field_id = -1;
        for (auto& [fid, name] : _id_to_block_column_name) {
            if (name == old_name) {
                field_id = fid;
                break;
            }
        }

        // Prefer the file's own column name when the field id is known;
        // otherwise keep the block name as-is.
        std::string file_col_name = old_name;
        auto it = field_id_to_file_col_name.find(field_id);
        if (it != field_id_to_file_col_name.end()) {
            file_col_name = it->second;
        }

        std::string table_col_name = EQ_DELETE_PRE + file_col_name;

        // Update _id_to_block_column_name
        if (field_id >= 0) {
            _id_to_block_column_name[field_id] = table_col_name;
        }

        // Update _expand_columns name
        if (i < _expand_columns.size()) {
            _expand_columns[i].name = table_col_name;
        }

        new_expand_col_names.push_back(table_col_name);

        // Add column IDs
        if (it != field_id_to_file_col_name.end()) {
            for (int j = 0; j < field_desc->size(); ++j) {
                auto field_schema = field_desc->get_column(j);
                if (field_schema && field_schema->field_id == field_id) {
                    ctx->column_ids.insert(field_schema->get_column_id());
                    break;
                }
            }
        }

        // Register in table_info_node: table_col_name → file_col_name
        ctx->column_names.push_back(table_col_name);
        ctx->table_info_node->add_children(table_col_name, file_col_name,
                                           TableSchemaChangeHelper::ConstNode::get_instance());
    }
    _expand_col_names = std::move(new_expand_col_names);

    // Enable group filtering for Iceberg
    _filter_groups = true;

    return Status::OK();
}
309 | | |
310 | | // ============================================================================ |
311 | | // IcebergParquetReader: _create_column_ids |
312 | | // ============================================================================ |
313 | | ColumnIdResult IcebergParquetReader::_create_column_ids(const FieldDescriptor* field_desc, |
314 | 7 | const TupleDescriptor* tuple_descriptor) { |
315 | 7 | auto* mutable_field_desc = const_cast<FieldDescriptor*>(field_desc); |
316 | 7 | mutable_field_desc->assign_ids(); |
317 | | |
318 | 7 | std::unordered_map<int, const FieldSchema*> iceberg_id_to_field_schema_map; |
319 | 58 | for (int i = 0; i < field_desc->size(); ++i) { |
320 | 51 | auto field_schema = field_desc->get_column(i); |
321 | 51 | if (!field_schema) continue; |
322 | 51 | int iceberg_id = field_schema->field_id; |
323 | 51 | iceberg_id_to_field_schema_map[iceberg_id] = field_schema; |
324 | 51 | } |
325 | | |
326 | 7 | std::set<uint64_t> column_ids; |
327 | 7 | std::set<uint64_t> filter_column_ids; |
328 | | |
329 | 7 | auto process_access_paths = [](const FieldSchema* parquet_field, |
330 | 7 | const std::vector<TColumnAccessPath>& access_paths, |
331 | 14 | std::set<uint64_t>& out_ids) { |
332 | 14 | process_nested_access_paths( |
333 | 14 | parquet_field, access_paths, out_ids, |
334 | 14 | [](const FieldSchema* field) { return field->get_column_id(); }, |
335 | 14 | [](const FieldSchema* field) { return field->get_max_column_id(); }, |
336 | 14 | IcebergParquetNestedColumnUtils::extract_nested_column_ids); |
337 | 14 | }; |
338 | | |
339 | 15 | for (const auto* slot : tuple_descriptor->slots()) { |
340 | 15 | auto it = iceberg_id_to_field_schema_map.find(slot->col_unique_id()); |
341 | 15 | if (it == iceberg_id_to_field_schema_map.end()) { |
342 | 0 | continue; |
343 | 0 | } |
344 | 15 | auto field_schema = it->second; |
345 | | |
346 | 15 | if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY && |
347 | 15 | slot->col_type() != TYPE_MAP)) { |
348 | 7 | column_ids.insert(field_schema->column_id); |
349 | 7 | if (slot->is_predicate()) { |
350 | 0 | filter_column_ids.insert(field_schema->column_id); |
351 | 0 | } |
352 | 7 | continue; |
353 | 7 | } |
354 | | |
355 | 8 | const auto& all_access_paths = slot->all_access_paths(); |
356 | 8 | process_access_paths(field_schema, all_access_paths, column_ids); |
357 | | |
358 | 8 | const auto& predicate_access_paths = slot->predicate_access_paths(); |
359 | 8 | if (!predicate_access_paths.empty()) { |
360 | 6 | process_access_paths(field_schema, predicate_access_paths, filter_column_ids); |
361 | 6 | } |
362 | 8 | } |
363 | 7 | return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids)); |
364 | 7 | } |
365 | | |
366 | | // ============================================================================ |
367 | | // IcebergParquetReader: _read_position_delete_file |
368 | | // ============================================================================ |
369 | | Status IcebergParquetReader::_read_position_delete_file(const TFileRangeDesc* delete_range, |
370 | 0 | DeleteFile* position_delete) { |
371 | 0 | ParquetReader parquet_delete_reader(get_profile(), get_scan_params(), *delete_range, |
372 | 0 | READ_DELETE_FILE_BATCH_SIZE, &get_state()->timezone_obj(), |
373 | 0 | get_io_ctx(), get_state(), _meta_cache); |
374 | | // The delete file range has size=-1 (read whole file). We must disable |
375 | | // row group filtering before init; otherwise _do_init_reader returns EndOfFile |
376 | | // when _filter_groups && _range_size < 0. |
377 | 0 | ParquetInitContext delete_ctx; |
378 | 0 | delete_ctx.filter_groups = false; |
379 | 0 | delete_ctx.column_names = delete_file_col_names; |
380 | 0 | delete_ctx.col_name_to_block_idx = |
381 | 0 | const_cast<std::unordered_map<std::string, uint32_t>*>(&DELETE_COL_NAME_TO_BLOCK_IDX); |
382 | 0 | RETURN_IF_ERROR(parquet_delete_reader.init_reader(&delete_ctx)); |
383 | | |
384 | 0 | const tparquet::FileMetaData* meta_data = parquet_delete_reader.get_meta_data(); |
385 | 0 | bool dictionary_coded = true; |
386 | 0 | for (const auto& row_group : meta_data->row_groups) { |
387 | 0 | const auto& column_chunk = row_group.columns[ICEBERG_FILE_PATH_INDEX]; |
388 | 0 | if (!(column_chunk.__isset.meta_data && has_dict_page(column_chunk.meta_data))) { |
389 | 0 | dictionary_coded = false; |
390 | 0 | break; |
391 | 0 | } |
392 | 0 | } |
393 | 0 | DataTypePtr data_type_file_path {new DataTypeString}; |
394 | 0 | DataTypePtr data_type_pos {new DataTypeInt64}; |
395 | 0 | bool eof = false; |
396 | 0 | while (!eof) { |
397 | 0 | Block block = {dictionary_coded |
398 | 0 | ? ColumnWithTypeAndName {ColumnDictI32::create( |
399 | 0 | FieldType::OLAP_FIELD_TYPE_VARCHAR), |
400 | 0 | data_type_file_path, ICEBERG_FILE_PATH} |
401 | 0 | : ColumnWithTypeAndName {data_type_file_path, ICEBERG_FILE_PATH}, |
402 | |
|
403 | 0 | {data_type_pos, ICEBERG_ROW_POS}}; |
404 | 0 | size_t read_rows = 0; |
405 | 0 | RETURN_IF_ERROR(parquet_delete_reader.get_next_block(&block, &read_rows, &eof)); |
406 | | |
407 | 0 | if (read_rows <= 0) { |
408 | 0 | break; |
409 | 0 | } |
410 | 0 | _gen_position_delete_file_range(block, position_delete, read_rows, dictionary_coded); |
411 | 0 | } |
412 | 0 | return Status::OK(); |
413 | 0 | }; |
414 | | |
415 | | // ============================================================================ |
416 | | // IcebergOrcReader: on_before_init_reader (ORC-specific schema matching) |
417 | | // ============================================================================ |
// ORC-specific pre-init hook, invoked after the file reader has been created
// but before the underlying OrcReader is initialized. Mirrors the Parquet
// variant step-for-step:
//   1) build the table->file schema mapping (by Iceberg field id stored in the
//      "iceberg.id" ORC type attribute, falling back to name matching),
//   2) classify requested columns and register synthetic/generated handlers,
//   3) compute the physical ORC column ids to read,
//   4) load delete files and rewire equality-delete "expand" columns.
// Returns the first non-OK status from any step.
Status IcebergOrcReader::on_before_init_reader(ReaderInitContext* ctx) {
    _column_descs = ctx->column_descs;
    _fill_col_name_to_block_idx = ctx->col_name_to_block_idx;
    _file_format = Fileformat::ORC;

    // Get ORC file type first (available because _create_file_reader() already ran)
    const orc::Type* orc_type_ptr = nullptr;
    RETURN_IF_ERROR(this->get_file_type(&orc_type_ptr));

    // Build table_info_node by field_id or name matching.
    // This must happen BEFORE column classification so we can use children_column_exists
    // to check if a column exists in the file (by field ID, not name).
    if (!get_scan_params().__isset.history_schema_info ||
        get_scan_params().history_schema_info.empty()) [[unlikely]] {
        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(ctx->tuple_descriptor, orc_type_ptr,
                                                        ctx->table_info_node));
    } else {
        bool exist_field_id = true;
        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id(
                get_scan_params().history_schema_info.front().root_field, orc_type_ptr,
                ICEBERG_ORC_ATTRIBUTE, ctx->table_info_node, exist_field_id));
        // Files written without field ids fall back to name matching.
        if (!exist_field_id) {
            RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(ctx->tuple_descriptor, orc_type_ptr,
                                                            ctx->table_info_node));
        }
    }

    std::unordered_set<std::string> partition_col_names;
    if (ctx->range->__isset.columns_from_path_keys) {
        partition_col_names.insert(ctx->range->columns_from_path_keys.begin(),
                                   ctx->range->columns_from_path_keys.end());
    }

    // Single pass: classify columns, detect $row_id, handle partition fallback.
    bool has_partition_from_path = false;
    for (auto& desc : *ctx->column_descs) {
        if (desc.category == ColumnCategory::SYNTHESIZED) {
            if (desc.name == BeConsts::ICEBERG_ROWID_COL) {
                this->register_synthesized_column_handler(
                        BeConsts::ICEBERG_ROWID_COL, [this](Block* block, size_t rows) -> Status {
                            return _fill_iceberg_row_id(block, rows);
                        });
                continue;
            } else if (desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
                auto topn_row_id_column_iter = _create_topn_row_id_column_iterator();
                // NOTE(review): the stored handler captures `desc` by reference;
                // this relies on *ctx->column_descs (saved into _column_descs
                // above) outliving the handler — confirm that lifetime holds.
                this->register_synthesized_column_handler(
                        desc.name,
                        [iter = std::move(topn_row_id_column_iter), this, &desc](
                                Block* block, size_t rows) -> Status {
                            return fill_topn_row_id(iter, desc.name, block, rows);
                        });
                continue;
            }
        } else if (desc.category == ColumnCategory::REGULAR) {
            // Partition fallback: if column is a partition key and NOT in the file
            // (checked via field ID matching in table_info_node), read from path instead.
            if (partition_col_names.contains(desc.name) &&
                !ctx->table_info_node->children_column_exists(desc.name)) {
                if (config::enable_iceberg_partition_column_fallback) {
                    desc.category = ColumnCategory::PARTITION_KEY;
                    has_partition_from_path = true;
                    continue;
                }
            }
            ctx->column_names.push_back(desc.name);
        } else if (desc.category == ColumnCategory::GENERATED) {
            // Row-lineage columns: handlers fill values not present in the file.
            _init_row_lineage_columns();
            if (desc.name == ROW_LINEAGE_ROW_ID) {
                ctx->column_names.push_back(desc.name);
                this->register_generated_column_handler(
                        ROW_LINEAGE_ROW_ID, [this](Block* block, size_t rows) -> Status {
                            return _fill_row_lineage_row_id(block, rows);
                        });
                continue;
            } else if (desc.name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
                ctx->column_names.push_back(desc.name);
                this->register_generated_column_handler(
                        ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER,
                        [this](Block* block, size_t rows) -> Status {
                            return _fill_row_lineage_last_updated_sequence_number(block, rows);
                        });
                continue;
            }
        }
    }

    // Set up partition value extraction if any partition columns need filling from path
    if (has_partition_from_path) {
        RETURN_IF_ERROR(_extract_partition_values(*ctx->range, ctx->tuple_descriptor,
                                                  _fill_partition_values));
    }

    _all_required_col_names = ctx->column_names;

    // Create column IDs from ORC type
    auto column_id_result = _create_column_ids(orc_type_ptr, ctx->tuple_descriptor);
    ctx->column_ids = std::move(column_id_result.column_ids);
    ctx->filter_column_ids = std::move(column_id_result.filter_column_ids);

    // Build field_id -> block_column_name mapping for equality delete filtering.
    for (const auto* slot : ctx->tuple_descriptor->slots()) {
        _id_to_block_column_name.emplace(slot->col_unique_id(), slot->col_name());
    }

    // Process delete files (must happen before _do_init_reader so expand col IDs are included)
    RETURN_IF_ERROR(_init_row_filters());

    // Add expand column IDs for equality delete and remap expand column names
    // (matching master's behavior with __equality_delete_column__ prefix)
    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";
    std::unordered_map<int, std::string> field_id_to_file_col_name;
    for (uint64_t i = 0; i < orc_type_ptr->getSubtypeCount(); ++i) {
        std::string col_name = orc_type_ptr->getFieldName(i);
        const orc::Type* sub_type = orc_type_ptr->getSubtype(i);
        if (sub_type->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
            // NOTE(review): std::stoi throws on a non-numeric attribute value;
            // assumes writers always emit a numeric "iceberg.id" — confirm.
            int fid = std::stoi(sub_type->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
            field_id_to_file_col_name[fid] = col_name;
        }
    }

    // Rebuild _expand_col_names with proper file-column-based names
    std::vector<std::string> new_expand_col_names;
    for (size_t i = 0; i < _expand_col_names.size(); ++i) {
        const auto& old_name = _expand_col_names[i];
        // Reverse lookup: find the field_id whose block column name matches.
        int field_id = -1;
        for (auto& [fid, name] : _id_to_block_column_name) {
            if (name == old_name) {
                field_id = fid;
                break;
            }
        }

        // Prefer the file's own column name when the field id is known.
        std::string file_col_name = old_name;
        auto it = field_id_to_file_col_name.find(field_id);
        if (it != field_id_to_file_col_name.end()) {
            file_col_name = it->second;
        }

        std::string table_col_name = EQ_DELETE_PRE + file_col_name;

        if (field_id >= 0) {
            _id_to_block_column_name[field_id] = table_col_name;
        }
        if (i < _expand_columns.size()) {
            _expand_columns[i].name = table_col_name;
        }
        new_expand_col_names.push_back(table_col_name);

        // Add column IDs
        if (it != field_id_to_file_col_name.end()) {
            for (uint64_t j = 0; j < orc_type_ptr->getSubtypeCount(); ++j) {
                const orc::Type* sub_type = orc_type_ptr->getSubtype(j);
                if (orc_type_ptr->getFieldName(j) == file_col_name) {
                    ctx->column_ids.insert(sub_type->getColumnId());
                    break;
                }
            }
        }

        // Register in table_info_node: table_col_name → file_col_name
        ctx->column_names.push_back(table_col_name);
        ctx->table_info_node->add_children(table_col_name, file_col_name,
                                           TableSchemaChangeHelper::ConstNode::get_instance());
    }
    _expand_col_names = std::move(new_expand_col_names);

    return Status::OK();
}
583 | | |
584 | | // ============================================================================ |
585 | | // IcebergOrcReader: _create_column_ids |
586 | | // ============================================================================ |
587 | | ColumnIdResult IcebergOrcReader::_create_column_ids(const orc::Type* orc_type, |
588 | 7 | const TupleDescriptor* tuple_descriptor) { |
589 | 7 | std::unordered_map<int, const orc::Type*> iceberg_id_to_orc_type_map; |
590 | 58 | for (uint64_t i = 0; i < orc_type->getSubtypeCount(); ++i) { |
591 | 51 | auto orc_sub_type = orc_type->getSubtype(i); |
592 | 51 | if (!orc_sub_type) continue; |
593 | 51 | if (!orc_sub_type->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) { |
594 | 0 | continue; |
595 | 0 | } |
596 | 51 | int iceberg_id = std::stoi(orc_sub_type->getAttributeValue(ICEBERG_ORC_ATTRIBUTE)); |
597 | 51 | iceberg_id_to_orc_type_map[iceberg_id] = orc_sub_type; |
598 | 51 | } |
599 | | |
600 | 7 | std::set<uint64_t> column_ids; |
601 | 7 | std::set<uint64_t> filter_column_ids; |
602 | | |
603 | 7 | auto process_access_paths = [](const orc::Type* orc_field, |
604 | 7 | const std::vector<TColumnAccessPath>& access_paths, |
605 | 14 | std::set<uint64_t>& out_ids) { |
606 | 14 | process_nested_access_paths( |
607 | 14 | orc_field, access_paths, out_ids, |
608 | 14 | [](const orc::Type* type) { return type->getColumnId(); }, |
609 | 14 | [](const orc::Type* type) { return type->getMaximumColumnId(); }, |
610 | 14 | IcebergOrcNestedColumnUtils::extract_nested_column_ids); |
611 | 14 | }; |
612 | | |
613 | 15 | for (const auto* slot : tuple_descriptor->slots()) { |
614 | 15 | auto it = iceberg_id_to_orc_type_map.find(slot->col_unique_id()); |
615 | 15 | if (it == iceberg_id_to_orc_type_map.end()) { |
616 | 0 | continue; |
617 | 0 | } |
618 | 15 | const orc::Type* orc_field = it->second; |
619 | | |
620 | 15 | if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY && |
621 | 15 | slot->col_type() != TYPE_MAP)) { |
622 | 7 | column_ids.insert(orc_field->getColumnId()); |
623 | 7 | if (slot->is_predicate()) { |
624 | 0 | filter_column_ids.insert(orc_field->getColumnId()); |
625 | 0 | } |
626 | 7 | continue; |
627 | 7 | } |
628 | | |
629 | 8 | const auto& all_access_paths = slot->all_access_paths(); |
630 | 8 | process_access_paths(orc_field, all_access_paths, column_ids); |
631 | | |
632 | 8 | const auto& predicate_access_paths = slot->predicate_access_paths(); |
633 | 8 | if (!predicate_access_paths.empty()) { |
634 | 6 | process_access_paths(orc_field, predicate_access_paths, filter_column_ids); |
635 | 6 | } |
636 | 8 | } |
637 | | |
638 | 7 | return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids)); |
639 | 7 | } |
640 | | |
641 | | // ============================================================================ |
642 | | // IcebergOrcReader: _read_position_delete_file |
643 | | // ============================================================================ |
644 | | Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete_range, |
645 | 0 | DeleteFile* position_delete) { |
646 | 0 | OrcReader orc_delete_reader(get_profile(), get_state(), get_scan_params(), *delete_range, |
647 | 0 | READ_DELETE_FILE_BATCH_SIZE, get_state()->timezone(), get_io_ctx(), |
648 | 0 | _meta_cache); |
649 | 0 | OrcInitContext delete_ctx; |
650 | 0 | delete_ctx.column_names = delete_file_col_names; |
651 | 0 | delete_ctx.col_name_to_block_idx = |
652 | 0 | const_cast<std::unordered_map<std::string, uint32_t>*>(&DELETE_COL_NAME_TO_BLOCK_IDX); |
653 | 0 | RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_ctx)); |
654 | | |
655 | 0 | bool eof = false; |
656 | 0 | DataTypePtr data_type_file_path {new DataTypeString}; |
657 | 0 | DataTypePtr data_type_pos {new DataTypeInt64}; |
658 | 0 | while (!eof) { |
659 | 0 | Block block = {{data_type_file_path, ICEBERG_FILE_PATH}, {data_type_pos, ICEBERG_ROW_POS}}; |
660 | |
|
661 | 0 | size_t read_rows = 0; |
662 | 0 | RETURN_IF_ERROR(orc_delete_reader.get_next_block(&block, &read_rows, &eof)); |
663 | | |
664 | 0 | _gen_position_delete_file_range(block, position_delete, read_rows, false); |
665 | 0 | } |
666 | 0 | return Status::OK(); |
667 | 0 | } |
668 | | |
669 | | } // namespace doris |