Coverage Report

Created: 2026-04-22 18:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/iceberg_reader_mixin.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <cstddef>
21
#include <cstdint>
22
#include <string>
23
#include <unordered_map>
24
#include <vector>
25
26
#include "common/consts.h"
27
#include "common/status.h"
28
#include "core/block/block.h"
29
#include "core/column/column_dictionary.h"
30
#include "core/column/column_nullable.h"
31
#include "core/column/column_string.h"
32
#include "core/column/column_struct.h"
33
#include "core/data_type/data_type_number.h"
34
#include "core/data_type/data_type_string.h"
35
#include "format/generic_reader.h"
36
#include "format/table/deletion_vector_reader.h"
37
#include "format/table/equality_delete.h"
38
#include "format/table/table_schema_change_helper.h"
39
#include "runtime/runtime_profile.h"
40
#include "runtime/runtime_state.h"
41
#include "storage/olap_common.h"
42
43
namespace doris {
44
class TIcebergDeleteFileDesc;
45
} // namespace doris
46
47
namespace doris {
48
49
class ShardedKVCache;
50
51
// CRTP mixin for Iceberg reader functionality.
52
// BaseReader should be ParquetReader or OrcReader.
53
// Inherits BaseReader + TableSchemaChangeHelper, providing shared Iceberg logic
54
// (delete files, deletion vectors, equality delete, $row_id synthesis).
55
//
56
// Inheritance chain:
57
//   IcebergParquetReader -> IcebergReaderMixin<ParquetReader> -> ParquetReader -> GenericReader
58
//   IcebergOrcReader     -> IcebergReaderMixin<OrcReader>     -> OrcReader     -> GenericReader
59
template <typename BaseReader>
60
class IcebergReaderMixin : public BaseReader, public TableSchemaChangeHelper {
61
public:
62
    // Return type of the two _get_range() overloads declared further down in this class.
    // Holds two parallel vectors.
    // NOTE(review): presumably range[i] is the row span of the position-delete block that
    // belongs to data_file_path[i] — confirm against the _get_range() definition, which is
    // not part of this section of the header.
    struct PositionDeleteRange {
63
        std::vector<std::string> data_file_path;
64
        std::vector<std::pair<int, int>> range;
65
    };
66
67
    // Forward BaseReader constructor arguments + Iceberg-specific kv_cache
68
    /// Constructs the mixin.
    ///
    /// @param kv_cache stored as a raw pointer in _kv_cache; this class never deletes it
    ///                 (the destructor is defaulted).
    /// @param args     perfectly forwarded to the BaseReader constructor.
    ///
    /// After the base is constructed, registers a timer named "IcebergProfile" on
    /// this->get_profile() and then five counters/timers that pass that same name as their
    /// parent argument; the returned pointers are kept in _iceberg_profile.
    template <typename... Args>
69
    IcebergReaderMixin(ShardedKVCache* kv_cache, Args&&... args)
70
14
            : BaseReader(std::forward<Args>(args)...), _kv_cache(kv_cache) {
71
14
        // Name shared by the ADD_TIMER call and by every ADD_CHILD_* call below.
        static const char* iceberg_profile = "IcebergProfile";
72
14
        ADD_TIMER(this->get_profile(), iceberg_profile);
73
14
        _iceberg_profile.num_delete_files = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteFiles",
74
14
                                                              TUnit::UNIT, iceberg_profile);
75
14
        _iceberg_profile.num_delete_rows = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteRows",
76
14
                                                             TUnit::UNIT, iceberg_profile);
77
14
        _iceberg_profile.delete_files_read_time =
78
14
                ADD_CHILD_TIMER(this->get_profile(), "DeleteFileReadTime", iceberg_profile);
79
14
        _iceberg_profile.delete_rows_sort_time =
80
14
                ADD_CHILD_TIMER(this->get_profile(), "DeleteRowsSortTime", iceberg_profile);
81
14
        _iceberg_profile.parse_delete_file_time =
82
14
                ADD_CHILD_TIMER(this->get_profile(), "ParseDeleteFileTime", iceberg_profile);
83
14
    }
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEEC2IJRPNS_14RuntimeProfileERKNS_20TFileScanRangeParamsERKNS_14TFileRangeDescERmRPKN4cctz9time_zoneERPNS_2io9IOContextERPNS_12RuntimeStateERPNS_13FileMetaCacheEEEEPNS_14ShardedKVCacheEDpOT_
Line
Count
Source
70
7
            : BaseReader(std::forward<Args>(args)...), _kv_cache(kv_cache) {
71
7
        static const char* iceberg_profile = "IcebergProfile";
72
7
        ADD_TIMER(this->get_profile(), iceberg_profile);
73
7
        _iceberg_profile.num_delete_files = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteFiles",
74
7
                                                              TUnit::UNIT, iceberg_profile);
75
7
        _iceberg_profile.num_delete_rows = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteRows",
76
7
                                                             TUnit::UNIT, iceberg_profile);
77
7
        _iceberg_profile.delete_files_read_time =
78
7
                ADD_CHILD_TIMER(this->get_profile(), "DeleteFileReadTime", iceberg_profile);
79
7
        _iceberg_profile.delete_rows_sort_time =
80
7
                ADD_CHILD_TIMER(this->get_profile(), "DeleteRowsSortTime", iceberg_profile);
81
7
        _iceberg_profile.parse_delete_file_time =
82
7
                ADD_CHILD_TIMER(this->get_profile(), "ParseDeleteFileTime", iceberg_profile);
83
7
    }
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEEC2IJRPNS_14RuntimeProfileERPNS_12RuntimeStateERKNS_20TFileScanRangeParamsERKNS_14TFileRangeDescERmRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERPNS_2io9IOContextERPNS_13FileMetaCacheEEEEPNS_14ShardedKVCacheEDpOT_
Line
Count
Source
70
7
            : BaseReader(std::forward<Args>(args)...), _kv_cache(kv_cache) {
71
7
        static const char* iceberg_profile = "IcebergProfile";
72
7
        ADD_TIMER(this->get_profile(), iceberg_profile);
73
7
        _iceberg_profile.num_delete_files = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteFiles",
74
7
                                                              TUnit::UNIT, iceberg_profile);
75
7
        _iceberg_profile.num_delete_rows = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteRows",
76
7
                                                             TUnit::UNIT, iceberg_profile);
77
7
        _iceberg_profile.delete_files_read_time =
78
7
                ADD_CHILD_TIMER(this->get_profile(), "DeleteFileReadTime", iceberg_profile);
79
7
        _iceberg_profile.delete_rows_sort_time =
80
7
                ADD_CHILD_TIMER(this->get_profile(), "DeleteRowsSortTime", iceberg_profile);
81
7
        _iceberg_profile.parse_delete_file_time =
82
7
                ADD_CHILD_TIMER(this->get_profile(), "ParseDeleteFileTime", iceberg_profile);
83
7
    }
84
85
14
    // Defaulted destructor. `override` makes compilation fail if BaseReader's destructor is
    // not virtual.
    ~IcebergReaderMixin() override = default;
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEED2Ev
Line
Count
Source
85
7
    ~IcebergReaderMixin() override = default;
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEED2Ev
Line
Count
Source
85
7
    ~IcebergReaderMixin() override = default;
86
87
    /// Stores the data-file path, partition spec id and partition-data JSON in
    /// _current_file_path / _partition_spec_id / _partition_data_json.
    /// _fill_iceberg_row_id() calls this and then passes the stored values to
    /// _build_iceberg_rowid_column().
    void set_current_file_info(const std::string& file_path, int32_t partition_spec_id,
88
0
                               const std::string& partition_data_json) {
89
0
        _current_file_path = file_path;
90
0
        _partition_spec_id = partition_spec_id;
91
0
        _partition_data_json = partition_data_json;
92
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21set_current_file_infoERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSA_
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21set_current_file_infoERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSA_
93
94
    enum { DATA, POSITION_DELETE, EQUALITY_DELETE, DELETION_VECTOR };
95
    enum Fileformat { NONE, PARQUET, ORC, AVRO };
96
97
    virtual void set_delete_rows() = 0;
98
99
    // Table-level COUNT(*) is handled by CountReader (created by FileScanner after
100
    // init_reader). If _do_get_next_block is called, COUNT must have been resolved.
101
2
    /// Forwards to BaseReader::_do_get_next_block() with the same arguments and returns its
    /// Status unchanged.
    Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override {
102
2
        // Assertion only: a pushed-down COUNT is not expected to reach this reader.
        DCHECK(this->_push_down_agg_type != TPushAggOp::type::COUNT);
103
2
        return BaseReader::_do_get_next_block(block, read_rows, eof);
104
2
    }
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE18_do_get_next_blockEPNS_5BlockEPmPb
Line
Count
Source
101
1
    Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override {
102
        DCHECK(this->_push_down_agg_type != TPushAggOp::type::COUNT);
103
1
        return BaseReader::_do_get_next_block(block, read_rows, eof);
104
1
    }
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE18_do_get_next_blockEPNS_5BlockEPmPb
Line
Count
Source
101
1
    Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override {
102
        DCHECK(this->_push_down_agg_type != TPushAggOp::type::COUNT);
103
1
        return BaseReader::_do_get_next_block(block, read_rows, eof);
104
1
    }
105
106
    /// Installs the factory stored in _create_topn_row_id_column_iterator.
    ///
    /// @param create_func callable returning a std::shared_ptr<segment_v2::RowIdColumnIteratorV2>.
    ///                    It is taken by value, so it is moved into the member rather than
    ///                    copied a second time.
    void set_create_row_id_column_iterator_func(
107
0
            std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()> create_func) {
108
0
        _create_topn_row_id_column_iterator = std::move(create_func);
109
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE38set_create_row_id_column_iterator_funcESt8functionIFSt10shared_ptrINS_10segment_v221RowIdColumnIteratorV2EEvEE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE38set_create_row_id_column_iterator_funcESt8functionIFSt10shared_ptrINS_10segment_v221RowIdColumnIteratorV2EEvEE
110
111
protected:
112
    // ---- Hook implementations ----
113
114
    // Called before reading a block: expand block for equality delete columns + detect row_id
115
2
    /// Hook override: runs _expand_block_if_need(block); returns its error if it fails,
    /// otherwise Status::OK().
    Status on_before_read_block(Block* block) override {
116
2
        RETURN_IF_ERROR(_expand_block_if_need(block));
117
2
        return Status::OK();
118
2
    }
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20on_before_read_blockEPNS_5BlockE
Line
Count
Source
115
1
    Status on_before_read_block(Block* block) override {
116
1
        RETURN_IF_ERROR(_expand_block_if_need(block));
117
1
        return Status::OK();
118
1
    }
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE20on_before_read_blockEPNS_5BlockE
Line
Count
Source
115
1
    Status on_before_read_block(Block* block) override {
116
1
        RETURN_IF_ERROR(_expand_block_if_need(block));
117
1
        return Status::OK();
118
1
    }
119
120
    /// Fill Iceberg $row_id synthesized column. Registered as handler during init.
121
0
    /// Replaces the BeConsts::ICEBERG_ROWID_COL column of `block` with a freshly built one.
    ///
    /// Reads original_file_path, partition_spec_id (0 when unset) and partition_data_json
    /// (empty when unset) from the scan range's iceberg_params, records them through
    /// set_current_file_info(), then builds the column from current_batch_row_positions()
    /// with _build_iceberg_rowid_column().
    ///
    /// @param block must contain the row-id column; DORIS_CHECK enforces this.
    /// @param rows  not used in this function: the built column has one row per entry of
    ///              current_batch_row_positions().
    /// @return the error from _build_iceberg_rowid_column(), otherwise Status::OK().
    Status _fill_iceberg_row_id(Block* block, size_t rows) {
122
0
        int row_id_pos = block->get_position_by_name(BeConsts::ICEBERG_ROWID_COL);
123
0
        DORIS_CHECK(row_id_pos >= 0);
124
125
        // Lazy-init file info: only set when $row_id is actually needed.
126
0
        const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params;
127
0
        // Bound by reference: set_current_file_info() copies it into the member, so a local
        // std::string would only add a second copy on every call.
        const std::string& file_path = table_desc.original_file_path;
128
0
        int32_t partition_spec_id =
129
0
                table_desc.__isset.partition_spec_id ? table_desc.partition_spec_id : 0;
130
0
        std::string partition_data_json;
131
0
        if (table_desc.__isset.partition_data_json) {
132
0
            partition_data_json = table_desc.partition_data_json;
133
0
        }
134
0
        set_current_file_info(file_path, partition_spec_id, partition_data_json);
135
136
0
        const auto& row_ids = this->current_batch_row_positions();
137
0
        auto& col_with_type = block->get_by_position(static_cast<size_t>(row_id_pos));
138
0
        MutableColumnPtr row_id_column;
139
0
        RETURN_IF_ERROR(_build_iceberg_rowid_column(col_with_type.type, _current_file_path, row_ids,
140
0
                                                    _partition_spec_id, _partition_data_json,
141
0
                                                    &row_id_column));
142
0
        col_with_type.column = std::move(row_id_column);
143
0
        return Status::OK();
144
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20_fill_iceberg_row_idEPNS_5BlockEm
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE20_fill_iceberg_row_idEPNS_5BlockEm
145
146
0
    /// Copies first_row_id and last_updated_sequence_number from the scan range's
    /// iceberg_params into _row_lineage_columns. A field is copied only when its thrift
    /// __isset flag is set; otherwise the member keeps its current value (-1 by default).
    void _init_row_lineage_columns() {
147
0
        const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params;
148
0
        if (table_desc.__isset.first_row_id) {
149
0
            _row_lineage_columns.first_row_id = table_desc.first_row_id;
150
0
        }
151
0
        if (table_desc.__isset.last_updated_sequence_number) {
152
0
            _row_lineage_columns.last_updated_sequence_number =
153
0
                    table_desc.last_updated_sequence_number;
154
0
        }
155
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE25_init_row_lineage_columnsEv
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE25_init_row_lineage_columnsEv
156
157
0
    /// Fills the null entries of the ROW_LINEAGE_ROW_ID ("_row_id") column of `block`.
    ///
    /// For each of the first `rows` entries that is currently null, clears the null flag and
    /// writes first_row_id + the row's position from current_batch_row_positions().
    /// Entries that are already non-null are left unchanged. Does nothing when
    /// _row_lineage_columns.first_row_id is negative.
    ///
    /// @param block must contain the column (DORIS_CHECK); the column is cast with
    ///              assert_cast to ColumnNullable wrapping ColumnInt64.
    /// @param rows  number of leading rows to visit.
    ///              NOTE(review): assumes the null map, the data and the row-position vector
    ///              all hold at least `rows` entries — no bounds check is made here.
    /// @return always Status::OK().
    Status _fill_row_lineage_row_id(Block* block, size_t rows) {
158
0
        int col_pos = block->get_position_by_name(ROW_LINEAGE_ROW_ID);
159
0
        DORIS_CHECK(col_pos >= 0);
160
161
0
        if (_row_lineage_columns.first_row_id >= 0) {
162
0
            auto col = block->get_by_position(col_pos).column->assume_mutable();
163
0
            auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
164
0
            auto& null_map = nullable_column->get_null_map_data();
165
0
            auto& data =
166
0
                    assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
167
0
            const auto& row_ids = this->current_batch_row_positions();
168
0
            for (size_t i = 0; i < rows; ++i) {
169
0
                // Only rows flagged null are synthesized.
                if (null_map[i] != 0) {
170
0
                    null_map[i] = 0;
171
0
                    data[i] = _row_lineage_columns.first_row_id + static_cast<int64_t>(row_ids[i]);
172
0
                }
173
0
            }
174
0
        }
175
0
        return Status::OK();
176
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE24_fill_row_lineage_row_idEPNS_5BlockEm
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE24_fill_row_lineage_row_idEPNS_5BlockEm
177
178
0
    /// Fills the null entries of the ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER
    /// ("_last_updated_sequence_number") column of `block`.
    ///
    /// For each of the first `rows` entries that is currently null, clears the null flag and
    /// writes _row_lineage_columns.last_updated_sequence_number. Non-null entries are left
    /// unchanged. Does nothing when that value is negative.
    ///
    /// @param block must contain the column (DORIS_CHECK); the column is cast with
    ///              assert_cast to ColumnNullable wrapping ColumnInt64.
    /// @param rows  number of leading rows to visit.
    ///              NOTE(review): assumes the null map and data hold at least `rows` entries.
    /// @return always Status::OK().
    Status _fill_row_lineage_last_updated_sequence_number(Block* block, size_t rows) {
179
0
        int col_pos = block->get_position_by_name(ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER);
180
0
        DORIS_CHECK(col_pos >= 0);
181
182
0
        if (_row_lineage_columns.last_updated_sequence_number >= 0) {
183
0
            auto col = block->get_by_position(col_pos).column->assume_mutable();
184
0
            auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
185
0
            auto& null_map = nullable_column->get_null_map_data();
186
0
            auto& data =
187
0
                    assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
188
0
            for (size_t i = 0; i < rows; ++i) {
189
0
                // Only rows flagged null are synthesized; the same value is used for all of them.
                if (null_map[i] != 0) {
190
0
                    null_map[i] = 0;
191
0
                    data[i] = _row_lineage_columns.last_updated_sequence_number;
192
0
                }
193
0
            }
194
0
        }
195
0
        return Status::OK();
196
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE46_fill_row_lineage_last_updated_sequence_numberEPNS_5BlockEm
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE46_fill_row_lineage_last_updated_sequence_numberEPNS_5BlockEm
197
198
    // Called after reading a block: apply equality delete filter + shrink block
199
2
    /// Hook override run after a block has been read.
    ///
    /// When _equality_delete_impls is non-empty: builds one filter for the block, passes it
    /// to every impl's filter_data_block(), filters all columns of the block with it and
    /// writes the resulting row count to *read_rows. In every case the function finishes by
    /// returning _shrink_block_if_need(block).
    ///
    /// @param block     block to filter in place.
    /// @param read_rows only written when equality deletes are applied.
    /// @return the first error from filter_data_block(), otherwise the Status of
    ///         _shrink_block_if_need().
    Status on_after_read_block(Block* block, size_t* read_rows) override {
200
2
        if (!_equality_delete_impls.empty()) {
201
0
            // Constructed with (block->rows(), 1): presumably one entry per row, initialised
            // to 1 — confirm against IColumn::Filter's constructor.
            std::unique_ptr<IColumn::Filter> filter =
202
0
                    std::make_unique<IColumn::Filter>(block->rows(), 1);
203
0
            // Every impl receives the same filter object, so their results accumulate in it.
            for (auto& equality_delete_impl : _equality_delete_impls) {
204
0
                RETURN_IF_ERROR(equality_delete_impl->filter_data_block(
205
0
                        block, this->col_name_to_block_idx_ref(), _id_to_block_column_name,
206
0
                        *filter));
207
0
            }
208
0
            Block::filter_block_internal(block, *filter, block->columns());
209
0
            *read_rows = block->rows();
210
0
        }
211
2
        return _shrink_block_if_need(block);
212
2
    }
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE19on_after_read_blockEPNS_5BlockEPm
Line
Count
Source
199
1
    Status on_after_read_block(Block* block, size_t* read_rows) override {
200
1
        if (!_equality_delete_impls.empty()) {
201
0
            std::unique_ptr<IColumn::Filter> filter =
202
0
                    std::make_unique<IColumn::Filter>(block->rows(), 1);
203
0
            for (auto& equality_delete_impl : _equality_delete_impls) {
204
0
                RETURN_IF_ERROR(equality_delete_impl->filter_data_block(
205
0
                        block, this->col_name_to_block_idx_ref(), _id_to_block_column_name,
206
0
                        *filter));
207
0
            }
208
0
            Block::filter_block_internal(block, *filter, block->columns());
209
0
            *read_rows = block->rows();
210
0
        }
211
1
        return _shrink_block_if_need(block);
212
1
    }
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE19on_after_read_blockEPNS_5BlockEPm
Line
Count
Source
199
1
    Status on_after_read_block(Block* block, size_t* read_rows) override {
200
1
        if (!_equality_delete_impls.empty()) {
201
0
            std::unique_ptr<IColumn::Filter> filter =
202
0
                    std::make_unique<IColumn::Filter>(block->rows(), 1);
203
0
            for (auto& equality_delete_impl : _equality_delete_impls) {
204
0
                RETURN_IF_ERROR(equality_delete_impl->filter_data_block(
205
0
                        block, this->col_name_to_block_idx_ref(), _id_to_block_column_name,
206
0
                        *filter));
207
0
            }
208
0
            Block::filter_block_internal(block, *filter, block->columns());
209
0
            *read_rows = block->rows();
210
0
        }
211
1
        return _shrink_block_if_need(block);
212
1
    }
213
214
    // ---- Shared Iceberg methods ----
215
216
    Status _init_row_filters();
217
    Status _position_delete_base(const std::string data_file_path,
218
                                 const std::vector<TIcebergDeleteFileDesc>& delete_files);
219
    Status _equality_delete_base(const std::vector<TIcebergDeleteFileDesc>& delete_files);
220
    Status read_deletion_vector(const std::string& data_file_path,
221
                                const TIcebergDeleteFileDesc& delete_file_desc);
222
223
    Status _expand_block_if_need(Block* block);
224
    Status _shrink_block_if_need(Block* block);
225
226
    // Type aliases — must be defined before member function declarations that use them.
227
    using DeleteRows = std::vector<int64_t>;
228
    using DeleteFile = phmap::parallel_flat_hash_map<
229
            std::string, std::unique_ptr<DeleteRows>, std::hash<std::string>, std::equal_to<>,
230
            std::allocator<std::pair<const std::string, std::unique_ptr<DeleteRows>>>, 8,
231
            std::mutex>;
232
233
    PositionDeleteRange _get_range(const ColumnDictI32& file_path_column);
234
    PositionDeleteRange _get_range(const ColumnString& file_path_column);
235
    static void _sort_delete_rows(const std::vector<std::vector<int64_t>*>& delete_rows_array,
236
                                  int64_t num_delete_rows, std::vector<int64_t>& result);
237
    void _gen_position_delete_file_range(Block& block, DeleteFile* position_delete,
238
                                         size_t read_rows, bool file_path_column_dictionary_coded);
239
    void _generate_equality_delete_block(Block* block,
240
                                         const std::vector<std::string>& equality_delete_col_names,
241
                                         const std::vector<DataTypePtr>& equality_delete_col_types);
242
243
    // Pure virtual: format-specific delete file reading
244
    virtual Status _read_position_delete_file(const TFileRangeDesc*, DeleteFile*) = 0;
245
    virtual std::unique_ptr<GenericReader> _create_equality_reader(
246
            const TFileRangeDesc& delete_desc) = 0;
247
248
0
    // Returns `path` prefixed with "delete_".
    // NOTE(review): presumably the key under which delete-file data is stored in _kv_cache;
    // the callers are not in this section of the header.
    static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_delet_file_cache_keyERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_delet_file_cache_keyERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
249
250
    /// Build the Iceberg V2 row-id struct column.
251
    /// Creates a new column of `type` holding one row per entry of `row_ids`.
    ///
    /// The created column must be a ColumnStruct, or a ColumnNullable whose nested column is
    /// a ColumnStruct, with at least four sub-columns. They are filled as:
    ///   0: `file_path` (same value on every row)
    ///   1: the row position, widened to int64_t
    ///   2: `partition_spec_id` as int32_t (same value on every row)
    ///   3: `partition_data_json` (same value on every row)
    /// For the nullable case the null map is resized to the row count and filled with 0.
    ///
    /// @param type        data type used to create the column; must not be null.
    /// @param row_ids     row positions; its size determines the number of rows produced.
    /// @param column_out  receives the built column; must not be null.
    /// @return InvalidArgument when `type` or `column_out` is null; InternalError when the
    ///         created column is not a struct or has fewer than four sub-columns; else OK.
    static Status _build_iceberg_rowid_column(const DataTypePtr& type, const std::string& file_path,
252
                                              const std::vector<rowid_t>& row_ids,
253
                                              int32_t partition_spec_id,
254
                                              const std::string& partition_data_json,
255
0
                                              MutableColumnPtr* column_out) {
256
0
        if (type == nullptr || column_out == nullptr) {
257
0
            return Status::InvalidArgument("Invalid iceberg rowid column type or output column");
258
0
        }
259
0
        MutableColumnPtr column = type->create_column();
260
0
        ColumnNullable* nullable_col = check_and_get_column<ColumnNullable>(column.get());
261
0
        ColumnStruct* struct_col = nullptr;
262
0
        // Unwrap one Nullable layer if present; otherwise the column itself must be the struct.
        if (nullable_col != nullptr) {
263
0
            struct_col =
264
0
                    check_and_get_column<ColumnStruct>(nullable_col->get_nested_column_ptr().get());
265
0
        } else {
266
0
            struct_col = check_and_get_column<ColumnStruct>(column.get());
267
0
        }
268
0
        if (struct_col == nullptr || struct_col->tuple_size() < 4) {
269
0
            return Status::InternalError("Invalid iceberg rowid column structure");
270
0
        }
271
0
        size_t num_rows = row_ids.size();
272
0
        auto& file_path_col = struct_col->get_column(0);
273
0
        auto& row_pos_col = struct_col->get_column(1);
274
0
        auto& spec_id_col = struct_col->get_column(2);
275
0
        auto& partition_data_col = struct_col->get_column(3);
276
0
        file_path_col.reserve(num_rows);
277
0
        row_pos_col.reserve(num_rows);
278
0
        spec_id_col.reserve(num_rows);
279
0
        partition_data_col.reserve(num_rows);
280
0
        for (size_t i = 0; i < num_rows; ++i) {
281
0
            file_path_col.insert_data(file_path.data(), file_path.size());
282
0
        }
283
0
        for (size_t i = 0; i < num_rows; ++i) {
284
0
            // Widened to int64_t so that exactly sizeof(int64_t) bytes are handed to insert_data.
            int64_t row_pos = static_cast<int64_t>(row_ids[i]);
285
0
            row_pos_col.insert_data(reinterpret_cast<const char*>(&row_pos), sizeof(row_pos));
286
0
        }
287
0
        for (size_t i = 0; i < num_rows; ++i) {
288
0
            int32_t spec_id = partition_spec_id;
289
0
            spec_id_col.insert_data(reinterpret_cast<const char*>(&spec_id), sizeof(spec_id));
290
0
        }
291
0
        for (size_t i = 0; i < num_rows; ++i) {
292
0
            partition_data_col.insert_data(partition_data_json.data(), partition_data_json.size());
293
0
        }
294
0
        // The nested struct was filled directly, so the null map has to be sized separately.
        if (nullable_col != nullptr) {
295
0
            nullable_col->get_null_map_data().resize_fill(num_rows, 0);
296
0
        }
297
0
        *column_out = std::move(column);
298
0
        return Status::OK();
299
0
    }
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE27_build_iceberg_rowid_columnERKSt10shared_ptrIKNS_9IDataTypeEERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIjSaIjEEiSG_PNS_3COWINS_7IColumnEE11mutable_ptrISN_EE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE27_build_iceberg_rowid_columnERKSt10shared_ptrIKNS_9IDataTypeEERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIjSaIjEEiSG_PNS_3COWINS_7IColumnEE11mutable_ptrISN_EE
300
301
    // Pointers to the counters and timers that the IcebergReaderMixin constructor registers
    // on the reader's runtime profile ("NumDeleteFiles", "NumDeleteRows",
    // "DeleteFileReadTime", "DeleteRowsSortTime", "ParseDeleteFileTime").
    // The members have no default initializer; the constructor assigns all five.
    struct IcebergProfile {
302
        RuntimeProfile::Counter* num_delete_files;
303
        RuntimeProfile::Counter* num_delete_rows;
304
        RuntimeProfile::Counter* delete_files_read_time;
305
        RuntimeProfile::Counter* delete_rows_sort_time;
306
        RuntimeProfile::Counter* parse_delete_file_time;
307
    };
308
309
    bool _need_row_id_column = false;
310
    std::string _current_file_path;
311
    int32_t _partition_spec_id = 0;
312
    std::string _partition_data_json;
313
314
    ShardedKVCache* _kv_cache;
315
    IcebergProfile _iceberg_profile;
316
    const std::vector<int64_t>* _iceberg_delete_rows = nullptr;
317
    std::vector<std::string> _expand_col_names;
318
    std::vector<ColumnWithTypeAndName> _expand_columns;
319
    std::vector<std::string> _all_required_col_names;
320
    Fileformat _file_format = Fileformat::NONE;
321
322
    const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
323
    const std::string ICEBERG_FILE_PATH = "file_path";
324
    const std::string ICEBERG_ROW_POS = "pos";
325
    const std::vector<std::string> delete_file_col_names {ICEBERG_FILE_PATH, ICEBERG_ROW_POS};
326
    const std::unordered_map<std::string, uint32_t> DELETE_COL_NAME_TO_BLOCK_IDX = {
327
            {ICEBERG_FILE_PATH, 0}, {ICEBERG_ROW_POS, 1}};
328
    const int ICEBERG_FILE_PATH_INDEX = 0;
329
    const int ICEBERG_FILE_POS_INDEX = 1;
330
    const int READ_DELETE_FILE_BATCH_SIZE = 102400;
331
332
    // all ids that need read for eq delete (from all eq delete files)
333
    std::set<int> _equality_delete_col_ids;
334
    // eq delete column ids -> location of _equality_delete_blocks / _equality_delete_impls
335
    std::map<std::vector<int>, int> _equality_delete_block_map;
336
    // EqualityDeleteBase stores raw pointers to these blocks, so do not modify this vector after
337
    // creating entries in _equality_delete_impls.
338
    std::vector<Block> _equality_delete_blocks;
339
    std::vector<std::unique_ptr<EqualityDeleteBase>> _equality_delete_impls;
340
341
    // id -> block column name
342
    std::unordered_map<int, std::string> _id_to_block_column_name;
343
344
    // File column names used during init
345
    std::vector<std::string> _file_col_names;
346
347
    std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()>
348
            _create_topn_row_id_column_iterator;
349
350
    static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id";
351
    static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER =
352
            "_last_updated_sequence_number";
353
    // Values used to synthesize the row-lineage columns. Both default to -1; the
    // _fill_row_lineage_* functions only write values when the corresponding member is >= 0.
    // _init_row_lineage_columns() copies them from the scan range's iceberg_params when set.
    struct RowLineageColumns {
354
        int64_t first_row_id = -1;
355
        int64_t last_updated_sequence_number = -1;
356
    };
357
    RowLineageColumns _row_lineage_columns;
358
};
359
360
// ============================================================================
361
// Template method implementations (must be in header for templates)
362
// ============================================================================
363
364
// Sets up delete handling for the current data file from the scan range's iceberg_params.
//
// Returns OK without doing anything when either
//   - the pushed-down aggregate is COUNT and a positive table_level_row_count is set, or
//   - format_version is below MIN_SUPPORT_DELETE_FILES_VERSION.
// Otherwise the delete files are grouped by `content` and processed:
//   - equality deletes (if any) via _equality_delete_base();
//   - a deletion vector (exactly one allowed) via read_deletion_vector();
//   - position deletes via _position_delete_base(), only when there is no deletion vector.
// Each branch that runs resets the push-down aggregate type to TPushAggOp::NONE.
// Returns DataQualityError when more than one deletion vector is listed, or the first error
// from the helpers above.
template <typename BaseReader>
365
2
Status IcebergReaderMixin<BaseReader>::_init_row_filters() {
366
    // COUNT(*) short-circuit
367
2
    if (this->_push_down_agg_type == TPushAggOp::type::COUNT &&
368
2
        this->get_scan_range().table_format_params.__isset.table_level_row_count &&
369
2
        this->get_scan_range().table_format_params.table_level_row_count > 0) {
370
0
        return Status::OK();
371
0
    }
372
373
2
    const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params;
374
2
    const auto& version = table_desc.format_version;
375
2
    if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
376
2
        return Status::OK();
377
2
    }
378
379
0
    std::vector<TIcebergDeleteFileDesc> position_delete_files;
380
0
    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
381
0
    std::vector<TIcebergDeleteFileDesc> deletion_vector_files;
382
0
    // Group by content type; descriptors with any other content value are ignored.
    for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
383
0
        if (desc.content == POSITION_DELETE) {
384
0
            position_delete_files.emplace_back(desc);
385
0
        } else if (desc.content == EQUALITY_DELETE) {
386
0
            equality_delete_files.emplace_back(desc);
387
0
        } else if (desc.content == DELETION_VECTOR) {
388
0
            deletion_vector_files.emplace_back(desc);
389
0
        }
390
0
    }
391
392
0
    if (!equality_delete_files.empty()) {
393
0
        RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
394
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
395
0
    }
396
397
0
    // A deletion vector takes precedence: position delete files are only read when no
    // deletion vector is listed.
    if (!deletion_vector_files.empty()) {
398
0
        if (deletion_vector_files.size() != 1) [[unlikely]] {
399
            /*
400
             * Deletion vectors are a binary representation of deletes for a single data file that is more efficient
401
             * at execution time than position delete files. Unlike equality or position delete files, there can be
402
             * at most one deletion vector for a given data file in a snapshot.
403
             */
404
0
            return Status::DataQualityError("This iceberg data file has multiple DVs.");
405
0
        }
406
0
        RETURN_IF_ERROR(
407
0
                read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));
408
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
409
0
    } else if (!position_delete_files.empty()) {
410
0
        RETURN_IF_ERROR(
411
0
                _position_delete_base(table_desc.original_file_path, position_delete_files));
412
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
413
0
    }
414
415
0
    // Counts every listed delete file, including ones skipped by the branches above.
    COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
416
0
    return Status::OK();
417
0
}
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE17_init_row_filtersEv
Line
Count
Source
365
1
Status IcebergReaderMixin<BaseReader>::_init_row_filters() {
366
    // COUNT(*) short-circuit
367
1
    if (this->_push_down_agg_type == TPushAggOp::type::COUNT &&
368
1
        this->get_scan_range().table_format_params.__isset.table_level_row_count &&
369
1
        this->get_scan_range().table_format_params.table_level_row_count > 0) {
370
0
        return Status::OK();
371
0
    }
372
373
1
    const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params;
374
1
    const auto& version = table_desc.format_version;
375
1
    if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
376
1
        return Status::OK();
377
1
    }
378
379
0
    std::vector<TIcebergDeleteFileDesc> position_delete_files;
380
0
    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
381
0
    std::vector<TIcebergDeleteFileDesc> deletion_vector_files;
382
0
    for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
383
0
        if (desc.content == POSITION_DELETE) {
384
0
            position_delete_files.emplace_back(desc);
385
0
        } else if (desc.content == EQUALITY_DELETE) {
386
0
            equality_delete_files.emplace_back(desc);
387
0
        } else if (desc.content == DELETION_VECTOR) {
388
0
            deletion_vector_files.emplace_back(desc);
389
0
        }
390
0
    }
391
392
0
    if (!equality_delete_files.empty()) {
393
0
        RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
394
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
395
0
    }
396
397
0
    if (!deletion_vector_files.empty()) {
398
0
        if (deletion_vector_files.size() != 1) [[unlikely]] {
399
            /*
400
             * Deletion vectors are a binary representation of deletes for a single data file that is more efficient
401
             * at execution time than position delete files. Unlike equality or position delete files, there can be
402
             * at most one deletion vector for a given data file in a snapshot.
403
             */
404
0
            return Status::DataQualityError("This iceberg data file has multiple DVs.");
405
0
        }
406
0
        RETURN_IF_ERROR(
407
0
                read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));
408
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
409
0
    } else if (!position_delete_files.empty()) {
410
0
        RETURN_IF_ERROR(
411
0
                _position_delete_base(table_desc.original_file_path, position_delete_files));
412
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
413
0
    }
414
415
0
    COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
416
0
    return Status::OK();
417
0
}
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE17_init_row_filtersEv
Line
Count
Source
365
1
Status IcebergReaderMixin<BaseReader>::_init_row_filters() {
366
    // COUNT(*) short-circuit
367
1
    if (this->_push_down_agg_type == TPushAggOp::type::COUNT &&
368
1
        this->get_scan_range().table_format_params.__isset.table_level_row_count &&
369
1
        this->get_scan_range().table_format_params.table_level_row_count > 0) {
370
0
        return Status::OK();
371
0
    }
372
373
1
    const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params;
374
1
    const auto& version = table_desc.format_version;
375
1
    if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
376
1
        return Status::OK();
377
1
    }
378
379
0
    std::vector<TIcebergDeleteFileDesc> position_delete_files;
380
0
    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
381
0
    std::vector<TIcebergDeleteFileDesc> deletion_vector_files;
382
0
    for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
383
0
        if (desc.content == POSITION_DELETE) {
384
0
            position_delete_files.emplace_back(desc);
385
0
        } else if (desc.content == EQUALITY_DELETE) {
386
0
            equality_delete_files.emplace_back(desc);
387
0
        } else if (desc.content == DELETION_VECTOR) {
388
0
            deletion_vector_files.emplace_back(desc);
389
0
        }
390
0
    }
391
392
0
    if (!equality_delete_files.empty()) {
393
0
        RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
394
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
395
0
    }
396
397
0
    if (!deletion_vector_files.empty()) {
398
0
        if (deletion_vector_files.size() != 1) [[unlikely]] {
399
            /*
400
             * Deletion vectors are a binary representation of deletes for a single data file that is more efficient
401
             * at execution time than position delete files. Unlike equality or position delete files, there can be
402
             * at most one deletion vector for a given data file in a snapshot.
403
             */
404
0
            return Status::DataQualityError("This iceberg data file has multiple DVs.");
405
0
        }
406
0
        RETURN_IF_ERROR(
407
0
                read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));
408
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
409
0
    } else if (!position_delete_files.empty()) {
410
0
        RETURN_IF_ERROR(
411
0
                _position_delete_base(table_desc.original_file_path, position_delete_files));
412
0
        this->set_push_down_agg_type(TPushAggOp::NONE);
413
0
    }
414
415
0
    COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
416
0
    return Status::OK();
417
0
}
418
419
template <typename BaseReader>
420
Status IcebergReaderMixin<BaseReader>::_equality_delete_base(
421
0
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
422
0
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
423
0
            partition_columns;
424
0
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
425
426
0
    for (const auto& delete_file : delete_files) {
427
0
        TFileRangeDesc delete_desc;
428
0
        delete_desc.__set_fs_name(this->get_scan_range().fs_name);
429
0
        delete_desc.path = delete_file.path;
430
0
        delete_desc.start_offset = 0;
431
0
        delete_desc.size = -1;
432
0
        delete_desc.file_size = -1;
433
434
0
        if (!delete_file.__isset.field_ids) [[unlikely]] {
435
0
            return Status::InternalError(
436
0
                    "missing delete field ids when reading equality delete file");
437
0
        }
438
0
        auto& read_column_field_ids = delete_file.field_ids;
439
0
        std::set<int> read_column_field_ids_set;
440
0
        for (const auto& field_id : read_column_field_ids) {
441
0
            read_column_field_ids_set.insert(field_id);
442
0
            _equality_delete_col_ids.insert(field_id);
443
0
        }
444
445
0
        std::unique_ptr<GenericReader> delete_reader = _create_equality_reader(delete_desc);
446
0
        RETURN_IF_ERROR(delete_reader->init_schema_reader());
447
448
0
        std::vector<std::string> equality_delete_col_names;
449
0
        std::vector<DataTypePtr> equality_delete_col_types;
450
451
        // Build delete col names/types/ids by matching field_ids from delete file schema.
452
        // Master iterates delete file's FieldDescriptor and uses field_id to match,
453
        // NOT idx-based pairing (get_parsed_schema order != field_ids order).
454
0
        std::vector<std::string> delete_col_names;
455
0
        std::vector<DataTypePtr> delete_col_types;
456
0
        std::vector<int> delete_col_ids;
457
0
        std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx;
458
459
0
        if (auto* parquet_reader = typeid_cast<ParquetReader*>(delete_reader.get())) {
460
0
            const FieldDescriptor* delete_field_desc = nullptr;
461
0
            RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&delete_field_desc));
462
0
            DCHECK(delete_field_desc != nullptr);
463
464
0
            for (const auto& delete_file_field : delete_field_desc->get_fields_schema()) {
465
0
                if (delete_file_field.field_id == -1) [[unlikely]] {
466
0
                    return Status::DataQualityError(
467
0
                            "missing field id when reading equality delete file");
468
0
                }
469
0
                if (!read_column_field_ids_set.contains(delete_file_field.field_id)) {
470
0
                    continue;
471
0
                }
472
0
                if (delete_file_field.children.size() > 0) [[unlikely]] {
473
0
                    return Status::InternalError(
474
0
                            "can not support read complex column in equality delete file");
475
0
                }
476
477
0
                delete_col_ids.emplace_back(delete_file_field.field_id);
478
0
                delete_col_names.emplace_back(delete_file_field.name);
479
0
                delete_col_types.emplace_back(make_nullable(delete_file_field.data_type));
480
481
0
                int field_id = delete_file_field.field_id;
482
0
                if (!_id_to_block_column_name.contains(field_id)) {
483
0
                    _id_to_block_column_name.emplace(field_id, delete_file_field.name);
484
0
                    _expand_col_names.emplace_back(delete_file_field.name);
485
0
                    _expand_columns.emplace_back(
486
0
                            make_nullable(delete_file_field.data_type)->create_column(),
487
0
                            make_nullable(delete_file_field.data_type), delete_file_field.name);
488
0
                }
489
0
            }
490
0
            for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
491
0
                delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
492
0
            }
493
            // Delete files have TFileRangeDesc.size=-1, which would cause
494
            // set_fill_columns to return EndOfFile("No row group to read")
495
            // when _filter_groups is true. Master passes filter_groups=false.
496
0
            ParquetInitContext eq_delete_ctx;
497
0
            eq_delete_ctx.filter_groups = false;
498
0
            eq_delete_ctx.column_names = delete_col_names;
499
0
            eq_delete_ctx.col_name_to_block_idx = &delete_col_name_to_block_idx;
500
0
            auto st2 = parquet_reader->init_reader(&eq_delete_ctx);
501
0
            if (!st2.ok()) {
502
0
                return st2;
503
0
            }
504
0
        } else if (auto* orc_reader = typeid_cast<OrcReader*>(delete_reader.get())) {
505
            // For ORC: use get_parsed_schema with field_ids from delete_file
506
            // ORC field_ids come from the Thrift descriptor, not from ORC metadata
507
0
            RETURN_IF_ERROR(delete_reader->get_parsed_schema(&equality_delete_col_names,
508
0
                                                             &equality_delete_col_types));
509
0
            for (uint32_t idx = 0; idx < equality_delete_col_names.size(); ++idx) {
510
0
                if (idx < read_column_field_ids.size()) {
511
0
                    int field_id = read_column_field_ids[idx];
512
0
                    if (!read_column_field_ids_set.contains(field_id)) continue;
513
0
                    delete_col_ids.emplace_back(field_id);
514
0
                    delete_col_names.emplace_back(equality_delete_col_names[idx]);
515
0
                    delete_col_types.emplace_back(make_nullable(equality_delete_col_types[idx]));
516
0
                    if (!_id_to_block_column_name.contains(field_id)) {
517
0
                        _id_to_block_column_name.emplace(field_id, equality_delete_col_names[idx]);
518
0
                        _expand_col_names.emplace_back(equality_delete_col_names[idx]);
519
0
                        _expand_columns.emplace_back(
520
0
                                make_nullable(equality_delete_col_types[idx])->create_column(),
521
0
                                make_nullable(equality_delete_col_types[idx]),
522
0
                                equality_delete_col_names[idx]);
523
0
                    }
524
0
                }
525
0
            }
526
0
            for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
527
0
                delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
528
0
            }
529
0
            OrcInitContext eq_delete_ctx;
530
0
            eq_delete_ctx.column_names = delete_col_names;
531
0
            eq_delete_ctx.col_name_to_block_idx = &delete_col_name_to_block_idx;
532
0
            RETURN_IF_ERROR(orc_reader->init_reader(&eq_delete_ctx));
533
0
        } else {
534
0
            return Status::InternalError("Unsupported format of delete file");
535
0
        }
536
537
0
        if (!_equality_delete_block_map.contains(delete_col_ids)) {
538
0
            _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size());
539
0
            Block block;
540
0
            _generate_equality_delete_block(&block, delete_col_names, delete_col_types);
541
0
            _equality_delete_blocks.emplace_back(block);
542
0
        }
543
0
        Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]];
544
545
0
        bool eof = false;
546
0
        while (!eof) {
547
0
            Block tmp_block;
548
0
            _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types);
549
0
            size_t read_rows = 0;
550
0
            auto st = delete_reader->get_next_block(&tmp_block, &read_rows, &eof);
551
0
            if (!st.ok()) {
552
0
                return st;
553
0
            }
554
0
            if (read_rows > 0) {
555
0
                MutableBlock mutable_block(&eq_file_block);
556
0
                RETURN_IF_ERROR(mutable_block.merge(tmp_block));
557
0
            }
558
0
        }
559
0
    }
560
561
0
    for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) {
562
0
        auto& eq_file_block = _equality_delete_blocks[block_idx];
563
0
        auto equality_delete_impl =
564
0
                EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids);
565
0
        RETURN_IF_ERROR(equality_delete_impl->init(this->get_profile()));
566
0
        _equality_delete_impls.emplace_back(std::move(equality_delete_impl));
567
0
    }
568
0
    return Status::OK();
569
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_equality_delete_baseERKSt6vectorINS_22TIcebergDeleteFileDescESaIS4_EE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_equality_delete_baseERKSt6vectorINS_22TIcebergDeleteFileDescESaIS4_EE
570
571
template <typename BaseReader>
572
void IcebergReaderMixin<BaseReader>::_generate_equality_delete_block(
573
        Block* block, const std::vector<std::string>& equality_delete_col_names,
574
0
        const std::vector<DataTypePtr>& equality_delete_col_types) {
575
0
    for (int i = 0; i < equality_delete_col_names.size(); ++i) {
576
0
        DataTypePtr data_type = make_nullable(equality_delete_col_types[i]);
577
0
        MutableColumnPtr data_column = data_type->create_column();
578
0
        block->insert(ColumnWithTypeAndName(std::move(data_column), data_type,
579
0
                                            equality_delete_col_names[i]));
580
0
    }
581
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE31_generate_equality_delete_blockEPNS_5BlockERKSt6vectorINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaISB_EERKS5_ISt10shared_ptrIKNS_9IDataTypeEESaISJ_EE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE31_generate_equality_delete_blockEPNS_5BlockERKSt6vectorINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaISB_EERKS5_ISt10shared_ptrIKNS_9IDataTypeEESaISJ_EE
582
583
template <typename BaseReader>
584
2
Status IcebergReaderMixin<BaseReader>::_expand_block_if_need(Block* block) {
585
2
    std::set<std::string> names;
586
2
    auto block_names = block->get_names();
587
2
    names.insert(block_names.begin(), block_names.end());
588
2
    for (auto& col : _expand_columns) {
589
0
        col.column->assume_mutable()->clear();
590
0
        if (names.contains(col.name)) {
591
0
            return Status::InternalError("Wrong expand column '{}'", col.name);
592
0
        }
593
0
        names.insert(col.name);
594
0
        (*this->col_name_to_block_idx_ref())[col.name] = static_cast<uint32_t>(block->columns());
595
0
        block->insert(col);
596
0
    }
597
2
    return Status::OK();
598
2
}
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_expand_block_if_needEPNS_5BlockE
Line
Count
Source
584
1
Status IcebergReaderMixin<BaseReader>::_expand_block_if_need(Block* block) {
585
1
    std::set<std::string> names;
586
1
    auto block_names = block->get_names();
587
1
    names.insert(block_names.begin(), block_names.end());
588
1
    for (auto& col : _expand_columns) {
589
0
        col.column->assume_mutable()->clear();
590
0
        if (names.contains(col.name)) {
591
0
            return Status::InternalError("Wrong expand column '{}'", col.name);
592
0
        }
593
0
        names.insert(col.name);
594
0
        (*this->col_name_to_block_idx_ref())[col.name] = static_cast<uint32_t>(block->columns());
595
0
        block->insert(col);
596
0
    }
597
1
    return Status::OK();
598
1
}
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_expand_block_if_needEPNS_5BlockE
Line
Count
Source
584
1
Status IcebergReaderMixin<BaseReader>::_expand_block_if_need(Block* block) {
585
1
    std::set<std::string> names;
586
1
    auto block_names = block->get_names();
587
1
    names.insert(block_names.begin(), block_names.end());
588
1
    for (auto& col : _expand_columns) {
589
0
        col.column->assume_mutable()->clear();
590
0
        if (names.contains(col.name)) {
591
0
            return Status::InternalError("Wrong expand column '{}'", col.name);
592
0
        }
593
0
        names.insert(col.name);
594
0
        (*this->col_name_to_block_idx_ref())[col.name] = static_cast<uint32_t>(block->columns());
595
0
        block->insert(col);
596
0
    }
597
1
    return Status::OK();
598
1
}
599
600
template <typename BaseReader>
601
2
Status IcebergReaderMixin<BaseReader>::_shrink_block_if_need(Block* block) {
602
2
    std::set<size_t> positions_to_erase;
603
2
    for (const std::string& expand_col : _expand_col_names) {
604
0
        if (!this->col_name_to_block_idx_ref()->contains(expand_col)) {
605
0
            return Status::InternalError("Wrong erase column '{}', block: {}", expand_col,
606
0
                                         block->dump_names());
607
0
        }
608
0
        positions_to_erase.emplace((*this->col_name_to_block_idx_ref())[expand_col]);
609
0
    }
610
2
    block->erase(positions_to_erase);
611
2
    for (const std::string& expand_col : _expand_col_names) {
612
0
        this->col_name_to_block_idx_ref()->erase(expand_col);
613
0
    }
614
2
    return Status::OK();
615
2
}
_ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_shrink_block_if_needEPNS_5BlockE
Line
Count
Source
601
1
Status IcebergReaderMixin<BaseReader>::_shrink_block_if_need(Block* block) {
602
1
    std::set<size_t> positions_to_erase;
603
1
    for (const std::string& expand_col : _expand_col_names) {
604
0
        if (!this->col_name_to_block_idx_ref()->contains(expand_col)) {
605
0
            return Status::InternalError("Wrong erase column '{}', block: {}", expand_col,
606
0
                                         block->dump_names());
607
0
        }
608
0
        positions_to_erase.emplace((*this->col_name_to_block_idx_ref())[expand_col]);
609
0
    }
610
1
    block->erase(positions_to_erase);
611
1
    for (const std::string& expand_col : _expand_col_names) {
612
0
        this->col_name_to_block_idx_ref()->erase(expand_col);
613
0
    }
614
1
    return Status::OK();
615
1
}
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_shrink_block_if_needEPNS_5BlockE
Line
Count
Source
601
1
Status IcebergReaderMixin<BaseReader>::_shrink_block_if_need(Block* block) {
602
1
    std::set<size_t> positions_to_erase;
603
1
    for (const std::string& expand_col : _expand_col_names) {
604
0
        if (!this->col_name_to_block_idx_ref()->contains(expand_col)) {
605
0
            return Status::InternalError("Wrong erase column '{}', block: {}", expand_col,
606
0
                                         block->dump_names());
607
0
        }
608
0
        positions_to_erase.emplace((*this->col_name_to_block_idx_ref())[expand_col]);
609
0
    }
610
1
    block->erase(positions_to_erase);
611
1
    for (const std::string& expand_col : _expand_col_names) {
612
0
        this->col_name_to_block_idx_ref()->erase(expand_col);
613
0
    }
614
1
    return Status::OK();
615
1
}
616
617
template <typename BaseReader>
618
Status IcebergReaderMixin<BaseReader>::_position_delete_base(
619
0
        const std::string data_file_path, const std::vector<TIcebergDeleteFileDesc>& delete_files) {
620
0
    std::vector<DeleteRows*> delete_rows_array;
621
0
    int64_t num_delete_rows = 0;
622
0
    for (const auto& delete_file : delete_files) {
623
0
        SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
624
0
        Status create_status = Status::OK();
625
0
        auto* delete_file_cache = _kv_cache->template get<DeleteFile>(
626
0
                _delet_file_cache_key(delete_file.path), [&]() -> DeleteFile* {
627
0
                    auto* position_delete = new DeleteFile;
628
0
                    TFileRangeDesc delete_file_range;
629
0
                    delete_file_range.__set_fs_name(this->get_scan_range().fs_name);
630
0
                    delete_file_range.path = delete_file.path;
631
0
                    delete_file_range.start_offset = 0;
632
0
                    delete_file_range.size = -1;
633
0
                    delete_file_range.file_size = -1;
634
0
                    create_status = _read_position_delete_file(&delete_file_range, position_delete);
635
0
                    if (!create_status) {
636
0
                        return nullptr;
637
0
                    }
638
0
                    return position_delete;
639
0
                });
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE_clB5cxx11Ev
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE_clB5cxx11Ev
640
0
        if (create_status.is<ErrorCode::END_OF_FILE>()) {
641
0
            continue;
642
0
        } else if (!create_status.ok()) {
643
0
            return create_status;
644
0
        }
645
646
0
        DeleteFile& delete_file_map = *((DeleteFile*)delete_file_cache);
647
0
        auto get_value = [&](const auto& v) {
648
0
            DeleteRows* row_ids = v.second.get();
649
0
            if (!row_ids->empty()) {
650
0
                delete_rows_array.emplace_back(row_ids);
651
0
                num_delete_rows += row_ids->size();
652
0
            }
653
0
        };
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlRKT_E_clISt4pairIKS8_St10unique_ptrIS9_IlSaIlEESt14default_deleteISO_EEEEEDaSH_
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlRKT_E_clISt4pairIKS8_St10unique_ptrIS9_IlSaIlEESt14default_deleteISO_EEEEEDaSH_
654
0
        delete_file_map.if_contains(data_file_path, get_value);
655
0
    }
656
0
    if (num_delete_rows > 0) {
657
0
        SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
658
0
        _iceberg_delete_rows =
659
0
                _kv_cache->template get<DeleteRows>(data_file_path, [&]() -> DeleteRows* {
660
0
                    auto* data_file_position_delete = new DeleteRows;
661
0
                    _sort_delete_rows(delete_rows_array, num_delete_rows,
662
0
                                      *data_file_position_delete);
663
0
                    return data_file_position_delete;
664
0
                });
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE0_clEv
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE0_clEv
665
0
        set_delete_rows();
666
0
        COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
667
0
    }
668
0
    return Status::OK();
669
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EE
670
671
template <typename BaseReader>
672
typename IcebergReaderMixin<BaseReader>::PositionDeleteRange
673
0
IcebergReaderMixin<BaseReader>::_get_range(const ColumnDictI32& file_path_column) {
674
0
    PositionDeleteRange range;
675
0
    size_t read_rows = file_path_column.get_data().size();
676
0
    const int* code_path = file_path_column.get_data().data();
677
0
    const int* code_path_start = code_path;
678
0
    const int* code_path_end = code_path + read_rows;
679
0
    while (code_path < code_path_end) {
680
0
        int code = code_path[0];
681
0
        const int* code_end = std::upper_bound(code_path, code_path_end, code);
682
0
        range.data_file_path.emplace_back(file_path_column.get_value(code).to_string());
683
0
        range.range.emplace_back(code_path - code_path_start, code_end - code_path_start);
684
0
        code_path = code_end;
685
0
    }
686
0
    return range;
687
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE10_get_rangeERKNS_13ColumnDictI32E
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE10_get_rangeERKNS_13ColumnDictI32E
688
689
template <typename BaseReader>
690
typename IcebergReaderMixin<BaseReader>::PositionDeleteRange
691
0
IcebergReaderMixin<BaseReader>::_get_range(const ColumnString& file_path_column) {
692
0
    PositionDeleteRange range;
693
0
    size_t read_rows = file_path_column.size();
694
0
    size_t index = 0;
695
0
    while (index < read_rows) {
696
0
        StringRef data_path = file_path_column.get_data_at(index);
697
0
        size_t left = index - 1;
698
0
        size_t right = read_rows;
699
0
        while (left + 1 != right) {
700
0
            size_t mid = left + (right - left) / 2;
701
0
            if (file_path_column.get_data_at(mid) > data_path) {
702
0
                right = mid;
703
0
            } else {
704
0
                left = mid;
705
0
            }
706
0
        }
707
0
        range.data_file_path.emplace_back(data_path.to_string());
708
0
        range.range.emplace_back(index, left + 1);
709
0
        index = left + 1;
710
0
    }
711
0
    return range;
712
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE10_get_rangeERKNS_9ColumnStrIjEE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE10_get_rangeERKNS_9ColumnStrIjEE
713
714
template <typename BaseReader>
715
void IcebergReaderMixin<BaseReader>::_sort_delete_rows(
716
        const std::vector<std::vector<int64_t>*>& delete_rows_array, int64_t num_delete_rows,
717
0
        std::vector<int64_t>& result) {
718
0
    if (delete_rows_array.empty()) {
719
0
        return;
720
0
    }
721
0
    if (delete_rows_array.size() == 1) {
722
0
        result.resize(num_delete_rows);
723
0
        memcpy(result.data(), delete_rows_array.front()->data(), sizeof(int64_t) * num_delete_rows);
724
0
        return;
725
0
    }
726
0
    if (delete_rows_array.size() == 2) {
727
0
        result.resize(num_delete_rows);
728
0
        std::merge(delete_rows_array.front()->begin(), delete_rows_array.front()->end(),
729
0
                   delete_rows_array.back()->begin(), delete_rows_array.back()->end(),
730
0
                   result.begin());
731
0
        return;
732
0
    }
733
734
0
    using vec_pair = std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::iterator>;
735
0
    result.resize(num_delete_rows);
736
0
    auto row_id_iter = result.begin();
737
0
    auto iter_end = result.end();
738
0
    std::vector<vec_pair> rows_array;
739
0
    for (auto* rows : delete_rows_array) {
740
0
        if (!rows->empty()) {
741
0
            rows_array.emplace_back(rows->begin(), rows->end());
742
0
        }
743
0
    }
744
0
    size_t array_size = rows_array.size();
745
0
    while (row_id_iter != iter_end) {
746
0
        int64_t min_index = 0;
747
0
        int64_t min = *rows_array[0].first;
748
0
        for (size_t i = 0; i < array_size; ++i) {
749
0
            if (*rows_array[i].first < min) {
750
0
                min_index = i;
751
0
                min = *rows_array[i].first;
752
0
            }
753
0
        }
754
0
        *row_id_iter++ = min;
755
0
        rows_array[min_index].first++;
756
0
        if (UNLIKELY(rows_array[min_index].first == rows_array[min_index].second)) {
757
0
            rows_array.erase(rows_array.begin() + min_index);
758
0
            array_size--;
759
0
        }
760
0
    }
761
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE17_sort_delete_rowsERKSt6vectorIPS3_IlSaIlEESaIS6_EElRS5_
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE17_sort_delete_rowsERKSt6vectorIPS3_IlSaIlEESaIS6_EElRS5_
762
763
template <typename BaseReader>
764
void IcebergReaderMixin<BaseReader>::_gen_position_delete_file_range(
765
        Block& block, DeleteFile* position_delete, size_t read_rows,
766
0
        bool file_path_column_dictionary_coded) {
767
0
    SCOPED_TIMER(_iceberg_profile.parse_delete_file_time);
768
0
    auto name_to_pos_map = block.get_name_to_pos_map();
769
0
    ColumnPtr path_column = block.get_by_position(name_to_pos_map[ICEBERG_FILE_PATH]).column;
770
0
    DCHECK_EQ(path_column->size(), read_rows);
771
0
    ColumnPtr pos_column = block.get_by_position(name_to_pos_map[ICEBERG_ROW_POS]).column;
772
0
    using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
773
0
    const int64_t* src_data = assert_cast<const ColumnType&>(*pos_column).get_data().data();
774
0
    PositionDeleteRange range;
775
0
    if (file_path_column_dictionary_coded) {
776
0
        range = _get_range(assert_cast<const ColumnDictI32&>(*path_column));
777
0
    } else {
778
0
        range = _get_range(assert_cast<const ColumnString&>(*path_column));
779
0
    }
780
0
    for (int i = 0; i < range.range.size(); ++i) {
781
0
        std::string key = range.data_file_path[i];
782
0
        auto iter = position_delete->find(key);
783
0
        DeleteRows* delete_rows;
784
0
        if (iter == position_delete->end()) {
785
0
            delete_rows = new DeleteRows;
786
0
            std::unique_ptr<DeleteRows> delete_rows_ptr(delete_rows);
787
0
            (*position_delete)[key] = std::move(delete_rows_ptr);
788
0
        } else {
789
0
            delete_rows = iter->second.get();
790
0
        }
791
0
        const int64_t* cpy_start = src_data + range.range[i].first;
792
0
        const int64_t cpy_count = range.range[i].second - range.range[i].first;
793
0
        int64_t origin_size = delete_rows->size();
794
0
        delete_rows->resize(origin_size + cpy_count);
795
0
        int64_t* dest_position = &(*delete_rows)[origin_size];
796
0
        memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
797
0
    }
798
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE31_gen_position_delete_file_rangeERNS_5BlockEPN5phmap22parallel_flat_hash_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESt10unique_ptrISt6vectorIlSaIlEESt14default_deleteISG_EESt4hashISC_ESt8equal_toIvESaISt4pairIKSC_SJ_EELm8ESt5mutexEEmb
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE31_gen_position_delete_file_rangeERNS_5BlockEPN5phmap22parallel_flat_hash_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESt10unique_ptrISt6vectorIlSaIlEESt14default_deleteISG_EESt4hashISC_ESt8equal_toIvESaISt4pairIKSC_SJ_EELm8ESt5mutexEEmb
799
800
template <typename BaseReader>
801
Status IcebergReaderMixin<BaseReader>::read_deletion_vector(
802
0
        const std::string& data_file_path, const TIcebergDeleteFileDesc& delete_file_desc) {
803
0
    Status create_status = Status::OK();
804
0
    SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
805
0
    _iceberg_delete_rows = _kv_cache->template get<
806
0
            DeleteRows>(data_file_path, [&]() -> DeleteRows* {
807
0
        auto* delete_rows = new DeleteRows;
808
809
0
        TFileRangeDesc delete_range;
810
0
        delete_range.__set_fs_name(this->get_scan_range().fs_name);
811
0
        delete_range.path = delete_file_desc.path;
812
0
        delete_range.start_offset = delete_file_desc.content_offset;
813
0
        delete_range.size = delete_file_desc.content_size_in_bytes;
814
0
        delete_range.file_size = -1;
815
816
0
        DeletionVectorReader dv_reader(this->get_state(), this->get_profile(),
817
0
                                       this->get_scan_params(), delete_range, this->get_io_ctx());
818
0
        create_status = dv_reader.open();
819
0
        if (!create_status.ok()) [[unlikely]] {
820
0
            return nullptr;
821
0
        }
822
823
0
        size_t buffer_size = delete_range.size;
824
0
        std::vector<char> buf(buffer_size);
825
0
        if (buffer_size < 12) [[unlikely]] {
826
0
            create_status = Status::DataQualityError("Deletion vector file size too small: {}",
827
0
                                                     buffer_size);
828
0
            return nullptr;
829
0
        }
830
831
0
        create_status = dv_reader.read_at(delete_range.start_offset, {buf.data(), buffer_size});
832
0
        if (!create_status) [[unlikely]] {
833
0
            return nullptr;
834
0
        }
835
836
0
        auto total_length = BigEndian::Load32(buf.data());
837
0
        if (total_length + 8 != buffer_size) [[unlikely]] {
838
0
            create_status = Status::DataQualityError(
839
0
                    "Deletion vector length mismatch, expected: {}, actual: {}", total_length + 8,
840
0
                    buffer_size);
841
0
            return nullptr;
842
0
        }
843
844
0
        constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'};
845
0
        if (memcmp(buf.data() + sizeof(total_length), MAGIC_NUMBER, 4)) [[unlikely]] {
846
0
            create_status = Status::DataQualityError("Deletion vector magic number mismatch");
847
0
            return nullptr;
848
0
        }
849
850
0
        roaring::Roaring64Map bitmap;
851
0
        SCOPED_TIMER(_iceberg_profile.parse_delete_file_time);
852
0
        try {
853
0
            bitmap = roaring::Roaring64Map::readSafe(buf.data() + 8, buffer_size - 12);
854
0
        } catch (const std::runtime_error& e) {
855
0
            create_status = Status::DataQualityError("Decode roaring bitmap failed, {}", e.what());
856
0
            return nullptr;
857
0
        }
858
859
0
        delete_rows->reserve(bitmap.cardinality());
860
0
        for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
861
0
            delete_rows->push_back(*it);
862
0
        }
863
0
        COUNTER_UPDATE(_iceberg_profile.num_delete_rows, delete_rows->size());
864
0
        return delete_rows;
865
0
    });
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescEENKUlvE_clEv
Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescEENKUlvE_clEv
866
867
0
    RETURN_IF_ERROR(create_status);
868
0
    if (!_iceberg_delete_rows->empty()) [[likely]] {
869
0
        set_delete_rows();
870
0
    }
871
0
    return Status::OK();
872
0
}
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescE
Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescE
873
874
} // namespace doris