be/src/exec/sink/viceberg_delete_sink.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/viceberg_delete_sink.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <rapidjson/stringbuffer.h> |
22 | | #include <rapidjson/writer.h> |
23 | | #include <zlib.h> |
24 | | |
25 | | #include "common/logging.h" |
26 | | #include "core/block/column_with_type_and_name.h" |
27 | | #include "core/column/column_nullable.h" |
28 | | #include "core/column/column_string.h" |
29 | | #include "core/column/column_struct.h" |
30 | | #include "core/column/column_vector.h" |
31 | | #include "core/data_type/data_type_factory.hpp" |
32 | | #include "core/data_type/data_type_nullable.h" |
33 | | #include "core/data_type/data_type_number.h" |
34 | | #include "core/data_type/data_type_string.h" |
35 | | #include "core/data_type/data_type_struct.h" |
36 | | #include "exec/common/endian.h" |
37 | | #include "exprs/vexpr.h" |
38 | | #include "format/table/iceberg_delete_file_reader_helper.h" |
39 | | #include "format/transformer/vfile_format_transformer.h" |
40 | | #include "io/file_factory.h" |
41 | | #include "runtime/runtime_state.h" |
42 | | #include "util/slice.h" |
43 | | #include "util/string_util.h" |
44 | | #include "util/uid_util.h" |
45 | | |
46 | | namespace doris { |
47 | | |
48 | | namespace { |
49 | | |
50 | | class RewriteBitmapVisitor final : public IcebergPositionDeleteVisitor { |
51 | | public: |
52 | | RewriteBitmapVisitor(const std::string& referenced_data_file_path, |
53 | | roaring::Roaring64Map* rows_to_delete) |
54 | 0 | : _referenced_data_file_path(referenced_data_file_path), |
55 | 0 | _rows_to_delete(rows_to_delete) {} |
56 | | |
57 | 0 | Status visit(const std::string& file_path, int64_t pos) override { |
58 | 0 | if (_rows_to_delete == nullptr) { |
59 | 0 | return Status::InvalidArgument("rows_to_delete is null"); |
60 | 0 | } |
61 | 0 | if (file_path == _referenced_data_file_path) { |
62 | 0 | _rows_to_delete->add(static_cast<uint64_t>(pos)); |
63 | 0 | } |
64 | 0 | return Status::OK(); |
65 | 0 | } |
66 | | |
67 | | private: |
68 | | const std::string& _referenced_data_file_path; |
69 | | roaring::Roaring64Map* _rows_to_delete; |
70 | | }; |
71 | | |
// Reads every previously-written delete file (position deletes and/or deletion
// vectors) that may reference `referenced_data_file_path` and ORs the deleted
// row positions into `rows_to_delete`, so a newly written deletion vector can
// supersede the old files. Returns OK (leaving the bitmap untouched) when
// there is nothing to read.
Status load_rewritable_delete_rows(RuntimeState* state, RuntimeProfile* profile,
                                   const std::string& referenced_data_file_path,
                                   const std::vector<TIcebergDeleteFileDesc>& delete_files,
                                   const std::map<std::string, std::string>& hadoop_conf,
                                   TFileType::type file_type,
                                   const std::vector<TNetworkAddress>& broker_addresses,
                                   roaring::Roaring64Map* rows_to_delete) {
    if (rows_to_delete == nullptr) {
        return Status::InvalidArgument("rows_to_delete is null");
    }
    // No runtime context to read with, or no old files to merge — nothing to do.
    if (state == nullptr || profile == nullptr || delete_files.empty()) {
        return Status::OK();
    }

    // Shared reader configuration for all delete files below.
    TFileScanRangeParams params =
            build_iceberg_delete_scan_range_params(hadoop_conf, file_type, broker_addresses);
    IcebergDeleteFileIOContext delete_file_io_ctx(state);
    IcebergDeleteFileReaderOptions options;
    options.state = state;
    options.profile = profile;
    options.scan_params = &params;
    options.io_ctx = &delete_file_io_ctx.io_ctx;
    options.batch_size = 102400; // rows per batch when scanning position delete files

    for (const auto& delete_file : delete_files) {
        // Deletion vectors deserialize straight into the bitmap ...
        if (is_iceberg_deletion_vector(delete_file)) {
            RETURN_IF_ERROR(read_iceberg_deletion_vector(delete_file, options, rows_to_delete));
            continue;
        }
        // ... while position delete files are scanned record by record; the
        // visitor keeps only records referencing the target data file.
        RewriteBitmapVisitor visitor(referenced_data_file_path, rows_to_delete);
        RETURN_IF_ERROR(read_iceberg_position_delete_file(delete_file, options, &visitor));
    }
    return Status::OK();
}
106 | | |
107 | | } // namespace |
108 | | |
// Thin constructor: stores the thrift sink descriptor and forwards the expr
// contexts / dependencies to the async-writer base class.
VIcebergDeleteSink::VIcebergDeleteSink(const TDataSink& t_sink,
                                       const VExprContextSPtrs& output_exprs,
                                       std::shared_ptr<Dependency> dep,
                                       std::shared_ptr<Dependency> fin_dep)
        : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) {
    // The descriptor must carry the iceberg_delete_sink payload; all other
    // initialization is deferred to init_properties().
    DCHECK(_t_sink.__isset.iceberg_delete_sink);
}
116 | | |
// Copies all settings from the thrift iceberg_delete_sink descriptor into
// member fields. Optional thrift fields keep the member defaults when unset.
// Rejects any delete type other than position deletes.
Status VIcebergDeleteSink::init_properties(ObjectPool* pool) {
    const auto& delete_sink = _t_sink.iceberg_delete_sink;

    _delete_type = delete_sink.delete_type;
    if (_delete_type != TFileContent::POSITION_DELETES) {
        return Status::NotSupported("Iceberg delete only supports position delete files");
    }

    // Get file format settings
    if (delete_sink.__isset.file_format) {
        _file_format_type = delete_sink.file_format;
    }

    if (delete_sink.__isset.compress_type) {
        _compress_type = delete_sink.compress_type;
    }

    // Get output path and table location
    if (delete_sink.__isset.output_path) {
        _output_path = delete_sink.output_path;
    }

    if (delete_sink.__isset.table_location) {
        _table_location = delete_sink.table_location;
    }

    // Get Hadoop configuration
    if (delete_sink.__isset.hadoop_config) {
        _hadoop_conf.insert(delete_sink.hadoop_config.begin(), delete_sink.hadoop_config.end());
    }

    if (delete_sink.__isset.file_type) {
        _file_type = delete_sink.file_type;
    }

    if (delete_sink.__isset.broker_addresses) {
        _broker_addresses.assign(delete_sink.broker_addresses.begin(),
                                 delete_sink.broker_addresses.end());
    }

    // Get partition information (per-sink defaults; per-row values from the
    // row-id struct take precedence in _collect_position_deletes).
    if (delete_sink.__isset.partition_spec_id) {
        _partition_spec_id = delete_sink.partition_spec_id;
    }

    if (delete_sink.__isset.partition_data_json) {
        _partition_data_json = delete_sink.partition_data_json;
    }

    if (delete_sink.__isset.format_version) {
        _format_version = delete_sink.format_version;
    }

    // for merge old deletion vector and old position delete to a new deletion vector.
    // Only meaningful for format v3+, where deletion vectors replace old files.
    if (_format_version >= 3 && delete_sink.__isset.rewritable_delete_file_sets) {
        for (const auto& delete_file_set : delete_sink.rewritable_delete_file_sets) {
            // Skip malformed/empty entries rather than failing the whole sink.
            if (!delete_file_set.__isset.referenced_data_file_path ||
                !delete_file_set.__isset.delete_files ||
                delete_file_set.referenced_data_file_path.empty() ||
                delete_file_set.delete_files.empty()) {
                continue;
            }
            _rewritable_delete_files.emplace(delete_file_set.referenced_data_file_path,
                                             delete_file_set.delete_files);
        }
    }

    return Status::OK();
}
186 | | |
187 | 3 | Status VIcebergDeleteSink::open(RuntimeState* state, RuntimeProfile* profile) { |
188 | 3 | _state = state; |
189 | | |
190 | | // Initialize counters |
191 | 3 | _written_rows_counter = ADD_COUNTER(profile, "RowsWritten", TUnit::UNIT); |
192 | 3 | _send_data_timer = ADD_TIMER(profile, "SendDataTime"); |
193 | 3 | _write_delete_files_timer = ADD_TIMER(profile, "WriteDeleteFilesTime"); |
194 | 3 | _delete_file_count_counter = ADD_COUNTER(profile, "DeleteFileCount", TUnit::UNIT); |
195 | 3 | _open_timer = ADD_TIMER(profile, "OpenTime"); |
196 | 3 | _close_timer = ADD_TIMER(profile, "CloseTime"); |
197 | | |
198 | 3 | SCOPED_TIMER(_open_timer); |
199 | | |
200 | 3 | if (_format_version < 3) { |
201 | 3 | RETURN_IF_ERROR(_init_position_delete_output_exprs()); |
202 | 3 | } |
203 | | |
204 | 3 | LOG(INFO) << fmt::format( |
205 | 3 | "VIcebergDeleteSink opened: delete_type={}, output_path={}, format_version={}", |
206 | 3 | to_string(_delete_type), _output_path, _format_version); |
207 | | |
208 | 3 | return Status::OK(); |
209 | 3 | } |
210 | | |
211 | 0 | Status VIcebergDeleteSink::write(RuntimeState* state, Block& block) { |
212 | 0 | SCOPED_TIMER(_send_data_timer); |
213 | |
|
214 | 0 | if (block.rows() == 0) { |
215 | 0 | return Status::OK(); |
216 | 0 | } |
217 | | |
218 | 0 | _row_count += block.rows(); |
219 | |
|
220 | 0 | if (_delete_type != TFileContent::POSITION_DELETES) { |
221 | 0 | return Status::NotSupported("Iceberg delete only supports position delete files"); |
222 | 0 | } |
223 | | |
224 | | // Extract $row_id column and group by file_path |
225 | 0 | RETURN_IF_ERROR(_collect_position_deletes(block, _file_deletions)); |
226 | | |
227 | 0 | if (_written_rows_counter) { |
228 | 0 | COUNTER_UPDATE(_written_rows_counter, block.rows()); |
229 | 0 | } |
230 | |
|
231 | 0 | return Status::OK(); |
232 | 0 | } |
233 | | |
// Flushes all buffered deletions to files — position-delete data files for
// format v1/v2, a Puffin deletion-vector file for v3+ — then hands the
// resulting commit metadata to the runtime state for the FE commit.
Status VIcebergDeleteSink::close(Status close_status) {
    SCOPED_TIMER(_close_timer);

    // Upstream already failed: write nothing, just propagate the error.
    if (!close_status.ok()) {
        LOG(WARNING) << fmt::format("VIcebergDeleteSink close with error: {}",
                                    close_status.to_string());
        return close_status;
    }

    if (_delete_type == TFileContent::POSITION_DELETES && !_file_deletions.empty()) {
        SCOPED_TIMER(_write_delete_files_timer);
        if (_format_version >= 3) {
            RETURN_IF_ERROR(_write_deletion_vector_files(_file_deletions));
        } else {
            RETURN_IF_ERROR(_write_position_delete_files(_file_deletions));
        }
    }

    // Update counters
    if (_delete_file_count_counter) {
        COUNTER_UPDATE(_delete_file_count_counter, _delete_file_count);
    }

    LOG(INFO) << fmt::format("VIcebergDeleteSink closed: rows={}, delete_files={}", _row_count,
                             _delete_file_count);

    // Report every written delete file back through the runtime state.
    if (_state != nullptr) {
        for (const auto& commit_data : _commit_data_list) {
            _state->add_iceberg_commit_datas(commit_data);
        }
    }

    return Status::OK();
}
268 | | |
269 | 7 | int VIcebergDeleteSink::_get_row_id_column_index(const Block& block) { |
270 | | // Find __DORIS_ICEBERG_ROWID_COL__ column in block |
271 | 8 | for (size_t i = 0; i < block.columns(); ++i) { |
272 | 8 | const auto& col_name = block.get_by_position(i).name; |
273 | 8 | if (col_name == doris::BeConsts::ICEBERG_ROWID_COL) { |
274 | 7 | return static_cast<int>(i); |
275 | 7 | } |
276 | 8 | } |
277 | 0 | return -1; |
278 | 7 | } |
279 | | |
// Parses the hidden __DORIS_ICEBERG_ROWID_COL__ struct column out of `block`
// and groups every deleted row position into `file_deletions`, keyed by the
// referenced data file path. Per-row partition info from the struct overrides
// the sink-level defaults. Returns an error on any schema mismatch.
Status VIcebergDeleteSink::_collect_position_deletes(
        const Block& block, std::map<std::string, IcebergFileDeletion>& file_deletions) {
    // Find row id column
    int row_id_col_idx = _get_row_id_column_index(block);
    if (row_id_col_idx < 0) {
        return Status::InternalError(
                "__DORIS_ICEBERG_ROWID_COL__ column not found in block for position delete");
    }

    // Unwrap an optional nullable layer on both the column and its type before
    // interpreting it as a struct.
    const auto& row_id_col = block.get_by_position(row_id_col_idx);
    const IColumn* row_id_data = row_id_col.column.get();
    const IDataType* row_id_type = row_id_col.type.get();
    const auto* nullable_col = check_and_get_column<ColumnNullable>(row_id_data);
    if (nullable_col != nullptr) {
        row_id_data = nullable_col->get_nested_column_ptr().get();
    }
    const auto* nullable_type = check_and_get_data_type<DataTypeNullable>(row_id_type);
    if (nullable_type != nullptr) {
        row_id_type = nullable_type->get_nested_type().get();
    }
    const auto* struct_col = check_and_get_column<ColumnStruct>(row_id_data);
    const auto* struct_type = check_and_get_data_type<DataTypeStruct>(row_id_type);
    if (!struct_col || !struct_type) {
        return Status::InternalError("__DORIS_ICEBERG_ROWID_COL__ column is not a struct column");
    }

    // __DORIS_ICEBERG_ROWID_COL__ struct:
    // (file_path: STRING, row_position: BIGINT, partition_spec_id: INT, partition_data: STRING)
    size_t field_count = struct_col->tuple_size();
    if (field_count < 2) {
        return Status::InternalError(
                "__DORIS_ICEBERG_ROWID_COL__ struct must have at least 2 fields "
                "(file_path, row_position)");
    }

    // Field names are matched case-insensitively.
    auto normalize = [](const std::string& name) { return doris::to_lower(name); };

    // Resolve each expected field to its index; -1 means "not present".
    int file_path_idx = -1;
    int row_position_idx = -1;
    int spec_id_idx = -1;
    int partition_data_idx = -1;
    const auto& field_names = struct_type->get_element_names();
    for (size_t i = 0; i < field_names.size(); ++i) {
        std::string name = normalize(field_names[i]);
        if (file_path_idx < 0 && name == "file_path") {
            file_path_idx = static_cast<int>(i);
        } else if (row_position_idx < 0 && name == "row_position") {
            row_position_idx = static_cast<int>(i);
        } else if (spec_id_idx < 0 && name == "partition_spec_id") {
            spec_id_idx = static_cast<int>(i);
        } else if (partition_data_idx < 0 && name == "partition_data") {
            partition_data_idx = static_cast<int>(i);
        }
    }

    // The first two fields are mandatory; the partition fields are only
    // required when the struct is wide enough to carry them.
    if (file_path_idx < 0 || row_position_idx < 0) {
        return Status::InternalError(
                "__DORIS_ICEBERG_ROWID_COL__ must contain standard fields file_path and "
                "row_position");
    }
    if (field_count >= 3 && spec_id_idx < 0) {
        return Status::InternalError(
                "__DORIS_ICEBERG_ROWID_COL__ must use standard field name partition_spec_id");
    }
    if (field_count >= 4 && partition_data_idx < 0) {
        return Status::InternalError(
                "__DORIS_ICEBERG_ROWID_COL__ must use standard field name partition_data");
    }

    const auto* file_path_col = check_and_get_column<ColumnString>(
            remove_nullable(struct_col->get_column_ptr(file_path_idx)).get());
    const auto* row_position_col = check_and_get_column<ColumnVector<TYPE_BIGINT>>(
            remove_nullable(struct_col->get_column_ptr(row_position_idx)).get());

    if (!file_path_col || !row_position_col) {
        return Status::InternalError(
                "__DORIS_ICEBERG_ROWID_COL__ struct fields have incorrect types");
    }

    // Optional per-row partition columns; left null when absent.
    const ColumnVector<TYPE_INT>* spec_id_col = nullptr;
    const ColumnString* partition_data_col = nullptr;
    if (spec_id_idx >= 0 && spec_id_idx < static_cast<int>(field_count)) {
        spec_id_col = check_and_get_column<ColumnVector<TYPE_INT>>(
                remove_nullable(struct_col->get_column_ptr(spec_id_idx)).get());
        if (!spec_id_col) {
            return Status::InternalError(
                    "__DORIS_ICEBERG_ROWID_COL__ partition_spec_id has incorrect type");
        }
    }
    if (partition_data_idx >= 0 && partition_data_idx < static_cast<int>(field_count)) {
        partition_data_col = check_and_get_column<ColumnString>(
                remove_nullable(struct_col->get_column_ptr(partition_data_idx)).get());
        if (!partition_data_col) {
            return Status::InternalError(
                    "__DORIS_ICEBERG_ROWID_COL__ partition_data has incorrect type");
        }
    }

    // Group by file_path using roaring bitmap
    for (size_t i = 0; i < block.rows(); ++i) {
        std::string file_path = file_path_col->get_data_at(i).to_string();
        int64_t row_position = row_position_col->get_element(i);
        if (row_position < 0) {
            return Status::InternalError("Invalid row_position {} in row_id column", row_position);
        }

        // Start from the sink-level defaults, then let the per-row struct
        // fields override them when present.
        int32_t partition_spec_id = _partition_spec_id;
        std::string partition_data_json = _partition_data_json;
        if (spec_id_col != nullptr) {
            partition_spec_id = spec_id_col->get_element(i);
        }
        if (partition_data_col != nullptr) {
            partition_data_json = partition_data_col->get_data_at(i).to_string();
        }

        // First row for a file establishes its partition info; later rows that
        // disagree only produce a warning (the first value wins).
        auto [iter, inserted] = file_deletions.emplace(
                file_path, IcebergFileDeletion(partition_spec_id, partition_data_json));
        if (!inserted) {
            if (iter->second.partition_spec_id != partition_spec_id ||
                iter->second.partition_data_json != partition_data_json) {
                LOG(WARNING) << fmt::format(
                        "Mismatched partition info for file {}, existing spec_id={}, data={}, "
                        "new spec_id={}, data={}",
                        file_path, iter->second.partition_spec_id, iter->second.partition_data_json,
                        partition_spec_id, partition_data_json);
            }
        }
        iter->second.rows_to_delete.add(static_cast<uint64_t>(row_position));
    }

    return Status::OK();
}
412 | | |
// Writes one Iceberg position-delete data file per referenced data file
// (format v1/v2 path). Each file contains (file_path, pos) rows streamed in
// batches of kBatchSize, and yields one TIcebergCommitData entry.
Status VIcebergDeleteSink::_write_position_delete_files(
        const std::map<std::string, IcebergFileDeletion>& file_deletions) {
    constexpr size_t kBatchSize = 4096;
    for (const auto& [data_file_path, deletion] : file_deletions) {
        if (deletion.rows_to_delete.isEmpty()) {
            continue;
        }
        // Generate unique delete file path
        std::string delete_file_path = _generate_delete_file_path(data_file_path);

        // Create delete file writer
        auto writer = VIcebergDeleteFileWriterFactory::create_writer(
                TFileContent::POSITION_DELETES, delete_file_path, _file_format_type,
                _compress_type);

        // Build column names for position delete
        std::vector<std::string> column_names = {"file_path", "pos"};

        // Lazily build the output exprs if open() did not (e.g. format flipped).
        if (_position_delete_output_expr_ctxs.empty()) {
            RETURN_IF_ERROR(_init_position_delete_output_exprs());
        }

        // Open writer
        RETURN_IF_ERROR(writer->open(_state, _state->runtime_profile(),
                                     _position_delete_output_expr_ctxs, column_names, _hadoop_conf,
                                     _file_type, _broker_addresses));

        // Build block with (file_path, pos) columns. Iterating the bitmap
        // yields positions in ascending order; flush every kBatchSize rows.
        std::vector<int64_t> positions;
        positions.reserve(kBatchSize);
        for (auto it = deletion.rows_to_delete.begin(); it != deletion.rows_to_delete.end(); ++it) {
            positions.push_back(static_cast<int64_t>(*it));
            if (positions.size() >= kBatchSize) {
                Block delete_block;
                RETURN_IF_ERROR(
                        _build_position_delete_block(data_file_path, positions, delete_block));
                RETURN_IF_ERROR(writer->write(delete_block));
                positions.clear();
            }
        }
        if (!positions.empty()) {
            Block delete_block;
            RETURN_IF_ERROR(_build_position_delete_block(data_file_path, positions, delete_block));
            RETURN_IF_ERROR(writer->write(delete_block));
        }

        // Set partition info on writer before close
        writer->set_partition_info(deletion.partition_spec_id, deletion.partition_data_json);

        // Close writer and collect commit data
        TIcebergCommitData commit_data;
        RETURN_IF_ERROR(writer->close(commit_data));

        // Set referenced data file path
        commit_data.__set_referenced_data_file_path(data_file_path);

        _commit_data_list.push_back(commit_data);
        _delete_file_count++;

        VLOG(1) << fmt::format("Written position delete file: path={}, rows={}, referenced_file={}",
                               delete_file_path, commit_data.row_count, data_file_path);
    }

    return Status::OK();
}
478 | | |
479 | 3 | Status VIcebergDeleteSink::_init_position_delete_output_exprs() { |
480 | 3 | if (!_position_delete_output_expr_ctxs.empty()) { |
481 | 0 | return Status::OK(); |
482 | 0 | } |
483 | | |
484 | 3 | std::vector<TExpr> texprs; |
485 | 3 | texprs.reserve(2); |
486 | | |
487 | 3 | std::string empty_string; |
488 | 3 | TExprNode file_path_node = |
489 | 3 | create_texpr_node_from(&empty_string, PrimitiveType::TYPE_STRING, 0, 0); |
490 | 3 | file_path_node.__set_num_children(0); |
491 | 3 | file_path_node.__set_output_scale(0); |
492 | 3 | file_path_node.__set_is_nullable(false); |
493 | 3 | TExpr file_path_expr; |
494 | 3 | file_path_expr.nodes.emplace_back(std::move(file_path_node)); |
495 | 3 | texprs.emplace_back(std::move(file_path_expr)); |
496 | | |
497 | 3 | int64_t zero = 0; |
498 | 3 | TExprNode pos_node = create_texpr_node_from(&zero, PrimitiveType::TYPE_BIGINT, 0, 0); |
499 | 3 | pos_node.__set_num_children(0); |
500 | 3 | pos_node.__set_output_scale(0); |
501 | 3 | pos_node.__set_is_nullable(false); |
502 | 3 | TExpr pos_expr; |
503 | 3 | pos_expr.nodes.emplace_back(std::move(pos_node)); |
504 | 3 | texprs.emplace_back(std::move(pos_expr)); |
505 | | |
506 | 3 | RETURN_IF_ERROR(VExpr::create_expr_trees(texprs, _position_delete_output_expr_ctxs)); |
507 | 3 | return Status::OK(); |
508 | 3 | } |
509 | | |
510 | | Status VIcebergDeleteSink::_build_position_delete_block(const std::string& file_path, |
511 | | const std::vector<int64_t>& positions, |
512 | 1 | Block& output_block) { |
513 | | // Create file_path column (repeated for each position) |
514 | 1 | auto file_path_col = ColumnString::create(); |
515 | 5 | for (size_t i = 0; i < positions.size(); ++i) { |
516 | 4 | file_path_col->insert_data(file_path.data(), file_path.size()); |
517 | 4 | } |
518 | | |
519 | | // Create pos column |
520 | 1 | auto pos_col = ColumnVector<TYPE_BIGINT>::create(); |
521 | 1 | pos_col->get_data().assign(positions.begin(), positions.end()); |
522 | | |
523 | | // Build block |
524 | 1 | output_block.insert(ColumnWithTypeAndName(std::move(file_path_col), |
525 | 1 | std::make_shared<DataTypeString>(), "file_path")); |
526 | 1 | output_block.insert( |
527 | 1 | ColumnWithTypeAndName(std::move(pos_col), std::make_shared<DataTypeInt64>(), "pos")); |
528 | | |
529 | 1 | return Status::OK(); |
530 | 1 | } |
531 | | |
532 | 1 | std::string VIcebergDeleteSink::_get_file_extension() const { |
533 | 1 | std::string compress_name; |
534 | 1 | switch (_compress_type) { |
535 | 1 | case TFileCompressType::SNAPPYBLOCK: { |
536 | 1 | compress_name = ".snappy"; |
537 | 1 | break; |
538 | 0 | } |
539 | 0 | case TFileCompressType::ZLIB: { |
540 | 0 | compress_name = ".zlib"; |
541 | 0 | break; |
542 | 0 | } |
543 | 0 | case TFileCompressType::ZSTD: { |
544 | 0 | compress_name = ".zstd"; |
545 | 0 | break; |
546 | 0 | } |
547 | 0 | default: { |
548 | 0 | compress_name = ""; |
549 | 0 | break; |
550 | 0 | } |
551 | 1 | } |
552 | | |
553 | 1 | std::string file_format_name; |
554 | 1 | switch (_file_format_type) { |
555 | 1 | case TFileFormatType::FORMAT_PARQUET: { |
556 | 1 | file_format_name = ".parquet"; |
557 | 1 | break; |
558 | 0 | } |
559 | 0 | case TFileFormatType::FORMAT_ORC: { |
560 | 0 | file_format_name = ".orc"; |
561 | 0 | break; |
562 | 0 | } |
563 | 0 | default: { |
564 | 0 | file_format_name = ""; |
565 | 0 | break; |
566 | 0 | } |
567 | 1 | } |
568 | 1 | return fmt::format("{}{}", compress_name, file_format_name); |
569 | 1 | } |
570 | | |
// Format v3+ path: serializes every file's deletion bitmap into a
// deletion-vector blob, writes all blobs into a single Puffin file, and
// produces one TIcebergCommitData per blob.
//
// Blob byte layout built here (offsets within blob_data):
//   [0..3]  big-endian length of (magic + bitmap)
//   [4..7]  DV magic bytes 0xD1 0xD3 0x39 0x64
//   [8..8+bitmap)        serialized Roaring64Map
//   [8+bitmap..+4)       big-endian CRC-32 of (magic + bitmap)
Status VIcebergDeleteSink::_write_deletion_vector_files(
        const std::map<std::string, IcebergFileDeletion>& file_deletions) {
    std::vector<DeletionVectorBlob> blobs;
    for (const auto& [data_file_path, deletion] : file_deletions) {
        if (deletion.rows_to_delete.isEmpty()) {
            continue;
        }
        roaring::Roaring64Map merged_rows = deletion.rows_to_delete;
        DeletionVectorBlob blob;
        // delete_count = rows deleted by THIS query, captured before merging
        // in positions from pre-existing delete files.
        blob.delete_count = static_cast<int64_t>(merged_rows.cardinality());
        auto previous_delete_it = _rewritable_delete_files.find(data_file_path);
        if (previous_delete_it != _rewritable_delete_files.end()) {
            // Fold in the old delete files so the new DV supersedes them.
            roaring::Roaring64Map previous_rows;
            RETURN_IF_ERROR(load_rewritable_delete_rows(
                    _state, _state->runtime_profile(), data_file_path, previous_delete_it->second,
                    _hadoop_conf, _file_type, _broker_addresses, &previous_rows));
            merged_rows |= previous_rows;
        }

        // Bitmap size must be taken AFTER merging; it determines all offsets.
        size_t bitmap_size = merged_rows.getSizeInBytes();
        blob.referenced_data_file = data_file_path;
        blob.partition_spec_id = deletion.partition_spec_id;
        blob.partition_data_json = deletion.partition_data_json;
        blob.merged_count = static_cast<int64_t>(merged_rows.cardinality());
        // length prefix (4) + magic (4) + bitmap + crc (4)
        blob.content_size_in_bytes = static_cast<int64_t>(4 + 4 + bitmap_size + 4);
        blob.blob_data.resize(static_cast<size_t>(blob.content_size_in_bytes));
        merged_rows.write(blob.blob_data.data() + 8);

        // Length prefix covers magic + bitmap (not itself, not the CRC).
        uint32_t total_length = static_cast<uint32_t>(4 + bitmap_size);
        BigEndian::Store32(blob.blob_data.data(), total_length);

        constexpr char DV_MAGIC[] = {'\xD1', '\xD3', '\x39', '\x64'};
        memcpy(blob.blob_data.data() + 4, DV_MAGIC, 4);

        // CRC-32 over magic + serialized bitmap, stored big-endian after them.
        uint32_t crc = static_cast<uint32_t>(
                ::crc32(0, reinterpret_cast<const Bytef*>(blob.blob_data.data() + 4),
                        4 + (uInt)bitmap_size));
        BigEndian::Store32(blob.blob_data.data() + 8 + bitmap_size, crc);
        blobs.emplace_back(std::move(blob));
    }

    if (blobs.empty()) {
        return Status::OK();
    }

    // All blobs share one Puffin container file.
    std::string puffin_path = _generate_puffin_file_path();
    int64_t puffin_file_size = 0;
    RETURN_IF_ERROR(_write_puffin_file(puffin_path, &blobs, &puffin_file_size));

    for (const auto& blob : blobs) {
        TIcebergCommitData commit_data;
        commit_data.__set_file_path(puffin_path);
        commit_data.__set_row_count(blob.merged_count);
        commit_data.__set_affected_rows(blob.delete_count);
        commit_data.__set_file_size(puffin_file_size);
        commit_data.__set_file_content(TFileContent::DELETION_VECTOR);
        commit_data.__set_content_offset(blob.content_offset);
        commit_data.__set_content_size_in_bytes(blob.content_size_in_bytes);
        commit_data.__set_referenced_data_file_path(blob.referenced_data_file);
        if (blob.partition_spec_id != 0 || !blob.partition_data_json.empty()) {
            commit_data.__set_partition_spec_id(blob.partition_spec_id);
            commit_data.__set_partition_data_json(blob.partition_data_json);
        }

        _commit_data_list.push_back(commit_data);
        _delete_file_count++;
    }
    return Status::OK();
}
640 | | |
// Writes all deletion-vector blobs into one Puffin container file at
// `puffin_path`, recording each blob's file offset in blob.content_offset and
// the total file size in *out_file_size.
//
// File layout written here: magic | blob... | magic | footer JSON |
// footer size (4, LE) | flags (4, zero) | magic.
Status VIcebergDeleteSink::_write_puffin_file(const std::string& puffin_path,
                                              std::vector<DeletionVectorBlob>* blobs,
                                              int64_t* out_file_size) {
    DCHECK(blobs != nullptr);
    DCHECK(!blobs->empty());

    // Resolve the filesystem for the target path from sink configuration.
    io::FSPropertiesRef fs_properties(_file_type);
    fs_properties.properties = &_hadoop_conf;
    if (!_broker_addresses.empty()) {
        fs_properties.broker_addresses = &_broker_addresses;
    }
    io::FileDescription file_description = {.path = puffin_path, .fs_name {}};
    auto fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
    io::FileWriterOptions file_writer_options = {.used_by_s3_committer = false};
    io::FileWriterPtr file_writer;
    RETURN_IF_ERROR(fs->create_file(file_description.path, &file_writer, &file_writer_options));

    // Puffin magic "PFA1".
    constexpr char PUFFIN_MAGIC[] = {'\x50', '\x46', '\x41', '\x31'};
    RETURN_IF_ERROR(file_writer->append(Slice(reinterpret_cast<const uint8_t*>(PUFFIN_MAGIC), 4)));
    int64_t current_offset = 4;
    for (auto& blob : *blobs) {
        // The footer must point at each blob's absolute offset in the file.
        blob.content_offset = current_offset;
        RETURN_IF_ERROR(file_writer->append(Slice(
                reinterpret_cast<const uint8_t*>(blob.blob_data.data()), blob.blob_data.size())));
        current_offset += static_cast<int64_t>(blob.blob_data.size());
    }
    RETURN_IF_ERROR(file_writer->append(Slice(reinterpret_cast<const uint8_t*>(PUFFIN_MAGIC), 4)));

    // Footer payload: JSON describing every blob (built after offsets are known).
    std::string footer_json = _build_puffin_footer_json(*blobs);
    RETURN_IF_ERROR(file_writer->append(
            Slice(reinterpret_cast<const uint8_t*>(footer_json.data()), footer_json.size())));

    // Footer payload size: 4-byte little-endian.
    char footer_size_buf[4];
    LittleEndian::Store32(footer_size_buf, static_cast<uint32_t>(footer_json.size()));
    RETURN_IF_ERROR(file_writer->append(
            Slice(reinterpret_cast<const uint8_t*>(footer_size_buf), sizeof(footer_size_buf))));

    // Flags: all zero (footer payload not compressed).
    char flags[4] = {0, 0, 0, 0};
    RETURN_IF_ERROR(
            file_writer->append(Slice(reinterpret_cast<const uint8_t*>(flags), sizeof(flags))));
    RETURN_IF_ERROR(file_writer->append(Slice(reinterpret_cast<const uint8_t*>(PUFFIN_MAGIC), 4)));
    RETURN_IF_ERROR(file_writer->close());

    // Total = blobs section + trailing magic + footer json + size + flags + magic.
    *out_file_size = current_offset + 4 + static_cast<int64_t>(footer_json.size()) + 4 + 4 + 4;
    return Status::OK();
}
687 | | |
// Serializes the Puffin footer payload JSON: one entry per deletion-vector
// blob (type, offset, length, referenced data file, cardinality) plus a
// file-level "created-by" property. Blob offsets must already be filled in.
std::string VIcebergDeleteSink::_build_puffin_footer_json(
        const std::vector<DeletionVectorBlob>& blobs) {
    rapidjson::StringBuffer buffer;
    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
    writer.StartObject();
    writer.Key("blobs");
    writer.StartArray();
    for (const auto& blob : blobs) {
        writer.StartObject();
        writer.Key("type");
        writer.String("deletion-vector-v1");
        writer.Key("fields");
        writer.StartArray();
        writer.EndArray();
        // -1: snapshot id / sequence number are assigned by the FE at commit.
        writer.Key("snapshot-id");
        writer.Int64(-1);
        writer.Key("sequence-number");
        writer.Int64(-1);
        // Absolute byte offset and length of the blob within the Puffin file.
        writer.Key("offset");
        writer.Int64(blob.content_offset);
        writer.Key("length");
        writer.Int64(blob.content_size_in_bytes);
        writer.Key("properties");
        writer.StartObject();
        writer.Key("referenced-data-file");
        writer.String(blob.referenced_data_file.c_str(),
                      static_cast<rapidjson::SizeType>(blob.referenced_data_file.size()));
        // Cardinality is emitted as a string property, per blob metadata style.
        std::string cardinality = std::to_string(blob.merged_count);
        writer.Key("cardinality");
        writer.String(cardinality.c_str(), static_cast<rapidjson::SizeType>(cardinality.size()));
        writer.EndObject();
        writer.EndObject();
    }
    writer.EndArray();
    writer.Key("properties");
    writer.StartObject();
    writer.Key("created-by");
    writer.String("doris-puffin-v1");
    writer.EndObject();
    writer.EndObject();
    return {buffer.GetString(), buffer.GetSize()};
}
730 | | |
731 | | std::string VIcebergDeleteSink::_generate_delete_file_path( |
732 | 1 | const std::string& referenced_data_file) { |
733 | | // Generate unique delete file name using UUID |
734 | 1 | std::string uuid = generate_uuid_string(); |
735 | 1 | std::string file_name; |
736 | | |
737 | 1 | std::string file_extension = _get_file_extension(); |
738 | 1 | file_name = |
739 | 1 | fmt::format("delete_pos_{}_{}{}", uuid, |
740 | 1 | std::hash<std::string> {}(referenced_data_file) % 10000000, file_extension); |
741 | | |
742 | | // Combine with output path or table location |
743 | 1 | std::string base_path = _output_path.empty() ? _table_location : _output_path; |
744 | | |
745 | | // Ensure base path ends with / |
746 | 1 | if (!base_path.empty() && base_path.back() != '/') { |
747 | 1 | base_path += '/'; |
748 | 1 | } |
749 | | |
750 | | // Delete files are data files in Iceberg, write under data location |
751 | 1 | return fmt::format("{}{}", base_path, file_name); |
752 | 1 | } |
753 | | |
754 | 1 | std::string VIcebergDeleteSink::_generate_puffin_file_path() { |
755 | 1 | std::string uuid = generate_uuid_string(); |
756 | 1 | std::string file_name = fmt::format("delete_dv_{}.puffin", uuid); |
757 | 1 | std::string base_path = _output_path.empty() ? _table_location : _output_path; |
758 | 1 | if (!base_path.empty() && base_path.back() != '/') { |
759 | 1 | base_path += '/'; |
760 | 1 | } |
761 | 1 | return fmt::format("{}{}", base_path, file_name); |
762 | 1 | } |
763 | | |
764 | | } // namespace doris |