Coverage Report

Created: 2026-06-09 13:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/table/iceberg_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <memory>
21
#include <string>
22
#include <vector>
23
24
#include "common/status.h"
25
#include "core/block/block.h"
26
#include "format_v2/file_reader.h"
27
#include "format_v2/table_reader.h"
28
#include "format/table/iceberg_delete_file_reader_helper.h"
29
#include "gen_cpp/PlanNodes_types.h"
30
31
namespace doris {
32
class Block;
33
struct DeleteFileDesc;
34
namespace io {
35
struct FileDescription;
36
struct FileSystemProperties;
37
} // namespace io
38
} // namespace doris
39
40
namespace doris::iceberg {
41
42
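// Iceberg table-level reader.
43
// Inherits TableReader to reuse shared multi-file orchestration and dynamic partition pruning;
44
// composes a FileReader for physical data-file reads instead of inheriting a concrete format reader.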
45
class IcebergTableReader : public format::TableReader {
46
public:
47
0
    ~IcebergTableReader() override = default;
48
0
    // Initializes the base TableReader with `options` (moved in); if that fails the error is
    // propagated through RETURN_IF_ERROR. On success the mapper mode is set to BY_FIELD_ID.
    // NOTE(review): mapping_mode() below may return BY_NAME while this always stores BY_FIELD_ID
    // in _mapper_options -- confirm which of the two the column mapper actually consults.
    Status init(format::TableReadOptions&& options) override {
49
0
        RETURN_IF_ERROR(format::TableReader::init(std::move(options)));
50
0
        _mapper_options.mode = format::TableColumnMappingMode::BY_FIELD_ID;
51
0
        return Status::OK();
52
0
    }
53
54
    // Override declared here only; the definition is not part of this header.
    Status prepare_split(const format::SplitReadOptions& options) override;
55
0
    // Returns BY_FIELD_ID when _data_reader.file_schema is non-empty and every field in it
    // (including nested children, see _has_field_id) has an identifier field id; otherwise
    // returns BY_NAME. The explicit empty() check is needed because _has_field_id() returns
    // true for an empty schema.
    format::TableColumnMappingMode mapping_mode() const override {
56
0
        return !_data_reader.file_schema.empty() && _has_field_id(_data_reader.file_schema)
57
0
                       ? format::TableColumnMappingMode::BY_FIELD_ID
58
0
                       : format::TableColumnMappingMode::BY_NAME;
59
0
    }
60
61
protected:
62
    // The four `override` members below are declared here and defined elsewhere.
    Status materialize_virtual_columns(Block* table_block) override;
63
64
    Status customize_file_scan_request(format::FileScanRequest* file_request) override;
65
66
    bool _supports_aggregate_pushdown(TPushAggOp::type agg_type) const override;
67
68
    Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc* desc,
69
                                       bool* has_delete_file) override;
70
71
    // Not an override; declared here and defined elsewhere.
    // NOTE(review): presumably paired with _delete_predicates_initialized below -- confirm.
    Status _init_delete_predicates(const TTableFormatFileDesc& t_desc);
72
73
private:
74
0
    // Returns true only if every field in `schema` reports has_identifier_field_id() and the
    // same holds recursively for each field's `children`. Returns false at the first field that
    // fails either check. An empty `schema` yields true (the loop body never runs).
    bool _has_field_id(const std::vector<format::ColumnDefinition>& schema) const {
75
0
        for (const auto& field : schema) {
76
0
            if (!field.has_identifier_field_id()) {
77
0
                return false;
78
0
            }
79
0
            if (!_has_field_id(field.children)) {
80
0
                return false;
81
0
            }
82
0
        }
83
0
        return true;
84
0
    }
85
    // NOTE(review): these look like the minimum Iceberg format version that supports delete
    // files and the numeric delete-file content kinds -- confirm against the code that reads them.
    static constexpr int MIN_SUPPORT_DELETE_FILES_VERSION = 2;
86
    static constexpr int POSITION_DELETE = 1;
87
    static constexpr int EQUALITY_DELETE = 2;
88
    static constexpr int DELETION_VECTOR = 3;
89
90
    // Two row-lineage values, both defaulting to -1 (presumably meaning "not set" -- TODO confirm).
    struct RowLineageColumns {
91
        int64_t first_row_id = -1;
92
        int64_t last_updated_sequence_number = -1;
93
    };
94
95
    // Column names ("file_path", "pos") and their block positions (0 and 1).
    // NOTE(review): presumably the layout of a block read from a position delete file -- confirm.
    static constexpr const char* ICEBERG_FILE_PATH = "file_path";
96
    static constexpr const char* ICEBERG_ROW_POS = "pos";
97
    static constexpr size_t ICEBERG_FILE_PATH_BLOCK_POSITION = 0;
98
    static constexpr size_t ICEBERG_ROW_POS_BLOCK_POSITION = 1;
99
100
    // Helper that stores a data file path and a raw pointer to a format::DeleteRows, and exposes
    // collect(block, read_rows). Only declarations are visible here.
    // NOTE(review): `_rows` is presumably non-owning and collect() presumably records positions
    // for rows whose file path matches `_data_file_path` -- confirm in the implementation.
    class PositionDeleteRowsCollector final {
101
    public:
102
        PositionDeleteRowsCollector(std::string data_file_path, format::DeleteRows* rows);
103
104
        Status collect(const Block& block, size_t read_rows);
105
106
    private:
107
        std::string _data_file_path;
108
        format::DeleteRows* _rows = nullptr;
109
    };
110
111
    // Static helpers, declared only. They take the thrift descriptors shown in their signatures
    // and return a cache key string, shared file system properties, and a file description.
    static std::string _iceberg_delete_vector_cache_key(const TIcebergDeleteFileDesc& delete_file);
112
113
    static std::shared_ptr<io::FileSystemProperties> _delete_file_system_properties(
114
            const TFileScanRangeParams& scan_params);
115
116
    static std::unique_ptr<io::FileDescription> _delete_file_description(
117
            const TFileRangeDesc& range);
118
119
    std::string _data_file_path() const;
120
121
    // Append row position column to file scan request for position delete handling.
122
    Status _append_row_position_output_column(format::FileScanRequest* request);
123
    // Append equality delete predicates to file scan request based on the delete files in iceberg
124
    // params. DeleteVector and position delete files use the common DeleteRows path in TableReader.
125
    Status _append_equality_delete_predicates(format::FileScanRequest* request);
126
127
    Status _init_equality_delete_predicates(
128
            const std::vector<TIcebergDeleteFileDesc>& delete_files);
129
130
    // Read equality/position delete files.
131
    Status _read_parquet_equality_delete_file(const TIcebergDeleteFileDesc& delete_file,
132
                                              const TFileScanRangeParams& scan_params,
133
                                              IcebergDeleteFileIOContext* delete_io_ctx);
134
    Status _read_parquet_position_delete_file(const TIcebergDeleteFileDesc& delete_file,
135
                                              const TFileScanRangeParams& scan_params,
136
                                              IcebergDeleteFileIOContext* delete_io_ctx,
137
                                              PositionDeleteRowsCollector* collector);
138
139
    // Read position delete files and collect deleted row positions to update DeletePredicate.
140
    Status _init_position_delete_rows(const std::vector<TIcebergDeleteFileDesc>& delete_files);
141
142
    // Materialize row lineage virtual columns based on the position delete file.
143
    Status _materialize_row_lineage_row_id(Block* table_block, size_t column_idx);
144
    Status _materialize_row_lineage_last_updated_sequence_number(Block* table_block,
145
                                                                 size_t column_idx);
146
147
    // Per-reader state. Defaults are the in-class initializers shown; how each member is
    // populated is not visible in this header.
    RowLineageColumns _row_lineage_columns;
148
    size_t _row_position_block_position = 0;
149
    const TIcebergFileDesc* _iceberg_params = nullptr;
150
    bool _delete_predicates_initialized = false;
151
    format::DeleteRows _position_delete_rows_storage;
152
    // TODO: Support nested types
153
    // One entry groups a list of field ids, a parallel-looking list of key types, and a Block.
    // NOTE(review): presumably one filter per equality delete file / key set -- confirm.
    struct EqualityDeleteFilter {
154
        std::vector<int> field_ids;
155
        std::vector<DataTypePtr> key_types;
156
        Block delete_block;
157
    };
158
    std::vector<EqualityDeleteFilter> _equality_delete_filters;
159
160
    bool _need_row_lineage_row_id() const;
161
};
162
163
} // namespace doris::iceberg