Coverage Report

Created: 2026-04-14 12:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/iceberg_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/table/iceberg_reader.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <gen_cpp/parquet_types.h>
24
#include <glog/logging.h>
25
#include <parallel_hashmap/phmap.h>
26
#include <rapidjson/document.h>
27
28
#include <algorithm>
29
#include <cstring>
30
#include <functional>
31
#include <memory>
32
#include <set>
33
34
#include "common/compiler_util.h" // IWYU pragma: keep
35
#include "common/status.h"
36
#include "core/assert_cast.h"
37
#include "core/block/block.h"
38
#include "core/block/column_with_type_and_name.h"
39
#include "core/column/column.h"
40
#include "core/data_type/data_type_factory.hpp"
41
#include "exprs/aggregate/aggregate_function.h"
42
#include "format/format_common.h"
43
#include "format/generic_reader.h"
44
#include "format/orc/vorc_reader.h"
45
#include "format/parquet/schema_desc.h"
46
#include "format/parquet/vparquet_column_chunk_reader.h"
47
#include "format/table/deletion_vector_reader.h"
48
#include "format/table/iceberg/iceberg_orc_nested_column_utils.h"
49
#include "format/table/iceberg/iceberg_parquet_nested_column_utils.h"
50
#include "format/table/iceberg_delete_file_reader_helper.h"
51
#include "format/table/nested_column_access_helper.h"
52
#include "format/table/table_format_reader.h"
53
#include "runtime/runtime_state.h"
54
#include "util/coding.h"
55
56
namespace cctz {
57
class time_zone;
58
} // namespace cctz
59
namespace doris {
60
class RowDescriptor;
61
class SlotDescriptor;
62
class TupleDescriptor;
63
64
namespace io {
65
struct IOContext;
66
} // namespace io
67
class VExprContext;
68
} // namespace doris
69
70
namespace doris {
71
namespace {
72
73
class GroupedDeleteRowsVisitor final : public IcebergPositionDeleteVisitor {
74
public:
75
    using DeleteRows = std::vector<int64_t>;
76
    using DeleteFile = phmap::parallel_flat_hash_map<
77
            std::string, std::unique_ptr<DeleteRows>, std::hash<std::string>, std::equal_to<>,
78
            std::allocator<std::pair<const std::string, std::unique_ptr<DeleteRows>>>, 8,
79
            std::mutex>;
80
81
    explicit GroupedDeleteRowsVisitor(DeleteFile* position_delete)
82
1.75k
            : _position_delete(position_delete) {}
83
84
479k
    Status visit(const std::string& file_path, int64_t pos) override {
85
479k
        if (_position_delete == nullptr) {
86
0
            return Status::InvalidArgument("position delete map is null");
87
0
        }
88
89
479k
        auto iter = _position_delete->find(file_path);
90
479k
        DeleteRows* delete_rows = nullptr;
91
479k
        if (iter == _position_delete->end()) {
92
6.65k
            delete_rows = new DeleteRows;
93
6.65k
            (*_position_delete)[file_path] = std::unique_ptr<DeleteRows>(delete_rows);
94
473k
        } else {
95
473k
            delete_rows = iter->second.get();
96
473k
        }
97
479k
        delete_rows->push_back(pos);
98
479k
        return Status::OK();
99
479k
    }
100
101
private:
102
    DeleteFile* _position_delete;
103
};
104
105
} // namespace
106
107
const std::string IcebergOrcReader::ICEBERG_ORC_ATTRIBUTE = "iceberg.id";
108
109
bool IcebergTableReader::_is_fully_dictionary_encoded(
110
796
        const tparquet::ColumnMetaData& column_metadata) {
111
802
    const auto is_dictionary_encoding = [](tparquet::Encoding::type encoding) {
112
802
        return encoding == tparquet::Encoding::PLAIN_DICTIONARY ||
113
802
               encoding == tparquet::Encoding::RLE_DICTIONARY;
114
802
    };
115
1.21k
    const auto is_data_page = [](tparquet::PageType::type page_type) {
116
1.21k
        return page_type == tparquet::PageType::DATA_PAGE ||
117
1.21k
               page_type == tparquet::PageType::DATA_PAGE_V2;
118
1.21k
    };
119
796
    const auto is_level_encoding = [](tparquet::Encoding::type encoding) {
120
2
        return encoding == tparquet::Encoding::RLE || encoding == tparquet::Encoding::BIT_PACKED;
121
2
    };
122
123
    // A column chunk may have a dictionary page but still contain plain-encoded data pages.
124
    // Only treat it as dictionary-coded when all data pages are dictionary encoded.
125
796
    if (column_metadata.__isset.encoding_stats) {
126
795
        bool has_data_page_stats = false;
127
1.21k
        for (const tparquet::PageEncodingStats& enc_stat : column_metadata.encoding_stats) {
128
1.21k
            if (is_data_page(enc_stat.page_type) && enc_stat.count > 0) {
129
796
                has_data_page_stats = true;
130
796
                if (!is_dictionary_encoding(enc_stat.encoding)) {
131
372
                    return false;
132
372
                }
133
796
            }
134
1.21k
        }
135
423
        if (has_data_page_stats) {
136
422
            return true;
137
422
        }
138
423
    }
139
140
2
    bool has_dict_encoding = false;
141
2
    bool has_nondict_encoding = false;
142
3
    for (const tparquet::Encoding::type& encoding : column_metadata.encodings) {
143
3
        if (is_dictionary_encoding(encoding)) {
144
1
            has_dict_encoding = true;
145
1
        }
146
147
3
        if (!is_dictionary_encoding(encoding) && !is_level_encoding(encoding)) {
148
2
            has_nondict_encoding = true;
149
2
            break;
150
2
        }
151
3
    }
152
2
    if (!has_dict_encoding || has_nondict_encoding) {
153
2
        return false;
154
2
    }
155
156
0
    return true;
157
2
}
158
159
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader,
160
                                       RuntimeProfile* profile, RuntimeState* state,
161
                                       const TFileScanRangeParams& params,
162
                                       const TFileRangeDesc& range, ShardedKVCache* kv_cache,
163
                                       io::IOContext* io_ctx, FileMetaCache* meta_cache)
164
24.5k
        : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx,
165
24.5k
                            meta_cache),
166
24.5k
          _kv_cache(kv_cache) {
167
24.5k
    static const char* iceberg_profile = "IcebergProfile";
168
24.5k
    ADD_TIMER(_profile, iceberg_profile);
169
24.5k
    _iceberg_profile.num_delete_files =
170
24.5k
            ADD_CHILD_COUNTER(_profile, "NumDeleteFiles", TUnit::UNIT, iceberg_profile);
171
24.5k
    _iceberg_profile.num_delete_rows =
172
24.5k
            ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, iceberg_profile);
173
24.5k
    _iceberg_profile.delete_files_read_time =
174
24.5k
            ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
175
24.5k
    _iceberg_profile.delete_rows_sort_time =
176
24.5k
            ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
177
24.5k
    _iceberg_profile.parse_delete_file_time =
178
24.5k
            ADD_CHILD_TIMER(_profile, "ParseDeleteFileTime", iceberg_profile);
179
24.5k
}
180
181
31.8k
Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
182
31.8k
    RETURN_IF_ERROR(_expand_block_if_need(block));
183
184
31.8k
    RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof));
185
186
31.8k
    if (_equality_delete_impls.size() > 0) {
187
1.96k
        std::unique_ptr<IColumn::Filter> filter =
188
1.96k
                std::make_unique<IColumn::Filter>(block->rows(), 1);
189
2.48k
        for (auto& equality_delete_impl : _equality_delete_impls) {
190
2.48k
            RETURN_IF_ERROR(equality_delete_impl->filter_data_block(
191
2.48k
                    block, _col_name_to_block_idx, _id_to_block_column_name, *filter));
192
2.48k
        }
193
1.96k
        Block::filter_block_internal(block, *filter, block->columns());
194
1.96k
    }
195
196
31.8k
    *read_rows = block->rows();
197
31.8k
    return _shrink_block_if_need(block);
198
31.8k
}
199
200
24.4k
Status IcebergTableReader::init_row_filters() {
201
    // We get the count value by doris's be, so we don't need to read the delete file
202
24.4k
    if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count > 0) {
203
114
        return Status::OK();
204
114
    }
205
206
24.3k
    const auto& table_desc = _range.table_format_params.iceberg_params;
207
24.3k
    const auto& version = table_desc.format_version;
208
24.3k
    if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
209
1.68k
        return Status::OK();
210
1.68k
    }
211
212
22.7k
    auto* parquet_reader = dynamic_cast<ParquetReader*>(_file_format_reader.get());
213
22.7k
    auto* orc_reader = dynamic_cast<OrcReader*>(_file_format_reader.get());
214
215
    // Initialize file information for $row_id generation
216
    // Extract from table_desc which contains current file's metadata
217
22.7k
    if (_need_row_id_column) {
218
156
        std::string file_path = table_desc.original_file_path;
219
156
        int32_t partition_spec_id = 0;
220
156
        std::string partition_data_json;
221
156
        if (table_desc.__isset.partition_spec_id) {
222
52
            partition_spec_id = table_desc.partition_spec_id;
223
52
        }
224
156
        if (table_desc.__isset.partition_data_json) {
225
52
            partition_data_json = table_desc.partition_data_json;
226
52
        }
227
228
156
        if (parquet_reader != nullptr) {
229
78
            parquet_reader->set_iceberg_rowid_params(file_path, partition_spec_id,
230
78
                                                     partition_data_json, _row_id_column_position);
231
78
        } else if (orc_reader != nullptr) {
232
78
            orc_reader->set_iceberg_rowid_params(file_path, partition_spec_id, partition_data_json,
233
78
                                                 _row_id_column_position);
234
78
        }
235
156
        LOG(INFO) << "Initialized $row_id generation for file: " << file_path
236
156
                  << ", partition_spec_id: " << partition_spec_id;
237
156
    }
238
239
22.7k
    std::vector<TIcebergDeleteFileDesc> position_delete_files;
240
22.7k
    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
241
22.7k
    std::vector<TIcebergDeleteFileDesc> deletion_vector_files;
242
22.7k
    for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
243
10.8k
        if (desc.content == POSITION_DELETE) {
244
5.36k
            position_delete_files.emplace_back(desc);
245
5.48k
        } else if (desc.content == EQUALITY_DELETE) {
246
3.04k
            equality_delete_files.emplace_back(desc);
247
3.04k
        } else if (desc.content == DELETION_VECTOR) {
248
2.44k
            deletion_vector_files.emplace_back(desc);
249
2.44k
        }
250
10.8k
    }
251
252
22.7k
    if (!equality_delete_files.empty()) {
253
1.34k
        RETURN_IF_ERROR(_process_equality_delete(equality_delete_files));
254
1.34k
        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
255
1.34k
    }
256
257
22.7k
    if (!deletion_vector_files.empty()) {
258
2.44k
        if (deletion_vector_files.size() != 1) [[unlikely]] {
259
            /*
260
             * Deletion vectors are a binary representation of deletes for a single data file that is more efficient
261
             * at execution time than position delete files. Unlike equality or position delete files, there can be
262
             * at most one deletion vector for a given data file in a snapshot.
263
             */
264
0
            return Status::DataQualityError("This iceberg data file has multiple DVs.");
265
0
        }
266
2.44k
        RETURN_IF_ERROR(
267
2.44k
                read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));
268
269
2.44k
        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
270
        // Readers can safely ignore position delete files if there is a DV for a data file.
271
20.2k
    } else if (!position_delete_files.empty()) {
272
1.83k
        RETURN_IF_ERROR(
273
1.83k
                _position_delete_base(table_desc.original_file_path, position_delete_files));
274
1.83k
        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
275
1.83k
    }
276
277
22.7k
    COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
278
22.7k
    return Status::OK();
279
22.7k
}
280
281
void IcebergTableReader::_generate_equality_delete_block(
282
        Block* block, const std::vector<std::string>& equality_delete_col_names,
283
6.27k
        const std::vector<DataTypePtr>& equality_delete_col_types) {
284
16.0k
    for (int i = 0; i < equality_delete_col_names.size(); ++i) {
285
9.76k
        DataTypePtr data_type = make_nullable(equality_delete_col_types[i]);
286
9.76k
        MutableColumnPtr data_column = data_type->create_column();
287
9.76k
        block->insert(ColumnWithTypeAndName(std::move(data_column), data_type,
288
9.76k
                                            equality_delete_col_names[i]));
289
9.76k
    }
290
6.27k
}
291
292
31.7k
Status IcebergTableReader::_expand_block_if_need(Block* block) {
293
31.7k
    std::set<std::string> names;
294
31.7k
    auto block_names = block->get_names();
295
31.7k
    names.insert(block_names.begin(), block_names.end());
296
31.7k
    for (auto& col : _expand_columns) {
297
946
        col.column->assume_mutable()->clear();
298
946
        if (names.contains(col.name)) {
299
0
            return Status::InternalError("Wrong expand column '{}'", col.name);
300
0
        }
301
946
        names.insert(col.name);
302
946
        (*_col_name_to_block_idx)[col.name] = static_cast<uint32_t>(block->columns());
303
946
        block->insert(col);
304
946
    }
305
31.7k
    return Status::OK();
306
31.7k
}
307
308
31.8k
Status IcebergTableReader::_shrink_block_if_need(Block* block) {
309
31.8k
    std::set<size_t> positions_to_erase;
310
31.8k
    for (const std::string& expand_col : _expand_col_names) {
311
946
        if (!_col_name_to_block_idx->contains(expand_col)) {
312
0
            return Status::InternalError("Wrong erase column '{}', block: {}", expand_col,
313
0
                                         block->dump_names());
314
0
        }
315
946
        positions_to_erase.emplace((*_col_name_to_block_idx)[expand_col]);
316
946
    }
317
31.8k
    block->erase(positions_to_erase);
318
31.8k
    for (const std::string& expand_col : _expand_col_names) {
319
946
        _col_name_to_block_idx->erase(expand_col);
320
946
    }
321
31.8k
    return Status::OK();
322
31.8k
}
323
324
Status IcebergTableReader::_position_delete_base(
325
1.83k
        const std::string data_file_path, const std::vector<TIcebergDeleteFileDesc>& delete_files) {
326
1.83k
    std::vector<DeleteRows*> delete_rows_array;
327
1.83k
    int64_t num_delete_rows = 0;
328
5.36k
    for (const auto& delete_file : delete_files) {
329
5.36k
        SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
330
5.36k
        Status create_status = Status::OK();
331
5.36k
        auto* delete_file_cache = _kv_cache->get<DeleteFile>(
332
5.36k
                _delet_file_cache_key(delete_file.path), [&]() -> DeleteFile* {
333
1.75k
                    auto* position_delete = new DeleteFile;
334
1.75k
                    create_status = _read_position_delete_file(delete_file, position_delete);
335
336
1.75k
                    if (!create_status) {
337
0
                        return nullptr;
338
0
                    }
339
340
1.75k
                    return position_delete;
341
1.75k
                });
342
5.36k
        if (create_status.is<ErrorCode::END_OF_FILE>()) {
343
0
            continue;
344
5.36k
        } else if (!create_status.ok()) {
345
0
            return create_status;
346
0
        }
347
348
5.36k
        DeleteFile& delete_file_map = *((DeleteFile*)delete_file_cache);
349
5.36k
        auto get_value = [&](const auto& v) {
350
3.50k
            DeleteRows* row_ids = v.second.get();
351
3.50k
            if (!row_ids->empty()) {
352
3.50k
                delete_rows_array.emplace_back(row_ids);
353
3.50k
                num_delete_rows += row_ids->size();
354
3.50k
            }
355
3.50k
        };
356
5.36k
        delete_file_map.if_contains(data_file_path, get_value);
357
5.36k
    }
358
    // Use a KV cache to store the delete rows corresponding to a data file path.
359
    // The Parquet/ORC reader holds a reference (pointer) to this cached entry.
360
    // This allows delete rows to be reused when a single data file is split into
361
    // multiple splits, avoiding excessive memory usage when delete rows are large.
362
1.83k
    if (num_delete_rows > 0) {
363
1.47k
        SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
364
1.47k
        _iceberg_delete_rows =
365
1.47k
                _kv_cache->get<DeleteRows>(data_file_path,
366
1.47k
                                           [&]() -> DeleteRows* {
367
1.47k
                                               auto* data_file_position_delete = new DeleteRows;
368
1.47k
                                               _sort_delete_rows(delete_rows_array, num_delete_rows,
369
1.47k
                                                                 *data_file_position_delete);
370
371
1.47k
                                               return data_file_position_delete;
372
1.47k
                                           }
373
374
1.47k
                );
375
1.47k
        set_delete_rows();
376
1.47k
        COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
377
1.47k
    }
378
1.83k
    return Status::OK();
379
1.83k
}
380
381
Status IcebergTableReader::_read_position_delete_file(const TIcebergDeleteFileDesc& delete_file,
382
1.75k
                                                      DeleteFile* position_delete) {
383
1.75k
    GroupedDeleteRowsVisitor visitor(position_delete);
384
1.75k
    IcebergDeleteFileReaderOptions options;
385
1.75k
    options.state = _state;
386
1.75k
    options.profile = _profile;
387
1.75k
    options.scan_params = &_params;
388
1.75k
    options.io_ctx = _io_ctx;
389
1.75k
    options.meta_cache = _meta_cache;
390
1.75k
    options.fs_name = &_range.fs_name;
391
1.75k
    options.batch_size = READ_DELETE_FILE_BATCH_SIZE;
392
1.75k
    return read_iceberg_position_delete_file(delete_file, options, &visitor);
393
1.75k
}
394
395
/**
396
 * https://iceberg.apache.org/spec/#position-delete-files
397
 * The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning.
398
 * Sorting by file_path allows filter pushdown by file in columnar storage formats.
399
 * Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory.
400
 */
401
void IcebergTableReader::_sort_delete_rows(
402
        const std::vector<std::vector<int64_t>*>& delete_rows_array, int64_t num_delete_rows,
403
1.47k
        std::vector<int64_t>& result) {
404
1.47k
    if (delete_rows_array.empty()) {
405
0
        return;
406
0
    }
407
1.47k
    if (delete_rows_array.size() == 1) {
408
764
        result.resize(num_delete_rows);
409
764
        memcpy(result.data(), delete_rows_array.front()->data(), sizeof(int64_t) * num_delete_rows);
410
764
        return;
411
764
    }
412
712
    if (delete_rows_array.size() == 2) {
413
56
        result.resize(num_delete_rows);
414
56
        std::merge(delete_rows_array.front()->begin(), delete_rows_array.front()->end(),
415
56
                   delete_rows_array.back()->begin(), delete_rows_array.back()->end(),
416
56
                   result.begin());
417
56
        return;
418
56
    }
419
420
656
    using vec_pair = std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::iterator>;
421
656
    result.resize(num_delete_rows);
422
656
    auto row_id_iter = result.begin();
423
656
    auto iter_end = result.end();
424
656
    std::vector<vec_pair> rows_array;
425
2.62k
    for (auto* rows : delete_rows_array) {
426
2.62k
        if (!rows->empty()) {
427
2.62k
            rows_array.emplace_back(rows->begin(), rows->end());
428
2.62k
        }
429
2.62k
    }
430
656
    size_t array_size = rows_array.size();
431
320k
    while (row_id_iter != iter_end) {
432
320k
        int64_t min_index = 0;
433
320k
        int64_t min = *rows_array[0].first;
434
1.59M
        for (size_t i = 0; i < array_size; ++i) {
435
1.27M
            if (*rows_array[i].first < min) {
436
315k
                min_index = i;
437
315k
                min = *rows_array[i].first;
438
315k
            }
439
1.27M
        }
440
320k
        *row_id_iter++ = min;
441
320k
        rows_array[min_index].first++;
442
320k
        if (UNLIKELY(rows_array[min_index].first == rows_array[min_index].second)) {
443
2.62k
            rows_array.erase(rows_array.begin() + min_index);
444
2.62k
            array_size--;
445
2.62k
        }
446
320k
    }
447
656
}
448
449
Status IcebergParquetReader::init_reader(
450
        const std::vector<std::string>& file_col_names,
451
        std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
452
        const VExprContextSPtrs& conjuncts,
453
        phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
454
                slot_id_to_predicates,
455
        const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
456
        const std::unordered_map<std::string, int>* colname_to_slot_id,
457
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
458
18.0k
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
459
18.0k
    _file_format = Fileformat::PARQUET;
460
18.0k
    _col_name_to_block_idx = col_name_to_block_idx;
461
18.0k
    auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
462
18.0k
    RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&_data_file_field_desc));
463
18.0k
    DCHECK(_data_file_field_desc != nullptr);
464
18.0k
    if (_row_lineage_columns != nullptr) {
465
236
        const auto& table_desc = _range.table_format_params.iceberg_params;
466
236
        _row_lineage_columns->first_row_id =
467
236
                table_desc.__isset.first_row_id ? table_desc.first_row_id : -1;
468
236
        _row_lineage_columns->last_updated_sequence_number =
469
236
                table_desc.__isset.last_updated_sequence_number
470
236
                        ? table_desc.last_updated_sequence_number
471
236
                        : -1;
472
236
        parquet_reader->set_row_lineage_columns(_row_lineage_columns);
473
236
    }
474
475
18.0k
    auto column_id_result = _create_column_ids(_data_file_field_desc, tuple_descriptor);
476
18.0k
    auto& column_ids = column_id_result.column_ids;
477
18.0k
    const auto& filter_column_ids = column_id_result.filter_column_ids;
478
479
18.0k
    RETURN_IF_ERROR(init_row_filters());
480
18.0k
    _all_required_col_names = file_col_names;
481
482
18.0k
    if (!_params.__isset.history_schema_info || _params.history_schema_info.empty()) [[unlikely]] {
483
1
        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(
484
1
                tuple_descriptor, *_data_file_field_desc, table_info_node_ptr));
485
18.0k
    } else {
486
18.0k
        std::set<std::string> read_col_name_set(file_col_names.begin(), file_col_names.end());
487
488
18.0k
        bool exist_field_id = true;
489
141k
        for (int idx = 0; idx < _data_file_field_desc->size(); idx++) {
490
123k
            if (_data_file_field_desc->get_column(idx)->field_id == -1) {
491
                // the data file may be from hive table migrated to iceberg, field id is missing
492
2
                exist_field_id = false;
493
2
                break;
494
2
            }
495
123k
        }
496
18.0k
        const auto& table_schema = _params.history_schema_info.front().root_field;
497
498
18.0k
        table_info_node_ptr = std::make_shared<TableSchemaChangeHelper::StructNode>();
499
18.0k
        if (exist_field_id) {
500
            // id -> table column name. columns that need read data file.
501
18.0k
            std::unordered_map<int, std::shared_ptr<schema::external::TField>> id_to_table_field;
502
56.8k
            for (const auto& table_field : table_schema.fields) {
503
56.8k
                auto field = table_field.field_ptr;
504
56.8k
                DCHECK(field->__isset.name);
505
56.8k
                if (!read_col_name_set.contains(field->name)) {
506
2.49k
                    continue;
507
2.49k
                }
508
54.3k
                id_to_table_field.emplace(field->id, field);
509
54.3k
            }
510
511
141k
            for (int idx = 0; idx < _data_file_field_desc->size(); idx++) {
512
123k
                const auto& data_file_field = _data_file_field_desc->get_column(idx);
513
123k
                auto data_file_column_id = _data_file_field_desc->get_column(idx)->field_id;
514
515
123k
                if (id_to_table_field.contains(data_file_column_id)) {
516
51.1k
                    const auto& table_field = id_to_table_field[data_file_column_id];
517
518
51.1k
                    std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
519
51.1k
                    RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id(
520
51.1k
                            *table_field, *data_file_field, exist_field_id, field_node));
521
51.1k
                    table_info_node_ptr->add_children(table_field->name, data_file_field->name,
522
51.1k
                                                      field_node);
523
524
51.1k
                    _id_to_block_column_name.emplace(data_file_column_id, table_field->name);
525
51.1k
                    id_to_table_field.erase(data_file_column_id);
526
72.3k
                } else if (_equality_delete_col_ids.contains(data_file_column_id)) {
527
                    // Columns that need to be read for equality delete.
528
318
                    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";
529
530
                    // Construct table column names that avoid duplication with current table schema.
531
                    // As the columns currently being read may have been deleted in the latest
532
                    // table structure or have undergone a series of schema changes...
533
318
                    std::string table_column_name = EQ_DELETE_PRE + data_file_field->name;
534
318
                    table_info_node_ptr->add_children(
535
318
                            table_column_name, data_file_field->name,
536
318
                            std::make_shared<TableSchemaChangeHelper::ConstNode>());
537
538
318
                    _id_to_block_column_name.emplace(data_file_column_id, table_column_name);
539
318
                    _expand_col_names.emplace_back(table_column_name);
540
318
                    auto expand_data_type = make_nullable(data_file_field->data_type);
541
318
                    _expand_columns.emplace_back(
542
318
                            ColumnWithTypeAndName {expand_data_type->create_column(),
543
318
                                                   expand_data_type, table_column_name});
544
545
318
                    _all_required_col_names.emplace_back(table_column_name);
546
318
                    column_ids.insert(data_file_field->get_column_id());
547
318
                }
548
123k
            }
549
18.0k
            for (const auto& [id, table_field] : id_to_table_field) {
550
3.25k
                table_info_node_ptr->add_not_exist_children(table_field->name);
551
3.25k
            }
552
18.0k
        } else {
553
26
            if (!_equality_delete_col_ids.empty()) [[unlikely]] {
554
0
                return Status::InternalError(
555
0
                        "Can not read missing field id data file when have equality delete");
556
0
            }
557
26
            std::map<std::string, size_t> file_column_idx_map;
558
34
            for (size_t idx = 0; idx < _data_file_field_desc->size(); idx++) {
559
8
                file_column_idx_map.emplace(_data_file_field_desc->get_column(idx)->name, idx);
560
8
            }
561
562
26
            for (const auto& table_field : table_schema.fields) {
563
2
                DCHECK(table_field.__isset.field_ptr);
564
2
                DCHECK(table_field.field_ptr->__isset.name);
565
2
                const auto& table_column_name = table_field.field_ptr->name;
566
2
                if (!read_col_name_set.contains(table_column_name)) {
567
0
                    continue;
568
0
                }
569
2
                if (!table_field.field_ptr->__isset.name_mapping ||
570
2
                    table_field.field_ptr->name_mapping.size() == 0) {
571
2
                    return Status::DataQualityError(
572
2
                            "name_mapping must be set when read missing field id data file.");
573
2
                }
574
0
                bool have_mapping = false;
575
0
                for (const auto& mapped_name : table_field.field_ptr->name_mapping) {
576
0
                    if (file_column_idx_map.contains(mapped_name)) {
577
0
                        std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
578
0
                        const auto& file_field = _data_file_field_desc->get_column(
579
0
                                file_column_idx_map.at(mapped_name));
580
0
                        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id(
581
0
                                *table_field.field_ptr, *file_field, exist_field_id, field_node));
582
0
                        table_info_node_ptr->add_children(table_column_name, file_field->name,
583
0
                                                          field_node);
584
0
                        have_mapping = true;
585
0
                        break;
586
0
                    }
587
0
                }
588
0
                if (!have_mapping) {
589
0
                    table_info_node_ptr->add_not_exist_children(table_column_name);
590
0
                }
591
0
            }
592
26
        }
593
18.0k
    }
594
595
18.0k
    return parquet_reader->init_reader(
596
18.0k
            _all_required_col_names, _col_name_to_block_idx, conjuncts, slot_id_to_predicates,
597
18.0k
            tuple_descriptor, row_descriptor, colname_to_slot_id, not_single_slot_filter_conjuncts,
598
18.0k
            slot_id_to_filter_conjuncts, table_info_node_ptr, true, column_ids, filter_column_ids);
599
18.0k
}
600
601
// Collect the parquet physical column ids (and the subset used by predicates)
// for every slot in the tuple descriptor, matching slots to file columns by
// iceberg field id. Slots absent from the file (partition / newly added
// columns) are simply skipped.
ColumnIdResult IcebergParquetReader::_create_column_ids(const FieldDescriptor* field_desc,
                                                        const TupleDescriptor* tuple_descriptor) {
    // Assign column ids on the descriptor up front so that the
    // get_column_id()/get_max_column_id() accessors used below are valid.
    // NOTE(review): the descriptor is logically mutated despite the const
    // parameter; changing the signature would break callers, so keep the cast.
    const_cast<FieldDescriptor*>(field_desc)->assign_ids();

    // Lookup table: top-level iceberg field id -> parquet schema node.
    std::unordered_map<int, const FieldSchema*> id_to_schema;
    for (int col = 0; col < field_desc->size(); ++col) {
        const auto* schema_node = field_desc->get_column(col);
        if (schema_node != nullptr) {
            id_to_schema[schema_node->field_id] = schema_node;
        }
    }

    std::set<uint64_t> column_ids;
    std::set<uint64_t> filter_column_ids;

    // Expand the access paths under one top-level parquet field into the set
    // of physical column ids they touch.
    auto collect_ids = [](const FieldSchema* parquet_field,
                          const std::vector<TColumnAccessPath>& access_paths,
                          std::set<uint64_t>& out_ids) {
        process_nested_access_paths(
                parquet_field, access_paths, out_ids,
                [](const FieldSchema* field) { return field->get_column_id(); },
                [](const FieldSchema* field) { return field->get_max_column_id(); },
                IcebergParquetNestedColumnUtils::extract_nested_column_ids);
    };

    for (const auto* slot : tuple_descriptor->slots()) {
        auto found = id_to_schema.find(slot->col_unique_id());
        if (found == id_to_schema.end()) {
            // Column not present in the data file (e.g. partition column,
            // column added after this file was written).
            continue;
        }
        const auto* schema_node = found->second;

        const bool is_nested = slot->col_type() == TYPE_STRUCT ||
                               slot->col_type() == TYPE_ARRAY || slot->col_type() == TYPE_MAP;
        if (!is_nested) {
            // Primitive column: a single physical column id suffices.
            column_ids.insert(schema_node->column_id);
            if (slot->is_predicate()) {
                filter_column_ids.insert(schema_node->column_id);
            }
        } else {
            // Nested column: walk every requested access path, and separately
            // the (possibly smaller) predicate access paths.
            collect_ids(schema_node, slot->all_access_paths(), column_ids);

            const auto& predicate_paths = slot->predicate_access_paths();
            if (!predicate_paths.empty()) {
                collect_ids(schema_node, predicate_paths, filter_column_ids);
            }
        }
    }

    return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids));
}
662
663
// Initialize the underlying ORC reader for an Iceberg data file.
//
// Responsibilities:
//   * fetch the ORC file type and parsed schema;
//   * propagate row-lineage metadata (first_row_id / last_updated_sequence_number)
//     from the scan range params when row lineage is requested;
//   * compute the physical column-id sets to read/filter;
//   * build `table_info_node_ptr`, the table-schema -> file-schema mapping:
//       - if every top-level ORC subtype carries the iceberg field-id attribute,
//         match columns by field id (also wiring up equality-delete columns);
//       - otherwise fall back to the name_mapping from history_schema_info
//         (rejected if equality deletes are present, since those need ids);
//   * finally delegate to OrcReader::init_reader.
//
// `colname_to_slot_id` is accepted for interface parity with the parquet
// reader but is not consumed here.
//
// Fix vs. previous revision: the subtype-iteration loops used `int` indices
// compared against `getSubtypeCount()` (unsigned 64-bit in the ORC API),
// a signed/unsigned mismatch; all loops now use `uint64_t` consistently.
Status IcebergOrcReader::init_reader(
        const std::vector<std::string>& file_col_names,
        std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
        const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
        const RowDescriptor* row_descriptor,
        const std::unordered_map<std::string, int>* colname_to_slot_id,
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
    _file_format = Fileformat::ORC;
    _col_name_to_block_idx = col_name_to_block_idx;
    auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
    RETURN_IF_ERROR(orc_reader->get_file_type(&_data_file_type_desc));
    std::vector<std::string> data_file_col_names;
    std::vector<DataTypePtr> data_file_col_types;
    RETURN_IF_ERROR(orc_reader->get_parsed_schema(&data_file_col_names, &data_file_col_types));
    if (_row_lineage_columns != nullptr) {
        // Row lineage requested: forward the lineage fields FE attached to the
        // scan range (-1 when unset) down to the ORC reader.
        const auto& table_desc = _range.table_format_params.iceberg_params;
        _row_lineage_columns->first_row_id =
                table_desc.__isset.first_row_id ? table_desc.first_row_id : -1;
        _row_lineage_columns->last_updated_sequence_number =
                table_desc.__isset.last_updated_sequence_number
                        ? table_desc.last_updated_sequence_number
                        : -1;
        orc_reader->set_row_lineage_columns(_row_lineage_columns);
    }

    auto column_id_result = _create_column_ids(_data_file_type_desc, tuple_descriptor);
    // non-const: equality-delete columns may be appended below.
    auto& column_ids = column_id_result.column_ids;
    const auto& filter_column_ids = column_id_result.filter_column_ids;

    RETURN_IF_ERROR(init_row_filters());

    _all_required_col_names = file_col_names;
    if (!_params.__isset.history_schema_info || _params.history_schema_info.empty()) [[unlikely]] {
        // No historical schema available: best-effort matching by column name.
        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(tuple_descriptor, _data_file_type_desc,
                                                        table_info_node_ptr));
    } else {
        std::set<std::string> read_col_name_set(file_col_names.begin(), file_col_names.end());

        // A file is only id-matchable if *every* top-level subtype carries the
        // iceberg field-id attribute.
        bool exist_field_id = true;
        for (uint64_t idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) {
            if (!_data_file_type_desc->getSubtype(idx)->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
                exist_field_id = false;
                break;
            }
        }

        const auto& table_schema = _params.history_schema_info.front().root_field;
        table_info_node_ptr = std::make_shared<TableSchemaChangeHelper::StructNode>();
        if (exist_field_id) {
            // id -> table column name. columns that need read data file.
            std::unordered_map<int, std::shared_ptr<schema::external::TField>> id_to_table_field;
            for (const auto& table_field : table_schema.fields) {
                auto field = table_field.field_ptr;
                DCHECK(field->__isset.name);
                if (!read_col_name_set.contains(field->name)) {
                    continue;
                }

                id_to_table_field.emplace(field->id, field);
            }

            for (uint64_t idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) {
                const auto& data_file_field = _data_file_type_desc->getSubtype(idx);
                auto data_file_column_id =
                        std::stoi(data_file_field->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
                auto const& file_column_name = _data_file_type_desc->getFieldName(idx);

                if (id_to_table_field.contains(data_file_column_id)) {
                    // Regular column requested by the query: map table name ->
                    // file name and recurse into nested fields by id.
                    const auto& table_field = id_to_table_field[data_file_column_id];

                    std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
                    RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id(
                            *table_field, data_file_field, ICEBERG_ORC_ATTRIBUTE, exist_field_id,
                            field_node));
                    table_info_node_ptr->add_children(table_field->name, file_column_name,
                                                      field_node);

                    _id_to_block_column_name.emplace(data_file_column_id, table_field->name);
                    // Consumed: whatever remains at the end does not exist in
                    // this file.
                    id_to_table_field.erase(data_file_column_id);
                } else if (_equality_delete_col_ids.contains(data_file_column_id)) {
                    // Columns that need to be read for equality delete.
                    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";

                    // Construct table column names that avoid duplication with current table schema.
                    // As the columns currently being read may have been deleted in the latest
                    // table structure or have undergone a series of schema changes...
                    std::string table_column_name = EQ_DELETE_PRE + file_column_name;
                    table_info_node_ptr->add_children(
                            table_column_name, file_column_name,
                            std::make_shared<TableSchemaChangeHelper::ConstNode>());

                    _id_to_block_column_name.emplace(data_file_column_id, table_column_name);
                    _expand_col_names.emplace_back(table_column_name);

                    auto expand_data_type = make_nullable(data_file_col_types[idx]);
                    _expand_columns.emplace_back(
                            ColumnWithTypeAndName {expand_data_type->create_column(),
                                                   expand_data_type, table_column_name});

                    _all_required_col_names.emplace_back(table_column_name);
                    column_ids.insert(data_file_field->getColumnId());
                }
            }
            // Requested table columns with no counterpart in the file.
            for (const auto& [id, table_field] : id_to_table_field) {
                table_info_node_ptr->add_not_exist_children(table_field->name);
            }
        } else {
            // No field ids in the file: fall back to Iceberg name mapping.
            if (!_equality_delete_col_ids.empty()) [[unlikely]] {
                return Status::InternalError(
                        "Can not read missing field id data file when have equality delete");
            }
            std::map<std::string, size_t> file_column_idx_map;
            for (uint64_t idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) {
                auto const& file_column_name = _data_file_type_desc->getFieldName(idx);
                file_column_idx_map.emplace(file_column_name, idx);
            }

            for (const auto& table_field : table_schema.fields) {
                DCHECK(table_field.__isset.field_ptr);
                DCHECK(table_field.field_ptr->__isset.name);
                const auto& table_column_name = table_field.field_ptr->name;
                if (!read_col_name_set.contains(table_column_name)) {
                    continue;
                }
                if (!table_field.field_ptr->__isset.name_mapping ||
                    table_field.field_ptr->name_mapping.size() == 0) {
                    return Status::DataQualityError(
                            "name_mapping must be set when read missing field id data file.");
                }
                auto have_mapping = false;
                // First mapped name found in the file wins.
                for (const auto& mapped_name : table_field.field_ptr->name_mapping) {
                    if (file_column_idx_map.contains(mapped_name)) {
                        auto file_column_idx = file_column_idx_map.at(mapped_name);
                        std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
                        const auto& file_field = _data_file_type_desc->getSubtype(file_column_idx);
                        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id(
                                *table_field.field_ptr, file_field, ICEBERG_ORC_ATTRIBUTE,
                                exist_field_id, field_node));
                        table_info_node_ptr->add_children(
                                table_column_name,
                                _data_file_type_desc->getFieldName(file_column_idx), field_node);
                        have_mapping = true;
                        break;
                    }
                }
                if (!have_mapping) {
                    table_info_node_ptr->add_not_exist_children(table_column_name);
                }
            }
        }
    }

    return orc_reader->init_reader(&_all_required_col_names, _col_name_to_block_idx, conjuncts,
                                   false, tuple_descriptor, row_descriptor,
                                   not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts,
                                   table_info_node_ptr, column_ids, filter_column_ids);
}
821
822
// Collect the ORC physical column ids (and the subset referenced by
// predicates) for every slot in the tuple descriptor, matching slots to file
// columns via the iceberg field-id attribute stored on each top-level ORC
// subtype. Slots absent from the file, and subtypes without the id attribute,
// are skipped. ORC counterpart of IcebergParquetReader::_create_column_ids.
ColumnIdResult IcebergOrcReader::_create_column_ids(const orc::Type* orc_type,
                                                    const TupleDescriptor* tuple_descriptor) {
    // map top-level table column iceberg_id -> orc::Type*
    std::unordered_map<int, const orc::Type*> iceberg_id_to_orc_type_map;
    for (uint64_t i = 0; i < orc_type->getSubtypeCount(); ++i) {
        auto orc_sub_type = orc_type->getSubtype(i);
        if (!orc_sub_type) continue;

        // Only subtypes carrying the iceberg field-id attribute can be matched.
        if (!orc_sub_type->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
            continue;
        }
        int iceberg_id = std::stoi(orc_sub_type->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
        iceberg_id_to_orc_type_map[iceberg_id] = orc_sub_type;
    }

    std::set<uint64_t> column_ids;
    std::set<uint64_t> filter_column_ids;

    // helper to process access paths for a given top-level orc field
    auto process_access_paths = [](const orc::Type* orc_field,
                                   const std::vector<TColumnAccessPath>& access_paths,
                                   std::set<uint64_t>& out_ids) {
        process_nested_access_paths(
                orc_field, access_paths, out_ids,
                [](const orc::Type* type) { return type->getColumnId(); },
                [](const orc::Type* type) { return type->getMaximumColumnId(); },
                IcebergOrcNestedColumnUtils::extract_nested_column_ids);
    };

    for (const auto* slot : tuple_descriptor->slots()) {
        auto it = iceberg_id_to_orc_type_map.find(slot->col_unique_id());
        if (it == iceberg_id_to_orc_type_map.end()) {
            // Column not found in file
            continue;
        }
        const orc::Type* orc_field = it->second;

        // primitive (non-nested) types
        if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY &&
             slot->col_type() != TYPE_MAP)) {
            column_ids.insert(orc_field->getColumnId());
            if (slot->is_predicate()) {
                filter_column_ids.insert(orc_field->getColumnId());
            }
            continue;
        }

        // complex types: expand the requested access paths, and separately the
        // (possibly smaller) predicate access paths, into physical column ids.
        const auto& all_access_paths = slot->all_access_paths();
        process_access_paths(orc_field, all_access_paths, column_ids);

        const auto& predicate_access_paths = slot->predicate_access_paths();
        if (!predicate_access_paths.empty()) {
            process_access_paths(orc_field, predicate_access_paths, filter_column_ids);
        }
    }

    return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids));
}
881
882
// Directly read the deletion vector using the `content_offset` and
883
// `content_size_in_bytes` provided by FE in `delete_file_desc`.
884
// These two fields indicate the location of a blob in storage.
885
// Since the current format is `deletion-vector-v1`, which does not
886
// compress any blobs, we can temporarily skip parsing the Puffin footer.
887
Status IcebergTableReader::read_deletion_vector(const std::string& data_file_path,
888
2.44k
                                                const TIcebergDeleteFileDesc& delete_file_desc) {
889
2.44k
    Status create_status = Status::OK();
890
2.44k
    SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
891
2.44k
    _iceberg_delete_rows = _kv_cache->get<DeleteRows>(data_file_path, [&]() -> DeleteRows* {
892
970
        auto* delete_rows = new DeleteRows;
893
894
970
        TFileRangeDesc delete_range;
895
        // must use __set() method to make sure __isset is true
896
970
        delete_range.__set_fs_name(_range.fs_name);
897
970
        delete_range.path = delete_file_desc.path;
898
970
        delete_range.start_offset = delete_file_desc.content_offset;
899
970
        delete_range.size = delete_file_desc.content_size_in_bytes;
900
970
        delete_range.file_size = -1;
901
902
        // We may consider caching the DeletionVectorReader when reading Puffin files,
903
        // where the underlying reader is an `InMemoryFileReader` and a single data file is
904
        // split into multiple splits. However, we need to ensure that the underlying
905
        // reader supports multi-threaded access.
906
970
        DeletionVectorReader dv_reader(_state, _profile, _params, delete_range, _io_ctx);
907
970
        create_status = dv_reader.open();
908
970
        if (!create_status.ok()) [[unlikely]] {
909
0
            return nullptr;
910
0
        }
911
912
970
        size_t buffer_size = delete_range.size;
913
970
        std::vector<char> buf(buffer_size);
914
970
        if (buffer_size < 12) [[unlikely]] {
915
            // Minimum size: 4 bytes length + 4 bytes magic + 4 bytes CRC32
916
0
            create_status = Status::DataQualityError("Deletion vector file size too small: {}",
917
0
                                                     buffer_size);
918
0
            return nullptr;
919
0
        }
920
921
970
        create_status = dv_reader.read_at(delete_range.start_offset, {buf.data(), buffer_size});
922
970
        if (!create_status) [[unlikely]] {
923
0
            return nullptr;
924
0
        }
925
        // The serialized blob contains:
926
        //
927
        // Combined length of the vector and magic bytes stored as 4 bytes, big-endian
928
        // A 4-byte magic sequence, D1 D3 39 64
929
        // The vector, serialized as described below
930
        // A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian
931
932
970
        auto total_length = BigEndian::Load32(buf.data());
933
970
        if (total_length + 8 != buffer_size) [[unlikely]] {
934
0
            create_status = Status::DataQualityError(
935
0
                    "Deletion vector length mismatch, expected: {}, actual: {}", total_length + 8,
936
0
                    buffer_size);
937
0
            return nullptr;
938
0
        }
939
940
970
        constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'};
941
970
        if (memcmp(buf.data() + sizeof(total_length), MAGIC_NUMBER, 4)) [[unlikely]] {
942
0
            create_status = Status::DataQualityError("Deletion vector magic number mismatch");
943
0
            return nullptr;
944
0
        }
945
946
970
        roaring::Roaring64Map bitmap;
947
970
        SCOPED_TIMER(_iceberg_profile.parse_delete_file_time);
948
970
        try {
949
970
            bitmap = roaring::Roaring64Map::readSafe(buf.data() + 8, buffer_size - 12);
950
970
        } catch (const std::runtime_error& e) {
951
0
            create_status = Status::DataQualityError("Decode roaring bitmap failed, {}", e.what());
952
0
            return nullptr;
953
0
        }
954
        // skip CRC-32 checksum
955
956
970
        delete_rows->reserve(bitmap.cardinality());
957
5.10M
        for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
958
5.10M
            delete_rows->push_back(*it);
959
5.10M
        }
960
970
        COUNTER_UPDATE(_iceberg_profile.num_delete_rows, delete_rows->size());
961
970
        return delete_rows;
962
970
    });
963
964
2.44k
    RETURN_IF_ERROR(create_status);
965
2.44k
    if (!_iceberg_delete_rows->empty()) [[likely]] {
966
2.44k
        set_delete_rows();
967
2.44k
    }
968
2.44k
    return Status::OK();
969
2.44k
}
970
971
// Similar to the code structure of IcebergOrcReader::_process_equality_delete,
972
// but considering the significant differences in how parquet/orc obtains
973
// attributes/column IDs, it is not easy to combine them.
974
Status IcebergParquetReader::_process_equality_delete(
975
670
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
976
670
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
977
670
            partition_columns;
978
670
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
979
980
670
    std::map<int, const FieldSchema*> data_file_id_to_field_schema;
981
2.67k
    for (int idx = 0; idx < _data_file_field_desc->size(); ++idx) {
982
2.00k
        auto field_schema = _data_file_field_desc->get_column(idx);
983
2.00k
        if (_data_file_field_desc->get_column(idx)->field_id == -1) {
984
0
            return Status::DataQualityError("Iceberg equality delete data file missing field id.");
985
0
        }
986
2.00k
        data_file_id_to_field_schema[_data_file_field_desc->get_column(idx)->field_id] =
987
2.00k
                field_schema;
988
2.00k
    }
989
990
1.52k
    for (const auto& delete_file : delete_files) {
991
1.52k
        TFileRangeDesc delete_desc;
992
        // must use __set() method to make sure __isset is true
993
1.52k
        delete_desc.__set_fs_name(_range.fs_name);
994
1.52k
        delete_desc.path = delete_file.path;
995
1.52k
        delete_desc.start_offset = 0;
996
1.52k
        delete_desc.size = -1;
997
1.52k
        delete_desc.file_size = -1;
998
999
1.52k
        if (!delete_file.__isset.field_ids) [[unlikely]] {
1000
0
            return Status::InternalError(
1001
0
                    "missing delete field ids when reading equality delete file");
1002
0
        }
1003
1.52k
        auto& read_column_field_ids = delete_file.field_ids;
1004
1.52k
        std::set<int> read_column_field_ids_set;
1005
2.36k
        for (const auto& field_id : read_column_field_ids) {
1006
2.36k
            read_column_field_ids_set.insert(field_id);
1007
2.36k
            _equality_delete_col_ids.insert(field_id);
1008
2.36k
        }
1009
1010
1.52k
        auto delete_reader = ParquetReader::create_unique(
1011
1.52k
                _profile, _params, delete_desc, READ_DELETE_FILE_BATCH_SIZE,
1012
1.52k
                &_state->timezone_obj(), _io_ctx, _state, _meta_cache);
1013
1.52k
        RETURN_IF_ERROR(delete_reader->init_schema_reader());
1014
1015
        // the column that to read equality delete file.
1016
        // (delete file may be have extra columns that don't need to read)
1017
1.52k
        std::vector<std::string> delete_col_names;
1018
1.52k
        std::vector<DataTypePtr> delete_col_types;
1019
1.52k
        std::vector<int> delete_col_ids;
1020
1.52k
        std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx;
1021
1022
1.52k
        const FieldDescriptor* delete_field_desc = nullptr;
1023
1.52k
        RETURN_IF_ERROR(delete_reader->get_file_metadata_schema(&delete_field_desc));
1024
1.52k
        DCHECK(delete_field_desc != nullptr);
1025
1026
1.52k
        auto eq_file_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
1027
2.37k
        for (const auto& delete_file_field : delete_field_desc->get_fields_schema()) {
1028
2.37k
            if (delete_file_field.field_id == -1) [[unlikely]] { // missing delete_file_field id
1029
                // equality delete file must have delete_file_field id to match column.
1030
0
                return Status::DataQualityError(
1031
0
                        "missing delete_file_field id when reading equality delete file");
1032
2.37k
            } else if (read_column_field_ids_set.contains(delete_file_field.field_id)) {
1033
                // the column that need to read.
1034
2.37k
                if (delete_file_field.children.size() > 0) [[unlikely]] { // complex column
1035
0
                    return Status::InternalError(
1036
0
                            "can not support read complex column in equality delete file");
1037
2.37k
                } else if (!data_file_id_to_field_schema.contains(delete_file_field.field_id))
1038
0
                        [[unlikely]] {
1039
0
                    return Status::DataQualityError(
1040
0
                            "can not find delete field id in data file schema when reading "
1041
0
                            "equality delete file");
1042
0
                }
1043
2.37k
                auto data_file_field = data_file_id_to_field_schema[delete_file_field.field_id];
1044
2.37k
                if (data_file_field->data_type->get_primitive_type() !=
1045
2.37k
                    delete_file_field.data_type->get_primitive_type()) [[unlikely]] {
1046
0
                    return Status::NotSupported(
1047
0
                            "Not Support type change in equality delete, field: {}, delete "
1048
0
                            "file type: {}, data file type: {}",
1049
0
                            delete_file_field.field_id, delete_file_field.data_type->get_name(),
1050
0
                            data_file_field->data_type->get_name());
1051
0
                }
1052
1053
2.37k
                std::string filed_lower_name = to_lower(delete_file_field.name);
1054
2.37k
                eq_file_node->add_children(filed_lower_name, delete_file_field.name,
1055
2.37k
                                           std::make_shared<TableSchemaChangeHelper::ScalarNode>());
1056
1057
2.37k
                delete_col_ids.emplace_back(delete_file_field.field_id);
1058
2.37k
                delete_col_names.emplace_back(filed_lower_name);
1059
2.37k
                delete_col_types.emplace_back(make_nullable(delete_file_field.data_type));
1060
1061
2.37k
                read_column_field_ids_set.erase(delete_file_field.field_id);
1062
2.37k
            } else {
1063
                // delete file may be have extra columns that don't need to read
1064
0
            }
1065
2.37k
        }
1066
1.52k
        if (!read_column_field_ids_set.empty()) [[unlikely]] {
1067
0
            return Status::DataQualityError("some field ids not found in equality delete file.");
1068
0
        }
1069
1070
3.89k
        for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
1071
2.36k
            delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
1072
2.36k
        }
1073
1.52k
        phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> tmp;
1074
1.52k
        RETURN_IF_ERROR(delete_reader->init_reader(delete_col_names, &delete_col_name_to_block_idx,
1075
1.52k
                                                   {}, tmp, nullptr, nullptr, nullptr, nullptr,
1076
1.52k
                                                   nullptr, eq_file_node, false));
1077
1.52k
        RETURN_IF_ERROR(delete_reader->set_fill_columns(partition_columns, missing_columns));
1078
1079
1.52k
        if (!_equality_delete_block_map.contains(delete_col_ids)) {
1080
850
            _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size());
1081
850
            Block block;
1082
850
            _generate_equality_delete_block(&block, delete_col_names, delete_col_types);
1083
850
            _equality_delete_blocks.emplace_back(block);
1084
850
        }
1085
1.52k
        Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]];
1086
1.52k
        bool eof = false;
1087
3.04k
        while (!eof) {
1088
1.52k
            Block tmp_block;
1089
1.52k
            _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types);
1090
1.52k
            size_t read_rows = 0;
1091
1.52k
            RETURN_IF_ERROR(delete_reader->get_next_block(&tmp_block, &read_rows, &eof));
1092
1.52k
            if (read_rows > 0) {
1093
1.52k
                MutableBlock mutable_block(&eq_file_block);
1094
1.52k
                RETURN_IF_ERROR(mutable_block.merge(tmp_block));
1095
1.52k
            }
1096
1.52k
        }
1097
1.52k
    }
1098
1099
852
    for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) {
1100
852
        auto& eq_file_block = _equality_delete_blocks[block_idx];
1101
852
        auto equality_delete_impl =
1102
852
                EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids);
1103
852
        RETURN_IF_ERROR(equality_delete_impl->init(_profile));
1104
852
        _equality_delete_impls.emplace_back(std::move(equality_delete_impl));
1105
852
    }
1106
670
    return Status::OK();
1107
670
}
1108
1109
Status IcebergOrcReader::_process_equality_delete(
1110
670
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
1111
670
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
1112
670
            partition_columns;
1113
670
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
1114
1115
670
    std::map<int, int> data_file_id_to_field_idx;
1116
2.68k
    for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); ++idx) {
1117
2.01k
        if (!_data_file_type_desc->getSubtype(idx)->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
1118
0
            return Status::DataQualityError("Iceberg equality delete data file missing field id.");
1119
0
        }
1120
2.01k
        auto field_id = std::stoi(
1121
2.01k
                _data_file_type_desc->getSubtype(idx)->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
1122
2.01k
        data_file_id_to_field_idx[field_id] = idx;
1123
2.01k
    }
1124
1125
1.52k
    for (const auto& delete_file : delete_files) {
1126
1.52k
        TFileRangeDesc delete_desc;
1127
        // must use __set() method to make sure __isset is true
1128
1.52k
        delete_desc.__set_fs_name(_range.fs_name);
1129
1.52k
        delete_desc.path = delete_file.path;
1130
1.52k
        delete_desc.start_offset = 0;
1131
1.52k
        delete_desc.size = -1;
1132
1.52k
        delete_desc.file_size = -1;
1133
1134
1.52k
        if (!delete_file.__isset.field_ids) [[unlikely]] {
1135
0
            return Status::InternalError(
1136
0
                    "missing delete field ids when reading equality delete file");
1137
0
        }
1138
1.52k
        auto& read_column_field_ids = delete_file.field_ids;
1139
1.52k
        std::set<int> read_column_field_ids_set;
1140
2.37k
        for (const auto& field_id : read_column_field_ids) {
1141
2.37k
            read_column_field_ids_set.insert(field_id);
1142
2.37k
            _equality_delete_col_ids.insert(field_id);
1143
2.37k
        }
1144
1145
1.52k
        auto delete_reader = OrcReader::create_unique(_profile, _state, _params, delete_desc,
1146
1.52k
                                                      READ_DELETE_FILE_BATCH_SIZE,
1147
1.52k
                                                      _state->timezone(), _io_ctx, _meta_cache);
1148
1.52k
        RETURN_IF_ERROR(delete_reader->init_schema_reader());
1149
        // delete file schema
1150
1.52k
        std::vector<std::string> delete_file_col_names;
1151
1.52k
        std::vector<DataTypePtr> delete_file_col_types;
1152
1.52k
        RETURN_IF_ERROR(
1153
1.52k
                delete_reader->get_parsed_schema(&delete_file_col_names, &delete_file_col_types));
1154
1155
        // the column that to read equality delete file.
1156
        // (delete file maybe have extra columns that don't need to read)
1157
1.52k
        std::vector<std::string> delete_col_names;
1158
1.52k
        std::vector<DataTypePtr> delete_col_types;
1159
1.52k
        std::vector<int> delete_col_ids;
1160
1.52k
        std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx;
1161
1162
1.52k
        const orc::Type* delete_field_desc = nullptr;
1163
1.52k
        RETURN_IF_ERROR(delete_reader->get_file_type(&delete_field_desc));
1164
1.52k
        DCHECK(delete_field_desc != nullptr);
1165
1166
1.52k
        auto eq_file_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
1167
1168
3.89k
        for (size_t idx = 0; idx < delete_field_desc->getSubtypeCount(); idx++) {
1169
2.36k
            auto delete_file_field = delete_field_desc->getSubtype(idx);
1170
1171
2.36k
            if (!delete_file_field->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE))
1172
0
                    [[unlikely]] { // missing delete_file_field id
1173
                // equality delete file must have delete_file_field id to match column.
1174
0
                return Status::DataQualityError(
1175
0
                        "missing delete_file_field id when reading equality delete file");
1176
2.36k
            } else {
1177
2.36k
                auto delete_field_id =
1178
2.36k
                        std::stoi(delete_file_field->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
1179
2.37k
                if (read_column_field_ids_set.contains(delete_field_id)) {
1180
                    // the column that need to read.
1181
2.37k
                    if (is_complex_type(delete_file_col_types[idx]->get_primitive_type()))
1182
0
                            [[unlikely]] {
1183
0
                        return Status::InternalError(
1184
0
                                "can not support read complex column in equality delete file.");
1185
2.37k
                    } else if (!data_file_id_to_field_idx.contains(delete_field_id)) [[unlikely]] {
1186
0
                        return Status::DataQualityError(
1187
0
                                "can not find delete field id in data file schema when reading "
1188
0
                                "equality delete file");
1189
0
                    }
1190
1191
2.37k
                    auto data_file_field = _data_file_type_desc->getSubtype(
1192
2.37k
                            data_file_id_to_field_idx[delete_field_id]);
1193
1194
2.37k
                    if (delete_file_field->getKind() != data_file_field->getKind()) [[unlikely]] {
1195
0
                        return Status::NotSupported(
1196
0
                                "Not Support type change in equality delete, field: {}, delete "
1197
0
                                "file type: {}, data file type: {}",
1198
0
                                delete_field_id, delete_file_field->getKind(),
1199
0
                                data_file_field->getKind());
1200
0
                    }
1201
2.37k
                    std::string filed_lower_name = to_lower(delete_field_desc->getFieldName(idx));
1202
2.37k
                    eq_file_node->add_children(
1203
2.37k
                            filed_lower_name, delete_field_desc->getFieldName(idx),
1204
2.37k
                            std::make_shared<TableSchemaChangeHelper::ScalarNode>());
1205
1206
2.37k
                    delete_col_ids.emplace_back(delete_field_id);
1207
2.37k
                    delete_col_names.emplace_back(filed_lower_name);
1208
2.37k
                    delete_col_types.emplace_back(make_nullable(delete_file_col_types[idx]));
1209
2.37k
                    read_column_field_ids_set.erase(delete_field_id);
1210
2.37k
                }
1211
2.36k
            }
1212
2.36k
        }
1213
1.52k
        if (!read_column_field_ids_set.empty()) [[unlikely]] {
1214
0
            return Status::DataQualityError("some field ids not found in equality delete file.");
1215
0
        }
1216
1217
3.89k
        for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
1218
2.36k
            delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
1219
2.36k
        }
1220
1221
1.52k
        RETURN_IF_ERROR(delete_reader->init_reader(&delete_col_names, &delete_col_name_to_block_idx,
1222
1.52k
                                                   {}, false, nullptr, nullptr, nullptr, nullptr,
1223
1.52k
                                                   eq_file_node));
1224
1.52k
        RETURN_IF_ERROR(delete_reader->set_fill_columns(partition_columns, missing_columns));
1225
1226
1.52k
        if (!_equality_delete_block_map.contains(delete_col_ids)) {
1227
852
            _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size());
1228
852
            Block block;
1229
852
            _generate_equality_delete_block(&block, delete_col_names, delete_col_types);
1230
852
            _equality_delete_blocks.emplace_back(block);
1231
852
        }
1232
1.52k
        Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]];
1233
1.52k
        bool eof = false;
1234
4.57k
        while (!eof) {
1235
3.04k
            Block tmp_block;
1236
3.04k
            _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types);
1237
3.04k
            size_t read_rows = 0;
1238
3.04k
            RETURN_IF_ERROR(delete_reader->get_next_block(&tmp_block, &read_rows, &eof));
1239
3.04k
            if (read_rows > 0) {
1240
1.52k
                MutableBlock mutable_block(&eq_file_block);
1241
1.52k
                RETURN_IF_ERROR(mutable_block.merge(tmp_block));
1242
1.52k
            }
1243
3.04k
        }
1244
1.52k
    }
1245
1246
852
    for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) {
1247
852
        auto& eq_file_block = _equality_delete_blocks[block_idx];
1248
852
        auto equality_delete_impl =
1249
852
                EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids);
1250
852
        RETURN_IF_ERROR(equality_delete_impl->init(_profile));
1251
852
        _equality_delete_impls.emplace_back(std::move(equality_delete_impl));
1252
852
    }
1253
670
    return Status::OK();
1254
670
}
1255
} // namespace doris