Coverage Report

Created: 2026-04-22 10:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/iceberg_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/table/iceberg_reader.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <gen_cpp/parquet_types.h>
24
#include <glog/logging.h>
25
#include <parallel_hashmap/phmap.h>
26
#include <rapidjson/document.h>
27
28
#include <algorithm>
29
#include <cstring>
30
#include <functional>
31
#include <memory>
32
33
#include "common/compiler_util.h" // IWYU pragma: keep
34
#include "common/consts.h"
35
#include "common/status.h"
36
#include "core/assert_cast.h"
37
#include "core/block/block.h"
38
#include "core/block/column_with_type_and_name.h"
39
#include "core/column/column.h"
40
#include "core/column/column_string.h"
41
#include "core/column/column_vector.h"
42
#include "core/data_type/data_type_factory.hpp"
43
#include "core/data_type/define_primitive_type.h"
44
#include "core/data_type/primitive_type.h"
45
#include "core/string_ref.h"
46
#include "exprs/aggregate/aggregate_function.h"
47
#include "format/format_common.h"
48
#include "format/generic_reader.h"
49
#include "format/orc/vorc_reader.h"
50
#include "format/parquet/schema_desc.h"
51
#include "format/parquet/vparquet_column_chunk_reader.h"
52
#include "format/table/deletion_vector_reader.h"
53
#include "format/table/iceberg/iceberg_orc_nested_column_utils.h"
54
#include "format/table/iceberg/iceberg_parquet_nested_column_utils.h"
55
#include "format/table/nested_column_access_helper.h"
56
#include "format/table/table_schema_change_helper.h"
57
#include "runtime/runtime_state.h"
58
#include "util/coding.h"
59
60
namespace cctz {
61
class time_zone;
62
} // namespace cctz
63
namespace doris {
64
class RowDescriptor;
65
class SlotDescriptor;
66
class TupleDescriptor;
67
68
namespace io {
69
struct IOContext;
70
} // namespace io
71
class VExprContext;
72
} // namespace doris
73
74
namespace doris {
75
const std::string IcebergOrcReader::ICEBERG_ORC_ATTRIBUTE = "iceberg.id";
76
77
bool IcebergTableReader::_is_fully_dictionary_encoded(
78
6
        const tparquet::ColumnMetaData& column_metadata) {
79
12
    const auto is_dictionary_encoding = [](tparquet::Encoding::type encoding) {
80
12
        return encoding == tparquet::Encoding::PLAIN_DICTIONARY ||
81
12
               encoding == tparquet::Encoding::RLE_DICTIONARY;
82
12
    };
83
8
    const auto is_data_page = [](tparquet::PageType::type page_type) {
84
8
        return page_type == tparquet::PageType::DATA_PAGE ||
85
8
               page_type == tparquet::PageType::DATA_PAGE_V2;
86
8
    };
87
6
    const auto is_level_encoding = [](tparquet::Encoding::type encoding) {
88
2
        return encoding == tparquet::Encoding::RLE || encoding == tparquet::Encoding::BIT_PACKED;
89
2
    };
90
91
    // A column chunk may have a dictionary page but still contain plain-encoded data pages.
92
    // Only treat it as dictionary-coded when all data pages are dictionary encoded.
93
6
    if (column_metadata.__isset.encoding_stats) {
94
5
        bool has_data_page_stats = false;
95
8
        for (const tparquet::PageEncodingStats& enc_stat : column_metadata.encoding_stats) {
96
8
            if (is_data_page(enc_stat.page_type) && enc_stat.count > 0) {
97
6
                has_data_page_stats = true;
98
6
                if (!is_dictionary_encoding(enc_stat.encoding)) {
99
2
                    return false;
100
2
                }
101
6
            }
102
8
        }
103
3
        if (has_data_page_stats) {
104
2
            return true;
105
2
        }
106
3
    }
107
108
2
    bool has_dict_encoding = false;
109
2
    bool has_nondict_encoding = false;
110
3
    for (const tparquet::Encoding::type& encoding : column_metadata.encodings) {
111
3
        if (is_dictionary_encoding(encoding)) {
112
1
            has_dict_encoding = true;
113
1
        }
114
115
3
        if (!is_dictionary_encoding(encoding) && !is_level_encoding(encoding)) {
116
2
            has_nondict_encoding = true;
117
2
            break;
118
2
        }
119
3
    }
120
2
    if (!has_dict_encoding || has_nondict_encoding) {
121
2
        return false;
122
2
    }
123
124
0
    return true;
125
2
}
126
127
// ============================================================================
128
// IcebergParquetReader: on_before_init_reader (Parquet-specific schema matching)
129
// ============================================================================
130
18.2k
Status IcebergParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
131
18.2k
    _column_descs = ctx->column_descs;
132
18.2k
    _fill_col_name_to_block_idx = ctx->col_name_to_block_idx;
133
18.2k
    _file_format = Fileformat::PARQUET;
134
135
    // Get file metadata schema first (available because _open_file() already ran)
136
18.2k
    const FieldDescriptor* field_desc = nullptr;
137
18.2k
    RETURN_IF_ERROR(this->get_file_metadata_schema(&field_desc));
138
18.2k
    DCHECK(field_desc != nullptr);
139
140
    // Build table_info_node by field_id or name matching.
141
    // This must happen BEFORE column classification so we can use children_column_exists
142
    // to check if a column exists in the file (by field ID, not name).
143
18.2k
    if (!get_scan_params().__isset.history_schema_info ||
144
18.2k
        get_scan_params().history_schema_info.empty()) [[unlikely]] {
145
1
        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(ctx->tuple_descriptor, *field_desc,
146
1
                                                            ctx->table_info_node));
147
18.2k
    } else {
148
18.2k
        bool exist_field_id = true;
149
18.2k
        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id(
150
18.2k
                get_scan_params().history_schema_info.front().root_field, *field_desc,
151
18.2k
                ctx->table_info_node, exist_field_id));
152
18.2k
        if (!exist_field_id) {
153
2
            RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(ctx->tuple_descriptor, *field_desc,
154
2
                                                                ctx->table_info_node));
155
2
        }
156
18.2k
    }
157
158
18.2k
    std::unordered_set<std::string> partition_col_names;
159
18.2k
    if (ctx->range->__isset.columns_from_path_keys) {
160
1.82k
        partition_col_names.insert(ctx->range->columns_from_path_keys.begin(),
161
1.82k
                                   ctx->range->columns_from_path_keys.end());
162
1.82k
    }
163
164
    // Single pass: classify columns, detect $row_id, handle partition fallback.
165
18.2k
    bool has_partition_from_path = false;
166
54.7k
    for (auto& desc : *ctx->column_descs) {
167
54.7k
        if (desc.category == ColumnCategory::SYNTHESIZED) {
168
324
            if (desc.name == BeConsts::ICEBERG_ROWID_COL) {
169
78
                this->register_synthesized_column_handler(
170
78
                        BeConsts::ICEBERG_ROWID_COL, [this](Block* block, size_t rows) -> Status {
171
78
                            return _fill_iceberg_row_id(block, rows);
172
78
                        });
173
78
                continue;
174
246
            } else if (desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
175
246
                auto topn_row_id_column_iter = _create_topn_row_id_column_iterator();
176
246
                this->register_synthesized_column_handler(
177
246
                        desc.name,
178
246
                        [iter = std::move(topn_row_id_column_iter), this, &desc](
179
246
                                Block* block, size_t rows) -> Status {
180
238
                            return fill_topn_row_id(iter, desc.name, block, rows);
181
238
                        });
182
246
                continue;
183
246
            }
184
54.4k
        } else if (desc.category == ColumnCategory::REGULAR) {
185
            // Partition fallback: if column is a partition key and NOT in the file
186
            // (checked via field ID matching in table_info_node), read from path instead.
187
54.0k
            if (partition_col_names.contains(desc.name) &&
188
54.0k
                !ctx->table_info_node->children_column_exists(desc.name)) {
189
0
                if (config::enable_iceberg_partition_column_fallback) {
190
0
                    desc.category = ColumnCategory::PARTITION_KEY;
191
0
                    has_partition_from_path = true;
192
0
                    continue;
193
0
                }
194
0
            }
195
54.0k
            ctx->column_names.push_back(desc.name);
196
54.0k
        } else if (desc.category == ColumnCategory::GENERATED) {
197
438
            _init_row_lineage_columns();
198
438
            if (desc.name == ROW_LINEAGE_ROW_ID) {
199
236
                ctx->column_names.push_back(desc.name);
200
236
                this->register_generated_column_handler(
201
236
                        ROW_LINEAGE_ROW_ID, [this](Block* block, size_t rows) -> Status {
202
236
                            return _fill_row_lineage_row_id(block, rows);
203
236
                        });
204
236
                continue;
205
236
            } else if (desc.name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
206
202
                ctx->column_names.push_back(desc.name);
207
202
                this->register_generated_column_handler(
208
202
                        ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER,
209
202
                        [this](Block* block, size_t rows) -> Status {
210
202
                            return _fill_row_lineage_last_updated_sequence_number(block, rows);
211
202
                        });
212
202
                continue;
213
202
            }
214
438
        }
215
54.7k
    }
216
217
    // Set up partition value extraction if any partition columns need filling from path
218
18.2k
    if (has_partition_from_path) {
219
0
        RETURN_IF_ERROR(_extract_partition_values(*ctx->range, ctx->tuple_descriptor,
220
0
                                                  _fill_partition_values));
221
0
    }
222
223
18.2k
    _all_required_col_names = ctx->column_names;
224
225
    // Create column IDs from field descriptor
226
18.2k
    auto column_id_result = _create_column_ids(field_desc, ctx->tuple_descriptor);
227
18.2k
    ctx->column_ids = std::move(column_id_result.column_ids);
228
18.2k
    ctx->filter_column_ids = std::move(column_id_result.filter_column_ids);
229
230
    // Build field_id -> block_column_name mapping for equality delete filtering.
231
    // This was previously done in init_reader() column matching (pre-CRTP refactoring).
232
54.7k
    for (const auto* slot : ctx->tuple_descriptor->slots()) {
233
54.7k
        _id_to_block_column_name.emplace(slot->col_unique_id(), slot->col_name());
234
54.7k
    }
235
236
    // Process delete files (must happen before _do_init_reader so expand col IDs are included)
237
18.2k
    RETURN_IF_ERROR(_init_row_filters());
238
239
    // Add expand column IDs for equality delete and remap expand column names
240
    // to match master's behavior:
241
    // - Use field_id to find the actual file column name in Parquet schema
242
    // - Prefix with __equality_delete_column__ to avoid name conflicts
243
    // - Correctly map table_col_name → file_col_name in table_info_node
244
18.2k
    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";
245
18.2k
    std::unordered_map<int, std::string> field_id_to_file_col_name;
246
142k
    for (int i = 0; i < field_desc->size(); ++i) {
247
124k
        auto field_schema = field_desc->get_column(i);
248
124k
        if (field_schema) {
249
124k
            field_id_to_file_col_name[field_schema->field_id] = field_schema->name;
250
124k
        }
251
124k
    }
252
253
    // Rebuild _expand_col_names with proper file-column-based names
254
18.2k
    std::vector<std::string> new_expand_col_names;
255
18.5k
    for (size_t i = 0; i < _expand_col_names.size(); ++i) {
256
318
        const auto& old_name = _expand_col_names[i];
257
        // Find the field_id for this expand column
258
318
        int field_id = -1;
259
386
        for (auto& [fid, name] : _id_to_block_column_name) {
260
386
            if (name == old_name) {
261
318
                field_id = fid;
262
318
                break;
263
318
            }
264
386
        }
265
266
318
        std::string file_col_name = old_name;
267
318
        auto it = field_id_to_file_col_name.find(field_id);
268
318
        if (it != field_id_to_file_col_name.end()) {
269
318
            file_col_name = it->second;
270
318
        }
271
272
318
        std::string table_col_name = EQ_DELETE_PRE + file_col_name;
273
274
        // Update _id_to_block_column_name
275
318
        if (field_id >= 0) {
276
318
            _id_to_block_column_name[field_id] = table_col_name;
277
318
        }
278
279
        // Update _expand_columns name
280
318
        if (i < _expand_columns.size()) {
281
318
            _expand_columns[i].name = table_col_name;
282
318
        }
283
284
318
        new_expand_col_names.push_back(table_col_name);
285
286
        // Add column IDs
287
318
        if (it != field_id_to_file_col_name.end()) {
288
584
            for (int j = 0; j < field_desc->size(); ++j) {
289
584
                auto field_schema = field_desc->get_column(j);
290
584
                if (field_schema && field_schema->field_id == field_id) {
291
318
                    ctx->column_ids.insert(field_schema->get_column_id());
292
318
                    break;
293
318
                }
294
584
            }
295
318
        }
296
297
        // Register in table_info_node: table_col_name → file_col_name
298
318
        ctx->column_names.push_back(table_col_name);
299
318
        ctx->table_info_node->add_children(table_col_name, file_col_name,
300
318
                                           TableSchemaChangeHelper::ConstNode::get_instance());
301
318
    }
302
18.2k
    _expand_col_names = std::move(new_expand_col_names);
303
304
    // Enable group filtering for Iceberg
305
18.2k
    _filter_groups = true;
306
307
18.2k
    return Status::OK();
308
18.2k
}
309
310
// ============================================================================
311
// IcebergParquetReader: _create_column_ids
312
// ============================================================================
313
ColumnIdResult IcebergParquetReader::_create_column_ids(const FieldDescriptor* field_desc,
314
18.1k
                                                        const TupleDescriptor* tuple_descriptor) {
315
18.1k
    auto* mutable_field_desc = const_cast<FieldDescriptor*>(field_desc);
316
18.1k
    mutable_field_desc->assign_ids();
317
318
18.1k
    std::unordered_map<int, const FieldSchema*> iceberg_id_to_field_schema_map;
319
142k
    for (int i = 0; i < field_desc->size(); ++i) {
320
124k
        auto field_schema = field_desc->get_column(i);
321
124k
        if (!field_schema) continue;
322
124k
        int iceberg_id = field_schema->field_id;
323
124k
        iceberg_id_to_field_schema_map[iceberg_id] = field_schema;
324
124k
    }
325
326
18.1k
    std::set<uint64_t> column_ids;
327
18.1k
    std::set<uint64_t> filter_column_ids;
328
329
18.1k
    auto process_access_paths = [](const FieldSchema* parquet_field,
330
18.1k
                                   const std::vector<TColumnAccessPath>& access_paths,
331
18.1k
                                   std::set<uint64_t>& out_ids) {
332
15.7k
        process_nested_access_paths(
333
15.7k
                parquet_field, access_paths, out_ids,
334
15.7k
                [](const FieldSchema* field) { return field->get_column_id(); },
335
15.7k
                [](const FieldSchema* field) { return field->get_max_column_id(); },
336
15.7k
                IcebergParquetNestedColumnUtils::extract_nested_column_ids);
337
15.7k
    };
338
339
54.7k
    for (const auto* slot : tuple_descriptor->slots()) {
340
54.7k
        auto it = iceberg_id_to_field_schema_map.find(slot->col_unique_id());
341
54.7k
        if (it == iceberg_id_to_field_schema_map.end()) {
342
3.57k
            continue;
343
3.57k
        }
344
51.2k
        auto field_schema = it->second;
345
346
51.2k
        if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY &&
347
51.2k
             slot->col_type() != TYPE_MAP)) {
348
36.9k
            column_ids.insert(field_schema->column_id);
349
36.9k
            if (slot->is_predicate()) {
350
5.95k
                filter_column_ids.insert(field_schema->column_id);
351
5.95k
            }
352
36.9k
            continue;
353
36.9k
        }
354
355
14.2k
        const auto& all_access_paths = slot->all_access_paths();
356
14.2k
        process_access_paths(field_schema, all_access_paths, column_ids);
357
358
14.2k
        const auto& predicate_access_paths = slot->predicate_access_paths();
359
14.2k
        if (!predicate_access_paths.empty()) {
360
1.58k
            process_access_paths(field_schema, predicate_access_paths, filter_column_ids);
361
1.58k
        }
362
14.2k
    }
363
18.1k
    return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids));
364
18.1k
}
365
366
// ============================================================================
367
// IcebergParquetReader: _read_position_delete_file
368
// ============================================================================
369
Status IcebergParquetReader::_read_position_delete_file(const TFileRangeDesc* delete_range,
370
790
                                                        DeleteFile* position_delete) {
371
790
    ParquetReader parquet_delete_reader(get_profile(), get_scan_params(), *delete_range,
372
790
                                        READ_DELETE_FILE_BATCH_SIZE, &get_state()->timezone_obj(),
373
790
                                        get_io_ctx(), get_state(), _meta_cache);
374
    // The delete file range has size=-1 (read whole file). We must disable
375
    // row group filtering before init; otherwise _do_init_reader returns EndOfFile
376
    // when _filter_groups && _range_size < 0.
377
790
    ParquetInitContext delete_ctx;
378
790
    delete_ctx.filter_groups = false;
379
790
    delete_ctx.column_names = delete_file_col_names;
380
790
    delete_ctx.col_name_to_block_idx =
381
790
            const_cast<std::unordered_map<std::string, uint32_t>*>(&DELETE_COL_NAME_TO_BLOCK_IDX);
382
790
    RETURN_IF_ERROR(parquet_delete_reader.init_reader(&delete_ctx));
383
384
790
    const tparquet::FileMetaData* meta_data = parquet_delete_reader.get_meta_data();
385
790
    bool dictionary_coded = true;
386
790
    for (const auto& row_group : meta_data->row_groups) {
387
788
        const auto& column_chunk = row_group.columns[ICEBERG_FILE_PATH_INDEX];
388
790
        if (!(column_chunk.__isset.meta_data && has_dict_page(column_chunk.meta_data))) {
389
370
            dictionary_coded = false;
390
370
            break;
391
370
        }
392
788
    }
393
790
    DataTypePtr data_type_file_path {new DataTypeString};
394
790
    DataTypePtr data_type_pos {new DataTypeInt64};
395
790
    bool eof = false;
396
1.58k
    while (!eof) {
397
790
        Block block = {dictionary_coded
398
790
                               ? ColumnWithTypeAndName {ColumnDictI32::create(
399
420
                                                                FieldType::OLAP_FIELD_TYPE_VARCHAR),
400
420
                                                        data_type_file_path, ICEBERG_FILE_PATH}
401
790
                               : ColumnWithTypeAndName {data_type_file_path, ICEBERG_FILE_PATH},
402
403
790
                       {data_type_pos, ICEBERG_ROW_POS}};
404
790
        size_t read_rows = 0;
405
790
        RETURN_IF_ERROR(parquet_delete_reader.get_next_block(&block, &read_rows, &eof));
406
407
790
        if (read_rows <= 0) {
408
0
            break;
409
0
        }
410
790
        _gen_position_delete_file_range(block, position_delete, read_rows, dictionary_coded);
411
790
    }
412
790
    return Status::OK();
413
790
};
414
415
// ============================================================================
416
// IcebergOrcReader: on_before_init_reader (ORC-specific schema matching)
417
// ============================================================================
418
6.48k
Status IcebergOrcReader::on_before_init_reader(ReaderInitContext* ctx) {
419
6.48k
    _column_descs = ctx->column_descs;
420
6.48k
    _fill_col_name_to_block_idx = ctx->col_name_to_block_idx;
421
6.48k
    _file_format = Fileformat::ORC;
422
423
    // Get ORC file type first (available because _create_file_reader() already ran)
424
6.48k
    const orc::Type* orc_type_ptr = nullptr;
425
6.48k
    RETURN_IF_ERROR(this->get_file_type(&orc_type_ptr));
426
427
    // Build table_info_node by field_id or name matching.
428
    // This must happen BEFORE column classification so we can use children_column_exists
429
    // to check if a column exists in the file (by field ID, not name).
430
6.48k
    if (!get_scan_params().__isset.history_schema_info ||
431
6.48k
        get_scan_params().history_schema_info.empty()) [[unlikely]] {
432
1
        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(ctx->tuple_descriptor, orc_type_ptr,
433
1
                                                        ctx->table_info_node));
434
6.48k
    } else {
435
6.48k
        bool exist_field_id = true;
436
6.48k
        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id(
437
6.48k
                get_scan_params().history_schema_info.front().root_field, orc_type_ptr,
438
6.48k
                ICEBERG_ORC_ATTRIBUTE, ctx->table_info_node, exist_field_id));
439
6.48k
        if (!exist_field_id) {
440
0
            RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(ctx->tuple_descriptor, orc_type_ptr,
441
0
                                                            ctx->table_info_node));
442
0
        }
443
6.48k
    }
444
445
6.48k
    std::unordered_set<std::string> partition_col_names;
446
6.48k
    if (ctx->range->__isset.columns_from_path_keys) {
447
286
        partition_col_names.insert(ctx->range->columns_from_path_keys.begin(),
448
286
                                   ctx->range->columns_from_path_keys.end());
449
286
    }
450
451
    // Single pass: classify columns, detect $row_id, handle partition fallback.
452
6.48k
    bool has_partition_from_path = false;
453
31.8k
    for (auto& desc : *ctx->column_descs) {
454
31.8k
        if (desc.category == ColumnCategory::SYNTHESIZED) {
455
308
            if (desc.name == BeConsts::ICEBERG_ROWID_COL) {
456
78
                this->register_synthesized_column_handler(
457
78
                        BeConsts::ICEBERG_ROWID_COL, [this](Block* block, size_t rows) -> Status {
458
74
                            return _fill_iceberg_row_id(block, rows);
459
74
                        });
460
78
                continue;
461
230
            } else if (desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
462
230
                auto topn_row_id_column_iter = _create_topn_row_id_column_iterator();
463
230
                this->register_synthesized_column_handler(
464
230
                        desc.name,
465
230
                        [iter = std::move(topn_row_id_column_iter), this, &desc](
466
230
                                Block* block, size_t rows) -> Status {
467
184
                            return fill_topn_row_id(iter, desc.name, block, rows);
468
184
                        });
469
230
                continue;
470
230
            }
471
31.5k
        } else if (desc.category == ColumnCategory::REGULAR) {
472
            // Partition fallback: if column is a partition key and NOT in the file
473
            // (checked via field ID matching in table_info_node), read from path instead.
474
31.1k
            if (partition_col_names.contains(desc.name) &&
475
31.1k
                !ctx->table_info_node->children_column_exists(desc.name)) {
476
0
                if (config::enable_iceberg_partition_column_fallback) {
477
0
                    desc.category = ColumnCategory::PARTITION_KEY;
478
0
                    has_partition_from_path = true;
479
0
                    continue;
480
0
                }
481
0
            }
482
31.1k
            ctx->column_names.push_back(desc.name);
483
31.1k
        } else if (desc.category == ColumnCategory::GENERATED) {
484
430
            _init_row_lineage_columns();
485
430
            if (desc.name == ROW_LINEAGE_ROW_ID) {
486
234
                ctx->column_names.push_back(desc.name);
487
234
                this->register_generated_column_handler(
488
236
                        ROW_LINEAGE_ROW_ID, [this](Block* block, size_t rows) -> Status {
489
236
                            return _fill_row_lineage_row_id(block, rows);
490
236
                        });
491
234
                continue;
492
234
            } else if (desc.name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
493
202
                ctx->column_names.push_back(desc.name);
494
202
                this->register_generated_column_handler(
495
202
                        ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER,
496
202
                        [this](Block* block, size_t rows) -> Status {
497
202
                            return _fill_row_lineage_last_updated_sequence_number(block, rows);
498
202
                        });
499
202
                continue;
500
202
            }
501
430
        }
502
31.8k
    }
503
504
6.48k
    if (has_partition_from_path) {
505
0
        RETURN_IF_ERROR(_extract_partition_values(*ctx->range, ctx->tuple_descriptor,
506
0
                                                  _fill_partition_values));
507
0
    }
508
509
6.48k
    _all_required_col_names = ctx->column_names;
510
511
    // Create column IDs from ORC type
512
6.48k
    auto column_id_result = _create_column_ids(orc_type_ptr, ctx->tuple_descriptor);
513
6.48k
    ctx->column_ids = std::move(column_id_result.column_ids);
514
6.48k
    ctx->filter_column_ids = std::move(column_id_result.filter_column_ids);
515
516
    // Build field_id -> block_column_name mapping for equality delete filtering.
517
31.8k
    for (const auto* slot : ctx->tuple_descriptor->slots()) {
518
31.8k
        _id_to_block_column_name.emplace(slot->col_unique_id(), slot->col_name());
519
31.8k
    }
520
521
    // Process delete files (must happen before _do_init_reader so expand col IDs are included)
522
6.48k
    RETURN_IF_ERROR(_init_row_filters());
523
524
    // Add expand column IDs for equality delete and remap expand column names
525
    // (matching master's behavior with __equality_delete_column__ prefix)
526
6.48k
    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";
527
6.48k
    std::unordered_map<int, std::string> field_id_to_file_col_name;
528
50.8k
    for (uint64_t i = 0; i < orc_type_ptr->getSubtypeCount(); ++i) {
529
44.4k
        std::string col_name = orc_type_ptr->getFieldName(i);
530
44.4k
        const orc::Type* sub_type = orc_type_ptr->getSubtype(i);
531
44.4k
        if (sub_type->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
532
44.4k
            int fid = std::stoi(sub_type->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
533
44.4k
            field_id_to_file_col_name[fid] = col_name;
534
44.4k
        }
535
44.4k
    }
536
537
6.48k
    std::vector<std::string> new_expand_col_names;
538
6.80k
    for (size_t i = 0; i < _expand_col_names.size(); ++i) {
539
318
        const auto& old_name = _expand_col_names[i];
540
318
        int field_id = -1;
541
386
        for (auto& [fid, name] : _id_to_block_column_name) {
542
386
            if (name == old_name) {
543
318
                field_id = fid;
544
318
                break;
545
318
            }
546
386
        }
547
548
318
        std::string file_col_name = old_name;
549
318
        auto it = field_id_to_file_col_name.find(field_id);
550
318
        if (it != field_id_to_file_col_name.end()) {
551
318
            file_col_name = it->second;
552
318
        }
553
554
318
        std::string table_col_name = EQ_DELETE_PRE + file_col_name;
555
556
318
        if (field_id >= 0) {
557
318
            _id_to_block_column_name[field_id] = table_col_name;
558
318
        }
559
318
        if (i < _expand_columns.size()) {
560
318
            _expand_columns[i].name = table_col_name;
561
318
        }
562
318
        new_expand_col_names.push_back(table_col_name);
563
564
        // Add column IDs
565
318
        if (it != field_id_to_file_col_name.end()) {
566
584
            for (uint64_t j = 0; j < orc_type_ptr->getSubtypeCount(); ++j) {
567
584
                const orc::Type* sub_type = orc_type_ptr->getSubtype(j);
568
584
                if (orc_type_ptr->getFieldName(j) == file_col_name) {
569
318
                    ctx->column_ids.insert(sub_type->getColumnId());
570
318
                    break;
571
318
                }
572
584
            }
573
318
        }
574
575
318
        ctx->column_names.push_back(table_col_name);
576
318
        ctx->table_info_node->add_children(table_col_name, file_col_name,
577
318
                                           TableSchemaChangeHelper::ConstNode::get_instance());
578
318
    }
579
6.48k
    _expand_col_names = std::move(new_expand_col_names);
580
581
6.48k
    return Status::OK();
582
6.48k
}
583
584
// ============================================================================
585
// IcebergOrcReader: _create_column_ids
586
// ============================================================================
587
ColumnIdResult IcebergOrcReader::_create_column_ids(const orc::Type* orc_type,
588
6.47k
                                                    const TupleDescriptor* tuple_descriptor) {
589
6.47k
    std::unordered_map<int, const orc::Type*> iceberg_id_to_orc_type_map;
590
50.9k
    for (uint64_t i = 0; i < orc_type->getSubtypeCount(); ++i) {
591
44.4k
        auto orc_sub_type = orc_type->getSubtype(i);
592
44.4k
        if (!orc_sub_type) continue;
593
44.4k
        if (!orc_sub_type->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
594
0
            continue;
595
0
        }
596
44.4k
        int iceberg_id = std::stoi(orc_sub_type->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
597
44.4k
        iceberg_id_to_orc_type_map[iceberg_id] = orc_sub_type;
598
44.4k
    }
599
600
6.47k
    std::set<uint64_t> column_ids;
601
6.47k
    std::set<uint64_t> filter_column_ids;
602
603
6.47k
    auto process_access_paths = [](const orc::Type* orc_field,
604
6.47k
                                   const std::vector<TColumnAccessPath>& access_paths,
605
14.8k
                                   std::set<uint64_t>& out_ids) {
606
14.8k
        process_nested_access_paths(
607
14.8k
                orc_field, access_paths, out_ids,
608
14.8k
                [](const orc::Type* type) { return type->getColumnId(); },
609
14.8k
                [](const orc::Type* type) { return type->getMaximumColumnId(); },
610
14.8k
                IcebergOrcNestedColumnUtils::extract_nested_column_ids);
611
14.8k
    };
612
613
31.8k
    for (const auto* slot : tuple_descriptor->slots()) {
614
31.8k
        auto it = iceberg_id_to_orc_type_map.find(slot->col_unique_id());
615
31.8k
        if (it == iceberg_id_to_orc_type_map.end()) {
616
2.48k
            continue;
617
2.48k
        }
618
29.4k
        const orc::Type* orc_field = it->second;
619
620
29.4k
        if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY &&
621
29.4k
             slot->col_type() != TYPE_MAP)) {
622
16.0k
            column_ids.insert(orc_field->getColumnId());
623
16.0k
            if (slot->is_predicate()) {
624
2.28k
                filter_column_ids.insert(orc_field->getColumnId());
625
2.28k
            }
626
16.0k
            continue;
627
16.0k
        }
628
629
13.3k
        const auto& all_access_paths = slot->all_access_paths();
630
13.3k
        process_access_paths(orc_field, all_access_paths, column_ids);
631
632
13.3k
        const auto& predicate_access_paths = slot->predicate_access_paths();
633
13.3k
        if (!predicate_access_paths.empty()) {
634
1.55k
            process_access_paths(orc_field, predicate_access_paths, filter_column_ids);
635
1.55k
        }
636
13.3k
    }
637
638
6.47k
    return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids));
639
6.47k
}
640
641
// ============================================================================
642
// IcebergOrcReader: _read_position_delete_file
643
// ============================================================================
644
Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete_range,
645
968
                                                    DeleteFile* position_delete) {
646
968
    OrcReader orc_delete_reader(get_profile(), get_state(), get_scan_params(), *delete_range,
647
968
                                READ_DELETE_FILE_BATCH_SIZE, get_state()->timezone(), get_io_ctx(),
648
968
                                _meta_cache);
649
968
    OrcInitContext delete_ctx;
650
968
    delete_ctx.column_names = delete_file_col_names;
651
968
    delete_ctx.col_name_to_block_idx =
652
968
            const_cast<std::unordered_map<std::string, uint32_t>*>(&DELETE_COL_NAME_TO_BLOCK_IDX);
653
968
    RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_ctx));
654
655
968
    bool eof = false;
656
968
    DataTypePtr data_type_file_path {new DataTypeString};
657
968
    DataTypePtr data_type_pos {new DataTypeInt64};
658
2.90k
    while (!eof) {
659
1.93k
        Block block = {{data_type_file_path, ICEBERG_FILE_PATH}, {data_type_pos, ICEBERG_ROW_POS}};
660
661
1.93k
        size_t read_rows = 0;
662
1.93k
        RETURN_IF_ERROR(orc_delete_reader.get_next_block(&block, &read_rows, &eof));
663
664
1.93k
        _gen_position_delete_file_range(block, position_delete, read_rows, false);
665
1.93k
    }
666
968
    return Status::OK();
667
968
}
668
669
} // namespace doris