Coverage Report

Created: 2026-04-07 18:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/iceberg_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/table/iceberg_reader.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <gen_cpp/parquet_types.h>
24
#include <glog/logging.h>
25
#include <parallel_hashmap/phmap.h>
26
#include <rapidjson/document.h>
27
28
#include <algorithm>
29
#include <cstring>
30
#include <functional>
31
#include <memory>
32
33
#include "common/compiler_util.h" // IWYU pragma: keep
34
#include "common/consts.h"
35
#include "common/status.h"
36
#include "core/assert_cast.h"
37
#include "core/block/block.h"
38
#include "core/block/column_with_type_and_name.h"
39
#include "core/column/column.h"
40
#include "core/column/column_string.h"
41
#include "core/column/column_vector.h"
42
#include "core/data_type/data_type_factory.hpp"
43
#include "core/data_type/define_primitive_type.h"
44
#include "core/data_type/primitive_type.h"
45
#include "core/string_ref.h"
46
#include "exprs/aggregate/aggregate_function.h"
47
#include "format/format_common.h"
48
#include "format/generic_reader.h"
49
#include "format/orc/vorc_reader.h"
50
#include "format/parquet/schema_desc.h"
51
#include "format/parquet/vparquet_column_chunk_reader.h"
52
#include "format/table/deletion_vector_reader.h"
53
#include "format/table/iceberg/iceberg_orc_nested_column_utils.h"
54
#include "format/table/iceberg/iceberg_parquet_nested_column_utils.h"
55
#include "format/table/nested_column_access_helper.h"
56
#include "format/table/table_schema_change_helper.h"
57
#include "runtime/runtime_state.h"
58
#include "util/coding.h"
59
60
namespace cctz {
61
#include "common/compile_check_begin.h"
62
class time_zone;
63
} // namespace cctz
64
namespace doris {
65
class RowDescriptor;
66
class SlotDescriptor;
67
class TupleDescriptor;
68
69
namespace io {
70
struct IOContext;
71
} // namespace io
72
class VExprContext;
73
} // namespace doris
74
75
namespace doris {
76
// ORC type-attribute key under which Iceberg writers store the Iceberg field id
// for each column; used below to match file columns to table slots by id.
const std::string IcebergOrcReader::ICEBERG_ORC_ATTRIBUTE = "iceberg.id";
77
78
// Returns true only when every data page of the column chunk is dictionary
// encoded, so the caller may safely read the column as dictionary codes.
//
// Two sources of truth are consulted, in order:
//   1. encoding_stats (precise, per-page-type counts) when the writer set it;
//   2. the legacy `encodings` list (coarse: does not distinguish dictionary
//      pages from data pages), as a conservative fallback.
bool IcebergTableReader::_is_fully_dictionary_encoded(
        const tparquet::ColumnMetaData& column_metadata) {
    const auto is_dictionary_encoding = [](tparquet::Encoding::type encoding) {
        return encoding == tparquet::Encoding::PLAIN_DICTIONARY ||
               encoding == tparquet::Encoding::RLE_DICTIONARY;
    };
    const auto is_data_page = [](tparquet::PageType::type page_type) {
        return page_type == tparquet::PageType::DATA_PAGE ||
               page_type == tparquet::PageType::DATA_PAGE_V2;
    };
    // RLE / BIT_PACKED appear in `encodings` for repetition/definition levels;
    // they say nothing about how the values themselves are encoded.
    const auto is_level_encoding = [](tparquet::Encoding::type encoding) {
        return encoding == tparquet::Encoding::RLE || encoding == tparquet::Encoding::BIT_PACKED;
    };

    // A column chunk may have a dictionary page but still contain plain-encoded data pages.
    // Only treat it as dictionary-coded when all data pages are dictionary encoded.
    if (column_metadata.__isset.encoding_stats) {
        bool has_data_page_stats = false;
        for (const tparquet::PageEncodingStats& enc_stat : column_metadata.encoding_stats) {
            if (is_data_page(enc_stat.page_type) && enc_stat.count > 0) {
                has_data_page_stats = true;
                if (!is_dictionary_encoding(enc_stat.encoding)) {
                    // At least one non-dictionary data page: not fully dict-coded.
                    return false;
                }
            }
        }
        if (has_data_page_stats) {
            return true;
        }
        // No data-page entries in encoding_stats: fall through to the
        // legacy `encodings` heuristic below rather than guessing here.
    }

    // Fallback: require that some dictionary encoding is listed AND that every
    // listed encoding is either a dictionary or a level encoding.
    bool has_dict_encoding = false;
    bool has_nondict_encoding = false;
    for (const tparquet::Encoding::type& encoding : column_metadata.encodings) {
        if (is_dictionary_encoding(encoding)) {
            has_dict_encoding = true;
        }

        if (!is_dictionary_encoding(encoding) && !is_level_encoding(encoding)) {
            has_nondict_encoding = true;
            break;
        }
    }
    if (!has_dict_encoding || has_nondict_encoding) {
        return false;
    }

    return true;
}
127
128
// ============================================================================
129
// IcebergParquetReader: on_before_init_reader (Parquet-specific schema matching)
130
// ============================================================================
131
1
// Prepares the Parquet reader for an Iceberg scan range before the generic
// init runs. Phases, in required order:
//   1. capture context pointers and the file format;
//   2. build table_info_node (field-id match when history schema is present,
//      name match otherwise) -- must precede column classification;
//   3. classify columns ($row_id synthesis, partition-from-path fallback);
//   4. derive physical column ids, field_id -> block-column map;
//   5. process delete files and splice equality-delete expand columns into the
//      read set under the __equality_delete_column__ prefix.
//
// Fix vs. previous revision: the per-init "[EqDeleteDebug]" dumps were emitted
// unconditionally at INFO level; they are debug-only output and are now gated
// behind VLOG(3) so production logs are not flooded on every scan range.
Status IcebergParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
    _column_descs = ctx->column_descs;
    _fill_col_name_to_block_idx = ctx->col_name_to_block_idx;
    _file_format = Fileformat::PARQUET;

    // Get file metadata schema first (available because _open_file() already ran)
    const FieldDescriptor* field_desc = nullptr;
    RETURN_IF_ERROR(this->get_file_metadata_schema(&field_desc));
    DCHECK(field_desc != nullptr);

    // Build table_info_node by field_id or name matching.
    // This must happen BEFORE column classification so we can use children_column_exists
    // to check if a column exists in the file (by field ID, not name).
    if (!get_scan_params().__isset.history_schema_info ||
        get_scan_params().history_schema_info.empty()) [[unlikely]] {
        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(ctx->tuple_descriptor, *field_desc,
                                                            ctx->table_info_node));
    } else {
        bool exist_field_id = true;
        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id(
                get_scan_params().history_schema_info.front().root_field, *field_desc,
                ctx->table_info_node, exist_field_id));
        if (!exist_field_id) {
            // File predates field-id annotations: fall back to name matching.
            RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(ctx->tuple_descriptor, *field_desc,
                                                                ctx->table_info_node));
        }
    }

    std::unordered_set<std::string> partition_col_names;
    if (ctx->range->__isset.columns_from_path_keys) {
        partition_col_names.insert(ctx->range->columns_from_path_keys.begin(),
                                   ctx->range->columns_from_path_keys.end());
    }

    // Single pass: classify columns, detect $row_id, handle partition fallback.
    bool has_partition_from_path = false;
    for (auto& desc : *ctx->column_descs) {
        if (desc.category == ColumnCategory::SYNTHESIZED &&
            desc.name == BeConsts::ICEBERG_ROWID_COL) {
            _need_row_id_column = true;
            this->register_synthesized_column_handler(BeConsts::ICEBERG_ROWID_COL,
                                                      [this](Block* block, size_t rows) -> Status {
                                                          return _fill_iceberg_row_id(block, rows);
                                                      });
            continue;
        }
        if (desc.category == ColumnCategory::REGULAR) {
            // Partition fallback: if column is a partition key and NOT in the file
            // (checked via field ID matching in table_info_node), read from path instead.
            if (partition_col_names.contains(desc.name) &&
                !ctx->table_info_node->children_column_exists(desc.name)) {
                if (config::enable_iceberg_partition_column_fallback) {
                    desc.category = ColumnCategory::PARTITION_KEY;
                    has_partition_from_path = true;
                    continue;
                }
            }
            ctx->column_names.push_back(desc.name);
        } else if (desc.category == ColumnCategory::GENERATED) {
            ctx->column_names.push_back(desc.name);
        }
    }

    // Set up partition value extraction if any partition columns need filling from path
    if (has_partition_from_path) {
        RETURN_IF_ERROR(_extract_partition_values(*ctx->range, ctx->tuple_descriptor,
                                                  _fill_partition_values));
    }

    _all_required_col_names = ctx->column_names;

    // Create column IDs from field descriptor
    auto column_id_result = _create_column_ids(field_desc, ctx->tuple_descriptor);
    ctx->column_ids = std::move(column_id_result.column_ids);
    ctx->filter_column_ids = std::move(column_id_result.filter_column_ids);

    // Build field_id -> block_column_name mapping for equality delete filtering.
    // This was previously done in init_reader() column matching (pre-CRTP refactoring).
    for (const auto* slot : ctx->tuple_descriptor->slots()) {
        _id_to_block_column_name.emplace(slot->col_unique_id(), slot->col_name());
    }

    // Process delete files (must happen before _do_init_reader so expand col IDs are included)
    RETURN_IF_ERROR(_init_row_filters());

    // Add expand column IDs for equality delete and remap expand column names
    // to match master's behavior:
    // - Use field_id to find the actual file column name in Parquet schema
    // - Prefix with __equality_delete_column__ to avoid name conflicts
    // - Correctly map table_col_name → file_col_name in table_info_node
    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";
    std::unordered_map<int, std::string> field_id_to_file_col_name;
    for (int i = 0; i < field_desc->size(); ++i) {
        auto field_schema = field_desc->get_column(i);
        if (field_schema) {
            field_id_to_file_col_name[field_schema->field_id] = field_schema->name;
        }
    }

    // Rebuild _expand_col_names with proper file-column-based names
    std::vector<std::string> new_expand_col_names;
    for (size_t i = 0; i < _expand_col_names.size(); ++i) {
        const auto& old_name = _expand_col_names[i];
        // Find the field_id for this expand column (reverse lookup by name).
        int field_id = -1;
        for (auto& [fid, name] : _id_to_block_column_name) {
            if (name == old_name) {
                field_id = fid;
                break;
            }
        }

        std::string file_col_name = old_name;
        auto it = field_id_to_file_col_name.find(field_id);
        if (it != field_id_to_file_col_name.end()) {
            file_col_name = it->second;
        }

        std::string table_col_name = EQ_DELETE_PRE + file_col_name;

        // Update _id_to_block_column_name
        if (field_id >= 0) {
            _id_to_block_column_name[field_id] = table_col_name;
        }

        // Update _expand_columns name
        if (i < _expand_columns.size()) {
            _expand_columns[i].name = table_col_name;
        }

        new_expand_col_names.push_back(table_col_name);

        // Add column IDs
        if (it != field_id_to_file_col_name.end()) {
            for (int j = 0; j < field_desc->size(); ++j) {
                auto field_schema = field_desc->get_column(j);
                if (field_schema && field_schema->field_id == field_id) {
                    ctx->column_ids.insert(field_schema->get_column_id());
                    break;
                }
            }
        }

        // Register in table_info_node: table_col_name → file_col_name
        ctx->column_names.push_back(table_col_name);
        ctx->table_info_node->add_children(table_col_name, file_col_name,
                                           TableSchemaChangeHelper::ConstNode::get_instance());
    }
    _expand_col_names = std::move(new_expand_col_names);

    // Debug logging (verbose-only; enable with --v=3).
    for (const auto& name : _expand_col_names) {
        VLOG(3) << "[EqDeleteDebug] final expand col: " << name;
    }
    for (auto& [fid, name] : _id_to_block_column_name) {
        VLOG(3) << "[EqDeleteDebug] final _id_to_block_column_name[" << fid << "] = " << name;
    }

    // Enable group filtering for Iceberg
    _filter_groups = true;

    return Status::OK();
}
294
295
// ============================================================================
296
// IcebergParquetReader: _create_column_ids
297
// ============================================================================
298
// Maps each slot of `tuple_descriptor` to physical Parquet column ids by
// matching Iceberg field ids against the file schema. Returns the ids of all
// columns to read (`column_ids`) plus the subset referenced by predicates
// (`filter_column_ids`). Nested types (struct/array/map) are expanded through
// the slot's column access paths so only the touched sub-columns are read.
ColumnIdResult IcebergParquetReader::_create_column_ids(const FieldDescriptor* field_desc,
                                                        const TupleDescriptor* tuple_descriptor) {
    // assign_ids() mutates the descriptor even though callers pass it const.
    // NOTE(review): assumes this FieldDescriptor is not shared with concurrent
    // readers while ids are being assigned -- confirm.
    auto* mutable_field_desc = const_cast<FieldDescriptor*>(field_desc);
    mutable_field_desc->assign_ids();

    // Index the file schema by Iceberg field id for O(1) lookup per slot.
    std::unordered_map<int, const FieldSchema*> iceberg_id_to_field_schema_map;
    for (int i = 0; i < field_desc->size(); ++i) {
        auto field_schema = field_desc->get_column(i);
        if (!field_schema) continue;
        int iceberg_id = field_schema->field_id;
        iceberg_id_to_field_schema_map[iceberg_id] = field_schema;
    }

    std::set<uint64_t> column_ids;
    std::set<uint64_t> filter_column_ids;

    // Expands the requested access paths of a nested column into the set of
    // physical column ids they touch (shared traversal; Parquet-specific
    // id/max-id accessors and extraction callback).
    auto process_access_paths = [](const FieldSchema* parquet_field,
                                   const std::vector<TColumnAccessPath>& access_paths,
                                   std::set<uint64_t>& out_ids) {
        process_nested_access_paths(
                parquet_field, access_paths, out_ids,
                [](const FieldSchema* field) { return field->get_column_id(); },
                [](const FieldSchema* field) { return field->get_max_column_id(); },
                IcebergParquetNestedColumnUtils::extract_nested_column_ids);
    };

    for (const auto* slot : tuple_descriptor->slots()) {
        auto it = iceberg_id_to_field_schema_map.find(slot->col_unique_id());
        if (it == iceberg_id_to_field_schema_map.end()) {
            // Slot has no counterpart in this file (e.g. added after the file
            // was written); nothing to read for it.
            continue;
        }
        auto field_schema = it->second;

        // Scalar column: a single physical column id suffices.
        if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY &&
             slot->col_type() != TYPE_MAP)) {
            column_ids.insert(field_schema->column_id);
            if (slot->is_predicate()) {
                filter_column_ids.insert(field_schema->column_id);
            }
            continue;
        }

        // Nested column: expand via access paths (all paths for reading,
        // predicate paths for filtering).
        const auto& all_access_paths = slot->all_access_paths();
        process_access_paths(field_schema, all_access_paths, column_ids);

        const auto& predicate_access_paths = slot->predicate_access_paths();
        if (!predicate_access_paths.empty()) {
            process_access_paths(field_schema, predicate_access_paths, filter_column_ids);
        }
    }
    return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids));
}
350
351
// ============================================================================
352
// IcebergParquetReader: _read_position_delete_file
353
// ============================================================================
354
// Reads one Iceberg position-delete Parquet file and accumulates its
// (file_path, row_position) pairs into `position_delete`.
//
// @param delete_range  file range describing the delete file (size = -1, i.e.
//                      the whole file is read).
// @param position_delete  output accumulator for decoded delete rows.
//
// When every row group stores the file_path column fully dictionary encoded we
// read it as dictionary codes (ColumnDictI32) to avoid materializing strings.
//
// Fix vs. previous revision: removed the stray ';' after the function body
// (an empty declaration at namespace scope, flagged by -Wextra-semi).
Status IcebergParquetReader::_read_position_delete_file(const TFileRangeDesc* delete_range,
                                                        DeleteFile* position_delete) {
    ParquetReader parquet_delete_reader(get_profile(), get_scan_params(), *delete_range,
                                        READ_DELETE_FILE_BATCH_SIZE, &get_state()->timezone_obj(),
                                        get_io_ctx(), get_state(), _meta_cache);
    // The delete file range has size=-1 (read whole file). We must disable
    // row group filtering before init; otherwise _do_init_reader returns EndOfFile
    // when _filter_groups && _range_size < 0.
    ParquetInitContext delete_ctx;
    delete_ctx.filter_groups = false;
    delete_ctx.column_names = delete_file_col_names;
    delete_ctx.col_name_to_block_idx =
            const_cast<std::unordered_map<std::string, uint32_t>*>(&DELETE_COL_NAME_TO_BLOCK_IDX);
    RETURN_IF_ERROR(parquet_delete_reader.init_reader(&delete_ctx));

    // Dictionary fast path only applies when EVERY row group has a dictionary
    // page for the file_path column.
    const tparquet::FileMetaData* meta_data = parquet_delete_reader.get_meta_data();
    bool dictionary_coded = true;
    for (const auto& row_group : meta_data->row_groups) {
        const auto& column_chunk = row_group.columns[ICEBERG_FILE_PATH_INDEX];
        if (!(column_chunk.__isset.meta_data && has_dict_page(column_chunk.meta_data))) {
            dictionary_coded = false;
            break;
        }
    }
    DataTypePtr data_type_file_path {new DataTypeString};
    DataTypePtr data_type_pos {new DataTypeInt64};
    bool eof = false;
    while (!eof) {
        Block block = {dictionary_coded
                               ? ColumnWithTypeAndName {ColumnDictI32::create(
                                                                FieldType::OLAP_FIELD_TYPE_VARCHAR),
                                                        data_type_file_path, ICEBERG_FILE_PATH}
                               : ColumnWithTypeAndName {data_type_file_path, ICEBERG_FILE_PATH},

                       {data_type_pos, ICEBERG_ROW_POS}};
        size_t read_rows = 0;
        RETURN_IF_ERROR(parquet_delete_reader.get_next_block(&block, &read_rows, &eof));

        if (read_rows <= 0) {
            break;
        }
        _gen_position_delete_file_range(block, position_delete, read_rows, dictionary_coded);
    }
    return Status::OK();
}
399
400
// ============================================================================
401
// IcebergOrcReader: on_before_init_reader (ORC-specific schema matching)
402
// ============================================================================
403
1
// Prepares the ORC reader for an Iceberg scan range before the generic init
// runs. Mirrors IcebergParquetReader::on_before_init_reader, but matches file
// columns via the "iceberg.id" ORC type attribute instead of Parquet field ids.
// Phase order matters: table_info_node must exist before column classification,
// and delete-file processing must run before expand columns are spliced in.
Status IcebergOrcReader::on_before_init_reader(ReaderInitContext* ctx) {
    _column_descs = ctx->column_descs;
    _fill_col_name_to_block_idx = ctx->col_name_to_block_idx;
    _file_format = Fileformat::ORC;

    // Get ORC file type first (available because _create_file_reader() already ran)
    const orc::Type* orc_type_ptr = nullptr;
    RETURN_IF_ERROR(this->get_file_type(&orc_type_ptr));
    // NOTE(review): the Parquet sibling DCHECKs its schema pointer here; this
    // path relies on get_file_type() never yielding OK with null -- confirm.

    // Build table_info_node by field_id or name matching.
    // This must happen BEFORE column classification so we can use children_column_exists
    // to check if a column exists in the file (by field ID, not name).
    if (!get_scan_params().__isset.history_schema_info ||
        get_scan_params().history_schema_info.empty()) [[unlikely]] {
        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(ctx->tuple_descriptor, orc_type_ptr,
                                                        ctx->table_info_node));
    } else {
        bool exist_field_id = true;
        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id(
                get_scan_params().history_schema_info.front().root_field, orc_type_ptr,
                ICEBERG_ORC_ATTRIBUTE, ctx->table_info_node, exist_field_id));
        if (!exist_field_id) {
            // File carries no "iceberg.id" attributes: fall back to name matching.
            RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(ctx->tuple_descriptor, orc_type_ptr,
                                                            ctx->table_info_node));
        }
    }

    std::unordered_set<std::string> partition_col_names;
    if (ctx->range->__isset.columns_from_path_keys) {
        partition_col_names.insert(ctx->range->columns_from_path_keys.begin(),
                                   ctx->range->columns_from_path_keys.end());
    }

    // Single pass: classify columns, detect $row_id, handle partition fallback.
    bool has_partition_from_path = false;
    for (auto& desc : *ctx->column_descs) {
        if (desc.category == ColumnCategory::SYNTHESIZED &&
            desc.name == BeConsts::ICEBERG_ROWID_COL) {
            _need_row_id_column = true;
            this->register_synthesized_column_handler(BeConsts::ICEBERG_ROWID_COL,
                                                      [this](Block* block, size_t rows) -> Status {
                                                          return _fill_iceberg_row_id(block, rows);
                                                      });
            continue;
        }
        if (desc.category == ColumnCategory::REGULAR) {
            // Partition fallback: if column is a partition key and NOT in the file
            // (checked via field ID matching in table_info_node), read from path instead.
            if (partition_col_names.contains(desc.name) &&
                !ctx->table_info_node->children_column_exists(desc.name)) {
                if (config::enable_iceberg_partition_column_fallback) {
                    desc.category = ColumnCategory::PARTITION_KEY;
                    has_partition_from_path = true;
                    continue;
                }
            }
            ctx->column_names.push_back(desc.name);
        } else if (desc.category == ColumnCategory::GENERATED) {
            ctx->column_names.push_back(desc.name);
        }
    }

    // Set up partition value extraction if any partition columns need filling from path.
    if (has_partition_from_path) {
        RETURN_IF_ERROR(_extract_partition_values(*ctx->range, ctx->tuple_descriptor,
                                                  _fill_partition_values));
    }

    _all_required_col_names = ctx->column_names;

    // Create column IDs from ORC type
    auto column_id_result = _create_column_ids(orc_type_ptr, ctx->tuple_descriptor);
    ctx->column_ids = std::move(column_id_result.column_ids);
    ctx->filter_column_ids = std::move(column_id_result.filter_column_ids);

    // Build field_id -> block_column_name mapping for equality delete filtering.
    for (const auto* slot : ctx->tuple_descriptor->slots()) {
        _id_to_block_column_name.emplace(slot->col_unique_id(), slot->col_name());
    }

    // Process delete files (must happen before _do_init_reader so expand col IDs are included)
    RETURN_IF_ERROR(_init_row_filters());

    // Add expand column IDs for equality delete and remap expand column names
    // (matching master's behavior with __equality_delete_column__ prefix)
    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";
    // Map Iceberg field id -> top-level file column name, via the "iceberg.id"
    // attribute on each ORC subtype.
    std::unordered_map<int, std::string> field_id_to_file_col_name;
    for (uint64_t i = 0; i < orc_type_ptr->getSubtypeCount(); ++i) {
        std::string col_name = orc_type_ptr->getFieldName(i);
        const orc::Type* sub_type = orc_type_ptr->getSubtype(i);
        if (sub_type->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
            int fid = std::stoi(sub_type->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
            field_id_to_file_col_name[fid] = col_name;
        }
    }

    std::vector<std::string> new_expand_col_names;
    for (size_t i = 0; i < _expand_col_names.size(); ++i) {
        const auto& old_name = _expand_col_names[i];
        // Reverse lookup: find the field id whose block column name matches.
        int field_id = -1;
        for (auto& [fid, name] : _id_to_block_column_name) {
            if (name == old_name) {
                field_id = fid;
                break;
            }
        }

        std::string file_col_name = old_name;
        auto it = field_id_to_file_col_name.find(field_id);
        if (it != field_id_to_file_col_name.end()) {
            file_col_name = it->second;
        }

        std::string table_col_name = EQ_DELETE_PRE + file_col_name;

        if (field_id >= 0) {
            _id_to_block_column_name[field_id] = table_col_name;
        }
        if (i < _expand_columns.size()) {
            _expand_columns[i].name = table_col_name;
        }
        new_expand_col_names.push_back(table_col_name);

        // Add column IDs
        if (it != field_id_to_file_col_name.end()) {
            for (uint64_t j = 0; j < orc_type_ptr->getSubtypeCount(); ++j) {
                const orc::Type* sub_type = orc_type_ptr->getSubtype(j);
                if (orc_type_ptr->getFieldName(j) == file_col_name) {
                    ctx->column_ids.insert(sub_type->getColumnId());
                    break;
                }
            }
        }

        // Register in table_info_node: table_col_name -> file_col_name.
        ctx->column_names.push_back(table_col_name);
        ctx->table_info_node->add_children(table_col_name, file_col_name,
                                           TableSchemaChangeHelper::ConstNode::get_instance());
    }
    _expand_col_names = std::move(new_expand_col_names);

    return Status::OK();
}
544
545
// ============================================================================
546
// IcebergOrcReader: _create_column_ids
547
// ============================================================================
548
// Derives the physical ORC column ids to read for each slot of
// `tuple_descriptor`, matching slots to file columns through the "iceberg.id"
// type attribute. Produces both the full read set (`column_ids`) and the
// predicate subset (`filter_column_ids`); nested types are expanded through
// the slot's column access paths.
ColumnIdResult IcebergOrcReader::_create_column_ids(const orc::Type* orc_type,
                                                    const TupleDescriptor* tuple_descriptor) {
    // Index top-level subtypes by their Iceberg field id.
    std::unordered_map<int, const orc::Type*> id_to_subtype;
    const uint64_t subtype_count = orc_type->getSubtypeCount();
    for (uint64_t idx = 0; idx < subtype_count; ++idx) {
        const orc::Type* subtype = orc_type->getSubtype(idx);
        if (subtype == nullptr || !subtype->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
            continue;
        }
        id_to_subtype[std::stoi(subtype->getAttributeValue(ICEBERG_ORC_ATTRIBUTE))] = subtype;
    }

    std::set<uint64_t> read_ids;
    std::set<uint64_t> predicate_ids;

    // Expand the given access paths of a nested column into physical ids,
    // using the ORC-specific id accessors and extraction callback.
    auto expand_paths = [](const orc::Type* file_type,
                           const std::vector<TColumnAccessPath>& paths,
                           std::set<uint64_t>& target) {
        process_nested_access_paths(
                file_type, paths, target,
                [](const orc::Type* type) { return type->getColumnId(); },
                [](const orc::Type* type) { return type->getMaximumColumnId(); },
                IcebergOrcNestedColumnUtils::extract_nested_column_ids);
    };

    for (const auto* slot : tuple_descriptor->slots()) {
        auto found = id_to_subtype.find(slot->col_unique_id());
        if (found == id_to_subtype.end()) {
            // No matching column in this file; skip the slot.
            continue;
        }
        const orc::Type* file_type = found->second;

        const bool is_nested = slot->col_type() == TYPE_STRUCT ||
                               slot->col_type() == TYPE_ARRAY || slot->col_type() == TYPE_MAP;
        if (!is_nested) {
            // Scalar column: one physical id covers it.
            read_ids.insert(file_type->getColumnId());
            if (slot->is_predicate()) {
                predicate_ids.insert(file_type->getColumnId());
            }
            continue;
        }

        // Nested column: all paths feed the read set; predicate paths (when
        // present) feed the filter set.
        expand_paths(file_type, slot->all_access_paths(), read_ids);
        if (!slot->predicate_access_paths().empty()) {
            expand_paths(file_type, slot->predicate_access_paths(), predicate_ids);
        }
    }
    return ColumnIdResult(std::move(read_ids), std::move(predicate_ids));
}
601
602
// ============================================================================
603
// IcebergOrcReader: _read_position_delete_file
604
// ============================================================================
605
// Reads one Iceberg position-delete ORC file and accumulates its
// (file_path, row_position) pairs into `position_delete`.
//
// @param delete_range  file range describing the delete file.
// @param position_delete  output accumulator for decoded delete rows.
//
// Fix: skip _gen_position_delete_file_range when a batch returns zero rows,
// matching the Parquet sibling (_read_position_delete_file) which breaks out
// on an empty batch instead of processing it.
Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete_range,
                                                    DeleteFile* position_delete) {
    OrcReader orc_delete_reader(get_profile(), get_state(), get_scan_params(), *delete_range,
                                READ_DELETE_FILE_BATCH_SIZE, get_state()->timezone(), get_io_ctx(),
                                _meta_cache);
    OrcInitContext delete_ctx;
    delete_ctx.column_names = delete_file_col_names;
    delete_ctx.col_name_to_block_idx =
            const_cast<std::unordered_map<std::string, uint32_t>*>(&DELETE_COL_NAME_TO_BLOCK_IDX);
    RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_ctx));

    bool eof = false;
    DataTypePtr data_type_file_path {new DataTypeString};
    DataTypePtr data_type_pos {new DataTypeInt64};
    while (!eof) {
        Block block = {{data_type_file_path, ICEBERG_FILE_PATH}, {data_type_pos, ICEBERG_ROW_POS}};

        size_t read_rows = 0;
        RETURN_IF_ERROR(orc_delete_reader.get_next_block(&block, &read_rows, &eof));

        // Nothing decoded in this batch (e.g. final empty read): stop, as the
        // Parquet delete-file path does.
        if (read_rows == 0) {
            break;
        }
        _gen_position_delete_file_range(block, position_delete, read_rows, false);
    }
    return Status::OK();
}
629
630
#include "common/compile_check_end.h"
631
} // namespace doris