Coverage Report

Created: 2026-04-02 10:23

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/iceberg_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/table/iceberg_reader.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <gen_cpp/parquet_types.h>
24
#include <glog/logging.h>
25
#include <parallel_hashmap/phmap.h>
26
#include <rapidjson/document.h>
27
28
#include <algorithm>
29
#include <cstring>
30
#include <functional>
31
#include <memory>
32
#include <set>
33
34
#include "common/compiler_util.h" // IWYU pragma: keep
35
#include "common/status.h"
36
#include "core/assert_cast.h"
37
#include "core/block/block.h"
38
#include "core/block/column_with_type_and_name.h"
39
#include "core/column/column.h"
40
#include "core/data_type/data_type_factory.hpp"
41
#include "exprs/aggregate/aggregate_function.h"
42
#include "format/format_common.h"
43
#include "format/generic_reader.h"
44
#include "format/orc/vorc_reader.h"
45
#include "format/parquet/schema_desc.h"
46
#include "format/parquet/vparquet_column_chunk_reader.h"
47
#include "format/table/deletion_vector_reader.h"
48
#include "format/table/iceberg/iceberg_orc_nested_column_utils.h"
49
#include "format/table/iceberg/iceberg_parquet_nested_column_utils.h"
50
#include "format/table/iceberg_delete_file_reader_helper.h"
51
#include "format/table/nested_column_access_helper.h"
52
#include "format/table/table_format_reader.h"
53
#include "runtime/runtime_state.h"
54
#include "util/coding.h"
55
56
namespace cctz {
57
#include "common/compile_check_begin.h"
58
class time_zone;
59
} // namespace cctz
60
namespace doris {
61
class RowDescriptor;
62
class SlotDescriptor;
63
class TupleDescriptor;
64
65
namespace io {
66
struct IOContext;
67
} // namespace io
68
class VExprContext;
69
} // namespace doris
70
71
namespace doris {
72
namespace {
73
74
class GroupedDeleteRowsVisitor final : public IcebergPositionDeleteVisitor {
75
public:
76
    using DeleteRows = std::vector<int64_t>;
77
    using DeleteFile = phmap::parallel_flat_hash_map<
78
            std::string, std::unique_ptr<DeleteRows>, std::hash<std::string>, std::equal_to<>,
79
            std::allocator<std::pair<const std::string, std::unique_ptr<DeleteRows>>>, 8,
80
            std::mutex>;
81
82
    explicit GroupedDeleteRowsVisitor(DeleteFile* position_delete)
83
1.75k
            : _position_delete(position_delete) {}
84
85
479k
    Status visit(const std::string& file_path, int64_t pos) override {
86
479k
        if (_position_delete == nullptr) {
87
0
            return Status::InvalidArgument("position delete map is null");
88
0
        }
89
90
479k
        auto iter = _position_delete->find(file_path);
91
479k
        DeleteRows* delete_rows = nullptr;
92
479k
        if (iter == _position_delete->end()) {
93
6.65k
            delete_rows = new DeleteRows;
94
6.65k
            (*_position_delete)[file_path] = std::unique_ptr<DeleteRows>(delete_rows);
95
473k
        } else {
96
473k
            delete_rows = iter->second.get();
97
473k
        }
98
479k
        delete_rows->push_back(pos);
99
479k
        return Status::OK();
100
479k
    }
101
102
private:
103
    DeleteFile* _position_delete;
104
};
105
106
} // namespace
107
108
const std::string IcebergOrcReader::ICEBERG_ORC_ATTRIBUTE = "iceberg.id";
109
110
bool IcebergTableReader::_is_fully_dictionary_encoded(
111
        const tparquet::ColumnMetaData& column_metadata) {
112
    const auto is_dictionary_encoding = [](tparquet::Encoding::type encoding) {
113
        return encoding == tparquet::Encoding::PLAIN_DICTIONARY ||
114
               encoding == tparquet::Encoding::RLE_DICTIONARY;
115
24.4k
    };
116
24.4k
    const auto is_data_page = [](tparquet::PageType::type page_type) {
117
24.4k
        return page_type == tparquet::PageType::DATA_PAGE ||
118
24.4k
               page_type == tparquet::PageType::DATA_PAGE_V2;
119
24.4k
    };
120
24.4k
    const auto is_level_encoding = [](tparquet::Encoding::type encoding) {
121
24.4k
        return encoding == tparquet::Encoding::RLE || encoding == tparquet::Encoding::BIT_PACKED;
122
24.4k
    };
123
24.4k
124
24.4k
    // A column chunk may have a dictionary page but still contain plain-encoded data pages.
125
24.4k
    // Only treat it as dictionary-coded when all data pages are dictionary encoded.
126
24.4k
    if (column_metadata.__isset.encoding_stats) {
127
24.4k
        bool has_data_page_stats = false;
128
24.4k
        for (const tparquet::PageEncodingStats& enc_stat : column_metadata.encoding_stats) {
129
24.4k
            if (is_data_page(enc_stat.page_type) && enc_stat.count > 0) {
130
24.4k
                has_data_page_stats = true;
131
                if (!is_dictionary_encoding(enc_stat.encoding)) {
132
31.6k
                    return false;
133
31.6k
                }
134
            }
135
31.6k
        }
136
        if (has_data_page_stats) {
137
31.6k
            return true;
138
1.96k
        }
139
1.96k
    }
140
2.48k
141
2.48k
    bool has_dict_encoding = false;
142
2.48k
    bool has_nondict_encoding = false;
143
2.48k
    for (const tparquet::Encoding::type& encoding : column_metadata.encodings) {
144
1.96k
        if (is_dictionary_encoding(encoding)) {
145
1.96k
            has_dict_encoding = true;
146
        }
147
31.6k
148
31.6k
        if (!is_dictionary_encoding(encoding) && !is_level_encoding(encoding)) {
149
31.6k
            has_nondict_encoding = true;
150
            break;
151
24.3k
        }
152
    }
153
24.3k
    if (!has_dict_encoding || has_nondict_encoding) {
154
108
        return false;
155
108
    }
156
157
24.2k
    return true;
158
24.2k
}
159
24.2k
160
1.68k
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader,
161
1.68k
                                       RuntimeProfile* profile, RuntimeState* state,
162
                                       const TFileScanRangeParams& params,
163
22.5k
                                       const TFileRangeDesc& range, ShardedKVCache* kv_cache,
164
22.5k
                                       io::IOContext* io_ctx, FileMetaCache* meta_cache)
165
        : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx,
166
                            meta_cache),
167
          _kv_cache(kv_cache) {
168
22.5k
    static const char* iceberg_profile = "IcebergProfile";
169
156
    ADD_TIMER(_profile, iceberg_profile);
170
156
    _iceberg_profile.num_delete_files =
171
156
            ADD_CHILD_COUNTER(_profile, "NumDeleteFiles", TUnit::UNIT, iceberg_profile);
172
156
    _iceberg_profile.num_delete_rows =
173
52
            ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, iceberg_profile);
174
52
    _iceberg_profile.delete_files_read_time =
175
156
            ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
176
52
    _iceberg_profile.delete_rows_sort_time =
177
52
            ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
178
    _iceberg_profile.parse_delete_file_time =
179
156
            ADD_CHILD_TIMER(_profile, "ParseDeleteFileTime", iceberg_profile);
180
78
}
181
78
182
78
Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
183
78
    RETURN_IF_ERROR(_expand_block_if_need(block));
184
78
185
78
    RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof));
186
156
187
156
    if (_equality_delete_impls.size() > 0) {
188
156
        std::unique_ptr<IColumn::Filter> filter =
189
                std::make_unique<IColumn::Filter>(block->rows(), 1);
190
22.5k
        for (auto& equality_delete_impl : _equality_delete_impls) {
191
22.5k
            RETURN_IF_ERROR(equality_delete_impl->filter_data_block(
192
22.5k
                    block, _col_name_to_block_idx, _id_to_block_column_name, *filter));
193
22.5k
        }
194
10.8k
        Block::filter_block_internal(block, *filter, block->columns());
195
5.36k
    }
196
5.48k
197
3.04k
    *read_rows = block->rows();
198
3.04k
    return _shrink_block_if_need(block);
199
2.44k
}
200
2.44k
201
10.8k
Status IcebergTableReader::init_row_filters() {
202
    // We get the count value by doris's be, so we don't need to read the delete file
203
22.5k
    if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count > 0) {
204
1.34k
        return Status::OK();
205
1.34k
    }
206
1.34k
207
    const auto& table_desc = _range.table_format_params.iceberg_params;
208
22.5k
    const auto& version = table_desc.format_version;
209
2.44k
    if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
210
        return Status::OK();
211
    }
212
213
    auto* parquet_reader = dynamic_cast<ParquetReader*>(_file_format_reader.get());
214
    auto* orc_reader = dynamic_cast<OrcReader*>(_file_format_reader.get());
215
0
216
0
    // Initialize file information for $row_id generation
217
2.44k
    // Extract from table_desc which contains current file's metadata
218
2.44k
    if (_need_row_id_column) {
219
        std::string file_path = table_desc.original_file_path;
220
2.44k
        int32_t partition_spec_id = 0;
221
        std::string partition_data_json;
222
20.1k
        if (table_desc.__isset.partition_spec_id) {
223
1.83k
            partition_spec_id = table_desc.partition_spec_id;
224
1.83k
        }
225
1.83k
        if (table_desc.__isset.partition_data_json) {
226
1.83k
            partition_data_json = table_desc.partition_data_json;
227
        }
228
22.5k
229
22.5k
        if (parquet_reader != nullptr) {
230
22.5k
            parquet_reader->set_iceberg_rowid_params(file_path, partition_spec_id,
231
                                                     partition_data_json, _row_id_column_position);
232
        } else if (orc_reader != nullptr) {
233
            orc_reader->set_iceberg_rowid_params(file_path, partition_spec_id, partition_data_json,
234
6.25k
                                                 _row_id_column_position);
235
16.0k
        }
236
9.74k
        LOG(INFO) << "Initialized $row_id generation for file: " << file_path
237
9.74k
                  << ", partition_spec_id: " << partition_spec_id;
238
9.74k
    }
239
9.74k
240
9.74k
    std::vector<TIcebergDeleteFileDesc> position_delete_files;
241
6.25k
    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
242
    std::vector<TIcebergDeleteFileDesc> deletion_vector_files;
243
31.6k
    for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
244
31.6k
        if (desc.content == POSITION_DELETE) {
245
31.6k
            position_delete_files.emplace_back(desc);
246
31.6k
        } else if (desc.content == EQUALITY_DELETE) {
247
31.6k
            equality_delete_files.emplace_back(desc);
248
946
        } else if (desc.content == DELETION_VECTOR) {
249
946
            deletion_vector_files.emplace_back(desc);
250
0
        }
251
0
    }
252
946
253
946
    if (!equality_delete_files.empty()) {
254
946
        RETURN_IF_ERROR(_process_equality_delete(equality_delete_files));
255
946
        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
256
31.6k
    }
257
31.6k
258
    if (!deletion_vector_files.empty()) {
259
31.7k
        if (deletion_vector_files.size() != 1) [[unlikely]] {
260
31.7k
            /*
261
31.7k
             * Deletion vectors are a binary representation of deletes for a single data file that is more efficient
262
946
             * at execution time than position delete files. Unlike equality or position delete files, there can be
263
0
             * at most one deletion vector for a given data file in a snapshot.
264
0
             */
265
0
            return Status::DataQualityError("This iceberg data file has multiple DVs.");
266
946
        }
267
946
        RETURN_IF_ERROR(
268
31.7k
                read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));
269
31.7k
270
946
        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
271
946
        // Readers can safely ignore position delete files if there is a DV for a data file.
272
31.7k
    } else if (!position_delete_files.empty()) {
273
31.7k
        RETURN_IF_ERROR(
274
                _position_delete_base(table_desc.original_file_path, position_delete_files));
275
        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
276
1.83k
    }
277
1.83k
278
1.83k
    COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
279
5.36k
    return Status::OK();
280
5.36k
}
281
5.36k
282
5.36k
void IcebergTableReader::_generate_equality_delete_block(
283
5.36k
        Block* block, const std::vector<std::string>& equality_delete_col_names,
284
1.75k
        const std::vector<DataTypePtr>& equality_delete_col_types) {
285
1.75k
    for (int i = 0; i < equality_delete_col_names.size(); ++i) {
286
        DataTypePtr data_type = make_nullable(equality_delete_col_types[i]);
287
1.75k
        MutableColumnPtr data_column = data_type->create_column();
288
0
        block->insert(ColumnWithTypeAndName(std::move(data_column), data_type,
289
0
                                            equality_delete_col_names[i]));
290
    }
291
1.75k
}
292
1.75k
293
5.36k
Status IcebergTableReader::_expand_block_if_need(Block* block) {
294
0
    std::set<std::string> names;
295
5.36k
    auto block_names = block->get_names();
296
0
    names.insert(block_names.begin(), block_names.end());
297
0
    for (auto& col : _expand_columns) {
298
        col.column->assume_mutable()->clear();
299
5.36k
        if (names.contains(col.name)) {
300
5.36k
            return Status::InternalError("Wrong expand column '{}'", col.name);
301
3.50k
        }
302
3.50k
        names.insert(col.name);
303
3.50k
        (*_col_name_to_block_idx)[col.name] = static_cast<uint32_t>(block->columns());
304
3.50k
        block->insert(col);
305
3.50k
    }
306
3.50k
    return Status::OK();
307
5.36k
}
308
5.36k
309
Status IcebergTableReader::_shrink_block_if_need(Block* block) {
310
    std::set<size_t> positions_to_erase;
311
    for (const std::string& expand_col : _expand_col_names) {
312
        if (!_col_name_to_block_idx->contains(expand_col)) {
313
1.83k
            return Status::InternalError("Wrong erase column '{}', block: {}", expand_col,
314
1.47k
                                         block->dump_names());
315
1.47k
        }
316
1.47k
        positions_to_erase.emplace((*_col_name_to_block_idx)[expand_col]);
317
1.47k
    }
318
1.47k
    block->erase(positions_to_erase);
319
1.47k
    for (const std::string& expand_col : _expand_col_names) {
320
1.47k
        _col_name_to_block_idx->erase(expand_col);
321
    }
322
1.47k
    return Status::OK();
323
1.47k
}
324
325
1.47k
Status IcebergTableReader::_position_delete_base(
326
1.47k
        const std::string data_file_path, const std::vector<TIcebergDeleteFileDesc>& delete_files) {
327
1.47k
    std::vector<DeleteRows*> delete_rows_array;
328
1.47k
    int64_t num_delete_rows = 0;
329
1.83k
    for (const auto& delete_file : delete_files) {
330
1.83k
        SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
331
        Status create_status = Status::OK();
332
        auto* delete_file_cache = _kv_cache->get<DeleteFile>(
333
1.75k
                _delet_file_cache_key(delete_file.path), [&]() -> DeleteFile* {
334
1.75k
                    auto* position_delete = new DeleteFile;
335
1.75k
                    create_status = _read_position_delete_file(delete_file, position_delete);
336
1.75k
337
1.75k
                    if (!create_status) {
338
1.75k
                        return nullptr;
339
1.75k
                    }
340
1.75k
341
1.75k
                    return position_delete;
342
1.75k
                });
343
1.75k
        if (create_status.is<ErrorCode::END_OF_FILE>()) {
344
1.75k
            continue;
345
        } else if (!create_status.ok()) {
346
            return create_status;
347
        }
348
349
        DeleteFile& delete_file_map = *((DeleteFile*)delete_file_cache);
350
        auto get_value = [&](const auto& v) {
351
            DeleteRows* row_ids = v.second.get();
352
            if (!row_ids->empty()) {
353
                delete_rows_array.emplace_back(row_ids);
354
1.47k
                num_delete_rows += row_ids->size();
355
1.47k
            }
356
0
        };
357
0
        delete_file_map.if_contains(data_file_path, get_value);
358
1.47k
    }
359
764
    // Use a KV cache to store the delete rows corresponding to a data file path.
360
764
    // The Parquet/ORC reader holds a reference (pointer) to this cached entry.
361
764
    // This allows delete rows to be reused when a single data file is split into
362
764
    // multiple splits, avoiding excessive memory usage when delete rows are large.
363
712
    if (num_delete_rows > 0) {
364
56
        SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
365
56
        _iceberg_delete_rows =
366
56
                _kv_cache->get<DeleteRows>(data_file_path,
367
56
                                           [&]() -> DeleteRows* {
368
56
                                               auto* data_file_position_delete = new DeleteRows;
369
56
                                               _sort_delete_rows(delete_rows_array, num_delete_rows,
370
                                                                 *data_file_position_delete);
371
656
372
656
                                               return data_file_position_delete;
373
656
                                           }
374
656
375
656
                );
376
2.62k
        set_delete_rows();
377
2.62k
        COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
378
2.62k
    }
379
2.62k
    return Status::OK();
380
2.62k
}
381
656
382
322k
Status IcebergTableReader::_read_position_delete_file(const TIcebergDeleteFileDesc& delete_file,
383
321k
                                                      DeleteFile* position_delete) {
384
321k
    GroupedDeleteRowsVisitor visitor(position_delete);
385
1.60M
    IcebergDeleteFileReaderOptions options;
386
1.28M
    options.state = _state;
387
316k
    options.profile = _profile;
388
316k
    options.scan_params = &_params;
389
316k
    options.io_ctx = _io_ctx;
390
1.28M
    options.meta_cache = _meta_cache;
391
321k
    options.fs_name = &_range.fs_name;
392
321k
    options.batch_size = READ_DELETE_FILE_BATCH_SIZE;
393
321k
    return read_iceberg_position_delete_file(delete_file, options, &visitor);
394
2.62k
}
395
2.62k
396
2.62k
/**
397
321k
 * https://iceberg.apache.org/spec/#position-delete-files
398
656
 * The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning.
399
 * Sorting by file_path allows filter pushdown by file in columnar storage formats.
400
 * Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory.
401
 */
402
void IcebergTableReader::_sort_delete_rows(
403
        const std::vector<std::vector<int64_t>*>& delete_rows_array, int64_t num_delete_rows,
404
        std::vector<int64_t>& result) {
405
    if (delete_rows_array.empty()) {
406
        return;
407
    }
408
    if (delete_rows_array.size() == 1) {
409
17.9k
        result.resize(num_delete_rows);
410
17.9k
        memcpy(result.data(), delete_rows_array.front()->data(), sizeof(int64_t) * num_delete_rows);
411
17.9k
        return;
412
17.9k
    }
413
17.9k
    if (delete_rows_array.size() == 2) {
414
17.9k
        result.resize(num_delete_rows);
415
17.9k
        std::merge(delete_rows_array.front()->begin(), delete_rows_array.front()->end(),
416
234
                   delete_rows_array.back()->begin(), delete_rows_array.back()->end(),
417
234
                   result.begin());
418
234
        return;
419
234
    }
420
234
421
236
    using vec_pair = std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::iterator>;
422
18.4E
    result.resize(num_delete_rows);
423
234
    auto row_id_iter = result.begin();
424
234
    auto iter_end = result.end();
425
    std::vector<vec_pair> rows_array;
426
17.9k
    for (auto* rows : delete_rows_array) {
427
17.9k
        if (!rows->empty()) {
428
17.9k
            rows_array.emplace_back(rows->begin(), rows->end());
429
        }
430
17.9k
    }
431
17.9k
    size_t array_size = rows_array.size();
432
    while (row_id_iter != iter_end) {
433
17.9k
        int64_t min_index = 0;
434
1
        int64_t min = *rows_array[0].first;
435
1
        for (size_t i = 0; i < array_size; ++i) {
436
17.9k
            if (*rows_array[i].first < min) {
437
17.9k
                min_index = i;
438
                min = *rows_array[i].first;
439
17.9k
            }
440
140k
        }
441
122k
        *row_id_iter++ = min;
442
        rows_array[min_index].first++;
443
2
        if (UNLIKELY(rows_array[min_index].first == rows_array[min_index].second)) {
444
2
            rows_array.erase(rows_array.begin() + min_index);
445
2
            array_size--;
446
122k
        }
447
17.9k
    }
448
}
449
17.9k
450
17.9k
Status IcebergParquetReader::init_reader(
451
        const std::vector<std::string>& file_col_names,
452
17.8k
        std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
453
56.3k
        const VExprContextSPtrs& conjuncts,
454
56.3k
        phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
455
56.3k
                slot_id_to_predicates,
456
56.3k
        const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
457
2.50k
        const std::unordered_map<std::string, int>* colname_to_slot_id,
458
2.50k
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
459
53.8k
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
460
53.8k
    _file_format = Fileformat::PARQUET;
461
    _col_name_to_block_idx = col_name_to_block_idx;
462
140k
    auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
463
123k
    RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&_data_file_field_desc));
464
123k
    DCHECK(_data_file_field_desc != nullptr);
465
    if (_row_lineage_columns != nullptr) {
466
123k
        const auto& table_desc = _range.table_format_params.iceberg_params;
467
50.6k
        _row_lineage_columns->first_row_id =
468
                table_desc.__isset.first_row_id ? table_desc.first_row_id : -1;
469
50.6k
        _row_lineage_columns->last_updated_sequence_number =
470
50.6k
                table_desc.__isset.last_updated_sequence_number
471
50.6k
                        ? table_desc.last_updated_sequence_number
472
50.6k
                        : -1;
473
50.6k
        parquet_reader->set_row_lineage_columns(_row_lineage_columns);
474
    }
475
50.6k
476
50.6k
    auto column_id_result = _create_column_ids(_data_file_field_desc, tuple_descriptor);
477
72.4k
    auto& column_ids = column_id_result.column_ids;
478
    const auto& filter_column_ids = column_id_result.filter_column_ids;
479
318
480
    RETURN_IF_ERROR(init_row_filters());
481
    _all_required_col_names = file_col_names;
482
483
    if (!_params.__isset.history_schema_info || _params.history_schema_info.empty()) [[unlikely]] {
484
318
        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(
485
318
                tuple_descriptor, *_data_file_field_desc, table_info_node_ptr));
486
318
    } else {
487
318
        std::set<std::string> read_col_name_set(file_col_names.begin(), file_col_names.end());
488
489
318
        bool exist_field_id = true;
490
318
        for (int idx = 0; idx < _data_file_field_desc->size(); idx++) {
491
318
            if (_data_file_field_desc->get_column(idx)->field_id == -1) {
492
318
                // the data file may be from hive table migrated to iceberg, field id is missing
493
318
                exist_field_id = false;
494
318
                break;
495
            }
496
318
        }
497
318
        const auto& table_schema = _params.history_schema_info.front().root_field;
498
318
499
123k
        table_info_node_ptr = std::make_shared<TableSchemaChangeHelper::StructNode>();
500
17.8k
        if (exist_field_id) {
501
3.25k
            // id -> table column name. columns that need read data file.
502
3.25k
            std::unordered_map<int, std::shared_ptr<schema::external::TField>> id_to_table_field;
503
17.8k
            for (const auto& table_field : table_schema.fields) {
504
50
                auto field = table_field.field_ptr;
505
0
                DCHECK(field->__isset.name);
506
0
                if (!read_col_name_set.contains(field->name)) {
507
0
                    continue;
508
50
                }
509
58
                id_to_table_field.emplace(field->id, field);
510
8
            }
511
8
512
            for (int idx = 0; idx < _data_file_field_desc->size(); idx++) {
513
50
                const auto& data_file_field = _data_file_field_desc->get_column(idx);
514
2
                auto data_file_column_id = _data_file_field_desc->get_column(idx)->field_id;
515
2
516
2
                if (id_to_table_field.contains(data_file_column_id)) {
517
2
                    const auto& table_field = id_to_table_field[data_file_column_id];
518
0
519
0
                    std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
520
2
                    RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id(
521
2
                            *table_field, *data_file_field, exist_field_id, field_node));
522
2
                    table_info_node_ptr->add_children(table_field->name, data_file_field->name,
523
2
                                                      field_node);
524
2
525
0
                    _id_to_block_column_name.emplace(data_file_column_id, table_field->name);
526
0
                    id_to_table_field.erase(data_file_column_id);
527
0
                } else if (_equality_delete_col_ids.contains(data_file_column_id)) {
528
0
                    // Columns that need to be read for equality delete.
529
0
                    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";
530
0
531
0
                    // Construct table column names that avoid duplication with current table schema.
532
0
                    // As the columns currently being read may have been deleted in the latest
533
0
                    // table structure or have undergone a series of schema changes...
534
0
                    std::string table_column_name = EQ_DELETE_PRE + data_file_field->name;
535
0
                    table_info_node_ptr->add_children(
536
0
                            table_column_name, data_file_field->name,
537
0
                            std::make_shared<TableSchemaChangeHelper::ConstNode>());
538
0
539
0
                    _id_to_block_column_name.emplace(data_file_column_id, table_column_name);
540
0
                    _expand_col_names.emplace_back(table_column_name);
541
0
                    auto expand_data_type = make_nullable(data_file_field->data_type);
542
0
                    _expand_columns.emplace_back(
543
50
                            ColumnWithTypeAndName {expand_data_type->create_column(),
544
17.9k
                                                   expand_data_type, table_column_name});
545
546
17.9k
                    _all_required_col_names.emplace_back(table_column_name);
547
17.9k
                    column_ids.insert(data_file_field->get_column_id());
548
17.9k
                }
549
17.9k
            }
550
17.9k
            for (const auto& [id, table_field] : id_to_table_field) {
551
                table_info_node_ptr->add_not_exist_children(table_field->name);
552
            }
553
17.9k
        } else {
554
            if (!_equality_delete_col_ids.empty()) [[unlikely]] {
555
17.9k
                return Status::InternalError(
556
17.9k
                        "Can not read missing field id data file when have equality delete");
557
            }
558
            std::map<std::string, size_t> file_column_idx_map;
559
17.9k
            for (size_t idx = 0; idx < _data_file_field_desc->size(); idx++) {
560
                file_column_idx_map.emplace(_data_file_field_desc->get_column(idx)->name, idx);
561
141k
            }
562
123k
563
123k
            for (const auto& table_field : table_schema.fields) {
564
                DCHECK(table_field.__isset.field_ptr);
565
123k
                DCHECK(table_field.field_ptr->__isset.name);
566
123k
                const auto& table_column_name = table_field.field_ptr->name;
567
123k
                if (!read_col_name_set.contains(table_column_name)) {
568
                    continue;
569
17.9k
                }
570
17.9k
                if (!table_field.field_ptr->__isset.name_mapping ||
571
                    table_field.field_ptr->name_mapping.size() == 0) {
572
                    return Status::DataQualityError(
573
17.9k
                            "name_mapping must be set when read missing field id data file.");
574
17.9k
                }
575
17.9k
                bool have_mapping = false;
576
15.7k
                for (const auto& mapped_name : table_field.field_ptr->name_mapping) {
577
15.7k
                    if (file_column_idx_map.contains(mapped_name)) {
578
15.7k
                        std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
579
15.7k
                        const auto& file_field = _data_file_field_desc->get_column(
580
15.7k
                                file_column_idx_map.at(mapped_name));
581
15.7k
                        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id(
582
                                *table_field.field_ptr, *file_field, exist_field_id, field_node));
583
54.2k
                        table_info_node_ptr->add_children(table_column_name, file_field->name,
584
54.2k
                                                          field_node);
585
54.2k
                        have_mapping = true;
586
                        break;
587
3.57k
                    }
588
3.57k
                }
589
50.6k
                if (!have_mapping) {
590
                    table_info_node_ptr->add_not_exist_children(table_column_name);
591
                }
592
50.6k
            }
593
50.6k
        }
594
36.4k
    }
595
596
36.4k
    return parquet_reader->init_reader(
597
5.75k
            _all_required_col_names, _col_name_to_block_idx, conjuncts, slot_id_to_predicates,
598
5.75k
            tuple_descriptor, row_descriptor, colname_to_slot_id, not_single_slot_filter_conjuncts,
599
36.4k
            slot_id_to_filter_conjuncts, table_info_node_ptr, true, column_ids, filter_column_ids);
600
36.4k
}
601
602
ColumnIdResult IcebergParquetReader::_create_column_ids(const FieldDescriptor* field_desc,
                                                        const TupleDescriptor* tuple_descriptor) {
    // Column ids must be assigned before any lookup. The descriptor arrives as
    // const but is logically mutable here; assign_ids() populates the ids we read below.
    const_cast<FieldDescriptor*>(field_desc)->assign_ids();

    // Index the file's top-level fields by their iceberg field id.
    std::unordered_map<int, const FieldSchema*> id_to_schema;
    for (int idx = 0; idx < field_desc->size(); ++idx) {
        const auto* schema = field_desc->get_column(idx);
        if (schema != nullptr) {
            id_to_schema[schema->field_id] = schema;
        }
    }

    std::set<uint64_t> column_ids;
    std::set<uint64_t> filter_column_ids;

    // Expand the requested access paths of a nested column into the parquet
    // column ids they touch, accumulating into `out_ids`.
    auto collect_nested_ids = [](const FieldSchema* parquet_field,
                                 const std::vector<TColumnAccessPath>& access_paths,
                                 std::set<uint64_t>& out_ids) {
        process_nested_access_paths(
                parquet_field, access_paths, out_ids,
                [](const FieldSchema* field) { return field->get_column_id(); },
                [](const FieldSchema* field) { return field->get_max_column_id(); },
                IcebergParquetNestedColumnUtils::extract_nested_column_ids);
    };

    for (const auto* slot : tuple_descriptor->slots()) {
        auto found = id_to_schema.find(slot->col_unique_id());
        if (found == id_to_schema.end()) {
            // Not present in the data file (e.g. partition column, newly added column).
            continue;
        }
        const auto* schema = found->second;

        const bool is_nested = slot->col_type() == TYPE_STRUCT ||
                               slot->col_type() == TYPE_ARRAY || slot->col_type() == TYPE_MAP;
        if (!is_nested) {
            // Primitive column: a single physical column id.
            column_ids.insert(schema->column_id);
            if (slot->is_predicate()) {
                filter_column_ids.insert(schema->column_id);
            }
        } else {
            // Nested column: resolve the requested sub-paths to leaf column ids.
            collect_nested_ids(schema, slot->all_access_paths(), column_ids);

            const auto& predicate_paths = slot->predicate_access_paths();
            if (!predicate_paths.empty()) {
                collect_nested_ids(schema, predicate_paths, filter_column_ids);
            }
        }
    }

    return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids));
}
663
6.47k
664
// Initializes the underlying ORC reader for an iceberg table scan.
//
// Steps:
//  1. fetch the data file's ORC type description and parsed schema;
//  2. propagate row-lineage metadata (first_row_id / last_updated_sequence_number)
//     into the ORC reader when lineage columns are requested;
//  3. compute the sets of ORC column ids to read and to filter on;
//  4. build the table-schema-change mapping (`table_info_node_ptr`):
//     - by plain name matching when no history schema info is available,
//     - by iceberg field id when the data file carries field-id attributes,
//     - by name_mapping otherwise;
//  5. delegate to OrcReader::init_reader with the resolved column set.
//
// NOTE(review): the `colname_to_slot_id` parameter is not referenced in this
// body — confirm it is intentionally unused in the ORC path.
Status IcebergOrcReader::init_reader(
        const std::vector<std::string>& file_col_names,
        std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
        const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
        const RowDescriptor* row_descriptor,
        const std::unordered_map<std::string, int>* colname_to_slot_id,
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
    _file_format = Fileformat::ORC;
    _col_name_to_block_idx = col_name_to_block_idx;
    auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
    RETURN_IF_ERROR(orc_reader->get_file_type(&_data_file_type_desc));
    std::vector<std::string> data_file_col_names;
    std::vector<DataTypePtr> data_file_col_types;
    RETURN_IF_ERROR(orc_reader->get_parsed_schema(&data_file_col_names, &data_file_col_types));
    if (_row_lineage_columns != nullptr) {
        const auto& table_desc = _range.table_format_params.iceberg_params;
        // -1 marks "not provided by FE" for both lineage fields.
        _row_lineage_columns->first_row_id =
                table_desc.__isset.first_row_id ? table_desc.first_row_id : -1;
        _row_lineage_columns->last_updated_sequence_number =
                table_desc.__isset.last_updated_sequence_number
                        ? table_desc.last_updated_sequence_number
                        : -1;
        orc_reader->set_row_lineage_columns(_row_lineage_columns);
    }

    // Resolve which ORC column ids must be read and which participate in predicates.
    auto column_id_result = _create_column_ids(_data_file_type_desc, tuple_descriptor);
    // `column_ids` is taken by non-const ref: extra equality-delete columns may be
    // inserted below.
    auto& column_ids = column_id_result.column_ids;
    const auto& filter_column_ids = column_id_result.filter_column_ids;

    RETURN_IF_ERROR(init_row_filters());

    _all_required_col_names = file_col_names;
    if (!_params.__isset.history_schema_info || _params.history_schema_info.empty()) [[unlikely]] {
        // No schema history available: fall back to pure name-based matching.
        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(tuple_descriptor, _data_file_type_desc,
                                                        table_info_node_ptr));
    } else {
        std::set<std::string> read_col_name_set(file_col_names.begin(), file_col_names.end());

        // A data file can be matched by field id only if EVERY top-level column
        // carries the iceberg field-id attribute.
        bool exist_field_id = true;
        for (size_t idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) {
            if (!_data_file_type_desc->getSubtype(idx)->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
                exist_field_id = false;
                break;
            }
        }

        const auto& table_schema = _params.history_schema_info.front().root_field;
        table_info_node_ptr = std::make_shared<TableSchemaChangeHelper::StructNode>();
        if (exist_field_id) {
            // id -> table column name. columns that need read data file.
            std::unordered_map<int, std::shared_ptr<schema::external::TField>> id_to_table_field;
            for (const auto& table_field : table_schema.fields) {
                auto field = table_field.field_ptr;
                DCHECK(field->__isset.name);
                if (!read_col_name_set.contains(field->name)) {
                    continue;
                }

                id_to_table_field.emplace(field->id, field);
            }

            // Walk the file's columns; match each against the table schema by field id.
            for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) {
                const auto& data_file_field = _data_file_type_desc->getSubtype(idx);
                auto data_file_column_id =
                        std::stoi(data_file_field->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
                auto const& file_column_name = _data_file_type_desc->getFieldName(idx);

                if (id_to_table_field.contains(data_file_column_id)) {
                    const auto& table_field = id_to_table_field[data_file_column_id];

                    std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
                    RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id(
                            *table_field, data_file_field, ICEBERG_ORC_ATTRIBUTE, exist_field_id,
                            field_node));
                    table_info_node_ptr->add_children(table_field->name, file_column_name,
                                                      field_node);

                    _id_to_block_column_name.emplace(data_file_column_id, table_field->name);
                    // Erase so the leftover entries below become "not exist" children.
                    id_to_table_field.erase(data_file_column_id);
                } else if (_equality_delete_col_ids.contains(data_file_column_id)) {
                    // Columns that need to be read for equality delete.
                    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";

                    // Construct table column names that avoid duplication with current table schema.
                    // As the columns currently being read may have been deleted in the latest
                    // table structure or have undergone a series of schema changes...
                    std::string table_column_name = EQ_DELETE_PRE + file_column_name;
                    table_info_node_ptr->add_children(
                            table_column_name, file_column_name,
                            std::make_shared<TableSchemaChangeHelper::ConstNode>());

                    _id_to_block_column_name.emplace(data_file_column_id, table_column_name);
                    _expand_col_names.emplace_back(table_column_name);

                    // The expanded column is materialized as a nullable column in the block.
                    auto expand_data_type = make_nullable(data_file_col_types[idx]);
                    _expand_columns.emplace_back(
                            ColumnWithTypeAndName {expand_data_type->create_column(),
                                                   expand_data_type, table_column_name});

                    _all_required_col_names.emplace_back(table_column_name);
                    column_ids.insert(data_file_field->getColumnId());
                }
            }
            // Table columns not found in the file (dropped/renamed in the file, or
            // added to the table later) are registered as non-existent.
            for (const auto& [id, table_field] : id_to_table_field) {
                table_info_node_ptr->add_not_exist_children(table_field->name);
            }
        } else {
            if (!_equality_delete_col_ids.empty()) [[unlikely]] {
                return Status::InternalError(
                        "Can not read missing field id data file when have equality delete");
            }
            // No field ids in the file: fall back to iceberg name-mapping resolution.
            std::map<std::string, size_t> file_column_idx_map;
            for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) {
                auto const& file_column_name = _data_file_type_desc->getFieldName(idx);
                file_column_idx_map.emplace(file_column_name, idx);
            }

            for (const auto& table_field : table_schema.fields) {
                DCHECK(table_field.__isset.field_ptr);
                DCHECK(table_field.field_ptr->__isset.name);
                const auto& table_column_name = table_field.field_ptr->name;
                if (!read_col_name_set.contains(table_column_name)) {
                    continue;
                }
                if (!table_field.field_ptr->__isset.name_mapping ||
                    table_field.field_ptr->name_mapping.size() == 0) {
                    return Status::DataQualityError(
                            "name_mapping must be set when read missing field id data file.");
                }
                // Try each mapped name in order; the first one present in the file wins.
                auto have_mapping = false;
                for (const auto& mapped_name : table_field.field_ptr->name_mapping) {
                    if (file_column_idx_map.contains(mapped_name)) {
                        auto file_column_idx = file_column_idx_map.at(mapped_name);
                        std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
                        const auto& file_field = _data_file_type_desc->getSubtype(file_column_idx);
                        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id(
                                *table_field.field_ptr, file_field, ICEBERG_ORC_ATTRIBUTE,
                                exist_field_id, field_node));
                        table_info_node_ptr->add_children(
                                table_column_name,
                                _data_file_type_desc->getFieldName(file_column_idx), field_node);
                        have_mapping = true;
                        break;
                    }
                }
                if (!have_mapping) {
                    table_info_node_ptr->add_not_exist_children(table_column_name);
                }
            }
        }
    }

    return orc_reader->init_reader(&_all_required_col_names, _col_name_to_block_idx, conjuncts,
                                   false, tuple_descriptor, row_descriptor,
                                   not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts,
                                   table_info_node_ptr, column_ids, filter_column_ids);
}
822
13.3k
823
ColumnIdResult IcebergOrcReader::_create_column_ids(const orc::Type* orc_type,
                                                    const TupleDescriptor* tuple_descriptor) {
    // Index the file's top-level subtypes by the iceberg field id stored as an
    // ORC attribute; subtypes without the attribute cannot be matched and are skipped.
    std::unordered_map<int, const orc::Type*> id_to_orc_type;
    for (uint64_t idx = 0; idx < orc_type->getSubtypeCount(); ++idx) {
        const auto* sub_type = orc_type->getSubtype(idx);
        if (sub_type == nullptr || !sub_type->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
            continue;
        }
        int iceberg_id = std::stoi(sub_type->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
        id_to_orc_type[iceberg_id] = sub_type;
    }

    std::set<uint64_t> column_ids;
    std::set<uint64_t> filter_column_ids;

    // Expand the requested access paths of a nested column into the ORC column
    // ids they touch, accumulating into `out_ids`.
    auto collect_nested_ids = [](const orc::Type* orc_field,
                                 const std::vector<TColumnAccessPath>& access_paths,
                                 std::set<uint64_t>& out_ids) {
        process_nested_access_paths(
                orc_field, access_paths, out_ids,
                [](const orc::Type* type) { return type->getColumnId(); },
                [](const orc::Type* type) { return type->getMaximumColumnId(); },
                IcebergOrcNestedColumnUtils::extract_nested_column_ids);
    };

    for (const auto* slot : tuple_descriptor->slots()) {
        auto found = id_to_orc_type.find(slot->col_unique_id());
        if (found == id_to_orc_type.end()) {
            // Column absent from this data file.
            continue;
        }
        const orc::Type* orc_field = found->second;

        const bool is_nested = slot->col_type() == TYPE_STRUCT ||
                               slot->col_type() == TYPE_ARRAY || slot->col_type() == TYPE_MAP;
        if (!is_nested) {
            // Primitive column: a single physical ORC column id.
            column_ids.insert(orc_field->getColumnId());
            if (slot->is_predicate()) {
                filter_column_ids.insert(orc_field->getColumnId());
            }
        } else {
            // Nested column: resolve the requested sub-paths to leaf column ids.
            collect_nested_ids(orc_field, slot->all_access_paths(), column_ids);

            const auto& predicate_paths = slot->predicate_access_paths();
            if (!predicate_paths.empty()) {
                collect_nested_ids(orc_field, predicate_paths, filter_column_ids);
            }
        }
    }

    return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids));
}
882
883
970
// Directly read the deletion vector using the `content_offset` and
884
970
// `content_size_in_bytes` provided by FE in `delete_file_desc`.
885
0
// These two fields indicate the location of a blob in storage.
886
0
// Since the current format is `deletion-vector-v1`, which does not
887
0
// compress any blobs, we can temporarily skip parsing the Puffin footer.
888
0
Status IcebergTableReader::read_deletion_vector(const std::string& data_file_path,
889
0
                                                const TIcebergDeleteFileDesc& delete_file_desc) {
890
    Status create_status = Status::OK();
891
970
    SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
892
970
    _iceberg_delete_rows = _kv_cache->get<DeleteRows>(data_file_path, [&]() -> DeleteRows* {
893
0
        auto* delete_rows = new DeleteRows;
894
0
895
0
        TFileRangeDesc delete_range;
896
        // must use __set() method to make sure __isset is true
897
970
        delete_range.__set_fs_name(_range.fs_name);
898
970
        delete_range.path = delete_file_desc.path;
899
970
        delete_range.start_offset = delete_file_desc.content_offset;
900
970
        delete_range.size = delete_file_desc.content_size_in_bytes;
901
970
        delete_range.file_size = -1;
902
0
903
0
        // We may consider caching the DeletionVectorReader when reading Puffin files,
904
0
        // where the underlying reader is an `InMemoryFileReader` and a single data file is
905
        // split into multiple splits. However, we need to ensure that the underlying
906
        // reader supports multi-threaded access.
907
970
        DeletionVectorReader dv_reader(_state, _profile, _params, delete_range, _io_ctx);
908
5.12M
        create_status = dv_reader.open();
909
5.12M
        if (!create_status.ok()) [[unlikely]] {
910
5.12M
            return nullptr;
911
970
        }
912
970
913
970
        size_t buffer_size = delete_range.size;
914
        std::vector<char> buf(buffer_size);
915
2.44k
        if (buffer_size < 12) [[unlikely]] {
916
2.44k
            // Minimum size: 4 bytes length + 4 bytes magic + 4 bytes CRC32
917
2.44k
            create_status = Status::DataQualityError("Deletion vector file size too small: {}",
918
2.44k
                                                     buffer_size);
919
2.44k
            return nullptr;
920
2.44k
        }
921
922
        create_status = dv_reader.read_at(delete_range.start_offset, {buf.data(), buffer_size});
923
        if (!create_status) [[unlikely]] {
924
            return nullptr;
925
        }
926
670
        // The serialized blob contains:
927
670
        //
928
670
        // Combined length of the vector and magic bytes stored as 4 bytes, big-endian
929
670
        // A 4-byte magic sequence, D1 D3 39 64
930
        // The vector, serialized as described below
931
670
        // A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian
932
2.68k
933
2.01k
        auto total_length = BigEndian::Load32(buf.data());
934
2.01k
        if (total_length + 8 != buffer_size) [[unlikely]] {
935
0
            create_status = Status::DataQualityError(
936
0
                    "Deletion vector length mismatch, expected: {}, actual: {}", total_length + 8,
937
2.01k
                    buffer_size);
938
2.01k
            return nullptr;
939
2.01k
        }
940
941
1.52k
        constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'};
942
1.52k
        if (memcmp(buf.data() + sizeof(total_length), MAGIC_NUMBER, 4)) [[unlikely]] {
943
            create_status = Status::DataQualityError("Deletion vector magic number mismatch");
944
1.52k
            return nullptr;
945
1.52k
        }
946
1.52k
947
1.52k
        roaring::Roaring64Map bitmap;
948
1.52k
        SCOPED_TIMER(_iceberg_profile.parse_delete_file_time);
949
        try {
950
1.52k
            bitmap = roaring::Roaring64Map::readSafe(buf.data() + 8, buffer_size - 12);
951
0
        } catch (const std::runtime_error& e) {
952
0
            create_status = Status::DataQualityError("Decode roaring bitmap failed, {}", e.what());
953
0
            return nullptr;
954
1.52k
        }
955
1.52k
        // skip CRC-32 checksum
956
2.37k
957
2.37k
        delete_rows->reserve(bitmap.cardinality());
958
2.37k
        for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
959
2.37k
            delete_rows->push_back(*it);
960
        }
961
1.52k
        COUNTER_UPDATE(_iceberg_profile.num_delete_rows, delete_rows->size());
962
1.52k
        return delete_rows;
963
1.52k
    });
964
1.52k
965
    RETURN_IF_ERROR(create_status);
966
    if (!_iceberg_delete_rows->empty()) [[likely]] {
967
        set_delete_rows();
968
1.52k
    }
969
1.52k
    return Status::OK();
970
1.52k
}
971
1.52k
972
// Similar to the code structure of IcebergOrcReader::_process_equality_delete,
// but considering the significant differences in how parquet/orc obtains
// attributes/column IDs, it is not easy to combine them.
//
// For each equality-delete file: open a dedicated ParquetReader, match the
// delete file's columns against the data file's schema by iceberg field id,
// read all delete rows into a per-column-set Block, then build one
// EqualityDeleteBase implementation per distinct column-id set.
// Mutates: _equality_delete_col_ids, _equality_delete_block_map,
// _equality_delete_blocks, _equality_delete_impls.
Status IcebergParquetReader::_process_equality_delete(
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
    // Both maps are only passed to set_fill_columns below; they stay empty here.
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
            partition_columns;
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;

    // Data-file schema indexed by iceberg field id; every column must carry a
    // field id or equality deletes cannot be matched.
    std::map<int, const FieldSchema*> data_file_id_to_field_schema;
    for (int idx = 0; idx < _data_file_field_desc->size(); ++idx) {
        auto field_schema = _data_file_field_desc->get_column(idx);
        if (_data_file_field_desc->get_column(idx)->field_id == -1) {
            return Status::DataQualityError("Iceberg equality delete data file missing field id.");
        }
        data_file_id_to_field_schema[_data_file_field_desc->get_column(idx)->field_id] =
                field_schema;
    }

    for (const auto& delete_file : delete_files) {
        TFileRangeDesc delete_desc;
        // must use __set() method to make sure __isset is true
        delete_desc.__set_fs_name(_range.fs_name);
        delete_desc.path = delete_file.path;
        delete_desc.start_offset = 0;
        delete_desc.size = -1;
        delete_desc.file_size = -1;

        if (!delete_file.__isset.field_ids) [[unlikely]] {
            return Status::InternalError(
                    "missing delete field ids when reading equality delete file");
        }
        // The set tracks which requested field ids are still unmatched; entries
        // are erased as columns are found and any leftovers are an error.
        auto& read_column_field_ids = delete_file.field_ids;
        std::set<int> read_column_field_ids_set;
        for (const auto& field_id : read_column_field_ids) {
            read_column_field_ids_set.insert(field_id);
            _equality_delete_col_ids.insert(field_id);
        }

        auto delete_reader = ParquetReader::create_unique(
                _profile, _params, delete_desc, READ_DELETE_FILE_BATCH_SIZE,
                &_state->timezone_obj(), _io_ctx, _state, _meta_cache);
        RETURN_IF_ERROR(delete_reader->init_schema_reader());

        // The columns to read from the equality delete file.
        // (The delete file may have extra columns that don't need to be read.)
        std::vector<std::string> delete_col_names;
        std::vector<DataTypePtr> delete_col_types;
        std::vector<int> delete_col_ids;
        std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx;

        const FieldDescriptor* delete_field_desc = nullptr;
        RETURN_IF_ERROR(delete_reader->get_file_metadata_schema(&delete_field_desc));
        DCHECK(delete_field_desc != nullptr);

        auto eq_file_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
        for (const auto& delete_file_field : delete_field_desc->get_fields_schema()) {
            if (delete_file_field.field_id == -1) [[unlikely]] { // missing delete_file_field id
                // equality delete file must have delete_file_field id to match column.
                return Status::DataQualityError(
                        "missing delete_file_field id when reading equality delete file");
            } else if (read_column_field_ids_set.contains(delete_file_field.field_id)) {
                // A column that needs to be read.
                if (delete_file_field.children.size() > 0) [[unlikely]] { // complex column
                    return Status::InternalError(
                            "can not support read complex column in equality delete file");
                } else if (!data_file_id_to_field_schema.contains(delete_file_field.field_id))
                        [[unlikely]] {
                    return Status::DataQualityError(
                            "can not find delete field id in data file schema when reading "
                            "equality delete file");
                }
                // The delete column's primitive type must match the data file's;
                // type evolution between the two is not supported.
                auto data_file_field = data_file_id_to_field_schema[delete_file_field.field_id];
                if (data_file_field->data_type->get_primitive_type() !=
                    delete_file_field.data_type->get_primitive_type()) [[unlikely]] {
                    return Status::NotSupported(
                            "Not Support type change in equality delete, field: {}, delete "
                            "file type: {}, data file type: {}",
                            delete_file_field.field_id, delete_file_field.data_type->get_name(),
                            data_file_field->data_type->get_name());
                }

                std::string filed_lower_name = to_lower(delete_file_field.name);
                eq_file_node->add_children(filed_lower_name, delete_file_field.name,
                                           std::make_shared<TableSchemaChangeHelper::ScalarNode>());

                delete_col_ids.emplace_back(delete_file_field.field_id);
                delete_col_names.emplace_back(filed_lower_name);
                delete_col_types.emplace_back(make_nullable(delete_file_field.data_type));

                read_column_field_ids_set.erase(delete_file_field.field_id);
            } else {
                // The delete file may have extra columns that don't need to be read.
            }
        }
        if (!read_column_field_ids_set.empty()) [[unlikely]] {
            return Status::DataQualityError("some field ids not found in equality delete file.");
        }

        for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
            delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
        }
        // No slot-level predicates are pushed down into the delete file reader.
        phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> tmp;
        RETURN_IF_ERROR(delete_reader->init_reader(delete_col_names, &delete_col_name_to_block_idx,
                                                   {}, tmp, nullptr, nullptr, nullptr, nullptr,
                                                   nullptr, eq_file_node, false));
        RETURN_IF_ERROR(delete_reader->set_fill_columns(partition_columns, missing_columns));

        // Delete files with the same column-id set share one accumulated block.
        if (!_equality_delete_block_map.contains(delete_col_ids)) {
            _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size());
            Block block;
            _generate_equality_delete_block(&block, delete_col_names, delete_col_types);
            _equality_delete_blocks.emplace_back(block);
        }
        Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]];
        bool eof = false;
        // Drain the whole delete file, merging each batch into the shared block.
        while (!eof) {
            Block tmp_block;
            _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types);
            size_t read_rows = 0;
            RETURN_IF_ERROR(delete_reader->get_next_block(&tmp_block, &read_rows, &eof));
            if (read_rows > 0) {
                MutableBlock mutable_block(&eq_file_block);
                RETURN_IF_ERROR(mutable_block.merge(tmp_block));
            }
        }
    }

    // One delete implementation per distinct column-id set.
    for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) {
        auto& eq_file_block = _equality_delete_blocks[block_idx];
        auto equality_delete_impl =
                EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids);
        RETURN_IF_ERROR(equality_delete_impl->init(_profile));
        _equality_delete_impls.emplace_back(std::move(equality_delete_impl));
    }
    return Status::OK();
}
1109
1.52k
1110
1.52k
Status IcebergOrcReader::_process_equality_delete(
1111
1.52k
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
1112
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
1113
1.52k
            partition_columns;
1114
1.52k
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
1115
1.52k
1116
    std::map<int, int> data_file_id_to_field_idx;
1117
1.52k
    for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); ++idx) {
1118
        if (!_data_file_type_desc->getSubtype(idx)->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
1119
3.89k
            return Status::DataQualityError("Iceberg equality delete data file missing field id.");
1120
2.36k
        }
1121
        auto field_id = std::stoi(
1122
2.36k
                _data_file_type_desc->getSubtype(idx)->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
1123
0
        data_file_id_to_field_idx[field_id] = idx;
1124
    }
1125
0
1126
0
    for (const auto& delete_file : delete_files) {
1127
2.36k
        TFileRangeDesc delete_desc;
1128
2.36k
        // must use __set() method to make sure __isset is true
1129
2.36k
        delete_desc.__set_fs_name(_range.fs_name);
1130
2.36k
        delete_desc.path = delete_file.path;
1131
        delete_desc.start_offset = 0;
1132
2.36k
        delete_desc.size = -1;
1133
0
        delete_desc.file_size = -1;
1134
0
1135
0
        if (!delete_file.__isset.field_ids) [[unlikely]] {
1136
2.36k
            return Status::InternalError(
1137
0
                    "missing delete field ids when reading equality delete file");
1138
0
        }
1139
0
        auto& read_column_field_ids = delete_file.field_ids;
1140
0
        std::set<int> read_column_field_ids_set;
1141
        for (const auto& field_id : read_column_field_ids) {
1142
2.36k
            read_column_field_ids_set.insert(field_id);
1143
2.36k
            _equality_delete_col_ids.insert(field_id);
1144
        }
1145
2.36k
1146
0
        auto delete_reader = OrcReader::create_unique(_profile, _state, _params, delete_desc,
1147
0
                                                      READ_DELETE_FILE_BATCH_SIZE,
1148
0
                                                      _state->timezone(), _io_ctx, _meta_cache);
1149
0
        RETURN_IF_ERROR(delete_reader->init_schema_reader());
1150
0
        // delete file schema
1151
0
        std::vector<std::string> delete_file_col_names;
1152
2.36k
        std::vector<DataTypePtr> delete_file_col_types;
1153
2.36k
        RETURN_IF_ERROR(
1154
2.36k
                delete_reader->get_parsed_schema(&delete_file_col_names, &delete_file_col_types));
1155
2.36k
1156
        // the column that to read equality delete file.
1157
2.36k
        // (delete file maybe have extra columns that don't need to read)
1158
2.36k
        std::vector<std::string> delete_col_names;
1159
2.36k
        std::vector<DataTypePtr> delete_col_types;
1160
2.36k
        std::vector<int> delete_col_ids;
1161
2.36k
        std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx;
1162
2.36k
1163
2.36k
        const orc::Type* delete_field_desc = nullptr;
1164
1.52k
        RETURN_IF_ERROR(delete_reader->get_file_type(&delete_field_desc));
1165
0
        DCHECK(delete_field_desc != nullptr);
1166
0
1167
        auto eq_file_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
1168
3.89k
1169
2.37k
        for (size_t idx = 0; idx < delete_field_desc->getSubtypeCount(); idx++) {
1170
2.37k
            auto delete_file_field = delete_field_desc->getSubtype(idx);
1171
1172
1.52k
            if (!delete_file_field->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE))
1173
1.52k
                    [[unlikely]] { // missing delete_file_field id
1174
1.52k
                // equality delete file must have delete_file_field id to match column.
1175
1.52k
                return Status::DataQualityError(
1176
                        "missing delete_file_field id when reading equality delete file");
1177
1.52k
            } else {
1178
852
                auto delete_field_id =
1179
852
                        std::stoi(delete_file_field->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
1180
852
                if (read_column_field_ids_set.contains(delete_field_id)) {
1181
852
                    // the column that need to read.
1182
852
                    if (is_complex_type(delete_file_col_types[idx]->get_primitive_type()))
1183
1.52k
                            [[unlikely]] {
1184
1.52k
                        return Status::InternalError(
1185
4.57k
                                "can not support read complex column in equality delete file.");
1186
3.04k
                    } else if (!data_file_id_to_field_idx.contains(delete_field_id)) [[unlikely]] {
1187
3.04k
                        return Status::DataQualityError(
1188
3.04k
                                "can not find delete field id in data file schema when reading "
1189
3.04k
                                "equality delete file");
1190
3.04k
                    }
1191
1.52k
1192
1.52k
                    auto data_file_field = _data_file_type_desc->getSubtype(
1193
1.52k
                            data_file_id_to_field_idx[delete_field_id]);
1194
3.04k
1195
1.52k
                    if (delete_file_field->getKind() != data_file_field->getKind()) [[unlikely]] {
1196
                        return Status::NotSupported(
1197
852
                                "Not Support type change in equality delete, field: {}, delete "
1198
852
                                "file type: {}, data file type: {}",
1199
852
                                delete_field_id, delete_file_field->getKind(),
1200
852
                                data_file_field->getKind());
1201
852
                    }
1202
852
                    std::string filed_lower_name = to_lower(delete_field_desc->getFieldName(idx));
1203
852
                    eq_file_node->add_children(
1204
670
                            filed_lower_name, delete_field_desc->getFieldName(idx),
1205
670
                            std::make_shared<TableSchemaChangeHelper::ScalarNode>());
1206
1207
                    delete_col_ids.emplace_back(delete_field_id);
1208
                    delete_col_names.emplace_back(filed_lower_name);
1209
                    delete_col_types.emplace_back(make_nullable(delete_file_col_types[idx]));
1210
                    read_column_field_ids_set.erase(delete_field_id);
1211
                }
1212
            }
1213
        }
1214
        if (!read_column_field_ids_set.empty()) [[unlikely]] {
1215
            return Status::DataQualityError("some field ids not found in equality delete file.");
1216
        }
1217
1218
        for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
1219
            delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
1220
        }
1221
1222
        RETURN_IF_ERROR(delete_reader->init_reader(&delete_col_names, &delete_col_name_to_block_idx,
1223
                                                   {}, false, nullptr, nullptr, nullptr, nullptr,
1224
                                                   eq_file_node));
1225
        RETURN_IF_ERROR(delete_reader->set_fill_columns(partition_columns, missing_columns));
1226
1227
        if (!_equality_delete_block_map.contains(delete_col_ids)) {
1228
            _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size());
1229
            Block block;
1230
            _generate_equality_delete_block(&block, delete_col_names, delete_col_types);
1231
            _equality_delete_blocks.emplace_back(block);
1232
        }
1233
        Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]];
1234
        bool eof = false;
1235
        while (!eof) {
1236
            Block tmp_block;
1237
            _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types);
1238
            size_t read_rows = 0;
1239
            RETURN_IF_ERROR(delete_reader->get_next_block(&tmp_block, &read_rows, &eof));
1240
            if (read_rows > 0) {
1241
                MutableBlock mutable_block(&eq_file_block);
1242
                RETURN_IF_ERROR(mutable_block.merge(tmp_block));
1243
            }
1244
        }
1245
    }
1246
1247
    for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) {
1248
        auto& eq_file_block = _equality_delete_blocks[block_idx];
1249
        auto equality_delete_impl =
1250
                EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids);
1251
        RETURN_IF_ERROR(equality_delete_impl->init(_profile));
1252
        _equality_delete_impls.emplace_back(std::move(equality_delete_impl));
1253
    }
1254
    return Status::OK();
1255
}
1256
#include "common/compile_check_end.h"
1257
} // namespace doris