Coverage Report

Created: 2026-05-26 00:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/iceberg_reader_mixin.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <cstddef>
21
#include <cstdint>
22
#include <string>
23
#include <unordered_map>
24
#include <vector>
25
26
#include "common/consts.h"
27
#include "common/status.h"
28
#include "core/block/block.h"
29
#include "core/column/column_dictionary.h"
30
#include "core/column/column_nullable.h"
31
#include "core/column/column_string.h"
32
#include "core/column/column_struct.h"
33
#include "core/data_type/data_type_number.h"
34
#include "core/data_type/data_type_string.h"
35
#include "format/generic_reader.h"
36
#include "format/table/deletion_vector_reader.h"
37
#include "format/table/equality_delete.h"
38
#include "format/table/table_schema_change_helper.h"
39
#include "runtime/runtime_profile.h"
40
#include "runtime/runtime_state.h"
41
#include "storage/olap_common.h"
42
43
namespace doris {
44
class TIcebergDeleteFileDesc;
45
} // namespace doris
46
47
namespace doris {
48
49
class ShardedKVCache;
50
51
// CRTP mixin for Iceberg reader functionality.
52
// BaseReader should be ParquetReader or OrcReader.
53
// Inherits BaseReader + TableSchemaChangeHelper, providing shared Iceberg logic
54
// (delete files, deletion vectors, equality delete, $row_id synthesis).
55
//
56
// Inheritance chain:
57
//   IcebergParquetReader -> IcebergReaderMixin<ParquetReader> -> ParquetReader -> GenericReader
58
//   IcebergOrcReader     -> IcebergReaderMixin<OrcReader>     -> OrcReader     -> GenericReader
59
template <typename BaseReader>
60
class IcebergReaderMixin : public BaseReader, public TableSchemaChangeHelper {
61
public:
62
    struct PositionDeleteRange {
63
        std::vector<std::string> data_file_path;
64
        std::vector<std::pair<int, int>> range;
65
    };
66
67
    // Forward BaseReader constructor arguments + Iceberg-specific kv_cache
68
    template <typename... Args>
69
    IcebergReaderMixin(ShardedKVCache* kv_cache, Args&&... args)
70
14
            : BaseReader(std::forward<Args>(args)...), _kv_cache(kv_cache) {
71
14
        static const char* iceberg_profile = "IcebergProfile";
72
14
        ADD_TIMER(this->get_profile(), iceberg_profile);
73
14
        _iceberg_profile.num_delete_files = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteFiles",
74
14
                                                              TUnit::UNIT, iceberg_profile);
75
14
        _iceberg_profile.num_delete_rows = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteRows",
76
14
                                                             TUnit::UNIT, iceberg_profile);
77
14
        _iceberg_profile.delete_files_read_time =
78
14
                ADD_CHILD_TIMER(this->get_profile(), "DeleteFileReadTime", iceberg_profile);
79
14
        _iceberg_profile.delete_rows_sort_time =
80
14
                ADD_CHILD_TIMER(this->get_profile(), "DeleteRowsSortTime", iceberg_profile);
81
14
        _iceberg_profile.parse_delete_file_time =
82
14
                ADD_CHILD_TIMER(this->get_profile(), "ParseDeleteFileTime", iceberg_profile);
83
14
    }
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEEC2IJRPNS_14RuntimeProfileERKNS_20TFileScanRangeParamsERKNS_14TFileRangeDescERmRPKN4cctz9time_zoneERPNS_2io9IOContextERPNS_12RuntimeStateERPNS_13FileMetaCacheEEEEPNS_14ShardedKVCacheEDpOT_
Line
Count
Source
70
7
            : BaseReader(std::forward<Args>(args)...), _kv_cache(kv_cache) {
71
7
        static const char* iceberg_profile = "IcebergProfile";
72
7
        ADD_TIMER(this->get_profile(), iceberg_profile);
73
7
        _iceberg_profile.num_delete_files = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteFiles",
74
7
                                                              TUnit::UNIT, iceberg_profile);
75
7
        _iceberg_profile.num_delete_rows = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteRows",
76
7
                                                             TUnit::UNIT, iceberg_profile);
77
7
        _iceberg_profile.delete_files_read_time =
78
7
                ADD_CHILD_TIMER(this->get_profile(), "DeleteFileReadTime", iceberg_profile);
79
7
        _iceberg_profile.delete_rows_sort_time =
80
7
                ADD_CHILD_TIMER(this->get_profile(), "DeleteRowsSortTime", iceberg_profile);
81
7
        _iceberg_profile.parse_delete_file_time =
82
7
                ADD_CHILD_TIMER(this->get_profile(), "ParseDeleteFileTime", iceberg_profile);
83
7
    }
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEEC2IJRPNS_14RuntimeProfileERPNS_12RuntimeStateERKNS_20TFileScanRangeParamsERKNS_14TFileRangeDescERmRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERPNS_2io9IOContextERPNS_13FileMetaCacheEEEEPNS_14ShardedKVCacheEDpOT_
Line
Count
Source
70
7
            : BaseReader(std::forward<Args>(args)...), _kv_cache(kv_cache) {
71
7
        static const char* iceberg_profile = "IcebergProfile";
72
7
        ADD_TIMER(this->get_profile(), iceberg_profile);
73
7
        _iceberg_profile.num_delete_files = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteFiles",
74
7
                                                              TUnit::UNIT, iceberg_profile);
75
7
        _iceberg_profile.num_delete_rows = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteRows",
76
7
                                                             TUnit::UNIT, iceberg_profile);
77
7
        _iceberg_profile.delete_files_read_time =
78
7
                ADD_CHILD_TIMER(this->get_profile(), "DeleteFileReadTime", iceberg_profile);
79
7
        _iceberg_profile.delete_rows_sort_time =
80
7
                ADD_CHILD_TIMER(this->get_profile(), "DeleteRowsSortTime", iceberg_profile);
81
7
        _iceberg_profile.parse_delete_file_time =
82
7
                ADD_CHILD_TIMER(this->get_profile(), "ParseDeleteFileTime", iceberg_profile);
83
7
    }
84
85
14
    ~IcebergReaderMixin() override = default;
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEED2Ev
Line
Count
Source
85
7
    ~IcebergReaderMixin() override = default;
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEED2Ev
Line
Count
Source
85
7
    ~IcebergReaderMixin() override = default;
86
87
    void set_current_file_info(const std::string& file_path, int32_t partition_spec_id,
88
0
                               const std::string& partition_data_json) {
89
0
        _current_file_path = file_path;
90
0
        _partition_spec_id = partition_spec_id;
91
0
        _partition_data_json = partition_data_json;
92
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21set_current_file_infoERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSA_
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21set_current_file_infoERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSA_
93
94
    enum { DATA, POSITION_DELETE, EQUALITY_DELETE, DELETION_VECTOR };
95
    enum Fileformat { NONE, PARQUET, ORC, AVRO };
96
97
    virtual void set_delete_rows() = 0;
98
99
    // Table-level COUNT(*) is handled by CountReader (created by FileScanner after
100
    // init_reader). If _do_get_next_block is called, COUNT must have been resolved.
101
2
    Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override {
102
2
        DCHECK(this->_push_down_agg_type != TPushAggOp::type::COUNT);
103
2
        return BaseReader::_do_get_next_block(block, read_rows, eof);
104
2
    }
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE18_do_get_next_blockEPNS_5BlockEPmPb
Line
Count
Source
101
1
    Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override {
102
        DCHECK(this->_push_down_agg_type != TPushAggOp::type::COUNT);
103
1
        return BaseReader::_do_get_next_block(block, read_rows, eof);
104
1
    }
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE18_do_get_next_blockEPNS_5BlockEPmPb
Line
Count
Source
101
1
    Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override {
102
        DCHECK(this->_push_down_agg_type != TPushAggOp::type::COUNT);
103
1
        return BaseReader::_do_get_next_block(block, read_rows, eof);
104
1
    }
105
106
    void set_create_row_id_column_iterator_func(
107
0
            std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()> create_func) {
108
0
        _create_topn_row_id_column_iterator = create_func;
109
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE38set_create_row_id_column_iterator_funcESt8functionIFSt10shared_ptrINS_10segment_v221RowIdColumnIteratorV2EEvEE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE38set_create_row_id_column_iterator_funcESt8functionIFSt10shared_ptrINS_10segment_v221RowIdColumnIteratorV2EEvEE
110
111
protected:
112
    // ---- Hook implementations ----
113
114
    // Called before reading a block: expand block for equality delete columns + detect row_id
115
2
    Status on_before_read_block(Block* block) override {
116
2
        RETURN_IF_ERROR(_expand_block_if_need(block));
117
2
        return Status::OK();
118
2
    }
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20on_before_read_blockEPNS_5BlockE
Line
Count
Source
115
1
    Status on_before_read_block(Block* block) override {
116
1
        RETURN_IF_ERROR(_expand_block_if_need(block));
117
1
        return Status::OK();
118
1
    }
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE20on_before_read_blockEPNS_5BlockE
Line
Count
Source
115
1
    Status on_before_read_block(Block* block) override {
116
1
        RETURN_IF_ERROR(_expand_block_if_need(block));
117
1
        return Status::OK();
118
1
    }
119
120
    /// Fill Iceberg $row_id synthesized column. Registered as handler during init.
121
0
    Status _fill_iceberg_row_id(Block* block, size_t rows) {
122
0
        int row_id_pos = block->get_position_by_name(BeConsts::ICEBERG_ROWID_COL);
123
0
        DORIS_CHECK(row_id_pos >= 0);
124
125
        // Lazy-init file info: only set when $row_id is actually needed.
126
0
        const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params;
127
0
        std::string file_path = table_desc.original_file_path;
128
0
        int32_t partition_spec_id =
129
0
                table_desc.__isset.partition_spec_id ? table_desc.partition_spec_id : 0;
130
0
        std::string partition_data_json;
131
0
        if (table_desc.__isset.partition_data_json) {
132
0
            partition_data_json = table_desc.partition_data_json;
133
0
        }
134
0
        set_current_file_info(file_path, partition_spec_id, partition_data_json);
135
136
0
        const auto& row_ids = this->current_batch_row_positions();
137
0
        auto& col_with_type = block->get_by_position(static_cast<size_t>(row_id_pos));
138
0
        MutableColumnPtr row_id_column;
139
0
        RETURN_IF_ERROR(_build_iceberg_rowid_column(col_with_type.type, _current_file_path, row_ids,
140
0
                                                    _partition_spec_id, _partition_data_json,
141
0
                                                    &row_id_column));
142
0
        col_with_type.column = std::move(row_id_column);
143
0
        return Status::OK();
144
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20_fill_iceberg_row_idEPNS_5BlockEm
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE20_fill_iceberg_row_idEPNS_5BlockEm
145
146
0
    void _init_row_lineage_columns() {
147
0
        const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params;
148
0
        if (table_desc.__isset.first_row_id) {
149
0
            _row_lineage_columns.first_row_id = table_desc.first_row_id;
150
0
        }
151
0
        if (table_desc.__isset.last_updated_sequence_number) {
152
0
            _row_lineage_columns.last_updated_sequence_number =
153
0
                    table_desc.last_updated_sequence_number;
154
0
        }
155
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE25_init_row_lineage_columnsEv
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE25_init_row_lineage_columnsEv
156
157
0
    Status _fill_row_lineage_row_id(Block* block, size_t rows) {
158
0
        int col_pos = block->get_position_by_name(ROW_LINEAGE_ROW_ID);
159
0
        DORIS_CHECK(col_pos >= 0);
160
161
0
        if (_row_lineage_columns.first_row_id >= 0) {
162
0
            auto column_guard = block->mutate_column_scoped(col_pos);
163
0
            auto* nullable_column =
164
0
                    assert_cast<ColumnNullable*>(column_guard.mutable_column().get());
165
0
            auto& null_map = nullable_column->get_null_map_data();
166
0
            auto& data =
167
0
                    assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
168
0
            const auto& row_ids = this->current_batch_row_positions();
169
0
            for (size_t i = 0; i < rows; ++i) {
170
0
                if (null_map[i] != 0) {
171
0
                    null_map[i] = 0;
172
0
                    data[i] = _row_lineage_columns.first_row_id + static_cast<int64_t>(row_ids[i]);
173
0
                }
174
0
            }
175
0
        }
176
0
        return Status::OK();
177
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE24_fill_row_lineage_row_idEPNS_5BlockEm
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE24_fill_row_lineage_row_idEPNS_5BlockEm
178
179
0
    Status _fill_row_lineage_last_updated_sequence_number(Block* block, size_t rows) {
180
0
        int col_pos = block->get_position_by_name(ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER);
181
0
        DORIS_CHECK(col_pos >= 0);
182
183
0
        if (_row_lineage_columns.last_updated_sequence_number >= 0) {
184
0
            auto column_guard = block->mutate_column_scoped(col_pos);
185
0
            auto* nullable_column =
186
0
                    assert_cast<ColumnNullable*>(column_guard.mutable_column().get());
187
0
            auto& null_map = nullable_column->get_null_map_data();
188
0
            auto& data =
189
0
                    assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
190
0
            for (size_t i = 0; i < rows; ++i) {
191
0
                if (null_map[i] != 0) {
192
0
                    null_map[i] = 0;
193
0
                    data[i] = _row_lineage_columns.last_updated_sequence_number;
194
0
                }
195
0
            }
196
0
        }
197
0
        return Status::OK();
198
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE46_fill_row_lineage_last_updated_sequence_numberEPNS_5BlockEm
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE46_fill_row_lineage_last_updated_sequence_numberEPNS_5BlockEm
199
200
    // Called after reading a block: apply equality delete filter + shrink block
201
2
    Status on_after_read_block(Block* block, size_t* read_rows) override {
202
2
        if (!_equality_delete_impls.empty()) {
203
0
            std::unique_ptr<IColumn::Filter> filter =
204
0
                    std::make_unique<IColumn::Filter>(block->rows(), 1);
205
0
            for (auto& equality_delete_impl : _equality_delete_impls) {
206
0
                RETURN_IF_ERROR(equality_delete_impl->filter_data_block(
207
0
                        block, this->col_name_to_block_idx_ref(), _id_to_block_column_name,
208
0
                        *filter));
209
0
            }
210
0
            Block::filter_block_internal(block, *filter, block->columns());
211
0
            *read_rows = block->rows();
212
0
        }
213
2
        return _shrink_block_if_need(block);
214
2
    }
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE19on_after_read_blockEPNS_5BlockEPm
Line
Count
Source
201
1
    Status on_after_read_block(Block* block, size_t* read_rows) override {
202
1
        if (!_equality_delete_impls.empty()) {
203
0
            std::unique_ptr<IColumn::Filter> filter =
204
0
                    std::make_unique<IColumn::Filter>(block->rows(), 1);
205
0
            for (auto& equality_delete_impl : _equality_delete_impls) {
206
0
                RETURN_IF_ERROR(equality_delete_impl->filter_data_block(
207
0
                        block, this->col_name_to_block_idx_ref(), _id_to_block_column_name,
208
0
                        *filter));
209
0
            }
210
0
            Block::filter_block_internal(block, *filter, block->columns());
211
0
            *read_rows = block->rows();
212
0
        }
213
1
        return _shrink_block_if_need(block);
214
1
    }
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE19on_after_read_blockEPNS_5BlockEPm
Line
Count
Source
201
1
    Status on_after_read_block(Block* block, size_t* read_rows) override {
202
1
        if (!_equality_delete_impls.empty()) {
203
0
            std::unique_ptr<IColumn::Filter> filter =
204
0
                    std::make_unique<IColumn::Filter>(block->rows(), 1);
205
0
            for (auto& equality_delete_impl : _equality_delete_impls) {
206
0
                RETURN_IF_ERROR(equality_delete_impl->filter_data_block(
207
0
                        block, this->col_name_to_block_idx_ref(), _id_to_block_column_name,
208
0
                        *filter));
209
0
            }
210
0
            Block::filter_block_internal(block, *filter, block->columns());
211
0
            *read_rows = block->rows();
212
0
        }
213
1
        return _shrink_block_if_need(block);
214
1
    }
215
216
    // ---- Shared Iceberg methods ----
217
218
    Status _init_row_filters();
219
    Status _position_delete_base(const std::string data_file_path,
220
                                 const std::vector<TIcebergDeleteFileDesc>& delete_files);
221
    Status _equality_delete_base(const std::vector<TIcebergDeleteFileDesc>& delete_files);
222
    Status read_deletion_vector(const std::string& data_file_path,
223
                                const TIcebergDeleteFileDesc& delete_file_desc);
224
225
    Status _expand_block_if_need(Block* block);
226
    Status _shrink_block_if_need(Block* block);
227
228
    // Type aliases — must be defined before member function declarations that use them.
229
    using DeleteRows = std::vector<int64_t>;
230
    using DeleteFile = phmap::parallel_flat_hash_map<
231
            std::string, std::unique_ptr<DeleteRows>, std::hash<std::string>, std::equal_to<>,
232
            std::allocator<std::pair<const std::string, std::unique_ptr<DeleteRows>>>, 8,
233
            std::mutex>;
234
235
    PositionDeleteRange _get_range(const ColumnDictI32& file_path_column);
236
    PositionDeleteRange _get_range(const ColumnString& file_path_column);
237
    static void _sort_delete_rows(const std::vector<std::vector<int64_t>*>& delete_rows_array,
238
                                  int64_t num_delete_rows, std::vector<int64_t>& result);
239
    void _gen_position_delete_file_range(Block& block, DeleteFile* position_delete,
240
                                         size_t read_rows, bool file_path_column_dictionary_coded);
241
    void _generate_equality_delete_block(Block* block,
242
                                         const std::vector<std::string>& equality_delete_col_names,
243
                                         const std::vector<DataTypePtr>& equality_delete_col_types);
244
245
    // Pure virtual: format-specific delete file reading
246
    virtual Status _read_position_delete_file(const TFileRangeDesc*, DeleteFile*) = 0;
247
    virtual std::unique_ptr<GenericReader> _create_equality_reader(
248
            const TFileRangeDesc& delete_desc) = 0;
249
250
0
    static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_delet_file_cache_keyERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_delet_file_cache_keyERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
251
252
    /// Build the Iceberg V2 row-id struct column.
253
    static Status _build_iceberg_rowid_column(const DataTypePtr& type, const std::string& file_path,
254
                                              const std::vector<rowid_t>& row_ids,
255
                                              int32_t partition_spec_id,
256
                                              const std::string& partition_data_json,
257
0
                                              MutableColumnPtr* column_out) {
258
0
        if (type == nullptr || column_out == nullptr) {
259
0
            return Status::InvalidArgument("Invalid iceberg rowid column type or output column");
260
0
        }
261
0
        MutableColumnPtr column = type->create_column();
262
0
        ColumnNullable* nullable_col = check_and_get_column<ColumnNullable>(column.get());
263
0
        ColumnStruct* struct_col = nullptr;
264
0
        if (nullable_col != nullptr) {
265
0
            struct_col =
266
0
                    check_and_get_column<ColumnStruct>(nullable_col->get_nested_column_ptr().get());
267
0
        } else {
268
0
            struct_col = check_and_get_column<ColumnStruct>(column.get());
269
0
        }
270
0
        if (struct_col == nullptr || struct_col->tuple_size() < 4) {
271
0
            return Status::InternalError("Invalid iceberg rowid column structure");
272
0
        }
273
0
        size_t num_rows = row_ids.size();
274
0
        auto& file_path_col = struct_col->get_column(0);
275
0
        auto& row_pos_col = struct_col->get_column(1);
276
0
        auto& spec_id_col = struct_col->get_column(2);
277
0
        auto& partition_data_col = struct_col->get_column(3);
278
0
        file_path_col.reserve(num_rows);
279
0
        row_pos_col.reserve(num_rows);
280
0
        spec_id_col.reserve(num_rows);
281
0
        partition_data_col.reserve(num_rows);
282
0
        for (size_t i = 0; i < num_rows; ++i) {
283
0
            file_path_col.insert_data(file_path.data(), file_path.size());
284
0
        }
285
0
        for (size_t i = 0; i < num_rows; ++i) {
286
0
            int64_t row_pos = static_cast<int64_t>(row_ids[i]);
287
0
            row_pos_col.insert_data(reinterpret_cast<const char*>(&row_pos), sizeof(row_pos));
288
0
        }
289
0
        for (size_t i = 0; i < num_rows; ++i) {
290
0
            int32_t spec_id = partition_spec_id;
291
0
            spec_id_col.insert_data(reinterpret_cast<const char*>(&spec_id), sizeof(spec_id));
292
0
        }
293
0
        for (size_t i = 0; i < num_rows; ++i) {
294
0
            partition_data_col.insert_data(partition_data_json.data(), partition_data_json.size());
295
0
        }
296
0
        if (nullable_col != nullptr) {
297
0
            nullable_col->get_null_map_data().resize_fill(num_rows, 0);
298
0
        }
299
0
        *column_out = std::move(column);
300
0
        return Status::OK();
301
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE27_build_iceberg_rowid_columnERKSt10shared_ptrIKNS_9IDataTypeEERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIjSaIjEEiSG_PNS_3COWINS_7IColumnEE11mutable_ptrISN_EE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE27_build_iceberg_rowid_columnERKSt10shared_ptrIKNS_9IDataTypeEERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIjSaIjEEiSG_PNS_3COWINS_7IColumnEE11mutable_ptrISN_EE
302
303
    struct IcebergProfile {
304
        RuntimeProfile::Counter* num_delete_files;
305
        RuntimeProfile::Counter* num_delete_rows;
306
        RuntimeProfile::Counter* delete_files_read_time;
307
        RuntimeProfile::Counter* delete_rows_sort_time;
308
        RuntimeProfile::Counter* parse_delete_file_time;
309
    };
310
311
    bool _need_row_id_column = false;
312
    std::string _current_file_path;
313
    int32_t _partition_spec_id = 0;
314
    std::string _partition_data_json;
315
316
    ShardedKVCache* _kv_cache;
317
    IcebergProfile _iceberg_profile;
318
    const std::vector<int64_t>* _iceberg_delete_rows = nullptr;
319
    std::vector<std::string> _expand_col_names;
320
    std::vector<ColumnWithTypeAndName> _expand_columns;
321
    std::vector<std::string> _all_required_col_names;
322
    Fileformat _file_format = Fileformat::NONE;
323
324
    const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
325
    const std::string ICEBERG_FILE_PATH = "file_path";
326
    const std::string ICEBERG_ROW_POS = "pos";
327
    const std::vector<std::string> delete_file_col_names {ICEBERG_FILE_PATH, ICEBERG_ROW_POS};
328
    const std::unordered_map<std::string, uint32_t> DELETE_COL_NAME_TO_BLOCK_IDX = {
329
            {ICEBERG_FILE_PATH, 0}, {ICEBERG_ROW_POS, 1}};
330
    const int ICEBERG_FILE_PATH_INDEX = 0;
331
    const int ICEBERG_FILE_POS_INDEX = 1;
332
    const int READ_DELETE_FILE_BATCH_SIZE = 102400;
333
334
    // all ids that need read for eq delete (from all eq delete files)
335
    std::set<int> _equality_delete_col_ids;
336
    // eq delete column ids -> location of _equality_delete_blocks / _equality_delete_impls
337
    std::map<std::vector<int>, int> _equality_delete_block_map;
338
    // EqualityDeleteBase stores raw pointers to these blocks, so do not modify this vector after
339
    // creating entries in _equality_delete_impls.
340
    std::vector<Block> _equality_delete_blocks;
341
    std::vector<std::unique_ptr<EqualityDeleteBase>> _equality_delete_impls;
342
343
    // id -> block column name
344
    std::unordered_map<int, std::string> _id_to_block_column_name;
345
346
    // File column names used during init
347
    std::vector<std::string> _file_col_names;
348
349
    std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()>
350
            _create_topn_row_id_column_iterator;
351
352
    static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id";
353
    static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER =
354
            "_last_updated_sequence_number";
355
    struct RowLineageColumns {
356
        int64_t first_row_id = -1;
357
        int64_t last_updated_sequence_number = -1;
358
    };
359
    RowLineageColumns _row_lineage_columns;
360
};
361
362
// ============================================================================
363
// Template method implementations (must be in header for templates)
364
// ============================================================================
365
366
template <typename BaseReader>
367
2
Status IcebergReaderMixin<BaseReader>::_init_row_filters() {
368
    // COUNT(*) short-circuit
369
2
    if (this->_push_down_agg_type == TPushAggOp::type::COUNT &&
370
2
        this->get_scan_range().table_format_params.__isset.table_level_row_count &&
371
2
        this->get_scan_range().table_format_params.table_level_row_count > 0) {
372
0
        return Status::OK();
373
0
    }
374
375
2
    const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params;
376
2
    const auto& version = table_desc.format_version;
377
2
    if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
378
2
        return Status::OK();
379
2
    }
380
381
0
    std::vector<TIcebergDeleteFileDesc> position_delete_files;
382
0
    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
383
0
    std::vector<TIcebergDeleteFileDesc> deletion_vector_files;
384
0
    for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
385
0
        if (desc.content == POSITION_DELETE) {
386
0
            position_delete_files.emplace_back(desc);
387
0
        } else if (desc.content == EQUALITY_DELETE) {
388
0
            equality_delete_files.emplace_back(desc);
389
0
        } else if (desc.content == DELETION_VECTOR) {
390
0
            deletion_vector_files.emplace_back(desc);
391
0
        }
392
0
    }
393
394
0
    if (!equality_delete_files.empty()) {
395
0
        RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
396
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
397
0
    }
398
399
0
    if (!deletion_vector_files.empty()) {
400
0
        if (deletion_vector_files.size() != 1) [[unlikely]] {
401
            /*
402
             * Deletion vectors are a binary representation of deletes for a single data file that is more efficient
403
             * at execution time than position delete files. Unlike equality or position delete files, there can be
404
             * at most one deletion vector for a given data file in a snapshot.
405
             */
406
0
            return Status::DataQualityError("This iceberg data file has multiple DVs.");
407
0
        }
408
0
        RETURN_IF_ERROR(
409
0
                read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));
410
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
411
0
    } else if (!position_delete_files.empty()) {
412
0
        RETURN_IF_ERROR(
413
0
                _position_delete_base(table_desc.original_file_path, position_delete_files));
414
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
415
0
    }
416
417
0
    COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
418
0
    return Status::OK();
419
0
}
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE17_init_row_filtersEv
Line
Count
Source
367
1
Status IcebergReaderMixin<BaseReader>::_init_row_filters() {
368
    // COUNT(*) short-circuit
369
1
    if (this->_push_down_agg_type == TPushAggOp::type::COUNT &&
370
1
        this->get_scan_range().table_format_params.__isset.table_level_row_count &&
371
1
        this->get_scan_range().table_format_params.table_level_row_count > 0) {
372
0
        return Status::OK();
373
0
    }
374
375
1
    const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params;
376
1
    const auto& version = table_desc.format_version;
377
1
    if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
378
1
        return Status::OK();
379
1
    }
380
381
0
    std::vector<TIcebergDeleteFileDesc> position_delete_files;
382
0
    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
383
0
    std::vector<TIcebergDeleteFileDesc> deletion_vector_files;
384
0
    for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
385
0
        if (desc.content == POSITION_DELETE) {
386
0
            position_delete_files.emplace_back(desc);
387
0
        } else if (desc.content == EQUALITY_DELETE) {
388
0
            equality_delete_files.emplace_back(desc);
389
0
        } else if (desc.content == DELETION_VECTOR) {
390
0
            deletion_vector_files.emplace_back(desc);
391
0
        }
392
0
    }
393
394
0
    if (!equality_delete_files.empty()) {
395
0
        RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
396
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
397
0
    }
398
399
0
    if (!deletion_vector_files.empty()) {
400
0
        if (deletion_vector_files.size() != 1) [[unlikely]] {
401
            /*
402
             * Deletion vectors are a binary representation of deletes for a single data file that is more efficient
403
             * at execution time than position delete files. Unlike equality or position delete files, there can be
404
             * at most one deletion vector for a given data file in a snapshot.
405
             */
406
0
            return Status::DataQualityError("This iceberg data file has multiple DVs.");
407
0
        }
408
0
        RETURN_IF_ERROR(
409
0
                read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));
410
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
411
0
    } else if (!position_delete_files.empty()) {
412
0
        RETURN_IF_ERROR(
413
0
                _position_delete_base(table_desc.original_file_path, position_delete_files));
414
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
415
0
    }
416
417
0
    COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
418
0
    return Status::OK();
419
0
}
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE17_init_row_filtersEv
Line
Count
Source
367
1
Status IcebergReaderMixin<BaseReader>::_init_row_filters() {
368
    // COUNT(*) short-circuit
369
1
    if (this->_push_down_agg_type == TPushAggOp::type::COUNT &&
370
1
        this->get_scan_range().table_format_params.__isset.table_level_row_count &&
371
1
        this->get_scan_range().table_format_params.table_level_row_count > 0) {
372
0
        return Status::OK();
373
0
    }
374
375
1
    const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params;
376
1
    const auto& version = table_desc.format_version;
377
1
    if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
378
1
        return Status::OK();
379
1
    }
380
381
0
    std::vector<TIcebergDeleteFileDesc> position_delete_files;
382
0
    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
383
0
    std::vector<TIcebergDeleteFileDesc> deletion_vector_files;
384
0
    for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
385
0
        if (desc.content == POSITION_DELETE) {
386
0
            position_delete_files.emplace_back(desc);
387
0
        } else if (desc.content == EQUALITY_DELETE) {
388
0
            equality_delete_files.emplace_back(desc);
389
0
        } else if (desc.content == DELETION_VECTOR) {
390
0
            deletion_vector_files.emplace_back(desc);
391
0
        }
392
0
    }
393
394
0
    if (!equality_delete_files.empty()) {
395
0
        RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
396
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
397
0
    }
398
399
0
    if (!deletion_vector_files.empty()) {
400
0
        if (deletion_vector_files.size() != 1) [[unlikely]] {
401
            /*
402
             * Deletion vectors are a binary representation of deletes for a single data file that is more efficient
403
             * at execution time than position delete files. Unlike equality or position delete files, there can be
404
             * at most one deletion vector for a given data file in a snapshot.
405
             */
406
0
            return Status::DataQualityError("This iceberg data file has multiple DVs.");
407
0
        }
408
0
        RETURN_IF_ERROR(
409
0
                read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));
410
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
411
0
    } else if (!position_delete_files.empty()) {
412
0
        RETURN_IF_ERROR(
413
0
                _position_delete_base(table_desc.original_file_path, position_delete_files));
414
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
415
0
    }
416
417
0
    COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
418
0
    return Status::OK();
419
0
}
420
421
template <typename BaseReader>
422
Status IcebergReaderMixin<BaseReader>::_equality_delete_base(
423
0
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
424
0
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
425
0
            partition_columns;
426
0
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
427
428
0
    for (const auto& delete_file : delete_files) {
429
0
        TFileRangeDesc delete_desc;
430
0
        delete_desc.__set_fs_name(this->get_scan_range().fs_name);
431
0
        delete_desc.path = delete_file.path;
432
0
        delete_desc.start_offset = 0;
433
0
        delete_desc.size = -1;
434
0
        delete_desc.file_size = -1;
435
436
0
        if (!delete_file.__isset.field_ids) [[unlikely]] {
437
0
            return Status::InternalError(
438
0
                    "missing delete field ids when reading equality delete file");
439
0
        }
440
0
        auto& read_column_field_ids = delete_file.field_ids;
441
0
        std::set<int> read_column_field_ids_set;
442
0
        for (const auto& field_id : read_column_field_ids) {
443
0
            read_column_field_ids_set.insert(field_id);
444
0
            _equality_delete_col_ids.insert(field_id);
445
0
        }
446
447
0
        std::unique_ptr<GenericReader> delete_reader = _create_equality_reader(delete_desc);
448
0
        RETURN_IF_ERROR(delete_reader->init_schema_reader());
449
450
0
        std::vector<std::string> equality_delete_col_names;
451
0
        std::vector<DataTypePtr> equality_delete_col_types;
452
453
        // Build delete col names/types/ids by matching field_ids from delete file schema.
454
        // Master iterates delete file's FieldDescriptor and uses field_id to match,
455
        // NOT idx-based pairing (get_parsed_schema order != field_ids order).
456
0
        std::vector<std::string> delete_col_names;
457
0
        std::vector<DataTypePtr> delete_col_types;
458
0
        std::vector<int> delete_col_ids;
459
0
        std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx;
460
461
0
        if (auto* parquet_reader = typeid_cast<ParquetReader*>(delete_reader.get())) {
462
0
            const FieldDescriptor* delete_field_desc = nullptr;
463
0
            RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&delete_field_desc));
464
0
            DCHECK(delete_field_desc != nullptr);
465
466
0
            for (const auto& delete_file_field : delete_field_desc->get_fields_schema()) {
467
0
                if (delete_file_field.field_id == -1) [[unlikely]] {
468
0
                    return Status::DataQualityError(
469
0
                            "missing field id when reading equality delete file");
470
0
                }
471
0
                if (!read_column_field_ids_set.contains(delete_file_field.field_id)) {
472
0
                    continue;
473
0
                }
474
0
                if (delete_file_field.children.size() > 0) [[unlikely]] {
475
0
                    return Status::InternalError(
476
0
                            "can not support read complex column in equality delete file");
477
0
                }
478
479
0
                delete_col_ids.emplace_back(delete_file_field.field_id);
480
0
                delete_col_names.emplace_back(delete_file_field.name);
481
0
                delete_col_types.emplace_back(make_nullable(delete_file_field.data_type));
482
483
0
                int field_id = delete_file_field.field_id;
484
0
                if (!_id_to_block_column_name.contains(field_id)) {
485
0
                    _id_to_block_column_name.emplace(field_id, delete_file_field.name);
486
0
                    _expand_col_names.emplace_back(delete_file_field.name);
487
0
                    _expand_columns.emplace_back(
488
0
                            make_nullable(delete_file_field.data_type)->create_column(),
489
0
                            make_nullable(delete_file_field.data_type), delete_file_field.name);
490
0
                }
491
0
            }
492
0
            for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
493
0
                delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
494
0
            }
495
            // Delete files have TFileRangeDesc.size=-1, which would cause
496
            // set_fill_columns to return EndOfFile("No row group to read")
497
            // when _filter_groups is true. Master passes filter_groups=false.
498
0
            ParquetInitContext eq_delete_ctx;
499
0
            eq_delete_ctx.filter_groups = false;
500
0
            eq_delete_ctx.column_names = delete_col_names;
501
0
            eq_delete_ctx.col_name_to_block_idx = &delete_col_name_to_block_idx;
502
0
            auto st2 = parquet_reader->init_reader(&eq_delete_ctx);
503
0
            if (!st2.ok()) {
504
0
                return st2;
505
0
            }
506
0
        } else if (auto* orc_reader = typeid_cast<OrcReader*>(delete_reader.get())) {
507
            // For ORC: use get_parsed_schema with field_ids from delete_file
508
            // ORC field_ids come from the Thrift descriptor, not from ORC metadata
509
0
            RETURN_IF_ERROR(delete_reader->get_parsed_schema(&equality_delete_col_names,
510
0
                                                             &equality_delete_col_types));
511
0
            for (uint32_t idx = 0; idx < equality_delete_col_names.size(); ++idx) {
512
0
                if (idx < read_column_field_ids.size()) {
513
0
                    int field_id = read_column_field_ids[idx];
514
0
                    if (!read_column_field_ids_set.contains(field_id)) continue;
515
0
                    delete_col_ids.emplace_back(field_id);
516
0
                    delete_col_names.emplace_back(equality_delete_col_names[idx]);
517
0
                    delete_col_types.emplace_back(make_nullable(equality_delete_col_types[idx]));
518
0
                    if (!_id_to_block_column_name.contains(field_id)) {
519
0
                        _id_to_block_column_name.emplace(field_id, equality_delete_col_names[idx]);
520
0
                        _expand_col_names.emplace_back(equality_delete_col_names[idx]);
521
0
                        _expand_columns.emplace_back(
522
0
                                make_nullable(equality_delete_col_types[idx])->create_column(),
523
0
                                make_nullable(equality_delete_col_types[idx]),
524
0
                                equality_delete_col_names[idx]);
525
0
                    }
526
0
                }
527
0
            }
528
0
            for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
529
0
                delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
530
0
            }
531
0
            OrcInitContext eq_delete_ctx;
532
0
            eq_delete_ctx.column_names = delete_col_names;
533
0
            eq_delete_ctx.col_name_to_block_idx = &delete_col_name_to_block_idx;
534
0
            RETURN_IF_ERROR(orc_reader->init_reader(&eq_delete_ctx));
535
0
        } else {
536
0
            return Status::InternalError("Unsupported format of delete file");
537
0
        }
538
539
0
        if (!_equality_delete_block_map.contains(delete_col_ids)) {
540
0
            _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size());
541
0
            Block block;
542
0
            _generate_equality_delete_block(&block, delete_col_names, delete_col_types);
543
0
            _equality_delete_blocks.emplace_back(block);
544
0
        }
545
0
        Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]];
546
547
0
        bool eof = false;
548
0
        while (!eof) {
549
0
            Block tmp_block;
550
0
            _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types);
551
0
            size_t read_rows = 0;
552
0
            auto st = delete_reader->get_next_block(&tmp_block, &read_rows, &eof);
553
0
            if (!st.ok()) {
554
0
                return st;
555
0
            }
556
0
            if (read_rows > 0) {
557
0
                ScopedMutableBlock scoped_mutable_block(&eq_file_block);
558
0
                auto& mutable_block = scoped_mutable_block.mutable_block();
559
0
                RETURN_IF_ERROR(mutable_block.merge(tmp_block));
560
0
            }
561
0
        }
562
0
    }
563
564
0
    for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) {
565
0
        auto& eq_file_block = _equality_delete_blocks[block_idx];
566
0
        auto equality_delete_impl =
567
0
                EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids);
568
0
        RETURN_IF_ERROR(equality_delete_impl->init(this->get_profile()));
569
0
        _equality_delete_impls.emplace_back(std::move(equality_delete_impl));
570
0
    }
571
0
    return Status::OK();
572
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_equality_delete_baseERKSt6vectorINS_22TIcebergDeleteFileDescESaIS4_EE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_equality_delete_baseERKSt6vectorINS_22TIcebergDeleteFileDescESaIS4_EE
573
574
template <typename BaseReader>
575
void IcebergReaderMixin<BaseReader>::_generate_equality_delete_block(
576
        Block* block, const std::vector<std::string>& equality_delete_col_names,
577
0
        const std::vector<DataTypePtr>& equality_delete_col_types) {
578
0
    for (int i = 0; i < equality_delete_col_names.size(); ++i) {
579
0
        DataTypePtr data_type = make_nullable(equality_delete_col_types[i]);
580
0
        MutableColumnPtr data_column = data_type->create_column();
581
0
        block->insert(ColumnWithTypeAndName(std::move(data_column), data_type,
582
0
                                            equality_delete_col_names[i]));
583
0
    }
584
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE31_generate_equality_delete_blockEPNS_5BlockERKSt6vectorINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaISB_EERKS5_ISt10shared_ptrIKNS_9IDataTypeEESaISJ_EE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE31_generate_equality_delete_blockEPNS_5BlockERKSt6vectorINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaISB_EERKS5_ISt10shared_ptrIKNS_9IDataTypeEESaISJ_EE
585
586
template <typename BaseReader>
587
2
Status IcebergReaderMixin<BaseReader>::_expand_block_if_need(Block* block) {
588
2
    std::set<std::string> names;
589
2
    auto block_names = block->get_names();
590
2
    names.insert(block_names.begin(), block_names.end());
591
2
    for (auto& col : _expand_columns) {
592
0
        if (names.contains(col.name)) {
593
0
            return Status::InternalError("Wrong expand column '{}'", col.name);
594
0
        }
595
0
        names.insert(col.name);
596
0
        (*this->col_name_to_block_idx_ref())[col.name] = static_cast<uint32_t>(block->columns());
597
0
        block->insert({col.type->create_column(), col.type, col.name});
598
0
    }
599
2
    return Status::OK();
600
2
}
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_expand_block_if_needEPNS_5BlockE
Line
Count
Source
587
1
Status IcebergReaderMixin<BaseReader>::_expand_block_if_need(Block* block) {
588
1
    std::set<std::string> names;
589
1
    auto block_names = block->get_names();
590
1
    names.insert(block_names.begin(), block_names.end());
591
1
    for (auto& col : _expand_columns) {
592
0
        if (names.contains(col.name)) {
593
0
            return Status::InternalError("Wrong expand column '{}'", col.name);
594
0
        }
595
0
        names.insert(col.name);
596
0
        (*this->col_name_to_block_idx_ref())[col.name] = static_cast<uint32_t>(block->columns());
597
0
        block->insert({col.type->create_column(), col.type, col.name});
598
0
    }
599
1
    return Status::OK();
600
1
}
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_expand_block_if_needEPNS_5BlockE
Line
Count
Source
587
1
Status IcebergReaderMixin<BaseReader>::_expand_block_if_need(Block* block) {
588
1
    std::set<std::string> names;
589
1
    auto block_names = block->get_names();
590
1
    names.insert(block_names.begin(), block_names.end());
591
1
    for (auto& col : _expand_columns) {
592
0
        if (names.contains(col.name)) {
593
0
            return Status::InternalError("Wrong expand column '{}'", col.name);
594
0
        }
595
0
        names.insert(col.name);
596
0
        (*this->col_name_to_block_idx_ref())[col.name] = static_cast<uint32_t>(block->columns());
597
0
        block->insert({col.type->create_column(), col.type, col.name});
598
0
    }
599
1
    return Status::OK();
600
1
}
601
602
template <typename BaseReader>
603
2
Status IcebergReaderMixin<BaseReader>::_shrink_block_if_need(Block* block) {
604
2
    std::set<size_t> positions_to_erase;
605
2
    for (const std::string& expand_col : _expand_col_names) {
606
0
        if (!this->col_name_to_block_idx_ref()->contains(expand_col)) {
607
0
            return Status::InternalError("Wrong erase column '{}', block: {}", expand_col,
608
0
                                         block->dump_names());
609
0
        }
610
0
        positions_to_erase.emplace((*this->col_name_to_block_idx_ref())[expand_col]);
611
0
    }
612
2
    block->erase(positions_to_erase);
613
2
    for (const std::string& expand_col : _expand_col_names) {
614
0
        this->col_name_to_block_idx_ref()->erase(expand_col);
615
0
    }
616
2
    return Status::OK();
617
2
}
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_shrink_block_if_needEPNS_5BlockE
Line
Count
Source
603
1
Status IcebergReaderMixin<BaseReader>::_shrink_block_if_need(Block* block) {
604
1
    std::set<size_t> positions_to_erase;
605
1
    for (const std::string& expand_col : _expand_col_names) {
606
0
        if (!this->col_name_to_block_idx_ref()->contains(expand_col)) {
607
0
            return Status::InternalError("Wrong erase column '{}', block: {}", expand_col,
608
0
                                         block->dump_names());
609
0
        }
610
0
        positions_to_erase.emplace((*this->col_name_to_block_idx_ref())[expand_col]);
611
0
    }
612
1
    block->erase(positions_to_erase);
613
1
    for (const std::string& expand_col : _expand_col_names) {
614
0
        this->col_name_to_block_idx_ref()->erase(expand_col);
615
0
    }
616
1
    return Status::OK();
617
1
}
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_shrink_block_if_needEPNS_5BlockE
Line
Count
Source
603
1
Status IcebergReaderMixin<BaseReader>::_shrink_block_if_need(Block* block) {
604
1
    std::set<size_t> positions_to_erase;
605
1
    for (const std::string& expand_col : _expand_col_names) {
606
0
        if (!this->col_name_to_block_idx_ref()->contains(expand_col)) {
607
0
            return Status::InternalError("Wrong erase column '{}', block: {}", expand_col,
608
0
                                         block->dump_names());
609
0
        }
610
0
        positions_to_erase.emplace((*this->col_name_to_block_idx_ref())[expand_col]);
611
0
    }
612
1
    block->erase(positions_to_erase);
613
1
    for (const std::string& expand_col : _expand_col_names) {
614
0
        this->col_name_to_block_idx_ref()->erase(expand_col);
615
0
    }
616
1
    return Status::OK();
617
1
}
618
619
template <typename BaseReader>
620
Status IcebergReaderMixin<BaseReader>::_position_delete_base(
621
0
        const std::string data_file_path, const std::vector<TIcebergDeleteFileDesc>& delete_files) {
622
0
    std::vector<DeleteRows*> delete_rows_array;
623
0
    int64_t num_delete_rows = 0;
624
0
    for (const auto& delete_file : delete_files) {
625
0
        SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
626
0
        Status create_status = Status::OK();
627
0
        auto* delete_file_cache = _kv_cache->template get<DeleteFile>(
628
0
                _delet_file_cache_key(delete_file.path), [&]() -> DeleteFile* {
629
0
                    auto* position_delete = new DeleteFile;
630
0
                    TFileRangeDesc delete_file_range;
631
0
                    delete_file_range.__set_fs_name(this->get_scan_range().fs_name);
632
0
                    delete_file_range.path = delete_file.path;
633
0
                    delete_file_range.start_offset = 0;
634
0
                    delete_file_range.size = -1;
635
0
                    delete_file_range.file_size = -1;
636
0
                    create_status = _read_position_delete_file(&delete_file_range, position_delete);
637
0
                    if (!create_status) {
638
0
                        return nullptr;
639
0
                    }
640
0
                    return position_delete;
641
0
                });
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE_clB5cxx11Ev
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE_clB5cxx11Ev
642
0
        if (create_status.is<ErrorCode::END_OF_FILE>()) {
643
0
            continue;
644
0
        } else if (!create_status.ok()) {
645
0
            return create_status;
646
0
        }
647
648
0
        DeleteFile& delete_file_map = *((DeleteFile*)delete_file_cache);
649
0
        auto get_value = [&](const auto& v) {
650
0
            DeleteRows* row_ids = v.second.get();
651
0
            if (!row_ids->empty()) {
652
0
                delete_rows_array.emplace_back(row_ids);
653
0
                num_delete_rows += row_ids->size();
654
0
            }
655
0
        };
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlRKT_E_clISt4pairIKS8_St10unique_ptrIS9_IlSaIlEESt14default_deleteISO_EEEEEDaSH_
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlRKT_E_clISt4pairIKS8_St10unique_ptrIS9_IlSaIlEESt14default_deleteISO_EEEEEDaSH_
656
0
        delete_file_map.if_contains(data_file_path, get_value);
657
0
    }
658
0
    if (num_delete_rows > 0) {
659
0
        SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
660
0
        _iceberg_delete_rows =
661
0
                _kv_cache->template get<DeleteRows>(data_file_path, [&]() -> DeleteRows* {
662
0
                    auto* data_file_position_delete = new DeleteRows;
663
0
                    _sort_delete_rows(delete_rows_array, num_delete_rows,
664
0
                                      *data_file_position_delete);
665
0
                    return data_file_position_delete;
666
0
                });
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE0_clEv
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE0_clEv
667
0
        set_delete_rows();
668
0
        COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
669
0
    }
670
0
    return Status::OK();
671
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EE
672
673
template <typename BaseReader>
674
typename IcebergReaderMixin<BaseReader>::PositionDeleteRange
675
0
IcebergReaderMixin<BaseReader>::_get_range(const ColumnDictI32& file_path_column) {
676
0
    PositionDeleteRange range;
677
0
    size_t read_rows = file_path_column.get_data().size();
678
0
    const int* code_path = file_path_column.get_data().data();
679
0
    const int* code_path_start = code_path;
680
0
    const int* code_path_end = code_path + read_rows;
681
0
    while (code_path < code_path_end) {
682
0
        int code = code_path[0];
683
0
        const int* code_end = std::upper_bound(code_path, code_path_end, code);
684
0
        range.data_file_path.emplace_back(file_path_column.get_value(code).to_string());
685
0
        range.range.emplace_back(code_path - code_path_start, code_end - code_path_start);
686
0
        code_path = code_end;
687
0
    }
688
0
    return range;
689
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE10_get_rangeERKNS_13ColumnDictI32E
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE10_get_rangeERKNS_13ColumnDictI32E
690
691
template <typename BaseReader>
692
typename IcebergReaderMixin<BaseReader>::PositionDeleteRange
693
0
IcebergReaderMixin<BaseReader>::_get_range(const ColumnString& file_path_column) {
694
0
    PositionDeleteRange range;
695
0
    size_t read_rows = file_path_column.size();
696
0
    size_t index = 0;
697
0
    while (index < read_rows) {
698
0
        StringRef data_path = file_path_column.get_data_at(index);
699
0
        size_t left = index - 1;
700
0
        size_t right = read_rows;
701
0
        while (left + 1 != right) {
702
0
            size_t mid = left + (right - left) / 2;
703
0
            if (file_path_column.get_data_at(mid) > data_path) {
704
0
                right = mid;
705
0
            } else {
706
0
                left = mid;
707
0
            }
708
0
        }
709
0
        range.data_file_path.emplace_back(data_path.to_string());
710
0
        range.range.emplace_back(index, left + 1);
711
0
        index = left + 1;
712
0
    }
713
0
    return range;
714
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE10_get_rangeERKNS_9ColumnStrIjEE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE10_get_rangeERKNS_9ColumnStrIjEE
715
716
template <typename BaseReader>
717
void IcebergReaderMixin<BaseReader>::_sort_delete_rows(
718
        const std::vector<std::vector<int64_t>*>& delete_rows_array, int64_t num_delete_rows,
719
0
        std::vector<int64_t>& result) {
720
0
    if (delete_rows_array.empty()) {
721
0
        return;
722
0
    }
723
0
    if (delete_rows_array.size() == 1) {
724
0
        result.resize(num_delete_rows);
725
0
        memcpy(result.data(), delete_rows_array.front()->data(), sizeof(int64_t) * num_delete_rows);
726
0
        return;
727
0
    }
728
0
    if (delete_rows_array.size() == 2) {
729
0
        result.resize(num_delete_rows);
730
0
        std::merge(delete_rows_array.front()->begin(), delete_rows_array.front()->end(),
731
0
                   delete_rows_array.back()->begin(), delete_rows_array.back()->end(),
732
0
                   result.begin());
733
0
        return;
734
0
    }
735
736
0
    using vec_pair = std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::iterator>;
737
0
    result.resize(num_delete_rows);
738
0
    auto row_id_iter = result.begin();
739
0
    auto iter_end = result.end();
740
0
    std::vector<vec_pair> rows_array;
741
0
    for (auto* rows : delete_rows_array) {
742
0
        if (!rows->empty()) {
743
0
            rows_array.emplace_back(rows->begin(), rows->end());
744
0
        }
745
0
    }
746
0
    size_t array_size = rows_array.size();
747
0
    while (row_id_iter != iter_end) {
748
0
        int64_t min_index = 0;
749
0
        int64_t min = *rows_array[0].first;
750
0
        for (size_t i = 0; i < array_size; ++i) {
751
0
            if (*rows_array[i].first < min) {
752
0
                min_index = i;
753
0
                min = *rows_array[i].first;
754
0
            }
755
0
        }
756
0
        *row_id_iter++ = min;
757
0
        rows_array[min_index].first++;
758
0
        if (UNLIKELY(rows_array[min_index].first == rows_array[min_index].second)) {
759
0
            rows_array.erase(rows_array.begin() + min_index);
760
0
            array_size--;
761
0
        }
762
0
    }
763
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE17_sort_delete_rowsERKSt6vectorIPS3_IlSaIlEESaIS6_EElRS5_
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE17_sort_delete_rowsERKSt6vectorIPS3_IlSaIlEESaIS6_EElRS5_
764
765
template <typename BaseReader>
766
void IcebergReaderMixin<BaseReader>::_gen_position_delete_file_range(
767
        Block& block, DeleteFile* position_delete, size_t read_rows,
768
0
        bool file_path_column_dictionary_coded) {
769
0
    SCOPED_TIMER(_iceberg_profile.parse_delete_file_time);
770
0
    auto name_to_pos_map = block.get_name_to_pos_map();
771
0
    ColumnPtr path_column = block.get_by_position(name_to_pos_map[ICEBERG_FILE_PATH]).column;
772
0
    DCHECK_EQ(path_column->size(), read_rows);
773
0
    ColumnPtr pos_column = block.get_by_position(name_to_pos_map[ICEBERG_ROW_POS]).column;
774
0
    using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
775
0
    const int64_t* src_data = assert_cast<const ColumnType&>(*pos_column).get_data().data();
776
0
    PositionDeleteRange range;
777
0
    if (file_path_column_dictionary_coded) {
778
0
        range = _get_range(assert_cast<const ColumnDictI32&>(*path_column));
779
0
    } else {
780
0
        range = _get_range(assert_cast<const ColumnString&>(*path_column));
781
0
    }
782
0
    for (int i = 0; i < range.range.size(); ++i) {
783
0
        std::string key = range.data_file_path[i];
784
0
        auto iter = position_delete->find(key);
785
0
        DeleteRows* delete_rows;
786
0
        if (iter == position_delete->end()) {
787
0
            delete_rows = new DeleteRows;
788
0
            std::unique_ptr<DeleteRows> delete_rows_ptr(delete_rows);
789
0
            (*position_delete)[key] = std::move(delete_rows_ptr);
790
0
        } else {
791
0
            delete_rows = iter->second.get();
792
0
        }
793
0
        const int64_t* cpy_start = src_data + range.range[i].first;
794
0
        const int64_t cpy_count = range.range[i].second - range.range[i].first;
795
0
        int64_t origin_size = delete_rows->size();
796
0
        delete_rows->resize(origin_size + cpy_count);
797
0
        int64_t* dest_position = &(*delete_rows)[origin_size];
798
0
        memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
799
0
    }
800
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE31_gen_position_delete_file_rangeERNS_5BlockEPN5phmap22parallel_flat_hash_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESt10unique_ptrISt6vectorIlSaIlEESt14default_deleteISG_EESt4hashISC_ESt8equal_toIvESaISt4pairIKSC_SJ_EELm8ESt5mutexEEmb
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE31_gen_position_delete_file_rangeERNS_5BlockEPN5phmap22parallel_flat_hash_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESt10unique_ptrISt6vectorIlSaIlEESt14default_deleteISG_EESt4hashISC_ESt8equal_toIvESaISt4pairIKSC_SJ_EELm8ESt5mutexEEmb
801
802
template <typename BaseReader>
803
Status IcebergReaderMixin<BaseReader>::read_deletion_vector(
804
0
        const std::string& data_file_path, const TIcebergDeleteFileDesc& delete_file_desc) {
805
0
    Status create_status = Status::OK();
806
0
    SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
807
0
    _iceberg_delete_rows = _kv_cache->template get<
808
0
            DeleteRows>(data_file_path, [&]() -> DeleteRows* {
809
0
        auto* delete_rows = new DeleteRows;
810
811
0
        TFileRangeDesc delete_range;
812
0
        delete_range.__set_fs_name(this->get_scan_range().fs_name);
813
0
        delete_range.path = delete_file_desc.path;
814
0
        delete_range.start_offset = delete_file_desc.content_offset;
815
0
        delete_range.size = delete_file_desc.content_size_in_bytes;
816
0
        delete_range.file_size = -1;
817
818
0
        DeletionVectorReader dv_reader(this->get_state(), this->get_profile(),
819
0
                                       this->get_scan_params(), delete_range, this->get_io_ctx());
820
0
        create_status = dv_reader.open();
821
0
        if (!create_status.ok()) [[unlikely]] {
822
0
            return nullptr;
823
0
        }
824
825
0
        size_t buffer_size = delete_range.size;
826
0
        std::vector<char> buf(buffer_size);
827
0
        if (buffer_size < 12) [[unlikely]] {
828
0
            create_status = Status::DataQualityError("Deletion vector file size too small: {}",
829
0
                                                     buffer_size);
830
0
            return nullptr;
831
0
        }
832
833
0
        create_status = dv_reader.read_at(delete_range.start_offset, {buf.data(), buffer_size});
834
0
        if (!create_status) [[unlikely]] {
835
0
            return nullptr;
836
0
        }
837
838
0
        auto total_length = BigEndian::Load32(buf.data());
839
0
        if (total_length + 8 != buffer_size) [[unlikely]] {
840
0
            create_status = Status::DataQualityError(
841
0
                    "Deletion vector length mismatch, expected: {}, actual: {}", total_length + 8,
842
0
                    buffer_size);
843
0
            return nullptr;
844
0
        }
845
846
0
        constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'};
847
0
        if (memcmp(buf.data() + sizeof(total_length), MAGIC_NUMBER, 4)) [[unlikely]] {
848
0
            create_status = Status::DataQualityError("Deletion vector magic number mismatch");
849
0
            return nullptr;
850
0
        }
851
852
0
        roaring::Roaring64Map bitmap;
853
0
        SCOPED_TIMER(_iceberg_profile.parse_delete_file_time);
854
0
        try {
855
0
            bitmap = roaring::Roaring64Map::readSafe(buf.data() + 8, buffer_size - 12);
856
0
        } catch (const std::runtime_error& e) {
857
0
            create_status = Status::DataQualityError("Decode roaring bitmap failed, {}", e.what());
858
0
            return nullptr;
859
0
        }
860
861
0
        delete_rows->reserve(bitmap.cardinality());
862
0
        for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
863
0
            delete_rows->push_back(*it);
864
0
        }
865
0
        COUNTER_UPDATE(_iceberg_profile.num_delete_rows, delete_rows->size());
866
0
        return delete_rows;
867
0
    });
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescEENKUlvE_clEv
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescEENKUlvE_clEv
868
869
0
    RETURN_IF_ERROR(create_status);
870
0
    if (!_iceberg_delete_rows->empty()) [[likely]] {
871
0
        set_delete_rows();
872
0
    }
873
0
    return Status::OK();
874
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescE
875
876
} // namespace doris