Coverage Report

Created: 2026-03-28 18:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/iceberg_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/table/iceberg_reader.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <gen_cpp/parquet_types.h>
24
#include <glog/logging.h>
25
#include <parallel_hashmap/phmap.h>
26
#include <rapidjson/document.h>
27
28
#include <algorithm>
29
#include <cstring>
30
#include <functional>
31
#include <memory>
32
#include <set>
33
34
#include "common/compiler_util.h" // IWYU pragma: keep
35
#include "common/status.h"
36
#include "core/assert_cast.h"
37
#include "core/block/block.h"
38
#include "core/block/column_with_type_and_name.h"
39
#include "core/column/column.h"
40
#include "core/column/column_string.h"
41
#include "core/column/column_vector.h"
42
#include "core/data_type/data_type_factory.hpp"
43
#include "core/data_type/define_primitive_type.h"
44
#include "core/data_type/primitive_type.h"
45
#include "core/string_ref.h"
46
#include "exprs/aggregate/aggregate_function.h"
47
#include "format/format_common.h"
48
#include "format/generic_reader.h"
49
#include "format/orc/vorc_reader.h"
50
#include "format/parquet/schema_desc.h"
51
#include "format/parquet/vparquet_column_chunk_reader.h"
52
#include "format/table/deletion_vector_reader.h"
53
#include "format/table/iceberg/iceberg_orc_nested_column_utils.h"
54
#include "format/table/iceberg/iceberg_parquet_nested_column_utils.h"
55
#include "format/table/nested_column_access_helper.h"
56
#include "format/table/table_format_reader.h"
57
#include "runtime/runtime_state.h"
58
#include "util/coding.h"
59
60
namespace cctz {
61
#include "common/compile_check_begin.h"
62
class time_zone;
63
} // namespace cctz
64
namespace doris {
65
class RowDescriptor;
66
class SlotDescriptor;
67
class TupleDescriptor;
68
69
namespace io {
70
struct IOContext;
71
} // namespace io
72
class VExprContext;
73
} // namespace doris
74
75
namespace doris {
76
const std::string IcebergOrcReader::ICEBERG_ORC_ATTRIBUTE = "iceberg.id";
77
78
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader,
79
                                       RuntimeProfile* profile, RuntimeState* state,
80
                                       const TFileScanRangeParams& params,
81
                                       const TFileRangeDesc& range, ShardedKVCache* kv_cache,
82
                                       io::IOContext* io_ctx, FileMetaCache* meta_cache)
83
23.9k
        : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx,
84
23.9k
                            meta_cache),
85
23.9k
          _kv_cache(kv_cache) {
86
23.9k
    static const char* iceberg_profile = "IcebergProfile";
87
23.9k
    ADD_TIMER(_profile, iceberg_profile);
88
23.9k
    _iceberg_profile.num_delete_files =
89
23.9k
            ADD_CHILD_COUNTER(_profile, "NumDeleteFiles", TUnit::UNIT, iceberg_profile);
90
23.9k
    _iceberg_profile.num_delete_rows =
91
23.9k
            ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, iceberg_profile);
92
23.9k
    _iceberg_profile.delete_files_read_time =
93
23.9k
            ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
94
23.9k
    _iceberg_profile.delete_rows_sort_time =
95
23.9k
            ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
96
23.9k
    _iceberg_profile.parse_delete_file_time =
97
23.9k
            ADD_CHILD_TIMER(_profile, "ParseDeleteFileTime", iceberg_profile);
98
23.9k
}
99
100
30.8k
Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
101
30.8k
    RETURN_IF_ERROR(_expand_block_if_need(block));
102
103
30.8k
    RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof));
104
105
30.8k
    if (_equality_delete_impls.size() > 0) {
106
1.96k
        std::unique_ptr<IColumn::Filter> filter =
107
1.96k
                std::make_unique<IColumn::Filter>(block->rows(), 1);
108
2.48k
        for (auto& equality_delete_impl : _equality_delete_impls) {
109
2.48k
            RETURN_IF_ERROR(equality_delete_impl->filter_data_block(
110
2.48k
                    block, _col_name_to_block_idx, _id_to_block_column_name, *filter));
111
2.48k
        }
112
1.96k
        Block::filter_block_internal(block, *filter, block->columns());
113
1.96k
    }
114
115
30.8k
    *read_rows = block->rows();
116
30.8k
    return _shrink_block_if_need(block);
117
30.8k
}
118
119
23.8k
Status IcebergTableReader::init_row_filters() {
120
    // We get the count value by doris's be, so we don't need to read the delete file
121
23.8k
    if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count > 0) {
122
0
        return Status::OK();
123
0
    }
124
125
23.8k
    const auto& table_desc = _range.table_format_params.iceberg_params;
126
23.8k
    const auto& version = table_desc.format_version;
127
23.8k
    if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
128
1.67k
        return Status::OK();
129
1.67k
    }
130
131
    // Initialize file information for $row_id generation
132
    // Extract from table_desc which contains current file's metadata
133
22.1k
    if (_need_row_id_column) {
134
98
        std::string file_path = table_desc.original_file_path;
135
98
        int32_t partition_spec_id = 0;
136
98
        std::string partition_data_json;
137
98
        if (table_desc.__isset.partition_spec_id) {
138
16
            partition_spec_id = table_desc.partition_spec_id;
139
16
        }
140
98
        if (table_desc.__isset.partition_data_json) {
141
16
            partition_data_json = table_desc.partition_data_json;
142
16
        }
143
144
98
        if (auto* parquet_reader = dynamic_cast<ParquetReader*>(_file_format_reader.get())) {
145
48
            parquet_reader->set_iceberg_rowid_params(file_path, partition_spec_id,
146
48
                                                     partition_data_json, _row_id_column_position);
147
50
        } else if (auto* orc_reader = dynamic_cast<OrcReader*>(_file_format_reader.get())) {
148
50
            orc_reader->set_iceberg_rowid_params(file_path, partition_spec_id, partition_data_json,
149
50
                                                 _row_id_column_position);
150
50
        }
151
98
        LOG(INFO) << "Initialized $row_id generation for file: " << file_path
152
98
                  << ", partition_spec_id: " << partition_spec_id;
153
98
    }
154
155
22.1k
    std::vector<TIcebergDeleteFileDesc> position_delete_files;
156
22.1k
    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
157
22.1k
    std::vector<TIcebergDeleteFileDesc> deletion_vector_files;
158
22.1k
    for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
159
11.7k
        if (desc.content == POSITION_DELETE) {
160
6.33k
            position_delete_files.emplace_back(desc);
161
6.33k
        } else if (desc.content == EQUALITY_DELETE) {
162
3.04k
            equality_delete_files.emplace_back(desc);
163
3.04k
        } else if (desc.content == DELETION_VECTOR) {
164
2.36k
            deletion_vector_files.emplace_back(desc);
165
2.36k
        }
166
11.7k
    }
167
168
22.1k
    if (!equality_delete_files.empty()) {
169
1.34k
        RETURN_IF_ERROR(_process_equality_delete(equality_delete_files));
170
1.34k
        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
171
1.34k
    }
172
173
22.1k
    if (!deletion_vector_files.empty()) {
174
2.36k
        if (deletion_vector_files.size() != 1) [[unlikely]] {
175
            /*
176
             * Deletion vectors are a binary representation of deletes for a single data file that is more efficient
177
             * at execution time than position delete files. Unlike equality or position delete files, there can be
178
             * at most one deletion vector for a given data file in a snapshot.
179
             */
180
0
            return Status::DataQualityError("This iceberg data file has multiple DVs.");
181
0
        }
182
2.36k
        RETURN_IF_ERROR(
183
2.36k
                read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));
184
185
2.36k
        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
186
        // Readers can safely ignore position delete files if there is a DV for a data file.
187
19.8k
    } else if (!position_delete_files.empty()) {
188
2.03k
        RETURN_IF_ERROR(
189
2.03k
                _position_delete_base(table_desc.original_file_path, position_delete_files));
190
2.03k
        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
191
2.03k
    }
192
193
22.1k
    COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
194
22.1k
    return Status::OK();
195
22.1k
}
196
197
void IcebergTableReader::_generate_equality_delete_block(
198
        Block* block, const std::vector<std::string>& equality_delete_col_names,
199
6.27k
        const std::vector<DataTypePtr>& equality_delete_col_types) {
200
16.0k
    for (int i = 0; i < equality_delete_col_names.size(); ++i) {
201
9.76k
        DataTypePtr data_type = make_nullable(equality_delete_col_types[i]);
202
9.76k
        MutableColumnPtr data_column = data_type->create_column();
203
9.76k
        block->insert(ColumnWithTypeAndName(std::move(data_column), data_type,
204
9.76k
                                            equality_delete_col_names[i]));
205
9.76k
    }
206
6.27k
}
207
208
30.8k
Status IcebergTableReader::_expand_block_if_need(Block* block) {
209
30.8k
    std::set<std::string> names;
210
30.8k
    auto block_names = block->get_names();
211
30.8k
    names.insert(block_names.begin(), block_names.end());
212
30.8k
    for (auto& col : _expand_columns) {
213
946
        col.column->assume_mutable()->clear();
214
946
        if (names.contains(col.name)) {
215
0
            return Status::InternalError("Wrong expand column '{}'", col.name);
216
0
        }
217
946
        names.insert(col.name);
218
946
        (*_col_name_to_block_idx)[col.name] = static_cast<uint32_t>(block->columns());
219
946
        block->insert(col);
220
946
    }
221
30.8k
    return Status::OK();
222
30.8k
}
223
224
30.9k
Status IcebergTableReader::_shrink_block_if_need(Block* block) {
225
30.9k
    std::set<size_t> positions_to_erase;
226
30.9k
    for (const std::string& expand_col : _expand_col_names) {
227
946
        if (!_col_name_to_block_idx->contains(expand_col)) {
228
0
            return Status::InternalError("Wrong erase column '{}', block: {}", expand_col,
229
0
                                         block->dump_names());
230
0
        }
231
946
        positions_to_erase.emplace((*_col_name_to_block_idx)[expand_col]);
232
946
    }
233
30.9k
    block->erase(positions_to_erase);
234
30.9k
    for (const std::string& expand_col : _expand_col_names) {
235
946
        _col_name_to_block_idx->erase(expand_col);
236
946
    }
237
30.9k
    return Status::OK();
238
30.9k
}
239
240
Status IcebergTableReader::_position_delete_base(
241
2.02k
        const std::string data_file_path, const std::vector<TIcebergDeleteFileDesc>& delete_files) {
242
2.02k
    std::vector<DeleteRows*> delete_rows_array;
243
2.02k
    int64_t num_delete_rows = 0;
244
6.33k
    for (const auto& delete_file : delete_files) {
245
6.33k
        SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
246
6.33k
        Status create_status = Status::OK();
247
6.33k
        auto* delete_file_cache = _kv_cache->get<DeleteFile>(
248
6.33k
                _delet_file_cache_key(delete_file.path), [&]() -> DeleteFile* {
249
1.73k
                    auto* position_delete = new DeleteFile;
250
1.73k
                    TFileRangeDesc delete_file_range;
251
                    // must use __set() method to make sure __isset is true
252
1.73k
                    delete_file_range.__set_fs_name(_range.fs_name);
253
1.73k
                    delete_file_range.path = delete_file.path;
254
1.73k
                    delete_file_range.start_offset = 0;
255
1.73k
                    delete_file_range.size = -1;
256
1.73k
                    delete_file_range.file_size = -1;
257
                    //read position delete file base on delete_file_range , generate DeleteFile , add DeleteFile to kv_cache
258
1.73k
                    create_status = _read_position_delete_file(&delete_file_range, position_delete);
259
260
1.73k
                    if (!create_status) {
261
0
                        return nullptr;
262
0
                    }
263
264
1.73k
                    return position_delete;
265
1.73k
                });
266
6.33k
        if (create_status.is<ErrorCode::END_OF_FILE>()) {
267
0
            continue;
268
6.33k
        } else if (!create_status.ok()) {
269
0
            return create_status;
270
0
        }
271
272
6.33k
        DeleteFile& delete_file_map = *((DeleteFile*)delete_file_cache);
273
6.33k
        auto get_value = [&](const auto& v) {
274
3.68k
            DeleteRows* row_ids = v.second.get();
275
3.68k
            if (!row_ids->empty()) {
276
3.68k
                delete_rows_array.emplace_back(row_ids);
277
3.68k
                num_delete_rows += row_ids->size();
278
3.68k
            }
279
3.68k
        };
280
6.33k
        delete_file_map.if_contains(data_file_path, get_value);
281
6.33k
    }
282
    // Use a KV cache to store the delete rows corresponding to a data file path.
283
    // The Parquet/ORC reader holds a reference (pointer) to this cached entry.
284
    // This allows delete rows to be reused when a single data file is split into
285
    // multiple splits, avoiding excessive memory usage when delete rows are large.
286
2.02k
    if (num_delete_rows > 0) {
287
1.48k
        SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
288
1.48k
        _iceberg_delete_rows =
289
1.48k
                _kv_cache->get<DeleteRows>(data_file_path,
290
1.48k
                                           [&]() -> DeleteRows* {
291
1.48k
                                               auto* data_file_position_delete = new DeleteRows;
292
1.48k
                                               _sort_delete_rows(delete_rows_array, num_delete_rows,
293
1.48k
                                                                 *data_file_position_delete);
294
295
1.48k
                                               return data_file_position_delete;
296
1.48k
                                           }
297
298
1.48k
                );
299
1.48k
        set_delete_rows();
300
1.48k
        COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
301
1.48k
    }
302
2.02k
    return Status::OK();
303
2.02k
}
304
305
IcebergTableReader::PositionDeleteRange IcebergTableReader::_get_range(
306
438
        const ColumnDictI32& file_path_column) {
307
438
    IcebergTableReader::PositionDeleteRange range;
308
438
    size_t read_rows = file_path_column.get_data().size();
309
438
    const int* code_path = file_path_column.get_data().data();
310
438
    const int* code_path_start = code_path;
311
438
    const int* code_path_end = code_path + read_rows;
312
876
    while (code_path < code_path_end) {
313
438
        int code = code_path[0];
314
438
        const int* code_end = std::upper_bound(code_path, code_path_end, code);
315
438
        range.data_file_path.emplace_back(file_path_column.get_value(code).to_string());
316
438
        range.range.emplace_back(code_path - code_path_start, code_end - code_path_start);
317
438
        code_path = code_end;
318
438
    }
319
438
    return range;
320
438
}
321
322
IcebergTableReader::PositionDeleteRange IcebergTableReader::_get_range(
323
2.25k
        const ColumnString& file_path_column) {
324
2.25k
    IcebergTableReader::PositionDeleteRange range;
325
2.25k
    size_t read_rows = file_path_column.size();
326
2.25k
    size_t index = 0;
327
8.45k
    while (index < read_rows) {
328
6.19k
        StringRef data_path = file_path_column.get_data_at(index);
329
6.19k
        size_t left = index - 1;
330
6.19k
        size_t right = read_rows;
331
54.6k
        while (left + 1 != right) {
332
48.4k
            size_t mid = left + (right - left) / 2;
333
48.4k
            if (file_path_column.get_data_at(mid) > data_path) {
334
30.1k
                right = mid;
335
30.1k
            } else {
336
18.2k
                left = mid;
337
18.2k
            }
338
48.4k
        }
339
6.19k
        range.data_file_path.emplace_back(data_path.to_string());
340
6.19k
        range.range.emplace_back(index, left + 1);
341
6.19k
        index = left + 1;
342
6.19k
    }
343
2.25k
    return range;
344
2.25k
}
345
346
/**
347
 * https://iceberg.apache.org/spec/#position-delete-files
348
 * The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning.
349
 * Sorting by file_path allows filter pushdown by file in columnar storage formats.
350
 * Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory.
351
 */
352
void IcebergTableReader::_sort_delete_rows(
353
        const std::vector<std::vector<int64_t>*>& delete_rows_array, int64_t num_delete_rows,
354
1.48k
        std::vector<int64_t>& result) {
355
1.48k
    if (delete_rows_array.empty()) {
356
0
        return;
357
0
    }
358
1.48k
    if (delete_rows_array.size() == 1) {
359
708
        result.resize(num_delete_rows);
360
708
        memcpy(result.data(), delete_rows_array.front()->data(), sizeof(int64_t) * num_delete_rows);
361
708
        return;
362
708
    }
363
772
    if (delete_rows_array.size() == 2) {
364
56
        result.resize(num_delete_rows);
365
56
        std::merge(delete_rows_array.front()->begin(), delete_rows_array.front()->end(),
366
56
                   delete_rows_array.back()->begin(), delete_rows_array.back()->end(),
367
56
                   result.begin());
368
56
        return;
369
56
    }
370
371
716
    using vec_pair = std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::iterator>;
372
716
    result.resize(num_delete_rows);
373
716
    auto row_id_iter = result.begin();
374
716
    auto iter_end = result.end();
375
716
    std::vector<vec_pair> rows_array;
376
2.86k
    for (auto* rows : delete_rows_array) {
377
2.86k
        if (!rows->empty()) {
378
2.86k
            rows_array.emplace_back(rows->begin(), rows->end());
379
2.86k
        }
380
2.86k
    }
381
716
    size_t array_size = rows_array.size();
382
335k
    while (row_id_iter != iter_end) {
383
334k
        int64_t min_index = 0;
384
334k
        int64_t min = *rows_array[0].first;
385
1.66M
        for (size_t i = 0; i < array_size; ++i) {
386
1.32M
            if (*rows_array[i].first < min) {
387
329k
                min_index = i;
388
329k
                min = *rows_array[i].first;
389
329k
            }
390
1.32M
        }
391
334k
        *row_id_iter++ = min;
392
334k
        rows_array[min_index].first++;
393
334k
        if (UNLIKELY(rows_array[min_index].first == rows_array[min_index].second)) {
394
2.86k
            rows_array.erase(rows_array.begin() + min_index);
395
2.86k
            array_size--;
396
2.86k
        }
397
334k
    }
398
716
}
399
400
void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFile* position_delete,
401
                                                         size_t read_rows,
402
2.69k
                                                         bool file_path_column_dictionary_coded) {
403
2.69k
    SCOPED_TIMER(_iceberg_profile.parse_delete_file_time);
404
    // todo: maybe do not need to build name to index map every time
405
2.69k
    auto name_to_pos_map = block.get_name_to_pos_map();
406
2.69k
    ColumnPtr path_column = block.get_by_position(name_to_pos_map[ICEBERG_FILE_PATH]).column;
407
2.69k
    DCHECK_EQ(path_column->size(), read_rows);
408
2.69k
    ColumnPtr pos_column = block.get_by_position(name_to_pos_map[ICEBERG_ROW_POS]).column;
409
2.69k
    using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
410
2.69k
    const int64_t* src_data = assert_cast<const ColumnType&>(*pos_column).get_data().data();
411
2.69k
    IcebergTableReader::PositionDeleteRange range;
412
2.69k
    if (file_path_column_dictionary_coded) {
413
438
        range = _get_range(assert_cast<const ColumnDictI32&>(*path_column));
414
2.25k
    } else {
415
2.25k
        range = _get_range(assert_cast<const ColumnString&>(*path_column));
416
2.25k
    }
417
9.32k
    for (int i = 0; i < range.range.size(); ++i) {
418
6.63k
        std::string key = range.data_file_path[i];
419
6.63k
        auto iter = position_delete->find(key);
420
6.63k
        DeleteRows* delete_rows;
421
6.63k
        if (iter == position_delete->end()) {
422
6.63k
            delete_rows = new DeleteRows;
423
6.63k
            std::unique_ptr<DeleteRows> delete_rows_ptr(delete_rows);
424
6.63k
            (*position_delete)[key] = std::move(delete_rows_ptr);
425
18.4E
        } else {
426
18.4E
            delete_rows = iter->second.get();
427
18.4E
        }
428
6.63k
        const int64_t* cpy_start = src_data + range.range[i].first;
429
6.63k
        const int64_t cpy_count = range.range[i].second - range.range[i].first;
430
6.63k
        int64_t origin_size = delete_rows->size();
431
6.63k
        delete_rows->resize(origin_size + cpy_count);
432
6.63k
        int64_t* dest_position = &(*delete_rows)[origin_size];
433
6.63k
        memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
434
6.63k
    }
435
2.69k
}
436
437
Status IcebergParquetReader::init_reader(
438
        const std::vector<std::string>& file_col_names,
439
        std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
440
        const VExprContextSPtrs& conjuncts,
441
        phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
442
                slot_id_to_predicates,
443
        const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
444
        const std::unordered_map<std::string, int>* colname_to_slot_id,
445
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
446
17.7k
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
447
17.7k
    _file_format = Fileformat::PARQUET;
448
17.7k
    _col_name_to_block_idx = col_name_to_block_idx;
449
17.7k
    auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
450
17.7k
    RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&_data_file_field_desc));
451
17.7k
    DCHECK(_data_file_field_desc != nullptr);
452
453
17.7k
    auto column_id_result = _create_column_ids(_data_file_field_desc, tuple_descriptor);
454
17.7k
    auto& column_ids = column_id_result.column_ids;
455
17.7k
    const auto& filter_column_ids = column_id_result.filter_column_ids;
456
457
17.7k
    RETURN_IF_ERROR(init_row_filters());
458
17.7k
    _all_required_col_names = file_col_names;
459
460
17.7k
    if (!_params.__isset.history_schema_info || _params.history_schema_info.empty()) [[unlikely]] {
461
1
        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(
462
1
                tuple_descriptor, *_data_file_field_desc, table_info_node_ptr));
463
17.7k
    } else {
464
17.7k
        std::set<std::string> read_col_name_set(file_col_names.begin(), file_col_names.end());
465
466
17.7k
        bool exist_field_id = true;
467
139k
        for (int idx = 0; idx < _data_file_field_desc->size(); idx++) {
468
122k
            if (_data_file_field_desc->get_column(idx)->field_id == -1) {
469
                // the data file may be from hive table migrated to iceberg, field id is missing
470
2
                exist_field_id = false;
471
2
                break;
472
2
            }
473
122k
        }
474
17.7k
        const auto& table_schema = _params.history_schema_info.front().root_field;
475
476
17.7k
        table_info_node_ptr = std::make_shared<TableSchemaChangeHelper::StructNode>();
477
17.7k
        if (exist_field_id) {
478
            // id -> table column name. columns that need read data file.
479
17.6k
            std::unordered_map<int, std::shared_ptr<schema::external::TField>> id_to_table_field;
480
55.6k
            for (const auto& table_field : table_schema.fields) {
481
55.6k
                auto field = table_field.field_ptr;
482
55.6k
                DCHECK(field->__isset.name);
483
55.6k
                if (!read_col_name_set.contains(field->name)) {
484
2.47k
                    continue;
485
2.47k
                }
486
53.1k
                id_to_table_field.emplace(field->id, field);
487
53.1k
            }
488
489
139k
            for (int idx = 0; idx < _data_file_field_desc->size(); idx++) {
490
122k
                const auto& data_file_field = _data_file_field_desc->get_column(idx);
491
122k
                auto data_file_column_id = _data_file_field_desc->get_column(idx)->field_id;
492
493
122k
                if (id_to_table_field.contains(data_file_column_id)) {
494
50.3k
                    const auto& table_field = id_to_table_field[data_file_column_id];
495
496
50.3k
                    std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
497
50.3k
                    RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id(
498
50.3k
                            *table_field, *data_file_field, exist_field_id, field_node));
499
50.3k
                    table_info_node_ptr->add_children(table_field->name, data_file_field->name,
500
50.3k
                                                      field_node);
501
502
50.3k
                    _id_to_block_column_name.emplace(data_file_column_id, table_field->name);
503
50.3k
                    id_to_table_field.erase(data_file_column_id);
504
71.7k
                } else if (_equality_delete_col_ids.contains(data_file_column_id)) {
505
                    // Columns that need to be read for equality delete.
506
318
                    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";
507
508
                    // Construct table column names that avoid duplication with current table schema.
509
                    // As the columns currently being read may have been deleted in the latest
510
                    // table structure or have undergone a series of schema changes...
511
318
                    std::string table_column_name = EQ_DELETE_PRE + data_file_field->name;
512
318
                    table_info_node_ptr->add_children(
513
318
                            table_column_name, data_file_field->name,
514
318
                            std::make_shared<TableSchemaChangeHelper::ConstNode>());
515
516
318
                    _id_to_block_column_name.emplace(data_file_column_id, table_column_name);
517
318
                    _expand_col_names.emplace_back(table_column_name);
518
318
                    auto expand_data_type = make_nullable(data_file_field->data_type);
519
318
                    _expand_columns.emplace_back(
520
318
                            ColumnWithTypeAndName {expand_data_type->create_column(),
521
318
                                                   expand_data_type, table_column_name});
522
523
318
                    _all_required_col_names.emplace_back(table_column_name);
524
318
                    column_ids.insert(data_file_field->get_column_id());
525
318
                }
526
122k
            }
527
17.6k
            for (const auto& [id, table_field] : id_to_table_field) {
528
2.87k
                table_info_node_ptr->add_not_exist_children(table_field->name);
529
2.87k
            }
530
17.6k
        } else {
531
64
            if (!_equality_delete_col_ids.empty()) [[unlikely]] {
532
0
                return Status::InternalError(
533
0
                        "Can not read missing field id data file when have equality delete");
534
0
            }
535
64
            std::map<std::string, size_t> file_column_idx_map;
536
72
            for (size_t idx = 0; idx < _data_file_field_desc->size(); idx++) {
537
8
                file_column_idx_map.emplace(_data_file_field_desc->get_column(idx)->name, idx);
538
8
            }
539
540
64
            for (const auto& table_field : table_schema.fields) {
541
2
                DCHECK(table_field.__isset.field_ptr);
542
2
                DCHECK(table_field.field_ptr->__isset.name);
543
2
                const auto& table_column_name = table_field.field_ptr->name;
544
2
                if (!read_col_name_set.contains(table_column_name)) {
545
0
                    continue;
546
0
                }
547
2
                if (!table_field.field_ptr->__isset.name_mapping ||
548
2
                    table_field.field_ptr->name_mapping.size() == 0) {
549
2
                    return Status::DataQualityError(
550
2
                            "name_mapping must be set when read missing field id data file.");
551
2
                }
552
0
                bool have_mapping = false;
553
0
                for (const auto& mapped_name : table_field.field_ptr->name_mapping) {
554
0
                    if (file_column_idx_map.contains(mapped_name)) {
555
0
                        std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
556
0
                        const auto& file_field = _data_file_field_desc->get_column(
557
0
                                file_column_idx_map.at(mapped_name));
558
0
                        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id(
559
0
                                *table_field.field_ptr, *file_field, exist_field_id, field_node));
560
0
                        table_info_node_ptr->add_children(table_column_name, file_field->name,
561
0
                                                          field_node);
562
0
                        have_mapping = true;
563
0
                        break;
564
0
                    }
565
0
                }
566
0
                if (!have_mapping) {
567
0
                    table_info_node_ptr->add_not_exist_children(table_column_name);
568
0
                }
569
0
            }
570
64
        }
571
17.7k
    }
572
573
17.7k
    return parquet_reader->init_reader(
574
17.7k
            _all_required_col_names, _col_name_to_block_idx, conjuncts, slot_id_to_predicates,
575
17.7k
            tuple_descriptor, row_descriptor, colname_to_slot_id, not_single_slot_filter_conjuncts,
576
17.7k
            slot_id_to_filter_conjuncts, table_info_node_ptr, true, column_ids, filter_column_ids);
577
17.7k
}
578
579
ColumnIdResult IcebergParquetReader::_create_column_ids(const FieldDescriptor* field_desc,
580
17.6k
                                                        const TupleDescriptor* tuple_descriptor) {
581
    // First, assign column IDs to the field descriptor
582
17.6k
    auto* mutable_field_desc = const_cast<FieldDescriptor*>(field_desc);
583
17.6k
    mutable_field_desc->assign_ids();
584
585
    // map top-level table column iceberg_id -> FieldSchema*
586
17.6k
    std::unordered_map<int, const FieldSchema*> iceberg_id_to_field_schema_map;
587
588
139k
    for (int i = 0; i < field_desc->size(); ++i) {
589
122k
        auto field_schema = field_desc->get_column(i);
590
122k
        if (!field_schema) continue;
591
592
122k
        int iceberg_id = field_schema->field_id;
593
122k
        iceberg_id_to_field_schema_map[iceberg_id] = field_schema;
594
122k
    }
595
596
17.6k
    std::set<uint64_t> column_ids;
597
17.6k
    std::set<uint64_t> filter_column_ids;
598
599
    // helper to process access paths for a given top-level parquet field
600
17.6k
    auto process_access_paths = [](const FieldSchema* parquet_field,
601
17.6k
                                   const std::vector<TColumnAccessPath>& access_paths,
602
17.6k
                                   std::set<uint64_t>& out_ids) {
603
15.7k
        process_nested_access_paths(
604
15.7k
                parquet_field, access_paths, out_ids,
605
15.7k
                [](const FieldSchema* field) { return field->get_column_id(); },
606
15.7k
                [](const FieldSchema* field) { return field->get_max_column_id(); },
607
15.7k
                IcebergParquetNestedColumnUtils::extract_nested_column_ids);
608
15.7k
    };
609
610
53.4k
    for (const auto* slot : tuple_descriptor->slots()) {
611
53.4k
        auto it = iceberg_id_to_field_schema_map.find(slot->col_unique_id());
612
53.4k
        if (it == iceberg_id_to_field_schema_map.end()) {
613
            // Column not found in file (e.g., partition column, added column)
614
3.16k
            continue;
615
3.16k
        }
616
50.3k
        auto field_schema = it->second;
617
618
        // primitive (non-nested) types: direct mapping by name
619
50.3k
        if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY &&
620
50.3k
             slot->col_type() != TYPE_MAP)) {
621
36.0k
            column_ids.insert(field_schema->column_id);
622
623
36.0k
            if (slot->is_predicate()) {
624
5.71k
                filter_column_ids.insert(field_schema->column_id);
625
5.71k
            }
626
36.0k
            continue;
627
36.0k
        }
628
629
        // complex types:
630
14.2k
        const auto& all_access_paths = slot->all_access_paths();
631
14.2k
        process_access_paths(field_schema, all_access_paths, column_ids);
632
633
14.2k
        const auto& predicate_access_paths = slot->predicate_access_paths();
634
14.2k
        if (!predicate_access_paths.empty()) {
635
1.58k
            process_access_paths(field_schema, predicate_access_paths, filter_column_ids);
636
1.58k
        }
637
14.2k
    }
638
17.6k
    return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids));
639
17.6k
}
640
641
Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* delete_range,
642
780
                                                         DeleteFile* position_delete) {
643
780
    ParquetReader parquet_delete_reader(_profile, _params, *delete_range,
644
780
                                        READ_DELETE_FILE_BATCH_SIZE, &_state->timezone_obj(),
645
780
                                        _io_ctx, _state, _meta_cache);
646
780
    phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> tmp;
647
780
    RETURN_IF_ERROR(parquet_delete_reader.init_reader(
648
780
            delete_file_col_names,
649
780
            const_cast<std::unordered_map<std::string, uint32_t>*>(&DELETE_COL_NAME_TO_BLOCK_IDX),
650
780
            {}, tmp, nullptr, nullptr, nullptr, nullptr, nullptr,
651
780
            TableSchemaChangeHelper::ConstNode::get_instance(), false));
652
653
780
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
654
780
            partition_columns;
655
780
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
656
780
    RETURN_IF_ERROR(parquet_delete_reader.set_fill_columns(partition_columns, missing_columns));
657
658
780
    const tparquet::FileMetaData* meta_data = parquet_delete_reader.get_meta_data();
659
780
    bool dictionary_coded = true;
660
780
    for (const auto& row_group : meta_data->row_groups) {
661
778
        const auto& column_chunk = row_group.columns[ICEBERG_FILE_PATH_INDEX];
662
778
        if (!(column_chunk.__isset.meta_data && has_dict_page(column_chunk.meta_data))) {
663
342
            dictionary_coded = false;
664
342
            break;
665
342
        }
666
778
    }
667
780
    DataTypePtr data_type_file_path {new DataTypeString};
668
780
    DataTypePtr data_type_pos {new DataTypeInt64};
669
780
    bool eof = false;
670
1.56k
    while (!eof) {
671
780
        Block block = {dictionary_coded
672
780
                               ? ColumnWithTypeAndName {ColumnDictI32::create(
673
438
                                                                FieldType::OLAP_FIELD_TYPE_VARCHAR),
674
438
                                                        data_type_file_path, ICEBERG_FILE_PATH}
675
780
                               : ColumnWithTypeAndName {data_type_file_path, ICEBERG_FILE_PATH},
676
677
780
                       {data_type_pos, ICEBERG_ROW_POS}};
678
780
        size_t read_rows = 0;
679
780
        RETURN_IF_ERROR(parquet_delete_reader.get_next_block(&block, &read_rows, &eof));
680
681
780
        if (read_rows <= 0) {
682
0
            break;
683
0
        }
684
780
        _gen_position_delete_file_range(block, position_delete, read_rows, dictionary_coded);
685
780
    }
686
780
    return Status::OK();
687
780
};
688
689
Status IcebergOrcReader::init_reader(
690
        const std::vector<std::string>& file_col_names,
691
        std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
692
        const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
693
        const RowDescriptor* row_descriptor,
694
        const std::unordered_map<std::string, int>* colname_to_slot_id,
695
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
696
6.20k
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
697
6.20k
    _file_format = Fileformat::ORC;
698
6.20k
    _col_name_to_block_idx = col_name_to_block_idx;
699
6.20k
    auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
700
6.20k
    RETURN_IF_ERROR(orc_reader->get_file_type(&_data_file_type_desc));
701
6.20k
    std::vector<std::string> data_file_col_names;
702
6.20k
    std::vector<DataTypePtr> data_file_col_types;
703
6.20k
    RETURN_IF_ERROR(orc_reader->get_parsed_schema(&data_file_col_names, &data_file_col_types));
704
705
6.20k
    auto column_id_result = _create_column_ids(_data_file_type_desc, tuple_descriptor);
706
6.20k
    auto& column_ids = column_id_result.column_ids;
707
6.20k
    const auto& filter_column_ids = column_id_result.filter_column_ids;
708
709
6.20k
    RETURN_IF_ERROR(init_row_filters());
710
711
6.20k
    _all_required_col_names = file_col_names;
712
6.20k
    if (!_params.__isset.history_schema_info || _params.history_schema_info.empty()) [[unlikely]] {
713
1
        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(tuple_descriptor, _data_file_type_desc,
714
1
                                                        table_info_node_ptr));
715
6.20k
    } else {
716
6.20k
        std::set<std::string> read_col_name_set(file_col_names.begin(), file_col_names.end());
717
718
6.20k
        bool exist_field_id = true;
719
49.3k
        for (size_t idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) {
720
43.1k
            if (!_data_file_type_desc->getSubtype(idx)->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
721
0
                exist_field_id = false;
722
0
                break;
723
0
            }
724
43.1k
        }
725
726
6.20k
        const auto& table_schema = _params.history_schema_info.front().root_field;
727
6.20k
        table_info_node_ptr = std::make_shared<TableSchemaChangeHelper::StructNode>();
728
6.20k
        if (exist_field_id) {
729
            // id -> table column name. columns that need read data file.
730
6.19k
            std::unordered_map<int, std::shared_ptr<schema::external::TField>> id_to_table_field;
731
31.0k
            for (const auto& table_field : table_schema.fields) {
732
31.0k
                auto field = table_field.field_ptr;
733
31.0k
                DCHECK(field->__isset.name);
734
31.0k
                if (!read_col_name_set.contains(field->name)) {
735
514
                    continue;
736
514
                }
737
738
30.5k
                id_to_table_field.emplace(field->id, field);
739
30.5k
            }
740
741
49.2k
            for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) {
742
43.1k
                const auto& data_file_field = _data_file_type_desc->getSubtype(idx);
743
43.1k
                auto data_file_column_id =
744
43.1k
                        std::stoi(data_file_field->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
745
43.1k
                auto const& file_column_name = _data_file_type_desc->getFieldName(idx);
746
747
43.1k
                if (id_to_table_field.contains(data_file_column_id)) {
748
28.7k
                    const auto& table_field = id_to_table_field[data_file_column_id];
749
750
28.7k
                    std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
751
28.7k
                    RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id(
752
28.7k
                            *table_field, data_file_field, ICEBERG_ORC_ATTRIBUTE, exist_field_id,
753
28.7k
                            field_node));
754
28.7k
                    table_info_node_ptr->add_children(table_field->name, file_column_name,
755
28.7k
                                                      field_node);
756
757
28.7k
                    _id_to_block_column_name.emplace(data_file_column_id, table_field->name);
758
28.7k
                    id_to_table_field.erase(data_file_column_id);
759
28.7k
                } else if (_equality_delete_col_ids.contains(data_file_column_id)) {
760
                    // Columns that need to be read for equality delete.
761
318
                    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";
762
763
                    // Construct table column names that avoid duplication with current table schema.
764
                    // As the columns currently being read may have been deleted in the latest
765
                    // table structure or have undergone a series of schema changes...
766
318
                    std::string table_column_name = EQ_DELETE_PRE + file_column_name;
767
318
                    table_info_node_ptr->add_children(
768
318
                            table_column_name, file_column_name,
769
318
                            std::make_shared<TableSchemaChangeHelper::ConstNode>());
770
771
318
                    _id_to_block_column_name.emplace(data_file_column_id, table_column_name);
772
318
                    _expand_col_names.emplace_back(table_column_name);
773
774
318
                    auto expand_data_type = make_nullable(data_file_col_types[idx]);
775
318
                    _expand_columns.emplace_back(
776
318
                            ColumnWithTypeAndName {expand_data_type->create_column(),
777
318
                                                   expand_data_type, table_column_name});
778
779
318
                    _all_required_col_names.emplace_back(table_column_name);
780
318
                    column_ids.insert(data_file_field->getColumnId());
781
318
                }
782
43.1k
            }
783
6.19k
            for (const auto& [id, table_field] : id_to_table_field) {
784
1.81k
                table_info_node_ptr->add_not_exist_children(table_field->name);
785
1.81k
            }
786
6.19k
        } else {
787
10
            if (!_equality_delete_col_ids.empty()) [[unlikely]] {
788
0
                return Status::InternalError(
789
0
                        "Can not read missing field id data file when have equality delete");
790
0
            }
791
10
            std::map<std::string, size_t> file_column_idx_map;
792
10
            for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) {
793
0
                auto const& file_column_name = _data_file_type_desc->getFieldName(idx);
794
0
                file_column_idx_map.emplace(file_column_name, idx);
795
0
            }
796
797
10
            for (const auto& table_field : table_schema.fields) {
798
0
                DCHECK(table_field.__isset.field_ptr);
799
0
                DCHECK(table_field.field_ptr->__isset.name);
800
0
                const auto& table_column_name = table_field.field_ptr->name;
801
0
                if (!read_col_name_set.contains(table_column_name)) {
802
0
                    continue;
803
0
                }
804
0
                if (!table_field.field_ptr->__isset.name_mapping ||
805
0
                    table_field.field_ptr->name_mapping.size() == 0) {
806
0
                    return Status::DataQualityError(
807
0
                            "name_mapping must be set when read missing field id data file.");
808
0
                }
809
0
                auto have_mapping = false;
810
0
                for (const auto& mapped_name : table_field.field_ptr->name_mapping) {
811
0
                    if (file_column_idx_map.contains(mapped_name)) {
812
0
                        auto file_column_idx = file_column_idx_map.at(mapped_name);
813
0
                        std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
814
0
                        const auto& file_field = _data_file_type_desc->getSubtype(file_column_idx);
815
0
                        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id(
816
0
                                *table_field.field_ptr, file_field, ICEBERG_ORC_ATTRIBUTE,
817
0
                                exist_field_id, field_node));
818
0
                        table_info_node_ptr->add_children(
819
0
                                table_column_name,
820
0
                                _data_file_type_desc->getFieldName(file_column_idx), field_node);
821
0
                        have_mapping = true;
822
0
                        break;
823
0
                    }
824
0
                }
825
0
                if (!have_mapping) {
826
0
                    table_info_node_ptr->add_not_exist_children(table_column_name);
827
0
                }
828
0
            }
829
10
        }
830
6.20k
    }
831
832
6.20k
    return orc_reader->init_reader(&_all_required_col_names, _col_name_to_block_idx, conjuncts,
833
6.20k
                                   false, tuple_descriptor, row_descriptor,
834
6.20k
                                   not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts,
835
6.20k
                                   table_info_node_ptr, column_ids, filter_column_ids);
836
6.20k
}
837
838
ColumnIdResult IcebergOrcReader::_create_column_ids(const orc::Type* orc_type,
839
6.20k
                                                    const TupleDescriptor* tuple_descriptor) {
840
    // map top-level table column iceberg_id -> orc::Type*
841
6.20k
    std::unordered_map<int, const orc::Type*> iceberg_id_to_orc_type_map;
842
49.3k
    for (uint64_t i = 0; i < orc_type->getSubtypeCount(); ++i) {
843
43.1k
        auto orc_sub_type = orc_type->getSubtype(i);
844
43.1k
        if (!orc_sub_type) continue;
845
846
43.1k
        if (!orc_sub_type->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
847
0
            continue;
848
0
        }
849
43.1k
        int iceberg_id = std::stoi(orc_sub_type->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
850
43.1k
        iceberg_id_to_orc_type_map[iceberg_id] = orc_sub_type;
851
43.1k
    }
852
853
6.20k
    std::set<uint64_t> column_ids;
854
6.20k
    std::set<uint64_t> filter_column_ids;
855
856
    // helper to process access paths for a given top-level orc field
857
6.20k
    auto process_access_paths = [](const orc::Type* orc_field,
858
6.20k
                                   const std::vector<TColumnAccessPath>& access_paths,
859
14.8k
                                   std::set<uint64_t>& out_ids) {
860
14.8k
        process_nested_access_paths(
861
14.8k
                orc_field, access_paths, out_ids,
862
14.8k
                [](const orc::Type* type) { return type->getColumnId(); },
863
14.8k
                [](const orc::Type* type) { return type->getMaximumColumnId(); },
864
14.8k
                IcebergOrcNestedColumnUtils::extract_nested_column_ids);
865
14.8k
    };
866
867
30.8k
    for (const auto* slot : tuple_descriptor->slots()) {
868
30.8k
        auto it = iceberg_id_to_orc_type_map.find(slot->col_unique_id());
869
30.8k
        if (it == iceberg_id_to_orc_type_map.end()) {
870
            // Column not found in file
871
2.09k
            continue;
872
2.09k
        }
873
28.7k
        const orc::Type* orc_field = it->second;
874
875
        // primitive (non-nested) types
876
28.7k
        if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY &&
877
28.7k
             slot->col_type() != TYPE_MAP)) {
878
15.4k
            column_ids.insert(orc_field->getColumnId());
879
15.4k
            if (slot->is_predicate()) {
880
2.34k
                filter_column_ids.insert(orc_field->getColumnId());
881
2.34k
            }
882
15.4k
            continue;
883
15.4k
        }
884
885
        // complex types
886
13.3k
        const auto& all_access_paths = slot->all_access_paths();
887
13.3k
        process_access_paths(orc_field, all_access_paths, column_ids);
888
889
13.3k
        const auto& predicate_access_paths = slot->predicate_access_paths();
890
13.3k
        if (!predicate_access_paths.empty()) {
891
1.55k
            process_access_paths(orc_field, predicate_access_paths, filter_column_ids);
892
1.55k
        }
893
13.3k
    }
894
895
6.20k
    return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids));
896
6.20k
}
897
898
Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete_range,
899
958
                                                    DeleteFile* position_delete) {
900
958
    OrcReader orc_delete_reader(_profile, _state, _params, *delete_range,
901
958
                                READ_DELETE_FILE_BATCH_SIZE, _state->timezone(), _io_ctx,
902
958
                                _meta_cache);
903
958
    RETURN_IF_ERROR(orc_delete_reader.init_reader(
904
958
            &delete_file_col_names,
905
958
            const_cast<std::unordered_map<std::string, uint32_t>*>(&DELETE_COL_NAME_TO_BLOCK_IDX),
906
958
            {}, false, {}, {}, nullptr, nullptr));
907
908
958
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
909
958
            partition_columns;
910
958
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
911
958
    RETURN_IF_ERROR(orc_delete_reader.set_fill_columns(partition_columns, missing_columns));
912
913
958
    bool eof = false;
914
958
    DataTypePtr data_type_file_path {new DataTypeString};
915
958
    DataTypePtr data_type_pos {new DataTypeInt64};
916
2.87k
    while (!eof) {
917
1.91k
        Block block = {{data_type_file_path, ICEBERG_FILE_PATH}, {data_type_pos, ICEBERG_ROW_POS}};
918
919
1.91k
        size_t read_rows = 0;
920
1.91k
        RETURN_IF_ERROR(orc_delete_reader.get_next_block(&block, &read_rows, &eof));
921
922
1.91k
        _gen_position_delete_file_range(block, position_delete, read_rows, false);
923
1.91k
    }
924
958
    return Status::OK();
925
958
}
926
927
// Directly read the deletion vector using the `content_offset` and
928
// `content_size_in_bytes` provided by FE in `delete_file_desc`.
929
// These two fields indicate the location of a blob in storage.
930
// Since the current format is `deletion-vector-v1`, which does not
931
// compress any blobs, we can temporarily skip parsing the Puffin footer.
932
Status IcebergTableReader::read_deletion_vector(const std::string& data_file_path,
933
2.36k
                                                const TIcebergDeleteFileDesc& delete_file_desc) {
934
2.36k
    Status create_status = Status::OK();
935
2.36k
    SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
936
2.36k
    _iceberg_delete_rows = _kv_cache->get<DeleteRows>(data_file_path, [&]() -> DeleteRows* {
937
886
        auto* delete_rows = new DeleteRows;
938
939
886
        TFileRangeDesc delete_range;
940
        // must use __set() method to make sure __isset is true
941
886
        delete_range.__set_fs_name(_range.fs_name);
942
886
        delete_range.path = delete_file_desc.path;
943
886
        delete_range.start_offset = delete_file_desc.content_offset;
944
886
        delete_range.size = delete_file_desc.content_size_in_bytes;
945
886
        delete_range.file_size = -1;
946
947
        // We may consider caching the DeletionVectorReader when reading Puffin files,
948
        // where the underlying reader is an `InMemoryFileReader` and a single data file is
949
        // split into multiple splits. However, we need to ensure that the underlying
950
        // reader supports multi-threaded access.
951
886
        DeletionVectorReader dv_reader(_state, _profile, _params, delete_range, _io_ctx);
952
886
        create_status = dv_reader.open();
953
886
        if (!create_status.ok()) [[unlikely]] {
954
0
            return nullptr;
955
0
        }
956
957
886
        size_t buffer_size = delete_range.size;
958
886
        std::vector<char> buf(buffer_size);
959
886
        if (buffer_size < 12) [[unlikely]] {
960
            // Minimum size: 4 bytes length + 4 bytes magic + 4 bytes CRC32
961
0
            create_status = Status::DataQualityError("Deletion vector file size too small: {}",
962
0
                                                     buffer_size);
963
0
            return nullptr;
964
0
        }
965
966
886
        create_status = dv_reader.read_at(delete_range.start_offset, {buf.data(), buffer_size});
967
886
        if (!create_status) [[unlikely]] {
968
0
            return nullptr;
969
0
        }
970
        // The serialized blob contains:
971
        //
972
        // Combined length of the vector and magic bytes stored as 4 bytes, big-endian
973
        // A 4-byte magic sequence, D1 D3 39 64
974
        // The vector, serialized as described below
975
        // A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian
976
977
886
        auto total_length = BigEndian::Load32(buf.data());
978
886
        if (total_length + 8 != buffer_size) [[unlikely]] {
979
0
            create_status = Status::DataQualityError(
980
0
                    "Deletion vector length mismatch, expected: {}, actual: {}", total_length + 8,
981
0
                    buffer_size);
982
0
            return nullptr;
983
0
        }
984
985
886
        constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'};
986
886
        if (memcmp(buf.data() + sizeof(total_length), MAGIC_NUMBER, 4)) [[unlikely]] {
987
0
            create_status = Status::DataQualityError("Deletion vector magic number mismatch");
988
0
            return nullptr;
989
0
        }
990
991
886
        roaring::Roaring64Map bitmap;
992
886
        SCOPED_TIMER(_iceberg_profile.parse_delete_file_time);
993
886
        try {
994
886
            bitmap = roaring::Roaring64Map::readSafe(buf.data() + 8, buffer_size - 12);
995
886
        } catch (const std::runtime_error& e) {
996
0
            create_status = Status::DataQualityError("Decode roaring bitmap failed, {}", e.what());
997
0
            return nullptr;
998
0
        }
999
        // skip CRC-32 checksum
1000
1001
886
        delete_rows->reserve(bitmap.cardinality());
1002
4.99M
        for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
1003
4.99M
            delete_rows->push_back(*it);
1004
4.99M
        }
1005
886
        COUNTER_UPDATE(_iceberg_profile.num_delete_rows, delete_rows->size());
1006
886
        return delete_rows;
1007
886
    });
1008
1009
2.36k
    RETURN_IF_ERROR(create_status);
1010
2.36k
    if (!_iceberg_delete_rows->empty()) [[likely]] {
1011
2.36k
        set_delete_rows();
1012
2.36k
    }
1013
2.36k
    return Status::OK();
1014
2.36k
}
1015
1016
// Similar to the code structure of IcebergOrcReader::_process_equality_delete,
1017
// but considering the significant differences in how parquet/orc obtains
1018
// attributes/column IDs, it is not easy to combine them.
1019
Status IcebergParquetReader::_process_equality_delete(
1020
670
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
1021
670
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
1022
670
            partition_columns;
1023
670
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
1024
1025
670
    std::map<int, const FieldSchema*> data_file_id_to_field_schema;
1026
2.67k
    for (int idx = 0; idx < _data_file_field_desc->size(); ++idx) {
1027
2.00k
        auto field_schema = _data_file_field_desc->get_column(idx);
1028
2.00k
        if (_data_file_field_desc->get_column(idx)->field_id == -1) {
1029
0
            return Status::DataQualityError("Iceberg equality delete data file missing field id.");
1030
0
        }
1031
2.00k
        data_file_id_to_field_schema[_data_file_field_desc->get_column(idx)->field_id] =
1032
2.00k
                field_schema;
1033
2.00k
    }
1034
1035
1.52k
    for (const auto& delete_file : delete_files) {
1036
1.52k
        TFileRangeDesc delete_desc;
1037
        // must use __set() method to make sure __isset is true
1038
1.52k
        delete_desc.__set_fs_name(_range.fs_name);
1039
1.52k
        delete_desc.path = delete_file.path;
1040
1.52k
        delete_desc.start_offset = 0;
1041
1.52k
        delete_desc.size = -1;
1042
1.52k
        delete_desc.file_size = -1;
1043
1044
1.52k
        if (!delete_file.__isset.field_ids) [[unlikely]] {
1045
0
            return Status::InternalError(
1046
0
                    "missing delete field ids when reading equality delete file");
1047
0
        }
1048
1.52k
        auto& read_column_field_ids = delete_file.field_ids;
1049
1.52k
        std::set<int> read_column_field_ids_set;
1050
2.37k
        for (const auto& field_id : read_column_field_ids) {
1051
2.37k
            read_column_field_ids_set.insert(field_id);
1052
2.37k
            _equality_delete_col_ids.insert(field_id);
1053
2.37k
        }
1054
1055
1.52k
        auto delete_reader = ParquetReader::create_unique(
1056
1.52k
                _profile, _params, delete_desc, READ_DELETE_FILE_BATCH_SIZE,
1057
1.52k
                &_state->timezone_obj(), _io_ctx, _state, _meta_cache);
1058
1.52k
        RETURN_IF_ERROR(delete_reader->init_schema_reader());
1059
1060
        // the column that to read equality delete file.
1061
        // (delete file may be have extra columns that don't need to read)
1062
1.52k
        std::vector<std::string> delete_col_names;
1063
1.52k
        std::vector<DataTypePtr> delete_col_types;
1064
1.52k
        std::vector<int> delete_col_ids;
1065
1.52k
        std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx;
1066
1067
1.52k
        const FieldDescriptor* delete_field_desc = nullptr;
1068
1.52k
        RETURN_IF_ERROR(delete_reader->get_file_metadata_schema(&delete_field_desc));
1069
1.52k
        DCHECK(delete_field_desc != nullptr);
1070
1071
1.52k
        auto eq_file_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
1072
2.36k
        for (const auto& delete_file_field : delete_field_desc->get_fields_schema()) {
1073
2.36k
            if (delete_file_field.field_id == -1) [[unlikely]] { // missing delete_file_field id
1074
                // equality delete file must have delete_file_field id to match column.
1075
0
                return Status::DataQualityError(
1076
0
                        "missing delete_file_field id when reading equality delete file");
1077
2.37k
            } else if (read_column_field_ids_set.contains(delete_file_field.field_id)) {
1078
                // the column that need to read.
1079
2.37k
                if (delete_file_field.children.size() > 0) [[unlikely]] { // complex column
1080
0
                    return Status::InternalError(
1081
0
                            "can not support read complex column in equality delete file");
1082
2.37k
                } else if (!data_file_id_to_field_schema.contains(delete_file_field.field_id))
1083
0
                        [[unlikely]] {
1084
0
                    return Status::DataQualityError(
1085
0
                            "can not find delete field id in data file schema when reading "
1086
0
                            "equality delete file");
1087
0
                }
1088
2.37k
                auto data_file_field = data_file_id_to_field_schema[delete_file_field.field_id];
1089
2.37k
                if (data_file_field->data_type->get_primitive_type() !=
1090
2.37k
                    delete_file_field.data_type->get_primitive_type()) [[unlikely]] {
1091
0
                    return Status::NotSupported(
1092
0
                            "Not Support type change in equality delete, field: {}, delete "
1093
0
                            "file type: {}, data file type: {}",
1094
0
                            delete_file_field.field_id, delete_file_field.data_type->get_name(),
1095
0
                            data_file_field->data_type->get_name());
1096
0
                }
1097
1098
2.37k
                std::string filed_lower_name = to_lower(delete_file_field.name);
1099
2.37k
                eq_file_node->add_children(filed_lower_name, delete_file_field.name,
1100
2.37k
                                           std::make_shared<TableSchemaChangeHelper::ScalarNode>());
1101
1102
2.37k
                delete_col_ids.emplace_back(delete_file_field.field_id);
1103
2.37k
                delete_col_names.emplace_back(filed_lower_name);
1104
2.37k
                delete_col_types.emplace_back(make_nullable(delete_file_field.data_type));
1105
1106
2.37k
                read_column_field_ids_set.erase(delete_file_field.field_id);
1107
18.4E
            } else {
1108
                // delete file may be have extra columns that don't need to read
1109
18.4E
            }
1110
2.36k
        }
1111
1.52k
        if (!read_column_field_ids_set.empty()) [[unlikely]] {
1112
0
            return Status::DataQualityError("some field ids not found in equality delete file.");
1113
0
        }
1114
1115
3.89k
        for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
1116
2.37k
            delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
1117
2.37k
        }
1118
1.52k
        phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> tmp;
1119
1.52k
        RETURN_IF_ERROR(delete_reader->init_reader(delete_col_names, &delete_col_name_to_block_idx,
1120
1.52k
                                                   {}, tmp, nullptr, nullptr, nullptr, nullptr,
1121
1.52k
                                                   nullptr, eq_file_node, false));
1122
1.52k
        RETURN_IF_ERROR(delete_reader->set_fill_columns(partition_columns, missing_columns));
1123
1124
1.52k
        if (!_equality_delete_block_map.contains(delete_col_ids)) {
1125
850
            _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size());
1126
850
            Block block;
1127
850
            _generate_equality_delete_block(&block, delete_col_names, delete_col_types);
1128
850
            _equality_delete_blocks.emplace_back(block);
1129
850
        }
1130
1.52k
        Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]];
1131
1.52k
        bool eof = false;
1132
3.04k
        while (!eof) {
1133
1.52k
            Block tmp_block;
1134
1.52k
            _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types);
1135
1.52k
            size_t read_rows = 0;
1136
1.52k
            RETURN_IF_ERROR(delete_reader->get_next_block(&tmp_block, &read_rows, &eof));
1137
1.52k
            if (read_rows > 0) {
1138
1.52k
                MutableBlock mutable_block(&eq_file_block);
1139
1.52k
                RETURN_IF_ERROR(mutable_block.merge(tmp_block));
1140
1.52k
            }
1141
1.52k
        }
1142
1.52k
    }
1143
1144
852
    for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) {
1145
852
        auto& eq_file_block = _equality_delete_blocks[block_idx];
1146
852
        auto equality_delete_impl =
1147
852
                EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids);
1148
852
        RETURN_IF_ERROR(equality_delete_impl->init(_profile));
1149
852
        _equality_delete_impls.emplace_back(std::move(equality_delete_impl));
1150
852
    }
1151
670
    return Status::OK();
1152
670
}
1153
1154
Status IcebergOrcReader::_process_equality_delete(
1155
670
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
1156
670
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
1157
670
            partition_columns;
1158
670
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
1159
1160
670
    std::map<int, int> data_file_id_to_field_idx;
1161
2.67k
    for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); ++idx) {
1162
2.00k
        if (!_data_file_type_desc->getSubtype(idx)->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
1163
0
            return Status::DataQualityError("Iceberg equality delete data file missing field id.");
1164
0
        }
1165
2.00k
        auto field_id = std::stoi(
1166
2.00k
                _data_file_type_desc->getSubtype(idx)->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
1167
2.00k
        data_file_id_to_field_idx[field_id] = idx;
1168
2.00k
    }
1169
1170
1.52k
    for (const auto& delete_file : delete_files) {
1171
1.52k
        TFileRangeDesc delete_desc;
1172
        // must use __set() method to make sure __isset is true
1173
1.52k
        delete_desc.__set_fs_name(_range.fs_name);
1174
1.52k
        delete_desc.path = delete_file.path;
1175
1.52k
        delete_desc.start_offset = 0;
1176
1.52k
        delete_desc.size = -1;
1177
1.52k
        delete_desc.file_size = -1;
1178
1179
1.52k
        if (!delete_file.__isset.field_ids) [[unlikely]] {
1180
0
            return Status::InternalError(
1181
0
                    "missing delete field ids when reading equality delete file");
1182
0
        }
1183
1.52k
        auto& read_column_field_ids = delete_file.field_ids;
1184
1.52k
        std::set<int> read_column_field_ids_set;
1185
2.37k
        for (const auto& field_id : read_column_field_ids) {
1186
2.37k
            read_column_field_ids_set.insert(field_id);
1187
2.37k
            _equality_delete_col_ids.insert(field_id);
1188
2.37k
        }
1189
1190
1.52k
        auto delete_reader = OrcReader::create_unique(_profile, _state, _params, delete_desc,
1191
1.52k
                                                      READ_DELETE_FILE_BATCH_SIZE,
1192
1.52k
                                                      _state->timezone(), _io_ctx, _meta_cache);
1193
1.52k
        RETURN_IF_ERROR(delete_reader->init_schema_reader());
1194
        // delete file schema
1195
1.52k
        std::vector<std::string> delete_file_col_names;
1196
1.52k
        std::vector<DataTypePtr> delete_file_col_types;
1197
1.52k
        RETURN_IF_ERROR(
1198
1.52k
                delete_reader->get_parsed_schema(&delete_file_col_names, &delete_file_col_types));
1199
1200
        // the column that to read equality delete file.
1201
        // (delete file maybe have extra columns that don't need to read)
1202
1.52k
        std::vector<std::string> delete_col_names;
1203
1.52k
        std::vector<DataTypePtr> delete_col_types;
1204
1.52k
        std::vector<int> delete_col_ids;
1205
1.52k
        std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx;
1206
1207
1.52k
        const orc::Type* delete_field_desc = nullptr;
1208
1.52k
        RETURN_IF_ERROR(delete_reader->get_file_type(&delete_field_desc));
1209
1.52k
        DCHECK(delete_field_desc != nullptr);
1210
1211
1.52k
        auto eq_file_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
1212
1213
3.89k
        for (size_t idx = 0; idx < delete_field_desc->getSubtypeCount(); idx++) {
1214
2.36k
            auto delete_file_field = delete_field_desc->getSubtype(idx);
1215
1216
2.36k
            if (!delete_file_field->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE))
1217
0
                    [[unlikely]] { // missing delete_file_field id
1218
                // equality delete file must have delete_file_field id to match column.
1219
0
                return Status::DataQualityError(
1220
0
                        "missing delete_file_field id when reading equality delete file");
1221
2.36k
            } else {
1222
2.36k
                auto delete_field_id =
1223
2.36k
                        std::stoi(delete_file_field->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
1224
2.37k
                if (read_column_field_ids_set.contains(delete_field_id)) {
1225
                    // the column that need to read.
1226
2.37k
                    if (is_complex_type(delete_file_col_types[idx]->get_primitive_type()))
1227
0
                            [[unlikely]] {
1228
0
                        return Status::InternalError(
1229
0
                                "can not support read complex column in equality delete file.");
1230
2.37k
                    } else if (!data_file_id_to_field_idx.contains(delete_field_id)) [[unlikely]] {
1231
0
                        return Status::DataQualityError(
1232
0
                                "can not find delete field id in data file schema when reading "
1233
0
                                "equality delete file");
1234
0
                    }
1235
1236
2.37k
                    auto data_file_field = _data_file_type_desc->getSubtype(
1237
2.37k
                            data_file_id_to_field_idx[delete_field_id]);
1238
1239
2.37k
                    if (delete_file_field->getKind() != data_file_field->getKind()) [[unlikely]] {
1240
0
                        return Status::NotSupported(
1241
0
                                "Not Support type change in equality delete, field: {}, delete "
1242
0
                                "file type: {}, data file type: {}",
1243
0
                                delete_field_id, delete_file_field->getKind(),
1244
0
                                data_file_field->getKind());
1245
0
                    }
1246
2.37k
                    std::string filed_lower_name = to_lower(delete_field_desc->getFieldName(idx));
1247
2.37k
                    eq_file_node->add_children(
1248
2.37k
                            filed_lower_name, delete_field_desc->getFieldName(idx),
1249
2.37k
                            std::make_shared<TableSchemaChangeHelper::ScalarNode>());
1250
1251
2.37k
                    delete_col_ids.emplace_back(delete_field_id);
1252
2.37k
                    delete_col_names.emplace_back(filed_lower_name);
1253
2.37k
                    delete_col_types.emplace_back(make_nullable(delete_file_col_types[idx]));
1254
2.37k
                    read_column_field_ids_set.erase(delete_field_id);
1255
2.37k
                }
1256
2.36k
            }
1257
2.36k
        }
1258
1.52k
        if (!read_column_field_ids_set.empty()) [[unlikely]] {
1259
0
            return Status::DataQualityError("some field ids not found in equality delete file.");
1260
0
        }
1261
1262
3.89k
        for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
1263
2.36k
            delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
1264
2.36k
        }
1265
1266
1.52k
        RETURN_IF_ERROR(delete_reader->init_reader(&delete_col_names, &delete_col_name_to_block_idx,
1267
1.52k
                                                   {}, false, nullptr, nullptr, nullptr, nullptr,
1268
1.52k
                                                   eq_file_node));
1269
1.52k
        RETURN_IF_ERROR(delete_reader->set_fill_columns(partition_columns, missing_columns));
1270
1271
1.52k
        if (!_equality_delete_block_map.contains(delete_col_ids)) {
1272
852
            _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size());
1273
852
            Block block;
1274
852
            _generate_equality_delete_block(&block, delete_col_names, delete_col_types);
1275
852
            _equality_delete_blocks.emplace_back(block);
1276
852
        }
1277
1.52k
        Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]];
1278
1.52k
        bool eof = false;
1279
4.57k
        while (!eof) {
1280
3.04k
            Block tmp_block;
1281
3.04k
            _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types);
1282
3.04k
            size_t read_rows = 0;
1283
3.04k
            RETURN_IF_ERROR(delete_reader->get_next_block(&tmp_block, &read_rows, &eof));
1284
3.04k
            if (read_rows > 0) {
1285
1.52k
                MutableBlock mutable_block(&eq_file_block);
1286
1.52k
                RETURN_IF_ERROR(mutable_block.merge(tmp_block));
1287
1.52k
            }
1288
3.04k
        }
1289
1.52k
    }
1290
1291
852
    for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) {
1292
852
        auto& eq_file_block = _equality_delete_blocks[block_idx];
1293
852
        auto equality_delete_impl =
1294
852
                EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids);
1295
852
        RETURN_IF_ERROR(equality_delete_impl->init(_profile));
1296
852
        _equality_delete_impls.emplace_back(std::move(equality_delete_impl));
1297
852
    }
1298
670
    return Status::OK();
1299
670
}
1300
#include "common/compile_check_end.h"
1301
} // namespace doris