Coverage Report

Created: 2026-03-13 03:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/iceberg_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/table/iceberg_reader.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <gen_cpp/parquet_types.h>
24
#include <glog/logging.h>
25
#include <parallel_hashmap/phmap.h>
26
#include <rapidjson/document.h>
27
28
#include <algorithm>
29
#include <cstring>
30
#include <functional>
31
#include <memory>
32
#include <set>
33
34
#include "common/compiler_util.h" // IWYU pragma: keep
35
#include "common/status.h"
36
#include "core/assert_cast.h"
37
#include "core/block/block.h"
38
#include "core/block/column_with_type_and_name.h"
39
#include "core/column/column.h"
40
#include "core/column/column_string.h"
41
#include "core/column/column_vector.h"
42
#include "core/data_type/data_type_factory.hpp"
43
#include "core/data_type/define_primitive_type.h"
44
#include "core/data_type/primitive_type.h"
45
#include "core/string_ref.h"
46
#include "exprs/aggregate/aggregate_function.h"
47
#include "format/format_common.h"
48
#include "format/generic_reader.h"
49
#include "format/orc/vorc_reader.h"
50
#include "format/parquet/schema_desc.h"
51
#include "format/parquet/vparquet_column_chunk_reader.h"
52
#include "format/table/deletion_vector_reader.h"
53
#include "format/table/iceberg/iceberg_orc_nested_column_utils.h"
54
#include "format/table/iceberg/iceberg_parquet_nested_column_utils.h"
55
#include "format/table/nested_column_access_helper.h"
56
#include "format/table/table_format_reader.h"
57
#include "runtime/runtime_state.h"
58
#include "util/coding.h"
59
60
namespace cctz {
61
#include "common/compile_check_begin.h"
62
class time_zone;
63
} // namespace cctz
64
namespace doris {
65
class RowDescriptor;
66
class SlotDescriptor;
67
class TupleDescriptor;
68
69
namespace io {
70
struct IOContext;
71
} // namespace io
72
class VExprContext;
73
} // namespace doris
74
75
namespace doris {
76
const std::string IcebergOrcReader::ICEBERG_ORC_ATTRIBUTE = "iceberg.id";
77
78
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader,
79
                                       RuntimeProfile* profile, RuntimeState* state,
80
                                       const TFileScanRangeParams& params,
81
                                       const TFileRangeDesc& range, ShardedKVCache* kv_cache,
82
                                       io::IOContext* io_ctx, FileMetaCache* meta_cache)
83
14
        : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx,
84
14
                            meta_cache),
85
14
          _kv_cache(kv_cache) {
86
14
    static const char* iceberg_profile = "IcebergProfile";
87
14
    ADD_TIMER(_profile, iceberg_profile);
88
14
    _iceberg_profile.num_delete_files =
89
14
            ADD_CHILD_COUNTER(_profile, "NumDeleteFiles", TUnit::UNIT, iceberg_profile);
90
14
    _iceberg_profile.num_delete_rows =
91
14
            ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, iceberg_profile);
92
14
    _iceberg_profile.delete_files_read_time =
93
14
            ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
94
14
    _iceberg_profile.delete_rows_sort_time =
95
14
            ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
96
14
    _iceberg_profile.parse_delete_file_time =
97
14
            ADD_CHILD_TIMER(_profile, "ParseDeleteFileTime", iceberg_profile);
98
14
}
99
100
2
Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
101
2
    RETURN_IF_ERROR(_expand_block_if_need(block));
102
2
    RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof));
103
104
2
    if (_equality_delete_impls.size() > 0) {
105
0
        std::unique_ptr<IColumn::Filter> filter =
106
0
                std::make_unique<IColumn::Filter>(block->rows(), 1);
107
0
        for (auto& equality_delete_impl : _equality_delete_impls) {
108
0
            RETURN_IF_ERROR(equality_delete_impl->filter_data_block(
109
0
                    block, _col_name_to_block_idx, _id_to_block_column_name, *filter));
110
0
        }
111
0
        Block::filter_block_internal(block, *filter, block->columns());
112
0
    }
113
114
2
    *read_rows = block->rows();
115
2
    return _shrink_block_if_need(block);
116
2
}
117
118
2
Status IcebergTableReader::init_row_filters() {
119
    // We get the count value by doris's be, so we don't need to read the delete file
120
2
    if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count > 0) {
121
0
        return Status::OK();
122
0
    }
123
124
2
    const auto& table_desc = _range.table_format_params.iceberg_params;
125
2
    const auto& version = table_desc.format_version;
126
2
    if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
127
2
        return Status::OK();
128
2
    }
129
130
0
    std::vector<TIcebergDeleteFileDesc> position_delete_files;
131
0
    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
132
0
    std::vector<TIcebergDeleteFileDesc> deletion_vector_files;
133
0
    for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
134
0
        if (desc.content == POSITION_DELETE) {
135
0
            position_delete_files.emplace_back(desc);
136
0
        } else if (desc.content == EQUALITY_DELETE) {
137
0
            equality_delete_files.emplace_back(desc);
138
0
        } else if (desc.content == DELETION_VECTOR) {
139
0
            deletion_vector_files.emplace_back(desc);
140
0
        }
141
0
    }
142
143
0
    if (!equality_delete_files.empty()) {
144
0
        RETURN_IF_ERROR(_process_equality_delete(equality_delete_files));
145
0
        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
146
0
    }
147
148
0
    if (!deletion_vector_files.empty()) {
149
0
        if (deletion_vector_files.size() != 1) [[unlikely]] {
150
            /*
151
             * Deletion vectors are a binary representation of deletes for a single data file that is more efficient
152
             * at execution time than position delete files. Unlike equality or position delete files, there can be
153
             * at most one deletion vector for a given data file in a snapshot.
154
             */
155
0
            return Status::DataQualityError("This iceberg data file has multiple DVs.");
156
0
        }
157
0
        RETURN_IF_ERROR(
158
0
                read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));
159
160
0
        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
161
        // Readers can safely ignore position delete files if there is a DV for a data file.
162
0
    } else if (!position_delete_files.empty()) {
163
0
        RETURN_IF_ERROR(
164
0
                _position_delete_base(table_desc.original_file_path, position_delete_files));
165
0
        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
166
0
    }
167
168
0
    COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
169
0
    return Status::OK();
170
0
}
171
172
void IcebergTableReader::_generate_equality_delete_block(
173
        Block* block, const std::vector<std::string>& equality_delete_col_names,
174
0
        const std::vector<DataTypePtr>& equality_delete_col_types) {
175
0
    for (int i = 0; i < equality_delete_col_names.size(); ++i) {
176
0
        DataTypePtr data_type = make_nullable(equality_delete_col_types[i]);
177
0
        MutableColumnPtr data_column = data_type->create_column();
178
0
        block->insert(ColumnWithTypeAndName(std::move(data_column), data_type,
179
0
                                            equality_delete_col_names[i]));
180
0
    }
181
0
}
182
183
2
Status IcebergTableReader::_expand_block_if_need(Block* block) {
184
2
    std::set<std::string> names;
185
2
    auto block_names = block->get_names();
186
2
    names.insert(block_names.begin(), block_names.end());
187
2
    for (auto& col : _expand_columns) {
188
0
        col.column->assume_mutable()->clear();
189
0
        if (names.contains(col.name)) {
190
0
            return Status::InternalError("Wrong expand column '{}'", col.name);
191
0
        }
192
0
        names.insert(col.name);
193
0
        (*_col_name_to_block_idx)[col.name] = static_cast<uint32_t>(block->columns());
194
0
        block->insert(col);
195
0
    }
196
2
    return Status::OK();
197
2
}
198
199
2
Status IcebergTableReader::_shrink_block_if_need(Block* block) {
200
2
    std::set<size_t> positions_to_erase;
201
2
    for (const std::string& expand_col : _expand_col_names) {
202
0
        if (!_col_name_to_block_idx->contains(expand_col)) {
203
0
            return Status::InternalError("Wrong erase column '{}', block: {}", expand_col,
204
0
                                         block->dump_names());
205
0
        }
206
0
        positions_to_erase.emplace((*_col_name_to_block_idx)[expand_col]);
207
0
    }
208
2
    block->erase(positions_to_erase);
209
2
    for (const std::string& expand_col : _expand_col_names) {
210
0
        _col_name_to_block_idx->erase(expand_col);
211
0
    }
212
2
    return Status::OK();
213
2
}
214
215
Status IcebergTableReader::_position_delete_base(
216
0
        const std::string data_file_path, const std::vector<TIcebergDeleteFileDesc>& delete_files) {
217
0
    std::vector<DeleteRows*> delete_rows_array;
218
0
    int64_t num_delete_rows = 0;
219
0
    for (const auto& delete_file : delete_files) {
220
0
        SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
221
0
        Status create_status = Status::OK();
222
0
        auto* delete_file_cache = _kv_cache->get<DeleteFile>(
223
0
                _delet_file_cache_key(delete_file.path), [&]() -> DeleteFile* {
224
0
                    auto* position_delete = new DeleteFile;
225
0
                    TFileRangeDesc delete_file_range;
226
                    // must use __set() method to make sure __isset is true
227
0
                    delete_file_range.__set_fs_name(_range.fs_name);
228
0
                    delete_file_range.path = delete_file.path;
229
0
                    delete_file_range.start_offset = 0;
230
0
                    delete_file_range.size = -1;
231
0
                    delete_file_range.file_size = -1;
232
                    //read position delete file base on delete_file_range , generate DeleteFile , add DeleteFile to kv_cache
233
0
                    create_status = _read_position_delete_file(&delete_file_range, position_delete);
234
235
0
                    if (!create_status) {
236
0
                        return nullptr;
237
0
                    }
238
239
0
                    return position_delete;
240
0
                });
241
0
        if (create_status.is<ErrorCode::END_OF_FILE>()) {
242
0
            continue;
243
0
        } else if (!create_status.ok()) {
244
0
            return create_status;
245
0
        }
246
247
0
        DeleteFile& delete_file_map = *((DeleteFile*)delete_file_cache);
248
0
        auto get_value = [&](const auto& v) {
249
0
            DeleteRows* row_ids = v.second.get();
250
0
            if (!row_ids->empty()) {
251
0
                delete_rows_array.emplace_back(row_ids);
252
0
                num_delete_rows += row_ids->size();
253
0
            }
254
0
        };
255
0
        delete_file_map.if_contains(data_file_path, get_value);
256
0
    }
257
    // Use a KV cache to store the delete rows corresponding to a data file path.
258
    // The Parquet/ORC reader holds a reference (pointer) to this cached entry.
259
    // This allows delete rows to be reused when a single data file is split into
260
    // multiple splits, avoiding excessive memory usage when delete rows are large.
261
0
    if (num_delete_rows > 0) {
262
0
        SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
263
0
        _iceberg_delete_rows =
264
0
                _kv_cache->get<DeleteRows>(data_file_path,
265
0
                                           [&]() -> DeleteRows* {
266
0
                                               auto* data_file_position_delete = new DeleteRows;
267
0
                                               _sort_delete_rows(delete_rows_array, num_delete_rows,
268
0
                                                                 *data_file_position_delete);
269
270
0
                                               return data_file_position_delete;
271
0
                                           }
272
273
0
                );
274
0
        set_delete_rows();
275
0
        COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
276
0
    }
277
0
    return Status::OK();
278
0
}
279
280
IcebergTableReader::PositionDeleteRange IcebergTableReader::_get_range(
281
0
        const ColumnDictI32& file_path_column) {
282
0
    IcebergTableReader::PositionDeleteRange range;
283
0
    size_t read_rows = file_path_column.get_data().size();
284
0
    const int* code_path = file_path_column.get_data().data();
285
0
    const int* code_path_start = code_path;
286
0
    const int* code_path_end = code_path + read_rows;
287
0
    while (code_path < code_path_end) {
288
0
        int code = code_path[0];
289
0
        const int* code_end = std::upper_bound(code_path, code_path_end, code);
290
0
        range.data_file_path.emplace_back(file_path_column.get_value(code).to_string());
291
0
        range.range.emplace_back(code_path - code_path_start, code_end - code_path_start);
292
0
        code_path = code_end;
293
0
    }
294
0
    return range;
295
0
}
296
297
IcebergTableReader::PositionDeleteRange IcebergTableReader::_get_range(
298
0
        const ColumnString& file_path_column) {
299
0
    IcebergTableReader::PositionDeleteRange range;
300
0
    size_t read_rows = file_path_column.size();
301
0
    size_t index = 0;
302
0
    while (index < read_rows) {
303
0
        StringRef data_path = file_path_column.get_data_at(index);
304
0
        size_t left = index - 1;
305
0
        size_t right = read_rows;
306
0
        while (left + 1 != right) {
307
0
            size_t mid = left + (right - left) / 2;
308
0
            if (file_path_column.get_data_at(mid) > data_path) {
309
0
                right = mid;
310
0
            } else {
311
0
                left = mid;
312
0
            }
313
0
        }
314
0
        range.data_file_path.emplace_back(data_path.to_string());
315
0
        range.range.emplace_back(index, left + 1);
316
0
        index = left + 1;
317
0
    }
318
0
    return range;
319
0
}
320
321
/**
322
 * https://iceberg.apache.org/spec/#position-delete-files
323
 * The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning.
324
 * Sorting by file_path allows filter pushdown by file in columnar storage formats.
325
 * Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory.
326
 */
327
void IcebergTableReader::_sort_delete_rows(
328
        const std::vector<std::vector<int64_t>*>& delete_rows_array, int64_t num_delete_rows,
329
0
        std::vector<int64_t>& result) {
330
0
    if (delete_rows_array.empty()) {
331
0
        return;
332
0
    }
333
0
    if (delete_rows_array.size() == 1) {
334
0
        result.resize(num_delete_rows);
335
0
        memcpy(result.data(), delete_rows_array.front()->data(), sizeof(int64_t) * num_delete_rows);
336
0
        return;
337
0
    }
338
0
    if (delete_rows_array.size() == 2) {
339
0
        result.resize(num_delete_rows);
340
0
        std::merge(delete_rows_array.front()->begin(), delete_rows_array.front()->end(),
341
0
                   delete_rows_array.back()->begin(), delete_rows_array.back()->end(),
342
0
                   result.begin());
343
0
        return;
344
0
    }
345
346
0
    using vec_pair = std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::iterator>;
347
0
    result.resize(num_delete_rows);
348
0
    auto row_id_iter = result.begin();
349
0
    auto iter_end = result.end();
350
0
    std::vector<vec_pair> rows_array;
351
0
    for (auto* rows : delete_rows_array) {
352
0
        if (!rows->empty()) {
353
0
            rows_array.emplace_back(rows->begin(), rows->end());
354
0
        }
355
0
    }
356
0
    size_t array_size = rows_array.size();
357
0
    while (row_id_iter != iter_end) {
358
0
        int64_t min_index = 0;
359
0
        int64_t min = *rows_array[0].first;
360
0
        for (size_t i = 0; i < array_size; ++i) {
361
0
            if (*rows_array[i].first < min) {
362
0
                min_index = i;
363
0
                min = *rows_array[i].first;
364
0
            }
365
0
        }
366
0
        *row_id_iter++ = min;
367
0
        rows_array[min_index].first++;
368
0
        if (UNLIKELY(rows_array[min_index].first == rows_array[min_index].second)) {
369
0
            rows_array.erase(rows_array.begin() + min_index);
370
0
            array_size--;
371
0
        }
372
0
    }
373
0
}
374
375
void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFile* position_delete,
376
                                                         size_t read_rows,
377
0
                                                         bool file_path_column_dictionary_coded) {
378
0
    SCOPED_TIMER(_iceberg_profile.parse_delete_file_time);
379
    // todo: maybe do not need to build name to index map every time
380
0
    auto name_to_pos_map = block.get_name_to_pos_map();
381
0
    ColumnPtr path_column = block.get_by_position(name_to_pos_map[ICEBERG_FILE_PATH]).column;
382
0
    DCHECK_EQ(path_column->size(), read_rows);
383
0
    ColumnPtr pos_column = block.get_by_position(name_to_pos_map[ICEBERG_ROW_POS]).column;
384
0
    using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
385
0
    const int64_t* src_data = assert_cast<const ColumnType&>(*pos_column).get_data().data();
386
0
    IcebergTableReader::PositionDeleteRange range;
387
0
    if (file_path_column_dictionary_coded) {
388
0
        range = _get_range(assert_cast<const ColumnDictI32&>(*path_column));
389
0
    } else {
390
0
        range = _get_range(assert_cast<const ColumnString&>(*path_column));
391
0
    }
392
0
    for (int i = 0; i < range.range.size(); ++i) {
393
0
        std::string key = range.data_file_path[i];
394
0
        auto iter = position_delete->find(key);
395
0
        DeleteRows* delete_rows;
396
0
        if (iter == position_delete->end()) {
397
0
            delete_rows = new DeleteRows;
398
0
            std::unique_ptr<DeleteRows> delete_rows_ptr(delete_rows);
399
0
            (*position_delete)[key] = std::move(delete_rows_ptr);
400
0
        } else {
401
0
            delete_rows = iter->second.get();
402
0
        }
403
0
        const int64_t* cpy_start = src_data + range.range[i].first;
404
0
        const int64_t cpy_count = range.range[i].second - range.range[i].first;
405
0
        int64_t origin_size = delete_rows->size();
406
0
        delete_rows->resize(origin_size + cpy_count);
407
0
        int64_t* dest_position = &(*delete_rows)[origin_size];
408
0
        memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
409
0
    }
410
0
}
411
412
Status IcebergParquetReader::init_reader(
413
        const std::vector<std::string>& file_col_names,
414
        std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
415
        const VExprContextSPtrs& conjuncts,
416
        phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
417
                slot_id_to_predicates,
418
        const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
419
        const std::unordered_map<std::string, int>* colname_to_slot_id,
420
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
421
1
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
422
1
    _file_format = Fileformat::PARQUET;
423
1
    _col_name_to_block_idx = col_name_to_block_idx;
424
1
    auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
425
1
    RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&_data_file_field_desc));
426
1
    DCHECK(_data_file_field_desc != nullptr);
427
428
1
    auto column_id_result = _create_column_ids(_data_file_field_desc, tuple_descriptor);
429
1
    auto& column_ids = column_id_result.column_ids;
430
1
    const auto& filter_column_ids = column_id_result.filter_column_ids;
431
432
1
    RETURN_IF_ERROR(init_row_filters());
433
1
    _all_required_col_names = file_col_names;
434
435
1
    if (!_params.__isset.history_schema_info || _params.history_schema_info.empty()) [[unlikely]] {
436
1
        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(
437
1
                tuple_descriptor, *_data_file_field_desc, table_info_node_ptr));
438
1
    } else {
439
0
        std::set<std::string> read_col_name_set(file_col_names.begin(), file_col_names.end());
440
441
0
        bool exist_field_id = true;
442
0
        for (int idx = 0; idx < _data_file_field_desc->size(); idx++) {
443
0
            if (_data_file_field_desc->get_column(idx)->field_id == -1) {
444
                // the data file may be from hive table migrated to iceberg, field id is missing
445
0
                exist_field_id = false;
446
0
                break;
447
0
            }
448
0
        }
449
0
        const auto& table_schema = _params.history_schema_info.front().root_field;
450
451
0
        table_info_node_ptr = std::make_shared<TableSchemaChangeHelper::StructNode>();
452
0
        if (exist_field_id) {
453
            // id -> table column name. columns that need read data file.
454
0
            std::unordered_map<int, std::shared_ptr<schema::external::TField>> id_to_table_field;
455
0
            for (const auto& table_field : table_schema.fields) {
456
0
                auto field = table_field.field_ptr;
457
0
                DCHECK(field->__isset.name);
458
0
                if (!read_col_name_set.contains(field->name)) {
459
0
                    continue;
460
0
                }
461
0
                id_to_table_field.emplace(field->id, field);
462
0
            }
463
464
0
            for (int idx = 0; idx < _data_file_field_desc->size(); idx++) {
465
0
                const auto& data_file_field = _data_file_field_desc->get_column(idx);
466
0
                auto data_file_column_id = _data_file_field_desc->get_column(idx)->field_id;
467
468
0
                if (id_to_table_field.contains(data_file_column_id)) {
469
0
                    const auto& table_field = id_to_table_field[data_file_column_id];
470
471
0
                    std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
472
0
                    RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id(
473
0
                            *table_field, *data_file_field, exist_field_id, field_node));
474
0
                    table_info_node_ptr->add_children(table_field->name, data_file_field->name,
475
0
                                                      field_node);
476
477
0
                    _id_to_block_column_name.emplace(data_file_column_id, table_field->name);
478
0
                    id_to_table_field.erase(data_file_column_id);
479
0
                } else if (_equality_delete_col_ids.contains(data_file_column_id)) {
480
                    // Columns that need to be read for equality delete.
481
0
                    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";
482
483
                    // Construct table column names that avoid duplication with current table schema.
484
                    // As the columns currently being read may have been deleted in the latest
485
                    // table structure or have undergone a series of schema changes...
486
0
                    std::string table_column_name = EQ_DELETE_PRE + data_file_field->name;
487
0
                    table_info_node_ptr->add_children(
488
0
                            table_column_name, data_file_field->name,
489
0
                            std::make_shared<TableSchemaChangeHelper::ConstNode>());
490
491
0
                    _id_to_block_column_name.emplace(data_file_column_id, table_column_name);
492
0
                    _expand_col_names.emplace_back(table_column_name);
493
0
                    auto expand_data_type = make_nullable(data_file_field->data_type);
494
0
                    _expand_columns.emplace_back(
495
0
                            ColumnWithTypeAndName {expand_data_type->create_column(),
496
0
                                                   expand_data_type, table_column_name});
497
498
0
                    _all_required_col_names.emplace_back(table_column_name);
499
0
                    column_ids.insert(data_file_field->get_column_id());
500
0
                }
501
0
            }
502
0
            for (const auto& [id, table_field] : id_to_table_field) {
503
0
                table_info_node_ptr->add_not_exist_children(table_field->name);
504
0
            }
505
0
        } else {
506
0
            if (!_equality_delete_col_ids.empty()) [[unlikely]] {
507
0
                return Status::InternalError(
508
0
                        "Can not read missing field id data file when have equality delete");
509
0
            }
510
0
            std::map<std::string, size_t> file_column_idx_map;
511
0
            for (size_t idx = 0; idx < _data_file_field_desc->size(); idx++) {
512
0
                file_column_idx_map.emplace(_data_file_field_desc->get_column(idx)->name, idx);
513
0
            }
514
515
0
            for (const auto& table_field : table_schema.fields) {
516
0
                DCHECK(table_field.__isset.field_ptr);
517
0
                DCHECK(table_field.field_ptr->__isset.name);
518
0
                const auto& table_column_name = table_field.field_ptr->name;
519
0
                if (!read_col_name_set.contains(table_column_name)) {
520
0
                    continue;
521
0
                }
522
0
                if (!table_field.field_ptr->__isset.name_mapping ||
523
0
                    table_field.field_ptr->name_mapping.size() == 0) {
524
0
                    return Status::DataQualityError(
525
0
                            "name_mapping must be set when read missing field id data file.");
526
0
                }
527
0
                bool have_mapping = false;
528
0
                for (const auto& mapped_name : table_field.field_ptr->name_mapping) {
529
0
                    if (file_column_idx_map.contains(mapped_name)) {
530
0
                        std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
531
0
                        const auto& file_field = _data_file_field_desc->get_column(
532
0
                                file_column_idx_map.at(mapped_name));
533
0
                        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id(
534
0
                                *table_field.field_ptr, *file_field, exist_field_id, field_node));
535
0
                        table_info_node_ptr->add_children(table_column_name, file_field->name,
536
0
                                                          field_node);
537
0
                        have_mapping = true;
538
0
                        break;
539
0
                    }
540
0
                }
541
0
                if (!have_mapping) {
542
0
                    table_info_node_ptr->add_not_exist_children(table_column_name);
543
0
                }
544
0
            }
545
0
        }
546
0
    }
547
548
1
    return parquet_reader->init_reader(
549
1
            _all_required_col_names, _col_name_to_block_idx, conjuncts, slot_id_to_predicates,
550
1
            tuple_descriptor, row_descriptor, colname_to_slot_id, not_single_slot_filter_conjuncts,
551
1
            slot_id_to_filter_conjuncts, table_info_node_ptr, true, column_ids, filter_column_ids);
552
1
}
553
554
ColumnIdResult IcebergParquetReader::_create_column_ids(const FieldDescriptor* field_desc,
555
7
                                                        const TupleDescriptor* tuple_descriptor) {
556
    // First, assign column IDs to the field descriptor
557
7
    auto* mutable_field_desc = const_cast<FieldDescriptor*>(field_desc);
558
7
    mutable_field_desc->assign_ids();
559
560
    // map top-level table column iceberg_id -> FieldSchema*
561
7
    std::unordered_map<int, const FieldSchema*> iceberg_id_to_field_schema_map;
562
563
58
    for (int i = 0; i < field_desc->size(); ++i) {
564
51
        auto field_schema = field_desc->get_column(i);
565
51
        if (!field_schema) continue;
566
567
51
        int iceberg_id = field_schema->field_id;
568
51
        iceberg_id_to_field_schema_map[iceberg_id] = field_schema;
569
51
    }
570
571
7
    std::set<uint64_t> column_ids;
572
7
    std::set<uint64_t> filter_column_ids;
573
574
    // helper to process access paths for a given top-level parquet field
575
7
    auto process_access_paths = [](const FieldSchema* parquet_field,
576
7
                                   const std::vector<TColumnAccessPath>& access_paths,
577
14
                                   std::set<uint64_t>& out_ids) {
578
14
        process_nested_access_paths(
579
14
                parquet_field, access_paths, out_ids,
580
14
                [](const FieldSchema* field) { return field->get_column_id(); },
581
14
                [](const FieldSchema* field) { return field->get_max_column_id(); },
582
14
                IcebergParquetNestedColumnUtils::extract_nested_column_ids);
583
14
    };
584
585
15
    for (const auto* slot : tuple_descriptor->slots()) {
586
15
        auto it = iceberg_id_to_field_schema_map.find(slot->col_unique_id());
587
15
        if (it == iceberg_id_to_field_schema_map.end()) {
588
            // Column not found in file (e.g., partition column, added column)
589
0
            continue;
590
0
        }
591
15
        auto field_schema = it->second;
592
593
        // primitive (non-nested) types: direct mapping by name
594
15
        if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY &&
595
15
             slot->col_type() != TYPE_MAP)) {
596
7
            column_ids.insert(field_schema->column_id);
597
598
7
            if (slot->is_predicate()) {
599
0
                filter_column_ids.insert(field_schema->column_id);
600
0
            }
601
7
            continue;
602
7
        }
603
604
        // complex types:
605
8
        const auto& all_access_paths = slot->all_access_paths();
606
8
        process_access_paths(field_schema, all_access_paths, column_ids);
607
608
8
        const auto& predicate_access_paths = slot->predicate_access_paths();
609
8
        if (!predicate_access_paths.empty()) {
610
6
            process_access_paths(field_schema, predicate_access_paths, filter_column_ids);
611
6
        }
612
8
    }
613
7
    return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids));
614
7
}
615
616
Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* delete_range,
617
0
                                                         DeleteFile* position_delete) {
618
0
    ParquetReader parquet_delete_reader(_profile, _params, *delete_range,
619
0
                                        READ_DELETE_FILE_BATCH_SIZE, &_state->timezone_obj(),
620
0
                                        _io_ctx, _state, _meta_cache);
621
0
    phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> tmp;
622
0
    RETURN_IF_ERROR(parquet_delete_reader.init_reader(
623
0
            delete_file_col_names,
624
0
            const_cast<std::unordered_map<std::string, uint32_t>*>(&DELETE_COL_NAME_TO_BLOCK_IDX),
625
0
            {}, tmp, nullptr, nullptr, nullptr, nullptr, nullptr,
626
0
            TableSchemaChangeHelper::ConstNode::get_instance(), false));
627
628
0
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
629
0
            partition_columns;
630
0
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
631
0
    RETURN_IF_ERROR(parquet_delete_reader.set_fill_columns(partition_columns, missing_columns));
632
633
0
    const tparquet::FileMetaData* meta_data = parquet_delete_reader.get_meta_data();
634
0
    bool dictionary_coded = true;
635
0
    for (const auto& row_group : meta_data->row_groups) {
636
0
        const auto& column_chunk = row_group.columns[ICEBERG_FILE_PATH_INDEX];
637
0
        if (!(column_chunk.__isset.meta_data && has_dict_page(column_chunk.meta_data))) {
638
0
            dictionary_coded = false;
639
0
            break;
640
0
        }
641
0
    }
642
0
    DataTypePtr data_type_file_path {new DataTypeString};
643
0
    DataTypePtr data_type_pos {new DataTypeInt64};
644
0
    bool eof = false;
645
0
    while (!eof) {
646
0
        Block block = {dictionary_coded
647
0
                               ? ColumnWithTypeAndName {ColumnDictI32::create(
648
0
                                                                FieldType::OLAP_FIELD_TYPE_VARCHAR),
649
0
                                                        data_type_file_path, ICEBERG_FILE_PATH}
650
0
                               : ColumnWithTypeAndName {data_type_file_path, ICEBERG_FILE_PATH},
651
652
0
                       {data_type_pos, ICEBERG_ROW_POS}};
653
0
        size_t read_rows = 0;
654
0
        RETURN_IF_ERROR(parquet_delete_reader.get_next_block(&block, &read_rows, &eof));
655
656
0
        if (read_rows <= 0) {
657
0
            break;
658
0
        }
659
0
        _gen_position_delete_file_range(block, position_delete, read_rows, dictionary_coded);
660
0
    }
661
0
    return Status::OK();
662
0
};
663
664
Status IcebergOrcReader::init_reader(
665
        const std::vector<std::string>& file_col_names,
666
        std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
667
        const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
668
        const RowDescriptor* row_descriptor,
669
        const std::unordered_map<std::string, int>* colname_to_slot_id,
670
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
671
1
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
672
1
    _file_format = Fileformat::ORC;
673
1
    _col_name_to_block_idx = col_name_to_block_idx;
674
1
    auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
675
1
    RETURN_IF_ERROR(orc_reader->get_file_type(&_data_file_type_desc));
676
1
    std::vector<std::string> data_file_col_names;
677
1
    std::vector<DataTypePtr> data_file_col_types;
678
1
    RETURN_IF_ERROR(orc_reader->get_parsed_schema(&data_file_col_names, &data_file_col_types));
679
680
1
    auto column_id_result = _create_column_ids(_data_file_type_desc, tuple_descriptor);
681
1
    auto& column_ids = column_id_result.column_ids;
682
1
    const auto& filter_column_ids = column_id_result.filter_column_ids;
683
684
1
    RETURN_IF_ERROR(init_row_filters());
685
686
1
    _all_required_col_names = file_col_names;
687
1
    if (!_params.__isset.history_schema_info || _params.history_schema_info.empty()) [[unlikely]] {
688
1
        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(tuple_descriptor, _data_file_type_desc,
689
1
                                                        table_info_node_ptr));
690
1
    } else {
691
0
        std::set<std::string> read_col_name_set(file_col_names.begin(), file_col_names.end());
692
693
0
        bool exist_field_id = true;
694
0
        for (size_t idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) {
695
0
            if (!_data_file_type_desc->getSubtype(idx)->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
696
0
                exist_field_id = false;
697
0
                break;
698
0
            }
699
0
        }
700
701
0
        const auto& table_schema = _params.history_schema_info.front().root_field;
702
0
        table_info_node_ptr = std::make_shared<TableSchemaChangeHelper::StructNode>();
703
0
        if (exist_field_id) {
704
            // id -> table column name. columns that need read data file.
705
0
            std::unordered_map<int, std::shared_ptr<schema::external::TField>> id_to_table_field;
706
0
            for (const auto& table_field : table_schema.fields) {
707
0
                auto field = table_field.field_ptr;
708
0
                DCHECK(field->__isset.name);
709
0
                if (!read_col_name_set.contains(field->name)) {
710
0
                    continue;
711
0
                }
712
713
0
                id_to_table_field.emplace(field->id, field);
714
0
            }
715
716
0
            for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) {
717
0
                const auto& data_file_field = _data_file_type_desc->getSubtype(idx);
718
0
                auto data_file_column_id =
719
0
                        std::stoi(data_file_field->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
720
0
                auto const& file_column_name = _data_file_type_desc->getFieldName(idx);
721
722
0
                if (id_to_table_field.contains(data_file_column_id)) {
723
0
                    const auto& table_field = id_to_table_field[data_file_column_id];
724
725
0
                    std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
726
0
                    RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id(
727
0
                            *table_field, data_file_field, ICEBERG_ORC_ATTRIBUTE, exist_field_id,
728
0
                            field_node));
729
0
                    table_info_node_ptr->add_children(table_field->name, file_column_name,
730
0
                                                      field_node);
731
732
0
                    _id_to_block_column_name.emplace(data_file_column_id, table_field->name);
733
0
                    id_to_table_field.erase(data_file_column_id);
734
0
                } else if (_equality_delete_col_ids.contains(data_file_column_id)) {
735
                    // Columns that need to be read for equality delete.
736
0
                    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";
737
738
                    // Construct table column names that avoid duplication with current table schema.
739
                    // As the columns currently being read may have been deleted in the latest
740
                    // table structure or have undergone a series of schema changes...
741
0
                    std::string table_column_name = EQ_DELETE_PRE + file_column_name;
742
0
                    table_info_node_ptr->add_children(
743
0
                            table_column_name, file_column_name,
744
0
                            std::make_shared<TableSchemaChangeHelper::ConstNode>());
745
746
0
                    _id_to_block_column_name.emplace(data_file_column_id, table_column_name);
747
0
                    _expand_col_names.emplace_back(table_column_name);
748
749
0
                    auto expand_data_type = make_nullable(data_file_col_types[idx]);
750
0
                    _expand_columns.emplace_back(
751
0
                            ColumnWithTypeAndName {expand_data_type->create_column(),
752
0
                                                   expand_data_type, table_column_name});
753
754
0
                    _all_required_col_names.emplace_back(table_column_name);
755
0
                    column_ids.insert(data_file_field->getColumnId());
756
0
                }
757
0
            }
758
0
            for (const auto& [id, table_field] : id_to_table_field) {
759
0
                table_info_node_ptr->add_not_exist_children(table_field->name);
760
0
            }
761
0
        } else {
762
0
            if (!_equality_delete_col_ids.empty()) [[unlikely]] {
763
0
                return Status::InternalError(
764
0
                        "Can not read missing field id data file when have equality delete");
765
0
            }
766
0
            std::map<std::string, size_t> file_column_idx_map;
767
0
            for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) {
768
0
                auto const& file_column_name = _data_file_type_desc->getFieldName(idx);
769
0
                file_column_idx_map.emplace(file_column_name, idx);
770
0
            }
771
772
0
            for (const auto& table_field : table_schema.fields) {
773
0
                DCHECK(table_field.__isset.field_ptr);
774
0
                DCHECK(table_field.field_ptr->__isset.name);
775
0
                const auto& table_column_name = table_field.field_ptr->name;
776
0
                if (!read_col_name_set.contains(table_column_name)) {
777
0
                    continue;
778
0
                }
779
0
                if (!table_field.field_ptr->__isset.name_mapping ||
780
0
                    table_field.field_ptr->name_mapping.size() == 0) {
781
0
                    return Status::DataQualityError(
782
0
                            "name_mapping must be set when read missing field id data file.");
783
0
                }
784
0
                auto have_mapping = false;
785
0
                for (const auto& mapped_name : table_field.field_ptr->name_mapping) {
786
0
                    if (file_column_idx_map.contains(mapped_name)) {
787
0
                        auto file_column_idx = file_column_idx_map.at(mapped_name);
788
0
                        std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
789
0
                        const auto& file_field = _data_file_type_desc->getSubtype(file_column_idx);
790
0
                        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id(
791
0
                                *table_field.field_ptr, file_field, ICEBERG_ORC_ATTRIBUTE,
792
0
                                exist_field_id, field_node));
793
0
                        table_info_node_ptr->add_children(
794
0
                                table_column_name,
795
0
                                _data_file_type_desc->getFieldName(file_column_idx), field_node);
796
0
                        have_mapping = true;
797
0
                        break;
798
0
                    }
799
0
                }
800
0
                if (!have_mapping) {
801
0
                    table_info_node_ptr->add_not_exist_children(table_column_name);
802
0
                }
803
0
            }
804
0
        }
805
0
    }
806
807
1
    return orc_reader->init_reader(&_all_required_col_names, _col_name_to_block_idx, conjuncts,
808
1
                                   false, tuple_descriptor, row_descriptor,
809
1
                                   not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts,
810
1
                                   table_info_node_ptr, column_ids, filter_column_ids);
811
1
}
812
813
ColumnIdResult IcebergOrcReader::_create_column_ids(const orc::Type* orc_type,
814
7
                                                    const TupleDescriptor* tuple_descriptor) {
815
    // map top-level table column iceberg_id -> orc::Type*
816
7
    std::unordered_map<int, const orc::Type*> iceberg_id_to_orc_type_map;
817
58
    for (uint64_t i = 0; i < orc_type->getSubtypeCount(); ++i) {
818
51
        auto orc_sub_type = orc_type->getSubtype(i);
819
51
        if (!orc_sub_type) continue;
820
821
51
        if (!orc_sub_type->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
822
0
            continue;
823
0
        }
824
51
        int iceberg_id = std::stoi(orc_sub_type->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
825
51
        iceberg_id_to_orc_type_map[iceberg_id] = orc_sub_type;
826
51
    }
827
828
7
    std::set<uint64_t> column_ids;
829
7
    std::set<uint64_t> filter_column_ids;
830
831
    // helper to process access paths for a given top-level orc field
832
7
    auto process_access_paths = [](const orc::Type* orc_field,
833
7
                                   const std::vector<TColumnAccessPath>& access_paths,
834
14
                                   std::set<uint64_t>& out_ids) {
835
14
        process_nested_access_paths(
836
14
                orc_field, access_paths, out_ids,
837
14
                [](const orc::Type* type) { return type->getColumnId(); },
838
14
                [](const orc::Type* type) { return type->getMaximumColumnId(); },
839
14
                IcebergOrcNestedColumnUtils::extract_nested_column_ids);
840
14
    };
841
842
15
    for (const auto* slot : tuple_descriptor->slots()) {
843
15
        auto it = iceberg_id_to_orc_type_map.find(slot->col_unique_id());
844
15
        if (it == iceberg_id_to_orc_type_map.end()) {
845
            // Column not found in file
846
0
            continue;
847
0
        }
848
15
        const orc::Type* orc_field = it->second;
849
850
        // primitive (non-nested) types
851
15
        if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY &&
852
15
             slot->col_type() != TYPE_MAP)) {
853
7
            column_ids.insert(orc_field->getColumnId());
854
7
            if (slot->is_predicate()) {
855
0
                filter_column_ids.insert(orc_field->getColumnId());
856
0
            }
857
7
            continue;
858
7
        }
859
860
        // complex types
861
8
        const auto& all_access_paths = slot->all_access_paths();
862
8
        process_access_paths(orc_field, all_access_paths, column_ids);
863
864
8
        const auto& predicate_access_paths = slot->predicate_access_paths();
865
8
        if (!predicate_access_paths.empty()) {
866
6
            process_access_paths(orc_field, predicate_access_paths, filter_column_ids);
867
6
        }
868
8
    }
869
870
7
    return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids));
871
7
}
872
873
Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete_range,
874
0
                                                    DeleteFile* position_delete) {
875
0
    OrcReader orc_delete_reader(_profile, _state, _params, *delete_range,
876
0
                                READ_DELETE_FILE_BATCH_SIZE, _state->timezone(), _io_ctx,
877
0
                                _meta_cache);
878
0
    RETURN_IF_ERROR(orc_delete_reader.init_reader(
879
0
            &delete_file_col_names,
880
0
            const_cast<std::unordered_map<std::string, uint32_t>*>(&DELETE_COL_NAME_TO_BLOCK_IDX),
881
0
            {}, false, {}, {}, nullptr, nullptr));
882
883
0
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
884
0
            partition_columns;
885
0
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
886
0
    RETURN_IF_ERROR(orc_delete_reader.set_fill_columns(partition_columns, missing_columns));
887
888
0
    bool eof = false;
889
0
    DataTypePtr data_type_file_path {new DataTypeString};
890
0
    DataTypePtr data_type_pos {new DataTypeInt64};
891
0
    while (!eof) {
892
0
        Block block = {{data_type_file_path, ICEBERG_FILE_PATH}, {data_type_pos, ICEBERG_ROW_POS}};
893
894
0
        size_t read_rows = 0;
895
0
        RETURN_IF_ERROR(orc_delete_reader.get_next_block(&block, &read_rows, &eof));
896
897
0
        _gen_position_delete_file_range(block, position_delete, read_rows, false);
898
0
    }
899
0
    return Status::OK();
900
0
}
901
902
// Directly read the deletion vector using the `content_offset` and
903
// `content_size_in_bytes` provided by FE in `delete_file_desc`.
904
// These two fields indicate the location of a blob in storage.
905
// Since the current format is `deletion-vector-v1`, which does not
906
// compress any blobs, we can temporarily skip parsing the Puffin footer.
907
Status IcebergTableReader::read_deletion_vector(const std::string& data_file_path,
908
0
                                                const TIcebergDeleteFileDesc& delete_file_desc) {
909
0
    Status create_status = Status::OK();
910
0
    SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
911
0
    _iceberg_delete_rows = _kv_cache->get<DeleteRows>(data_file_path, [&]() -> DeleteRows* {
912
0
        auto* delete_rows = new DeleteRows;
913
914
0
        TFileRangeDesc delete_range;
915
        // must use __set() method to make sure __isset is true
916
0
        delete_range.__set_fs_name(_range.fs_name);
917
0
        delete_range.path = delete_file_desc.path;
918
0
        delete_range.start_offset = delete_file_desc.content_offset;
919
0
        delete_range.size = delete_file_desc.content_size_in_bytes;
920
0
        delete_range.file_size = -1;
921
922
        // We may consider caching the DeletionVectorReader when reading Puffin files,
923
        // where the underlying reader is an `InMemoryFileReader` and a single data file is
924
        // split into multiple splits. However, we need to ensure that the underlying
925
        // reader supports multi-threaded access.
926
0
        DeletionVectorReader dv_reader(_state, _profile, _params, delete_range, _io_ctx);
927
0
        create_status = dv_reader.open();
928
0
        if (!create_status.ok()) [[unlikely]] {
929
0
            return nullptr;
930
0
        }
931
932
0
        size_t buffer_size = delete_range.size;
933
0
        std::vector<char> buf(buffer_size);
934
0
        if (buffer_size < 12) [[unlikely]] {
935
            // Minimum size: 4 bytes length + 4 bytes magic + 4 bytes CRC32
936
0
            create_status = Status::DataQualityError("Deletion vector file size too small: {}",
937
0
                                                     buffer_size);
938
0
            return nullptr;
939
0
        }
940
941
0
        create_status = dv_reader.read_at(delete_range.start_offset, {buf.data(), buffer_size});
942
0
        if (!create_status) [[unlikely]] {
943
0
            return nullptr;
944
0
        }
945
        // The serialized blob contains:
946
        //
947
        // Combined length of the vector and magic bytes stored as 4 bytes, big-endian
948
        // A 4-byte magic sequence, D1 D3 39 64
949
        // The vector, serialized as described below
950
        // A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian
951
952
0
        auto total_length = BigEndian::Load32(buf.data());
953
0
        if (total_length + 8 != buffer_size) [[unlikely]] {
954
0
            create_status = Status::DataQualityError(
955
0
                    "Deletion vector length mismatch, expected: {}, actual: {}", total_length + 8,
956
0
                    buffer_size);
957
0
            return nullptr;
958
0
        }
959
960
0
        constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'};
961
0
        if (memcmp(buf.data() + sizeof(total_length), MAGIC_NUMBER, 4)) [[unlikely]] {
962
0
            create_status = Status::DataQualityError("Deletion vector magic number mismatch");
963
0
            return nullptr;
964
0
        }
965
966
0
        roaring::Roaring64Map bitmap;
967
0
        SCOPED_TIMER(_iceberg_profile.parse_delete_file_time);
968
0
        try {
969
0
            bitmap = roaring::Roaring64Map::readSafe(buf.data() + 8, buffer_size - 12);
970
0
        } catch (const std::runtime_error& e) {
971
0
            create_status = Status::DataQualityError("Decode roaring bitmap failed, {}", e.what());
972
0
            return nullptr;
973
0
        }
974
        // skip CRC-32 checksum
975
976
0
        delete_rows->reserve(bitmap.cardinality());
977
0
        for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
978
0
            delete_rows->push_back(*it);
979
0
        }
980
0
        COUNTER_UPDATE(_iceberg_profile.num_delete_rows, delete_rows->size());
981
0
        return delete_rows;
982
0
    });
983
984
0
    RETURN_IF_ERROR(create_status);
985
0
    if (!_iceberg_delete_rows->empty()) [[likely]] {
986
0
        set_delete_rows();
987
0
    }
988
0
    return Status::OK();
989
0
}
990
991
// Similar to the code structure of IcebergOrcReader::_process_equality_delete,
992
// but considering the significant differences in how parquet/orc obtains
993
// attributes/column IDs, it is not easy to combine them.
994
Status IcebergParquetReader::_process_equality_delete(
995
0
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
996
0
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
997
0
            partition_columns;
998
0
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
999
1000
0
    std::map<int, const FieldSchema*> data_file_id_to_field_schema;
1001
0
    for (int idx = 0; idx < _data_file_field_desc->size(); ++idx) {
1002
0
        auto field_schema = _data_file_field_desc->get_column(idx);
1003
0
        if (_data_file_field_desc->get_column(idx)->field_id == -1) {
1004
0
            return Status::DataQualityError("Iceberg equality delete data file missing field id.");
1005
0
        }
1006
0
        data_file_id_to_field_schema[_data_file_field_desc->get_column(idx)->field_id] =
1007
0
                field_schema;
1008
0
    }
1009
1010
0
    for (const auto& delete_file : delete_files) {
1011
0
        TFileRangeDesc delete_desc;
1012
        // must use __set() method to make sure __isset is true
1013
0
        delete_desc.__set_fs_name(_range.fs_name);
1014
0
        delete_desc.path = delete_file.path;
1015
0
        delete_desc.start_offset = 0;
1016
0
        delete_desc.size = -1;
1017
0
        delete_desc.file_size = -1;
1018
1019
0
        if (!delete_file.__isset.field_ids) [[unlikely]] {
1020
0
            return Status::InternalError(
1021
0
                    "missing delete field ids when reading equality delete file");
1022
0
        }
1023
0
        auto& read_column_field_ids = delete_file.field_ids;
1024
0
        std::set<int> read_column_field_ids_set;
1025
0
        for (const auto& field_id : read_column_field_ids) {
1026
0
            read_column_field_ids_set.insert(field_id);
1027
0
            _equality_delete_col_ids.insert(field_id);
1028
0
        }
1029
1030
0
        auto delete_reader = ParquetReader::create_unique(
1031
0
                _profile, _params, delete_desc, READ_DELETE_FILE_BATCH_SIZE,
1032
0
                &_state->timezone_obj(), _io_ctx, _state, _meta_cache);
1033
0
        RETURN_IF_ERROR(delete_reader->init_schema_reader());
1034
1035
        // the column that to read equality delete file.
1036
        // (delete file may be have extra columns that don't need to read)
1037
0
        std::vector<std::string> delete_col_names;
1038
0
        std::vector<DataTypePtr> delete_col_types;
1039
0
        std::vector<int> delete_col_ids;
1040
0
        std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx;
1041
1042
0
        const FieldDescriptor* delete_field_desc = nullptr;
1043
0
        RETURN_IF_ERROR(delete_reader->get_file_metadata_schema(&delete_field_desc));
1044
0
        DCHECK(delete_field_desc != nullptr);
1045
1046
0
        auto eq_file_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
1047
0
        for (const auto& delete_file_field : delete_field_desc->get_fields_schema()) {
1048
0
            if (delete_file_field.field_id == -1) [[unlikely]] { // missing delete_file_field id
1049
                // equality delete file must have delete_file_field id to match column.
1050
0
                return Status::DataQualityError(
1051
0
                        "missing delete_file_field id when reading equality delete file");
1052
0
            } else if (read_column_field_ids_set.contains(delete_file_field.field_id)) {
1053
                // the column that need to read.
1054
0
                if (delete_file_field.children.size() > 0) [[unlikely]] { // complex column
1055
0
                    return Status::InternalError(
1056
0
                            "can not support read complex column in equality delete file");
1057
0
                } else if (!data_file_id_to_field_schema.contains(delete_file_field.field_id))
1058
0
                        [[unlikely]] {
1059
0
                    return Status::DataQualityError(
1060
0
                            "can not find delete field id in data file schema when reading "
1061
0
                            "equality delete file");
1062
0
                }
1063
0
                auto data_file_field = data_file_id_to_field_schema[delete_file_field.field_id];
1064
0
                if (data_file_field->data_type->get_primitive_type() !=
1065
0
                    delete_file_field.data_type->get_primitive_type()) [[unlikely]] {
1066
0
                    return Status::NotSupported(
1067
0
                            "Not Support type change in equality delete, field: {}, delete "
1068
0
                            "file type: {}, data file type: {}",
1069
0
                            delete_file_field.field_id, delete_file_field.data_type->get_name(),
1070
0
                            data_file_field->data_type->get_name());
1071
0
                }
1072
1073
0
                std::string filed_lower_name = to_lower(delete_file_field.name);
1074
0
                eq_file_node->add_children(filed_lower_name, delete_file_field.name,
1075
0
                                           std::make_shared<TableSchemaChangeHelper::ScalarNode>());
1076
1077
0
                delete_col_ids.emplace_back(delete_file_field.field_id);
1078
0
                delete_col_names.emplace_back(filed_lower_name);
1079
0
                delete_col_types.emplace_back(make_nullable(delete_file_field.data_type));
1080
1081
0
                read_column_field_ids_set.erase(delete_file_field.field_id);
1082
0
            } else {
1083
                // delete file may be have extra columns that don't need to read
1084
0
            }
1085
0
        }
1086
0
        if (!read_column_field_ids_set.empty()) [[unlikely]] {
1087
0
            return Status::DataQualityError("some field ids not found in equality delete file.");
1088
0
        }
1089
1090
0
        for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
1091
0
            delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
1092
0
        }
1093
0
        phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> tmp;
1094
0
        RETURN_IF_ERROR(delete_reader->init_reader(delete_col_names, &delete_col_name_to_block_idx,
1095
0
                                                   {}, tmp, nullptr, nullptr, nullptr, nullptr,
1096
0
                                                   nullptr, eq_file_node, false));
1097
0
        RETURN_IF_ERROR(delete_reader->set_fill_columns(partition_columns, missing_columns));
1098
1099
0
        if (!_equality_delete_block_map.contains(delete_col_ids)) {
1100
0
            _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size());
1101
0
            Block block;
1102
0
            _generate_equality_delete_block(&block, delete_col_names, delete_col_types);
1103
0
            _equality_delete_blocks.emplace_back(block);
1104
0
        }
1105
0
        Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]];
1106
0
        bool eof = false;
1107
0
        while (!eof) {
1108
0
            Block tmp_block;
1109
0
            _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types);
1110
0
            size_t read_rows = 0;
1111
0
            RETURN_IF_ERROR(delete_reader->get_next_block(&tmp_block, &read_rows, &eof));
1112
0
            if (read_rows > 0) {
1113
0
                MutableBlock mutable_block(&eq_file_block);
1114
0
                RETURN_IF_ERROR(mutable_block.merge(tmp_block));
1115
0
            }
1116
0
        }
1117
0
    }
1118
1119
0
    for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) {
1120
0
        auto& eq_file_block = _equality_delete_blocks[block_idx];
1121
0
        auto equality_delete_impl =
1122
0
                EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids);
1123
0
        RETURN_IF_ERROR(equality_delete_impl->init(_profile));
1124
0
        _equality_delete_impls.emplace_back(std::move(equality_delete_impl));
1125
0
    }
1126
0
    return Status::OK();
1127
0
}
1128
1129
Status IcebergOrcReader::_process_equality_delete(
1130
0
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
1131
0
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
1132
0
            partition_columns;
1133
0
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
1134
1135
0
    std::map<int, int> data_file_id_to_field_idx;
1136
0
    for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); ++idx) {
1137
0
        if (!_data_file_type_desc->getSubtype(idx)->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
1138
0
            return Status::DataQualityError("Iceberg equality delete data file missing field id.");
1139
0
        }
1140
0
        auto field_id = std::stoi(
1141
0
                _data_file_type_desc->getSubtype(idx)->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
1142
0
        data_file_id_to_field_idx[field_id] = idx;
1143
0
    }
1144
1145
0
    for (const auto& delete_file : delete_files) {
1146
0
        TFileRangeDesc delete_desc;
1147
        // must use __set() method to make sure __isset is true
1148
0
        delete_desc.__set_fs_name(_range.fs_name);
1149
0
        delete_desc.path = delete_file.path;
1150
0
        delete_desc.start_offset = 0;
1151
0
        delete_desc.size = -1;
1152
0
        delete_desc.file_size = -1;
1153
1154
0
        if (!delete_file.__isset.field_ids) [[unlikely]] {
1155
0
            return Status::InternalError(
1156
0
                    "missing delete field ids when reading equality delete file");
1157
0
        }
1158
0
        auto& read_column_field_ids = delete_file.field_ids;
1159
0
        std::set<int> read_column_field_ids_set;
1160
0
        for (const auto& field_id : read_column_field_ids) {
1161
0
            read_column_field_ids_set.insert(field_id);
1162
0
            _equality_delete_col_ids.insert(field_id);
1163
0
        }
1164
1165
0
        auto delete_reader = OrcReader::create_unique(_profile, _state, _params, delete_desc,
1166
0
                                                      READ_DELETE_FILE_BATCH_SIZE,
1167
0
                                                      _state->timezone(), _io_ctx, _meta_cache);
1168
0
        RETURN_IF_ERROR(delete_reader->init_schema_reader());
1169
        // delete file schema
1170
0
        std::vector<std::string> delete_file_col_names;
1171
0
        std::vector<DataTypePtr> delete_file_col_types;
1172
0
        RETURN_IF_ERROR(
1173
0
                delete_reader->get_parsed_schema(&delete_file_col_names, &delete_file_col_types));
1174
1175
        // the column that to read equality delete file.
1176
        // (delete file maybe have extra columns that don't need to read)
1177
0
        std::vector<std::string> delete_col_names;
1178
0
        std::vector<DataTypePtr> delete_col_types;
1179
0
        std::vector<int> delete_col_ids;
1180
0
        std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx;
1181
1182
0
        const orc::Type* delete_field_desc = nullptr;
1183
0
        RETURN_IF_ERROR(delete_reader->get_file_type(&delete_field_desc));
1184
0
        DCHECK(delete_field_desc != nullptr);
1185
1186
0
        auto eq_file_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
1187
1188
0
        for (size_t idx = 0; idx < delete_field_desc->getSubtypeCount(); idx++) {
1189
0
            auto delete_file_field = delete_field_desc->getSubtype(idx);
1190
1191
0
            if (!delete_file_field->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE))
1192
0
                    [[unlikely]] { // missing delete_file_field id
1193
                // equality delete file must have delete_file_field id to match column.
1194
0
                return Status::DataQualityError(
1195
0
                        "missing delete_file_field id when reading equality delete file");
1196
0
            } else {
1197
0
                auto delete_field_id =
1198
0
                        std::stoi(delete_file_field->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
1199
0
                if (read_column_field_ids_set.contains(delete_field_id)) {
1200
                    // the column that need to read.
1201
0
                    if (is_complex_type(delete_file_col_types[idx]->get_primitive_type()))
1202
0
                            [[unlikely]] {
1203
0
                        return Status::InternalError(
1204
0
                                "can not support read complex column in equality delete file.");
1205
0
                    } else if (!data_file_id_to_field_idx.contains(delete_field_id)) [[unlikely]] {
1206
0
                        return Status::DataQualityError(
1207
0
                                "can not find delete field id in data file schema when reading "
1208
0
                                "equality delete file");
1209
0
                    }
1210
1211
0
                    auto data_file_field = _data_file_type_desc->getSubtype(
1212
0
                            data_file_id_to_field_idx[delete_field_id]);
1213
1214
0
                    if (delete_file_field->getKind() != data_file_field->getKind()) [[unlikely]] {
1215
0
                        return Status::NotSupported(
1216
0
                                "Not Support type change in equality delete, field: {}, delete "
1217
0
                                "file type: {}, data file type: {}",
1218
0
                                delete_field_id, delete_file_field->getKind(),
1219
0
                                data_file_field->getKind());
1220
0
                    }
1221
0
                    std::string filed_lower_name = to_lower(delete_field_desc->getFieldName(idx));
1222
0
                    eq_file_node->add_children(
1223
0
                            filed_lower_name, delete_field_desc->getFieldName(idx),
1224
0
                            std::make_shared<TableSchemaChangeHelper::ScalarNode>());
1225
1226
0
                    delete_col_ids.emplace_back(delete_field_id);
1227
0
                    delete_col_names.emplace_back(filed_lower_name);
1228
0
                    delete_col_types.emplace_back(make_nullable(delete_file_col_types[idx]));
1229
0
                    read_column_field_ids_set.erase(delete_field_id);
1230
0
                }
1231
0
            }
1232
0
        }
1233
0
        if (!read_column_field_ids_set.empty()) [[unlikely]] {
1234
0
            return Status::DataQualityError("some field ids not found in equality delete file.");
1235
0
        }
1236
1237
0
        for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
1238
0
            delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
1239
0
        }
1240
1241
0
        RETURN_IF_ERROR(delete_reader->init_reader(&delete_col_names, &delete_col_name_to_block_idx,
1242
0
                                                   {}, false, nullptr, nullptr, nullptr, nullptr,
1243
0
                                                   eq_file_node));
1244
0
        RETURN_IF_ERROR(delete_reader->set_fill_columns(partition_columns, missing_columns));
1245
1246
0
        if (!_equality_delete_block_map.contains(delete_col_ids)) {
1247
0
            _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size());
1248
0
            Block block;
1249
0
            _generate_equality_delete_block(&block, delete_col_names, delete_col_types);
1250
0
            _equality_delete_blocks.emplace_back(block);
1251
0
        }
1252
0
        Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]];
1253
0
        bool eof = false;
1254
0
        while (!eof) {
1255
0
            Block tmp_block;
1256
0
            _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types);
1257
0
            size_t read_rows = 0;
1258
0
            RETURN_IF_ERROR(delete_reader->get_next_block(&tmp_block, &read_rows, &eof));
1259
0
            if (read_rows > 0) {
1260
0
                MutableBlock mutable_block(&eq_file_block);
1261
0
                RETURN_IF_ERROR(mutable_block.merge(tmp_block));
1262
0
            }
1263
0
        }
1264
0
    }
1265
1266
0
    for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) {
1267
0
        auto& eq_file_block = _equality_delete_blocks[block_idx];
1268
0
        auto equality_delete_impl =
1269
0
                EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids);
1270
0
        RETURN_IF_ERROR(equality_delete_impl->init(_profile));
1271
0
        _equality_delete_impls.emplace_back(std::move(equality_delete_impl));
1272
0
    }
1273
0
    return Status::OK();
1274
0
}
1275
#include "common/compile_check_end.h"
1276
} // namespace doris