Coverage Report

Created: 2026-06-27 16:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/table/iceberg_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <memory>
21
#include <optional>
22
#include <string>
23
#include <vector>
24
25
#include "common/status.h"
26
#include "core/block/block.h"
27
#include "format/table/iceberg_delete_file_reader_helper.h"
28
#include "format_v2/file_reader.h"
29
#include "format_v2/table_reader.h"
30
#include "gen_cpp/PlanNodes_types.h"
31
32
// Forward declarations of types that the Iceberg reader declarations below refer to only by
// pointer, reference or smart pointer.
namespace doris {
class Block;
struct DeleteFileDesc;
} // namespace doris

// Filesystem descriptors used when opening Iceberg delete files.
namespace doris::io {
struct FileDescription;
struct FileSystemProperties;
} // namespace doris::io
40
41
namespace doris::format::iceberg {
42
43
// Iceberg table-level reader.
44
// It reuses TableReader for split orchestration, dynamic partition pruning and table-block
45
// finalization, while composing a FileReader for physical data-file reads instead of inheriting
46
// from a concrete file-format reader.
47
class IcebergTableReader : public format::TableReader {
48
public:
49
25
    ~IcebergTableReader() override = default;
50
21
    Status init(format::TableReadOptions&& options) override {
51
21
        RETURN_IF_ERROR(format::TableReader::init(std::move(options)));
52
21
        _mapper_options.mode = format::TableColumnMappingMode::BY_FIELD_ID;
53
21
        return Status::OK();
54
21
    }
55
56
    Status prepare_split(const format::SplitReadOptions& options) override;
57
    std::string debug_string() const override;
58
21
    format::TableColumnMappingMode mapping_mode() const override {
59
21
        return !_data_reader.file_schema.empty() && _has_field_id(_data_reader.file_schema)
60
21
                       ? format::TableColumnMappingMode::BY_FIELD_ID
61
21
                       : format::TableColumnMappingMode::BY_NAME;
62
21
    }
63
64
protected:
65
    Status materialize_virtual_columns(Block* table_block) override;
66
67
    Status customize_file_scan_request(format::FileScanRequest* file_request) override;
68
69
    bool _supports_aggregate_pushdown(TPushAggOp::type agg_type) const override;
70
71
    Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc* desc,
72
                                       bool* has_delete_file) override;
73
74
    Status _init_delete_predicates(const TTableFormatFileDesc& t_desc);
75
76
private:
77
81
    bool _has_field_id(const std::vector<format::ColumnDefinition>& schema) const {
78
81
        for (const auto& field : schema) {
79
            // TopN lazy materialization asks the file reader to synthesize GLOBAL_ROWID in the
80
            // first-phase scan. That virtual column is not an Iceberg data field and therefore has
81
            // no Iceberg field id. Do not let it downgrade schema-evolution reads to BY_NAME,
82
            // otherwise old data files whose physical names predate a rename (for example,
83
            // table column `new_new_id` stored as file column `id`) are materialized as defaults.
84
62
            if (field.column_type != format::ColumnType::DATA_COLUMN) {
85
1
                continue;
86
1
            }
87
61
            if (!field.has_identifier_field_id()) {
88
1
                return false;
89
1
            }
90
60
            if (!_has_field_id(field.children)) {
91
0
                return false;
92
0
            }
93
60
        }
94
80
        return true;
95
81
    }
96
    static constexpr int MIN_SUPPORT_DELETE_FILES_VERSION = 2;
97
    static constexpr int POSITION_DELETE = 1;
98
    static constexpr int EQUALITY_DELETE = 2;
99
    static constexpr int DELETION_VECTOR = 3;
100
101
    struct RowLineageColumns {
102
        int64_t first_row_id = -1;
103
        int64_t last_updated_sequence_number = -1;
104
    };
105
106
    static constexpr const char* ICEBERG_FILE_PATH = "file_path";
107
    static constexpr const char* ICEBERG_ROW_POS = "pos";
108
    static constexpr size_t ICEBERG_FILE_PATH_BLOCK_POSITION = 0;
109
    static constexpr size_t ICEBERG_ROW_POS_BLOCK_POSITION = 1;
110
111
    class PositionDeleteRowsCollector final {
112
    public:
113
        PositionDeleteRowsCollector(std::string data_file_path, format::DeleteRows* rows);
114
115
        Status collect(const Block& block, size_t read_rows);
116
117
    private:
118
        std::string _data_file_path;
119
        format::DeleteRows* _rows = nullptr;
120
    };
121
122
    static std::string _iceberg_delete_vector_cache_key(const TIcebergDeleteFileDesc& delete_file);
123
124
    static std::shared_ptr<io::FileSystemProperties> _delete_file_system_properties(
125
            const TFileScanRangeParams& scan_params);
126
127
    static std::unique_ptr<io::FileDescription> _delete_file_description(
128
            const TFileRangeDesc& range);
129
130
    std::string _data_file_path() const;
131
132
    // Append row position column to file scan request for position delete handling.
133
    Status _append_row_position_output_column(format::FileScanRequest* request);
134
    // Append equality delete predicates to file scan request based on the delete files in iceberg
135
    // params. DeleteVector and position delete files use the common DeleteRows path in TableReader.
136
    Status _append_equality_delete_predicates(format::FileScanRequest* request);
137
138
    Status _init_equality_delete_predicates(
139
            const std::vector<TIcebergDeleteFileDesc>& delete_files);
140
141
    // Read equality/position delete files.
142
    Status _read_parquet_equality_delete_file(const TIcebergDeleteFileDesc& delete_file,
143
                                              const TFileScanRangeParams& scan_params,
144
                                              IcebergDeleteFileIOContext* delete_io_ctx);
145
    Status _read_parquet_position_delete_file(const TIcebergDeleteFileDesc& delete_file,
146
                                              const TFileScanRangeParams& scan_params,
147
                                              IcebergDeleteFileIOContext* delete_io_ctx,
148
                                              PositionDeleteRowsCollector* collector);
149
150
    // Read position delete files and collect deleted row positions to update DeletePredicate.
151
    Status _init_position_delete_rows(const std::vector<TIcebergDeleteFileDesc>& delete_files);
152
153
    // Materialize row lineage virtual columns based on the position delete file.
154
    Status _materialize_iceberg_rowid(Block* table_block, size_t column_idx);
155
    Status _materialize_row_lineage_row_id(Block* table_block, size_t column_idx);
156
    Status _materialize_row_lineage_last_updated_sequence_number(Block* table_block,
157
                                                                 size_t column_idx);
158
159
    RowLineageColumns _row_lineage_columns;
160
    size_t _row_position_block_position = 0;
161
    std::optional<TIcebergFileDesc> _iceberg_params;
162
    bool _delete_predicates_initialized = false;
163
    format::DeleteRows _position_delete_rows_storage;
164
    struct EqualityDeleteFilter {
165
        std::vector<int> field_ids;
166
        std::vector<DataTypePtr> key_types;
167
        Block delete_block;
168
    };
169
    std::vector<EqualityDeleteFilter> _equality_delete_filters;
170
171
    bool _need_row_lineage_row_id() const;
172
    bool _need_iceberg_rowid() const;
173
};
174
175
} // namespace doris::format::iceberg