Coverage Report

Created: 2026-07-02 12:30

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/iceberg_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/table/iceberg_reader.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <gen_cpp/parquet_types.h>
24
#include <glog/logging.h>
25
#include <parallel_hashmap/phmap.h>
26
#include <rapidjson/document.h>
27
28
#include <algorithm>
29
#include <cstring>
30
#include <functional>
31
#include <memory>
32
33
#include "common/compiler_util.h" // IWYU pragma: keep
34
#include "common/consts.h"
35
#include "common/status.h"
36
#include "core/assert_cast.h"
37
#include "core/block/block.h"
38
#include "core/block/column_with_type_and_name.h"
39
#include "core/column/column.h"
40
#include "core/column/column_string.h"
41
#include "core/column/column_vector.h"
42
#include "core/data_type/data_type_factory.hpp"
43
#include "core/data_type/define_primitive_type.h"
44
#include "core/data_type/primitive_type.h"
45
#include "core/string_ref.h"
46
#include "exprs/aggregate/aggregate_function.h"
47
#include "format/format_common.h"
48
#include "format/generic_reader.h"
49
#include "format/orc/vorc_reader.h"
50
#include "format/parquet/schema_desc.h"
51
#include "format/parquet/vparquet_column_chunk_reader.h"
52
#include "format/table/deletion_vector_reader.h"
53
#include "format/table/iceberg/iceberg_orc_nested_column_utils.h"
54
#include "format/table/iceberg/iceberg_parquet_nested_column_utils.h"
55
#include "format/table/nested_column_access_helper.h"
56
#include "format/table/table_schema_change_helper.h"
57
#include "runtime/runtime_state.h"
58
#include "util/coding.h"
59
60
namespace cctz {
61
class time_zone;
62
} // namespace cctz
63
namespace doris {
64
class RowDescriptor;
65
class SlotDescriptor;
66
class TupleDescriptor;
67
68
namespace io {
69
struct IOContext;
70
} // namespace io
71
class VExprContext;
72
} // namespace doris
73
74
namespace doris {
75
const std::string IcebergOrcReader::ICEBERG_ORC_ATTRIBUTE = "iceberg.id";
76
77
bool IcebergTableReader::_is_fully_dictionary_encoded(
78
6
        const tparquet::ColumnMetaData& column_metadata) {
79
12
    const auto is_dictionary_encoding = [](tparquet::Encoding::type encoding) {
80
12
        return encoding == tparquet::Encoding::PLAIN_DICTIONARY ||
81
12
               encoding == tparquet::Encoding::RLE_DICTIONARY;
82
12
    };
83
8
    const auto is_data_page = [](tparquet::PageType::type page_type) {
84
8
        return page_type == tparquet::PageType::DATA_PAGE ||
85
8
               page_type == tparquet::PageType::DATA_PAGE_V2;
86
8
    };
87
6
    const auto is_level_encoding = [](tparquet::Encoding::type encoding) {
88
2
        return encoding == tparquet::Encoding::RLE || encoding == tparquet::Encoding::BIT_PACKED;
89
2
    };
90
91
    // A column chunk may have a dictionary page but still contain plain-encoded data pages.
92
    // Only treat it as dictionary-coded when all data pages are dictionary encoded.
93
6
    if (column_metadata.__isset.encoding_stats) {
94
5
        bool has_data_page_stats = false;
95
8
        for (const tparquet::PageEncodingStats& enc_stat : column_metadata.encoding_stats) {
96
8
            if (is_data_page(enc_stat.page_type) && enc_stat.count > 0) {
97
6
                has_data_page_stats = true;
98
6
                if (!is_dictionary_encoding(enc_stat.encoding)) {
99
2
                    return false;
100
2
                }
101
6
            }
102
8
        }
103
3
        if (has_data_page_stats) {
104
2
            return true;
105
2
        }
106
3
    }
107
108
2
    bool has_dict_encoding = false;
109
2
    bool has_nondict_encoding = false;
110
3
    for (const tparquet::Encoding::type& encoding : column_metadata.encodings) {
111
3
        if (is_dictionary_encoding(encoding)) {
112
1
            has_dict_encoding = true;
113
1
        }
114
115
3
        if (!is_dictionary_encoding(encoding) && !is_level_encoding(encoding)) {
116
2
            has_nondict_encoding = true;
117
2
            break;
118
2
        }
119
3
    }
120
2
    if (!has_dict_encoding || has_nondict_encoding) {
121
2
        return false;
122
2
    }
123
124
0
    return true;
125
2
}
126
127
// ============================================================================
128
// IcebergParquetReader: on_before_init_reader (Parquet-specific schema matching)
129
// ============================================================================
130
44
Status IcebergParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
131
44
    _column_descs = ctx->column_descs;
132
44
    _fill_col_name_to_block_idx = ctx->col_name_to_block_idx;
133
44
    _file_format = Fileformat::PARQUET;
134
135
    // Get file metadata schema first (available because _open_file() already ran)
136
44
    const FieldDescriptor* field_desc = nullptr;
137
44
    RETURN_IF_ERROR(this->get_file_metadata_schema(&field_desc));
138
44
    DCHECK(field_desc != nullptr);
139
140
    // Build table_info_node by field_id or name matching.
141
    // This must happen BEFORE column classification so we can use children_column_exists
142
    // to check if a column exists in the file (by field ID, not name).
143
44
    if (!get_scan_params().__isset.history_schema_info ||
144
44
        get_scan_params().history_schema_info.empty()) [[unlikely]] {
145
1
        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(ctx->tuple_descriptor, *field_desc,
146
1
                                                            ctx->table_info_node));
147
43
    } else {
148
43
        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id_with_name_mapping(
149
43
                get_scan_params().history_schema_info.front().root_field, *field_desc,
150
43
                ctx->table_info_node));
151
43
    }
152
153
44
    std::unordered_set<std::string> partition_col_names;
154
44
    if (ctx->range->__isset.columns_from_path_keys) {
155
19
        partition_col_names.insert(ctx->range->columns_from_path_keys.begin(),
156
19
                                   ctx->range->columns_from_path_keys.end());
157
19
    }
158
159
    // Single pass: classify columns, detect $row_id, handle partition fallback.
160
44
    bool has_partition_from_path = false;
161
363
    for (const auto& desc : *ctx->column_descs) {
162
363
        if (desc.category == ColumnCategory::SYNTHESIZED) {
163
0
            if (desc.name == BeConsts::ICEBERG_ROWID_COL) {
164
0
                this->register_synthesized_column_handler(
165
0
                        BeConsts::ICEBERG_ROWID_COL, [this](Block* block, size_t rows) -> Status {
166
0
                            return _fill_iceberg_row_id(block, rows);
167
0
                        });
168
0
                continue;
169
0
            } else if (desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
170
0
                auto topn_row_id_column_iter = _create_topn_row_id_column_iterator();
171
0
                this->register_synthesized_column_handler(
172
0
                        desc.name,
173
0
                        [iter = std::move(topn_row_id_column_iter), this, &desc](
174
0
                                Block* block, size_t rows) -> Status {
175
0
                            return fill_topn_row_id(iter, desc.name, block, rows);
176
0
                        });
177
0
                continue;
178
0
            }
179
363
        } else if (desc.category == ColumnCategory::PARTITION_KEY) {
180
19
            bool has_partition_value = partition_col_names.contains(desc.name);
181
19
            bool exists_in_file = ctx->table_info_node->children_column_exists(desc.name);
182
19
            if (!has_partition_value || exists_in_file) {
183
                // Keep PARTITION_KEY category stable for scan planning, but still read
184
                // from file when the column exists there.
185
19
                ctx->column_names.push_back(desc.name);
186
19
                continue;
187
19
            }
188
0
            has_partition_from_path = true;
189
344
        } else if (desc.category == ColumnCategory::REGULAR) {
190
344
            ctx->column_names.push_back(desc.name);
191
344
        } else if (desc.category == ColumnCategory::GENERATED) {
192
0
            _init_row_lineage_columns();
193
0
            if (desc.name == ROW_LINEAGE_ROW_ID) {
194
0
                ctx->column_names.push_back(desc.name);
195
0
                this->register_generated_column_handler(
196
0
                        ROW_LINEAGE_ROW_ID, [this](Block* block, size_t rows) -> Status {
197
0
                            return _fill_row_lineage_row_id(block, rows);
198
0
                        });
199
0
                continue;
200
0
            } else if (desc.name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
201
0
                ctx->column_names.push_back(desc.name);
202
0
                this->register_generated_column_handler(
203
0
                        ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER,
204
0
                        [this](Block* block, size_t rows) -> Status {
205
0
                            return _fill_row_lineage_last_updated_sequence_number(block, rows);
206
0
                        });
207
0
                continue;
208
0
            }
209
0
        }
210
363
    }
211
212
    // Set up partition value extraction if any partition columns need filling from path
213
44
    if (has_partition_from_path) {
214
0
        RETURN_IF_ERROR(_extract_partition_values(*ctx->range, ctx->tuple_descriptor,
215
0
                                                  _fill_partition_values,
216
0
                                                  &_fill_partition_value_is_null));
217
0
    }
218
219
44
    _all_required_col_names = ctx->column_names;
220
221
    // Create column IDs from field descriptor
222
44
    auto column_id_result = _create_column_ids(field_desc, ctx->tuple_descriptor);
223
44
    ctx->column_ids = std::move(column_id_result.column_ids);
224
44
    ctx->filter_column_ids = std::move(column_id_result.filter_column_ids);
225
226
    // Build field_id -> block_column_name mapping for equality delete filtering.
227
    // This was previously done in init_reader() column matching (pre-CRTP refactoring).
228
363
    for (const auto* slot : ctx->tuple_descriptor->slots()) {
229
363
        _id_to_block_column_name.emplace(slot->col_unique_id(), slot->col_name());
230
363
    }
231
232
    // Process delete files (must happen before _do_init_reader so expand col IDs are included)
233
44
    RETURN_IF_ERROR(_init_row_filters());
234
235
    // Add expand column IDs for equality delete and remap expand column names
236
    // to match master's behavior:
237
    // - Use field_id to find the actual file column name in Parquet schema
238
    // - Prefix with __equality_delete_column__ to avoid name conflicts
239
    // - Correctly map table_col_name → file_col_name in table_info_node
240
44
    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";
241
44
    std::unordered_map<int, std::string> field_id_to_file_col_name;
242
448
    for (int i = 0; i < field_desc->size(); ++i) {
243
404
        auto field_schema = field_desc->get_column(i);
244
404
        if (field_schema) {
245
404
            field_id_to_file_col_name[field_schema->field_id] = field_schema->name;
246
404
        }
247
404
    }
248
249
    // Rebuild _expand_col_names with proper file-column-based names
250
44
    std::vector<std::string> new_expand_col_names;
251
44
    for (size_t i = 0; i < _expand_col_names.size(); ++i) {
252
0
        const auto& old_name = _expand_col_names[i];
253
        // Find the field_id for this expand column
254
0
        int field_id = -1;
255
0
        for (auto& [fid, name] : _id_to_block_column_name) {
256
0
            if (name == old_name) {
257
0
                field_id = fid;
258
0
                break;
259
0
            }
260
0
        }
261
262
0
        std::string file_col_name = old_name;
263
0
        auto it = field_id_to_file_col_name.find(field_id);
264
0
        if (it != field_id_to_file_col_name.end()) {
265
0
            file_col_name = it->second;
266
0
        }
267
268
0
        std::string table_col_name = EQ_DELETE_PRE + file_col_name;
269
270
        // Update _id_to_block_column_name
271
0
        if (field_id >= 0) {
272
0
            _id_to_block_column_name[field_id] = table_col_name;
273
0
        }
274
275
        // Update _expand_columns name
276
0
        if (i < _expand_columns.size()) {
277
0
            _expand_columns[i].name = table_col_name;
278
0
        }
279
280
0
        new_expand_col_names.push_back(table_col_name);
281
282
        // Add column IDs
283
0
        if (it != field_id_to_file_col_name.end()) {
284
0
            for (int j = 0; j < field_desc->size(); ++j) {
285
0
                auto field_schema = field_desc->get_column(j);
286
0
                if (field_schema && field_schema->field_id == field_id) {
287
0
                    ctx->column_ids.insert(field_schema->get_column_id());
288
0
                    break;
289
0
                }
290
0
            }
291
0
        }
292
293
        // Register in table_info_node: table_col_name → file_col_name
294
0
        ctx->column_names.push_back(table_col_name);
295
0
        ctx->table_info_node->add_children(table_col_name, file_col_name,
296
0
                                           TableSchemaChangeHelper::ConstNode::get_instance());
297
0
    }
298
44
    _expand_col_names = std::move(new_expand_col_names);
299
300
    // Enable group filtering for Iceberg
301
44
    _filter_groups = true;
302
303
44
    return Status::OK();
304
44
}
305
306
// ============================================================================
307
// IcebergParquetReader: _create_column_ids
308
// ============================================================================
309
ColumnIdResult IcebergParquetReader::_create_column_ids(const FieldDescriptor* field_desc,
310
50
                                                        const TupleDescriptor* tuple_descriptor) {
311
50
    auto* mutable_field_desc = const_cast<FieldDescriptor*>(field_desc);
312
50
    mutable_field_desc->assign_ids();
313
314
50
    std::unordered_map<int, const FieldSchema*> iceberg_id_to_field_schema_map;
315
502
    for (int i = 0; i < field_desc->size(); ++i) {
316
452
        auto field_schema = field_desc->get_column(i);
317
452
        if (!field_schema) continue;
318
452
        int iceberg_id = field_schema->field_id;
319
452
        iceberg_id_to_field_schema_map[iceberg_id] = field_schema;
320
452
    }
321
322
50
    std::set<uint64_t> column_ids;
323
50
    std::set<uint64_t> filter_column_ids;
324
325
50
    auto process_access_paths = [](const FieldSchema* parquet_field,
326
50
                                   const std::vector<TColumnAccessPath>& access_paths,
327
50
                                   std::set<uint64_t>& out_ids) {
328
14
        process_nested_access_paths(
329
14
                parquet_field, access_paths, out_ids,
330
14
                [](const FieldSchema* field) { return field->get_column_id(); },
331
14
                [](const FieldSchema* field) { return field->get_max_column_id(); },
332
14
                IcebergParquetNestedColumnUtils::extract_nested_column_ids);
333
14
    };
334
335
376
    for (const auto* slot : tuple_descriptor->slots()) {
336
376
        auto it = iceberg_id_to_field_schema_map.find(slot->col_unique_id());
337
376
        if (it == iceberg_id_to_field_schema_map.end()) {
338
9
            continue;
339
9
        }
340
367
        auto field_schema = it->second;
341
342
367
        if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY &&
343
367
             slot->col_type() != TYPE_MAP)) {
344
359
            column_ids.insert(field_schema->column_id);
345
359
            if (slot->is_predicate()) {
346
0
                filter_column_ids.insert(field_schema->column_id);
347
0
            }
348
359
            continue;
349
359
        }
350
351
8
        const auto& all_access_paths = slot->all_access_paths();
352
8
        process_access_paths(field_schema, all_access_paths, column_ids);
353
354
8
        const auto& predicate_access_paths = slot->predicate_access_paths();
355
8
        if (!predicate_access_paths.empty()) {
356
6
            process_access_paths(field_schema, predicate_access_paths, filter_column_ids);
357
6
        }
358
8
    }
359
50
    return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids));
360
50
}
361
362
// ============================================================================
363
// IcebergParquetReader: _read_position_delete_file
364
// ============================================================================
365
Status IcebergParquetReader::_read_position_delete_file(const TFileRangeDesc* delete_range,
366
0
                                                        DeleteFile* position_delete) {
367
0
    ParquetReader parquet_delete_reader(get_profile(), get_scan_params(), *delete_range,
368
0
                                        READ_DELETE_FILE_BATCH_SIZE, &get_state()->timezone_obj(),
369
0
                                        get_io_ctx(), get_state(), _meta_cache);
370
    // The delete file range has size=-1 (read whole file). We must disable
371
    // row group filtering before init; otherwise _do_init_reader returns EndOfFile
372
    // when _filter_groups && _range_size < 0.
373
0
    ParquetInitContext delete_ctx;
374
0
    delete_ctx.filter_groups = false;
375
0
    delete_ctx.column_names = delete_file_col_names;
376
0
    delete_ctx.col_name_to_block_idx =
377
0
            const_cast<std::unordered_map<std::string, uint32_t>*>(&DELETE_COL_NAME_TO_BLOCK_IDX);
378
0
    RETURN_IF_ERROR(parquet_delete_reader.init_reader(&delete_ctx));
379
380
0
    const tparquet::FileMetaData* meta_data = parquet_delete_reader.get_meta_data();
381
0
    bool dictionary_coded = true;
382
0
    for (const auto& row_group : meta_data->row_groups) {
383
0
        const auto& column_chunk = row_group.columns[ICEBERG_FILE_PATH_INDEX];
384
0
        if (!(column_chunk.__isset.meta_data && has_dict_page(column_chunk.meta_data))) {
385
0
            dictionary_coded = false;
386
0
            break;
387
0
        }
388
0
    }
389
0
    DataTypePtr data_type_file_path {new DataTypeString};
390
0
    DataTypePtr data_type_pos {new DataTypeInt64};
391
0
    bool eof = false;
392
0
    while (!eof) {
393
0
        Block block = {dictionary_coded
394
0
                               ? ColumnWithTypeAndName {ColumnDictI32::create(),
395
0
                                                        data_type_file_path, ICEBERG_FILE_PATH}
396
0
                               : ColumnWithTypeAndName {data_type_file_path, ICEBERG_FILE_PATH},
397
398
0
                       {data_type_pos, ICEBERG_ROW_POS}};
399
0
        size_t read_rows = 0;
400
0
        RETURN_IF_ERROR(parquet_delete_reader.get_next_block(&block, &read_rows, &eof));
401
402
0
        if (read_rows <= 0) {
403
0
            break;
404
0
        }
405
0
        _gen_position_delete_file_range(block, position_delete, read_rows, dictionary_coded);
406
0
    }
407
0
    return Status::OK();
408
0
};
409
410
// ============================================================================
411
// IcebergOrcReader: on_before_init_reader (ORC-specific schema matching)
412
// ============================================================================
413
3.28k
Status IcebergOrcReader::on_before_init_reader(ReaderInitContext* ctx) {
414
3.28k
    _column_descs = ctx->column_descs;
415
3.28k
    _fill_col_name_to_block_idx = ctx->col_name_to_block_idx;
416
3.28k
    _file_format = Fileformat::ORC;
417
418
    // Get ORC file type first (available because _create_file_reader() already ran)
419
3.28k
    const orc::Type* orc_type_ptr = nullptr;
420
3.28k
    RETURN_IF_ERROR(this->get_file_type(&orc_type_ptr));
421
422
    // Build table_info_node by field_id or name matching.
423
    // This must happen BEFORE column classification so we can use children_column_exists
424
    // to check if a column exists in the file (by field ID, not name).
425
3.28k
    if (!get_scan_params().__isset.history_schema_info ||
426
3.28k
        get_scan_params().history_schema_info.empty()) [[unlikely]] {
427
1
        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(ctx->tuple_descriptor, orc_type_ptr,
428
1
                                                        ctx->table_info_node));
429
3.28k
    } else {
430
3.28k
        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id_with_name_mapping(
431
3.28k
                get_scan_params().history_schema_info.front().root_field, orc_type_ptr,
432
3.28k
                ICEBERG_ORC_ATTRIBUTE, ctx->table_info_node));
433
3.28k
    }
434
435
3.28k
    std::unordered_set<std::string> partition_col_names;
436
3.28k
    if (ctx->range->__isset.columns_from_path_keys) {
437
353
        partition_col_names.insert(ctx->range->columns_from_path_keys.begin(),
438
353
                                   ctx->range->columns_from_path_keys.end());
439
353
    }
440
441
    // Single pass: classify columns, detect $row_id, handle partition fallback.
442
3.28k
    bool has_partition_from_path = false;
443
16.0k
    for (const auto& desc : *ctx->column_descs) {
444
16.0k
        if (desc.category == ColumnCategory::SYNTHESIZED) {
445
163
            if (desc.name == BeConsts::ICEBERG_ROWID_COL) {
446
47
                this->register_synthesized_column_handler(
447
47
                        BeConsts::ICEBERG_ROWID_COL, [this](Block* block, size_t rows) -> Status {
448
45
                            return _fill_iceberg_row_id(block, rows);
449
45
                        });
450
47
                continue;
451
116
            } else if (desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
452
116
                auto topn_row_id_column_iter = _create_topn_row_id_column_iterator();
453
116
                this->register_synthesized_column_handler(
454
116
                        desc.name,
455
116
                        [iter = std::move(topn_row_id_column_iter), this, &desc](
456
116
                                Block* block, size_t rows) -> Status {
457
111
                            return fill_topn_row_id(iter, desc.name, block, rows);
458
111
                        });
459
116
                continue;
460
116
            }
461
15.8k
        } else if (desc.category == ColumnCategory::PARTITION_KEY) {
462
448
            bool has_partition_value = partition_col_names.contains(desc.name);
463
448
            bool exists_in_file = ctx->table_info_node->children_column_exists(desc.name);
464
448
            if (!has_partition_value || exists_in_file) {
465
448
                ctx->column_names.push_back(desc.name);
466
448
                continue;
467
448
            }
468
0
            has_partition_from_path = true;
469
15.4k
        } else if (desc.category == ColumnCategory::REGULAR) {
470
15.2k
            ctx->column_names.push_back(desc.name);
471
15.2k
        } else if (desc.category == ColumnCategory::GENERATED) {
472
207
            _init_row_lineage_columns();
473
207
            if (desc.name == ROW_LINEAGE_ROW_ID) {
474
110
                ctx->column_names.push_back(desc.name);
475
110
                this->register_generated_column_handler(
476
111
                        ROW_LINEAGE_ROW_ID, [this](Block* block, size_t rows) -> Status {
477
111
                            return _fill_row_lineage_row_id(block, rows);
478
111
                        });
479
110
                continue;
480
110
            } else if (desc.name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
481
97
                ctx->column_names.push_back(desc.name);
482
97
                this->register_generated_column_handler(
483
97
                        ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER,
484
97
                        [this](Block* block, size_t rows) -> Status {
485
97
                            return _fill_row_lineage_last_updated_sequence_number(block, rows);
486
97
                        });
487
97
                continue;
488
97
            }
489
207
        }
490
16.0k
    }
491
492
3.28k
    if (has_partition_from_path) {
493
0
        RETURN_IF_ERROR(_extract_partition_values(*ctx->range, ctx->tuple_descriptor,
494
0
                                                  _fill_partition_values,
495
0
                                                  &_fill_partition_value_is_null));
496
0
    }
497
498
3.28k
    _all_required_col_names = ctx->column_names;
499
500
    // Create column IDs from ORC type
501
3.28k
    auto column_id_result = _create_column_ids(orc_type_ptr, ctx->tuple_descriptor);
502
3.28k
    ctx->column_ids = std::move(column_id_result.column_ids);
503
3.28k
    ctx->filter_column_ids = std::move(column_id_result.filter_column_ids);
504
505
    // Build field_id -> block_column_name mapping for equality delete filtering.
506
16.0k
    for (const auto* slot : ctx->tuple_descriptor->slots()) {
507
16.0k
        _id_to_block_column_name.emplace(slot->col_unique_id(), slot->col_name());
508
16.0k
    }
509
510
    // Process delete files (must happen before _do_init_reader so expand col IDs are included)
511
3.28k
    RETURN_IF_ERROR(_init_row_filters());
512
513
    // Add expand column IDs for equality delete and remap expand column names
514
    // (matching master's behavior with __equality_delete_column__ prefix)
515
3.28k
    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";
516
3.28k
    std::unordered_map<int, std::string> field_id_to_file_col_name;
517
25.5k
    for (uint64_t i = 0; i < orc_type_ptr->getSubtypeCount(); ++i) {
518
22.3k
        std::string col_name = orc_type_ptr->getFieldName(i);
519
22.3k
        const orc::Type* sub_type = orc_type_ptr->getSubtype(i);
520
22.3k
        if (sub_type->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
521
22.3k
            int fid = std::stoi(sub_type->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
522
22.3k
            field_id_to_file_col_name[fid] = col_name;
523
22.3k
        }
524
22.3k
    }
525
526
3.28k
    std::vector<std::string> new_expand_col_names;
527
3.44k
    for (size_t i = 0; i < _expand_col_names.size(); ++i) {
528
159
        const auto& old_name = _expand_col_names[i];
529
159
        int field_id = -1;
530
193
        for (auto& [fid, name] : _id_to_block_column_name) {
531
193
            if (name == old_name) {
532
159
                field_id = fid;
533
159
                break;
534
159
            }
535
193
        }
536
537
159
        std::string file_col_name = old_name;
538
159
        auto it = field_id_to_file_col_name.find(field_id);
539
159
        if (it != field_id_to_file_col_name.end()) {
540
159
            file_col_name = it->second;
541
159
        }
542
543
159
        std::string table_col_name = EQ_DELETE_PRE + file_col_name;
544
545
159
        if (field_id >= 0) {
546
159
            _id_to_block_column_name[field_id] = table_col_name;
547
159
        }
548
159
        if (i < _expand_columns.size()) {
549
159
            _expand_columns[i].name = table_col_name;
550
159
        }
551
159
        new_expand_col_names.push_back(table_col_name);
552
553
        // Add column IDs
554
159
        if (it != field_id_to_file_col_name.end()) {
555
292
            for (uint64_t j = 0; j < orc_type_ptr->getSubtypeCount(); ++j) {
556
292
                const orc::Type* sub_type = orc_type_ptr->getSubtype(j);
557
292
                if (orc_type_ptr->getFieldName(j) == file_col_name) {
558
159
                    ctx->column_ids.insert(sub_type->getColumnId());
559
159
                    break;
560
159
                }
561
292
            }
562
159
        }
563
564
159
        ctx->column_names.push_back(table_col_name);
565
159
        ctx->table_info_node->add_children(table_col_name, file_col_name,
566
159
                                           TableSchemaChangeHelper::ConstNode::get_instance());
567
159
    }
568
3.28k
    _expand_col_names = std::move(new_expand_col_names);
569
570
3.28k
    return Status::OK();
571
3.28k
}
572
573
// ============================================================================
574
// IcebergOrcReader: _create_column_ids
575
// ============================================================================
576
ColumnIdResult IcebergOrcReader::_create_column_ids(const orc::Type* orc_type,
577
3.27k
                                                    const TupleDescriptor* tuple_descriptor) {
578
3.27k
    std::unordered_map<int, const orc::Type*> iceberg_id_to_orc_type_map;
579
25.6k
    for (uint64_t i = 0; i < orc_type->getSubtypeCount(); ++i) {
580
22.3k
        auto orc_sub_type = orc_type->getSubtype(i);
581
22.3k
        if (!orc_sub_type) continue;
582
22.3k
        if (!orc_sub_type->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
583
0
            continue;
584
0
        }
585
22.3k
        int iceberg_id = std::stoi(orc_sub_type->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
586
22.3k
        iceberg_id_to_orc_type_map[iceberg_id] = orc_sub_type;
587
22.3k
    }
588
589
3.27k
    std::set<uint64_t> column_ids;
590
3.27k
    std::set<uint64_t> filter_column_ids;
591
592
3.27k
    auto process_access_paths = [](const orc::Type* orc_field,
593
3.27k
                                   const std::vector<TColumnAccessPath>& access_paths,
594
7.43k
                                   std::set<uint64_t>& out_ids) {
595
7.43k
        process_nested_access_paths(
596
7.43k
                orc_field, access_paths, out_ids,
597
7.43k
                [](const orc::Type* type) { return type->getColumnId(); },
598
7.43k
                [](const orc::Type* type) { return type->getMaximumColumnId(); },
599
7.43k
                IcebergOrcNestedColumnUtils::extract_nested_column_ids);
600
7.43k
    };
601
602
16.0k
    for (const auto* slot : tuple_descriptor->slots()) {
603
16.0k
        auto it = iceberg_id_to_orc_type_map.find(slot->col_unique_id());
604
16.0k
        if (it == iceberg_id_to_orc_type_map.end()) {
605
1.26k
            continue;
606
1.26k
        }
607
14.7k
        const orc::Type* orc_field = it->second;
608
609
14.7k
        if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY &&
610
14.7k
             slot->col_type() != TYPE_MAP)) {
611
8.13k
            column_ids.insert(orc_field->getColumnId());
612
8.13k
            if (slot->is_predicate()) {
613
1.09k
                filter_column_ids.insert(orc_field->getColumnId());
614
1.09k
            }
615
8.13k
            continue;
616
8.13k
        }
617
618
6.66k
        const auto& all_access_paths = slot->all_access_paths();
619
6.66k
        process_access_paths(orc_field, all_access_paths, column_ids);
620
621
6.66k
        const auto& predicate_access_paths = slot->predicate_access_paths();
622
6.66k
        if (!predicate_access_paths.empty()) {
623
778
            process_access_paths(orc_field, predicate_access_paths, filter_column_ids);
624
778
        }
625
6.66k
    }
626
627
3.27k
    return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids));
628
3.27k
}
629
630
// ============================================================================
631
// IcebergOrcReader: _read_position_delete_file
632
// ============================================================================
633
Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete_range,
634
484
                                                    DeleteFile* position_delete) {
635
484
    OrcReader orc_delete_reader(get_profile(), get_state(), get_scan_params(), *delete_range,
636
484
                                READ_DELETE_FILE_BATCH_SIZE, get_state()->timezone(), get_io_ctx(),
637
484
                                _meta_cache);
638
484
    OrcInitContext delete_ctx;
639
484
    delete_ctx.column_names = delete_file_col_names;
640
484
    delete_ctx.col_name_to_block_idx =
641
484
            const_cast<std::unordered_map<std::string, uint32_t>*>(&DELETE_COL_NAME_TO_BLOCK_IDX);
642
484
    RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_ctx));
643
644
484
    bool eof = false;
645
484
    DataTypePtr data_type_file_path {new DataTypeString};
646
484
    DataTypePtr data_type_pos {new DataTypeInt64};
647
1.45k
    while (!eof) {
648
968
        Block block = {{data_type_file_path, ICEBERG_FILE_PATH}, {data_type_pos, ICEBERG_ROW_POS}};
649
650
968
        size_t read_rows = 0;
651
968
        RETURN_IF_ERROR(orc_delete_reader.get_next_block(&block, &read_rows, &eof));
652
653
968
        _gen_position_delete_file_range(block, position_delete, read_rows, false);
654
968
    }
655
484
    return Status::OK();
656
484
}
657
658
} // namespace doris