be/src/format_v2/table/iceberg_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format_v2/table/iceberg_reader.h" |
19 | | |
20 | | #include <algorithm> |
21 | | #include <cstring> |
22 | | #include <memory> |
23 | | #include <sstream> |
24 | | #include <utility> |
25 | | |
26 | | #include "common/cast_set.h" |
27 | | #include "common/consts.h" |
28 | | #include "core/assert_cast.h" |
29 | | #include "core/block/block.h" |
30 | | #include "core/column/column_const.h" |
31 | | #include "core/column/column_nullable.h" |
32 | | #include "core/column/column_string.h" |
33 | | #include "core/column/column_struct.h" |
34 | | #include "core/column/column_vector.h" |
35 | | #include "core/data_type/data_type_number.h" |
36 | | #include "core/data_type/define_primitive_type.h" |
37 | | #include "core/field.h" |
38 | | #include "exprs/vslot_ref.h" |
39 | | #include "format/table/deletion_vector_reader.h" |
40 | | #include "format_v2/expr/cast.h" |
41 | | #include "format_v2/expr/equality_delete_predicate.h" |
42 | | #include "format_v2/parquet/parquet_reader.h" |
43 | | #include "format_v2/parquet/reader/column_reader.h" |
44 | | #include "format_v2/table_reader.h" |
45 | | #include "io/file_factory.h" |
46 | | |
47 | | namespace doris::format::iceberg { |
48 | | |
49 | | static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id"; |
50 | | static constexpr int32_t ROW_LINEAGE_ROW_ID_FIELD_ID = 2147483540; |
51 | | |
52 | | template <typename T> |
53 | 0 | static std::string join_values_for_debug(const std::vector<T>& values) { |
54 | 0 | std::ostringstream out; |
55 | 0 | out << "["; |
56 | 0 | for (size_t idx = 0; idx < values.size(); ++idx) { |
57 | 0 | if (idx > 0) { |
58 | 0 | out << ", "; |
59 | 0 | } |
60 | 0 | out << values[idx]; |
61 | 0 | } |
62 | 0 | out << "]"; |
63 | 0 | return out.str(); |
64 | 0 | } |
65 | | |
66 | 1 | static bool is_projected_row_lineage_row_id(const format::ColumnDefinition& column) { |
67 | | // Iceberg row lineage columns can be bound by field id when a mapper has already been built, |
68 | | // but customize_file_scan_request() is also exercised directly by scan-request tests before the |
69 | | // mapper exists. In that path, inspect the projected table schema so row-position dependencies |
70 | | // are still added for `_row_id`. |
71 | 1 | return column.name == ROW_LINEAGE_ROW_ID || |
72 | 1 | (column.has_identifier_field_id() && |
73 | 0 | column.get_identifier_field_id() == ROW_LINEAGE_ROW_ID_FIELD_ID); |
74 | 1 | } |
75 | | |
76 | 15 | static bool is_projected_iceberg_rowid(const format::ColumnDefinition& column) { |
77 | 15 | return column.name == BeConsts::ICEBERG_ROWID_COL; |
78 | 15 | } |
79 | | |
80 | 0 | static std::string iceberg_delete_file_debug_string(const TIcebergDeleteFileDesc& delete_file) { |
81 | 0 | std::ostringstream out; |
82 | 0 | out << "TIcebergDeleteFileDesc{path=" << (delete_file.__isset.path ? delete_file.path : "null") |
83 | 0 | << ", content=" << (delete_file.__isset.content ? delete_file.content : -1) |
84 | 0 | << ", file_format=" |
85 | 0 | << (delete_file.__isset.file_format ? static_cast<int>(delete_file.file_format) : -1) |
86 | 0 | << ", position_lower_bound=" |
87 | 0 | << (delete_file.__isset.position_lower_bound ? delete_file.position_lower_bound : -1) |
88 | 0 | << ", position_upper_bound=" |
89 | 0 | << (delete_file.__isset.position_upper_bound ? delete_file.position_upper_bound : -1) |
90 | 0 | << ", field_ids=" |
91 | 0 | << (delete_file.__isset.field_ids ? join_values_for_debug(delete_file.field_ids) : "[]") |
92 | 0 | << ", content_offset=" |
93 | 0 | << (delete_file.__isset.content_offset ? delete_file.content_offset : -1) |
94 | 0 | << ", content_size_in_bytes=" |
95 | 0 | << (delete_file.__isset.content_size_in_bytes ? delete_file.content_size_in_bytes : -1) |
96 | 0 | << "}"; |
97 | 0 | return out.str(); |
98 | 0 | } |
99 | | |
100 | | static std::string iceberg_delete_files_debug_string( |
101 | 0 | const std::vector<TIcebergDeleteFileDesc>& delete_files) { |
102 | 0 | std::ostringstream out; |
103 | 0 | out << "["; |
104 | 0 | for (size_t idx = 0; idx < delete_files.size(); ++idx) { |
105 | 0 | if (idx > 0) { |
106 | 0 | out << ", "; |
107 | 0 | } |
108 | 0 | out << iceberg_delete_file_debug_string(delete_files[idx]); |
109 | 0 | } |
110 | 0 | out << "]"; |
111 | 0 | return out.str(); |
112 | 0 | } |
113 | | |
114 | 0 | static std::string iceberg_params_debug_string(const std::optional<TIcebergFileDesc>& params) { |
115 | 0 | if (!params.has_value()) { |
116 | 0 | return "null"; |
117 | 0 | } |
118 | 0 | const auto& iceberg_params = *params; |
119 | 0 | std::ostringstream out; |
120 | 0 | out << "TIcebergFileDesc{format_version=" |
121 | 0 | << (iceberg_params.__isset.format_version ? iceberg_params.format_version : -1) |
122 | 0 | << ", content=" << (iceberg_params.__isset.content ? iceberg_params.content : -1) |
123 | 0 | << ", original_file_path=" |
124 | 0 | << (iceberg_params.__isset.original_file_path ? iceberg_params.original_file_path : "null") |
125 | 0 | << ", row_count=" << (iceberg_params.__isset.row_count ? iceberg_params.row_count : -1) |
126 | 0 | << ", partition_spec_id=" |
127 | 0 | << (iceberg_params.__isset.partition_spec_id ? iceberg_params.partition_spec_id : 0) |
128 | 0 | << ", has_partition_data_json=" << iceberg_params.__isset.partition_data_json |
129 | 0 | << ", first_row_id=" |
130 | 0 | << (iceberg_params.__isset.first_row_id ? iceberg_params.first_row_id : -1) |
131 | 0 | << ", last_updated_sequence_number=" |
132 | 0 | << (iceberg_params.__isset.last_updated_sequence_number |
133 | 0 | ? iceberg_params.last_updated_sequence_number |
134 | 0 | : -1) |
135 | 0 | << ", delete_file_count=" |
136 | 0 | << (iceberg_params.__isset.delete_files ? iceberg_params.delete_files.size() : 0) |
137 | 0 | << ", delete_files=" |
138 | 0 | << (iceberg_params.__isset.delete_files |
139 | 0 | ? iceberg_delete_files_debug_string(iceberg_params.delete_files) |
140 | 0 | : "[]") |
141 | 0 | << ", has_serialized_split=" << iceberg_params.__isset.serialized_split << "}"; |
142 | 0 | return out.str(); |
143 | 0 | } |
144 | | |
145 | | IcebergTableReader::PositionDeleteRowsCollector::PositionDeleteRowsCollector( |
146 | | std::string data_file_path, format::DeleteRows* rows) |
147 | 6 | : _data_file_path(std::move(data_file_path)), _rows(rows) {} |
148 | | |
149 | | Status IcebergTableReader::PositionDeleteRowsCollector::collect(const Block& block, |
150 | 12 | size_t read_rows) { |
151 | 12 | if (read_rows == 0) { |
152 | 6 | return Status::OK(); |
153 | 6 | } |
154 | 6 | const auto& file_path_column = assert_cast<const ColumnString&>( |
155 | 6 | *remove_nullable((block.get_by_position(ICEBERG_FILE_PATH_BLOCK_POSITION).column))); |
156 | 6 | const auto& pos_column = assert_cast<const ColumnInt64&>( |
157 | 6 | *remove_nullable(block.get_by_position(ICEBERG_ROW_POS_BLOCK_POSITION).column)); |
158 | 15 | for (size_t row = 0; row < read_rows; ++row) { |
159 | 9 | const auto file_path = file_path_column.get_data_at(row).to_string(); |
160 | 9 | if (file_path == _data_file_path) { |
161 | 8 | _rows->push_back(pos_column.get_element(row)); |
162 | 8 | } |
163 | 9 | } |
164 | 6 | return Status::OK(); |
165 | 12 | } |
166 | | |
167 | 21 | Status IcebergTableReader::prepare_split(const format::SplitReadOptions& options) { |
168 | 21 | _row_lineage_columns = {}; |
169 | 21 | _iceberg_params.reset(); |
170 | 21 | _delete_predicates_initialized = false; |
171 | 21 | _position_delete_rows_storage.clear(); |
172 | 21 | _equality_delete_filters.clear(); |
173 | 21 | if (options.current_range.__isset.table_format_params && |
174 | 21 | options.current_range.table_format_params.__isset.iceberg_params) { |
175 | 19 | const auto& iceberg_params = options.current_range.table_format_params.iceberg_params; |
176 | 19 | _iceberg_params = iceberg_params; |
177 | 19 | if (iceberg_params.__isset.first_row_id) { |
178 | 8 | _row_lineage_columns.first_row_id = iceberg_params.first_row_id; |
179 | 8 | } |
180 | 19 | if (iceberg_params.__isset.last_updated_sequence_number) { |
181 | 6 | _row_lineage_columns.last_updated_sequence_number = |
182 | 6 | iceberg_params.last_updated_sequence_number; |
183 | 6 | } |
184 | 19 | } |
185 | 21 | RETURN_IF_ERROR(TableReader::prepare_split(options)); |
186 | 21 | if (_is_table_level_count_active()) { |
187 | 1 | return Status::OK(); |
188 | 1 | } |
189 | 20 | RETURN_IF_ERROR(_init_delete_predicates(options.current_range.table_format_params)); |
190 | 20 | return Status::OK(); |
191 | 20 | } |
192 | | |
193 | 0 | std::string IcebergTableReader::debug_string() const { |
194 | 0 | size_t position_delete_file_count = 0; |
195 | 0 | size_t equality_delete_file_count = 0; |
196 | 0 | size_t deletion_vector_file_count = 0; |
197 | 0 | if (_iceberg_params.has_value() && _iceberg_params->__isset.delete_files) { |
198 | 0 | for (const auto& delete_file : _iceberg_params->delete_files) { |
199 | 0 | if (!delete_file.__isset.content) { |
200 | 0 | continue; |
201 | 0 | } |
202 | 0 | if (delete_file.content == POSITION_DELETE) { |
203 | 0 | ++position_delete_file_count; |
204 | 0 | } else if (delete_file.content == EQUALITY_DELETE) { |
205 | 0 | ++equality_delete_file_count; |
206 | 0 | } else if (delete_file.content == DELETION_VECTOR) { |
207 | 0 | ++deletion_vector_file_count; |
208 | 0 | } |
209 | 0 | } |
210 | 0 | } |
211 | |
|
212 | 0 | std::ostringstream equality_filters; |
213 | 0 | equality_filters << "["; |
214 | 0 | for (size_t idx = 0; idx < _equality_delete_filters.size(); ++idx) { |
215 | 0 | if (idx > 0) { |
216 | 0 | equality_filters << ", "; |
217 | 0 | } |
218 | 0 | const auto& filter = _equality_delete_filters[idx]; |
219 | 0 | equality_filters << "EqualityDeleteFilter{field_ids=" |
220 | 0 | << join_values_for_debug(filter.field_ids) << ", key_types=["; |
221 | 0 | for (size_t type_idx = 0; type_idx < filter.key_types.size(); ++type_idx) { |
222 | 0 | if (type_idx > 0) { |
223 | 0 | equality_filters << ", "; |
224 | 0 | } |
225 | 0 | equality_filters << (filter.key_types[type_idx] == nullptr |
226 | 0 | ? "null" |
227 | 0 | : filter.key_types[type_idx]->get_name()); |
228 | 0 | } |
229 | 0 | equality_filters << "], delete_block_rows=" << filter.delete_block.rows() |
230 | 0 | << ", delete_block_columns=" << filter.delete_block.columns() << "}"; |
231 | 0 | } |
232 | 0 | equality_filters << "]"; |
233 | |
|
234 | 0 | std::ostringstream out; |
235 | 0 | out << "IcebergTableReader{base=" << format::TableReader::debug_string() |
236 | 0 | << ", iceberg_params=" << iceberg_params_debug_string(_iceberg_params) |
237 | 0 | << ", row_lineage_first_row_id=" << _row_lineage_columns.first_row_id |
238 | 0 | << ", row_lineage_last_updated_sequence_number=" |
239 | 0 | << _row_lineage_columns.last_updated_sequence_number |
240 | 0 | << ", need_row_lineage_row_id=" << _need_row_lineage_row_id() |
241 | 0 | << ", need_iceberg_rowid=" << _need_iceberg_rowid() |
242 | 0 | << ", row_position_block_position=" << _row_position_block_position |
243 | 0 | << ", delete_predicates_initialized=" << _delete_predicates_initialized |
244 | 0 | << ", position_delete_file_count=" << position_delete_file_count |
245 | 0 | << ", equality_delete_file_count=" << equality_delete_file_count |
246 | 0 | << ", deletion_vector_file_count=" << deletion_vector_file_count |
247 | 0 | << ", position_delete_rows_storage_count=" << _position_delete_rows_storage.size() |
248 | 0 | << ", equality_delete_filter_count=" << _equality_delete_filters.size() |
249 | 0 | << ", equality_delete_filters=" << equality_filters.str() << "}"; |
250 | 0 | return out.str(); |
251 | 0 | } |
252 | | |
253 | 19 | Status IcebergTableReader::materialize_virtual_columns(Block* table_block) { |
254 | 56 | for (size_t column_idx = 0; column_idx < _data_reader.column_mapper->mappings().size(); |
255 | 37 | ++column_idx) { |
256 | 37 | const auto& mapping = _data_reader.column_mapper->mappings()[column_idx]; |
257 | 37 | switch (mapping.virtual_column_type) { |
258 | 9 | case format::TableVirtualColumnType::ROW_ID: |
259 | 9 | RETURN_IF_ERROR(_materialize_row_lineage_row_id(table_block, column_idx)); |
260 | 9 | break; |
261 | 9 | case format::TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER: |
262 | 8 | RETURN_IF_ERROR( |
263 | 8 | _materialize_row_lineage_last_updated_sequence_number(table_block, column_idx)); |
264 | 8 | break; |
265 | 8 | case format::TableVirtualColumnType::ICEBERG_ROWID: |
266 | 1 | RETURN_IF_ERROR(_materialize_iceberg_rowid(table_block, column_idx)); |
267 | 1 | break; |
268 | 19 | case format::TableVirtualColumnType::INVALID: |
269 | 19 | break; |
270 | 37 | } |
271 | 37 | } |
272 | 19 | return Status::OK(); |
273 | 19 | } |
274 | | |
275 | 20 | Status IcebergTableReader::customize_file_scan_request(format::FileScanRequest* file_request) { |
276 | 20 | RETURN_IF_ERROR(TableReader::customize_file_scan_request(file_request)); |
277 | 20 | if ((_row_lineage_columns.first_row_id >= 0 && _need_row_lineage_row_id()) || |
278 | 20 | _need_iceberg_rowid()) { |
279 | 9 | RETURN_IF_ERROR(_append_row_position_output_column(file_request)); |
280 | 9 | } |
281 | 20 | RETURN_IF_ERROR(_append_equality_delete_predicates(file_request)); |
282 | 20 | return Status::OK(); |
283 | 20 | } |
284 | | |
285 | 19 | bool IcebergTableReader::_supports_aggregate_pushdown(TPushAggOp::type agg_type) const { |
286 | 19 | if (!TableReader::_supports_aggregate_pushdown(agg_type)) { |
287 | 18 | return false; |
288 | 18 | } |
289 | 1 | return _equality_delete_filters.empty(); |
290 | 19 | } |
291 | | |
292 | | Status IcebergTableReader::_parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, |
293 | | DeleteFileDesc* desc, |
294 | 22 | bool* has_delete_file) { |
295 | 22 | DORIS_CHECK(desc != nullptr); |
296 | 22 | DORIS_CHECK(has_delete_file != nullptr); |
297 | 22 | *has_delete_file = false; |
298 | 22 | if (!t_desc.__isset.iceberg_params) { |
299 | 2 | return Status::OK(); |
300 | 2 | } |
301 | 20 | const auto& iceberg_params = t_desc.iceberg_params; |
302 | 20 | if (!iceberg_params.__isset.format_version || |
303 | 20 | iceberg_params.format_version < MIN_SUPPORT_DELETE_FILES_VERSION || |
304 | 20 | !iceberg_params.__isset.delete_files || iceberg_params.delete_files.empty()) { |
305 | 8 | return Status::OK(); |
306 | 8 | } |
307 | | |
308 | 12 | const TIcebergDeleteFileDesc* deletion_vector = nullptr; |
309 | 14 | for (const auto& delete_file : iceberg_params.delete_files) { |
310 | 14 | if (!delete_file.__isset.content || delete_file.content != DELETION_VECTOR) { |
311 | 8 | continue; |
312 | 8 | } |
313 | 6 | if (deletion_vector != nullptr) { |
314 | 1 | return Status::DataQualityError("This iceberg data file has multiple DVs."); |
315 | 1 | } |
316 | 5 | deletion_vector = &delete_file; |
317 | 5 | } |
318 | 11 | if (deletion_vector == nullptr) { |
319 | 7 | return Status::OK(); |
320 | 7 | } |
321 | 4 | if (!deletion_vector->__isset.content_offset || |
322 | 4 | !deletion_vector->__isset.content_size_in_bytes) { |
323 | 0 | return Status::InternalError("Deletion vector is missing content offset or length"); |
324 | 0 | } |
325 | | |
326 | 4 | desc->key = _iceberg_delete_vector_cache_key(*deletion_vector); |
327 | 4 | desc->path = deletion_vector->path; |
328 | 4 | desc->start_offset = deletion_vector->content_offset; |
329 | 4 | desc->size = deletion_vector->content_size_in_bytes; |
330 | 4 | desc->file_size = -1; |
331 | 4 | desc->format = DeleteFileDesc::Format::ICEBERG; |
332 | 4 | *has_delete_file = true; |
333 | 4 | return Status::OK(); |
334 | 4 | } |
335 | | |
336 | 20 | Status IcebergTableReader::_init_delete_predicates(const TTableFormatFileDesc& t_desc) { |
337 | 20 | if (!t_desc.__isset.iceberg_params || _delete_predicates_initialized) { |
338 | 2 | _delete_predicates_initialized = true; |
339 | 2 | return Status::OK(); |
340 | 2 | } |
341 | 18 | const auto& iceberg_params = t_desc.iceberg_params; |
342 | 18 | if (!iceberg_params.__isset.format_version || |
343 | 18 | iceberg_params.format_version < MIN_SUPPORT_DELETE_FILES_VERSION || |
344 | 18 | !iceberg_params.__isset.delete_files || iceberg_params.delete_files.empty()) { |
345 | 8 | _delete_predicates_initialized = true; |
346 | 8 | return Status::OK(); |
347 | 8 | } |
348 | | |
349 | 10 | std::vector<TIcebergDeleteFileDesc> position_delete_files; |
350 | 10 | std::vector<TIcebergDeleteFileDesc> equality_delete_files; |
351 | 11 | for (const auto& delete_file : iceberg_params.delete_files) { |
352 | 11 | if (!delete_file.__isset.content) { |
353 | 0 | continue; |
354 | 0 | } |
355 | 11 | if (delete_file.content == POSITION_DELETE) { |
356 | 6 | position_delete_files.push_back(delete_file); |
357 | 6 | } else if (delete_file.content == EQUALITY_DELETE) { |
358 | 2 | equality_delete_files.push_back(delete_file); |
359 | 2 | } |
360 | 11 | } |
361 | | // `_delete_rows != nullptr` means DeleteVector is parsed |
362 | 10 | if (_delete_rows != nullptr) { |
363 | 3 | _position_delete_rows_storage = *_delete_rows; |
364 | 3 | _delete_rows = &_position_delete_rows_storage; |
365 | 3 | } |
366 | | // Combine position delete rows from both deletion vector and position delete files, and |
367 | | // initialize equality delete predicates. Position delete files contain row positions of |
368 | | // deleted rows, which can be directly added to `_delete_rows`. Equality delete files contain |
369 | | // values of deleted rows, which require reading the files and building predicates for later |
370 | | // filtering. |
371 | 10 | if (!position_delete_files.empty()) { |
372 | 6 | RETURN_IF_ERROR(_init_position_delete_rows(position_delete_files)); |
373 | 6 | } |
374 | 10 | if (!equality_delete_files.empty()) { |
375 | 2 | RETURN_IF_ERROR(_init_equality_delete_predicates(equality_delete_files)); |
376 | 2 | } |
377 | | |
378 | 10 | _delete_predicates_initialized = true; |
379 | 10 | return Status::OK(); |
380 | 10 | } |
381 | | |
382 | | std::string IcebergTableReader::_iceberg_delete_vector_cache_key( |
383 | 4 | const TIcebergDeleteFileDesc& delete_file) { |
384 | 4 | const std::string key_prefix = "iceberg_dv:"; |
385 | 4 | std::string key; |
386 | 4 | key.resize(key_prefix.size() + delete_file.path.size() + sizeof(delete_file.content_offset) + |
387 | 4 | sizeof(delete_file.content_size_in_bytes)); |
388 | 4 | char* data = key.data(); |
389 | 4 | memcpy(data, key_prefix.data(), key_prefix.size()); |
390 | 4 | data += key_prefix.size(); |
391 | 4 | memcpy(data, delete_file.path.data(), delete_file.path.size()); |
392 | 4 | data += delete_file.path.size(); |
393 | 4 | memcpy(data, &delete_file.content_offset, sizeof(delete_file.content_offset)); |
394 | 4 | data += sizeof(delete_file.content_offset); |
395 | 4 | memcpy(data, &delete_file.content_size_in_bytes, sizeof(delete_file.content_size_in_bytes)); |
396 | 4 | return key; |
397 | 4 | } |
398 | | |
399 | | std::shared_ptr<io::FileSystemProperties> IcebergTableReader::_delete_file_system_properties( |
400 | 8 | const TFileScanRangeParams& scan_params) { |
401 | 8 | auto system_properties = std::make_shared<io::FileSystemProperties>(); |
402 | 8 | system_properties->system_type = |
403 | 8 | scan_params.__isset.file_type ? scan_params.file_type : TFileType::FILE_LOCAL; |
404 | 8 | system_properties->properties = scan_params.properties; |
405 | 8 | system_properties->hdfs_params = scan_params.hdfs_params; |
406 | 8 | if (scan_params.__isset.broker_addresses) { |
407 | 0 | system_properties->broker_addresses.assign(scan_params.broker_addresses.begin(), |
408 | 0 | scan_params.broker_addresses.end()); |
409 | 0 | } |
410 | 8 | return system_properties; |
411 | 8 | } |
412 | | |
413 | | std::unique_ptr<io::FileDescription> IcebergTableReader::_delete_file_description( |
414 | 8 | const TFileRangeDesc& range) { |
415 | 8 | auto file_description = std::make_unique<io::FileDescription>(); |
416 | 8 | file_description->path = range.path; |
417 | 8 | file_description->file_size = range.__isset.file_size ? range.file_size : -1; |
418 | 8 | file_description->range_start_offset = range.__isset.start_offset ? range.start_offset : 0; |
419 | 8 | file_description->range_size = range.__isset.size ? range.size : -1; |
420 | 8 | if (range.__isset.fs_name) { |
421 | 0 | file_description->fs_name = range.fs_name; |
422 | 0 | } |
423 | 8 | return file_description; |
424 | 8 | } |
425 | | |
426 | 7 | std::string IcebergTableReader::_data_file_path() const { |
427 | 7 | if (_iceberg_params.has_value() && _iceberg_params->__isset.original_file_path) { |
428 | 6 | return _iceberg_params->original_file_path; |
429 | 6 | } |
430 | 1 | DORIS_CHECK(_current_task != nullptr); |
431 | 1 | DORIS_CHECK(_current_task->data_file != nullptr); |
432 | 1 | return _current_task->data_file->path; |
433 | 7 | } |
434 | | |
435 | 9 | Status IcebergTableReader::_append_row_position_output_column(format::FileScanRequest* request) { |
436 | 9 | const auto row_position_column_id = format::LocalColumnId(format::ROW_POSITION_COLUMN_ID); |
437 | 9 | _append_file_scan_column(request, row_position_column_id, &request->non_predicate_columns); |
438 | 9 | _row_position_block_position = request->local_positions.at(row_position_column_id).value(); |
439 | 9 | return Status::OK(); |
440 | 9 | } |
441 | | |
442 | 20 | Status IcebergTableReader::_append_equality_delete_predicates(format::FileScanRequest* request) { |
443 | 20 | DORIS_CHECK(request != nullptr); |
444 | 20 | for (const auto& filter : _equality_delete_filters) { |
445 | 2 | auto delete_predicate = |
446 | 2 | std::make_shared<EqualityDeletePredicate>(filter.delete_block, filter.field_ids); |
447 | 2 | DCHECK_EQ(filter.field_ids.size(), filter.key_types.size()); |
448 | 4 | for (size_t idx = 0; idx < filter.field_ids.size(); ++idx) { |
449 | 2 | const int field_id = filter.field_ids[idx]; |
450 | 2 | auto field_it = std::ranges::find_if( |
451 | 2 | _data_reader.file_schema, [field_id](const format::ColumnDefinition& field) { |
452 | 2 | return field.has_identifier_field_id() && |
453 | 2 | field.get_identifier_field_id() == field_id; |
454 | 2 | }); |
455 | 2 | if (field_it == _data_reader.file_schema.end()) { |
456 | 0 | return Status::InternalError( |
457 | 0 | "Can not find equality delete column field id {} in data file schema", |
458 | 0 | field_id); |
459 | 0 | } |
460 | 2 | const auto field_column_id = format::LocalColumnId(field_it->file_local_id()); |
461 | 2 | _append_file_scan_column(request, field_column_id, &request->predicate_columns); |
462 | 2 | const auto block_position = request->local_positions.at(field_column_id).value(); |
463 | 2 | auto slot = VSlotRef::create_shared(cast_set<int>(block_position), |
464 | 2 | cast_set<int>(block_position), -1, field_it->type, |
465 | 2 | field_it->name); |
466 | 2 | if (field_it->type->equals(*filter.key_types[idx])) { |
467 | 1 | delete_predicate->add_child(std::move(slot)); |
468 | 1 | } else { |
469 | 1 | auto cast_expr = Cast::create_shared(filter.key_types[idx]); |
470 | 1 | cast_expr->add_child(std::move(slot)); |
471 | 1 | delete_predicate->add_child(std::move(cast_expr)); |
472 | 1 | } |
473 | 2 | } |
474 | 2 | request->delete_conjuncts.push_back( |
475 | 2 | VExprContext::create_shared(std::move(delete_predicate))); |
476 | 2 | } |
477 | 20 | return Status::OK(); |
478 | 20 | } |
479 | | |
480 | | Status IcebergTableReader::_read_parquet_position_delete_file( |
481 | | const TIcebergDeleteFileDesc& delete_file, const TFileScanRangeParams& scan_params, |
482 | 6 | IcebergDeleteFileIOContext* delete_io_ctx, PositionDeleteRowsCollector* collector) { |
483 | 6 | if (!delete_file.__isset.file_format) { |
484 | 0 | return Status::InternalError("Iceberg position delete file is missing file format"); |
485 | 0 | } |
486 | 6 | if (delete_file.file_format == TFileFormatType::FORMAT_ORC) { |
487 | 0 | return Status::NotSupported("Iceberg ORC position delete file is not supported"); |
488 | 0 | } |
489 | 6 | if (delete_file.file_format != TFileFormatType::FORMAT_PARQUET) { |
490 | 0 | return Status::NotSupported("Unsupported Iceberg delete file format {}", |
491 | 0 | delete_file.file_format); |
492 | 0 | } |
493 | | |
494 | 6 | auto delete_range = build_iceberg_delete_file_range(delete_file.path); |
495 | 6 | if (_current_task != nullptr && _current_task->data_file != nullptr && |
496 | 6 | !_current_task->data_file->fs_name.empty()) { |
497 | 0 | delete_range.__set_fs_name(_current_task->data_file->fs_name); |
498 | 0 | } |
499 | 6 | auto system_properties = _delete_file_system_properties(scan_params); |
500 | 6 | auto file_description = _delete_file_description(delete_range); |
501 | 6 | std::shared_ptr<io::IOContext> io_ctx(&delete_io_ctx->io_ctx, [](io::IOContext*) {}); |
502 | 6 | format::parquet::ParquetReader reader(system_properties, file_description, io_ctx, |
503 | 6 | _scanner_profile); |
504 | 6 | RETURN_IF_ERROR(reader.init(_runtime_state)); |
505 | | |
506 | 6 | std::vector<format::ColumnDefinition> schema; |
507 | 6 | RETURN_IF_ERROR(reader.get_schema(&schema)); |
508 | 6 | format::ColumnDefinition* file_path_field = nullptr; |
509 | 6 | format::ColumnDefinition* pos_field = nullptr; |
510 | 12 | for (auto& field : schema) { |
511 | 12 | if (field.name == ICEBERG_FILE_PATH) { |
512 | 6 | file_path_field = &field; |
513 | 6 | } else if (field.name == ICEBERG_ROW_POS) { |
514 | 6 | pos_field = &field; |
515 | 6 | } |
516 | 12 | } |
517 | 6 | if (file_path_field == nullptr || pos_field == nullptr) { |
518 | 0 | return Status::InternalError("Position delete parquet file is missing required columns"); |
519 | 0 | } |
520 | | |
521 | 6 | auto request = std::make_shared<format::FileScanRequest>(); |
522 | 6 | request->non_predicate_columns = { |
523 | 6 | format::LocalColumnIndex::top_level( |
524 | 6 | format::LocalColumnId(file_path_field->file_local_id())), |
525 | 6 | format::LocalColumnIndex::top_level(format::LocalColumnId(pos_field->file_local_id()))}; |
526 | 6 | request->local_positions = { |
527 | 6 | {format::LocalColumnId(file_path_field->file_local_id()), |
528 | 6 | format::LocalIndex(ICEBERG_FILE_PATH_BLOCK_POSITION)}, |
529 | 6 | {format::LocalColumnId(pos_field->file_local_id()), |
530 | 6 | format::LocalIndex(ICEBERG_ROW_POS_BLOCK_POSITION)}, |
531 | 6 | }; |
532 | 6 | RETURN_IF_ERROR(reader.open(request)); |
533 | | |
534 | 6 | bool eof = false; |
535 | 6 | auto build_position_delete_block = [](const format::ColumnDefinition& file_path_field, |
536 | 12 | const format::ColumnDefinition& pos_field) -> Block { |
537 | 12 | Block block; |
538 | 12 | block.insert( |
539 | 12 | {file_path_field.type->create_column(), file_path_field.type, ICEBERG_FILE_PATH}); |
540 | 12 | block.insert({pos_field.type->create_column(), pos_field.type, ICEBERG_ROW_POS}); |
541 | 12 | return block; |
542 | 12 | }; |
543 | 18 | while (!eof) { |
544 | 12 | Block block = build_position_delete_block(*file_path_field, *pos_field); |
545 | 12 | size_t read_rows = 0; |
546 | 12 | RETURN_IF_ERROR(reader.get_block(&block, &read_rows, &eof)); |
547 | 12 | RETURN_IF_ERROR(collector->collect(block, read_rows)); |
548 | 12 | } |
549 | 6 | return reader.close(); |
550 | 6 | } |
551 | | |
552 | | Status IcebergTableReader::_init_position_delete_rows( |
553 | 6 | const std::vector<TIcebergDeleteFileDesc>& delete_files) { |
554 | 6 | TFileScanRangeParams delete_scan_params = |
555 | 6 | _scan_params == nullptr ? TFileScanRangeParams() : *_scan_params; |
556 | 6 | format::DeleteRows position_delete_rows; |
557 | 6 | IcebergDeleteFileIOContext delete_io_ctx(_runtime_state); |
558 | 6 | PositionDeleteRowsCollector collector(_data_file_path(), &position_delete_rows); |
559 | 6 | for (const auto& delete_file : delete_files) { |
560 | 6 | RETURN_IF_ERROR(_read_parquet_position_delete_file(delete_file, delete_scan_params, |
561 | 6 | &delete_io_ctx, &collector)); |
562 | 6 | } |
563 | 6 | if (position_delete_rows.empty()) { |
564 | 0 | return Status::OK(); |
565 | 0 | } |
566 | | // Position delete files and deletion vectors both become row-position deletes for the |
567 | | // common TableReader DeletePredicate path. Keep the merged rows in a member vector because |
568 | | // DeletePredicate stores a reference to the vector used by _delete_rows. |
569 | 6 | _position_delete_rows_storage.insert(_position_delete_rows_storage.end(), |
570 | 6 | position_delete_rows.begin(), position_delete_rows.end()); |
571 | 6 | std::sort(_position_delete_rows_storage.begin(), _position_delete_rows_storage.end()); |
572 | 6 | _position_delete_rows_storage.erase( |
573 | 6 | std::unique(_position_delete_rows_storage.begin(), _position_delete_rows_storage.end()), |
574 | 6 | _position_delete_rows_storage.end()); |
575 | 6 | _delete_rows = &_position_delete_rows_storage; |
576 | 6 | return Status::OK(); |
577 | 6 | } |
578 | | |
579 | | Status IcebergTableReader::_init_equality_delete_predicates( |
580 | 2 | const std::vector<TIcebergDeleteFileDesc>& delete_files) { |
581 | 2 | TFileScanRangeParams delete_scan_params = |
582 | 2 | _scan_params == nullptr ? TFileScanRangeParams() : *_scan_params; |
583 | 2 | IcebergDeleteFileIOContext delete_io_ctx(_runtime_state); |
584 | 2 | for (const auto& delete_file : delete_files) { |
585 | 2 | RETURN_IF_ERROR(_read_parquet_equality_delete_file(delete_file, delete_scan_params, |
586 | 2 | &delete_io_ctx)); |
587 | 2 | } |
588 | 2 | return Status::OK(); |
589 | 2 | } |
590 | | |
591 | | Status IcebergTableReader::_read_parquet_equality_delete_file( |
592 | | const TIcebergDeleteFileDesc& delete_file, const TFileScanRangeParams& scan_params, |
593 | 2 | IcebergDeleteFileIOContext* delete_io_ctx) { |
594 | 2 | if (!delete_file.__isset.file_format) { |
595 | 0 | return Status::InternalError("Iceberg equality delete file is missing file format"); |
596 | 0 | } |
597 | 2 | if (delete_file.file_format != TFileFormatType::FORMAT_PARQUET) { |
598 | 0 | return Status::NotSupported("Unsupported Iceberg equality delete file format {}", |
599 | 0 | delete_file.file_format); |
600 | 0 | } |
601 | 2 | if (!delete_file.__isset.field_ids || delete_file.field_ids.empty()) { |
602 | 0 | return Status::InternalError("Iceberg equality delete file is missing field ids"); |
603 | 0 | } |
604 | | |
605 | 2 | auto delete_range = build_iceberg_delete_file_range(delete_file.path); |
606 | 2 | if (_current_task != nullptr && _current_task->data_file != nullptr && |
607 | 2 | !_current_task->data_file->fs_name.empty()) { |
608 | 0 | delete_range.__set_fs_name(_current_task->data_file->fs_name); |
609 | 0 | } |
610 | 2 | auto system_properties = _delete_file_system_properties(scan_params); |
611 | 2 | auto file_description = _delete_file_description(delete_range); |
612 | 2 | std::shared_ptr<io::IOContext> io_ctx(&delete_io_ctx->io_ctx, [](io::IOContext*) {}); |
613 | 2 | format::parquet::ParquetReader reader(system_properties, file_description, io_ctx, |
614 | 2 | _scanner_profile); |
615 | 2 | RETURN_IF_ERROR(reader.init(_runtime_state)); |
616 | | |
617 | 2 | std::vector<format::ColumnDefinition> schema; |
618 | 2 | RETURN_IF_ERROR(reader.get_schema(&schema)); |
619 | 2 | std::vector<format::ColumnDefinition> delete_fields; |
620 | 2 | std::vector<int> delete_field_ids; |
621 | 2 | std::vector<DataTypePtr> delete_key_types; |
622 | 2 | for (const auto field_id : delete_file.field_ids) { |
623 | 2 | auto field_it = std::find_if(schema.begin(), schema.end(), |
624 | 2 | [field_id](const format::ColumnDefinition& field) { |
625 | 2 | return field.has_identifier_field_id() && |
626 | 2 | field_id == field.get_identifier_field_id(); |
627 | 2 | }); |
628 | 2 | if (field_it == schema.end()) { |
629 | 0 | return Status::InternalError("Can not find field id {} in equality delete file {}", |
630 | 0 | field_id, delete_file.path); |
631 | 0 | } |
632 | 2 | if (!field_it->children.empty()) { |
633 | 0 | return Status::NotSupported( |
634 | 0 | "Iceberg equality delete does not support complex column {}", field_it->name); |
635 | 0 | } |
636 | 2 | delete_fields.push_back(*field_it); |
637 | 2 | delete_field_ids.push_back(field_id); |
638 | 2 | delete_key_types.push_back(field_it->type); |
639 | 2 | } |
640 | | |
641 | 2 | auto request = std::make_shared<format::FileScanRequest>(); |
642 | 4 | for (size_t idx = 0; idx < delete_fields.size(); ++idx) { |
643 | 2 | const auto local_column_id = format::LocalColumnId(delete_fields[idx].file_local_id()); |
644 | 2 | request->non_predicate_columns.push_back( |
645 | 2 | format::LocalColumnIndex::top_level(local_column_id)); |
646 | 2 | request->local_positions.emplace(local_column_id, format::LocalIndex(idx)); |
647 | 2 | } |
648 | 2 | RETURN_IF_ERROR(reader.open(request)); |
649 | | |
650 | 2 | auto build_equality_delete_block = |
651 | 6 | [](const std::vector<format::ColumnDefinition> fields) -> Block { |
652 | 6 | Block block; |
653 | 6 | for (const auto& field : fields) { |
654 | 6 | block.insert({field.type->create_column(), field.type, field.name}); |
655 | 6 | } |
656 | 6 | return block; |
657 | 6 | }; |
658 | 2 | Block delete_block = build_equality_delete_block(delete_fields); |
659 | 2 | MutableBlock mutable_delete_block(std::move(delete_block)); |
660 | 2 | bool eof = false; |
661 | 6 | while (!eof) { |
662 | 4 | Block block = build_equality_delete_block(delete_fields); |
663 | 4 | size_t read_rows = 0; |
664 | 4 | RETURN_IF_ERROR(reader.get_block(&block, &read_rows, &eof)); |
665 | 4 | if (read_rows > 0) { |
666 | 2 | RETURN_IF_ERROR(mutable_delete_block.merge(block)); |
667 | 2 | } |
668 | 4 | } |
669 | 2 | RETURN_IF_ERROR(reader.close()); |
670 | 2 | delete_block = mutable_delete_block.to_block(); |
671 | 2 | _equality_delete_filters.push_back( |
672 | 2 | EqualityDeleteFilter {.field_ids = std::move(delete_field_ids), |
673 | 2 | .key_types = std::move(delete_key_types), |
674 | 2 | .delete_block = std::move(delete_block)}); |
675 | 2 | return Status::OK(); |
676 | 2 | } |
677 | | |
678 | 9 | Status IcebergTableReader::_materialize_row_lineage_row_id(Block* table_block, size_t column_idx) { |
679 | 9 | if (_row_lineage_columns.first_row_id < 0) { |
680 | 2 | return Status::OK(); |
681 | 2 | } |
682 | 7 | DORIS_CHECK(_row_position_block_position < _data_reader.block_template.columns()); |
683 | 7 | const auto& row_position_column = assert_cast<const ColumnInt64&>( |
684 | 7 | *_data_reader.block_template.get_by_position(_row_position_block_position).column); |
685 | 7 | DORIS_CHECK(row_position_column.size() == table_block->rows()); |
686 | 7 | auto column = IColumn::mutate( |
687 | 7 | table_block->get_by_position(column_idx).column->convert_to_full_column_if_const()); |
688 | 7 | auto* nullable_column = assert_cast<ColumnNullable*>(column.get()); |
689 | 7 | auto& null_map = nullable_column->get_null_map_data(); |
690 | 7 | auto& data = assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data(); |
691 | 7 | DORIS_CHECK(null_map.size() == row_position_column.size()); |
692 | 7 | DORIS_CHECK(data.size() == row_position_column.size()); |
693 | 23 | for (size_t row = 0; row < row_position_column.size(); ++row) { |
694 | 16 | if (null_map[row]) { |
695 | 10 | null_map[row] = 0; |
696 | 10 | data[row] = _row_lineage_columns.first_row_id + row_position_column.get_element(row); |
697 | 10 | } |
698 | 16 | } |
699 | 7 | table_block->replace_by_position(column_idx, std::move(column)); |
700 | 7 | return Status::OK(); |
701 | 9 | } |
702 | | |
703 | 1 | Status IcebergTableReader::_materialize_iceberg_rowid(Block* table_block, size_t column_idx) { |
704 | 1 | DORIS_CHECK(_row_position_block_position < _data_reader.block_template.columns()); |
705 | 1 | const auto& row_position_column = assert_cast<const ColumnInt64&>( |
706 | 1 | *_data_reader.block_template.get_by_position(_row_position_block_position).column); |
707 | 1 | DORIS_CHECK(row_position_column.size() == table_block->rows()); |
708 | | |
709 | 1 | const auto& type = table_block->get_by_position(column_idx).type; |
710 | 1 | auto column = type->create_column(); |
711 | 1 | auto* nullable_column = check_and_get_column<ColumnNullable>(column.get()); |
712 | 1 | auto* struct_column = nullable_column != nullptr |
713 | 1 | ? check_and_get_column<ColumnStruct>( |
714 | 1 | nullable_column->get_nested_column_ptr().get()) |
715 | 1 | : check_and_get_column<ColumnStruct>(column.get()); |
716 | 1 | DORIS_CHECK(struct_column != nullptr); |
717 | 1 | DORIS_CHECK(struct_column->tuple_size() >= 4); |
718 | | |
719 | 1 | const auto rows = row_position_column.size(); |
720 | 1 | const auto file_path = _data_file_path(); |
721 | 1 | const int32_t partition_spec_id = |
722 | 1 | _iceberg_params.has_value() && _iceberg_params->__isset.partition_spec_id |
723 | 1 | ? _iceberg_params->partition_spec_id |
724 | 1 | : 0; |
725 | 1 | const std::string partition_data_json = |
726 | 1 | _iceberg_params.has_value() && _iceberg_params->__isset.partition_data_json |
727 | 1 | ? _iceberg_params->partition_data_json |
728 | 1 | : ""; |
729 | | |
730 | 1 | auto& file_path_column = struct_column->get_column(0); |
731 | 1 | auto& row_pos_column = struct_column->get_column(1); |
732 | 1 | auto& spec_id_column = struct_column->get_column(2); |
733 | 1 | auto& partition_data_column = struct_column->get_column(3); |
734 | 1 | file_path_column.reserve(rows); |
735 | 1 | row_pos_column.reserve(rows); |
736 | 1 | spec_id_column.reserve(rows); |
737 | 1 | partition_data_column.reserve(rows); |
738 | 3 | for (size_t row = 0; row < rows; ++row) { |
739 | 2 | file_path_column.insert_data(file_path.data(), file_path.size()); |
740 | 2 | const int64_t row_pos = row_position_column.get_element(row); |
741 | 2 | row_pos_column.insert_data(reinterpret_cast<const char*>(&row_pos), sizeof(row_pos)); |
742 | 2 | spec_id_column.insert_data(reinterpret_cast<const char*>(&partition_spec_id), |
743 | 2 | sizeof(partition_spec_id)); |
744 | 2 | partition_data_column.insert_data(partition_data_json.data(), partition_data_json.size()); |
745 | 2 | } |
746 | 1 | if (nullable_column != nullptr) { |
747 | 1 | nullable_column->get_null_map_data().resize_fill(rows, 0); |
748 | 1 | } |
749 | 1 | table_block->replace_by_position(column_idx, std::move(column)); |
750 | 1 | return Status::OK(); |
751 | 1 | } |
752 | | |
753 | | Status IcebergTableReader::_materialize_row_lineage_last_updated_sequence_number( |
754 | 8 | Block* table_block, size_t column_idx) { |
755 | 8 | if (_row_lineage_columns.last_updated_sequence_number < 0) { |
756 | 2 | return Status::OK(); |
757 | 2 | } |
758 | 6 | auto column = IColumn::mutate( |
759 | 6 | table_block->get_by_position(column_idx).column->convert_to_full_column_if_const()); |
760 | 6 | auto* nullable_column = assert_cast<ColumnNullable*>(column.get()); |
761 | 6 | auto& null_map = nullable_column->get_null_map_data(); |
762 | 6 | auto& data = assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data(); |
763 | 6 | DORIS_CHECK(null_map.size() == table_block->rows()); |
764 | 6 | DORIS_CHECK(data.size() == table_block->rows()); |
765 | 20 | for (size_t row = 0; row < table_block->rows(); ++row) { |
766 | 14 | if (null_map[row]) { |
767 | 8 | null_map[row] = 0; |
768 | 8 | data[row] = _row_lineage_columns.last_updated_sequence_number; |
769 | 8 | } |
770 | 14 | } |
771 | 6 | table_block->replace_by_position(column_idx, std::move(column)); |
772 | 6 | return Status::OK(); |
773 | 8 | } |
774 | | |
775 | 8 | bool IcebergTableReader::_need_row_lineage_row_id() const { |
776 | 8 | if (_data_reader.column_mapper != nullptr) { |
777 | 7 | for (const auto& mapping : _data_reader.column_mapper->mappings()) { |
778 | 7 | if (mapping.virtual_column_type == format::TableVirtualColumnType::ROW_ID) { |
779 | 7 | return true; |
780 | 7 | } |
781 | 7 | } |
782 | 7 | } |
783 | 1 | return std::ranges::any_of(_projected_columns, is_projected_row_lineage_row_id); |
784 | 8 | } |
785 | | |
786 | 12 | bool IcebergTableReader::_need_iceberg_rowid() const { |
787 | 12 | if (_data_reader.column_mapper != nullptr) { |
788 | 16 | for (const auto& mapping : _data_reader.column_mapper->mappings()) { |
789 | 16 | if (mapping.virtual_column_type == format::TableVirtualColumnType::ICEBERG_ROWID) { |
790 | 1 | return true; |
791 | 1 | } |
792 | 16 | } |
793 | 12 | } |
794 | 11 | return std::ranges::any_of(_projected_columns, is_projected_iceberg_rowid); |
795 | 12 | } |
796 | | |
797 | | } // namespace doris::format::iceberg |