Coverage Report

Created: 2026-03-30 11:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/viceberg_delete_sink.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/viceberg_delete_sink.h"
19
20
#include <fmt/format.h>
21
#include <rapidjson/stringbuffer.h>
22
#include <rapidjson/writer.h>
23
#include <zlib.h>
24
25
#include "common/logging.h"
26
#include "core/block/column_with_type_and_name.h"
27
#include "core/column/column_nullable.h"
28
#include "core/column/column_string.h"
29
#include "core/column/column_struct.h"
30
#include "core/column/column_vector.h"
31
#include "core/data_type/data_type_factory.hpp"
32
#include "core/data_type/data_type_nullable.h"
33
#include "core/data_type/data_type_number.h"
34
#include "core/data_type/data_type_string.h"
35
#include "core/data_type/data_type_struct.h"
36
#include "exec/common/endian.h"
37
#include "exprs/vexpr.h"
38
#include "format/table/iceberg_delete_file_reader_helper.h"
39
#include "format/transformer/vfile_format_transformer.h"
40
#include "io/file_factory.h"
41
#include "runtime/runtime_state.h"
42
#include "util/slice.h"
43
#include "util/string_util.h"
44
#include "util/uid_util.h"
45
46
namespace doris {
47
48
namespace {
49
50
class RewriteBitmapVisitor final : public IcebergPositionDeleteVisitor {
51
public:
52
    RewriteBitmapVisitor(const std::string& referenced_data_file_path,
53
                         roaring::Roaring64Map* rows_to_delete)
54
0
            : _referenced_data_file_path(referenced_data_file_path),
55
0
              _rows_to_delete(rows_to_delete) {}
56
57
0
    Status visit(const std::string& file_path, int64_t pos) override {
58
0
        if (_rows_to_delete == nullptr) {
59
0
            return Status::InvalidArgument("rows_to_delete is null");
60
0
        }
61
0
        if (file_path == _referenced_data_file_path) {
62
0
            _rows_to_delete->add(static_cast<uint64_t>(pos));
63
0
        }
64
0
        return Status::OK();
65
0
    }
66
67
private:
68
    const std::string& _referenced_data_file_path;
69
    roaring::Roaring64Map* _rows_to_delete;
70
};
71
72
// Reads every previously written delete file (deletion vectors and classic
// position-delete files) that references `referenced_data_file_path`, and
// accumulates the deleted row positions into `rows_to_delete`.
//
// Returns InvalidArgument when the output bitmap is null; silently succeeds
// (no-op) when there is no state/profile or no delete files to read.
Status load_rewritable_delete_rows(RuntimeState* state, RuntimeProfile* profile,
                                   const std::string& referenced_data_file_path,
                                   const std::vector<TIcebergDeleteFileDesc>& delete_files,
                                   const std::map<std::string, std::string>& hadoop_conf,
                                   TFileType::type file_type,
                                   const std::vector<TNetworkAddress>& broker_addresses,
                                   roaring::Roaring64Map* rows_to_delete) {
    if (rows_to_delete == nullptr) {
        return Status::InvalidArgument("rows_to_delete is null");
    }
    if (state == nullptr || profile == nullptr || delete_files.empty()) {
        return Status::OK();
    }

    // Assemble the reader options shared by all delete files in this batch.
    TFileScanRangeParams params =
            build_iceberg_delete_scan_range_params(hadoop_conf, file_type, broker_addresses);
    IcebergDeleteFileIOContext delete_file_io_ctx(state);
    IcebergDeleteFileReaderOptions options;
    options.state = state;
    options.profile = profile;
    options.scan_params = &params;
    options.io_ctx = &delete_file_io_ctx.io_ctx;
    options.batch_size = 102400;

    for (const auto& delete_file : delete_files) {
        // Deletion vectors already carry a serialized bitmap; merge it directly.
        if (is_iceberg_deletion_vector(delete_file)) {
            RETURN_IF_ERROR(read_iceberg_deletion_vector(delete_file, options, rows_to_delete));
            continue;
        }
        // Classic position-delete file: replay its rows through a visitor that
        // keeps only the positions of the referenced data file.
        RewriteBitmapVisitor visitor(referenced_data_file_path, rows_to_delete);
        RETURN_IF_ERROR(read_iceberg_position_delete_file(delete_file, options, &visitor));
    }
    return Status::OK();
}
106
107
} // namespace
108
109
// Constructs the sink from its thrift descriptor. The expression contexts and
// dependencies are forwarded verbatim to the AsyncResultWriter base; the thrift
// sink is copied so init_properties() can parse it later.
VIcebergDeleteSink::VIcebergDeleteSink(const TDataSink& t_sink,
                                       const VExprContextSPtrs& output_exprs,
                                       std::shared_ptr<Dependency> dep,
                                       std::shared_ptr<Dependency> fin_dep)
        : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) {
    // The planner must have populated the iceberg_delete_sink member.
    DCHECK(_t_sink.__isset.iceberg_delete_sink);
}
116
117
11
// Parses the thrift iceberg_delete_sink descriptor into member state.
// Fails with NotSupported for anything other than position deletes; every
// other field is optional and only read when its __isset flag is true.
Status VIcebergDeleteSink::init_properties(ObjectPool* pool) {
    const auto& sink = _t_sink.iceberg_delete_sink;

    // Only position deletes are supported by this sink.
    _delete_type = sink.delete_type;
    if (_delete_type != TFileContent::POSITION_DELETES) {
        return Status::NotSupported("Iceberg delete only supports position delete files");
    }

    // Output file format and compression.
    if (sink.__isset.file_format) {
        _file_format_type = sink.file_format;
    }
    if (sink.__isset.compress_type) {
        _compress_type = sink.compress_type;
    }

    // Destination paths: explicit output path wins over the table location
    // when files are generated later.
    if (sink.__isset.output_path) {
        _output_path = sink.output_path;
    }
    if (sink.__isset.table_location) {
        _table_location = sink.table_location;
    }

    // Filesystem access configuration.
    if (sink.__isset.hadoop_config) {
        _hadoop_conf.insert(sink.hadoop_config.begin(), sink.hadoop_config.end());
    }
    if (sink.__isset.file_type) {
        _file_type = sink.file_type;
    }
    if (sink.__isset.broker_addresses) {
        _broker_addresses.assign(sink.broker_addresses.begin(), sink.broker_addresses.end());
    }

    // Default partition info, used for rows that do not carry their own.
    if (sink.__isset.partition_spec_id) {
        _partition_spec_id = sink.partition_spec_id;
    }
    if (sink.__isset.partition_data_json) {
        _partition_data_json = sink.partition_data_json;
    }
    if (sink.__isset.format_version) {
        _format_version = sink.format_version;
    }

    // Format v3+: remember previously written delete files per data file so
    // they can be merged with new deletes into a single deletion vector.
    if (_format_version >= 3 && sink.__isset.rewritable_delete_file_sets) {
        for (const auto& file_set : sink.rewritable_delete_file_sets) {
            const bool usable = file_set.__isset.referenced_data_file_path &&
                                file_set.__isset.delete_files &&
                                !file_set.referenced_data_file_path.empty() &&
                                !file_set.delete_files.empty();
            if (!usable) {
                continue;
            }
            _rewritable_delete_files.emplace(file_set.referenced_data_file_path,
                                             file_set.delete_files);
        }
    }

    return Status::OK();
}
186
187
3
// Opens the sink: registers profile counters/timers, stores the runtime state,
// and prepares position-delete output expressions for pre-v3 table formats
// (v3+ emits deletion vectors instead and needs no output exprs here).
Status VIcebergDeleteSink::open(RuntimeState* state, RuntimeProfile* profile) {
    _state = state;

    // Register all counters and timers up front.
    _written_rows_counter = ADD_COUNTER(profile, "RowsWritten", TUnit::UNIT);
    _send_data_timer = ADD_TIMER(profile, "SendDataTime");
    _write_delete_files_timer = ADD_TIMER(profile, "WriteDeleteFilesTime");
    _delete_file_count_counter = ADD_COUNTER(profile, "DeleteFileCount", TUnit::UNIT);
    _open_timer = ADD_TIMER(profile, "OpenTime");
    _close_timer = ADD_TIMER(profile, "CloseTime");

    SCOPED_TIMER(_open_timer);

    if (_format_version < 3) {
        RETURN_IF_ERROR(_init_position_delete_output_exprs());
    }

    LOG(INFO) << fmt::format(
            "VIcebergDeleteSink opened: delete_type={}, output_path={}, format_version={}",
            to_string(_delete_type), _output_path, _format_version);

    return Status::OK();
}
210
211
0
// Consumes one block of rows to delete: extracts the hidden $row_id struct
// column and accumulates the (file_path, position) pairs into _file_deletions,
// grouped per referenced data file.
//
// Fix: the original incremented _row_count before the delete-type check and
// before _collect_position_deletes(), so rejected or failed batches inflated
// the row count reported at close(). Rows are now counted only after they have
// been successfully collected.
Status VIcebergDeleteSink::write(RuntimeState* state, Block& block) {
    SCOPED_TIMER(_send_data_timer);

    if (block.rows() == 0) {
        return Status::OK();
    }

    if (_delete_type != TFileContent::POSITION_DELETES) {
        return Status::NotSupported("Iceberg delete only supports position delete files");
    }

    // Extract $row_id column and group by file_path.
    RETURN_IF_ERROR(_collect_position_deletes(block, _file_deletions));

    // Count rows only after successful collection.
    _row_count += block.rows();
    if (_written_rows_counter) {
        COUNTER_UPDATE(_written_rows_counter, block.rows());
    }

    return Status::OK();
}
233
234
2
// Closes the sink. On upstream failure the error is logged and propagated
// without writing anything. On success, all accumulated deletions are flushed
// — as deletion vectors for format v3+, as position-delete files otherwise —
// and the collected commit metadata is handed to the runtime state so the FE
// can commit the Iceberg transaction.
Status VIcebergDeleteSink::close(Status close_status) {
    SCOPED_TIMER(_close_timer);

    if (!close_status.ok()) {
        LOG(WARNING) << fmt::format("VIcebergDeleteSink close with error: {}",
                                    close_status.to_string());
        return close_status;
    }

    // Flush accumulated deletions, if any.
    if (_delete_type == TFileContent::POSITION_DELETES && !_file_deletions.empty()) {
        SCOPED_TIMER(_write_delete_files_timer);
        if (_format_version >= 3) {
            RETURN_IF_ERROR(_write_deletion_vector_files(_file_deletions));
        } else {
            RETURN_IF_ERROR(_write_position_delete_files(_file_deletions));
        }
    }

    if (_delete_file_count_counter) {
        COUNTER_UPDATE(_delete_file_count_counter, _delete_file_count);
    }

    LOG(INFO) << fmt::format("VIcebergDeleteSink closed: rows={}, delete_files={}", _row_count,
                             _delete_file_count);

    // Publish commit metadata for the coordinator.
    if (_state != nullptr) {
        for (const auto& commit_data : _commit_data_list) {
            _state->add_iceberg_commit_datas(commit_data);
        }
    }

    return Status::OK();
}
268
269
7
int VIcebergDeleteSink::_get_row_id_column_index(const Block& block) {
270
    // Find __DORIS_ICEBERG_ROWID_COL__ column in block
271
8
    for (size_t i = 0; i < block.columns(); ++i) {
272
8
        const auto& col_name = block.get_by_position(i).name;
273
8
        if (col_name == doris::BeConsts::ICEBERG_ROWID_COL) {
274
7
            return static_cast<int>(i);
275
7
        }
276
8
    }
277
0
    return -1;
278
7
}
279
280
// Decodes the hidden __DORIS_ICEBERG_ROWID_COL__ struct column of `block` and
// groups the deleted positions per referenced data file into `file_deletions`.
//
// Expected struct layout (field order is discovered by name, not position):
//   (file_path: STRING, row_position: BIGINT,
//    partition_spec_id: INT [optional], partition_data: STRING [optional])
// When the optional partition fields are absent, the sink-level defaults
// (_partition_spec_id / _partition_data_json) are used instead.
Status VIcebergDeleteSink::_collect_position_deletes(
        const Block& block, std::map<std::string, IcebergFileDeletion>& file_deletions) {
    // Locate the row-id column.
    int row_id_col_idx = _get_row_id_column_index(block);
    if (row_id_col_idx < 0) {
        return Status::InternalError(
                "__DORIS_ICEBERG_ROWID_COL__ column not found in block for position delete");
    }

    // Strip an optional nullable wrapper from both the column and its type.
    const auto& row_id_col = block.get_by_position(row_id_col_idx);
    const IColumn* row_id_data = row_id_col.column.get();
    const IDataType* row_id_type = row_id_col.type.get();
    if (const auto* nullable_col = check_and_get_column<ColumnNullable>(row_id_data);
        nullable_col != nullptr) {
        row_id_data = nullable_col->get_nested_column_ptr().get();
    }
    if (const auto* nullable_type = check_and_get_data_type<DataTypeNullable>(row_id_type);
        nullable_type != nullptr) {
        row_id_type = nullable_type->get_nested_type().get();
    }

    const auto* struct_col = check_and_get_column<ColumnStruct>(row_id_data);
    const auto* struct_type = check_and_get_data_type<DataTypeStruct>(row_id_type);
    if (!struct_col || !struct_type) {
        return Status::InternalError("__DORIS_ICEBERG_ROWID_COL__ column is not a struct column");
    }

    size_t field_count = struct_col->tuple_size();
    if (field_count < 2) {
        return Status::InternalError(
                "__DORIS_ICEBERG_ROWID_COL__ struct must have at least 2 fields "
                "(file_path, row_position)");
    }

    // Resolve field indexes case-insensitively; first match wins.
    auto normalize = [](const std::string& name) { return doris::to_lower(name); };

    int file_path_idx = -1;
    int row_position_idx = -1;
    int spec_id_idx = -1;
    int partition_data_idx = -1;
    const auto& field_names = struct_type->get_element_names();
    for (size_t i = 0; i < field_names.size(); ++i) {
        std::string field_name = normalize(field_names[i]);
        if (file_path_idx < 0 && field_name == "file_path") {
            file_path_idx = static_cast<int>(i);
        } else if (row_position_idx < 0 && field_name == "row_position") {
            row_position_idx = static_cast<int>(i);
        } else if (spec_id_idx < 0 && field_name == "partition_spec_id") {
            spec_id_idx = static_cast<int>(i);
        } else if (partition_data_idx < 0 && field_name == "partition_data") {
            partition_data_idx = static_cast<int>(i);
        }
    }

    // The two mandatory fields must be present; the optional ones must use the
    // standard names when the struct is wide enough to carry them.
    if (file_path_idx < 0 || row_position_idx < 0) {
        return Status::InternalError(
                "__DORIS_ICEBERG_ROWID_COL__ must contain standard fields file_path and "
                "row_position");
    }
    if (field_count >= 3 && spec_id_idx < 0) {
        return Status::InternalError(
                "__DORIS_ICEBERG_ROWID_COL__ must use standard field name partition_spec_id");
    }
    if (field_count >= 4 && partition_data_idx < 0) {
        return Status::InternalError(
                "__DORIS_ICEBERG_ROWID_COL__ must use standard field name partition_data");
    }

    const auto* file_path_col = check_and_get_column<ColumnString>(
            remove_nullable(struct_col->get_column_ptr(file_path_idx)).get());
    const auto* row_position_col = check_and_get_column<ColumnVector<TYPE_BIGINT>>(
            remove_nullable(struct_col->get_column_ptr(row_position_idx)).get());
    if (!file_path_col || !row_position_col) {
        return Status::InternalError(
                "__DORIS_ICEBERG_ROWID_COL__ struct fields have incorrect types");
    }

    // Optional partition columns.
    const ColumnVector<TYPE_INT>* spec_id_col = nullptr;
    const ColumnString* partition_data_col = nullptr;
    if (spec_id_idx >= 0 && spec_id_idx < static_cast<int>(field_count)) {
        spec_id_col = check_and_get_column<ColumnVector<TYPE_INT>>(
                remove_nullable(struct_col->get_column_ptr(spec_id_idx)).get());
        if (!spec_id_col) {
            return Status::InternalError(
                    "__DORIS_ICEBERG_ROWID_COL__ partition_spec_id has incorrect type");
        }
    }
    if (partition_data_idx >= 0 && partition_data_idx < static_cast<int>(field_count)) {
        partition_data_col = check_and_get_column<ColumnString>(
                remove_nullable(struct_col->get_column_ptr(partition_data_idx)).get());
        if (!partition_data_col) {
            return Status::InternalError(
                    "__DORIS_ICEBERG_ROWID_COL__ partition_data has incorrect type");
        }
    }

    // Fold every row into the per-file roaring bitmap.
    for (size_t row = 0; row < block.rows(); ++row) {
        std::string file_path = file_path_col->get_data_at(row).to_string();
        int64_t row_position = row_position_col->get_element(row);
        if (row_position < 0) {
            return Status::InternalError("Invalid row_position {} in row_id column", row_position);
        }

        // Per-row partition info, falling back to the sink defaults.
        int32_t partition_spec_id = _partition_spec_id;
        std::string partition_data_json = _partition_data_json;
        if (spec_id_col != nullptr) {
            partition_spec_id = spec_id_col->get_element(row);
        }
        if (partition_data_col != nullptr) {
            partition_data_json = partition_data_col->get_data_at(row).to_string();
        }

        auto [iter, inserted] = file_deletions.emplace(
                file_path, IcebergFileDeletion(partition_spec_id, partition_data_json));
        if (!inserted) {
            // Existing entry keeps its partition info; only warn on a conflict.
            if (iter->second.partition_spec_id != partition_spec_id ||
                iter->second.partition_data_json != partition_data_json) {
                LOG(WARNING) << fmt::format(
                        "Mismatched partition info for file {}, existing spec_id={}, data={}, "
                        "new spec_id={}, data={}",
                        file_path, iter->second.partition_spec_id, iter->second.partition_data_json,
                        partition_spec_id, partition_data_json);
            }
        }
        iter->second.rows_to_delete.add(static_cast<uint64_t>(row_position));
    }

    return Status::OK();
}
412
413
// Writes one classic position-delete file (format < v3) per referenced data
// file: a (file_path, pos) table containing every deleted position, streamed
// in batches of kBatchSize rows. Commit metadata for each produced file is
// appended to _commit_data_list.
//
// Fix: the lazy _init_position_delete_output_exprs() check was performed
// inside the per-file loop (and only after a writer had already been
// constructed). The initialization is loop-invariant, so it is now hoisted
// before the loop; behavior is otherwise unchanged.
Status VIcebergDeleteSink::_write_position_delete_files(
        const std::map<std::string, IcebergFileDeletion>& file_deletions) {
    constexpr size_t kBatchSize = 4096;

    // Output expressions are shared by every writer; initialize them once.
    if (_position_delete_output_expr_ctxs.empty()) {
        RETURN_IF_ERROR(_init_position_delete_output_exprs());
    }

    for (const auto& [data_file_path, deletion] : file_deletions) {
        if (deletion.rows_to_delete.isEmpty()) {
            continue;
        }
        // Generate a unique delete file path for this data file.
        std::string delete_file_path = _generate_delete_file_path(data_file_path);

        // Create and open the delete file writer.
        auto writer = VIcebergDeleteFileWriterFactory::create_writer(
                TFileContent::POSITION_DELETES, delete_file_path, _file_format_type,
                _compress_type);

        // Schema of a position-delete file: (file_path, pos).
        std::vector<std::string> column_names = {"file_path", "pos"};

        RETURN_IF_ERROR(writer->open(_state, _state->runtime_profile(),
                                     _position_delete_output_expr_ctxs, column_names, _hadoop_conf,
                                     _file_type, _broker_addresses));

        // Stream the sorted positions from the bitmap in fixed-size batches.
        std::vector<int64_t> positions;
        positions.reserve(kBatchSize);
        for (auto it = deletion.rows_to_delete.begin(); it != deletion.rows_to_delete.end(); ++it) {
            positions.push_back(static_cast<int64_t>(*it));
            if (positions.size() >= kBatchSize) {
                Block delete_block;
                RETURN_IF_ERROR(
                        _build_position_delete_block(data_file_path, positions, delete_block));
                RETURN_IF_ERROR(writer->write(delete_block));
                positions.clear();
            }
        }
        if (!positions.empty()) {
            Block delete_block;
            RETURN_IF_ERROR(_build_position_delete_block(data_file_path, positions, delete_block));
            RETURN_IF_ERROR(writer->write(delete_block));
        }

        // Partition info must be set before close so it lands in commit data.
        writer->set_partition_info(deletion.partition_spec_id, deletion.partition_data_json);

        // Close the writer and collect its commit metadata.
        TIcebergCommitData commit_data;
        RETURN_IF_ERROR(writer->close(commit_data));
        commit_data.__set_referenced_data_file_path(data_file_path);

        _commit_data_list.push_back(commit_data);
        _delete_file_count++;

        VLOG(1) << fmt::format("Written position delete file: path={}, rows={}, referenced_file={}",
                               delete_file_path, commit_data.row_count, data_file_path);
    }

    return Status::OK();
}
478
479
3
// Lazily builds the two output expression contexts used by position-delete
// writers: a non-nullable STRING literal for `file_path` and a non-nullable
// BIGINT literal for `pos`. Idempotent — a second call is a no-op.
Status VIcebergDeleteSink::_init_position_delete_output_exprs() {
    if (!_position_delete_output_expr_ctxs.empty()) {
        return Status::OK(); // Already initialized.
    }

    std::vector<TExpr> texprs;
    texprs.reserve(2);

    // Expr #1: STRING literal placeholder for the file_path column.
    std::string empty_string;
    TExprNode file_path_node =
            create_texpr_node_from(&empty_string, PrimitiveType::TYPE_STRING, 0, 0);
    file_path_node.__set_num_children(0);
    file_path_node.__set_output_scale(0);
    file_path_node.__set_is_nullable(false);
    TExpr file_path_expr;
    file_path_expr.nodes.emplace_back(std::move(file_path_node));
    texprs.emplace_back(std::move(file_path_expr));

    // Expr #2: BIGINT literal placeholder for the pos column.
    int64_t zero = 0;
    TExprNode pos_node = create_texpr_node_from(&zero, PrimitiveType::TYPE_BIGINT, 0, 0);
    pos_node.__set_num_children(0);
    pos_node.__set_output_scale(0);
    pos_node.__set_is_nullable(false);
    TExpr pos_expr;
    pos_expr.nodes.emplace_back(std::move(pos_node));
    texprs.emplace_back(std::move(pos_expr));

    RETURN_IF_ERROR(VExpr::create_expr_trees(texprs, _position_delete_output_expr_ctxs));
    return Status::OK();
}
509
510
// Materializes one batch of position deletes as a two-column block:
//   file_path (STRING) — the referenced data file, repeated per row;
//   pos (BIGINT)       — the deleted row positions.
Status VIcebergDeleteSink::_build_position_delete_block(const std::string& file_path,
                                                        const std::vector<int64_t>& positions,
                                                        Block& output_block) {
    const size_t row_count = positions.size();

    // file_path column: same value once per deleted row.
    auto path_column = ColumnString::create();
    for (size_t row = 0; row < row_count; ++row) {
        path_column->insert_data(file_path.data(), file_path.size());
    }

    // pos column: copy the positions wholesale.
    auto position_column = ColumnVector<TYPE_BIGINT>::create();
    position_column->get_data().assign(positions.begin(), positions.end());

    output_block.insert(ColumnWithTypeAndName(std::move(path_column),
                                              std::make_shared<DataTypeString>(), "file_path"));
    output_block.insert(ColumnWithTypeAndName(std::move(position_column),
                                              std::make_shared<DataTypeInt64>(), "pos"));

    return Status::OK();
}
531
532
1
std::string VIcebergDeleteSink::_get_file_extension() const {
533
1
    std::string compress_name;
534
1
    switch (_compress_type) {
535
1
    case TFileCompressType::SNAPPYBLOCK: {
536
1
        compress_name = ".snappy";
537
1
        break;
538
0
    }
539
0
    case TFileCompressType::ZLIB: {
540
0
        compress_name = ".zlib";
541
0
        break;
542
0
    }
543
0
    case TFileCompressType::ZSTD: {
544
0
        compress_name = ".zstd";
545
0
        break;
546
0
    }
547
0
    default: {
548
0
        compress_name = "";
549
0
        break;
550
0
    }
551
1
    }
552
553
1
    std::string file_format_name;
554
1
    switch (_file_format_type) {
555
1
    case TFileFormatType::FORMAT_PARQUET: {
556
1
        file_format_name = ".parquet";
557
1
        break;
558
0
    }
559
0
    case TFileFormatType::FORMAT_ORC: {
560
0
        file_format_name = ".orc";
561
0
        break;
562
0
    }
563
0
    default: {
564
0
        file_format_name = "";
565
0
        break;
566
0
    }
567
1
    }
568
1
    return fmt::format("{}{}", compress_name, file_format_name);
569
1
}
570
571
// Format v3+: serializes each data file's deletion bitmap (merged with any
// previously written deletes for that file) into a deletion-vector blob, packs
// all blobs into a single Puffin file, and records one commit-data entry per
// blob.
//
// Blob layout produced here (see the writes below):
//   [4B big-endian length = 4 + bitmap][4B magic][serialized bitmap][4B big-endian CRC32]
// where the CRC32 covers the magic plus the bitmap bytes.
Status VIcebergDeleteSink::_write_deletion_vector_files(
        const std::map<std::string, IcebergFileDeletion>& file_deletions) {
    std::vector<DeletionVectorBlob> blobs;
    for (const auto& [data_file_path, deletion] : file_deletions) {
        if (deletion.rows_to_delete.isEmpty()) {
            continue;
        }
        roaring::Roaring64Map merged_rows = deletion.rows_to_delete;
        DeletionVectorBlob blob;
        // delete_count reflects only the NEW deletes from this query,
        // captured before prior deletes are merged in.
        blob.delete_count = static_cast<int64_t>(merged_rows.cardinality());

        // Merge the rows deleted by previously written delete files so the new
        // deletion vector supersedes them.
        auto prior_it = _rewritable_delete_files.find(data_file_path);
        if (prior_it != _rewritable_delete_files.end()) {
            roaring::Roaring64Map previous_rows;
            RETURN_IF_ERROR(load_rewritable_delete_rows(
                    _state, _state->runtime_profile(), data_file_path, prior_it->second,
                    _hadoop_conf, _file_type, _broker_addresses, &previous_rows));
            merged_rows |= previous_rows;
        }

        size_t serialized_size = merged_rows.getSizeInBytes();
        blob.referenced_data_file = data_file_path;
        blob.partition_spec_id = deletion.partition_spec_id;
        blob.partition_data_json = deletion.partition_data_json;
        blob.merged_count = static_cast<int64_t>(merged_rows.cardinality());
        // length(4) + magic(4) + bitmap + crc(4).
        blob.content_size_in_bytes = static_cast<int64_t>(4 + 4 + serialized_size + 4);
        blob.blob_data.resize(static_cast<size_t>(blob.content_size_in_bytes));
        // Bitmap goes after the length + magic header.
        merged_rows.write(blob.blob_data.data() + 8);

        // Length field counts the magic plus the bitmap, big-endian.
        uint32_t body_length = static_cast<uint32_t>(4 + serialized_size);
        BigEndian::Store32(blob.blob_data.data(), body_length);

        constexpr char DV_MAGIC[] = {'\xD1', '\xD3', '\x39', '\x64'};
        memcpy(blob.blob_data.data() + 4, DV_MAGIC, 4);

        // CRC32 over magic + bitmap, stored big-endian after the bitmap.
        uint32_t checksum = static_cast<uint32_t>(
                ::crc32(0, reinterpret_cast<const Bytef*>(blob.blob_data.data() + 4),
                        4 + (uInt)serialized_size));
        BigEndian::Store32(blob.blob_data.data() + 8 + serialized_size, checksum);
        blobs.emplace_back(std::move(blob));
    }

    if (blobs.empty()) {
        return Status::OK();
    }

    // All blobs share one Puffin file.
    std::string puffin_path = _generate_puffin_file_path();
    int64_t puffin_file_size = 0;
    RETURN_IF_ERROR(_write_puffin_file(puffin_path, &blobs, &puffin_file_size));

    // One commit entry per blob, all pointing at the same Puffin file.
    for (const auto& blob : blobs) {
        TIcebergCommitData commit_data;
        commit_data.__set_file_path(puffin_path);
        commit_data.__set_row_count(blob.merged_count);
        commit_data.__set_affected_rows(blob.delete_count);
        commit_data.__set_file_size(puffin_file_size);
        commit_data.__set_file_content(TFileContent::DELETION_VECTOR);
        commit_data.__set_content_offset(blob.content_offset);
        commit_data.__set_content_size_in_bytes(blob.content_size_in_bytes);
        commit_data.__set_referenced_data_file_path(blob.referenced_data_file);
        if (blob.partition_spec_id != 0 || !blob.partition_data_json.empty()) {
            commit_data.__set_partition_spec_id(blob.partition_spec_id);
            commit_data.__set_partition_data_json(blob.partition_data_json);
        }

        _commit_data_list.push_back(commit_data);
        _delete_file_count++;
    }
    return Status::OK();
}
640
641
// Writes all deletion-vector blobs into one Puffin file at `puffin_path` and
// reports the resulting file size through `out_file_size`. Each blob's
// content_offset is filled in as it is written (needed later for commit data).
//
// File layout produced: magic | blob payloads | magic | footer JSON |
// footer-size (4B little-endian) | flags (4B, all zero) | magic.
Status VIcebergDeleteSink::_write_puffin_file(const std::string& puffin_path,
                                              std::vector<DeletionVectorBlob>* blobs,
                                              int64_t* out_file_size) {
    DCHECK(blobs != nullptr);
    DCHECK(!blobs->empty());

    // Build a filesystem handle for the configured storage backend.
    io::FSPropertiesRef fs_properties(_file_type);
    fs_properties.properties = &_hadoop_conf;
    if (!_broker_addresses.empty()) {
        fs_properties.broker_addresses = &_broker_addresses;
    }
    io::FileDescription file_description = {.path = puffin_path, .fs_name {}};
    auto fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
    io::FileWriterOptions file_writer_options = {.used_by_s3_committer = false};
    io::FileWriterPtr file_writer;
    RETURN_IF_ERROR(fs->create_file(file_description.path, &file_writer, &file_writer_options));

    constexpr char PUFFIN_MAGIC[] = {'\x50', '\x46', '\x41', '\x31'};

    // Header magic, then every blob payload back-to-back.
    RETURN_IF_ERROR(file_writer->append(Slice(reinterpret_cast<const uint8_t*>(PUFFIN_MAGIC), 4)));
    int64_t write_offset = 4;
    for (auto& blob : *blobs) {
        blob.content_offset = write_offset; // Recorded for commit metadata.
        RETURN_IF_ERROR(file_writer->append(Slice(
                reinterpret_cast<const uint8_t*>(blob.blob_data.data()), blob.blob_data.size())));
        write_offset += static_cast<int64_t>(blob.blob_data.size());
    }

    // Footer: magic, JSON payload, payload size, flags, trailing magic.
    RETURN_IF_ERROR(file_writer->append(Slice(reinterpret_cast<const uint8_t*>(PUFFIN_MAGIC), 4)));

    std::string footer_payload = _build_puffin_footer_json(*blobs);
    RETURN_IF_ERROR(file_writer->append(
            Slice(reinterpret_cast<const uint8_t*>(footer_payload.data()), footer_payload.size())));

    char footer_size_buf[4];
    LittleEndian::Store32(footer_size_buf, static_cast<uint32_t>(footer_payload.size()));
    RETURN_IF_ERROR(file_writer->append(
            Slice(reinterpret_cast<const uint8_t*>(footer_size_buf), sizeof(footer_size_buf))));

    char flags[4] = {0, 0, 0, 0}; // No compression, no extra features.
    RETURN_IF_ERROR(
            file_writer->append(Slice(reinterpret_cast<const uint8_t*>(flags), sizeof(flags))));
    RETURN_IF_ERROR(file_writer->append(Slice(reinterpret_cast<const uint8_t*>(PUFFIN_MAGIC), 4)));
    RETURN_IF_ERROR(file_writer->close());

    // header+blobs (write_offset) + footer magic + payload + size + flags + magic.
    *out_file_size = write_offset + 4 + static_cast<int64_t>(footer_payload.size()) + 4 + 4 + 4;
    return Status::OK();
}
687
688
std::string VIcebergDeleteSink::_build_puffin_footer_json(
689
1
        const std::vector<DeletionVectorBlob>& blobs) {
690
1
    rapidjson::StringBuffer buffer;
691
1
    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
692
1
    writer.StartObject();
693
1
    writer.Key("blobs");
694
1
    writer.StartArray();
695
2
    for (const auto& blob : blobs) {
696
2
        writer.StartObject();
697
2
        writer.Key("type");
698
2
        writer.String("deletion-vector-v1");
699
2
        writer.Key("fields");
700
2
        writer.StartArray();
701
2
        writer.EndArray();
702
2
        writer.Key("snapshot-id");
703
2
        writer.Int64(-1);
704
2
        writer.Key("sequence-number");
705
2
        writer.Int64(-1);
706
2
        writer.Key("offset");
707
2
        writer.Int64(blob.content_offset);
708
2
        writer.Key("length");
709
2
        writer.Int64(blob.content_size_in_bytes);
710
2
        writer.Key("properties");
711
2
        writer.StartObject();
712
2
        writer.Key("referenced-data-file");
713
2
        writer.String(blob.referenced_data_file.c_str(),
714
2
                      static_cast<rapidjson::SizeType>(blob.referenced_data_file.size()));
715
2
        std::string cardinality = std::to_string(blob.merged_count);
716
2
        writer.Key("cardinality");
717
2
        writer.String(cardinality.c_str(), static_cast<rapidjson::SizeType>(cardinality.size()));
718
2
        writer.EndObject();
719
2
        writer.EndObject();
720
2
    }
721
1
    writer.EndArray();
722
1
    writer.Key("properties");
723
1
    writer.StartObject();
724
1
    writer.Key("created-by");
725
1
    writer.String("doris-puffin-v1");
726
1
    writer.EndObject();
727
1
    writer.EndObject();
728
1
    return {buffer.GetString(), buffer.GetSize()};
729
1
}
730
731
std::string VIcebergDeleteSink::_generate_delete_file_path(
732
1
        const std::string& referenced_data_file) {
733
    // Generate unique delete file name using UUID
734
1
    std::string uuid = generate_uuid_string();
735
1
    std::string file_name;
736
737
1
    std::string file_extension = _get_file_extension();
738
1
    file_name =
739
1
            fmt::format("delete_pos_{}_{}{}", uuid,
740
1
                        std::hash<std::string> {}(referenced_data_file) % 10000000, file_extension);
741
742
    // Combine with output path or table location
743
1
    std::string base_path = _output_path.empty() ? _table_location : _output_path;
744
745
    // Ensure base path ends with /
746
1
    if (!base_path.empty() && base_path.back() != '/') {
747
1
        base_path += '/';
748
1
    }
749
750
    // Delete files are data files in Iceberg, write under data location
751
1
    return fmt::format("{}{}", base_path, file_name);
752
1
}
753
754
1
std::string VIcebergDeleteSink::_generate_puffin_file_path() {
755
1
    std::string uuid = generate_uuid_string();
756
1
    std::string file_name = fmt::format("delete_dv_{}.puffin", uuid);
757
1
    std::string base_path = _output_path.empty() ? _table_location : _output_path;
758
1
    if (!base_path.empty() && base_path.back() != '/') {
759
1
        base_path += '/';
760
1
    }
761
1
    return fmt::format("{}{}", base_path, file_name);
762
1
}
763
764
} // namespace doris