be/src/format/table/iceberg_reader_mixin.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <cstddef> |
21 | | #include <cstdint> |
22 | | #include <string> |
23 | | #include <unordered_map> |
24 | | #include <vector> |
25 | | |
26 | | #include "common/consts.h" |
27 | | #include "common/status.h" |
28 | | #include "core/block/block.h" |
29 | | #include "core/column/column_dictionary.h" |
30 | | #include "core/column/column_nullable.h" |
31 | | #include "core/column/column_string.h" |
32 | | #include "core/column/column_struct.h" |
33 | | #include "core/data_type/data_type_number.h" |
34 | | #include "core/data_type/data_type_string.h" |
35 | | #include "format/generic_reader.h" |
36 | | #include "format/table/deletion_vector_reader.h" |
37 | | #include "format/table/equality_delete.h" |
38 | | #include "format/table/table_schema_change_helper.h" |
39 | | #include "runtime/runtime_profile.h" |
40 | | #include "runtime/runtime_state.h" |
41 | | #include "storage/olap_common.h" |
42 | | |
43 | | namespace doris { |
44 | | class TIcebergDeleteFileDesc; |
45 | | } // namespace doris |
46 | | |
47 | | namespace doris { |
48 | | |
49 | | class ShardedKVCache; |
50 | | |
51 | | // CRTP mixin for Iceberg reader functionality. |
52 | | // BaseReader should be ParquetReader or OrcReader. |
53 | | // Inherits BaseReader + TableSchemaChangeHelper, providing shared Iceberg logic |
54 | | // (delete files, deletion vectors, equality delete, $row_id synthesis). |
55 | | // |
56 | | // Inheritance chain: |
57 | | // IcebergParquetReader -> IcebergReaderMixin<ParquetReader> -> ParquetReader -> GenericReader |
58 | | // IcebergOrcReader -> IcebergReaderMixin<OrcReader> -> OrcReader -> GenericReader |
59 | | template <typename BaseReader> |
60 | | class IcebergReaderMixin : public BaseReader, public TableSchemaChangeHelper { |
61 | | public: |
62 | | struct PositionDeleteRange { |
63 | | std::vector<std::string> data_file_path; |
64 | | std::vector<std::pair<int, int>> range; |
65 | | }; |
66 | | |
67 | | // Forward BaseReader constructor arguments + Iceberg-specific kv_cache |
68 | | template <typename... Args> |
69 | | IcebergReaderMixin(ShardedKVCache* kv_cache, Args&&... args) |
70 | 14 | : BaseReader(std::forward<Args>(args)...), _kv_cache(kv_cache) { |
71 | 14 | static const char* iceberg_profile = "IcebergProfile"; |
72 | 14 | ADD_TIMER(this->get_profile(), iceberg_profile); |
73 | 14 | _iceberg_profile.num_delete_files = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteFiles", |
74 | 14 | TUnit::UNIT, iceberg_profile); |
75 | 14 | _iceberg_profile.num_delete_rows = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteRows", |
76 | 14 | TUnit::UNIT, iceberg_profile); |
77 | 14 | _iceberg_profile.delete_files_read_time = |
78 | 14 | ADD_CHILD_TIMER(this->get_profile(), "DeleteFileReadTime", iceberg_profile); |
79 | 14 | _iceberg_profile.delete_rows_sort_time = |
80 | 14 | ADD_CHILD_TIMER(this->get_profile(), "DeleteRowsSortTime", iceberg_profile); |
81 | 14 | _iceberg_profile.parse_delete_file_time = |
82 | 14 | ADD_CHILD_TIMER(this->get_profile(), "ParseDeleteFileTime", iceberg_profile); |
83 | 14 | } _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEEC2IJRPNS_14RuntimeProfileERKNS_20TFileScanRangeParamsERKNS_14TFileRangeDescERmRPKN4cctz9time_zoneERPNS_2io9IOContextERPNS_12RuntimeStateERPNS_13FileMetaCacheEEEEPNS_14ShardedKVCacheEDpOT_ Line | Count | Source | 70 | 7 | : BaseReader(std::forward<Args>(args)...), _kv_cache(kv_cache) { | 71 | 7 | static const char* iceberg_profile = "IcebergProfile"; | 72 | 7 | ADD_TIMER(this->get_profile(), iceberg_profile); | 73 | 7 | _iceberg_profile.num_delete_files = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteFiles", | 74 | 7 | TUnit::UNIT, iceberg_profile); | 75 | 7 | _iceberg_profile.num_delete_rows = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteRows", | 76 | 7 | TUnit::UNIT, iceberg_profile); | 77 | 7 | _iceberg_profile.delete_files_read_time = | 78 | 7 | ADD_CHILD_TIMER(this->get_profile(), "DeleteFileReadTime", iceberg_profile); | 79 | 7 | _iceberg_profile.delete_rows_sort_time = | 80 | 7 | ADD_CHILD_TIMER(this->get_profile(), "DeleteRowsSortTime", iceberg_profile); | 81 | 7 | _iceberg_profile.parse_delete_file_time = | 82 | 7 | ADD_CHILD_TIMER(this->get_profile(), "ParseDeleteFileTime", iceberg_profile); | 83 | 7 | } |
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEEC2IJRPNS_14RuntimeProfileERPNS_12RuntimeStateERKNS_20TFileScanRangeParamsERKNS_14TFileRangeDescERmRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERPNS_2io9IOContextERPNS_13FileMetaCacheEEEEPNS_14ShardedKVCacheEDpOT_ Line | Count | Source | 70 | 7 | : BaseReader(std::forward<Args>(args)...), _kv_cache(kv_cache) { | 71 | 7 | static const char* iceberg_profile = "IcebergProfile"; | 72 | 7 | ADD_TIMER(this->get_profile(), iceberg_profile); | 73 | 7 | _iceberg_profile.num_delete_files = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteFiles", | 74 | 7 | TUnit::UNIT, iceberg_profile); | 75 | 7 | _iceberg_profile.num_delete_rows = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteRows", | 76 | 7 | TUnit::UNIT, iceberg_profile); | 77 | 7 | _iceberg_profile.delete_files_read_time = | 78 | 7 | ADD_CHILD_TIMER(this->get_profile(), "DeleteFileReadTime", iceberg_profile); | 79 | 7 | _iceberg_profile.delete_rows_sort_time = | 80 | 7 | ADD_CHILD_TIMER(this->get_profile(), "DeleteRowsSortTime", iceberg_profile); | 81 | 7 | _iceberg_profile.parse_delete_file_time = | 82 | 7 | ADD_CHILD_TIMER(this->get_profile(), "ParseDeleteFileTime", iceberg_profile); | 83 | 7 | } |
|
84 | | |
85 | 14 | ~IcebergReaderMixin() override = default; _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEED2Ev Line | Count | Source | 85 | 7 | ~IcebergReaderMixin() override = default; |
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEED2Ev Line | Count | Source | 85 | 7 | ~IcebergReaderMixin() override = default; |
|
86 | | |
87 | | void set_current_file_info(const std::string& file_path, int32_t partition_spec_id, |
88 | 0 | const std::string& partition_data_json) { |
89 | 0 | _current_file_path = file_path; |
90 | 0 | _partition_spec_id = partition_spec_id; |
91 | 0 | _partition_data_json = partition_data_json; |
92 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21set_current_file_infoERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSA_ Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21set_current_file_infoERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSA_ |
93 | | |
94 | | enum { DATA, POSITION_DELETE, EQUALITY_DELETE, DELETION_VECTOR }; |
95 | | enum Fileformat { NONE, PARQUET, ORC, AVRO }; |
96 | | |
97 | | virtual void set_delete_rows() = 0; |
98 | | |
99 | | // Table-level COUNT(*) is handled by CountReader (created by FileScanner after |
100 | | // init_reader). If _do_get_next_block is called, COUNT must have been resolved. |
101 | 2 | Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override { |
102 | 2 | DCHECK(this->_push_down_agg_type != TPushAggOp::type::COUNT); |
103 | 2 | return BaseReader::_do_get_next_block(block, read_rows, eof); |
104 | 2 | } _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE18_do_get_next_blockEPNS_5BlockEPmPb Line | Count | Source | 101 | 1 | Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override { | 102 | | DCHECK(this->_push_down_agg_type != TPushAggOp::type::COUNT); | 103 | 1 | return BaseReader::_do_get_next_block(block, read_rows, eof); | 104 | 1 | } |
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE18_do_get_next_blockEPNS_5BlockEPmPb Line | Count | Source | 101 | 1 | Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override { | 102 | | DCHECK(this->_push_down_agg_type != TPushAggOp::type::COUNT); | 103 | 1 | return BaseReader::_do_get_next_block(block, read_rows, eof); | 104 | 1 | } |
|
105 | | |
106 | | void set_create_row_id_column_iterator_func( |
107 | 0 | std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()> create_func) { |
108 | 0 | _create_topn_row_id_column_iterator = create_func; |
109 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE38set_create_row_id_column_iterator_funcESt8functionIFSt10shared_ptrINS_10segment_v221RowIdColumnIteratorV2EEvEE Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE38set_create_row_id_column_iterator_funcESt8functionIFSt10shared_ptrINS_10segment_v221RowIdColumnIteratorV2EEvEE |
110 | | |
111 | | protected: |
112 | | // ---- Hook implementations ---- |
113 | | |
114 | | // Called before reading a block: expand block for equality delete columns + detect row_id |
115 | 2 | Status on_before_read_block(Block* block) override { |
116 | 2 | RETURN_IF_ERROR(_expand_block_if_need(block)); |
117 | 2 | return Status::OK(); |
118 | 2 | } _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20on_before_read_blockEPNS_5BlockE Line | Count | Source | 115 | 1 | Status on_before_read_block(Block* block) override { | 116 | 1 | RETURN_IF_ERROR(_expand_block_if_need(block)); | 117 | 1 | return Status::OK(); | 118 | 1 | } |
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE20on_before_read_blockEPNS_5BlockE Line | Count | Source | 115 | 1 | Status on_before_read_block(Block* block) override { | 116 | 1 | RETURN_IF_ERROR(_expand_block_if_need(block)); | 117 | 1 | return Status::OK(); | 118 | 1 | } |
|
119 | | |
120 | | /// Fill Iceberg $row_id synthesized column. Registered as handler during init. |
121 | 0 | Status _fill_iceberg_row_id(Block* block, size_t rows) { |
122 | 0 | int row_id_pos = block->get_position_by_name(BeConsts::ICEBERG_ROWID_COL); |
123 | 0 | DORIS_CHECK(row_id_pos >= 0); |
124 | | |
125 | | // Lazy-init file info: only set when $row_id is actually needed. |
126 | 0 | const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params; |
127 | 0 | std::string file_path = table_desc.original_file_path; |
128 | 0 | int32_t partition_spec_id = |
129 | 0 | table_desc.__isset.partition_spec_id ? table_desc.partition_spec_id : 0; |
130 | 0 | std::string partition_data_json; |
131 | 0 | if (table_desc.__isset.partition_data_json) { |
132 | 0 | partition_data_json = table_desc.partition_data_json; |
133 | 0 | } |
134 | 0 | set_current_file_info(file_path, partition_spec_id, partition_data_json); |
135 | |
|
136 | 0 | const auto& row_ids = this->current_batch_row_positions(); |
137 | 0 | auto& col_with_type = block->get_by_position(static_cast<size_t>(row_id_pos)); |
138 | 0 | MutableColumnPtr row_id_column; |
139 | 0 | RETURN_IF_ERROR(_build_iceberg_rowid_column(col_with_type.type, _current_file_path, row_ids, |
140 | 0 | _partition_spec_id, _partition_data_json, |
141 | 0 | &row_id_column)); |
142 | 0 | col_with_type.column = std::move(row_id_column); |
143 | 0 | return Status::OK(); |
144 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20_fill_iceberg_row_idEPNS_5BlockEm Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE20_fill_iceberg_row_idEPNS_5BlockEm |
145 | | |
146 | 0 | void _init_row_lineage_columns() { |
147 | 0 | const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params; |
148 | 0 | if (table_desc.__isset.first_row_id) { |
149 | 0 | _row_lineage_columns.first_row_id = table_desc.first_row_id; |
150 | 0 | } |
151 | 0 | if (table_desc.__isset.last_updated_sequence_number) { |
152 | 0 | _row_lineage_columns.last_updated_sequence_number = |
153 | 0 | table_desc.last_updated_sequence_number; |
154 | 0 | } |
155 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE25_init_row_lineage_columnsEv Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE25_init_row_lineage_columnsEv |
156 | | |
157 | 0 | Status _fill_row_lineage_row_id(Block* block, size_t rows) { |
158 | 0 | int col_pos = block->get_position_by_name(ROW_LINEAGE_ROW_ID); |
159 | 0 | DORIS_CHECK(col_pos >= 0); |
160 | | |
161 | 0 | if (_row_lineage_columns.first_row_id >= 0) { |
162 | 0 | auto column_guard = block->mutate_column_scoped(col_pos); |
163 | 0 | auto* nullable_column = |
164 | 0 | assert_cast<ColumnNullable*>(column_guard.mutable_column().get()); |
165 | 0 | auto& null_map = nullable_column->get_null_map_data(); |
166 | 0 | auto& data = |
167 | 0 | assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data(); |
168 | 0 | const auto& row_ids = this->current_batch_row_positions(); |
169 | 0 | for (size_t i = 0; i < rows; ++i) { |
170 | 0 | if (null_map[i] != 0) { |
171 | 0 | null_map[i] = 0; |
172 | 0 | data[i] = _row_lineage_columns.first_row_id + static_cast<int64_t>(row_ids[i]); |
173 | 0 | } |
174 | 0 | } |
175 | 0 | } |
176 | 0 | return Status::OK(); |
177 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE24_fill_row_lineage_row_idEPNS_5BlockEm Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE24_fill_row_lineage_row_idEPNS_5BlockEm |
178 | | |
179 | 0 | Status _fill_row_lineage_last_updated_sequence_number(Block* block, size_t rows) { |
180 | 0 | int col_pos = block->get_position_by_name(ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER); |
181 | 0 | DORIS_CHECK(col_pos >= 0); |
182 | | |
183 | 0 | if (_row_lineage_columns.last_updated_sequence_number >= 0) { |
184 | 0 | auto column_guard = block->mutate_column_scoped(col_pos); |
185 | 0 | auto* nullable_column = |
186 | 0 | assert_cast<ColumnNullable*>(column_guard.mutable_column().get()); |
187 | 0 | auto& null_map = nullable_column->get_null_map_data(); |
188 | 0 | auto& data = |
189 | 0 | assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data(); |
190 | 0 | for (size_t i = 0; i < rows; ++i) { |
191 | 0 | if (null_map[i] != 0) { |
192 | 0 | null_map[i] = 0; |
193 | 0 | data[i] = _row_lineage_columns.last_updated_sequence_number; |
194 | 0 | } |
195 | 0 | } |
196 | 0 | } |
197 | 0 | return Status::OK(); |
198 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE46_fill_row_lineage_last_updated_sequence_numberEPNS_5BlockEm Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE46_fill_row_lineage_last_updated_sequence_numberEPNS_5BlockEm |
199 | | |
200 | | // Called after reading a block: apply equality delete filter + shrink block |
201 | 2 | Status on_after_read_block(Block* block, size_t* read_rows) override { |
202 | 2 | if (!_equality_delete_impls.empty()) { |
203 | 0 | std::unique_ptr<IColumn::Filter> filter = |
204 | 0 | std::make_unique<IColumn::Filter>(block->rows(), 1); |
205 | 0 | for (auto& equality_delete_impl : _equality_delete_impls) { |
206 | 0 | RETURN_IF_ERROR(equality_delete_impl->filter_data_block( |
207 | 0 | block, this->col_name_to_block_idx_ref(), _id_to_block_column_name, |
208 | 0 | *filter)); |
209 | 0 | } |
210 | 0 | Block::filter_block_internal(block, *filter, block->columns()); |
211 | 0 | *read_rows = block->rows(); |
212 | 0 | } |
213 | 2 | return _shrink_block_if_need(block); |
214 | 2 | } _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE19on_after_read_blockEPNS_5BlockEPm Line | Count | Source | 201 | 1 | Status on_after_read_block(Block* block, size_t* read_rows) override { | 202 | 1 | if (!_equality_delete_impls.empty()) { | 203 | 0 | std::unique_ptr<IColumn::Filter> filter = | 204 | 0 | std::make_unique<IColumn::Filter>(block->rows(), 1); | 205 | 0 | for (auto& equality_delete_impl : _equality_delete_impls) { | 206 | 0 | RETURN_IF_ERROR(equality_delete_impl->filter_data_block( | 207 | 0 | block, this->col_name_to_block_idx_ref(), _id_to_block_column_name, | 208 | 0 | *filter)); | 209 | 0 | } | 210 | 0 | Block::filter_block_internal(block, *filter, block->columns()); | 211 | 0 | *read_rows = block->rows(); | 212 | 0 | } | 213 | 1 | return _shrink_block_if_need(block); | 214 | 1 | } |
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE19on_after_read_blockEPNS_5BlockEPm Line | Count | Source | 201 | 1 | Status on_after_read_block(Block* block, size_t* read_rows) override { | 202 | 1 | if (!_equality_delete_impls.empty()) { | 203 | 0 | std::unique_ptr<IColumn::Filter> filter = | 204 | 0 | std::make_unique<IColumn::Filter>(block->rows(), 1); | 205 | 0 | for (auto& equality_delete_impl : _equality_delete_impls) { | 206 | 0 | RETURN_IF_ERROR(equality_delete_impl->filter_data_block( | 207 | 0 | block, this->col_name_to_block_idx_ref(), _id_to_block_column_name, | 208 | 0 | *filter)); | 209 | 0 | } | 210 | 0 | Block::filter_block_internal(block, *filter, block->columns()); | 211 | 0 | *read_rows = block->rows(); | 212 | 0 | } | 213 | 1 | return _shrink_block_if_need(block); | 214 | 1 | } |
|
215 | | |
216 | | // ---- Shared Iceberg methods ---- |
217 | | |
218 | | Status _init_row_filters(); |
219 | | Status _position_delete_base(const std::string data_file_path, |
220 | | const std::vector<TIcebergDeleteFileDesc>& delete_files); |
221 | | Status _equality_delete_base(const std::vector<TIcebergDeleteFileDesc>& delete_files); |
222 | | Status read_deletion_vector(const std::string& data_file_path, |
223 | | const TIcebergDeleteFileDesc& delete_file_desc); |
224 | | |
225 | | Status _expand_block_if_need(Block* block); |
226 | | Status _shrink_block_if_need(Block* block); |
227 | | |
228 | | // Type aliases — must be defined before member function declarations that use them. |
229 | | using DeleteRows = std::vector<int64_t>; |
230 | | using DeleteFile = phmap::parallel_flat_hash_map< |
231 | | std::string, std::unique_ptr<DeleteRows>, std::hash<std::string>, std::equal_to<>, |
232 | | std::allocator<std::pair<const std::string, std::unique_ptr<DeleteRows>>>, 8, |
233 | | std::mutex>; |
234 | | |
235 | | PositionDeleteRange _get_range(const ColumnDictI32& file_path_column); |
236 | | PositionDeleteRange _get_range(const ColumnString& file_path_column); |
237 | | static void _sort_delete_rows(const std::vector<std::vector<int64_t>*>& delete_rows_array, |
238 | | int64_t num_delete_rows, std::vector<int64_t>& result); |
239 | | void _gen_position_delete_file_range(Block& block, DeleteFile* position_delete, |
240 | | size_t read_rows, bool file_path_column_dictionary_coded); |
241 | | void _generate_equality_delete_block(Block* block, |
242 | | const std::vector<std::string>& equality_delete_col_names, |
243 | | const std::vector<DataTypePtr>& equality_delete_col_types); |
244 | | |
245 | | // Pure virtual: format-specific delete file reading |
246 | | virtual Status _read_position_delete_file(const TFileRangeDesc*, DeleteFile*) = 0; |
247 | | virtual std::unique_ptr<GenericReader> _create_equality_reader( |
248 | | const TFileRangeDesc& delete_desc) = 0; |
249 | | |
250 | 0 | static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; }Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_delet_file_cache_keyERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_delet_file_cache_keyERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE |
251 | | |
252 | | /// Build the Iceberg V2 row-id struct column. |
253 | | static Status _build_iceberg_rowid_column(const DataTypePtr& type, const std::string& file_path, |
254 | | const std::vector<rowid_t>& row_ids, |
255 | | int32_t partition_spec_id, |
256 | | const std::string& partition_data_json, |
257 | 0 | MutableColumnPtr* column_out) { |
258 | 0 | if (type == nullptr || column_out == nullptr) { |
259 | 0 | return Status::InvalidArgument("Invalid iceberg rowid column type or output column"); |
260 | 0 | } |
261 | 0 | MutableColumnPtr column = type->create_column(); |
262 | 0 | ColumnNullable* nullable_col = check_and_get_column<ColumnNullable>(column.get()); |
263 | 0 | ColumnStruct* struct_col = nullptr; |
264 | 0 | if (nullable_col != nullptr) { |
265 | 0 | struct_col = |
266 | 0 | check_and_get_column<ColumnStruct>(nullable_col->get_nested_column_ptr().get()); |
267 | 0 | } else { |
268 | 0 | struct_col = check_and_get_column<ColumnStruct>(column.get()); |
269 | 0 | } |
270 | 0 | if (struct_col == nullptr || struct_col->tuple_size() < 4) { |
271 | 0 | return Status::InternalError("Invalid iceberg rowid column structure"); |
272 | 0 | } |
273 | 0 | size_t num_rows = row_ids.size(); |
274 | 0 | auto& file_path_col = struct_col->get_column(0); |
275 | 0 | auto& row_pos_col = struct_col->get_column(1); |
276 | 0 | auto& spec_id_col = struct_col->get_column(2); |
277 | 0 | auto& partition_data_col = struct_col->get_column(3); |
278 | 0 | file_path_col.reserve(num_rows); |
279 | 0 | row_pos_col.reserve(num_rows); |
280 | 0 | spec_id_col.reserve(num_rows); |
281 | 0 | partition_data_col.reserve(num_rows); |
282 | 0 | for (size_t i = 0; i < num_rows; ++i) { |
283 | 0 | file_path_col.insert_data(file_path.data(), file_path.size()); |
284 | 0 | } |
285 | 0 | for (size_t i = 0; i < num_rows; ++i) { |
286 | 0 | int64_t row_pos = static_cast<int64_t>(row_ids[i]); |
287 | 0 | row_pos_col.insert_data(reinterpret_cast<const char*>(&row_pos), sizeof(row_pos)); |
288 | 0 | } |
289 | 0 | for (size_t i = 0; i < num_rows; ++i) { |
290 | 0 | int32_t spec_id = partition_spec_id; |
291 | 0 | spec_id_col.insert_data(reinterpret_cast<const char*>(&spec_id), sizeof(spec_id)); |
292 | 0 | } |
293 | 0 | for (size_t i = 0; i < num_rows; ++i) { |
294 | 0 | partition_data_col.insert_data(partition_data_json.data(), partition_data_json.size()); |
295 | 0 | } |
296 | 0 | if (nullable_col != nullptr) { |
297 | 0 | nullable_col->get_null_map_data().resize_fill(num_rows, 0); |
298 | 0 | } |
299 | 0 | *column_out = std::move(column); |
300 | 0 | return Status::OK(); |
301 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE27_build_iceberg_rowid_columnERKSt10shared_ptrIKNS_9IDataTypeEERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIjSaIjEEiSG_PNS_3COWINS_7IColumnEE11mutable_ptrISN_EE Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE27_build_iceberg_rowid_columnERKSt10shared_ptrIKNS_9IDataTypeEERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIjSaIjEEiSG_PNS_3COWINS_7IColumnEE11mutable_ptrISN_EE |
302 | | |
303 | | struct IcebergProfile { |
304 | | RuntimeProfile::Counter* num_delete_files; |
305 | | RuntimeProfile::Counter* num_delete_rows; |
306 | | RuntimeProfile::Counter* delete_files_read_time; |
307 | | RuntimeProfile::Counter* delete_rows_sort_time; |
308 | | RuntimeProfile::Counter* parse_delete_file_time; |
309 | | }; |
310 | | |
311 | | bool _need_row_id_column = false; |
312 | | std::string _current_file_path; |
313 | | int32_t _partition_spec_id = 0; |
314 | | std::string _partition_data_json; |
315 | | |
316 | | ShardedKVCache* _kv_cache; |
317 | | IcebergProfile _iceberg_profile; |
318 | | const std::vector<int64_t>* _iceberg_delete_rows = nullptr; |
319 | | std::vector<std::string> _expand_col_names; |
320 | | std::vector<ColumnWithTypeAndName> _expand_columns; |
321 | | std::vector<std::string> _all_required_col_names; |
322 | | Fileformat _file_format = Fileformat::NONE; |
323 | | |
324 | | const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2; |
325 | | const std::string ICEBERG_FILE_PATH = "file_path"; |
326 | | const std::string ICEBERG_ROW_POS = "pos"; |
327 | | const std::vector<std::string> delete_file_col_names {ICEBERG_FILE_PATH, ICEBERG_ROW_POS}; |
328 | | const std::unordered_map<std::string, uint32_t> DELETE_COL_NAME_TO_BLOCK_IDX = { |
329 | | {ICEBERG_FILE_PATH, 0}, {ICEBERG_ROW_POS, 1}}; |
330 | | const int ICEBERG_FILE_PATH_INDEX = 0; |
331 | | const int ICEBERG_FILE_POS_INDEX = 1; |
332 | | const int READ_DELETE_FILE_BATCH_SIZE = 102400; |
333 | | |
334 | | // all ids that need read for eq delete (from all eq delete files) |
335 | | std::set<int> _equality_delete_col_ids; |
336 | | // eq delete column ids -> location of _equality_delete_blocks / _equality_delete_impls |
337 | | std::map<std::vector<int>, int> _equality_delete_block_map; |
338 | | // EqualityDeleteBase stores raw pointers to these blocks, so do not modify this vector after |
339 | | // creating entries in _equality_delete_impls. |
340 | | std::vector<Block> _equality_delete_blocks; |
341 | | std::vector<std::unique_ptr<EqualityDeleteBase>> _equality_delete_impls; |
342 | | |
343 | | // id -> block column name |
344 | | std::unordered_map<int, std::string> _id_to_block_column_name; |
345 | | |
346 | | // File column names used during init |
347 | | std::vector<std::string> _file_col_names; |
348 | | |
349 | | std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()> |
350 | | _create_topn_row_id_column_iterator; |
351 | | |
352 | | static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id"; |
353 | | static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = |
354 | | "_last_updated_sequence_number"; |
355 | | struct RowLineageColumns { |
356 | | int64_t first_row_id = -1; |
357 | | int64_t last_updated_sequence_number = -1; |
358 | | }; |
359 | | RowLineageColumns _row_lineage_columns; |
360 | | }; |
361 | | |
362 | | // ============================================================================ |
363 | | // Template method implementations (must be in header for templates) |
364 | | // ============================================================================ |
365 | | |
366 | | template <typename BaseReader> |
367 | 2 | Status IcebergReaderMixin<BaseReader>::_init_row_filters() { |
368 | | // COUNT(*) short-circuit |
369 | 2 | if (this->_push_down_agg_type == TPushAggOp::type::COUNT && |
370 | 2 | this->get_scan_range().table_format_params.__isset.table_level_row_count && |
371 | 2 | this->get_scan_range().table_format_params.table_level_row_count > 0) { |
372 | 0 | return Status::OK(); |
373 | 0 | } |
374 | | |
375 | 2 | const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params; |
376 | 2 | const auto& version = table_desc.format_version; |
377 | 2 | if (version < MIN_SUPPORT_DELETE_FILES_VERSION) { |
378 | 2 | return Status::OK(); |
379 | 2 | } |
380 | | |
381 | 0 | std::vector<TIcebergDeleteFileDesc> position_delete_files; |
382 | 0 | std::vector<TIcebergDeleteFileDesc> equality_delete_files; |
383 | 0 | std::vector<TIcebergDeleteFileDesc> deletion_vector_files; |
384 | 0 | for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) { |
385 | 0 | if (desc.content == POSITION_DELETE) { |
386 | 0 | position_delete_files.emplace_back(desc); |
387 | 0 | } else if (desc.content == EQUALITY_DELETE) { |
388 | 0 | equality_delete_files.emplace_back(desc); |
389 | 0 | } else if (desc.content == DELETION_VECTOR) { |
390 | 0 | deletion_vector_files.emplace_back(desc); |
391 | 0 | } |
392 | 0 | } |
393 | |
|
394 | 0 | if (!equality_delete_files.empty()) { |
395 | 0 | RETURN_IF_ERROR(_equality_delete_base(equality_delete_files)); |
396 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); |
397 | 0 | } |
398 | | |
399 | 0 | if (!deletion_vector_files.empty()) { |
400 | 0 | if (deletion_vector_files.size() != 1) [[unlikely]] { |
401 | | /* |
402 | | * Deletion vectors are a binary representation of deletes for a single data file that is more efficient |
403 | | * at execution time than position delete files. Unlike equality or position delete files, there can be |
404 | | * at most one deletion vector for a given data file in a snapshot. |
405 | | */ |
406 | 0 | return Status::DataQualityError("This iceberg data file has multiple DVs."); |
407 | 0 | } |
408 | 0 | RETURN_IF_ERROR( |
409 | 0 | read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0])); |
410 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); |
411 | 0 | } else if (!position_delete_files.empty()) { |
412 | 0 | RETURN_IF_ERROR( |
413 | 0 | _position_delete_base(table_desc.original_file_path, position_delete_files)); |
414 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); |
415 | 0 | } |
416 | | |
417 | 0 | COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size()); |
418 | 0 | return Status::OK(); |
419 | 0 | } _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE17_init_row_filtersEv Line | Count | Source | 367 | 1 | Status IcebergReaderMixin<BaseReader>::_init_row_filters() { | 368 | | // COUNT(*) short-circuit | 369 | 1 | if (this->_push_down_agg_type == TPushAggOp::type::COUNT && | 370 | 1 | this->get_scan_range().table_format_params.__isset.table_level_row_count && | 371 | 1 | this->get_scan_range().table_format_params.table_level_row_count > 0) { | 372 | 0 | return Status::OK(); | 373 | 0 | } | 374 | | | 375 | 1 | const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params; | 376 | 1 | const auto& version = table_desc.format_version; | 377 | 1 | if (version < MIN_SUPPORT_DELETE_FILES_VERSION) { | 378 | 1 | return Status::OK(); | 379 | 1 | } | 380 | | | 381 | 0 | std::vector<TIcebergDeleteFileDesc> position_delete_files; | 382 | 0 | std::vector<TIcebergDeleteFileDesc> equality_delete_files; | 383 | 0 | std::vector<TIcebergDeleteFileDesc> deletion_vector_files; | 384 | 0 | for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) { | 385 | 0 | if (desc.content == POSITION_DELETE) { | 386 | 0 | position_delete_files.emplace_back(desc); | 387 | 0 | } else if (desc.content == EQUALITY_DELETE) { | 388 | 0 | equality_delete_files.emplace_back(desc); | 389 | 0 | } else if (desc.content == DELETION_VECTOR) { | 390 | 0 | deletion_vector_files.emplace_back(desc); | 391 | 0 | } | 392 | 0 | } | 393 | |
| 394 | 0 | if (!equality_delete_files.empty()) { | 395 | 0 | RETURN_IF_ERROR(_equality_delete_base(equality_delete_files)); | 396 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); | 397 | 0 | } | 398 | | | 399 | 0 | if (!deletion_vector_files.empty()) { | 400 | 0 | if (deletion_vector_files.size() != 1) [[unlikely]] { | 401 | | /* | 402 | | * Deletion vectors are a binary representation of deletes for a single data file that is more efficient | 403 | | * at execution time than position delete files. Unlike equality or position delete files, there can be | 404 | | * at most one deletion vector for a given data file in a snapshot. | 405 | | */ | 406 | 0 | return Status::DataQualityError("This iceberg data file has multiple DVs."); | 407 | 0 | } | 408 | 0 | RETURN_IF_ERROR( | 409 | 0 | read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0])); | 410 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); | 411 | 0 | } else if (!position_delete_files.empty()) { | 412 | 0 | RETURN_IF_ERROR( | 413 | 0 | _position_delete_base(table_desc.original_file_path, position_delete_files)); | 414 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); | 415 | 0 | } | 416 | | | 417 | 0 | COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size()); | 418 | 0 | return Status::OK(); | 419 | 0 | } |
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE17_init_row_filtersEv Line | Count | Source | 367 | 1 | Status IcebergReaderMixin<BaseReader>::_init_row_filters() { | 368 | | // COUNT(*) short-circuit | 369 | 1 | if (this->_push_down_agg_type == TPushAggOp::type::COUNT && | 370 | 1 | this->get_scan_range().table_format_params.__isset.table_level_row_count && | 371 | 1 | this->get_scan_range().table_format_params.table_level_row_count > 0) { | 372 | 0 | return Status::OK(); | 373 | 0 | } | 374 | | | 375 | 1 | const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params; | 376 | 1 | const auto& version = table_desc.format_version; | 377 | 1 | if (version < MIN_SUPPORT_DELETE_FILES_VERSION) { | 378 | 1 | return Status::OK(); | 379 | 1 | } | 380 | | | 381 | 0 | std::vector<TIcebergDeleteFileDesc> position_delete_files; | 382 | 0 | std::vector<TIcebergDeleteFileDesc> equality_delete_files; | 383 | 0 | std::vector<TIcebergDeleteFileDesc> deletion_vector_files; | 384 | 0 | for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) { | 385 | 0 | if (desc.content == POSITION_DELETE) { | 386 | 0 | position_delete_files.emplace_back(desc); | 387 | 0 | } else if (desc.content == EQUALITY_DELETE) { | 388 | 0 | equality_delete_files.emplace_back(desc); | 389 | 0 | } else if (desc.content == DELETION_VECTOR) { | 390 | 0 | deletion_vector_files.emplace_back(desc); | 391 | 0 | } | 392 | 0 | } | 393 | |
| 394 | 0 | if (!equality_delete_files.empty()) { | 395 | 0 | RETURN_IF_ERROR(_equality_delete_base(equality_delete_files)); | 396 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); | 397 | 0 | } | 398 | | | 399 | 0 | if (!deletion_vector_files.empty()) { | 400 | 0 | if (deletion_vector_files.size() != 1) [[unlikely]] { | 401 | | /* | 402 | | * Deletion vectors are a binary representation of deletes for a single data file that is more efficient | 403 | | * at execution time than position delete files. Unlike equality or position delete files, there can be | 404 | | * at most one deletion vector for a given data file in a snapshot. | 405 | | */ | 406 | 0 | return Status::DataQualityError("This iceberg data file has multiple DVs."); | 407 | 0 | } | 408 | 0 | RETURN_IF_ERROR( | 409 | 0 | read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0])); | 410 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); | 411 | 0 | } else if (!position_delete_files.empty()) { | 412 | 0 | RETURN_IF_ERROR( | 413 | 0 | _position_delete_base(table_desc.original_file_path, position_delete_files)); | 414 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); | 415 | 0 | } | 416 | | | 417 | 0 | COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size()); | 418 | 0 | return Status::OK(); | 419 | 0 | } |
|
420 | | |
421 | | template <typename BaseReader> |
422 | | Status IcebergReaderMixin<BaseReader>::_equality_delete_base( |
423 | 0 | const std::vector<TIcebergDeleteFileDesc>& delete_files) { |
424 | 0 | std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>> |
425 | 0 | partition_columns; |
426 | 0 | std::unordered_map<std::string, VExprContextSPtr> missing_columns; |
427 | |
|
428 | 0 | for (const auto& delete_file : delete_files) { |
429 | 0 | TFileRangeDesc delete_desc; |
430 | 0 | delete_desc.__set_fs_name(this->get_scan_range().fs_name); |
431 | 0 | delete_desc.path = delete_file.path; |
432 | 0 | delete_desc.start_offset = 0; |
433 | 0 | delete_desc.size = -1; |
434 | 0 | delete_desc.file_size = -1; |
435 | |
|
436 | 0 | if (!delete_file.__isset.field_ids) [[unlikely]] { |
437 | 0 | return Status::InternalError( |
438 | 0 | "missing delete field ids when reading equality delete file"); |
439 | 0 | } |
440 | 0 | auto& read_column_field_ids = delete_file.field_ids; |
441 | 0 | std::set<int> read_column_field_ids_set; |
442 | 0 | for (const auto& field_id : read_column_field_ids) { |
443 | 0 | read_column_field_ids_set.insert(field_id); |
444 | 0 | _equality_delete_col_ids.insert(field_id); |
445 | 0 | } |
446 | |
|
447 | 0 | std::unique_ptr<GenericReader> delete_reader = _create_equality_reader(delete_desc); |
448 | 0 | RETURN_IF_ERROR(delete_reader->init_schema_reader()); |
449 | | |
450 | 0 | std::vector<std::string> equality_delete_col_names; |
451 | 0 | std::vector<DataTypePtr> equality_delete_col_types; |
452 | | |
453 | | // Build delete col names/types/ids by matching field_ids from delete file schema. |
454 | | // Master iterates delete file's FieldDescriptor and uses field_id to match, |
455 | | // NOT idx-based pairing (get_parsed_schema order != field_ids order). |
456 | 0 | std::vector<std::string> delete_col_names; |
457 | 0 | std::vector<DataTypePtr> delete_col_types; |
458 | 0 | std::vector<int> delete_col_ids; |
459 | 0 | std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx; |
460 | |
|
461 | 0 | if (auto* parquet_reader = typeid_cast<ParquetReader*>(delete_reader.get())) { |
462 | 0 | const FieldDescriptor* delete_field_desc = nullptr; |
463 | 0 | RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&delete_field_desc)); |
464 | 0 | DCHECK(delete_field_desc != nullptr); |
465 | |
|
466 | 0 | for (const auto& delete_file_field : delete_field_desc->get_fields_schema()) { |
467 | 0 | if (delete_file_field.field_id == -1) [[unlikely]] { |
468 | 0 | return Status::DataQualityError( |
469 | 0 | "missing field id when reading equality delete file"); |
470 | 0 | } |
471 | 0 | if (!read_column_field_ids_set.contains(delete_file_field.field_id)) { |
472 | 0 | continue; |
473 | 0 | } |
474 | 0 | if (delete_file_field.children.size() > 0) [[unlikely]] { |
475 | 0 | return Status::InternalError( |
476 | 0 | "can not support read complex column in equality delete file"); |
477 | 0 | } |
478 | | |
479 | 0 | delete_col_ids.emplace_back(delete_file_field.field_id); |
480 | 0 | delete_col_names.emplace_back(delete_file_field.name); |
481 | 0 | delete_col_types.emplace_back(make_nullable(delete_file_field.data_type)); |
482 | |
|
483 | 0 | int field_id = delete_file_field.field_id; |
484 | 0 | if (!_id_to_block_column_name.contains(field_id)) { |
485 | 0 | _id_to_block_column_name.emplace(field_id, delete_file_field.name); |
486 | 0 | _expand_col_names.emplace_back(delete_file_field.name); |
487 | 0 | _expand_columns.emplace_back( |
488 | 0 | make_nullable(delete_file_field.data_type)->create_column(), |
489 | 0 | make_nullable(delete_file_field.data_type), delete_file_field.name); |
490 | 0 | } |
491 | 0 | } |
492 | 0 | for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) { |
493 | 0 | delete_col_name_to_block_idx[delete_col_names[idx]] = idx; |
494 | 0 | } |
495 | | // Delete files have TFileRangeDesc.size=-1, which would cause |
496 | | // set_fill_columns to return EndOfFile("No row group to read") |
497 | | // when _filter_groups is true. Master passes filter_groups=false. |
498 | 0 | ParquetInitContext eq_delete_ctx; |
499 | 0 | eq_delete_ctx.filter_groups = false; |
500 | 0 | eq_delete_ctx.column_names = delete_col_names; |
501 | 0 | eq_delete_ctx.col_name_to_block_idx = &delete_col_name_to_block_idx; |
502 | 0 | auto st2 = parquet_reader->init_reader(&eq_delete_ctx); |
503 | 0 | if (!st2.ok()) { |
504 | 0 | return st2; |
505 | 0 | } |
506 | 0 | } else if (auto* orc_reader = typeid_cast<OrcReader*>(delete_reader.get())) { |
507 | | // For ORC: use get_parsed_schema with field_ids from delete_file |
508 | | // ORC field_ids come from the Thrift descriptor, not from ORC metadata |
509 | 0 | RETURN_IF_ERROR(delete_reader->get_parsed_schema(&equality_delete_col_names, |
510 | 0 | &equality_delete_col_types)); |
511 | 0 | for (uint32_t idx = 0; idx < equality_delete_col_names.size(); ++idx) { |
512 | 0 | if (idx < read_column_field_ids.size()) { |
513 | 0 | int field_id = read_column_field_ids[idx]; |
514 | 0 | if (!read_column_field_ids_set.contains(field_id)) continue; |
515 | 0 | delete_col_ids.emplace_back(field_id); |
516 | 0 | delete_col_names.emplace_back(equality_delete_col_names[idx]); |
517 | 0 | delete_col_types.emplace_back(make_nullable(equality_delete_col_types[idx])); |
518 | 0 | if (!_id_to_block_column_name.contains(field_id)) { |
519 | 0 | _id_to_block_column_name.emplace(field_id, equality_delete_col_names[idx]); |
520 | 0 | _expand_col_names.emplace_back(equality_delete_col_names[idx]); |
521 | 0 | _expand_columns.emplace_back( |
522 | 0 | make_nullable(equality_delete_col_types[idx])->create_column(), |
523 | 0 | make_nullable(equality_delete_col_types[idx]), |
524 | 0 | equality_delete_col_names[idx]); |
525 | 0 | } |
526 | 0 | } |
527 | 0 | } |
528 | 0 | for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) { |
529 | 0 | delete_col_name_to_block_idx[delete_col_names[idx]] = idx; |
530 | 0 | } |
531 | 0 | OrcInitContext eq_delete_ctx; |
532 | 0 | eq_delete_ctx.column_names = delete_col_names; |
533 | 0 | eq_delete_ctx.col_name_to_block_idx = &delete_col_name_to_block_idx; |
534 | 0 | RETURN_IF_ERROR(orc_reader->init_reader(&eq_delete_ctx)); |
535 | 0 | } else { |
536 | 0 | return Status::InternalError("Unsupported format of delete file"); |
537 | 0 | } |
538 | | |
539 | 0 | if (!_equality_delete_block_map.contains(delete_col_ids)) { |
540 | 0 | _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size()); |
541 | 0 | Block block; |
542 | 0 | _generate_equality_delete_block(&block, delete_col_names, delete_col_types); |
543 | 0 | _equality_delete_blocks.emplace_back(block); |
544 | 0 | } |
545 | 0 | Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]]; |
546 | |
|
547 | 0 | bool eof = false; |
548 | 0 | while (!eof) { |
549 | 0 | Block tmp_block; |
550 | 0 | _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types); |
551 | 0 | size_t read_rows = 0; |
552 | 0 | auto st = delete_reader->get_next_block(&tmp_block, &read_rows, &eof); |
553 | 0 | if (!st.ok()) { |
554 | 0 | return st; |
555 | 0 | } |
556 | 0 | if (read_rows > 0) { |
557 | 0 | ScopedMutableBlock scoped_mutable_block(&eq_file_block); |
558 | 0 | auto& mutable_block = scoped_mutable_block.mutable_block(); |
559 | 0 | RETURN_IF_ERROR(mutable_block.merge(tmp_block)); |
560 | 0 | } |
561 | 0 | } |
562 | 0 | } |
563 | | |
564 | 0 | for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) { |
565 | 0 | auto& eq_file_block = _equality_delete_blocks[block_idx]; |
566 | 0 | auto equality_delete_impl = |
567 | 0 | EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids); |
568 | 0 | RETURN_IF_ERROR(equality_delete_impl->init(this->get_profile())); |
569 | 0 | _equality_delete_impls.emplace_back(std::move(equality_delete_impl)); |
570 | 0 | } |
571 | 0 | return Status::OK(); |
572 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_equality_delete_baseERKSt6vectorINS_22TIcebergDeleteFileDescESaIS4_EE Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_equality_delete_baseERKSt6vectorINS_22TIcebergDeleteFileDescESaIS4_EE |
573 | | |
574 | | template <typename BaseReader> |
575 | | void IcebergReaderMixin<BaseReader>::_generate_equality_delete_block( |
576 | | Block* block, const std::vector<std::string>& equality_delete_col_names, |
577 | 0 | const std::vector<DataTypePtr>& equality_delete_col_types) { |
578 | 0 | for (int i = 0; i < equality_delete_col_names.size(); ++i) { |
579 | 0 | DataTypePtr data_type = make_nullable(equality_delete_col_types[i]); |
580 | 0 | MutableColumnPtr data_column = data_type->create_column(); |
581 | 0 | block->insert(ColumnWithTypeAndName(std::move(data_column), data_type, |
582 | 0 | equality_delete_col_names[i])); |
583 | 0 | } |
584 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE31_generate_equality_delete_blockEPNS_5BlockERKSt6vectorINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaISB_EERKS5_ISt10shared_ptrIKNS_9IDataTypeEESaISJ_EE Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE31_generate_equality_delete_blockEPNS_5BlockERKSt6vectorINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaISB_EERKS5_ISt10shared_ptrIKNS_9IDataTypeEESaISJ_EE |
585 | | |
586 | | template <typename BaseReader> |
587 | 2 | Status IcebergReaderMixin<BaseReader>::_expand_block_if_need(Block* block) { |
588 | 2 | std::set<std::string> names; |
589 | 2 | auto block_names = block->get_names(); |
590 | 2 | names.insert(block_names.begin(), block_names.end()); |
591 | 2 | for (auto& col : _expand_columns) { |
592 | 0 | if (names.contains(col.name)) { |
593 | 0 | return Status::InternalError("Wrong expand column '{}'", col.name); |
594 | 0 | } |
595 | 0 | names.insert(col.name); |
596 | 0 | (*this->col_name_to_block_idx_ref())[col.name] = static_cast<uint32_t>(block->columns()); |
597 | 0 | block->insert({col.type->create_column(), col.type, col.name}); |
598 | 0 | } |
599 | 2 | return Status::OK(); |
600 | 2 | } _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_expand_block_if_needEPNS_5BlockE Line | Count | Source | 587 | 1 | Status IcebergReaderMixin<BaseReader>::_expand_block_if_need(Block* block) { | 588 | 1 | std::set<std::string> names; | 589 | 1 | auto block_names = block->get_names(); | 590 | 1 | names.insert(block_names.begin(), block_names.end()); | 591 | 1 | for (auto& col : _expand_columns) { | 592 | 0 | if (names.contains(col.name)) { | 593 | 0 | return Status::InternalError("Wrong expand column '{}'", col.name); | 594 | 0 | } | 595 | 0 | names.insert(col.name); | 596 | 0 | (*this->col_name_to_block_idx_ref())[col.name] = static_cast<uint32_t>(block->columns()); | 597 | 0 | block->insert({col.type->create_column(), col.type, col.name}); | 598 | 0 | } | 599 | 1 | return Status::OK(); | 600 | 1 | } |
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_expand_block_if_needEPNS_5BlockE Line | Count | Source | 587 | 1 | Status IcebergReaderMixin<BaseReader>::_expand_block_if_need(Block* block) { | 588 | 1 | std::set<std::string> names; | 589 | 1 | auto block_names = block->get_names(); | 590 | 1 | names.insert(block_names.begin(), block_names.end()); | 591 | 1 | for (auto& col : _expand_columns) { | 592 | 0 | if (names.contains(col.name)) { | 593 | 0 | return Status::InternalError("Wrong expand column '{}'", col.name); | 594 | 0 | } | 595 | 0 | names.insert(col.name); | 596 | 0 | (*this->col_name_to_block_idx_ref())[col.name] = static_cast<uint32_t>(block->columns()); | 597 | 0 | block->insert({col.type->create_column(), col.type, col.name}); | 598 | 0 | } | 599 | 1 | return Status::OK(); | 600 | 1 | } |
|
601 | | |
602 | | template <typename BaseReader> |
603 | 2 | Status IcebergReaderMixin<BaseReader>::_shrink_block_if_need(Block* block) { |
604 | 2 | std::set<size_t> positions_to_erase; |
605 | 2 | for (const std::string& expand_col : _expand_col_names) { |
606 | 0 | if (!this->col_name_to_block_idx_ref()->contains(expand_col)) { |
607 | 0 | return Status::InternalError("Wrong erase column '{}', block: {}", expand_col, |
608 | 0 | block->dump_names()); |
609 | 0 | } |
610 | 0 | positions_to_erase.emplace((*this->col_name_to_block_idx_ref())[expand_col]); |
611 | 0 | } |
612 | 2 | block->erase(positions_to_erase); |
613 | 2 | for (const std::string& expand_col : _expand_col_names) { |
614 | 0 | this->col_name_to_block_idx_ref()->erase(expand_col); |
615 | 0 | } |
616 | 2 | return Status::OK(); |
617 | 2 | } _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_shrink_block_if_needEPNS_5BlockE Line | Count | Source | 603 | 1 | Status IcebergReaderMixin<BaseReader>::_shrink_block_if_need(Block* block) { | 604 | 1 | std::set<size_t> positions_to_erase; | 605 | 1 | for (const std::string& expand_col : _expand_col_names) { | 606 | 0 | if (!this->col_name_to_block_idx_ref()->contains(expand_col)) { | 607 | 0 | return Status::InternalError("Wrong erase column '{}', block: {}", expand_col, | 608 | 0 | block->dump_names()); | 609 | 0 | } | 610 | 0 | positions_to_erase.emplace((*this->col_name_to_block_idx_ref())[expand_col]); | 611 | 0 | } | 612 | 1 | block->erase(positions_to_erase); | 613 | 1 | for (const std::string& expand_col : _expand_col_names) { | 614 | 0 | this->col_name_to_block_idx_ref()->erase(expand_col); | 615 | 0 | } | 616 | 1 | return Status::OK(); | 617 | 1 | } |
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_shrink_block_if_needEPNS_5BlockE Line | Count | Source | 603 | 1 | Status IcebergReaderMixin<BaseReader>::_shrink_block_if_need(Block* block) { | 604 | 1 | std::set<size_t> positions_to_erase; | 605 | 1 | for (const std::string& expand_col : _expand_col_names) { | 606 | 0 | if (!this->col_name_to_block_idx_ref()->contains(expand_col)) { | 607 | 0 | return Status::InternalError("Wrong erase column '{}', block: {}", expand_col, | 608 | 0 | block->dump_names()); | 609 | 0 | } | 610 | 0 | positions_to_erase.emplace((*this->col_name_to_block_idx_ref())[expand_col]); | 611 | 0 | } | 612 | 1 | block->erase(positions_to_erase); | 613 | 1 | for (const std::string& expand_col : _expand_col_names) { | 614 | 0 | this->col_name_to_block_idx_ref()->erase(expand_col); | 615 | 0 | } | 616 | 1 | return Status::OK(); | 617 | 1 | } |
|
618 | | |
619 | | template <typename BaseReader> |
620 | | Status IcebergReaderMixin<BaseReader>::_position_delete_base( |
621 | 0 | const std::string data_file_path, const std::vector<TIcebergDeleteFileDesc>& delete_files) { |
622 | 0 | std::vector<DeleteRows*> delete_rows_array; |
623 | 0 | int64_t num_delete_rows = 0; |
624 | 0 | for (const auto& delete_file : delete_files) { |
625 | 0 | SCOPED_TIMER(_iceberg_profile.delete_files_read_time); |
626 | 0 | Status create_status = Status::OK(); |
627 | 0 | auto* delete_file_cache = _kv_cache->template get<DeleteFile>( |
628 | 0 | _delet_file_cache_key(delete_file.path), [&]() -> DeleteFile* { |
629 | 0 | auto* position_delete = new DeleteFile; |
630 | 0 | TFileRangeDesc delete_file_range; |
631 | 0 | delete_file_range.__set_fs_name(this->get_scan_range().fs_name); |
632 | 0 | delete_file_range.path = delete_file.path; |
633 | 0 | delete_file_range.start_offset = 0; |
634 | 0 | delete_file_range.size = -1; |
635 | 0 | delete_file_range.file_size = -1; |
636 | 0 | create_status = _read_position_delete_file(&delete_file_range, position_delete); |
637 | 0 | if (!create_status) { |
638 | 0 | return nullptr; |
639 | 0 | } |
640 | 0 | return position_delete; |
641 | 0 | }); Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE_clB5cxx11Ev Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE_clB5cxx11Ev |
642 | 0 | if (create_status.is<ErrorCode::END_OF_FILE>()) { |
643 | 0 | continue; |
644 | 0 | } else if (!create_status.ok()) { |
645 | 0 | return create_status; |
646 | 0 | } |
647 | | |
648 | 0 | DeleteFile& delete_file_map = *((DeleteFile*)delete_file_cache); |
649 | 0 | auto get_value = [&](const auto& v) { |
650 | 0 | DeleteRows* row_ids = v.second.get(); |
651 | 0 | if (!row_ids->empty()) { |
652 | 0 | delete_rows_array.emplace_back(row_ids); |
653 | 0 | num_delete_rows += row_ids->size(); |
654 | 0 | } |
655 | 0 | }; Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlRKT_E_clISt4pairIKS8_St10unique_ptrIS9_IlSaIlEESt14default_deleteISO_EEEEEDaSH_ Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlRKT_E_clISt4pairIKS8_St10unique_ptrIS9_IlSaIlEESt14default_deleteISO_EEEEEDaSH_ |
656 | 0 | delete_file_map.if_contains(data_file_path, get_value); |
657 | 0 | } |
658 | 0 | if (num_delete_rows > 0) { |
659 | 0 | SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time); |
660 | 0 | _iceberg_delete_rows = |
661 | 0 | _kv_cache->template get<DeleteRows>(data_file_path, [&]() -> DeleteRows* { |
662 | 0 | auto* data_file_position_delete = new DeleteRows; |
663 | 0 | _sort_delete_rows(delete_rows_array, num_delete_rows, |
664 | 0 | *data_file_position_delete); |
665 | 0 | return data_file_position_delete; |
666 | 0 | }); Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE0_clEv Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE0_clEv |
667 | 0 | set_delete_rows(); |
668 | 0 | COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows); |
669 | 0 | } |
670 | 0 | return Status::OK(); |
671 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EE Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EE |
672 | | |
673 | | template <typename BaseReader> |
674 | | typename IcebergReaderMixin<BaseReader>::PositionDeleteRange |
675 | 0 | IcebergReaderMixin<BaseReader>::_get_range(const ColumnDictI32& file_path_column) { |
676 | 0 | PositionDeleteRange range; |
677 | 0 | size_t read_rows = file_path_column.get_data().size(); |
678 | 0 | const int* code_path = file_path_column.get_data().data(); |
679 | 0 | const int* code_path_start = code_path; |
680 | 0 | const int* code_path_end = code_path + read_rows; |
681 | 0 | while (code_path < code_path_end) { |
682 | 0 | int code = code_path[0]; |
683 | 0 | const int* code_end = std::upper_bound(code_path, code_path_end, code); |
684 | 0 | range.data_file_path.emplace_back(file_path_column.get_value(code).to_string()); |
685 | 0 | range.range.emplace_back(code_path - code_path_start, code_end - code_path_start); |
686 | 0 | code_path = code_end; |
687 | 0 | } |
688 | 0 | return range; |
689 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE10_get_rangeERKNS_13ColumnDictI32E Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE10_get_rangeERKNS_13ColumnDictI32E |
690 | | |
691 | | template <typename BaseReader> |
692 | | typename IcebergReaderMixin<BaseReader>::PositionDeleteRange |
693 | 0 | IcebergReaderMixin<BaseReader>::_get_range(const ColumnString& file_path_column) { |
694 | 0 | PositionDeleteRange range; |
695 | 0 | size_t read_rows = file_path_column.size(); |
696 | 0 | size_t index = 0; |
697 | 0 | while (index < read_rows) { |
698 | 0 | StringRef data_path = file_path_column.get_data_at(index); |
699 | 0 | size_t left = index - 1; |
700 | 0 | size_t right = read_rows; |
701 | 0 | while (left + 1 != right) { |
702 | 0 | size_t mid = left + (right - left) / 2; |
703 | 0 | if (file_path_column.get_data_at(mid) > data_path) { |
704 | 0 | right = mid; |
705 | 0 | } else { |
706 | 0 | left = mid; |
707 | 0 | } |
708 | 0 | } |
709 | 0 | range.data_file_path.emplace_back(data_path.to_string()); |
710 | 0 | range.range.emplace_back(index, left + 1); |
711 | 0 | index = left + 1; |
712 | 0 | } |
713 | 0 | return range; |
714 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE10_get_rangeERKNS_9ColumnStrIjEE Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE10_get_rangeERKNS_9ColumnStrIjEE |
715 | | |
716 | | template <typename BaseReader> |
717 | | void IcebergReaderMixin<BaseReader>::_sort_delete_rows( |
718 | | const std::vector<std::vector<int64_t>*>& delete_rows_array, int64_t num_delete_rows, |
719 | 0 | std::vector<int64_t>& result) { |
720 | 0 | if (delete_rows_array.empty()) { |
721 | 0 | return; |
722 | 0 | } |
723 | 0 | if (delete_rows_array.size() == 1) { |
724 | 0 | result.resize(num_delete_rows); |
725 | 0 | memcpy(result.data(), delete_rows_array.front()->data(), sizeof(int64_t) * num_delete_rows); |
726 | 0 | return; |
727 | 0 | } |
728 | 0 | if (delete_rows_array.size() == 2) { |
729 | 0 | result.resize(num_delete_rows); |
730 | 0 | std::merge(delete_rows_array.front()->begin(), delete_rows_array.front()->end(), |
731 | 0 | delete_rows_array.back()->begin(), delete_rows_array.back()->end(), |
732 | 0 | result.begin()); |
733 | 0 | return; |
734 | 0 | } |
735 | | |
736 | 0 | using vec_pair = std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::iterator>; |
737 | 0 | result.resize(num_delete_rows); |
738 | 0 | auto row_id_iter = result.begin(); |
739 | 0 | auto iter_end = result.end(); |
740 | 0 | std::vector<vec_pair> rows_array; |
741 | 0 | for (auto* rows : delete_rows_array) { |
742 | 0 | if (!rows->empty()) { |
743 | 0 | rows_array.emplace_back(rows->begin(), rows->end()); |
744 | 0 | } |
745 | 0 | } |
746 | 0 | size_t array_size = rows_array.size(); |
747 | 0 | while (row_id_iter != iter_end) { |
748 | 0 | int64_t min_index = 0; |
749 | 0 | int64_t min = *rows_array[0].first; |
750 | 0 | for (size_t i = 0; i < array_size; ++i) { |
751 | 0 | if (*rows_array[i].first < min) { |
752 | 0 | min_index = i; |
753 | 0 | min = *rows_array[i].first; |
754 | 0 | } |
755 | 0 | } |
756 | 0 | *row_id_iter++ = min; |
757 | 0 | rows_array[min_index].first++; |
758 | 0 | if (UNLIKELY(rows_array[min_index].first == rows_array[min_index].second)) { |
759 | 0 | rows_array.erase(rows_array.begin() + min_index); |
760 | 0 | array_size--; |
761 | 0 | } |
762 | 0 | } |
763 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE17_sort_delete_rowsERKSt6vectorIPS3_IlSaIlEESaIS6_EElRS5_ Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE17_sort_delete_rowsERKSt6vectorIPS3_IlSaIlEESaIS6_EElRS5_ |
764 | | |
765 | | template <typename BaseReader> |
766 | | void IcebergReaderMixin<BaseReader>::_gen_position_delete_file_range( |
767 | | Block& block, DeleteFile* position_delete, size_t read_rows, |
768 | 0 | bool file_path_column_dictionary_coded) { |
769 | 0 | SCOPED_TIMER(_iceberg_profile.parse_delete_file_time); |
770 | 0 | auto name_to_pos_map = block.get_name_to_pos_map(); |
771 | 0 | ColumnPtr path_column = block.get_by_position(name_to_pos_map[ICEBERG_FILE_PATH]).column; |
772 | 0 | DCHECK_EQ(path_column->size(), read_rows); |
773 | 0 | ColumnPtr pos_column = block.get_by_position(name_to_pos_map[ICEBERG_ROW_POS]).column; |
774 | 0 | using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType; |
775 | 0 | const int64_t* src_data = assert_cast<const ColumnType&>(*pos_column).get_data().data(); |
776 | 0 | PositionDeleteRange range; |
777 | 0 | if (file_path_column_dictionary_coded) { |
778 | 0 | range = _get_range(assert_cast<const ColumnDictI32&>(*path_column)); |
779 | 0 | } else { |
780 | 0 | range = _get_range(assert_cast<const ColumnString&>(*path_column)); |
781 | 0 | } |
782 | 0 | for (int i = 0; i < range.range.size(); ++i) { |
783 | 0 | std::string key = range.data_file_path[i]; |
784 | 0 | auto iter = position_delete->find(key); |
785 | 0 | DeleteRows* delete_rows; |
786 | 0 | if (iter == position_delete->end()) { |
787 | 0 | delete_rows = new DeleteRows; |
788 | 0 | std::unique_ptr<DeleteRows> delete_rows_ptr(delete_rows); |
789 | 0 | (*position_delete)[key] = std::move(delete_rows_ptr); |
790 | 0 | } else { |
791 | 0 | delete_rows = iter->second.get(); |
792 | 0 | } |
793 | 0 | const int64_t* cpy_start = src_data + range.range[i].first; |
794 | 0 | const int64_t cpy_count = range.range[i].second - range.range[i].first; |
795 | 0 | int64_t origin_size = delete_rows->size(); |
796 | 0 | delete_rows->resize(origin_size + cpy_count); |
797 | 0 | int64_t* dest_position = &(*delete_rows)[origin_size]; |
798 | 0 | memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t)); |
799 | 0 | } |
800 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE31_gen_position_delete_file_rangeERNS_5BlockEPN5phmap22parallel_flat_hash_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESt10unique_ptrISt6vectorIlSaIlEESt14default_deleteISG_EESt4hashISC_ESt8equal_toIvESaISt4pairIKSC_SJ_EELm8ESt5mutexEEmb Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE31_gen_position_delete_file_rangeERNS_5BlockEPN5phmap22parallel_flat_hash_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESt10unique_ptrISt6vectorIlSaIlEESt14default_deleteISG_EESt4hashISC_ESt8equal_toIvESaISt4pairIKSC_SJ_EELm8ESt5mutexEEmb |
801 | | |
802 | | template <typename BaseReader> |
803 | | Status IcebergReaderMixin<BaseReader>::read_deletion_vector( |
804 | 0 | const std::string& data_file_path, const TIcebergDeleteFileDesc& delete_file_desc) { |
805 | 0 | Status create_status = Status::OK(); |
806 | 0 | SCOPED_TIMER(_iceberg_profile.delete_files_read_time); |
807 | 0 | _iceberg_delete_rows = _kv_cache->template get< |
808 | 0 | DeleteRows>(data_file_path, [&]() -> DeleteRows* { |
809 | 0 | auto* delete_rows = new DeleteRows; |
810 | |
|
811 | 0 | TFileRangeDesc delete_range; |
812 | 0 | delete_range.__set_fs_name(this->get_scan_range().fs_name); |
813 | 0 | delete_range.path = delete_file_desc.path; |
814 | 0 | delete_range.start_offset = delete_file_desc.content_offset; |
815 | 0 | delete_range.size = delete_file_desc.content_size_in_bytes; |
816 | 0 | delete_range.file_size = -1; |
817 | |
|
818 | 0 | DeletionVectorReader dv_reader(this->get_state(), this->get_profile(), |
819 | 0 | this->get_scan_params(), delete_range, this->get_io_ctx()); |
820 | 0 | create_status = dv_reader.open(); |
821 | 0 | if (!create_status.ok()) [[unlikely]] { |
822 | 0 | return nullptr; |
823 | 0 | } |
824 | | |
825 | 0 | size_t buffer_size = delete_range.size; |
826 | 0 | std::vector<char> buf(buffer_size); |
827 | 0 | if (buffer_size < 12) [[unlikely]] { |
828 | 0 | create_status = Status::DataQualityError("Deletion vector file size too small: {}", |
829 | 0 | buffer_size); |
830 | 0 | return nullptr; |
831 | 0 | } |
832 | | |
833 | 0 | create_status = dv_reader.read_at(delete_range.start_offset, {buf.data(), buffer_size}); |
834 | 0 | if (!create_status) [[unlikely]] { |
835 | 0 | return nullptr; |
836 | 0 | } |
837 | | |
838 | 0 | auto total_length = BigEndian::Load32(buf.data()); |
839 | 0 | if (total_length + 8 != buffer_size) [[unlikely]] { |
840 | 0 | create_status = Status::DataQualityError( |
841 | 0 | "Deletion vector length mismatch, expected: {}, actual: {}", total_length + 8, |
842 | 0 | buffer_size); |
843 | 0 | return nullptr; |
844 | 0 | } |
845 | | |
846 | 0 | constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'}; |
847 | 0 | if (memcmp(buf.data() + sizeof(total_length), MAGIC_NUMBER, 4)) [[unlikely]] { |
848 | 0 | create_status = Status::DataQualityError("Deletion vector magic number mismatch"); |
849 | 0 | return nullptr; |
850 | 0 | } |
851 | | |
852 | 0 | roaring::Roaring64Map bitmap; |
853 | 0 | SCOPED_TIMER(_iceberg_profile.parse_delete_file_time); |
854 | 0 | try { |
855 | 0 | bitmap = roaring::Roaring64Map::readSafe(buf.data() + 8, buffer_size - 12); |
856 | 0 | } catch (const std::runtime_error& e) { |
857 | 0 | create_status = Status::DataQualityError("Decode roaring bitmap failed, {}", e.what()); |
858 | 0 | return nullptr; |
859 | 0 | } |
860 | | |
861 | 0 | delete_rows->reserve(bitmap.cardinality()); |
862 | 0 | for (auto it = bitmap.begin(); it != bitmap.end(); it++) { |
863 | 0 | delete_rows->push_back(*it); |
864 | 0 | } |
865 | 0 | COUNTER_UPDATE(_iceberg_profile.num_delete_rows, delete_rows->size()); |
866 | 0 | return delete_rows; |
867 | 0 | }); Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescEENKUlvE_clEv Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescEENKUlvE_clEv |
868 | |
|
869 | 0 | RETURN_IF_ERROR(create_status); |
870 | 0 | if (!_iceberg_delete_rows->empty()) [[likely]] { |
871 | 0 | set_delete_rows(); |
872 | 0 | } |
873 | 0 | return Status::OK(); |
874 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescE Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescE |
875 | | |
876 | | } // namespace doris |