Coverage Report

Created: 2026-05-14 20:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/iceberg_reader_mixin.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <cstddef>
21
#include <cstdint>
22
#include <string>
23
#include <unordered_map>
24
#include <vector>
25
26
#include "common/consts.h"
27
#include "common/status.h"
28
#include "core/block/block.h"
29
#include "core/column/column_dictionary.h"
30
#include "core/column/column_nullable.h"
31
#include "core/column/column_string.h"
32
#include "core/column/column_struct.h"
33
#include "core/data_type/data_type_number.h"
34
#include "core/data_type/data_type_string.h"
35
#include "format/generic_reader.h"
36
#include "format/table/deletion_vector_reader.h"
37
#include "format/table/equality_delete.h"
38
#include "format/table/table_schema_change_helper.h"
39
#include "runtime/runtime_profile.h"
40
#include "runtime/runtime_state.h"
41
#include "storage/olap_common.h"
42
43
namespace doris {
44
class TIcebergDeleteFileDesc;
45
} // namespace doris
46
47
namespace doris {
48
49
class ShardedKVCache;
50
51
// CRTP mixin for Iceberg reader functionality.
52
// BaseReader should be ParquetReader or OrcReader.
53
// Inherits BaseReader + TableSchemaChangeHelper, providing shared Iceberg logic
54
// (delete files, deletion vectors, equality delete, $row_id synthesis).
55
//
56
// Inheritance chain:
57
//   IcebergParquetReader -> IcebergReaderMixin<ParquetReader> -> ParquetReader -> GenericReader
58
//   IcebergOrcReader     -> IcebergReaderMixin<OrcReader>     -> OrcReader     -> GenericReader
59
template <typename BaseReader>
60
class IcebergReaderMixin : public BaseReader, public TableSchemaChangeHelper {
61
public:
62
    struct PositionDeleteRange {
63
        std::vector<std::string> data_file_path;
64
        std::vector<std::pair<int, int>> range;
65
    };
66
67
    // Forward BaseReader constructor arguments + Iceberg-specific kv_cache
68
    template <typename... Args>
69
    IcebergReaderMixin(ShardedKVCache* kv_cache, Args&&... args)
70
14
            : BaseReader(std::forward<Args>(args)...), _kv_cache(kv_cache) {
71
14
        static const char* iceberg_profile = "IcebergProfile";
72
14
        ADD_TIMER(this->get_profile(), iceberg_profile);
73
14
        _iceberg_profile.num_delete_files = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteFiles",
74
14
                                                              TUnit::UNIT, iceberg_profile);
75
14
        _iceberg_profile.num_delete_rows = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteRows",
76
14
                                                             TUnit::UNIT, iceberg_profile);
77
14
        _iceberg_profile.delete_files_read_time =
78
14
                ADD_CHILD_TIMER(this->get_profile(), "DeleteFileReadTime", iceberg_profile);
79
14
        _iceberg_profile.delete_rows_sort_time =
80
14
                ADD_CHILD_TIMER(this->get_profile(), "DeleteRowsSortTime", iceberg_profile);
81
14
        _iceberg_profile.parse_delete_file_time =
82
14
                ADD_CHILD_TIMER(this->get_profile(), "ParseDeleteFileTime", iceberg_profile);
83
14
    }
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEEC2IJRPNS_14RuntimeProfileERKNS_20TFileScanRangeParamsERKNS_14TFileRangeDescERmRPKN4cctz9time_zoneERPNS_2io9IOContextERPNS_12RuntimeStateERPNS_13FileMetaCacheEEEEPNS_14ShardedKVCacheEDpOT_
Line
Count
Source
70
7
            : BaseReader(std::forward<Args>(args)...), _kv_cache(kv_cache) {
71
7
        static const char* iceberg_profile = "IcebergProfile";
72
7
        ADD_TIMER(this->get_profile(), iceberg_profile);
73
7
        _iceberg_profile.num_delete_files = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteFiles",
74
7
                                                              TUnit::UNIT, iceberg_profile);
75
7
        _iceberg_profile.num_delete_rows = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteRows",
76
7
                                                             TUnit::UNIT, iceberg_profile);
77
7
        _iceberg_profile.delete_files_read_time =
78
7
                ADD_CHILD_TIMER(this->get_profile(), "DeleteFileReadTime", iceberg_profile);
79
7
        _iceberg_profile.delete_rows_sort_time =
80
7
                ADD_CHILD_TIMER(this->get_profile(), "DeleteRowsSortTime", iceberg_profile);
81
7
        _iceberg_profile.parse_delete_file_time =
82
7
                ADD_CHILD_TIMER(this->get_profile(), "ParseDeleteFileTime", iceberg_profile);
83
7
    }
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEEC2IJRPNS_14RuntimeProfileERPNS_12RuntimeStateERKNS_20TFileScanRangeParamsERKNS_14TFileRangeDescERmRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERPNS_2io9IOContextERPNS_13FileMetaCacheEEEEPNS_14ShardedKVCacheEDpOT_
Line
Count
Source
70
7
            : BaseReader(std::forward<Args>(args)...), _kv_cache(kv_cache) {
71
7
        static const char* iceberg_profile = "IcebergProfile";
72
7
        ADD_TIMER(this->get_profile(), iceberg_profile);
73
7
        _iceberg_profile.num_delete_files = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteFiles",
74
7
                                                              TUnit::UNIT, iceberg_profile);
75
7
        _iceberg_profile.num_delete_rows = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteRows",
76
7
                                                             TUnit::UNIT, iceberg_profile);
77
7
        _iceberg_profile.delete_files_read_time =
78
7
                ADD_CHILD_TIMER(this->get_profile(), "DeleteFileReadTime", iceberg_profile);
79
7
        _iceberg_profile.delete_rows_sort_time =
80
7
                ADD_CHILD_TIMER(this->get_profile(), "DeleteRowsSortTime", iceberg_profile);
81
7
        _iceberg_profile.parse_delete_file_time =
82
7
                ADD_CHILD_TIMER(this->get_profile(), "ParseDeleteFileTime", iceberg_profile);
83
7
    }
84
85
14
    ~IcebergReaderMixin() override = default;
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEED2Ev
Line
Count
Source
85
7
    ~IcebergReaderMixin() override = default;
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEED2Ev
Line
Count
Source
85
7
    ~IcebergReaderMixin() override = default;
86
87
    void set_current_file_info(const std::string& file_path, int32_t partition_spec_id,
88
0
                               const std::string& partition_data_json) {
89
0
        _current_file_path = file_path;
90
0
        _partition_spec_id = partition_spec_id;
91
0
        _partition_data_json = partition_data_json;
92
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21set_current_file_infoERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSA_
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21set_current_file_infoERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSA_
93
94
    enum { DATA, POSITION_DELETE, EQUALITY_DELETE, DELETION_VECTOR };
95
    enum Fileformat { NONE, PARQUET, ORC, AVRO };
96
97
    virtual void set_delete_rows() = 0;
98
99
    // Table-level COUNT(*) is handled by CountReader (created by FileScanner after
100
    // init_reader). If _do_get_next_block is called, COUNT must have been resolved.
101
2
    Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override {
102
2
        DCHECK(this->_push_down_agg_type != TPushAggOp::type::COUNT);
103
2
        return BaseReader::_do_get_next_block(block, read_rows, eof);
104
2
    }
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE18_do_get_next_blockEPNS_5BlockEPmPb
Line
Count
Source
101
1
    Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override {
102
        DCHECK(this->_push_down_agg_type != TPushAggOp::type::COUNT);
103
1
        return BaseReader::_do_get_next_block(block, read_rows, eof);
104
1
    }
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE18_do_get_next_blockEPNS_5BlockEPmPb
Line
Count
Source
101
1
    Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override {
102
        DCHECK(this->_push_down_agg_type != TPushAggOp::type::COUNT);
103
1
        return BaseReader::_do_get_next_block(block, read_rows, eof);
104
1
    }
105
106
    void set_create_row_id_column_iterator_func(
107
0
            std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()> create_func) {
108
0
        _create_topn_row_id_column_iterator = create_func;
109
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE38set_create_row_id_column_iterator_funcESt8functionIFSt10shared_ptrINS_10segment_v221RowIdColumnIteratorV2EEvEE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE38set_create_row_id_column_iterator_funcESt8functionIFSt10shared_ptrINS_10segment_v221RowIdColumnIteratorV2EEvEE
110
111
protected:
112
    // ---- Hook implementations ----
113
114
    // Called before reading a block: expand block for equality delete columns + detect row_id
115
2
    Status on_before_read_block(Block* block) override {
116
2
        RETURN_IF_ERROR(_expand_block_if_need(block));
117
2
        return Status::OK();
118
2
    }
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20on_before_read_blockEPNS_5BlockE
Line
Count
Source
115
1
    Status on_before_read_block(Block* block) override {
116
1
        RETURN_IF_ERROR(_expand_block_if_need(block));
117
1
        return Status::OK();
118
1
    }
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE20on_before_read_blockEPNS_5BlockE
Line
Count
Source
115
1
    Status on_before_read_block(Block* block) override {
116
1
        RETURN_IF_ERROR(_expand_block_if_need(block));
117
1
        return Status::OK();
118
1
    }
119
120
    /// Fill Iceberg $row_id synthesized column. Registered as handler during init.
121
0
    Status _fill_iceberg_row_id(Block* block, size_t rows) {
122
0
        int row_id_pos = block->get_position_by_name(BeConsts::ICEBERG_ROWID_COL);
123
0
        DORIS_CHECK(row_id_pos >= 0);
124
125
        // Lazy-init file info: only set when $row_id is actually needed.
126
0
        const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params;
127
0
        std::string file_path = table_desc.original_file_path;
128
0
        int32_t partition_spec_id =
129
0
                table_desc.__isset.partition_spec_id ? table_desc.partition_spec_id : 0;
130
0
        std::string partition_data_json;
131
0
        if (table_desc.__isset.partition_data_json) {
132
0
            partition_data_json = table_desc.partition_data_json;
133
0
        }
134
0
        set_current_file_info(file_path, partition_spec_id, partition_data_json);
135
136
0
        const auto& row_ids = this->current_batch_row_positions();
137
0
        auto& col_with_type = block->get_by_position(static_cast<size_t>(row_id_pos));
138
0
        MutableColumnPtr row_id_column;
139
0
        RETURN_IF_ERROR(_build_iceberg_rowid_column(col_with_type.type, _current_file_path, row_ids,
140
0
                                                    _partition_spec_id, _partition_data_json,
141
0
                                                    &row_id_column));
142
0
        col_with_type.column = std::move(row_id_column);
143
0
        return Status::OK();
144
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20_fill_iceberg_row_idEPNS_5BlockEm
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE20_fill_iceberg_row_idEPNS_5BlockEm
145
146
0
    void _init_row_lineage_columns() {
147
0
        const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params;
148
0
        if (table_desc.__isset.first_row_id) {
149
0
            _row_lineage_columns.first_row_id = table_desc.first_row_id;
150
0
        }
151
0
        if (table_desc.__isset.last_updated_sequence_number) {
152
0
            _row_lineage_columns.last_updated_sequence_number =
153
0
                    table_desc.last_updated_sequence_number;
154
0
        }
155
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE25_init_row_lineage_columnsEv
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE25_init_row_lineage_columnsEv
156
157
0
    Status _fill_row_lineage_row_id(Block* block, size_t rows) {
158
0
        int col_pos = block->get_position_by_name(ROW_LINEAGE_ROW_ID);
159
0
        DORIS_CHECK(col_pos >= 0);
160
161
0
        if (_row_lineage_columns.first_row_id >= 0) {
162
0
            auto col = block->get_by_position(col_pos).column->assume_mutable();
163
0
            auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
164
0
            auto& null_map = nullable_column->get_null_map_data();
165
0
            auto& data =
166
0
                    assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
167
0
            const auto& row_ids = this->current_batch_row_positions();
168
0
            for (size_t i = 0; i < rows; ++i) {
169
0
                if (null_map[i] != 0) {
170
0
                    null_map[i] = 0;
171
0
                    data[i] = _row_lineage_columns.first_row_id + static_cast<int64_t>(row_ids[i]);
172
0
                }
173
0
            }
174
0
        }
175
0
        return Status::OK();
176
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE24_fill_row_lineage_row_idEPNS_5BlockEm
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE24_fill_row_lineage_row_idEPNS_5BlockEm
177
178
0
    Status _fill_row_lineage_last_updated_sequence_number(Block* block, size_t rows) {
179
0
        int col_pos = block->get_position_by_name(ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER);
180
0
        DORIS_CHECK(col_pos >= 0);
181
182
0
        if (_row_lineage_columns.last_updated_sequence_number >= 0) {
183
0
            auto col = block->get_by_position(col_pos).column->assume_mutable();
184
0
            auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
185
0
            auto& null_map = nullable_column->get_null_map_data();
186
0
            auto& data =
187
0
                    assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
188
0
            for (size_t i = 0; i < rows; ++i) {
189
0
                if (null_map[i] != 0) {
190
0
                    null_map[i] = 0;
191
0
                    data[i] = _row_lineage_columns.last_updated_sequence_number;
192
0
                }
193
0
            }
194
0
        }
195
0
        return Status::OK();
196
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE46_fill_row_lineage_last_updated_sequence_numberEPNS_5BlockEm
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE46_fill_row_lineage_last_updated_sequence_numberEPNS_5BlockEm
197
198
    // Called after reading a block: apply equality delete filter + shrink block
199
2
    Status on_after_read_block(Block* block, size_t* read_rows) override {
200
2
        if (!_equality_delete_impls.empty()) {
201
0
            std::unique_ptr<IColumn::Filter> filter =
202
0
                    std::make_unique<IColumn::Filter>(block->rows(), 1);
203
0
            for (auto& equality_delete_impl : _equality_delete_impls) {
204
0
                RETURN_IF_ERROR(equality_delete_impl->filter_data_block(
205
0
                        block, this->col_name_to_block_idx_ref(), _id_to_block_column_name,
206
0
                        *filter));
207
0
            }
208
0
            Block::filter_block_internal(block, *filter, block->columns());
209
0
            *read_rows = block->rows();
210
0
        }
211
2
        return _shrink_block_if_need(block);
212
2
    }
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE19on_after_read_blockEPNS_5BlockEPm
Line
Count
Source
199
1
    Status on_after_read_block(Block* block, size_t* read_rows) override {
200
1
        if (!_equality_delete_impls.empty()) {
201
0
            std::unique_ptr<IColumn::Filter> filter =
202
0
                    std::make_unique<IColumn::Filter>(block->rows(), 1);
203
0
            for (auto& equality_delete_impl : _equality_delete_impls) {
204
0
                RETURN_IF_ERROR(equality_delete_impl->filter_data_block(
205
0
                        block, this->col_name_to_block_idx_ref(), _id_to_block_column_name,
206
0
                        *filter));
207
0
            }
208
0
            Block::filter_block_internal(block, *filter, block->columns());
209
0
            *read_rows = block->rows();
210
0
        }
211
1
        return _shrink_block_if_need(block);
212
1
    }
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE19on_after_read_blockEPNS_5BlockEPm
Line
Count
Source
199
1
    Status on_after_read_block(Block* block, size_t* read_rows) override {
200
1
        if (!_equality_delete_impls.empty()) {
201
0
            std::unique_ptr<IColumn::Filter> filter =
202
0
                    std::make_unique<IColumn::Filter>(block->rows(), 1);
203
0
            for (auto& equality_delete_impl : _equality_delete_impls) {
204
0
                RETURN_IF_ERROR(equality_delete_impl->filter_data_block(
205
0
                        block, this->col_name_to_block_idx_ref(), _id_to_block_column_name,
206
0
                        *filter));
207
0
            }
208
0
            Block::filter_block_internal(block, *filter, block->columns());
209
0
            *read_rows = block->rows();
210
0
        }
211
1
        return _shrink_block_if_need(block);
212
1
    }
213
214
    // ---- Shared Iceberg methods ----
215
216
    Status _init_row_filters();
217
    Status _position_delete_base(const std::string data_file_path,
218
                                 const std::vector<TIcebergDeleteFileDesc>& delete_files);
219
    Status _equality_delete_base(const std::vector<TIcebergDeleteFileDesc>& delete_files);
220
    Status read_deletion_vector(const std::string& data_file_path,
221
                                const TIcebergDeleteFileDesc& delete_file_desc);
222
223
    Status _expand_block_if_need(Block* block);
224
    Status _shrink_block_if_need(Block* block);
225
226
    // Type aliases — must be defined before member function declarations that use them.
227
    using DeleteRows = std::vector<int64_t>;
228
    using DeleteFile = phmap::parallel_flat_hash_map<
229
            std::string, std::unique_ptr<DeleteRows>, std::hash<std::string>, std::equal_to<>,
230
            std::allocator<std::pair<const std::string, std::unique_ptr<DeleteRows>>>, 8,
231
            std::mutex>;
232
233
    PositionDeleteRange _get_range(const ColumnDictI32& file_path_column);
234
    PositionDeleteRange _get_range(const ColumnString& file_path_column);
235
    static void _sort_delete_rows(const std::vector<std::vector<int64_t>*>& delete_rows_array,
236
                                  int64_t num_delete_rows, std::vector<int64_t>& result);
237
    void _gen_position_delete_file_range(Block& block, DeleteFile* position_delete,
238
                                         size_t read_rows, bool file_path_column_dictionary_coded);
239
    void _generate_equality_delete_block(Block* block,
240
                                         const std::vector<std::string>& equality_delete_col_names,
241
                                         const std::vector<DataTypePtr>& equality_delete_col_types);
242
243
    // Pure virtual: format-specific delete file reading
244
    virtual Status _read_position_delete_file(const TFileRangeDesc*, DeleteFile*) = 0;
245
    virtual std::unique_ptr<GenericReader> _create_equality_reader(
246
            const TFileRangeDesc& delete_desc) = 0;
247
248
0
    static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_delet_file_cache_keyERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_delet_file_cache_keyERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
249
250
    /// Build the Iceberg V2 row-id struct column.
251
    static Status _build_iceberg_rowid_column(const DataTypePtr& type, const std::string& file_path,
252
                                              const std::vector<rowid_t>& row_ids,
253
                                              int32_t partition_spec_id,
254
                                              const std::string& partition_data_json,
255
0
                                              MutableColumnPtr* column_out) {
256
0
        if (type == nullptr || column_out == nullptr) {
257
0
            return Status::InvalidArgument("Invalid iceberg rowid column type or output column");
258
0
        }
259
0
        MutableColumnPtr column = type->create_column();
260
0
        auto nullable_col = check_and_get_column<ColumnNullable>(column.get());
261
0
        ColumnStruct* struct_col = nullptr;
262
0
        if (nullable_col) {
263
0
            const auto struct_col_guard =
264
0
                    check_and_get_column<ColumnStruct>(nullable_col->get_nested_column_ptr().get());
265
0
            if (struct_col_guard) {
266
0
                struct_col = struct_col_guard.get();
267
0
            }
268
0
        } else {
269
0
            const auto struct_col_guard = check_and_get_column<ColumnStruct>(column.get());
270
0
            if (struct_col_guard) {
271
0
                struct_col = struct_col_guard.get();
272
0
            }
273
0
        }
274
0
        if (struct_col == nullptr || struct_col->tuple_size() < 4) {
275
0
            return Status::InternalError("Invalid iceberg rowid column structure");
276
0
        }
277
0
        size_t num_rows = row_ids.size();
278
0
        auto& file_path_col = struct_col->get_column(0);
279
0
        auto& row_pos_col = struct_col->get_column(1);
280
0
        auto& spec_id_col = struct_col->get_column(2);
281
0
        auto& partition_data_col = struct_col->get_column(3);
282
0
        file_path_col.reserve(num_rows);
283
0
        row_pos_col.reserve(num_rows);
284
0
        spec_id_col.reserve(num_rows);
285
0
        partition_data_col.reserve(num_rows);
286
0
        for (size_t i = 0; i < num_rows; ++i) {
287
0
            file_path_col.insert_data(file_path.data(), file_path.size());
288
0
        }
289
0
        for (size_t i = 0; i < num_rows; ++i) {
290
0
            int64_t row_pos = static_cast<int64_t>(row_ids[i]);
291
0
            row_pos_col.insert_data(reinterpret_cast<const char*>(&row_pos), sizeof(row_pos));
292
0
        }
293
0
        for (size_t i = 0; i < num_rows; ++i) {
294
0
            int32_t spec_id = partition_spec_id;
295
0
            spec_id_col.insert_data(reinterpret_cast<const char*>(&spec_id), sizeof(spec_id));
296
0
        }
297
0
        for (size_t i = 0; i < num_rows; ++i) {
298
0
            partition_data_col.insert_data(partition_data_json.data(), partition_data_json.size());
299
0
        }
300
0
        if (nullable_col) {
301
0
            nullable_col->get_null_map_data().resize_fill(num_rows, 0);
302
0
        }
303
0
        *column_out = std::move(column);
304
0
        return Status::OK();
305
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE27_build_iceberg_rowid_columnERKSt10shared_ptrIKNS_9IDataTypeEERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIjSaIjEEiSG_PNS_3COWINS_7IColumnEE11mutable_ptrISN_EE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE27_build_iceberg_rowid_columnERKSt10shared_ptrIKNS_9IDataTypeEERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIjSaIjEEiSG_PNS_3COWINS_7IColumnEE11mutable_ptrISN_EE
306
307
    struct IcebergProfile {
308
        RuntimeProfile::Counter* num_delete_files;
309
        RuntimeProfile::Counter* num_delete_rows;
310
        RuntimeProfile::Counter* delete_files_read_time;
311
        RuntimeProfile::Counter* delete_rows_sort_time;
312
        RuntimeProfile::Counter* parse_delete_file_time;
313
    };
314
315
    bool _need_row_id_column = false;
316
    std::string _current_file_path;
317
    int32_t _partition_spec_id = 0;
318
    std::string _partition_data_json;
319
320
    ShardedKVCache* _kv_cache;
321
    IcebergProfile _iceberg_profile;
322
    const std::vector<int64_t>* _iceberg_delete_rows = nullptr;
323
    std::vector<std::string> _expand_col_names;
324
    std::vector<ColumnWithTypeAndName> _expand_columns;
325
    std::vector<std::string> _all_required_col_names;
326
    Fileformat _file_format = Fileformat::NONE;
327
328
    const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
329
    const std::string ICEBERG_FILE_PATH = "file_path";
330
    const std::string ICEBERG_ROW_POS = "pos";
331
    const std::vector<std::string> delete_file_col_names {ICEBERG_FILE_PATH, ICEBERG_ROW_POS};
332
    const std::unordered_map<std::string, uint32_t> DELETE_COL_NAME_TO_BLOCK_IDX = {
333
            {ICEBERG_FILE_PATH, 0}, {ICEBERG_ROW_POS, 1}};
334
    const int ICEBERG_FILE_PATH_INDEX = 0;
335
    const int ICEBERG_FILE_POS_INDEX = 1;
336
    const int READ_DELETE_FILE_BATCH_SIZE = 102400;
337
338
    // all ids that need read for eq delete (from all eq delete files)
339
    std::set<int> _equality_delete_col_ids;
340
    // eq delete column ids -> location of _equality_delete_blocks / _equality_delete_impls
341
    std::map<std::vector<int>, int> _equality_delete_block_map;
342
    // EqualityDeleteBase stores raw pointers to these blocks, so do not modify this vector after
343
    // creating entries in _equality_delete_impls.
344
    std::vector<Block> _equality_delete_blocks;
345
    std::vector<std::unique_ptr<EqualityDeleteBase>> _equality_delete_impls;
346
347
    // id -> block column name
348
    std::unordered_map<int, std::string> _id_to_block_column_name;
349
350
    // File column names used during init
351
    std::vector<std::string> _file_col_names;
352
353
    std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()>
354
            _create_topn_row_id_column_iterator;
355
356
    static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id";
357
    static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER =
358
            "_last_updated_sequence_number";
359
    struct RowLineageColumns {
360
        int64_t first_row_id = -1;
361
        int64_t last_updated_sequence_number = -1;
362
    };
363
    RowLineageColumns _row_lineage_columns;
364
};
365
366
// ============================================================================
367
// Template method implementations (must be in header for templates)
368
// ============================================================================
369
370
template <typename BaseReader>
371
2
Status IcebergReaderMixin<BaseReader>::_init_row_filters() {
372
    // COUNT(*) short-circuit
373
2
    if (this->_push_down_agg_type == TPushAggOp::type::COUNT &&
374
2
        this->get_scan_range().table_format_params.__isset.table_level_row_count &&
375
2
        this->get_scan_range().table_format_params.table_level_row_count > 0) {
376
0
        return Status::OK();
377
0
    }
378
379
2
    const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params;
380
2
    const auto& version = table_desc.format_version;
381
2
    if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
382
2
        return Status::OK();
383
2
    }
384
385
0
    std::vector<TIcebergDeleteFileDesc> position_delete_files;
386
0
    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
387
0
    std::vector<TIcebergDeleteFileDesc> deletion_vector_files;
388
0
    for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
389
0
        if (desc.content == POSITION_DELETE) {
390
0
            position_delete_files.emplace_back(desc);
391
0
        } else if (desc.content == EQUALITY_DELETE) {
392
0
            equality_delete_files.emplace_back(desc);
393
0
        } else if (desc.content == DELETION_VECTOR) {
394
0
            deletion_vector_files.emplace_back(desc);
395
0
        }
396
0
    }
397
398
0
    if (!equality_delete_files.empty()) {
399
0
        RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
400
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
401
0
    }
402
403
0
    if (!deletion_vector_files.empty()) {
404
0
        if (deletion_vector_files.size() != 1) [[unlikely]] {
405
            /*
406
             * Deletion vectors are a binary representation of deletes for a single data file that is more efficient
407
             * at execution time than position delete files. Unlike equality or position delete files, there can be
408
             * at most one deletion vector for a given data file in a snapshot.
409
             */
410
0
            return Status::DataQualityError("This iceberg data file has multiple DVs.");
411
0
        }
412
0
        RETURN_IF_ERROR(
413
0
                read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));
414
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
415
0
    } else if (!position_delete_files.empty()) {
416
0
        RETURN_IF_ERROR(
417
0
                _position_delete_base(table_desc.original_file_path, position_delete_files));
418
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
419
0
    }
420
421
0
    COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
422
0
    return Status::OK();
423
0
}
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE17_init_row_filtersEv
Line
Count
Source
371
1
Status IcebergReaderMixin<BaseReader>::_init_row_filters() {
372
    // COUNT(*) short-circuit
373
1
    if (this->_push_down_agg_type == TPushAggOp::type::COUNT &&
374
1
        this->get_scan_range().table_format_params.__isset.table_level_row_count &&
375
1
        this->get_scan_range().table_format_params.table_level_row_count > 0) {
376
0
        return Status::OK();
377
0
    }
378
379
1
    const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params;
380
1
    const auto& version = table_desc.format_version;
381
1
    if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
382
1
        return Status::OK();
383
1
    }
384
385
0
    std::vector<TIcebergDeleteFileDesc> position_delete_files;
386
0
    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
387
0
    std::vector<TIcebergDeleteFileDesc> deletion_vector_files;
388
0
    for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
389
0
        if (desc.content == POSITION_DELETE) {
390
0
            position_delete_files.emplace_back(desc);
391
0
        } else if (desc.content == EQUALITY_DELETE) {
392
0
            equality_delete_files.emplace_back(desc);
393
0
        } else if (desc.content == DELETION_VECTOR) {
394
0
            deletion_vector_files.emplace_back(desc);
395
0
        }
396
0
    }
397
398
0
    if (!equality_delete_files.empty()) {
399
0
        RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
400
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
401
0
    }
402
403
0
    if (!deletion_vector_files.empty()) {
404
0
        if (deletion_vector_files.size() != 1) [[unlikely]] {
405
            /*
406
             * Deletion vectors are a binary representation of deletes for a single data file that is more efficient
407
             * at execution time than position delete files. Unlike equality or position delete files, there can be
408
             * at most one deletion vector for a given data file in a snapshot.
409
             */
410
0
            return Status::DataQualityError("This iceberg data file has multiple DVs.");
411
0
        }
412
0
        RETURN_IF_ERROR(
413
0
                read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));
414
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
415
0
    } else if (!position_delete_files.empty()) {
416
0
        RETURN_IF_ERROR(
417
0
                _position_delete_base(table_desc.original_file_path, position_delete_files));
418
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
419
0
    }
420
421
0
    COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
422
0
    return Status::OK();
423
0
}
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE17_init_row_filtersEv
Line
Count
Source
371
1
Status IcebergReaderMixin<BaseReader>::_init_row_filters() {
372
    // COUNT(*) short-circuit
373
1
    if (this->_push_down_agg_type == TPushAggOp::type::COUNT &&
374
1
        this->get_scan_range().table_format_params.__isset.table_level_row_count &&
375
1
        this->get_scan_range().table_format_params.table_level_row_count > 0) {
376
0
        return Status::OK();
377
0
    }
378
379
1
    const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params;
380
1
    const auto& version = table_desc.format_version;
381
1
    if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
382
1
        return Status::OK();
383
1
    }
384
385
0
    std::vector<TIcebergDeleteFileDesc> position_delete_files;
386
0
    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
387
0
    std::vector<TIcebergDeleteFileDesc> deletion_vector_files;
388
0
    for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
389
0
        if (desc.content == POSITION_DELETE) {
390
0
            position_delete_files.emplace_back(desc);
391
0
        } else if (desc.content == EQUALITY_DELETE) {
392
0
            equality_delete_files.emplace_back(desc);
393
0
        } else if (desc.content == DELETION_VECTOR) {
394
0
            deletion_vector_files.emplace_back(desc);
395
0
        }
396
0
    }
397
398
0
    if (!equality_delete_files.empty()) {
399
0
        RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
400
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
401
0
    }
402
403
0
    if (!deletion_vector_files.empty()) {
404
0
        if (deletion_vector_files.size() != 1) [[unlikely]] {
405
            /*
406
             * Deletion vectors are a binary representation of deletes for a single data file that is more efficient
407
             * at execution time than position delete files. Unlike equality or position delete files, there can be
408
             * at most one deletion vector for a given data file in a snapshot.
409
             */
410
0
            return Status::DataQualityError("This iceberg data file has multiple DVs.");
411
0
        }
412
0
        RETURN_IF_ERROR(
413
0
                read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));
414
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
415
0
    } else if (!position_delete_files.empty()) {
416
0
        RETURN_IF_ERROR(
417
0
                _position_delete_base(table_desc.original_file_path, position_delete_files));
418
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
419
0
    }
420
421
0
    COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
422
0
    return Status::OK();
423
0
}
424
425
template <typename BaseReader>
426
Status IcebergReaderMixin<BaseReader>::_equality_delete_base(
427
0
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
428
0
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
429
0
            partition_columns;
430
0
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
431
432
0
    for (const auto& delete_file : delete_files) {
433
0
        TFileRangeDesc delete_desc;
434
0
        delete_desc.__set_fs_name(this->get_scan_range().fs_name);
435
0
        delete_desc.path = delete_file.path;
436
0
        delete_desc.start_offset = 0;
437
0
        delete_desc.size = -1;
438
0
        delete_desc.file_size = -1;
439
440
0
        if (!delete_file.__isset.field_ids) [[unlikely]] {
441
0
            return Status::InternalError(
442
0
                    "missing delete field ids when reading equality delete file");
443
0
        }
444
0
        auto& read_column_field_ids = delete_file.field_ids;
445
0
        std::set<int> read_column_field_ids_set;
446
0
        for (const auto& field_id : read_column_field_ids) {
447
0
            read_column_field_ids_set.insert(field_id);
448
0
            _equality_delete_col_ids.insert(field_id);
449
0
        }
450
451
0
        std::unique_ptr<GenericReader> delete_reader = _create_equality_reader(delete_desc);
452
0
        RETURN_IF_ERROR(delete_reader->init_schema_reader());
453
454
0
        std::vector<std::string> equality_delete_col_names;
455
0
        std::vector<DataTypePtr> equality_delete_col_types;
456
457
        // Build delete col names/types/ids by matching field_ids from delete file schema.
458
        // Master iterates delete file's FieldDescriptor and uses field_id to match,
459
        // NOT idx-based pairing (get_parsed_schema order != field_ids order).
460
0
        std::vector<std::string> delete_col_names;
461
0
        std::vector<DataTypePtr> delete_col_types;
462
0
        std::vector<int> delete_col_ids;
463
0
        std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx;
464
465
0
        if (auto* parquet_reader = typeid_cast<ParquetReader*>(delete_reader.get())) {
466
0
            const FieldDescriptor* delete_field_desc = nullptr;
467
0
            RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&delete_field_desc));
468
0
            DCHECK(delete_field_desc != nullptr);
469
470
0
            for (const auto& delete_file_field : delete_field_desc->get_fields_schema()) {
471
0
                if (delete_file_field.field_id == -1) [[unlikely]] {
472
0
                    return Status::DataQualityError(
473
0
                            "missing field id when reading equality delete file");
474
0
                }
475
0
                if (!read_column_field_ids_set.contains(delete_file_field.field_id)) {
476
0
                    continue;
477
0
                }
478
0
                if (delete_file_field.children.size() > 0) [[unlikely]] {
479
0
                    return Status::InternalError(
480
0
                            "can not support read complex column in equality delete file");
481
0
                }
482
483
0
                delete_col_ids.emplace_back(delete_file_field.field_id);
484
0
                delete_col_names.emplace_back(delete_file_field.name);
485
0
                delete_col_types.emplace_back(make_nullable(delete_file_field.data_type));
486
487
0
                int field_id = delete_file_field.field_id;
488
0
                if (!_id_to_block_column_name.contains(field_id)) {
489
0
                    _id_to_block_column_name.emplace(field_id, delete_file_field.name);
490
0
                    _expand_col_names.emplace_back(delete_file_field.name);
491
0
                    _expand_columns.emplace_back(
492
0
                            make_nullable(delete_file_field.data_type)->create_column(),
493
0
                            make_nullable(delete_file_field.data_type), delete_file_field.name);
494
0
                }
495
0
            }
496
0
            for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
497
0
                delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
498
0
            }
499
            // Delete files have TFileRangeDesc.size=-1, which would cause
500
            // set_fill_columns to return EndOfFile("No row group to read")
501
            // when _filter_groups is true. Master passes filter_groups=false.
502
0
            ParquetInitContext eq_delete_ctx;
503
0
            eq_delete_ctx.filter_groups = false;
504
0
            eq_delete_ctx.column_names = delete_col_names;
505
0
            eq_delete_ctx.col_name_to_block_idx = &delete_col_name_to_block_idx;
506
0
            auto st2 = parquet_reader->init_reader(&eq_delete_ctx);
507
0
            if (!st2.ok()) {
508
0
                return st2;
509
0
            }
510
0
        } else if (auto* orc_reader = typeid_cast<OrcReader*>(delete_reader.get())) {
511
            // For ORC: use get_parsed_schema with field_ids from delete_file
512
            // ORC field_ids come from the Thrift descriptor, not from ORC metadata
513
0
            RETURN_IF_ERROR(delete_reader->get_parsed_schema(&equality_delete_col_names,
514
0
                                                             &equality_delete_col_types));
515
0
            for (uint32_t idx = 0; idx < equality_delete_col_names.size(); ++idx) {
516
0
                if (idx < read_column_field_ids.size()) {
517
0
                    int field_id = read_column_field_ids[idx];
518
0
                    if (!read_column_field_ids_set.contains(field_id)) continue;
519
0
                    delete_col_ids.emplace_back(field_id);
520
0
                    delete_col_names.emplace_back(equality_delete_col_names[idx]);
521
0
                    delete_col_types.emplace_back(make_nullable(equality_delete_col_types[idx]));
522
0
                    if (!_id_to_block_column_name.contains(field_id)) {
523
0
                        _id_to_block_column_name.emplace(field_id, equality_delete_col_names[idx]);
524
0
                        _expand_col_names.emplace_back(equality_delete_col_names[idx]);
525
0
                        _expand_columns.emplace_back(
526
0
                                make_nullable(equality_delete_col_types[idx])->create_column(),
527
0
                                make_nullable(equality_delete_col_types[idx]),
528
0
                                equality_delete_col_names[idx]);
529
0
                    }
530
0
                }
531
0
            }
532
0
            for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
533
0
                delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
534
0
            }
535
0
            OrcInitContext eq_delete_ctx;
536
0
            eq_delete_ctx.column_names = delete_col_names;
537
0
            eq_delete_ctx.col_name_to_block_idx = &delete_col_name_to_block_idx;
538
0
            RETURN_IF_ERROR(orc_reader->init_reader(&eq_delete_ctx));
539
0
        } else {
540
0
            return Status::InternalError("Unsupported format of delete file");
541
0
        }
542
543
0
        if (!_equality_delete_block_map.contains(delete_col_ids)) {
544
0
            _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size());
545
0
            Block block;
546
0
            _generate_equality_delete_block(&block, delete_col_names, delete_col_types);
547
0
            _equality_delete_blocks.emplace_back(block);
548
0
        }
549
0
        Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]];
550
551
0
        bool eof = false;
552
0
        while (!eof) {
553
0
            Block tmp_block;
554
0
            _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types);
555
0
            size_t read_rows = 0;
556
0
            auto st = delete_reader->get_next_block(&tmp_block, &read_rows, &eof);
557
0
            if (!st.ok()) {
558
0
                return st;
559
0
            }
560
0
            if (read_rows > 0) {
561
0
                MutableBlock mutable_block(&eq_file_block);
562
0
                RETURN_IF_ERROR(mutable_block.merge(tmp_block));
563
0
            }
564
0
        }
565
0
    }
566
567
0
    for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) {
568
0
        auto& eq_file_block = _equality_delete_blocks[block_idx];
569
0
        auto equality_delete_impl =
570
0
                EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids);
571
0
        RETURN_IF_ERROR(equality_delete_impl->init(this->get_profile()));
572
0
        _equality_delete_impls.emplace_back(std::move(equality_delete_impl));
573
0
    }
574
0
    return Status::OK();
575
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_equality_delete_baseERKSt6vectorINS_22TIcebergDeleteFileDescESaIS4_EE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_equality_delete_baseERKSt6vectorINS_22TIcebergDeleteFileDescESaIS4_EE
576
577
template <typename BaseReader>
578
void IcebergReaderMixin<BaseReader>::_generate_equality_delete_block(
579
        Block* block, const std::vector<std::string>& equality_delete_col_names,
580
0
        const std::vector<DataTypePtr>& equality_delete_col_types) {
581
0
    for (int i = 0; i < equality_delete_col_names.size(); ++i) {
582
0
        DataTypePtr data_type = make_nullable(equality_delete_col_types[i]);
583
0
        MutableColumnPtr data_column = data_type->create_column();
584
0
        block->insert(ColumnWithTypeAndName(std::move(data_column), data_type,
585
0
                                            equality_delete_col_names[i]));
586
0
    }
587
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE31_generate_equality_delete_blockEPNS_5BlockERKSt6vectorINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaISB_EERKS5_ISt10shared_ptrIKNS_9IDataTypeEESaISJ_EE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE31_generate_equality_delete_blockEPNS_5BlockERKSt6vectorINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaISB_EERKS5_ISt10shared_ptrIKNS_9IDataTypeEESaISJ_EE
588
589
template <typename BaseReader>
590
2
Status IcebergReaderMixin<BaseReader>::_expand_block_if_need(Block* block) {
591
2
    std::set<std::string> names;
592
2
    auto block_names = block->get_names();
593
2
    names.insert(block_names.begin(), block_names.end());
594
2
    for (auto& col : _expand_columns) {
595
0
        col.column->assume_mutable()->clear();
596
0
        if (names.contains(col.name)) {
597
0
            return Status::InternalError("Wrong expand column '{}'", col.name);
598
0
        }
599
0
        names.insert(col.name);
600
0
        (*this->col_name_to_block_idx_ref())[col.name] = static_cast<uint32_t>(block->columns());
601
0
        block->insert(col);
602
0
    }
603
2
    return Status::OK();
604
2
}
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_expand_block_if_needEPNS_5BlockE
Line
Count
Source
590
1
Status IcebergReaderMixin<BaseReader>::_expand_block_if_need(Block* block) {
591
1
    std::set<std::string> names;
592
1
    auto block_names = block->get_names();
593
1
    names.insert(block_names.begin(), block_names.end());
594
1
    for (auto& col : _expand_columns) {
595
0
        col.column->assume_mutable()->clear();
596
0
        if (names.contains(col.name)) {
597
0
            return Status::InternalError("Wrong expand column '{}'", col.name);
598
0
        }
599
0
        names.insert(col.name);
600
0
        (*this->col_name_to_block_idx_ref())[col.name] = static_cast<uint32_t>(block->columns());
601
0
        block->insert(col);
602
0
    }
603
1
    return Status::OK();
604
1
}
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_expand_block_if_needEPNS_5BlockE
Line
Count
Source
590
1
Status IcebergReaderMixin<BaseReader>::_expand_block_if_need(Block* block) {
591
1
    std::set<std::string> names;
592
1
    auto block_names = block->get_names();
593
1
    names.insert(block_names.begin(), block_names.end());
594
1
    for (auto& col : _expand_columns) {
595
0
        col.column->assume_mutable()->clear();
596
0
        if (names.contains(col.name)) {
597
0
            return Status::InternalError("Wrong expand column '{}'", col.name);
598
0
        }
599
0
        names.insert(col.name);
600
0
        (*this->col_name_to_block_idx_ref())[col.name] = static_cast<uint32_t>(block->columns());
601
0
        block->insert(col);
602
0
    }
603
1
    return Status::OK();
604
1
}
605
606
template <typename BaseReader>
607
2
Status IcebergReaderMixin<BaseReader>::_shrink_block_if_need(Block* block) {
608
2
    std::set<size_t> positions_to_erase;
609
2
    for (const std::string& expand_col : _expand_col_names) {
610
0
        if (!this->col_name_to_block_idx_ref()->contains(expand_col)) {
611
0
            return Status::InternalError("Wrong erase column '{}', block: {}", expand_col,
612
0
                                         block->dump_names());
613
0
        }
614
0
        positions_to_erase.emplace((*this->col_name_to_block_idx_ref())[expand_col]);
615
0
    }
616
2
    block->erase(positions_to_erase);
617
2
    for (const std::string& expand_col : _expand_col_names) {
618
0
        this->col_name_to_block_idx_ref()->erase(expand_col);
619
0
    }
620
2
    return Status::OK();
621
2
}
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_shrink_block_if_needEPNS_5BlockE
Line
Count
Source
607
1
Status IcebergReaderMixin<BaseReader>::_shrink_block_if_need(Block* block) {
608
1
    std::set<size_t> positions_to_erase;
609
1
    for (const std::string& expand_col : _expand_col_names) {
610
0
        if (!this->col_name_to_block_idx_ref()->contains(expand_col)) {
611
0
            return Status::InternalError("Wrong erase column '{}', block: {}", expand_col,
612
0
                                         block->dump_names());
613
0
        }
614
0
        positions_to_erase.emplace((*this->col_name_to_block_idx_ref())[expand_col]);
615
0
    }
616
1
    block->erase(positions_to_erase);
617
1
    for (const std::string& expand_col : _expand_col_names) {
618
0
        this->col_name_to_block_idx_ref()->erase(expand_col);
619
0
    }
620
1
    return Status::OK();
621
1
}
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_shrink_block_if_needEPNS_5BlockE
Line
Count
Source
607
1
Status IcebergReaderMixin<BaseReader>::_shrink_block_if_need(Block* block) {
608
1
    std::set<size_t> positions_to_erase;
609
1
    for (const std::string& expand_col : _expand_col_names) {
610
0
        if (!this->col_name_to_block_idx_ref()->contains(expand_col)) {
611
0
            return Status::InternalError("Wrong erase column '{}', block: {}", expand_col,
612
0
                                         block->dump_names());
613
0
        }
614
0
        positions_to_erase.emplace((*this->col_name_to_block_idx_ref())[expand_col]);
615
0
    }
616
1
    block->erase(positions_to_erase);
617
1
    for (const std::string& expand_col : _expand_col_names) {
618
0
        this->col_name_to_block_idx_ref()->erase(expand_col);
619
0
    }
620
1
    return Status::OK();
621
1
}
622
623
template <typename BaseReader>
624
Status IcebergReaderMixin<BaseReader>::_position_delete_base(
625
0
        const std::string data_file_path, const std::vector<TIcebergDeleteFileDesc>& delete_files) {
626
0
    std::vector<DeleteRows*> delete_rows_array;
627
0
    int64_t num_delete_rows = 0;
628
0
    for (const auto& delete_file : delete_files) {
629
0
        SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
630
0
        Status create_status = Status::OK();
631
0
        auto* delete_file_cache = _kv_cache->template get<DeleteFile>(
632
0
                _delet_file_cache_key(delete_file.path), [&]() -> DeleteFile* {
633
0
                    auto* position_delete = new DeleteFile;
634
0
                    TFileRangeDesc delete_file_range;
635
0
                    delete_file_range.__set_fs_name(this->get_scan_range().fs_name);
636
0
                    delete_file_range.path = delete_file.path;
637
0
                    delete_file_range.start_offset = 0;
638
0
                    delete_file_range.size = -1;
639
0
                    delete_file_range.file_size = -1;
640
0
                    create_status = _read_position_delete_file(&delete_file_range, position_delete);
641
0
                    if (!create_status) {
642
0
                        return nullptr;
643
0
                    }
644
0
                    return position_delete;
645
0
                });
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE_clB5cxx11Ev
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE_clB5cxx11Ev
646
0
        if (create_status.is<ErrorCode::END_OF_FILE>()) {
647
0
            continue;
648
0
        } else if (!create_status.ok()) {
649
0
            return create_status;
650
0
        }
651
652
0
        DeleteFile& delete_file_map = *((DeleteFile*)delete_file_cache);
653
0
        auto get_value = [&](const auto& v) {
654
0
            DeleteRows* row_ids = v.second.get();
655
0
            if (!row_ids->empty()) {
656
0
                delete_rows_array.emplace_back(row_ids);
657
0
                num_delete_rows += row_ids->size();
658
0
            }
659
0
        };
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlRKT_E_clISt4pairIKS8_St10unique_ptrIS9_IlSaIlEESt14default_deleteISO_EEEEEDaSH_
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlRKT_E_clISt4pairIKS8_St10unique_ptrIS9_IlSaIlEESt14default_deleteISO_EEEEEDaSH_
660
0
        delete_file_map.if_contains(data_file_path, get_value);
661
0
    }
662
0
    if (num_delete_rows > 0) {
663
0
        SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
664
0
        _iceberg_delete_rows =
665
0
                _kv_cache->template get<DeleteRows>(data_file_path, [&]() -> DeleteRows* {
666
0
                    auto* data_file_position_delete = new DeleteRows;
667
0
                    _sort_delete_rows(delete_rows_array, num_delete_rows,
668
0
                                      *data_file_position_delete);
669
0
                    return data_file_position_delete;
670
0
                });
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE0_clEv
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE0_clEv
671
0
        set_delete_rows();
672
0
        COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
673
0
    }
674
0
    return Status::OK();
675
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EE
676
677
template <typename BaseReader>
678
typename IcebergReaderMixin<BaseReader>::PositionDeleteRange
679
0
IcebergReaderMixin<BaseReader>::_get_range(const ColumnDictI32& file_path_column) {
680
0
    PositionDeleteRange range;
681
0
    size_t read_rows = file_path_column.get_data().size();
682
0
    const int* code_path = file_path_column.get_data().data();
683
0
    const int* code_path_start = code_path;
684
0
    const int* code_path_end = code_path + read_rows;
685
0
    while (code_path < code_path_end) {
686
0
        int code = code_path[0];
687
0
        const int* code_end = std::upper_bound(code_path, code_path_end, code);
688
0
        range.data_file_path.emplace_back(file_path_column.get_value(code).to_string());
689
0
        range.range.emplace_back(code_path - code_path_start, code_end - code_path_start);
690
0
        code_path = code_end;
691
0
    }
692
0
    return range;
693
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE10_get_rangeERKNS_13ColumnDictI32E
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE10_get_rangeERKNS_13ColumnDictI32E
694
695
template <typename BaseReader>
696
typename IcebergReaderMixin<BaseReader>::PositionDeleteRange
697
0
IcebergReaderMixin<BaseReader>::_get_range(const ColumnString& file_path_column) {
698
0
    PositionDeleteRange range;
699
0
    size_t read_rows = file_path_column.size();
700
0
    size_t index = 0;
701
0
    while (index < read_rows) {
702
0
        StringRef data_path = file_path_column.get_data_at(index);
703
0
        size_t left = index - 1;
704
0
        size_t right = read_rows;
705
0
        while (left + 1 != right) {
706
0
            size_t mid = left + (right - left) / 2;
707
0
            if (file_path_column.get_data_at(mid) > data_path) {
708
0
                right = mid;
709
0
            } else {
710
0
                left = mid;
711
0
            }
712
0
        }
713
0
        range.data_file_path.emplace_back(data_path.to_string());
714
0
        range.range.emplace_back(index, left + 1);
715
0
        index = left + 1;
716
0
    }
717
0
    return range;
718
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE10_get_rangeERKNS_9ColumnStrIjEE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE10_get_rangeERKNS_9ColumnStrIjEE
719
720
template <typename BaseReader>
721
void IcebergReaderMixin<BaseReader>::_sort_delete_rows(
722
        const std::vector<std::vector<int64_t>*>& delete_rows_array, int64_t num_delete_rows,
723
0
        std::vector<int64_t>& result) {
724
0
    if (delete_rows_array.empty()) {
725
0
        return;
726
0
    }
727
0
    if (delete_rows_array.size() == 1) {
728
0
        result.resize(num_delete_rows);
729
0
        memcpy(result.data(), delete_rows_array.front()->data(), sizeof(int64_t) * num_delete_rows);
730
0
        return;
731
0
    }
732
0
    if (delete_rows_array.size() == 2) {
733
0
        result.resize(num_delete_rows);
734
0
        std::merge(delete_rows_array.front()->begin(), delete_rows_array.front()->end(),
735
0
                   delete_rows_array.back()->begin(), delete_rows_array.back()->end(),
736
0
                   result.begin());
737
0
        return;
738
0
    }
739
740
0
    using vec_pair = std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::iterator>;
741
0
    result.resize(num_delete_rows);
742
0
    auto row_id_iter = result.begin();
743
0
    auto iter_end = result.end();
744
0
    std::vector<vec_pair> rows_array;
745
0
    for (auto* rows : delete_rows_array) {
746
0
        if (!rows->empty()) {
747
0
            rows_array.emplace_back(rows->begin(), rows->end());
748
0
        }
749
0
    }
750
0
    size_t array_size = rows_array.size();
751
0
    while (row_id_iter != iter_end) {
752
0
        int64_t min_index = 0;
753
0
        int64_t min = *rows_array[0].first;
754
0
        for (size_t i = 0; i < array_size; ++i) {
755
0
            if (*rows_array[i].first < min) {
756
0
                min_index = i;
757
0
                min = *rows_array[i].first;
758
0
            }
759
0
        }
760
0
        *row_id_iter++ = min;
761
0
        rows_array[min_index].first++;
762
0
        if (UNLIKELY(rows_array[min_index].first == rows_array[min_index].second)) {
763
0
            rows_array.erase(rows_array.begin() + min_index);
764
0
            array_size--;
765
0
        }
766
0
    }
767
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE17_sort_delete_rowsERKSt6vectorIPS3_IlSaIlEESaIS6_EElRS5_
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE17_sort_delete_rowsERKSt6vectorIPS3_IlSaIlEESaIS6_EElRS5_
768
769
template <typename BaseReader>
770
void IcebergReaderMixin<BaseReader>::_gen_position_delete_file_range(
771
        Block& block, DeleteFile* position_delete, size_t read_rows,
772
0
        bool file_path_column_dictionary_coded) {
773
0
    SCOPED_TIMER(_iceberg_profile.parse_delete_file_time);
774
0
    auto name_to_pos_map = block.get_name_to_pos_map();
775
0
    ColumnPtr path_column = block.get_by_position(name_to_pos_map[ICEBERG_FILE_PATH]).column;
776
0
    DCHECK_EQ(path_column->size(), read_rows);
777
0
    ColumnPtr pos_column = block.get_by_position(name_to_pos_map[ICEBERG_ROW_POS]).column;
778
0
    using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
779
0
    const int64_t* src_data = assert_cast<const ColumnType&>(*pos_column).get_data().data();
780
0
    PositionDeleteRange range;
781
0
    if (file_path_column_dictionary_coded) {
782
0
        range = _get_range(assert_cast<const ColumnDictI32&>(*path_column));
783
0
    } else {
784
0
        range = _get_range(assert_cast<const ColumnString&>(*path_column));
785
0
    }
786
0
    for (int i = 0; i < range.range.size(); ++i) {
787
0
        std::string key = range.data_file_path[i];
788
0
        auto iter = position_delete->find(key);
789
0
        DeleteRows* delete_rows;
790
0
        if (iter == position_delete->end()) {
791
0
            delete_rows = new DeleteRows;
792
0
            std::unique_ptr<DeleteRows> delete_rows_ptr(delete_rows);
793
0
            (*position_delete)[key] = std::move(delete_rows_ptr);
794
0
        } else {
795
0
            delete_rows = iter->second.get();
796
0
        }
797
0
        const int64_t* cpy_start = src_data + range.range[i].first;
798
0
        const int64_t cpy_count = range.range[i].second - range.range[i].first;
799
0
        int64_t origin_size = delete_rows->size();
800
0
        delete_rows->resize(origin_size + cpy_count);
801
0
        int64_t* dest_position = &(*delete_rows)[origin_size];
802
0
        memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
803
0
    }
804
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE31_gen_position_delete_file_rangeERNS_5BlockEPN5phmap22parallel_flat_hash_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESt10unique_ptrISt6vectorIlSaIlEESt14default_deleteISG_EESt4hashISC_ESt8equal_toIvESaISt4pairIKSC_SJ_EELm8ESt5mutexEEmb
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE31_gen_position_delete_file_rangeERNS_5BlockEPN5phmap22parallel_flat_hash_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESt10unique_ptrISt6vectorIlSaIlEESt14default_deleteISG_EESt4hashISC_ESt8equal_toIvESaISt4pairIKSC_SJ_EELm8ESt5mutexEEmb
805
806
template <typename BaseReader>
807
Status IcebergReaderMixin<BaseReader>::read_deletion_vector(
808
0
        const std::string& data_file_path, const TIcebergDeleteFileDesc& delete_file_desc) {
809
0
    Status create_status = Status::OK();
810
0
    SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
811
0
    _iceberg_delete_rows = _kv_cache->template get<
812
0
            DeleteRows>(data_file_path, [&]() -> DeleteRows* {
813
0
        auto* delete_rows = new DeleteRows;
814
815
0
        TFileRangeDesc delete_range;
816
0
        delete_range.__set_fs_name(this->get_scan_range().fs_name);
817
0
        delete_range.path = delete_file_desc.path;
818
0
        delete_range.start_offset = delete_file_desc.content_offset;
819
0
        delete_range.size = delete_file_desc.content_size_in_bytes;
820
0
        delete_range.file_size = -1;
821
822
0
        DeletionVectorReader dv_reader(this->get_state(), this->get_profile(),
823
0
                                       this->get_scan_params(), delete_range, this->get_io_ctx());
824
0
        create_status = dv_reader.open();
825
0
        if (!create_status.ok()) [[unlikely]] {
826
0
            return nullptr;
827
0
        }
828
829
0
        size_t buffer_size = delete_range.size;
830
0
        std::vector<char> buf(buffer_size);
831
0
        if (buffer_size < 12) [[unlikely]] {
832
0
            create_status = Status::DataQualityError("Deletion vector file size too small: {}",
833
0
                                                     buffer_size);
834
0
            return nullptr;
835
0
        }
836
837
0
        create_status = dv_reader.read_at(delete_range.start_offset, {buf.data(), buffer_size});
838
0
        if (!create_status) [[unlikely]] {
839
0
            return nullptr;
840
0
        }
841
842
0
        auto total_length = BigEndian::Load32(buf.data());
843
0
        if (total_length + 8 != buffer_size) [[unlikely]] {
844
0
            create_status = Status::DataQualityError(
845
0
                    "Deletion vector length mismatch, expected: {}, actual: {}", total_length + 8,
846
0
                    buffer_size);
847
0
            return nullptr;
848
0
        }
849
850
0
        constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'};
851
0
        if (memcmp(buf.data() + sizeof(total_length), MAGIC_NUMBER, 4)) [[unlikely]] {
852
0
            create_status = Status::DataQualityError("Deletion vector magic number mismatch");
853
0
            return nullptr;
854
0
        }
855
856
0
        roaring::Roaring64Map bitmap;
857
0
        SCOPED_TIMER(_iceberg_profile.parse_delete_file_time);
858
0
        try {
859
0
            bitmap = roaring::Roaring64Map::readSafe(buf.data() + 8, buffer_size - 12);
860
0
        } catch (const std::runtime_error& e) {
861
0
            create_status = Status::DataQualityError("Decode roaring bitmap failed, {}", e.what());
862
0
            return nullptr;
863
0
        }
864
865
0
        delete_rows->reserve(bitmap.cardinality());
866
0
        for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
867
0
            delete_rows->push_back(*it);
868
0
        }
869
0
        COUNTER_UPDATE(_iceberg_profile.num_delete_rows, delete_rows->size());
870
0
        return delete_rows;
871
0
    });
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescEENKUlvE_clEv
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescEENKUlvE_clEv
872
873
0
    RETURN_IF_ERROR(create_status);
874
0
    if (!_iceberg_delete_rows->empty()) [[likely]] {
875
0
        set_delete_rows();
876
0
    }
877
0
    return Status::OK();
878
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescE
879
880
} // namespace doris