be/src/format/table/iceberg_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format/table/iceberg_reader.h" |
19 | | |
20 | | #include <gen_cpp/Descriptors_types.h> |
21 | | #include <gen_cpp/Metrics_types.h> |
22 | | #include <gen_cpp/PlanNodes_types.h> |
23 | | #include <gen_cpp/parquet_types.h> |
24 | | #include <glog/logging.h> |
25 | | #include <parallel_hashmap/phmap.h> |
26 | | #include <rapidjson/document.h> |
27 | | |
28 | | #include <algorithm> |
29 | | #include <cstring> |
30 | | #include <functional> |
31 | | #include <memory> |
32 | | #include <set> |
33 | | |
34 | | #include "common/compiler_util.h" // IWYU pragma: keep |
35 | | #include "common/status.h" |
36 | | #include "core/assert_cast.h" |
37 | | #include "core/block/block.h" |
38 | | #include "core/block/column_with_type_and_name.h" |
39 | | #include "core/column/column.h" |
40 | | #include "core/column/column_string.h" |
41 | | #include "core/column/column_vector.h" |
42 | | #include "core/data_type/data_type_factory.hpp" |
43 | | #include "core/data_type/define_primitive_type.h" |
44 | | #include "core/data_type/primitive_type.h" |
45 | | #include "core/string_ref.h" |
46 | | #include "exprs/aggregate/aggregate_function.h" |
47 | | #include "format/format_common.h" |
48 | | #include "format/generic_reader.h" |
49 | | #include "format/orc/vorc_reader.h" |
50 | | #include "format/parquet/schema_desc.h" |
51 | | #include "format/parquet/vparquet_column_chunk_reader.h" |
52 | | #include "format/table/deletion_vector_reader.h" |
53 | | #include "format/table/iceberg/iceberg_orc_nested_column_utils.h" |
54 | | #include "format/table/iceberg/iceberg_parquet_nested_column_utils.h" |
55 | | #include "format/table/nested_column_access_helper.h" |
56 | | #include "format/table/table_format_reader.h" |
57 | | #include "runtime/runtime_state.h" |
58 | | #include "util/coding.h" |
59 | | |
60 | | namespace cctz { |
61 | | #include "common/compile_check_begin.h" |
62 | | class time_zone; |
63 | | } // namespace cctz |
64 | | namespace doris { |
65 | | class RowDescriptor; |
66 | | class SlotDescriptor; |
67 | | class TupleDescriptor; |
68 | | |
69 | | namespace io { |
70 | | struct IOContext; |
71 | | } // namespace io |
72 | | class VExprContext; |
73 | | } // namespace doris |
74 | | |
75 | | namespace doris { |
// Key of the ORC metadata attribute that carries the Iceberg field id for a
// column ("iceberg.id"); used by IcebergOrcReader when matching ORC columns
// to Iceberg schema fields.
const std::string IcebergOrcReader::ICEBERG_ORC_ATTRIBUTE = "iceberg.id";
77 | | |
// Constructs an Iceberg table reader that wraps a format-specific file reader
// (Parquet/ORC) and registers Iceberg-specific runtime-profile counters.
//
// @param file_format_reader underlying Parquet/ORC reader; ownership is taken.
// @param kv_cache           shared cache used to reuse parsed delete files and
//                           merged delete rows across splits of one data file.
// Remaining parameters are forwarded unchanged to TableFormatReader.
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader,
                                       RuntimeProfile* profile, RuntimeState* state,
                                       const TFileScanRangeParams& params,
                                       const TFileRangeDesc& range, ShardedKVCache* kv_cache,
                                       io::IOContext* io_ctx, FileMetaCache* meta_cache)
        : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx,
                            meta_cache),
          _kv_cache(kv_cache) {
    // Parent profile node; the counters/timers below attach as its children.
    static const char* iceberg_profile = "IcebergProfile";
    ADD_TIMER(_profile, iceberg_profile);
    _iceberg_profile.num_delete_files =
            ADD_CHILD_COUNTER(_profile, "NumDeleteFiles", TUnit::UNIT, iceberg_profile);
    _iceberg_profile.num_delete_rows =
            ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, iceberg_profile);
    _iceberg_profile.delete_files_read_time =
            ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
    _iceberg_profile.delete_rows_sort_time =
            ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
    _iceberg_profile.parse_delete_file_time =
            ADD_CHILD_TIMER(_profile, "ParseDeleteFileTime", iceberg_profile);
}
99 | | |
100 | 2 | Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) { |
101 | 2 | RETURN_IF_ERROR(_expand_block_if_need(block)); |
102 | | |
103 | 2 | RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof)); |
104 | | |
105 | 2 | if (_equality_delete_impls.size() > 0) { |
106 | 0 | std::unique_ptr<IColumn::Filter> filter = |
107 | 0 | std::make_unique<IColumn::Filter>(block->rows(), 1); |
108 | 0 | for (auto& equality_delete_impl : _equality_delete_impls) { |
109 | 0 | RETURN_IF_ERROR(equality_delete_impl->filter_data_block( |
110 | 0 | block, _col_name_to_block_idx, _id_to_block_column_name, *filter)); |
111 | 0 | } |
112 | 0 | Block::filter_block_internal(block, *filter, block->columns()); |
113 | 0 | } |
114 | | |
115 | 2 | *read_rows = block->rows(); |
116 | 2 | return _shrink_block_if_need(block); |
117 | 2 | } |
118 | | |
// Prepares row-level delete handling for the current data file before any
// data is read. Splits the table's delete files into position-delete,
// equality-delete and deletion-vector groups and wires the matching handler:
//   - equality deletes -> _process_equality_delete()
//   - deletion vector  -> read_deletion_vector() (at most one DV per file)
//   - position deletes -> _position_delete_base() (ignored when a DV exists)
// Whenever row-level deletes apply, count push-down on the inner reader is
// disabled because the pushed-down count would ignore deleted rows.
Status IcebergTableReader::init_row_filters() {
    // We get the count value by doris's be, so we don't need to read the delete file
    if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count > 0) {
        return Status::OK();
    }

    const auto& table_desc = _range.table_format_params.iceberg_params;
    const auto& version = table_desc.format_version;
    // Format v1 tables cannot carry delete files; nothing to do.
    if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
        return Status::OK();
    }

    // Initialize file information for $row_id generation
    // Extract from table_desc which contains current file's metadata
    if (_need_row_id_column) {
        std::string file_path = table_desc.original_file_path;
        int32_t partition_spec_id = 0;
        std::string partition_data_json;
        if (table_desc.__isset.partition_spec_id) {
            partition_spec_id = table_desc.partition_spec_id;
        }
        if (table_desc.__isset.partition_data_json) {
            partition_data_json = table_desc.partition_data_json;
        }

        // Forward the row-id parameters to whichever concrete reader we wrap.
        if (auto* parquet_reader = dynamic_cast<ParquetReader*>(_file_format_reader.get())) {
            parquet_reader->set_iceberg_rowid_params(file_path, partition_spec_id,
                                                     partition_data_json, _row_id_column_position);
        } else if (auto* orc_reader = dynamic_cast<OrcReader*>(_file_format_reader.get())) {
            orc_reader->set_iceberg_rowid_params(file_path, partition_spec_id, partition_data_json,
                                                 _row_id_column_position);
        }
        LOG(INFO) << "Initialized $row_id generation for file: " << file_path
                  << ", partition_spec_id: " << partition_spec_id;
    }

    // Bucket the delete files by content kind; each kind is handled separately.
    std::vector<TIcebergDeleteFileDesc> position_delete_files;
    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
    std::vector<TIcebergDeleteFileDesc> deletion_vector_files;
    for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
        if (desc.content == POSITION_DELETE) {
            position_delete_files.emplace_back(desc);
        } else if (desc.content == EQUALITY_DELETE) {
            equality_delete_files.emplace_back(desc);
        } else if (desc.content == DELETION_VECTOR) {
            deletion_vector_files.emplace_back(desc);
        }
    }

    if (!equality_delete_files.empty()) {
        RETURN_IF_ERROR(_process_equality_delete(equality_delete_files));
        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
    }

    if (!deletion_vector_files.empty()) {
        if (deletion_vector_files.size() != 1) [[unlikely]] {
            /*
             * Deletion vectors are a binary representation of deletes for a single data file that is more efficient
             * at execution time than position delete files. Unlike equality or position delete files, there can be
             * at most one deletion vector for a given data file in a snapshot.
             */
            return Status::DataQualityError("This iceberg data file has multiple DVs.");
        }
        RETURN_IF_ERROR(
                read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));

        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
        // Readers can safely ignore position delete files if there is a DV for a data file.
    } else if (!position_delete_files.empty()) {
        RETURN_IF_ERROR(
                _position_delete_base(table_desc.original_file_path, position_delete_files));
        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
    }

    COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
    return Status::OK();
}
196 | | |
197 | | void IcebergTableReader::_generate_equality_delete_block( |
198 | | Block* block, const std::vector<std::string>& equality_delete_col_names, |
199 | 0 | const std::vector<DataTypePtr>& equality_delete_col_types) { |
200 | 0 | for (int i = 0; i < equality_delete_col_names.size(); ++i) { |
201 | 0 | DataTypePtr data_type = make_nullable(equality_delete_col_types[i]); |
202 | 0 | MutableColumnPtr data_column = data_type->create_column(); |
203 | 0 | block->insert(ColumnWithTypeAndName(std::move(data_column), data_type, |
204 | 0 | equality_delete_col_names[i])); |
205 | 0 | } |
206 | 0 | } |
207 | | |
208 | 2 | Status IcebergTableReader::_expand_block_if_need(Block* block) { |
209 | 2 | std::set<std::string> names; |
210 | 2 | auto block_names = block->get_names(); |
211 | 2 | names.insert(block_names.begin(), block_names.end()); |
212 | 2 | for (auto& col : _expand_columns) { |
213 | 0 | col.column->assume_mutable()->clear(); |
214 | 0 | if (names.contains(col.name)) { |
215 | 0 | return Status::InternalError("Wrong expand column '{}'", col.name); |
216 | 0 | } |
217 | 0 | names.insert(col.name); |
218 | 0 | (*_col_name_to_block_idx)[col.name] = static_cast<uint32_t>(block->columns()); |
219 | 0 | block->insert(col); |
220 | 0 | } |
221 | 2 | return Status::OK(); |
222 | 2 | } |
223 | | |
224 | 2 | Status IcebergTableReader::_shrink_block_if_need(Block* block) { |
225 | 2 | std::set<size_t> positions_to_erase; |
226 | 2 | for (const std::string& expand_col : _expand_col_names) { |
227 | 0 | if (!_col_name_to_block_idx->contains(expand_col)) { |
228 | 0 | return Status::InternalError("Wrong erase column '{}', block: {}", expand_col, |
229 | 0 | block->dump_names()); |
230 | 0 | } |
231 | 0 | positions_to_erase.emplace((*_col_name_to_block_idx)[expand_col]); |
232 | 0 | } |
233 | 2 | block->erase(positions_to_erase); |
234 | 2 | for (const std::string& expand_col : _expand_col_names) { |
235 | 0 | _col_name_to_block_idx->erase(expand_col); |
236 | 0 | } |
237 | 2 | return Status::OK(); |
238 | 2 | } |
239 | | |
// Gathers the deleted row positions targeting `data_file_path` from every
// position-delete file, then merges them into one sorted vector that the
// inner Parquet/ORC reader consumes via set_delete_rows().
// Both the parsed delete files and the merged row list are stored in the
// shared _kv_cache so other splits of the same data file can reuse them.
Status IcebergTableReader::_position_delete_base(
        const std::string data_file_path, const std::vector<TIcebergDeleteFileDesc>& delete_files) {
    std::vector<DeleteRows*> delete_rows_array;
    int64_t num_delete_rows = 0;
    for (const auto& delete_file : delete_files) {
        SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
        // The factory lambda below reports failures through this out-of-band
        // status because the cache factory can only return a pointer.
        Status create_status = Status::OK();
        auto* delete_file_cache = _kv_cache->get<DeleteFile>(
                _delet_file_cache_key(delete_file.path), [&]() -> DeleteFile* {
                    auto* position_delete = new DeleteFile;
                    TFileRangeDesc delete_file_range;
                    // must use __set() method to make sure __isset is true
                    delete_file_range.__set_fs_name(_range.fs_name);
                    delete_file_range.path = delete_file.path;
                    delete_file_range.start_offset = 0;
                    delete_file_range.size = -1;
                    delete_file_range.file_size = -1;
                    //read position delete file base on delete_file_range , generate DeleteFile , add DeleteFile to kv_cache
                    create_status = _read_position_delete_file(&delete_file_range, position_delete);

                    // NOTE(review): on failure a nullptr is handed to the
                    // cache (and `position_delete` leaks) — confirm the cache
                    // tolerates null entries and whether the leak is intended.
                    if (!create_status) {
                        return nullptr;
                    }

                    return position_delete;
                });
        if (create_status.is<ErrorCode::END_OF_FILE>()) {
            continue;
        } else if (!create_status.ok()) {
            return create_status;
        }

        DeleteFile& delete_file_map = *((DeleteFile*)delete_file_cache);
        // Collect the (non-empty) row list this delete file holds for our
        // data file, if any.
        auto get_value = [&](const auto& v) {
            DeleteRows* row_ids = v.second.get();
            if (!row_ids->empty()) {
                delete_rows_array.emplace_back(row_ids);
                num_delete_rows += row_ids->size();
            }
        };
        delete_file_map.if_contains(data_file_path, get_value);
    }
    // Use a KV cache to store the delete rows corresponding to a data file path.
    // The Parquet/ORC reader holds a reference (pointer) to this cached entry.
    // This allows delete rows to be reused when a single data file is split into
    // multiple splits, avoiding excessive memory usage when delete rows are large.
    if (num_delete_rows > 0) {
        SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
        _iceberg_delete_rows =
                _kv_cache->get<DeleteRows>(data_file_path,
                                           [&]() -> DeleteRows* {
                                               auto* data_file_position_delete = new DeleteRows;
                                               _sort_delete_rows(delete_rows_array, num_delete_rows,
                                                                 *data_file_position_delete);

                                               return data_file_position_delete;
                                           }

                );
        set_delete_rows();
        COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
    }
    return Status::OK();
}
304 | | |
305 | | IcebergTableReader::PositionDeleteRange IcebergTableReader::_get_range( |
306 | 0 | const ColumnDictI32& file_path_column) { |
307 | 0 | IcebergTableReader::PositionDeleteRange range; |
308 | 0 | size_t read_rows = file_path_column.get_data().size(); |
309 | 0 | const int* code_path = file_path_column.get_data().data(); |
310 | 0 | const int* code_path_start = code_path; |
311 | 0 | const int* code_path_end = code_path + read_rows; |
312 | 0 | while (code_path < code_path_end) { |
313 | 0 | int code = code_path[0]; |
314 | 0 | const int* code_end = std::upper_bound(code_path, code_path_end, code); |
315 | 0 | range.data_file_path.emplace_back(file_path_column.get_value(code).to_string()); |
316 | 0 | range.range.emplace_back(code_path - code_path_start, code_end - code_path_start); |
317 | 0 | code_path = code_end; |
318 | 0 | } |
319 | 0 | return range; |
320 | 0 | } |
321 | | |
322 | | IcebergTableReader::PositionDeleteRange IcebergTableReader::_get_range( |
323 | 0 | const ColumnString& file_path_column) { |
324 | 0 | IcebergTableReader::PositionDeleteRange range; |
325 | 0 | size_t read_rows = file_path_column.size(); |
326 | 0 | size_t index = 0; |
327 | 0 | while (index < read_rows) { |
328 | 0 | StringRef data_path = file_path_column.get_data_at(index); |
329 | 0 | size_t left = index - 1; |
330 | 0 | size_t right = read_rows; |
331 | 0 | while (left + 1 != right) { |
332 | 0 | size_t mid = left + (right - left) / 2; |
333 | 0 | if (file_path_column.get_data_at(mid) > data_path) { |
334 | 0 | right = mid; |
335 | 0 | } else { |
336 | 0 | left = mid; |
337 | 0 | } |
338 | 0 | } |
339 | 0 | range.data_file_path.emplace_back(data_path.to_string()); |
340 | 0 | range.range.emplace_back(index, left + 1); |
341 | 0 | index = left + 1; |
342 | 0 | } |
343 | 0 | return range; |
344 | 0 | } |
345 | | |
/**
 * https://iceberg.apache.org/spec/#position-delete-files
 * The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning.
 * Sorting by file_path allows filter pushdown by file in columnar storage formats.
 * Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory.
 *
 * Merges the already-sorted per-delete-file row-position vectors into one
 * sorted vector of exactly `num_delete_rows` entries. Fast paths cover 0, 1
 * and 2 sources; otherwise a k-way merge by linear min-scan is used (k =
 * number of delete files that target this data file, normally small).
 */
void IcebergTableReader::_sort_delete_rows(
        const std::vector<std::vector<int64_t>*>& delete_rows_array, int64_t num_delete_rows,
        std::vector<int64_t>& result) {
    if (delete_rows_array.empty()) {
        return;
    }
    // One source: straight copy, already sorted.
    if (delete_rows_array.size() == 1) {
        result.resize(num_delete_rows);
        memcpy(result.data(), delete_rows_array.front()->data(), sizeof(int64_t) * num_delete_rows);
        return;
    }
    // Two sources: a single std::merge pass suffices.
    if (delete_rows_array.size() == 2) {
        result.resize(num_delete_rows);
        std::merge(delete_rows_array.front()->begin(), delete_rows_array.front()->end(),
                   delete_rows_array.back()->begin(), delete_rows_array.back()->end(),
                   result.begin());
        return;
    }

    // General case: k-way merge. Each pair holds (current, end) iterators of
    // one non-empty source; each step appends the smallest current element.
    using vec_pair = std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::iterator>;
    result.resize(num_delete_rows);
    auto row_id_iter = result.begin();
    auto iter_end = result.end();
    std::vector<vec_pair> rows_array;
    for (auto* rows : delete_rows_array) {
        if (!rows->empty()) {
            rows_array.emplace_back(rows->begin(), rows->end());
        }
    }
    size_t array_size = rows_array.size();
    while (row_id_iter != iter_end) {
        // Linear scan for the minimum head element across all sources.
        int64_t min_index = 0;
        int64_t min = *rows_array[0].first;
        for (size_t i = 0; i < array_size; ++i) {
            if (*rows_array[i].first < min) {
                min_index = i;
                min = *rows_array[i].first;
            }
        }
        *row_id_iter++ = min;
        rows_array[min_index].first++;
        // Remove a source from the scan once it is exhausted.
        if (UNLIKELY(rows_array[min_index].first == rows_array[min_index].second)) {
            rows_array.erase(rows_array.begin() + min_index);
            array_size--;
        }
    }
}
399 | | |
400 | | void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFile* position_delete, |
401 | | size_t read_rows, |
402 | 0 | bool file_path_column_dictionary_coded) { |
403 | 0 | SCOPED_TIMER(_iceberg_profile.parse_delete_file_time); |
404 | | // todo: maybe do not need to build name to index map every time |
405 | 0 | auto name_to_pos_map = block.get_name_to_pos_map(); |
406 | 0 | ColumnPtr path_column = block.get_by_position(name_to_pos_map[ICEBERG_FILE_PATH]).column; |
407 | 0 | DCHECK_EQ(path_column->size(), read_rows); |
408 | 0 | ColumnPtr pos_column = block.get_by_position(name_to_pos_map[ICEBERG_ROW_POS]).column; |
409 | 0 | using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType; |
410 | 0 | const int64_t* src_data = assert_cast<const ColumnType&>(*pos_column).get_data().data(); |
411 | 0 | IcebergTableReader::PositionDeleteRange range; |
412 | 0 | if (file_path_column_dictionary_coded) { |
413 | 0 | range = _get_range(assert_cast<const ColumnDictI32&>(*path_column)); |
414 | 0 | } else { |
415 | 0 | range = _get_range(assert_cast<const ColumnString&>(*path_column)); |
416 | 0 | } |
417 | 0 | for (int i = 0; i < range.range.size(); ++i) { |
418 | 0 | std::string key = range.data_file_path[i]; |
419 | 0 | auto iter = position_delete->find(key); |
420 | 0 | DeleteRows* delete_rows; |
421 | 0 | if (iter == position_delete->end()) { |
422 | 0 | delete_rows = new DeleteRows; |
423 | 0 | std::unique_ptr<DeleteRows> delete_rows_ptr(delete_rows); |
424 | 0 | (*position_delete)[key] = std::move(delete_rows_ptr); |
425 | 0 | } else { |
426 | 0 | delete_rows = iter->second.get(); |
427 | 0 | } |
428 | 0 | const int64_t* cpy_start = src_data + range.range[i].first; |
429 | 0 | const int64_t cpy_count = range.range[i].second - range.range[i].first; |
430 | 0 | int64_t origin_size = delete_rows->size(); |
431 | 0 | delete_rows->resize(origin_size + cpy_count); |
432 | 0 | int64_t* dest_position = &(*delete_rows)[origin_size]; |
433 | 0 | memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t)); |
434 | 0 | } |
435 | 0 | } |
436 | | |
// Initializes the wrapped ParquetReader for an Iceberg data file.
//
// Steps:
//  1. Fetch the parquet file schema and compute the physical column-id sets
//     (all read columns + predicate columns) from the tuple descriptor.
//  2. Initialize row-level deletes (position/equality/DV) via init_row_filters().
//  3. Build `table_info_node_ptr`, the table-schema -> file-schema mapping:
//       - no history schema info: match columns by parquet name;
//       - file carries Iceberg field ids: match by field id, also wiring the
//         extra columns required by equality deletes as expand columns;
//       - no field ids (e.g. a hive table migrated to iceberg): match via the
//         table schema's name_mapping.
//  4. Delegate to ParquetReader::init_reader with the assembled column set.
Status IcebergParquetReader::init_reader(
        const std::vector<std::string>& file_col_names,
        std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
        const VExprContextSPtrs& conjuncts,
        phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
                slot_id_to_predicates,
        const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
        const std::unordered_map<std::string, int>* colname_to_slot_id,
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
    _file_format = Fileformat::PARQUET;
    _col_name_to_block_idx = col_name_to_block_idx;
    auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
    RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&_data_file_field_desc));
    DCHECK(_data_file_field_desc != nullptr);

    auto column_id_result = _create_column_ids(_data_file_field_desc, tuple_descriptor);
    // Mutable: equality-delete columns may be added to column_ids below.
    auto& column_ids = column_id_result.column_ids;
    const auto& filter_column_ids = column_id_result.filter_column_ids;

    // Must run before the schema mapping: it fills _equality_delete_col_ids.
    RETURN_IF_ERROR(init_row_filters());
    _all_required_col_names = file_col_names;

    if (!_params.__isset.history_schema_info || _params.history_schema_info.empty()) [[unlikely]] {
        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(
                tuple_descriptor, *_data_file_field_desc, table_info_node_ptr));
    } else {
        std::set<std::string> read_col_name_set(file_col_names.begin(), file_col_names.end());

        // Field-id matching is only possible if every top-level column has one.
        bool exist_field_id = true;
        for (int idx = 0; idx < _data_file_field_desc->size(); idx++) {
            if (_data_file_field_desc->get_column(idx)->field_id == -1) {
                // the data file may be from hive table migrated to iceberg, field id is missing
                exist_field_id = false;
                break;
            }
        }
        const auto& table_schema = _params.history_schema_info.front().root_field;

        table_info_node_ptr = std::make_shared<TableSchemaChangeHelper::StructNode>();
        if (exist_field_id) {
            // id -> table column name. columns that need read data file.
            std::unordered_map<int, std::shared_ptr<schema::external::TField>> id_to_table_field;
            for (const auto& table_field : table_schema.fields) {
                auto field = table_field.field_ptr;
                DCHECK(field->__isset.name);
                if (!read_col_name_set.contains(field->name)) {
                    continue;
                }
                id_to_table_field.emplace(field->id, field);
            }

            for (int idx = 0; idx < _data_file_field_desc->size(); idx++) {
                const auto& data_file_field = _data_file_field_desc->get_column(idx);
                auto data_file_column_id = _data_file_field_desc->get_column(idx)->field_id;

                if (id_to_table_field.contains(data_file_column_id)) {
                    const auto& table_field = id_to_table_field[data_file_column_id];

                    std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
                    RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id(
                            *table_field, *data_file_field, exist_field_id, field_node));
                    table_info_node_ptr->add_children(table_field->name, data_file_field->name,
                                                      field_node);

                    _id_to_block_column_name.emplace(data_file_column_id, table_field->name);
                    // Erase so the leftovers below become "not exist" children.
                    id_to_table_field.erase(data_file_column_id);
                } else if (_equality_delete_col_ids.contains(data_file_column_id)) {
                    // Columns that need to be read for equality delete.
                    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";

                    // Construct table column names that avoid duplication with current table schema.
                    // As the columns currently being read may have been deleted in the latest
                    // table structure or have undergone a series of schema changes...
                    std::string table_column_name = EQ_DELETE_PRE + data_file_field->name;
                    table_info_node_ptr->add_children(
                            table_column_name, data_file_field->name,
                            std::make_shared<TableSchemaChangeHelper::ConstNode>());

                    _id_to_block_column_name.emplace(data_file_column_id, table_column_name);
                    _expand_col_names.emplace_back(table_column_name);
                    auto expand_data_type = make_nullable(data_file_field->data_type);
                    _expand_columns.emplace_back(
                            ColumnWithTypeAndName {expand_data_type->create_column(),
                                                   expand_data_type, table_column_name});

                    _all_required_col_names.emplace_back(table_column_name);
                    column_ids.insert(data_file_field->get_column_id());
                }
            }
            // Table columns with no matching field id in the file do not exist
            // in this data file (added after it was written).
            for (const auto& [id, table_field] : id_to_table_field) {
                table_info_node_ptr->add_not_exist_children(table_field->name);
            }
        } else {
            if (!_equality_delete_col_ids.empty()) [[unlikely]] {
                return Status::InternalError(
                        "Can not read missing field id data file when have equality delete");
            }
            // Fall back to Iceberg name-mapping: file column name -> index.
            std::map<std::string, size_t> file_column_idx_map;
            for (size_t idx = 0; idx < _data_file_field_desc->size(); idx++) {
                file_column_idx_map.emplace(_data_file_field_desc->get_column(idx)->name, idx);
            }

            for (const auto& table_field : table_schema.fields) {
                DCHECK(table_field.__isset.field_ptr);
                DCHECK(table_field.field_ptr->__isset.name);
                const auto& table_column_name = table_field.field_ptr->name;
                if (!read_col_name_set.contains(table_column_name)) {
                    continue;
                }
                if (!table_field.field_ptr->__isset.name_mapping ||
                    table_field.field_ptr->name_mapping.size() == 0) {
                    return Status::DataQualityError(
                            "name_mapping must be set when read missing field id data file.");
                }
                // Use the first mapped name that actually exists in the file.
                bool have_mapping = false;
                for (const auto& mapped_name : table_field.field_ptr->name_mapping) {
                    if (file_column_idx_map.contains(mapped_name)) {
                        std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
                        const auto& file_field = _data_file_field_desc->get_column(
                                file_column_idx_map.at(mapped_name));
                        RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id(
                                *table_field.field_ptr, *file_field, exist_field_id, field_node));
                        table_info_node_ptr->add_children(table_column_name, file_field->name,
                                                          field_node);
                        have_mapping = true;
                        break;
                    }
                }
                if (!have_mapping) {
                    table_info_node_ptr->add_not_exist_children(table_column_name);
                }
            }
        }
    }

    return parquet_reader->init_reader(
            _all_required_col_names, _col_name_to_block_idx, conjuncts, slot_id_to_predicates,
            tuple_descriptor, row_descriptor, colname_to_slot_id, not_single_slot_filter_conjuncts,
            slot_id_to_filter_conjuncts, table_info_node_ptr, true, column_ids, filter_column_ids);
}
578 | | |
// Computes the physical parquet column-id sets required by the scan:
//   - column_ids:        ids of every column the query reads
//   - filter_column_ids: ids of columns referenced by predicates
// Slots are matched to file columns through the Iceberg field id stored in
// the parquet schema (slot->col_unique_id()). For nested columns the access
// paths on the slot determine which sub-column ids are included.
ColumnIdResult IcebergParquetReader::_create_column_ids(const FieldDescriptor* field_desc,
                                                        const TupleDescriptor* tuple_descriptor) {
    // First, assign column IDs to the field descriptor
    // NOTE(review): const_cast mutates the shared descriptor in place —
    // confirm assign_ids() is idempotent and not raced by other readers.
    auto* mutable_field_desc = const_cast<FieldDescriptor*>(field_desc);
    mutable_field_desc->assign_ids();

    // map top-level table column iceberg_id -> FieldSchema*
    std::unordered_map<int, const FieldSchema*> iceberg_id_to_field_schema_map;

    for (int i = 0; i < field_desc->size(); ++i) {
        auto field_schema = field_desc->get_column(i);
        if (!field_schema) continue;

        int iceberg_id = field_schema->field_id;
        iceberg_id_to_field_schema_map[iceberg_id] = field_schema;
    }

    std::set<uint64_t> column_ids;
    std::set<uint64_t> filter_column_ids;

    // helper to process access paths for a given top-level parquet field
    auto process_access_paths = [](const FieldSchema* parquet_field,
                                   const std::vector<TColumnAccessPath>& access_paths,
                                   std::set<uint64_t>& out_ids) {
        process_nested_access_paths(
                parquet_field, access_paths, out_ids,
                [](const FieldSchema* field) { return field->get_column_id(); },
                [](const FieldSchema* field) { return field->get_max_column_id(); },
                IcebergParquetNestedColumnUtils::extract_nested_column_ids);
    };

    for (const auto* slot : tuple_descriptor->slots()) {
        auto it = iceberg_id_to_field_schema_map.find(slot->col_unique_id());
        if (it == iceberg_id_to_field_schema_map.end()) {
            // Column not found in file (e.g., partition column, added column)
            continue;
        }
        auto field_schema = it->second;

        // primitive (non-nested) types: direct mapping by name
        if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY &&
             slot->col_type() != TYPE_MAP)) {
            column_ids.insert(field_schema->column_id);

            if (slot->is_predicate()) {
                filter_column_ids.insert(field_schema->column_id);
            }
            continue;
        }

        // complex types:
        const auto& all_access_paths = slot->all_access_paths();
        process_access_paths(field_schema, all_access_paths, column_ids);

        const auto& predicate_access_paths = slot->predicate_access_paths();
        if (!predicate_access_paths.empty()) {
            process_access_paths(field_schema, predicate_access_paths, filter_column_ids);
        }
    }
    return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids));
}
640 | | |
641 | | Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* delete_range, |
642 | 0 | DeleteFile* position_delete) { |
643 | 0 | ParquetReader parquet_delete_reader(_profile, _params, *delete_range, |
644 | 0 | READ_DELETE_FILE_BATCH_SIZE, &_state->timezone_obj(), |
645 | 0 | _io_ctx, _state, _meta_cache); |
646 | 0 | phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> tmp; |
647 | 0 | RETURN_IF_ERROR(parquet_delete_reader.init_reader( |
648 | 0 | delete_file_col_names, |
649 | 0 | const_cast<std::unordered_map<std::string, uint32_t>*>(&DELETE_COL_NAME_TO_BLOCK_IDX), |
650 | 0 | {}, tmp, nullptr, nullptr, nullptr, nullptr, nullptr, |
651 | 0 | TableSchemaChangeHelper::ConstNode::get_instance(), false)); |
652 | | |
653 | 0 | std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>> |
654 | 0 | partition_columns; |
655 | 0 | std::unordered_map<std::string, VExprContextSPtr> missing_columns; |
656 | 0 | RETURN_IF_ERROR(parquet_delete_reader.set_fill_columns(partition_columns, missing_columns)); |
657 | | |
658 | 0 | const tparquet::FileMetaData* meta_data = parquet_delete_reader.get_meta_data(); |
659 | 0 | bool dictionary_coded = true; |
660 | 0 | for (const auto& row_group : meta_data->row_groups) { |
661 | 0 | const auto& column_chunk = row_group.columns[ICEBERG_FILE_PATH_INDEX]; |
662 | 0 | if (!(column_chunk.__isset.meta_data && has_dict_page(column_chunk.meta_data))) { |
663 | 0 | dictionary_coded = false; |
664 | 0 | break; |
665 | 0 | } |
666 | 0 | } |
667 | 0 | DataTypePtr data_type_file_path {new DataTypeString}; |
668 | 0 | DataTypePtr data_type_pos {new DataTypeInt64}; |
669 | 0 | bool eof = false; |
670 | 0 | while (!eof) { |
671 | 0 | Block block = {dictionary_coded |
672 | 0 | ? ColumnWithTypeAndName {ColumnDictI32::create( |
673 | 0 | FieldType::OLAP_FIELD_TYPE_VARCHAR), |
674 | 0 | data_type_file_path, ICEBERG_FILE_PATH} |
675 | 0 | : ColumnWithTypeAndName {data_type_file_path, ICEBERG_FILE_PATH}, |
676 | |
|
677 | 0 | {data_type_pos, ICEBERG_ROW_POS}}; |
678 | 0 | size_t read_rows = 0; |
679 | 0 | RETURN_IF_ERROR(parquet_delete_reader.get_next_block(&block, &read_rows, &eof)); |
680 | | |
681 | 0 | if (read_rows <= 0) { |
682 | 0 | break; |
683 | 0 | } |
684 | 0 | _gen_position_delete_file_range(block, position_delete, read_rows, dictionary_coded); |
685 | 0 | } |
686 | 0 | return Status::OK(); |
687 | 0 | }; |
688 | | |
// Initializes the underlying OrcReader for an iceberg data file:
//  1. resolves the ORC file schema and computes the column ids to read and to
//     filter on (via `_create_column_ids`),
//  2. applies position-delete row filters (`init_row_filters`),
//  3. builds `table_info_node_ptr`, the mapping between table column names and
//     file column names, handling schema evolution three ways:
//       - no `history_schema_info` in _params -> match columns purely by name,
//       - file carries iceberg field-id attributes -> match by field id,
//       - otherwise -> match via the table schema's `name_mapping` metadata.
// Columns required only by equality-delete files are appended to
// `_all_required_col_names` / `_expand_columns` so they are read as well.
Status IcebergOrcReader::init_reader(
        const std::vector<std::string>& file_col_names,
        std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
        const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
        const RowDescriptor* row_descriptor,
        const std::unordered_map<std::string, int>* colname_to_slot_id,
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
    _file_format = Fileformat::ORC;
    _col_name_to_block_idx = col_name_to_block_idx;
    auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
    RETURN_IF_ERROR(orc_reader->get_file_type(&_data_file_type_desc));
    std::vector<std::string> data_file_col_names;
    std::vector<DataTypePtr> data_file_col_types;
    RETURN_IF_ERROR(orc_reader->get_parsed_schema(&data_file_col_names, &data_file_col_types));

    // Column ids to materialize and the subset used for predicate filtering.
    auto column_id_result = _create_column_ids(_data_file_type_desc, tuple_descriptor);
    auto& column_ids = column_id_result.column_ids;
    const auto& filter_column_ids = column_id_result.filter_column_ids;

    RETURN_IF_ERROR(init_row_filters());

    _all_required_col_names = file_col_names;
    if (!_params.__isset.history_schema_info || _params.history_schema_info.empty()) [[unlikely]] {
        // No historical schema available: fall back to pure name-based matching.
        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(tuple_descriptor, _data_file_type_desc,
                                                        table_info_node_ptr));
    } else {
        std::set<std::string> read_col_name_set(file_col_names.begin(), file_col_names.end());

        // Field-id matching is only possible when every top-level ORC field
        // carries the iceberg field-id attribute.
        bool exist_field_id = true;
        for (size_t idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) {
            if (!_data_file_type_desc->getSubtype(idx)->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
                exist_field_id = false;
                break;
            }
        }

        const auto& table_schema = _params.history_schema_info.front().root_field;
        table_info_node_ptr = std::make_shared<TableSchemaChangeHelper::StructNode>();
        if (exist_field_id) {
            // id -> table column name. columns that need read data file.
            std::unordered_map<int, std::shared_ptr<schema::external::TField>> id_to_table_field;
            for (const auto& table_field : table_schema.fields) {
                auto field = table_field.field_ptr;
                DCHECK(field->__isset.name);
                if (!read_col_name_set.contains(field->name)) {
                    continue;
                }

                id_to_table_field.emplace(field->id, field);
            }

            // Walk the file schema and pair each file column with its table
            // column by field id; matched entries are erased so the leftovers
            // are exactly the columns missing from the file.
            for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) {
                const auto& data_file_field = _data_file_type_desc->getSubtype(idx);
                auto data_file_column_id =
                        std::stoi(data_file_field->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
                auto const& file_column_name = _data_file_type_desc->getFieldName(idx);

                if (id_to_table_field.contains(data_file_column_id)) {
                    const auto& table_field = id_to_table_field[data_file_column_id];

                    std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
                    RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id(
                            *table_field, data_file_field, ICEBERG_ORC_ATTRIBUTE, exist_field_id,
                            field_node));
                    table_info_node_ptr->add_children(table_field->name, file_column_name,
                                                      field_node);

                    _id_to_block_column_name.emplace(data_file_column_id, table_field->name);
                    id_to_table_field.erase(data_file_column_id);
                } else if (_equality_delete_col_ids.contains(data_file_column_id)) {
                    // Columns that need to be read for equality delete.
                    const static std::string EQ_DELETE_PRE = "__equality_delete_column__";

                    // Construct table column names that avoid duplication with current table schema.
                    // As the columns currently being read may have been deleted in the latest
                    // table structure or have undergone a series of schema changes...
                    std::string table_column_name = EQ_DELETE_PRE + file_column_name;
                    table_info_node_ptr->add_children(
                            table_column_name, file_column_name,
                            std::make_shared<TableSchemaChangeHelper::ConstNode>());

                    _id_to_block_column_name.emplace(data_file_column_id, table_column_name);
                    _expand_col_names.emplace_back(table_column_name);

                    // Materialize an extra (nullable) output column for the
                    // equality-delete key so it appears in the read block.
                    auto expand_data_type = make_nullable(data_file_col_types[idx]);
                    _expand_columns.emplace_back(
                            ColumnWithTypeAndName {expand_data_type->create_column(),
                                                   expand_data_type, table_column_name});

                    _all_required_col_names.emplace_back(table_column_name);
                    column_ids.insert(data_file_field->getColumnId());
                }
            }
            // Whatever is left was requested but not present in this file.
            for (const auto& [id, table_field] : id_to_table_field) {
                table_info_node_ptr->add_not_exist_children(table_field->name);
            }
        } else {
            if (!_equality_delete_col_ids.empty()) [[unlikely]] {
                return Status::InternalError(
                        "Can not read missing field id data file when have equality delete");
            }
            // No field ids in the file: resolve columns through the table's
            // name_mapping (iceberg schema.name-mapping.default semantics).
            std::map<std::string, size_t> file_column_idx_map;
            for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) {
                auto const& file_column_name = _data_file_type_desc->getFieldName(idx);
                file_column_idx_map.emplace(file_column_name, idx);
            }

            for (const auto& table_field : table_schema.fields) {
                DCHECK(table_field.__isset.field_ptr);
                DCHECK(table_field.field_ptr->__isset.name);
                const auto& table_column_name = table_field.field_ptr->name;
                if (!read_col_name_set.contains(table_column_name)) {
                    continue;
                }
                if (!table_field.field_ptr->__isset.name_mapping ||
                    table_field.field_ptr->name_mapping.size() == 0) {
                    return Status::DataQualityError(
                            "name_mapping must be set when read missing field id data file.");
                }
                // First mapped name found in the file wins.
                auto have_mapping = false;
                for (const auto& mapped_name : table_field.field_ptr->name_mapping) {
                    if (file_column_idx_map.contains(mapped_name)) {
                        auto file_column_idx = file_column_idx_map.at(mapped_name);
                        std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr;
                        const auto& file_field = _data_file_type_desc->getSubtype(file_column_idx);
                        RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id(
                                *table_field.field_ptr, file_field, ICEBERG_ORC_ATTRIBUTE,
                                exist_field_id, field_node));
                        table_info_node_ptr->add_children(
                                table_column_name,
                                _data_file_type_desc->getFieldName(file_column_idx), field_node);
                        have_mapping = true;
                        break;
                    }
                }
                if (!have_mapping) {
                    table_info_node_ptr->add_not_exist_children(table_column_name);
                }
            }
        }
    }

    return orc_reader->init_reader(&_all_required_col_names, _col_name_to_block_idx, conjuncts,
                                   false, tuple_descriptor, row_descriptor,
                                   not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts,
                                   table_info_node_ptr, column_ids, filter_column_ids);
}
837 | | |
838 | | ColumnIdResult IcebergOrcReader::_create_column_ids(const orc::Type* orc_type, |
839 | 7 | const TupleDescriptor* tuple_descriptor) { |
840 | | // map top-level table column iceberg_id -> orc::Type* |
841 | 7 | std::unordered_map<int, const orc::Type*> iceberg_id_to_orc_type_map; |
842 | 58 | for (uint64_t i = 0; i < orc_type->getSubtypeCount(); ++i) { |
843 | 51 | auto orc_sub_type = orc_type->getSubtype(i); |
844 | 51 | if (!orc_sub_type) continue; |
845 | | |
846 | 51 | if (!orc_sub_type->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) { |
847 | 0 | continue; |
848 | 0 | } |
849 | 51 | int iceberg_id = std::stoi(orc_sub_type->getAttributeValue(ICEBERG_ORC_ATTRIBUTE)); |
850 | 51 | iceberg_id_to_orc_type_map[iceberg_id] = orc_sub_type; |
851 | 51 | } |
852 | | |
853 | 7 | std::set<uint64_t> column_ids; |
854 | 7 | std::set<uint64_t> filter_column_ids; |
855 | | |
856 | | // helper to process access paths for a given top-level orc field |
857 | 7 | auto process_access_paths = [](const orc::Type* orc_field, |
858 | 7 | const std::vector<TColumnAccessPath>& access_paths, |
859 | 14 | std::set<uint64_t>& out_ids) { |
860 | 14 | process_nested_access_paths( |
861 | 14 | orc_field, access_paths, out_ids, |
862 | 14 | [](const orc::Type* type) { return type->getColumnId(); }, |
863 | 14 | [](const orc::Type* type) { return type->getMaximumColumnId(); }, |
864 | 14 | IcebergOrcNestedColumnUtils::extract_nested_column_ids); |
865 | 14 | }; |
866 | | |
867 | 15 | for (const auto* slot : tuple_descriptor->slots()) { |
868 | 15 | auto it = iceberg_id_to_orc_type_map.find(slot->col_unique_id()); |
869 | 15 | if (it == iceberg_id_to_orc_type_map.end()) { |
870 | | // Column not found in file |
871 | 0 | continue; |
872 | 0 | } |
873 | 15 | const orc::Type* orc_field = it->second; |
874 | | |
875 | | // primitive (non-nested) types |
876 | 15 | if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY && |
877 | 15 | slot->col_type() != TYPE_MAP)) { |
878 | 7 | column_ids.insert(orc_field->getColumnId()); |
879 | 7 | if (slot->is_predicate()) { |
880 | 0 | filter_column_ids.insert(orc_field->getColumnId()); |
881 | 0 | } |
882 | 7 | continue; |
883 | 7 | } |
884 | | |
885 | | // complex types |
886 | 8 | const auto& all_access_paths = slot->all_access_paths(); |
887 | 8 | process_access_paths(orc_field, all_access_paths, column_ids); |
888 | | |
889 | 8 | const auto& predicate_access_paths = slot->predicate_access_paths(); |
890 | 8 | if (!predicate_access_paths.empty()) { |
891 | 6 | process_access_paths(orc_field, predicate_access_paths, filter_column_ids); |
892 | 6 | } |
893 | 8 | } |
894 | | |
895 | 7 | return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids)); |
896 | 7 | } |
897 | | |
898 | | Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete_range, |
899 | 0 | DeleteFile* position_delete) { |
900 | 0 | OrcReader orc_delete_reader(_profile, _state, _params, *delete_range, |
901 | 0 | READ_DELETE_FILE_BATCH_SIZE, _state->timezone(), _io_ctx, |
902 | 0 | _meta_cache); |
903 | 0 | RETURN_IF_ERROR(orc_delete_reader.init_reader( |
904 | 0 | &delete_file_col_names, |
905 | 0 | const_cast<std::unordered_map<std::string, uint32_t>*>(&DELETE_COL_NAME_TO_BLOCK_IDX), |
906 | 0 | {}, false, {}, {}, nullptr, nullptr)); |
907 | | |
908 | 0 | std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>> |
909 | 0 | partition_columns; |
910 | 0 | std::unordered_map<std::string, VExprContextSPtr> missing_columns; |
911 | 0 | RETURN_IF_ERROR(orc_delete_reader.set_fill_columns(partition_columns, missing_columns)); |
912 | | |
913 | 0 | bool eof = false; |
914 | 0 | DataTypePtr data_type_file_path {new DataTypeString}; |
915 | 0 | DataTypePtr data_type_pos {new DataTypeInt64}; |
916 | 0 | while (!eof) { |
917 | 0 | Block block = {{data_type_file_path, ICEBERG_FILE_PATH}, {data_type_pos, ICEBERG_ROW_POS}}; |
918 | |
|
919 | 0 | size_t read_rows = 0; |
920 | 0 | RETURN_IF_ERROR(orc_delete_reader.get_next_block(&block, &read_rows, &eof)); |
921 | | |
922 | 0 | _gen_position_delete_file_range(block, position_delete, read_rows, false); |
923 | 0 | } |
924 | 0 | return Status::OK(); |
925 | 0 | } |
926 | | |
927 | | // Directly read the deletion vector using the `content_offset` and |
928 | | // `content_size_in_bytes` provided by FE in `delete_file_desc`. |
929 | | // These two fields indicate the location of a blob in storage. |
930 | | // Since the current format is `deletion-vector-v1`, which does not |
931 | | // compress any blobs, we can temporarily skip parsing the Puffin footer. |
932 | | Status IcebergTableReader::read_deletion_vector(const std::string& data_file_path, |
933 | 0 | const TIcebergDeleteFileDesc& delete_file_desc) { |
934 | 0 | Status create_status = Status::OK(); |
935 | 0 | SCOPED_TIMER(_iceberg_profile.delete_files_read_time); |
936 | 0 | _iceberg_delete_rows = _kv_cache->get<DeleteRows>(data_file_path, [&]() -> DeleteRows* { |
937 | 0 | auto* delete_rows = new DeleteRows; |
938 | |
|
939 | 0 | TFileRangeDesc delete_range; |
940 | | // must use __set() method to make sure __isset is true |
941 | 0 | delete_range.__set_fs_name(_range.fs_name); |
942 | 0 | delete_range.path = delete_file_desc.path; |
943 | 0 | delete_range.start_offset = delete_file_desc.content_offset; |
944 | 0 | delete_range.size = delete_file_desc.content_size_in_bytes; |
945 | 0 | delete_range.file_size = -1; |
946 | | |
947 | | // We may consider caching the DeletionVectorReader when reading Puffin files, |
948 | | // where the underlying reader is an `InMemoryFileReader` and a single data file is |
949 | | // split into multiple splits. However, we need to ensure that the underlying |
950 | | // reader supports multi-threaded access. |
951 | 0 | DeletionVectorReader dv_reader(_state, _profile, _params, delete_range, _io_ctx); |
952 | 0 | create_status = dv_reader.open(); |
953 | 0 | if (!create_status.ok()) [[unlikely]] { |
954 | 0 | return nullptr; |
955 | 0 | } |
956 | | |
957 | 0 | size_t buffer_size = delete_range.size; |
958 | 0 | std::vector<char> buf(buffer_size); |
959 | 0 | if (buffer_size < 12) [[unlikely]] { |
960 | | // Minimum size: 4 bytes length + 4 bytes magic + 4 bytes CRC32 |
961 | 0 | create_status = Status::DataQualityError("Deletion vector file size too small: {}", |
962 | 0 | buffer_size); |
963 | 0 | return nullptr; |
964 | 0 | } |
965 | | |
966 | 0 | create_status = dv_reader.read_at(delete_range.start_offset, {buf.data(), buffer_size}); |
967 | 0 | if (!create_status) [[unlikely]] { |
968 | 0 | return nullptr; |
969 | 0 | } |
970 | | // The serialized blob contains: |
971 | | // |
972 | | // Combined length of the vector and magic bytes stored as 4 bytes, big-endian |
973 | | // A 4-byte magic sequence, D1 D3 39 64 |
974 | | // The vector, serialized as described below |
975 | | // A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian |
976 | | |
977 | 0 | auto total_length = BigEndian::Load32(buf.data()); |
978 | 0 | if (total_length + 8 != buffer_size) [[unlikely]] { |
979 | 0 | create_status = Status::DataQualityError( |
980 | 0 | "Deletion vector length mismatch, expected: {}, actual: {}", total_length + 8, |
981 | 0 | buffer_size); |
982 | 0 | return nullptr; |
983 | 0 | } |
984 | | |
985 | 0 | constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'}; |
986 | 0 | if (memcmp(buf.data() + sizeof(total_length), MAGIC_NUMBER, 4)) [[unlikely]] { |
987 | 0 | create_status = Status::DataQualityError("Deletion vector magic number mismatch"); |
988 | 0 | return nullptr; |
989 | 0 | } |
990 | | |
991 | 0 | roaring::Roaring64Map bitmap; |
992 | 0 | SCOPED_TIMER(_iceberg_profile.parse_delete_file_time); |
993 | 0 | try { |
994 | 0 | bitmap = roaring::Roaring64Map::readSafe(buf.data() + 8, buffer_size - 12); |
995 | 0 | } catch (const std::runtime_error& e) { |
996 | 0 | create_status = Status::DataQualityError("Decode roaring bitmap failed, {}", e.what()); |
997 | 0 | return nullptr; |
998 | 0 | } |
999 | | // skip CRC-32 checksum |
1000 | | |
1001 | 0 | delete_rows->reserve(bitmap.cardinality()); |
1002 | 0 | for (auto it = bitmap.begin(); it != bitmap.end(); it++) { |
1003 | 0 | delete_rows->push_back(*it); |
1004 | 0 | } |
1005 | 0 | COUNTER_UPDATE(_iceberg_profile.num_delete_rows, delete_rows->size()); |
1006 | 0 | return delete_rows; |
1007 | 0 | }); |
1008 | |
|
1009 | 0 | RETURN_IF_ERROR(create_status); |
1010 | 0 | if (!_iceberg_delete_rows->empty()) [[likely]] { |
1011 | 0 | set_delete_rows(); |
1012 | 0 | } |
1013 | 0 | return Status::OK(); |
1014 | 0 | } |
1015 | | |
// Similar to the code structure of IcebergOrcReader::_process_equality_delete,
// but considering the significant differences in how parquet/orc obtains
// attributes/column IDs, it is not easy to combine them.
//
// Reads every equality-delete file that applies to the current data file,
// accumulates the deleted key rows into one Block per distinct set of delete
// column ids (`_equality_delete_blocks`), and finally builds one
// EqualityDeleteBase implementation per distinct column-id set
// (`_equality_delete_impls`). Side effects: also populates
// `_equality_delete_col_ids` with all field ids referenced by delete files.
Status IcebergParquetReader::_process_equality_delete(
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
            partition_columns;
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;

    // Index the data file's columns by iceberg field id; every column must
    // carry a field id, otherwise equality keys cannot be matched.
    std::map<int, const FieldSchema*> data_file_id_to_field_schema;
    for (int idx = 0; idx < _data_file_field_desc->size(); ++idx) {
        auto field_schema = _data_file_field_desc->get_column(idx);
        if (_data_file_field_desc->get_column(idx)->field_id == -1) {
            return Status::DataQualityError("Iceberg equality delete data file missing field id.");
        }
        data_file_id_to_field_schema[_data_file_field_desc->get_column(idx)->field_id] =
                field_schema;
    }

    for (const auto& delete_file : delete_files) {
        TFileRangeDesc delete_desc;
        // must use __set() method to make sure __isset is true
        delete_desc.__set_fs_name(_range.fs_name);
        delete_desc.path = delete_file.path;
        delete_desc.start_offset = 0;
        delete_desc.size = -1;
        delete_desc.file_size = -1;

        if (!delete_file.__isset.field_ids) [[unlikely]] {
            return Status::InternalError(
                    "missing delete field ids when reading equality delete file");
        }
        auto& read_column_field_ids = delete_file.field_ids;
        std::set<int> read_column_field_ids_set;
        for (const auto& field_id : read_column_field_ids) {
            read_column_field_ids_set.insert(field_id);
            _equality_delete_col_ids.insert(field_id);
        }

        auto delete_reader = ParquetReader::create_unique(
                _profile, _params, delete_desc, READ_DELETE_FILE_BATCH_SIZE,
                &_state->timezone_obj(), _io_ctx, _state, _meta_cache);
        RETURN_IF_ERROR(delete_reader->init_schema_reader());

        // the column that to read equality delete file.
        // (delete file may be have extra columns that don't need to read)
        std::vector<std::string> delete_col_names;
        std::vector<DataTypePtr> delete_col_types;
        std::vector<int> delete_col_ids;
        std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx;

        const FieldDescriptor* delete_field_desc = nullptr;
        RETURN_IF_ERROR(delete_reader->get_file_metadata_schema(&delete_field_desc));
        DCHECK(delete_field_desc != nullptr);

        // Walk the delete file's schema and select only the requested key
        // columns, validating each against the data file's schema.
        auto eq_file_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
        for (const auto& delete_file_field : delete_field_desc->get_fields_schema()) {
            if (delete_file_field.field_id == -1) [[unlikely]] { // missing delete_file_field id
                // equality delete file must have delete_file_field id to match column.
                return Status::DataQualityError(
                        "missing delete_file_field id when reading equality delete file");
            } else if (read_column_field_ids_set.contains(delete_file_field.field_id)) {
                // the column that need to read.
                if (delete_file_field.children.size() > 0) [[unlikely]] { // complex column
                    return Status::InternalError(
                            "can not support read complex column in equality delete file");
                } else if (!data_file_id_to_field_schema.contains(delete_file_field.field_id))
                        [[unlikely]] {
                    return Status::DataQualityError(
                            "can not find delete field id in data file schema when reading "
                            "equality delete file");
                }
                auto data_file_field = data_file_id_to_field_schema[delete_file_field.field_id];
                // Type changes between delete file and data file are rejected.
                if (data_file_field->data_type->get_primitive_type() !=
                    delete_file_field.data_type->get_primitive_type()) [[unlikely]] {
                    return Status::NotSupported(
                            "Not Support type change in equality delete, field: {}, delete "
                            "file type: {}, data file type: {}",
                            delete_file_field.field_id, delete_file_field.data_type->get_name(),
                            data_file_field->data_type->get_name());
                }

                std::string filed_lower_name = to_lower(delete_file_field.name);
                eq_file_node->add_children(filed_lower_name, delete_file_field.name,
                                           std::make_shared<TableSchemaChangeHelper::ScalarNode>());

                delete_col_ids.emplace_back(delete_file_field.field_id);
                delete_col_names.emplace_back(filed_lower_name);
                delete_col_types.emplace_back(make_nullable(delete_file_field.data_type));

                read_column_field_ids_set.erase(delete_file_field.field_id);
            } else {
                // delete file may be have extra columns that don't need to read
            }
        }
        // Every requested field id must have been found in the delete file.
        if (!read_column_field_ids_set.empty()) [[unlikely]] {
            return Status::DataQualityError("some field ids not found in equality delete file.");
        }

        for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
            delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
        }
        // No predicates are pushed down when scanning the delete file itself.
        phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> tmp;
        RETURN_IF_ERROR(delete_reader->init_reader(delete_col_names, &delete_col_name_to_block_idx,
                                                   {}, tmp, nullptr, nullptr, nullptr, nullptr,
                                                   nullptr, eq_file_node, false));
        RETURN_IF_ERROR(delete_reader->set_fill_columns(partition_columns, missing_columns));

        // Delete files sharing the same column-id set are merged into one block.
        if (!_equality_delete_block_map.contains(delete_col_ids)) {
            _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size());
            Block block;
            _generate_equality_delete_block(&block, delete_col_names, delete_col_types);
            _equality_delete_blocks.emplace_back(block);
        }
        Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]];
        bool eof = false;
        while (!eof) {
            Block tmp_block;
            _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types);
            size_t read_rows = 0;
            RETURN_IF_ERROR(delete_reader->get_next_block(&tmp_block, &read_rows, &eof));
            if (read_rows > 0) {
                MutableBlock mutable_block(&eq_file_block);
                RETURN_IF_ERROR(mutable_block.merge(tmp_block));
            }
        }
    }

    // One delete implementation per distinct column-id set.
    for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) {
        auto& eq_file_block = _equality_delete_blocks[block_idx];
        auto equality_delete_impl =
                EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids);
        RETURN_IF_ERROR(equality_delete_impl->init(_profile));
        _equality_delete_impls.emplace_back(std::move(equality_delete_impl));
    }
    return Status::OK();
}
1153 | | |
// ORC counterpart of IcebergParquetReader::_process_equality_delete:
// reads every equality-delete file for the current data file, accumulates
// the deleted key rows into one Block per distinct set of delete column ids
// (`_equality_delete_blocks`), and builds one EqualityDeleteBase impl per
// distinct column-id set (`_equality_delete_impls`). Field ids come from the
// ICEBERG_ORC_ATTRIBUTE type attribute rather than parquet metadata.
// Side effects: also populates `_equality_delete_col_ids`.
Status IcebergOrcReader::_process_equality_delete(
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
            partition_columns;
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;

    // Index the data file's top-level fields by iceberg field id; every field
    // must carry the id attribute, otherwise equality keys cannot be matched.
    std::map<int, int> data_file_id_to_field_idx;
    for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); ++idx) {
        if (!_data_file_type_desc->getSubtype(idx)->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
            return Status::DataQualityError("Iceberg equality delete data file missing field id.");
        }
        auto field_id = std::stoi(
                _data_file_type_desc->getSubtype(idx)->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
        data_file_id_to_field_idx[field_id] = idx;
    }

    for (const auto& delete_file : delete_files) {
        TFileRangeDesc delete_desc;
        // must use __set() method to make sure __isset is true
        delete_desc.__set_fs_name(_range.fs_name);
        delete_desc.path = delete_file.path;
        delete_desc.start_offset = 0;
        delete_desc.size = -1;
        delete_desc.file_size = -1;

        if (!delete_file.__isset.field_ids) [[unlikely]] {
            return Status::InternalError(
                    "missing delete field ids when reading equality delete file");
        }
        auto& read_column_field_ids = delete_file.field_ids;
        std::set<int> read_column_field_ids_set;
        for (const auto& field_id : read_column_field_ids) {
            read_column_field_ids_set.insert(field_id);
            _equality_delete_col_ids.insert(field_id);
        }

        auto delete_reader = OrcReader::create_unique(_profile, _state, _params, delete_desc,
                                                      READ_DELETE_FILE_BATCH_SIZE,
                                                      _state->timezone(), _io_ctx, _meta_cache);
        RETURN_IF_ERROR(delete_reader->init_schema_reader());
        // delete file schema
        std::vector<std::string> delete_file_col_names;
        std::vector<DataTypePtr> delete_file_col_types;
        RETURN_IF_ERROR(
                delete_reader->get_parsed_schema(&delete_file_col_names, &delete_file_col_types));

        // the column that to read equality delete file.
        // (delete file maybe have extra columns that don't need to read)
        std::vector<std::string> delete_col_names;
        std::vector<DataTypePtr> delete_col_types;
        std::vector<int> delete_col_ids;
        std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx;

        const orc::Type* delete_field_desc = nullptr;
        RETURN_IF_ERROR(delete_reader->get_file_type(&delete_field_desc));
        DCHECK(delete_field_desc != nullptr);

        auto eq_file_node = std::make_shared<TableSchemaChangeHelper::StructNode>();

        // Walk the delete file's schema and select only the requested key
        // columns, validating each against the data file's schema.
        for (size_t idx = 0; idx < delete_field_desc->getSubtypeCount(); idx++) {
            auto delete_file_field = delete_field_desc->getSubtype(idx);

            if (!delete_file_field->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE))
                    [[unlikely]] { // missing delete_file_field id
                // equality delete file must have delete_file_field id to match column.
                return Status::DataQualityError(
                        "missing delete_file_field id when reading equality delete file");
            } else {
                auto delete_field_id =
                        std::stoi(delete_file_field->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
                if (read_column_field_ids_set.contains(delete_field_id)) {
                    // the column that need to read.
                    if (is_complex_type(delete_file_col_types[idx]->get_primitive_type()))
                            [[unlikely]] {
                        return Status::InternalError(
                                "can not support read complex column in equality delete file.");
                    } else if (!data_file_id_to_field_idx.contains(delete_field_id)) [[unlikely]] {
                        return Status::DataQualityError(
                                "can not find delete field id in data file schema when reading "
                                "equality delete file");
                    }

                    auto data_file_field = _data_file_type_desc->getSubtype(
                            data_file_id_to_field_idx[delete_field_id]);

                    // Type changes between delete file and data file are rejected.
                    if (delete_file_field->getKind() != data_file_field->getKind()) [[unlikely]] {
                        return Status::NotSupported(
                                "Not Support type change in equality delete, field: {}, delete "
                                "file type: {}, data file type: {}",
                                delete_field_id, delete_file_field->getKind(),
                                data_file_field->getKind());
                    }
                    std::string filed_lower_name = to_lower(delete_field_desc->getFieldName(idx));
                    eq_file_node->add_children(
                            filed_lower_name, delete_field_desc->getFieldName(idx),
                            std::make_shared<TableSchemaChangeHelper::ScalarNode>());

                    delete_col_ids.emplace_back(delete_field_id);
                    delete_col_names.emplace_back(filed_lower_name);
                    delete_col_types.emplace_back(make_nullable(delete_file_col_types[idx]));
                    read_column_field_ids_set.erase(delete_field_id);
                }
            }
        }
        // Every requested field id must have been found in the delete file.
        if (!read_column_field_ids_set.empty()) [[unlikely]] {
            return Status::DataQualityError("some field ids not found in equality delete file.");
        }

        for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
            delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
        }

        RETURN_IF_ERROR(delete_reader->init_reader(&delete_col_names, &delete_col_name_to_block_idx,
                                                   {}, false, nullptr, nullptr, nullptr, nullptr,
                                                   eq_file_node));
        RETURN_IF_ERROR(delete_reader->set_fill_columns(partition_columns, missing_columns));

        // Delete files sharing the same column-id set are merged into one block.
        if (!_equality_delete_block_map.contains(delete_col_ids)) {
            _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size());
            Block block;
            _generate_equality_delete_block(&block, delete_col_names, delete_col_types);
            _equality_delete_blocks.emplace_back(block);
        }
        Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]];
        bool eof = false;
        while (!eof) {
            Block tmp_block;
            _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types);
            size_t read_rows = 0;
            RETURN_IF_ERROR(delete_reader->get_next_block(&tmp_block, &read_rows, &eof));
            if (read_rows > 0) {
                MutableBlock mutable_block(&eq_file_block);
                RETURN_IF_ERROR(mutable_block.merge(tmp_block));
            }
        }
    }

    // One delete implementation per distinct column-id set.
    for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) {
        auto& eq_file_block = _equality_delete_blocks[block_idx];
        auto equality_delete_impl =
                EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids);
        RETURN_IF_ERROR(equality_delete_impl->init(_profile));
        _equality_delete_impls.emplace_back(std::move(equality_delete_impl));
    }
    return Status::OK();
}
1300 | | #include "common/compile_check_end.h" |
1301 | | } // namespace doris |