be/src/format/table/iceberg_reader_mixin.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <cstddef> |
21 | | #include <cstdint> |
22 | | #include <string> |
23 | | #include <unordered_map> |
24 | | #include <vector> |
25 | | |
26 | | #include "common/consts.h" |
27 | | #include "common/status.h" |
28 | | #include "core/block/block.h" |
29 | | #include "core/column/column_dictionary.h" |
30 | | #include "core/column/column_nullable.h" |
31 | | #include "core/column/column_string.h" |
32 | | #include "core/column/column_struct.h" |
33 | | #include "core/data_type/data_type_number.h" |
34 | | #include "core/data_type/data_type_string.h" |
35 | | #include "format/generic_reader.h" |
36 | | #include "format/table/deletion_vector_reader.h" |
37 | | #include "format/table/equality_delete.h" |
38 | | #include "format/table/table_schema_change_helper.h" |
39 | | #include "runtime/runtime_profile.h" |
40 | | #include "runtime/runtime_state.h" |
41 | | #include "storage/olap_common.h" |
42 | | |
43 | | namespace doris { |
44 | | class TIcebergDeleteFileDesc; |
45 | | } // namespace doris |
46 | | |
47 | | namespace doris { |
48 | | |
49 | | class ShardedKVCache; |
50 | | |
51 | | // CRTP mixin for Iceberg reader functionality. |
52 | | // BaseReader should be ParquetReader or OrcReader. |
53 | | // Inherits BaseReader + TableSchemaChangeHelper, providing shared Iceberg logic |
54 | | // (delete files, deletion vectors, equality delete, $row_id synthesis). |
55 | | // |
56 | | // Inheritance chain: |
57 | | // IcebergParquetReader -> IcebergReaderMixin<ParquetReader> -> ParquetReader -> GenericReader |
58 | | // IcebergOrcReader -> IcebergReaderMixin<OrcReader> -> OrcReader -> GenericReader |
59 | | template <typename BaseReader> |
60 | | class IcebergReaderMixin : public BaseReader, public TableSchemaChangeHelper { |
61 | | public: |
// Result type of the _get_range() overloads: a list of data file paths plus a
// list of integer pairs.
// NOTE(review): presumably range[i] is the row interval of the position-delete
// batch that belongs to data_file_path[i] -- confirm against _get_range().
struct PositionDeleteRange {
    std::vector<std::string> data_file_path;
    std::vector<std::pair<int, int>> range;
};
66 | | |
67 | | // Forward BaseReader constructor arguments + Iceberg-specific kv_cache |
68 | | template <typename... Args> |
69 | | IcebergReaderMixin(ShardedKVCache* kv_cache, Args&&... args) |
70 | 14 | : BaseReader(std::forward<Args>(args)...), _kv_cache(kv_cache) { |
71 | 14 | static const char* iceberg_profile = "IcebergProfile"; |
72 | 14 | ADD_TIMER(this->get_profile(), iceberg_profile); |
73 | 14 | _iceberg_profile.num_delete_files = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteFiles", |
74 | 14 | TUnit::UNIT, iceberg_profile); |
75 | 14 | _iceberg_profile.num_delete_rows = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteRows", |
76 | 14 | TUnit::UNIT, iceberg_profile); |
77 | 14 | _iceberg_profile.delete_files_read_time = |
78 | 14 | ADD_CHILD_TIMER(this->get_profile(), "DeleteFileReadTime", iceberg_profile); |
79 | 14 | _iceberg_profile.delete_rows_sort_time = |
80 | 14 | ADD_CHILD_TIMER(this->get_profile(), "DeleteRowsSortTime", iceberg_profile); |
81 | 14 | _iceberg_profile.parse_delete_file_time = |
82 | 14 | ADD_CHILD_TIMER(this->get_profile(), "ParseDeleteFileTime", iceberg_profile); |
83 | 14 | } _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEEC2IJRPNS_14RuntimeProfileERKNS_20TFileScanRangeParamsERKNS_14TFileRangeDescERmRPKN4cctz9time_zoneERPNS_2io9IOContextERPNS_12RuntimeStateERPNS_13FileMetaCacheEEEEPNS_14ShardedKVCacheEDpOT_ Line | Count | Source | 70 | 7 | : BaseReader(std::forward<Args>(args)...), _kv_cache(kv_cache) { | 71 | 7 | static const char* iceberg_profile = "IcebergProfile"; | 72 | 7 | ADD_TIMER(this->get_profile(), iceberg_profile); | 73 | 7 | _iceberg_profile.num_delete_files = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteFiles", | 74 | 7 | TUnit::UNIT, iceberg_profile); | 75 | 7 | _iceberg_profile.num_delete_rows = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteRows", | 76 | 7 | TUnit::UNIT, iceberg_profile); | 77 | 7 | _iceberg_profile.delete_files_read_time = | 78 | 7 | ADD_CHILD_TIMER(this->get_profile(), "DeleteFileReadTime", iceberg_profile); | 79 | 7 | _iceberg_profile.delete_rows_sort_time = | 80 | 7 | ADD_CHILD_TIMER(this->get_profile(), "DeleteRowsSortTime", iceberg_profile); | 81 | 7 | _iceberg_profile.parse_delete_file_time = | 82 | 7 | ADD_CHILD_TIMER(this->get_profile(), "ParseDeleteFileTime", iceberg_profile); | 83 | 7 | } |
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEEC2IJRPNS_14RuntimeProfileERPNS_12RuntimeStateERKNS_20TFileScanRangeParamsERKNS_14TFileRangeDescERmRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERPNS_2io9IOContextERPNS_13FileMetaCacheEEEEPNS_14ShardedKVCacheEDpOT_ Line | Count | Source | 70 | 7 | : BaseReader(std::forward<Args>(args)...), _kv_cache(kv_cache) { | 71 | 7 | static const char* iceberg_profile = "IcebergProfile"; | 72 | 7 | ADD_TIMER(this->get_profile(), iceberg_profile); | 73 | 7 | _iceberg_profile.num_delete_files = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteFiles", | 74 | 7 | TUnit::UNIT, iceberg_profile); | 75 | 7 | _iceberg_profile.num_delete_rows = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteRows", | 76 | 7 | TUnit::UNIT, iceberg_profile); | 77 | 7 | _iceberg_profile.delete_files_read_time = | 78 | 7 | ADD_CHILD_TIMER(this->get_profile(), "DeleteFileReadTime", iceberg_profile); | 79 | 7 | _iceberg_profile.delete_rows_sort_time = | 80 | 7 | ADD_CHILD_TIMER(this->get_profile(), "DeleteRowsSortTime", iceberg_profile); | 81 | 7 | _iceberg_profile.parse_delete_file_time = | 82 | 7 | ADD_CHILD_TIMER(this->get_profile(), "ParseDeleteFileTime", iceberg_profile); | 83 | 7 | } |
|
84 | | |
85 | 14 | ~IcebergReaderMixin() override = default; _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEED2Ev Line | Count | Source | 85 | 7 | ~IcebergReaderMixin() override = default; |
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEED2Ev Line | Count | Source | 85 | 7 | ~IcebergReaderMixin() override = default; |
|
86 | | |
87 | | void set_current_file_info(const std::string& file_path, int32_t partition_spec_id, |
88 | 0 | const std::string& partition_data_json) { |
89 | 0 | _current_file_path = file_path; |
90 | 0 | _partition_spec_id = partition_spec_id; |
91 | 0 | _partition_data_json = partition_data_json; |
92 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21set_current_file_infoERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSA_ Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21set_current_file_infoERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSA_ |
93 | | |
// Content kinds compared against TIcebergDeleteFileDesc::content in
// _init_row_filters(). Values are spelled out so the numbering is explicit.
enum {
    DATA = 0,
    POSITION_DELETE = 1,
    EQUALITY_DELETE = 2,
    DELETION_VECTOR = 3,
};

// File format tag stored in _file_format (defaults to NONE).
enum Fileformat {
    NONE = 0,
    PARQUET = 1,
    ORC = 2,
    AVRO = 3,
};
96 | | |
97 | | virtual void set_delete_rows() = 0; |
98 | | |
99 | | // Table-level COUNT(*) is handled by CountReader (created by FileScanner after |
100 | | // init_reader). If _do_get_next_block is called, COUNT must have been resolved. |
101 | 2 | Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override { |
102 | 2 | DCHECK(this->_push_down_agg_type != TPushAggOp::type::COUNT); |
103 | 2 | return BaseReader::_do_get_next_block(block, read_rows, eof); |
104 | 2 | } _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE18_do_get_next_blockEPNS_5BlockEPmPb Line | Count | Source | 101 | 1 | Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override { | 102 | | DCHECK(this->_push_down_agg_type != TPushAggOp::type::COUNT); | 103 | 1 | return BaseReader::_do_get_next_block(block, read_rows, eof); | 104 | 1 | } |
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE18_do_get_next_blockEPNS_5BlockEPmPb Line | Count | Source | 101 | 1 | Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override { | 102 | | DCHECK(this->_push_down_agg_type != TPushAggOp::type::COUNT); | 103 | 1 | return BaseReader::_do_get_next_block(block, read_rows, eof); | 104 | 1 | } |
|
105 | | |
106 | | void set_create_row_id_column_iterator_func( |
107 | 0 | std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()> create_func) { |
108 | 0 | _create_topn_row_id_column_iterator = create_func; |
109 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE38set_create_row_id_column_iterator_funcESt8functionIFSt10shared_ptrINS_10segment_v221RowIdColumnIteratorV2EEvEE Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE38set_create_row_id_column_iterator_funcESt8functionIFSt10shared_ptrINS_10segment_v221RowIdColumnIteratorV2EEvEE |
110 | | |
111 | | protected: |
112 | | // ---- Hook implementations ---- |
113 | | |
114 | | // Called before reading a block: expand block for equality delete columns + detect row_id |
115 | 2 | Status on_before_read_block(Block* block) override { |
116 | 2 | RETURN_IF_ERROR(_expand_block_if_need(block)); |
117 | 2 | return Status::OK(); |
118 | 2 | } _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20on_before_read_blockEPNS_5BlockE Line | Count | Source | 115 | 1 | Status on_before_read_block(Block* block) override { | 116 | 1 | RETURN_IF_ERROR(_expand_block_if_need(block)); | 117 | 1 | return Status::OK(); | 118 | 1 | } |
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE20on_before_read_blockEPNS_5BlockE Line | Count | Source | 115 | 1 | Status on_before_read_block(Block* block) override { | 116 | 1 | RETURN_IF_ERROR(_expand_block_if_need(block)); | 117 | 1 | return Status::OK(); | 118 | 1 | } |
|
119 | | |
120 | | /// Fill Iceberg $row_id synthesized column. Registered as handler during init. |
121 | 0 | Status _fill_iceberg_row_id(Block* block, size_t rows) { |
122 | 0 | int row_id_pos = block->get_position_by_name(BeConsts::ICEBERG_ROWID_COL); |
123 | 0 | DORIS_CHECK(row_id_pos >= 0); |
124 | | |
125 | | // Lazy-init file info: only set when $row_id is actually needed. |
126 | 0 | const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params; |
127 | 0 | std::string file_path = table_desc.original_file_path; |
128 | 0 | int32_t partition_spec_id = |
129 | 0 | table_desc.__isset.partition_spec_id ? table_desc.partition_spec_id : 0; |
130 | 0 | std::string partition_data_json; |
131 | 0 | if (table_desc.__isset.partition_data_json) { |
132 | 0 | partition_data_json = table_desc.partition_data_json; |
133 | 0 | } |
134 | 0 | set_current_file_info(file_path, partition_spec_id, partition_data_json); |
135 | |
|
136 | 0 | const auto& row_ids = this->current_batch_row_positions(); |
137 | 0 | auto& col_with_type = block->get_by_position(static_cast<size_t>(row_id_pos)); |
138 | 0 | MutableColumnPtr row_id_column; |
139 | 0 | RETURN_IF_ERROR(_build_iceberg_rowid_column(col_with_type.type, _current_file_path, row_ids, |
140 | 0 | _partition_spec_id, _partition_data_json, |
141 | 0 | &row_id_column)); |
142 | 0 | col_with_type.column = std::move(row_id_column); |
143 | 0 | return Status::OK(); |
144 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20_fill_iceberg_row_idEPNS_5BlockEm Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE20_fill_iceberg_row_idEPNS_5BlockEm |
145 | | |
146 | 0 | void _init_row_lineage_columns() { |
147 | 0 | const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params; |
148 | 0 | if (table_desc.__isset.first_row_id) { |
149 | 0 | _row_lineage_columns.first_row_id = table_desc.first_row_id; |
150 | 0 | } |
151 | 0 | if (table_desc.__isset.last_updated_sequence_number) { |
152 | 0 | _row_lineage_columns.last_updated_sequence_number = |
153 | 0 | table_desc.last_updated_sequence_number; |
154 | 0 | } |
155 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE25_init_row_lineage_columnsEv Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE25_init_row_lineage_columnsEv |
156 | | |
157 | 0 | Status _fill_row_lineage_row_id(Block* block, size_t rows) { |
158 | 0 | int col_pos = block->get_position_by_name(ROW_LINEAGE_ROW_ID); |
159 | 0 | DORIS_CHECK(col_pos >= 0); |
160 | | |
161 | 0 | if (_row_lineage_columns.first_row_id >= 0) { |
162 | 0 | auto col = block->get_by_position(col_pos).column->assume_mutable(); |
163 | 0 | auto* nullable_column = assert_cast<ColumnNullable*>(col.get()); |
164 | 0 | auto& null_map = nullable_column->get_null_map_data(); |
165 | 0 | auto& data = |
166 | 0 | assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data(); |
167 | 0 | const auto& row_ids = this->current_batch_row_positions(); |
168 | 0 | for (size_t i = 0; i < rows; ++i) { |
169 | 0 | if (null_map[i] != 0) { |
170 | 0 | null_map[i] = 0; |
171 | 0 | data[i] = _row_lineage_columns.first_row_id + static_cast<int64_t>(row_ids[i]); |
172 | 0 | } |
173 | 0 | } |
174 | 0 | } |
175 | 0 | return Status::OK(); |
176 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE24_fill_row_lineage_row_idEPNS_5BlockEm Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE24_fill_row_lineage_row_idEPNS_5BlockEm |
177 | | |
178 | 0 | Status _fill_row_lineage_last_updated_sequence_number(Block* block, size_t rows) { |
179 | 0 | int col_pos = block->get_position_by_name(ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER); |
180 | 0 | DORIS_CHECK(col_pos >= 0); |
181 | | |
182 | 0 | if (_row_lineage_columns.last_updated_sequence_number >= 0) { |
183 | 0 | auto col = block->get_by_position(col_pos).column->assume_mutable(); |
184 | 0 | auto* nullable_column = assert_cast<ColumnNullable*>(col.get()); |
185 | 0 | auto& null_map = nullable_column->get_null_map_data(); |
186 | 0 | auto& data = |
187 | 0 | assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data(); |
188 | 0 | for (size_t i = 0; i < rows; ++i) { |
189 | 0 | if (null_map[i] != 0) { |
190 | 0 | null_map[i] = 0; |
191 | 0 | data[i] = _row_lineage_columns.last_updated_sequence_number; |
192 | 0 | } |
193 | 0 | } |
194 | 0 | } |
195 | 0 | return Status::OK(); |
196 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE46_fill_row_lineage_last_updated_sequence_numberEPNS_5BlockEm Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE46_fill_row_lineage_last_updated_sequence_numberEPNS_5BlockEm |
197 | | |
198 | | // Called after reading a block: apply equality delete filter + shrink block |
199 | 2 | Status on_after_read_block(Block* block, size_t* read_rows) override { |
200 | 2 | if (!_equality_delete_impls.empty()) { |
201 | 0 | std::unique_ptr<IColumn::Filter> filter = |
202 | 0 | std::make_unique<IColumn::Filter>(block->rows(), 1); |
203 | 0 | for (auto& equality_delete_impl : _equality_delete_impls) { |
204 | 0 | RETURN_IF_ERROR(equality_delete_impl->filter_data_block( |
205 | 0 | block, this->col_name_to_block_idx_ref(), _id_to_block_column_name, |
206 | 0 | *filter)); |
207 | 0 | } |
208 | 0 | Block::filter_block_internal(block, *filter, block->columns()); |
209 | 0 | *read_rows = block->rows(); |
210 | 0 | } |
211 | 2 | return _shrink_block_if_need(block); |
212 | 2 | } _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE19on_after_read_blockEPNS_5BlockEPm Line | Count | Source | 199 | 1 | Status on_after_read_block(Block* block, size_t* read_rows) override { | 200 | 1 | if (!_equality_delete_impls.empty()) { | 201 | 0 | std::unique_ptr<IColumn::Filter> filter = | 202 | 0 | std::make_unique<IColumn::Filter>(block->rows(), 1); | 203 | 0 | for (auto& equality_delete_impl : _equality_delete_impls) { | 204 | 0 | RETURN_IF_ERROR(equality_delete_impl->filter_data_block( | 205 | 0 | block, this->col_name_to_block_idx_ref(), _id_to_block_column_name, | 206 | 0 | *filter)); | 207 | 0 | } | 208 | 0 | Block::filter_block_internal(block, *filter, block->columns()); | 209 | 0 | *read_rows = block->rows(); | 210 | 0 | } | 211 | 1 | return _shrink_block_if_need(block); | 212 | 1 | } |
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE19on_after_read_blockEPNS_5BlockEPm Line | Count | Source | 199 | 1 | Status on_after_read_block(Block* block, size_t* read_rows) override { | 200 | 1 | if (!_equality_delete_impls.empty()) { | 201 | 0 | std::unique_ptr<IColumn::Filter> filter = | 202 | 0 | std::make_unique<IColumn::Filter>(block->rows(), 1); | 203 | 0 | for (auto& equality_delete_impl : _equality_delete_impls) { | 204 | 0 | RETURN_IF_ERROR(equality_delete_impl->filter_data_block( | 205 | 0 | block, this->col_name_to_block_idx_ref(), _id_to_block_column_name, | 206 | 0 | *filter)); | 207 | 0 | } | 208 | 0 | Block::filter_block_internal(block, *filter, block->columns()); | 209 | 0 | *read_rows = block->rows(); | 210 | 0 | } | 211 | 1 | return _shrink_block_if_need(block); | 212 | 1 | } |
|
213 | | |
214 | | // ---- Shared Iceberg methods ---- |
215 | | |
216 | | Status _init_row_filters(); |
217 | | Status _position_delete_base(const std::string data_file_path, |
218 | | const std::vector<TIcebergDeleteFileDesc>& delete_files); |
219 | | Status _equality_delete_base(const std::vector<TIcebergDeleteFileDesc>& delete_files); |
220 | | Status read_deletion_vector(const std::string& data_file_path, |
221 | | const TIcebergDeleteFileDesc& delete_file_desc); |
222 | | |
223 | | Status _expand_block_if_need(Block* block); |
224 | | Status _shrink_block_if_need(Block* block); |
225 | | |
226 | | // Type aliases — must be defined before member function declarations that use them. |
227 | | using DeleteRows = std::vector<int64_t>; |
228 | | using DeleteFile = phmap::parallel_flat_hash_map< |
229 | | std::string, std::unique_ptr<DeleteRows>, std::hash<std::string>, std::equal_to<>, |
230 | | std::allocator<std::pair<const std::string, std::unique_ptr<DeleteRows>>>, 8, |
231 | | std::mutex>; |
232 | | |
233 | | PositionDeleteRange _get_range(const ColumnDictI32& file_path_column); |
234 | | PositionDeleteRange _get_range(const ColumnString& file_path_column); |
235 | | static void _sort_delete_rows(const std::vector<std::vector<int64_t>*>& delete_rows_array, |
236 | | int64_t num_delete_rows, std::vector<int64_t>& result); |
237 | | void _gen_position_delete_file_range(Block& block, DeleteFile* position_delete, |
238 | | size_t read_rows, bool file_path_column_dictionary_coded); |
239 | | void _generate_equality_delete_block(Block* block, |
240 | | const std::vector<std::string>& equality_delete_col_names, |
241 | | const std::vector<DataTypePtr>& equality_delete_col_types); |
242 | | |
243 | | // Pure virtual: format-specific delete file reading |
244 | | virtual Status _read_position_delete_file(const TFileRangeDesc*, DeleteFile*) = 0; |
245 | | virtual std::unique_ptr<GenericReader> _create_equality_reader( |
246 | | const TFileRangeDesc& delete_desc) = 0; |
247 | | |
// Builds the cache key for a delete file: the literal prefix "delete_"
// followed by the given path.
static std::string _delet_file_cache_key(const std::string& path) {
    std::string key("delete_");
    key += path;
    return key;
}
249 | | |
250 | | /// Build the Iceberg V2 row-id struct column. |
251 | | static Status _build_iceberg_rowid_column(const DataTypePtr& type, const std::string& file_path, |
252 | | const std::vector<rowid_t>& row_ids, |
253 | | int32_t partition_spec_id, |
254 | | const std::string& partition_data_json, |
255 | 0 | MutableColumnPtr* column_out) { |
256 | 0 | if (type == nullptr || column_out == nullptr) { |
257 | 0 | return Status::InvalidArgument("Invalid iceberg rowid column type or output column"); |
258 | 0 | } |
259 | 0 | MutableColumnPtr column = type->create_column(); |
260 | 0 | ColumnNullable* nullable_col = check_and_get_column<ColumnNullable>(column.get()); |
261 | 0 | ColumnStruct* struct_col = nullptr; |
262 | 0 | if (nullable_col != nullptr) { |
263 | 0 | struct_col = |
264 | 0 | check_and_get_column<ColumnStruct>(nullable_col->get_nested_column_ptr().get()); |
265 | 0 | } else { |
266 | 0 | struct_col = check_and_get_column<ColumnStruct>(column.get()); |
267 | 0 | } |
268 | 0 | if (struct_col == nullptr || struct_col->tuple_size() < 4) { |
269 | 0 | return Status::InternalError("Invalid iceberg rowid column structure"); |
270 | 0 | } |
271 | 0 | size_t num_rows = row_ids.size(); |
272 | 0 | auto& file_path_col = struct_col->get_column(0); |
273 | 0 | auto& row_pos_col = struct_col->get_column(1); |
274 | 0 | auto& spec_id_col = struct_col->get_column(2); |
275 | 0 | auto& partition_data_col = struct_col->get_column(3); |
276 | 0 | file_path_col.reserve(num_rows); |
277 | 0 | row_pos_col.reserve(num_rows); |
278 | 0 | spec_id_col.reserve(num_rows); |
279 | 0 | partition_data_col.reserve(num_rows); |
280 | 0 | for (size_t i = 0; i < num_rows; ++i) { |
281 | 0 | file_path_col.insert_data(file_path.data(), file_path.size()); |
282 | 0 | } |
283 | 0 | for (size_t i = 0; i < num_rows; ++i) { |
284 | 0 | int64_t row_pos = static_cast<int64_t>(row_ids[i]); |
285 | 0 | row_pos_col.insert_data(reinterpret_cast<const char*>(&row_pos), sizeof(row_pos)); |
286 | 0 | } |
287 | 0 | for (size_t i = 0; i < num_rows; ++i) { |
288 | 0 | int32_t spec_id = partition_spec_id; |
289 | 0 | spec_id_col.insert_data(reinterpret_cast<const char*>(&spec_id), sizeof(spec_id)); |
290 | 0 | } |
291 | 0 | for (size_t i = 0; i < num_rows; ++i) { |
292 | 0 | partition_data_col.insert_data(partition_data_json.data(), partition_data_json.size()); |
293 | 0 | } |
294 | 0 | if (nullable_col != nullptr) { |
295 | 0 | nullable_col->get_null_map_data().resize_fill(num_rows, 0); |
296 | 0 | } |
297 | 0 | *column_out = std::move(column); |
298 | 0 | return Status::OK(); |
299 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE27_build_iceberg_rowid_columnERKSt10shared_ptrIKNS_9IDataTypeEERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIjSaIjEEiSG_PNS_3COWINS_7IColumnEE11mutable_ptrISN_EE Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE27_build_iceberg_rowid_columnERKSt10shared_ptrIKNS_9IDataTypeEERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIjSaIjEEiSG_PNS_3COWINS_7IColumnEE11mutable_ptrISN_EE |
300 | | |
301 | | struct IcebergProfile { |
302 | | RuntimeProfile::Counter* num_delete_files; |
303 | | RuntimeProfile::Counter* num_delete_rows; |
304 | | RuntimeProfile::Counter* delete_files_read_time; |
305 | | RuntimeProfile::Counter* delete_rows_sort_time; |
306 | | RuntimeProfile::Counter* parse_delete_file_time; |
307 | | }; |
308 | | |
309 | | bool _need_row_id_column = false; |
310 | | std::string _current_file_path; |
311 | | int32_t _partition_spec_id = 0; |
312 | | std::string _partition_data_json; |
313 | | |
314 | | ShardedKVCache* _kv_cache; |
315 | | IcebergProfile _iceberg_profile; |
316 | | const std::vector<int64_t>* _iceberg_delete_rows = nullptr; |
317 | | std::vector<std::string> _expand_col_names; |
318 | | std::vector<ColumnWithTypeAndName> _expand_columns; |
319 | | std::vector<std::string> _all_required_col_names; |
320 | | Fileformat _file_format = Fileformat::NONE; |
321 | | |
322 | | const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2; |
323 | | const std::string ICEBERG_FILE_PATH = "file_path"; |
324 | | const std::string ICEBERG_ROW_POS = "pos"; |
325 | | const std::vector<std::string> delete_file_col_names {ICEBERG_FILE_PATH, ICEBERG_ROW_POS}; |
326 | | const std::unordered_map<std::string, uint32_t> DELETE_COL_NAME_TO_BLOCK_IDX = { |
327 | | {ICEBERG_FILE_PATH, 0}, {ICEBERG_ROW_POS, 1}}; |
328 | | const int ICEBERG_FILE_PATH_INDEX = 0; |
329 | | const int ICEBERG_FILE_POS_INDEX = 1; |
330 | | const int READ_DELETE_FILE_BATCH_SIZE = 102400; |
331 | | |
332 | | // all ids that need read for eq delete (from all eq delete files) |
333 | | std::set<int> _equality_delete_col_ids; |
334 | | // eq delete column ids -> location of _equality_delete_blocks / _equality_delete_impls |
335 | | std::map<std::vector<int>, int> _equality_delete_block_map; |
336 | | // EqualityDeleteBase stores raw pointers to these blocks, so do not modify this vector after |
337 | | // creating entries in _equality_delete_impls. |
338 | | std::vector<Block> _equality_delete_blocks; |
339 | | std::vector<std::unique_ptr<EqualityDeleteBase>> _equality_delete_impls; |
340 | | |
341 | | // id -> block column name |
342 | | std::unordered_map<int, std::string> _id_to_block_column_name; |
343 | | |
344 | | // File column names used during init |
345 | | std::vector<std::string> _file_col_names; |
346 | | |
347 | | std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()> |
348 | | _create_topn_row_id_column_iterator; |
349 | | |
350 | | static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id"; |
351 | | static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = |
352 | | "_last_updated_sequence_number"; |
353 | | struct RowLineageColumns { |
354 | | int64_t first_row_id = -1; |
355 | | int64_t last_updated_sequence_number = -1; |
356 | | }; |
357 | | RowLineageColumns _row_lineage_columns; |
358 | | }; |
359 | | |
360 | | // ============================================================================ |
361 | | // Template method implementations (must be in header for templates) |
362 | | // ============================================================================ |
363 | | |
364 | | template <typename BaseReader> |
365 | 2 | Status IcebergReaderMixin<BaseReader>::_init_row_filters() { |
366 | | // COUNT(*) short-circuit |
367 | 2 | if (this->_push_down_agg_type == TPushAggOp::type::COUNT && |
368 | 2 | this->get_scan_range().table_format_params.__isset.table_level_row_count && |
369 | 2 | this->get_scan_range().table_format_params.table_level_row_count > 0) { |
370 | 0 | return Status::OK(); |
371 | 0 | } |
372 | | |
373 | 2 | const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params; |
374 | 2 | const auto& version = table_desc.format_version; |
375 | 2 | if (version < MIN_SUPPORT_DELETE_FILES_VERSION) { |
376 | 2 | return Status::OK(); |
377 | 2 | } |
378 | | |
379 | 0 | std::vector<TIcebergDeleteFileDesc> position_delete_files; |
380 | 0 | std::vector<TIcebergDeleteFileDesc> equality_delete_files; |
381 | 0 | std::vector<TIcebergDeleteFileDesc> deletion_vector_files; |
382 | 0 | for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) { |
383 | 0 | if (desc.content == POSITION_DELETE) { |
384 | 0 | position_delete_files.emplace_back(desc); |
385 | 0 | } else if (desc.content == EQUALITY_DELETE) { |
386 | 0 | equality_delete_files.emplace_back(desc); |
387 | 0 | } else if (desc.content == DELETION_VECTOR) { |
388 | 0 | deletion_vector_files.emplace_back(desc); |
389 | 0 | } |
390 | 0 | } |
391 | |
|
392 | 0 | if (!equality_delete_files.empty()) { |
393 | 0 | RETURN_IF_ERROR(_equality_delete_base(equality_delete_files)); |
394 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); |
395 | 0 | } |
396 | | |
397 | 0 | if (!deletion_vector_files.empty()) { |
398 | 0 | if (deletion_vector_files.size() != 1) [[unlikely]] { |
399 | | /* |
400 | | * Deletion vectors are a binary representation of deletes for a single data file that is more efficient |
401 | | * at execution time than position delete files. Unlike equality or position delete files, there can be |
402 | | * at most one deletion vector for a given data file in a snapshot. |
403 | | */ |
404 | 0 | return Status::DataQualityError("This iceberg data file has multiple DVs."); |
405 | 0 | } |
406 | 0 | RETURN_IF_ERROR( |
407 | 0 | read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0])); |
408 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); |
409 | 0 | } else if (!position_delete_files.empty()) { |
410 | 0 | RETURN_IF_ERROR( |
411 | 0 | _position_delete_base(table_desc.original_file_path, position_delete_files)); |
412 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); |
413 | 0 | } |
414 | | |
415 | 0 | COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size()); |
416 | 0 | return Status::OK(); |
417 | 0 | } _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE17_init_row_filtersEv Line | Count | Source | 365 | 1 | Status IcebergReaderMixin<BaseReader>::_init_row_filters() { | 366 | | // COUNT(*) short-circuit | 367 | 1 | if (this->_push_down_agg_type == TPushAggOp::type::COUNT && | 368 | 1 | this->get_scan_range().table_format_params.__isset.table_level_row_count && | 369 | 1 | this->get_scan_range().table_format_params.table_level_row_count > 0) { | 370 | 0 | return Status::OK(); | 371 | 0 | } | 372 | | | 373 | 1 | const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params; | 374 | 1 | const auto& version = table_desc.format_version; | 375 | 1 | if (version < MIN_SUPPORT_DELETE_FILES_VERSION) { | 376 | 1 | return Status::OK(); | 377 | 1 | } | 378 | | | 379 | 0 | std::vector<TIcebergDeleteFileDesc> position_delete_files; | 380 | 0 | std::vector<TIcebergDeleteFileDesc> equality_delete_files; | 381 | 0 | std::vector<TIcebergDeleteFileDesc> deletion_vector_files; | 382 | 0 | for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) { | 383 | 0 | if (desc.content == POSITION_DELETE) { | 384 | 0 | position_delete_files.emplace_back(desc); | 385 | 0 | } else if (desc.content == EQUALITY_DELETE) { | 386 | 0 | equality_delete_files.emplace_back(desc); | 387 | 0 | } else if (desc.content == DELETION_VECTOR) { | 388 | 0 | deletion_vector_files.emplace_back(desc); | 389 | 0 | } | 390 | 0 | } | 391 | |
| 392 | 0 | if (!equality_delete_files.empty()) { | 393 | 0 | RETURN_IF_ERROR(_equality_delete_base(equality_delete_files)); | 394 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); | 395 | 0 | } | 396 | | | 397 | 0 | if (!deletion_vector_files.empty()) { | 398 | 0 | if (deletion_vector_files.size() != 1) [[unlikely]] { | 399 | | /* | 400 | | * Deletion vectors are a binary representation of deletes for a single data file that is more efficient | 401 | | * at execution time than position delete files. Unlike equality or position delete files, there can be | 402 | | * at most one deletion vector for a given data file in a snapshot. | 403 | | */ | 404 | 0 | return Status::DataQualityError("This iceberg data file has multiple DVs."); | 405 | 0 | } | 406 | 0 | RETURN_IF_ERROR( | 407 | 0 | read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0])); | 408 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); | 409 | 0 | } else if (!position_delete_files.empty()) { | 410 | 0 | RETURN_IF_ERROR( | 411 | 0 | _position_delete_base(table_desc.original_file_path, position_delete_files)); | 412 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); | 413 | 0 | } | 414 | | | 415 | 0 | COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size()); | 416 | 0 | return Status::OK(); | 417 | 0 | } |
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE17_init_row_filtersEv Line | Count | Source | 365 | 1 | Status IcebergReaderMixin<BaseReader>::_init_row_filters() { | 366 | | // COUNT(*) short-circuit | 367 | 1 | if (this->_push_down_agg_type == TPushAggOp::type::COUNT && | 368 | 1 | this->get_scan_range().table_format_params.__isset.table_level_row_count && | 369 | 1 | this->get_scan_range().table_format_params.table_level_row_count > 0) { | 370 | 0 | return Status::OK(); | 371 | 0 | } | 372 | | | 373 | 1 | const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params; | 374 | 1 | const auto& version = table_desc.format_version; | 375 | 1 | if (version < MIN_SUPPORT_DELETE_FILES_VERSION) { | 376 | 1 | return Status::OK(); | 377 | 1 | } | 378 | | | 379 | 0 | std::vector<TIcebergDeleteFileDesc> position_delete_files; | 380 | 0 | std::vector<TIcebergDeleteFileDesc> equality_delete_files; | 381 | 0 | std::vector<TIcebergDeleteFileDesc> deletion_vector_files; | 382 | 0 | for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) { | 383 | 0 | if (desc.content == POSITION_DELETE) { | 384 | 0 | position_delete_files.emplace_back(desc); | 385 | 0 | } else if (desc.content == EQUALITY_DELETE) { | 386 | 0 | equality_delete_files.emplace_back(desc); | 387 | 0 | } else if (desc.content == DELETION_VECTOR) { | 388 | 0 | deletion_vector_files.emplace_back(desc); | 389 | 0 | } | 390 | 0 | } | 391 | |
| 392 | 0 | if (!equality_delete_files.empty()) { | 393 | 0 | RETURN_IF_ERROR(_equality_delete_base(equality_delete_files)); | 394 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); | 395 | 0 | } | 396 | | | 397 | 0 | if (!deletion_vector_files.empty()) { | 398 | 0 | if (deletion_vector_files.size() != 1) [[unlikely]] { | 399 | | /* | 400 | | * Deletion vectors are a binary representation of deletes for a single data file that is more efficient | 401 | | * at execution time than position delete files. Unlike equality or position delete files, there can be | 402 | | * at most one deletion vector for a given data file in a snapshot. | 403 | | */ | 404 | 0 | return Status::DataQualityError("This iceberg data file has multiple DVs."); | 405 | 0 | } | 406 | 0 | RETURN_IF_ERROR( | 407 | 0 | read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0])); | 408 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); | 409 | 0 | } else if (!position_delete_files.empty()) { | 410 | 0 | RETURN_IF_ERROR( | 411 | 0 | _position_delete_base(table_desc.original_file_path, position_delete_files)); | 412 | 0 | this->set_push_down_agg_type(TPushAggOp::NONE); | 413 | 0 | } | 414 | | | 415 | 0 | COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size()); | 416 | 0 | return Status::OK(); | 417 | 0 | } |
|
418 | | |
419 | | template <typename BaseReader> |
420 | | Status IcebergReaderMixin<BaseReader>::_equality_delete_base( |
421 | 0 | const std::vector<TIcebergDeleteFileDesc>& delete_files) { |
422 | 0 | std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>> |
423 | 0 | partition_columns; |
424 | 0 | std::unordered_map<std::string, VExprContextSPtr> missing_columns; |
425 | |
|
426 | 0 | for (const auto& delete_file : delete_files) { |
427 | 0 | TFileRangeDesc delete_desc; |
428 | 0 | delete_desc.__set_fs_name(this->get_scan_range().fs_name); |
429 | 0 | delete_desc.path = delete_file.path; |
430 | 0 | delete_desc.start_offset = 0; |
431 | 0 | delete_desc.size = -1; |
432 | 0 | delete_desc.file_size = -1; |
433 | |
|
434 | 0 | if (!delete_file.__isset.field_ids) [[unlikely]] { |
435 | 0 | return Status::InternalError( |
436 | 0 | "missing delete field ids when reading equality delete file"); |
437 | 0 | } |
438 | 0 | auto& read_column_field_ids = delete_file.field_ids; |
439 | 0 | std::set<int> read_column_field_ids_set; |
440 | 0 | for (const auto& field_id : read_column_field_ids) { |
441 | 0 | read_column_field_ids_set.insert(field_id); |
442 | 0 | _equality_delete_col_ids.insert(field_id); |
443 | 0 | } |
444 | |
|
445 | 0 | std::unique_ptr<GenericReader> delete_reader = _create_equality_reader(delete_desc); |
446 | 0 | RETURN_IF_ERROR(delete_reader->init_schema_reader()); |
447 | | |
448 | 0 | std::vector<std::string> equality_delete_col_names; |
449 | 0 | std::vector<DataTypePtr> equality_delete_col_types; |
450 | | |
451 | | // Build delete col names/types/ids by matching field_ids from delete file schema. |
452 | | // Master iterates delete file's FieldDescriptor and uses field_id to match, |
453 | | // NOT idx-based pairing (get_parsed_schema order != field_ids order). |
454 | 0 | std::vector<std::string> delete_col_names; |
455 | 0 | std::vector<DataTypePtr> delete_col_types; |
456 | 0 | std::vector<int> delete_col_ids; |
457 | 0 | std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx; |
458 | |
|
459 | 0 | if (auto* parquet_reader = typeid_cast<ParquetReader*>(delete_reader.get())) { |
460 | 0 | const FieldDescriptor* delete_field_desc = nullptr; |
461 | 0 | RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&delete_field_desc)); |
462 | 0 | DCHECK(delete_field_desc != nullptr); |
463 | |
|
464 | 0 | for (const auto& delete_file_field : delete_field_desc->get_fields_schema()) { |
465 | 0 | if (delete_file_field.field_id == -1) [[unlikely]] { |
466 | 0 | return Status::DataQualityError( |
467 | 0 | "missing field id when reading equality delete file"); |
468 | 0 | } |
469 | 0 | if (!read_column_field_ids_set.contains(delete_file_field.field_id)) { |
470 | 0 | continue; |
471 | 0 | } |
472 | 0 | if (delete_file_field.children.size() > 0) [[unlikely]] { |
473 | 0 | return Status::InternalError( |
474 | 0 | "can not support read complex column in equality delete file"); |
475 | 0 | } |
476 | | |
477 | 0 | delete_col_ids.emplace_back(delete_file_field.field_id); |
478 | 0 | delete_col_names.emplace_back(delete_file_field.name); |
479 | 0 | delete_col_types.emplace_back(make_nullable(delete_file_field.data_type)); |
480 | |
|
481 | 0 | int field_id = delete_file_field.field_id; |
482 | 0 | if (!_id_to_block_column_name.contains(field_id)) { |
483 | 0 | _id_to_block_column_name.emplace(field_id, delete_file_field.name); |
484 | 0 | _expand_col_names.emplace_back(delete_file_field.name); |
485 | 0 | _expand_columns.emplace_back( |
486 | 0 | make_nullable(delete_file_field.data_type)->create_column(), |
487 | 0 | make_nullable(delete_file_field.data_type), delete_file_field.name); |
488 | 0 | } |
489 | 0 | } |
490 | 0 | for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) { |
491 | 0 | delete_col_name_to_block_idx[delete_col_names[idx]] = idx; |
492 | 0 | } |
493 | | // Delete files have TFileRangeDesc.size=-1, which would cause |
494 | | // set_fill_columns to return EndOfFile("No row group to read") |
495 | | // when _filter_groups is true. Master passes filter_groups=false. |
496 | 0 | ParquetInitContext eq_delete_ctx; |
497 | 0 | eq_delete_ctx.filter_groups = false; |
498 | 0 | eq_delete_ctx.column_names = delete_col_names; |
499 | 0 | eq_delete_ctx.col_name_to_block_idx = &delete_col_name_to_block_idx; |
500 | 0 | auto st2 = parquet_reader->init_reader(&eq_delete_ctx); |
501 | 0 | if (!st2.ok()) { |
502 | 0 | return st2; |
503 | 0 | } |
504 | 0 | } else if (auto* orc_reader = typeid_cast<OrcReader*>(delete_reader.get())) { |
505 | | // For ORC: use get_parsed_schema with field_ids from delete_file |
506 | | // ORC field_ids come from the Thrift descriptor, not from ORC metadata |
507 | 0 | RETURN_IF_ERROR(delete_reader->get_parsed_schema(&equality_delete_col_names, |
508 | 0 | &equality_delete_col_types)); |
509 | 0 | for (uint32_t idx = 0; idx < equality_delete_col_names.size(); ++idx) { |
510 | 0 | if (idx < read_column_field_ids.size()) { |
511 | 0 | int field_id = read_column_field_ids[idx]; |
512 | 0 | if (!read_column_field_ids_set.contains(field_id)) continue; |
513 | 0 | delete_col_ids.emplace_back(field_id); |
514 | 0 | delete_col_names.emplace_back(equality_delete_col_names[idx]); |
515 | 0 | delete_col_types.emplace_back(make_nullable(equality_delete_col_types[idx])); |
516 | 0 | if (!_id_to_block_column_name.contains(field_id)) { |
517 | 0 | _id_to_block_column_name.emplace(field_id, equality_delete_col_names[idx]); |
518 | 0 | _expand_col_names.emplace_back(equality_delete_col_names[idx]); |
519 | 0 | _expand_columns.emplace_back( |
520 | 0 | make_nullable(equality_delete_col_types[idx])->create_column(), |
521 | 0 | make_nullable(equality_delete_col_types[idx]), |
522 | 0 | equality_delete_col_names[idx]); |
523 | 0 | } |
524 | 0 | } |
525 | 0 | } |
526 | 0 | for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) { |
527 | 0 | delete_col_name_to_block_idx[delete_col_names[idx]] = idx; |
528 | 0 | } |
529 | 0 | OrcInitContext eq_delete_ctx; |
530 | 0 | eq_delete_ctx.column_names = delete_col_names; |
531 | 0 | eq_delete_ctx.col_name_to_block_idx = &delete_col_name_to_block_idx; |
532 | 0 | RETURN_IF_ERROR(orc_reader->init_reader(&eq_delete_ctx)); |
533 | 0 | } else { |
534 | 0 | return Status::InternalError("Unsupported format of delete file"); |
535 | 0 | } |
536 | | |
537 | 0 | if (!_equality_delete_block_map.contains(delete_col_ids)) { |
538 | 0 | _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size()); |
539 | 0 | Block block; |
540 | 0 | _generate_equality_delete_block(&block, delete_col_names, delete_col_types); |
541 | 0 | _equality_delete_blocks.emplace_back(block); |
542 | 0 | } |
543 | 0 | Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]]; |
544 | |
|
545 | 0 | bool eof = false; |
546 | 0 | while (!eof) { |
547 | 0 | Block tmp_block; |
548 | 0 | _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types); |
549 | 0 | size_t read_rows = 0; |
550 | 0 | auto st = delete_reader->get_next_block(&tmp_block, &read_rows, &eof); |
551 | 0 | if (!st.ok()) { |
552 | 0 | return st; |
553 | 0 | } |
554 | 0 | if (read_rows > 0) { |
555 | 0 | MutableBlock mutable_block(&eq_file_block); |
556 | 0 | RETURN_IF_ERROR(mutable_block.merge(tmp_block)); |
557 | 0 | } |
558 | 0 | } |
559 | 0 | } |
560 | | |
561 | 0 | for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) { |
562 | 0 | auto& eq_file_block = _equality_delete_blocks[block_idx]; |
563 | 0 | auto equality_delete_impl = |
564 | 0 | EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids); |
565 | 0 | RETURN_IF_ERROR(equality_delete_impl->init(this->get_profile())); |
566 | 0 | _equality_delete_impls.emplace_back(std::move(equality_delete_impl)); |
567 | 0 | } |
568 | 0 | return Status::OK(); |
569 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_equality_delete_baseERKSt6vectorINS_22TIcebergDeleteFileDescESaIS4_EE Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_equality_delete_baseERKSt6vectorINS_22TIcebergDeleteFileDescESaIS4_EE |
570 | | |
571 | | template <typename BaseReader> |
572 | | void IcebergReaderMixin<BaseReader>::_generate_equality_delete_block( |
573 | | Block* block, const std::vector<std::string>& equality_delete_col_names, |
574 | 0 | const std::vector<DataTypePtr>& equality_delete_col_types) { |
575 | 0 | for (int i = 0; i < equality_delete_col_names.size(); ++i) { |
576 | 0 | DataTypePtr data_type = make_nullable(equality_delete_col_types[i]); |
577 | 0 | MutableColumnPtr data_column = data_type->create_column(); |
578 | 0 | block->insert(ColumnWithTypeAndName(std::move(data_column), data_type, |
579 | 0 | equality_delete_col_names[i])); |
580 | 0 | } |
581 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE31_generate_equality_delete_blockEPNS_5BlockERKSt6vectorINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaISB_EERKS5_ISt10shared_ptrIKNS_9IDataTypeEESaISJ_EE Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE31_generate_equality_delete_blockEPNS_5BlockERKSt6vectorINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaISB_EERKS5_ISt10shared_ptrIKNS_9IDataTypeEESaISJ_EE |
582 | | |
583 | | template <typename BaseReader> |
584 | 2 | Status IcebergReaderMixin<BaseReader>::_expand_block_if_need(Block* block) { |
585 | 2 | std::set<std::string> names; |
586 | 2 | auto block_names = block->get_names(); |
587 | 2 | names.insert(block_names.begin(), block_names.end()); |
588 | 2 | for (auto& col : _expand_columns) { |
589 | 0 | col.column->assume_mutable()->clear(); |
590 | 0 | if (names.contains(col.name)) { |
591 | 0 | return Status::InternalError("Wrong expand column '{}'", col.name); |
592 | 0 | } |
593 | 0 | names.insert(col.name); |
594 | 0 | (*this->col_name_to_block_idx_ref())[col.name] = static_cast<uint32_t>(block->columns()); |
595 | 0 | block->insert(col); |
596 | 0 | } |
597 | 2 | return Status::OK(); |
598 | 2 | } _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_expand_block_if_needEPNS_5BlockE Line | Count | Source | 584 | 1 | Status IcebergReaderMixin<BaseReader>::_expand_block_if_need(Block* block) { | 585 | 1 | std::set<std::string> names; | 586 | 1 | auto block_names = block->get_names(); | 587 | 1 | names.insert(block_names.begin(), block_names.end()); | 588 | 1 | for (auto& col : _expand_columns) { | 589 | 0 | col.column->assume_mutable()->clear(); | 590 | 0 | if (names.contains(col.name)) { | 591 | 0 | return Status::InternalError("Wrong expand column '{}'", col.name); | 592 | 0 | } | 593 | 0 | names.insert(col.name); | 594 | 0 | (*this->col_name_to_block_idx_ref())[col.name] = static_cast<uint32_t>(block->columns()); | 595 | 0 | block->insert(col); | 596 | 0 | } | 597 | 1 | return Status::OK(); | 598 | 1 | } |
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_expand_block_if_needEPNS_5BlockE Line | Count | Source | 584 | 1 | Status IcebergReaderMixin<BaseReader>::_expand_block_if_need(Block* block) { | 585 | 1 | std::set<std::string> names; | 586 | 1 | auto block_names = block->get_names(); | 587 | 1 | names.insert(block_names.begin(), block_names.end()); | 588 | 1 | for (auto& col : _expand_columns) { | 589 | 0 | col.column->assume_mutable()->clear(); | 590 | 0 | if (names.contains(col.name)) { | 591 | 0 | return Status::InternalError("Wrong expand column '{}'", col.name); | 592 | 0 | } | 593 | 0 | names.insert(col.name); | 594 | 0 | (*this->col_name_to_block_idx_ref())[col.name] = static_cast<uint32_t>(block->columns()); | 595 | 0 | block->insert(col); | 596 | 0 | } | 597 | 1 | return Status::OK(); | 598 | 1 | } |
|
599 | | |
600 | | template <typename BaseReader> |
601 | 2 | Status IcebergReaderMixin<BaseReader>::_shrink_block_if_need(Block* block) { |
602 | 2 | std::set<size_t> positions_to_erase; |
603 | 2 | for (const std::string& expand_col : _expand_col_names) { |
604 | 0 | if (!this->col_name_to_block_idx_ref()->contains(expand_col)) { |
605 | 0 | return Status::InternalError("Wrong erase column '{}', block: {}", expand_col, |
606 | 0 | block->dump_names()); |
607 | 0 | } |
608 | 0 | positions_to_erase.emplace((*this->col_name_to_block_idx_ref())[expand_col]); |
609 | 0 | } |
610 | 2 | block->erase(positions_to_erase); |
611 | 2 | for (const std::string& expand_col : _expand_col_names) { |
612 | 0 | this->col_name_to_block_idx_ref()->erase(expand_col); |
613 | 0 | } |
614 | 2 | return Status::OK(); |
615 | 2 | } _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_shrink_block_if_needEPNS_5BlockE Line | Count | Source | 601 | 1 | Status IcebergReaderMixin<BaseReader>::_shrink_block_if_need(Block* block) { | 602 | 1 | std::set<size_t> positions_to_erase; | 603 | 1 | for (const std::string& expand_col : _expand_col_names) { | 604 | 0 | if (!this->col_name_to_block_idx_ref()->contains(expand_col)) { | 605 | 0 | return Status::InternalError("Wrong erase column '{}', block: {}", expand_col, | 606 | 0 | block->dump_names()); | 607 | 0 | } | 608 | 0 | positions_to_erase.emplace((*this->col_name_to_block_idx_ref())[expand_col]); | 609 | 0 | } | 610 | 1 | block->erase(positions_to_erase); | 611 | 1 | for (const std::string& expand_col : _expand_col_names) { | 612 | 0 | this->col_name_to_block_idx_ref()->erase(expand_col); | 613 | 0 | } | 614 | 1 | return Status::OK(); | 615 | 1 | } |
_ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_shrink_block_if_needEPNS_5BlockE Line | Count | Source | 601 | 1 | Status IcebergReaderMixin<BaseReader>::_shrink_block_if_need(Block* block) { | 602 | 1 | std::set<size_t> positions_to_erase; | 603 | 1 | for (const std::string& expand_col : _expand_col_names) { | 604 | 0 | if (!this->col_name_to_block_idx_ref()->contains(expand_col)) { | 605 | 0 | return Status::InternalError("Wrong erase column '{}', block: {}", expand_col, | 606 | 0 | block->dump_names()); | 607 | 0 | } | 608 | 0 | positions_to_erase.emplace((*this->col_name_to_block_idx_ref())[expand_col]); | 609 | 0 | } | 610 | 1 | block->erase(positions_to_erase); | 611 | 1 | for (const std::string& expand_col : _expand_col_names) { | 612 | 0 | this->col_name_to_block_idx_ref()->erase(expand_col); | 613 | 0 | } | 614 | 1 | return Status::OK(); | 615 | 1 | } |
|
616 | | |
617 | | template <typename BaseReader> |
618 | | Status IcebergReaderMixin<BaseReader>::_position_delete_base( |
619 | 0 | const std::string data_file_path, const std::vector<TIcebergDeleteFileDesc>& delete_files) { |
620 | 0 | std::vector<DeleteRows*> delete_rows_array; |
621 | 0 | int64_t num_delete_rows = 0; |
622 | 0 | for (const auto& delete_file : delete_files) { |
623 | 0 | SCOPED_TIMER(_iceberg_profile.delete_files_read_time); |
624 | 0 | Status create_status = Status::OK(); |
625 | 0 | auto* delete_file_cache = _kv_cache->template get<DeleteFile>( |
626 | 0 | _delet_file_cache_key(delete_file.path), [&]() -> DeleteFile* { |
627 | 0 | auto* position_delete = new DeleteFile; |
628 | 0 | TFileRangeDesc delete_file_range; |
629 | 0 | delete_file_range.__set_fs_name(this->get_scan_range().fs_name); |
630 | 0 | delete_file_range.path = delete_file.path; |
631 | 0 | delete_file_range.start_offset = 0; |
632 | 0 | delete_file_range.size = -1; |
633 | 0 | delete_file_range.file_size = -1; |
634 | 0 | create_status = _read_position_delete_file(&delete_file_range, position_delete); |
635 | 0 | if (!create_status) { |
636 | 0 | return nullptr; |
637 | 0 | } |
638 | 0 | return position_delete; |
639 | 0 | }); Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE_clB5cxx11Ev Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE_clB5cxx11Ev |
640 | 0 | if (create_status.is<ErrorCode::END_OF_FILE>()) { |
641 | 0 | continue; |
642 | 0 | } else if (!create_status.ok()) { |
643 | 0 | return create_status; |
644 | 0 | } |
645 | | |
646 | 0 | DeleteFile& delete_file_map = *((DeleteFile*)delete_file_cache); |
647 | 0 | auto get_value = [&](const auto& v) { |
648 | 0 | DeleteRows* row_ids = v.second.get(); |
649 | 0 | if (!row_ids->empty()) { |
650 | 0 | delete_rows_array.emplace_back(row_ids); |
651 | 0 | num_delete_rows += row_ids->size(); |
652 | 0 | } |
653 | 0 | }; Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlRKT_E_clISt4pairIKS8_St10unique_ptrIS9_IlSaIlEESt14default_deleteISO_EEEEEDaSH_ Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlRKT_E_clISt4pairIKS8_St10unique_ptrIS9_IlSaIlEESt14default_deleteISO_EEEEEDaSH_ |
654 | 0 | delete_file_map.if_contains(data_file_path, get_value); |
655 | 0 | } |
656 | 0 | if (num_delete_rows > 0) { |
657 | 0 | SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time); |
658 | 0 | _iceberg_delete_rows = |
659 | 0 | _kv_cache->template get<DeleteRows>(data_file_path, [&]() -> DeleteRows* { |
660 | 0 | auto* data_file_position_delete = new DeleteRows; |
661 | 0 | _sort_delete_rows(delete_rows_array, num_delete_rows, |
662 | 0 | *data_file_position_delete); |
663 | 0 | return data_file_position_delete; |
664 | 0 | }); Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE0_clEv Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EEENKUlvE0_clEv |
665 | 0 | set_delete_rows(); |
666 | 0 | COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows); |
667 | 0 | } |
668 | 0 | return Status::OK(); |
669 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EE Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE21_position_delete_baseENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorINS_22TIcebergDeleteFileDescESaISA_EE |
670 | | |
671 | | template <typename BaseReader> |
672 | | typename IcebergReaderMixin<BaseReader>::PositionDeleteRange |
673 | 0 | IcebergReaderMixin<BaseReader>::_get_range(const ColumnDictI32& file_path_column) { |
674 | 0 | PositionDeleteRange range; |
675 | 0 | size_t read_rows = file_path_column.get_data().size(); |
676 | 0 | const int* code_path = file_path_column.get_data().data(); |
677 | 0 | const int* code_path_start = code_path; |
678 | 0 | const int* code_path_end = code_path + read_rows; |
679 | 0 | while (code_path < code_path_end) { |
680 | 0 | int code = code_path[0]; |
681 | 0 | const int* code_end = std::upper_bound(code_path, code_path_end, code); |
682 | 0 | range.data_file_path.emplace_back(file_path_column.get_value(code).to_string()); |
683 | 0 | range.range.emplace_back(code_path - code_path_start, code_end - code_path_start); |
684 | 0 | code_path = code_end; |
685 | 0 | } |
686 | 0 | return range; |
687 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE10_get_rangeERKNS_13ColumnDictI32E Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE10_get_rangeERKNS_13ColumnDictI32E |
688 | | |
689 | | template <typename BaseReader> |
690 | | typename IcebergReaderMixin<BaseReader>::PositionDeleteRange |
691 | 0 | IcebergReaderMixin<BaseReader>::_get_range(const ColumnString& file_path_column) { |
692 | 0 | PositionDeleteRange range; |
693 | 0 | size_t read_rows = file_path_column.size(); |
694 | 0 | size_t index = 0; |
695 | 0 | while (index < read_rows) { |
696 | 0 | StringRef data_path = file_path_column.get_data_at(index); |
697 | 0 | size_t left = index - 1; |
698 | 0 | size_t right = read_rows; |
699 | 0 | while (left + 1 != right) { |
700 | 0 | size_t mid = left + (right - left) / 2; |
701 | 0 | if (file_path_column.get_data_at(mid) > data_path) { |
702 | 0 | right = mid; |
703 | 0 | } else { |
704 | 0 | left = mid; |
705 | 0 | } |
706 | 0 | } |
707 | 0 | range.data_file_path.emplace_back(data_path.to_string()); |
708 | 0 | range.range.emplace_back(index, left + 1); |
709 | 0 | index = left + 1; |
710 | 0 | } |
711 | 0 | return range; |
712 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE10_get_rangeERKNS_9ColumnStrIjEE Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE10_get_rangeERKNS_9ColumnStrIjEE |
713 | | |
714 | | template <typename BaseReader> |
715 | | void IcebergReaderMixin<BaseReader>::_sort_delete_rows( |
716 | | const std::vector<std::vector<int64_t>*>& delete_rows_array, int64_t num_delete_rows, |
717 | 0 | std::vector<int64_t>& result) { |
718 | 0 | if (delete_rows_array.empty()) { |
719 | 0 | return; |
720 | 0 | } |
721 | 0 | if (delete_rows_array.size() == 1) { |
722 | 0 | result.resize(num_delete_rows); |
723 | 0 | memcpy(result.data(), delete_rows_array.front()->data(), sizeof(int64_t) * num_delete_rows); |
724 | 0 | return; |
725 | 0 | } |
726 | 0 | if (delete_rows_array.size() == 2) { |
727 | 0 | result.resize(num_delete_rows); |
728 | 0 | std::merge(delete_rows_array.front()->begin(), delete_rows_array.front()->end(), |
729 | 0 | delete_rows_array.back()->begin(), delete_rows_array.back()->end(), |
730 | 0 | result.begin()); |
731 | 0 | return; |
732 | 0 | } |
733 | | |
734 | 0 | using vec_pair = std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::iterator>; |
735 | 0 | result.resize(num_delete_rows); |
736 | 0 | auto row_id_iter = result.begin(); |
737 | 0 | auto iter_end = result.end(); |
738 | 0 | std::vector<vec_pair> rows_array; |
739 | 0 | for (auto* rows : delete_rows_array) { |
740 | 0 | if (!rows->empty()) { |
741 | 0 | rows_array.emplace_back(rows->begin(), rows->end()); |
742 | 0 | } |
743 | 0 | } |
744 | 0 | size_t array_size = rows_array.size(); |
745 | 0 | while (row_id_iter != iter_end) { |
746 | 0 | int64_t min_index = 0; |
747 | 0 | int64_t min = *rows_array[0].first; |
748 | 0 | for (size_t i = 0; i < array_size; ++i) { |
749 | 0 | if (*rows_array[i].first < min) { |
750 | 0 | min_index = i; |
751 | 0 | min = *rows_array[i].first; |
752 | 0 | } |
753 | 0 | } |
754 | 0 | *row_id_iter++ = min; |
755 | 0 | rows_array[min_index].first++; |
756 | 0 | if (UNLIKELY(rows_array[min_index].first == rows_array[min_index].second)) { |
757 | 0 | rows_array.erase(rows_array.begin() + min_index); |
758 | 0 | array_size--; |
759 | 0 | } |
760 | 0 | } |
761 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE17_sort_delete_rowsERKSt6vectorIPS3_IlSaIlEESaIS6_EElRS5_ Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE17_sort_delete_rowsERKSt6vectorIPS3_IlSaIlEESaIS6_EElRS5_ |
762 | | |
763 | | template <typename BaseReader> |
764 | | void IcebergReaderMixin<BaseReader>::_gen_position_delete_file_range( |
765 | | Block& block, DeleteFile* position_delete, size_t read_rows, |
766 | 0 | bool file_path_column_dictionary_coded) { |
767 | 0 | SCOPED_TIMER(_iceberg_profile.parse_delete_file_time); |
768 | 0 | auto name_to_pos_map = block.get_name_to_pos_map(); |
769 | 0 | ColumnPtr path_column = block.get_by_position(name_to_pos_map[ICEBERG_FILE_PATH]).column; |
770 | 0 | DCHECK_EQ(path_column->size(), read_rows); |
771 | 0 | ColumnPtr pos_column = block.get_by_position(name_to_pos_map[ICEBERG_ROW_POS]).column; |
772 | 0 | using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType; |
773 | 0 | const int64_t* src_data = assert_cast<const ColumnType&>(*pos_column).get_data().data(); |
774 | 0 | PositionDeleteRange range; |
775 | 0 | if (file_path_column_dictionary_coded) { |
776 | 0 | range = _get_range(assert_cast<const ColumnDictI32&>(*path_column)); |
777 | 0 | } else { |
778 | 0 | range = _get_range(assert_cast<const ColumnString&>(*path_column)); |
779 | 0 | } |
780 | 0 | for (int i = 0; i < range.range.size(); ++i) { |
781 | 0 | std::string key = range.data_file_path[i]; |
782 | 0 | auto iter = position_delete->find(key); |
783 | 0 | DeleteRows* delete_rows; |
784 | 0 | if (iter == position_delete->end()) { |
785 | 0 | delete_rows = new DeleteRows; |
786 | 0 | std::unique_ptr<DeleteRows> delete_rows_ptr(delete_rows); |
787 | 0 | (*position_delete)[key] = std::move(delete_rows_ptr); |
788 | 0 | } else { |
789 | 0 | delete_rows = iter->second.get(); |
790 | 0 | } |
791 | 0 | const int64_t* cpy_start = src_data + range.range[i].first; |
792 | 0 | const int64_t cpy_count = range.range[i].second - range.range[i].first; |
793 | 0 | int64_t origin_size = delete_rows->size(); |
794 | 0 | delete_rows->resize(origin_size + cpy_count); |
795 | 0 | int64_t* dest_position = &(*delete_rows)[origin_size]; |
796 | 0 | memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t)); |
797 | 0 | } |
798 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE31_gen_position_delete_file_rangeERNS_5BlockEPN5phmap22parallel_flat_hash_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESt10unique_ptrISt6vectorIlSaIlEESt14default_deleteISG_EESt4hashISC_ESt8equal_toIvESaISt4pairIKSC_SJ_EELm8ESt5mutexEEmb Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE31_gen_position_delete_file_rangeERNS_5BlockEPN5phmap22parallel_flat_hash_mapINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESt10unique_ptrISt6vectorIlSaIlEESt14default_deleteISG_EESt4hashISC_ESt8equal_toIvESaISt4pairIKSC_SJ_EELm8ESt5mutexEEmb |
799 | | |
800 | | template <typename BaseReader> |
801 | | Status IcebergReaderMixin<BaseReader>::read_deletion_vector( |
802 | 0 | const std::string& data_file_path, const TIcebergDeleteFileDesc& delete_file_desc) { |
803 | 0 | Status create_status = Status::OK(); |
804 | 0 | SCOPED_TIMER(_iceberg_profile.delete_files_read_time); |
805 | 0 | _iceberg_delete_rows = _kv_cache->template get< |
806 | 0 | DeleteRows>(data_file_path, [&]() -> DeleteRows* { |
807 | 0 | auto* delete_rows = new DeleteRows; |
808 | |
|
809 | 0 | TFileRangeDesc delete_range; |
810 | 0 | delete_range.__set_fs_name(this->get_scan_range().fs_name); |
811 | 0 | delete_range.path = delete_file_desc.path; |
812 | 0 | delete_range.start_offset = delete_file_desc.content_offset; |
813 | 0 | delete_range.size = delete_file_desc.content_size_in_bytes; |
814 | 0 | delete_range.file_size = -1; |
815 | |
|
816 | 0 | DeletionVectorReader dv_reader(this->get_state(), this->get_profile(), |
817 | 0 | this->get_scan_params(), delete_range, this->get_io_ctx()); |
818 | 0 | create_status = dv_reader.open(); |
819 | 0 | if (!create_status.ok()) [[unlikely]] { |
820 | 0 | return nullptr; |
821 | 0 | } |
822 | | |
823 | 0 | size_t buffer_size = delete_range.size; |
824 | 0 | std::vector<char> buf(buffer_size); |
825 | 0 | if (buffer_size < 12) [[unlikely]] { |
826 | 0 | create_status = Status::DataQualityError("Deletion vector file size too small: {}", |
827 | 0 | buffer_size); |
828 | 0 | return nullptr; |
829 | 0 | } |
830 | | |
831 | 0 | create_status = dv_reader.read_at(delete_range.start_offset, {buf.data(), buffer_size}); |
832 | 0 | if (!create_status) [[unlikely]] { |
833 | 0 | return nullptr; |
834 | 0 | } |
835 | | |
836 | 0 | auto total_length = BigEndian::Load32(buf.data()); |
837 | 0 | if (total_length + 8 != buffer_size) [[unlikely]] { |
838 | 0 | create_status = Status::DataQualityError( |
839 | 0 | "Deletion vector length mismatch, expected: {}, actual: {}", total_length + 8, |
840 | 0 | buffer_size); |
841 | 0 | return nullptr; |
842 | 0 | } |
843 | | |
844 | 0 | constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'}; |
845 | 0 | if (memcmp(buf.data() + sizeof(total_length), MAGIC_NUMBER, 4)) [[unlikely]] { |
846 | 0 | create_status = Status::DataQualityError("Deletion vector magic number mismatch"); |
847 | 0 | return nullptr; |
848 | 0 | } |
849 | | |
850 | 0 | roaring::Roaring64Map bitmap; |
851 | 0 | SCOPED_TIMER(_iceberg_profile.parse_delete_file_time); |
852 | 0 | try { |
853 | 0 | bitmap = roaring::Roaring64Map::readSafe(buf.data() + 8, buffer_size - 12); |
854 | 0 | } catch (const std::runtime_error& e) { |
855 | 0 | create_status = Status::DataQualityError("Decode roaring bitmap failed, {}", e.what()); |
856 | 0 | return nullptr; |
857 | 0 | } |
858 | | |
859 | 0 | delete_rows->reserve(bitmap.cardinality()); |
860 | 0 | for (auto it = bitmap.begin(); it != bitmap.end(); it++) { |
861 | 0 | delete_rows->push_back(*it); |
862 | 0 | } |
863 | 0 | COUNTER_UPDATE(_iceberg_profile.num_delete_rows, delete_rows->size()); |
864 | 0 | return delete_rows; |
865 | 0 | }); Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescEENKUlvE_clEv Unexecuted instantiation: _ZZN5doris18IcebergReaderMixinINS_9OrcReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescEENKUlvE_clEv |
866 | |
|
867 | 0 | RETURN_IF_ERROR(create_status); |
868 | 0 | if (!_iceberg_delete_rows->empty()) [[likely]] { |
869 | 0 | set_delete_rows(); |
870 | 0 | } |
871 | 0 | return Status::OK(); |
872 | 0 | } Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_13ParquetReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescE Unexecuted instantiation: _ZN5doris18IcebergReaderMixinINS_9OrcReaderEE20read_deletion_vectorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS_22TIcebergDeleteFileDescE |
873 | | |
874 | | } // namespace doris |