Coverage Report

Created: 2026-03-27 23:44

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/viceberg_delete_sink.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/viceberg_delete_sink.h"
19
20
#include <fmt/format.h>
21
22
#include "common/logging.h"
23
#include "core/block/column_with_type_and_name.h"
24
#include "core/column/column_nullable.h"
25
#include "core/column/column_string.h"
26
#include "core/column/column_struct.h"
27
#include "core/column/column_vector.h"
28
#include "core/data_type/data_type_factory.hpp"
29
#include "core/data_type/data_type_nullable.h"
30
#include "core/data_type/data_type_number.h"
31
#include "core/data_type/data_type_string.h"
32
#include "core/data_type/data_type_struct.h"
33
#include "exprs/vexpr.h"
34
#include "format/transformer/vfile_format_transformer.h"
35
#include "runtime/runtime_state.h"
36
#include "util/string_util.h"
37
#include "util/uid_util.h"
38
39
namespace doris {
40
41
VIcebergDeleteSink::VIcebergDeleteSink(const TDataSink& t_sink,
42
                                       const VExprContextSPtrs& output_exprs,
43
                                       std::shared_ptr<Dependency> dep,
44
                                       std::shared_ptr<Dependency> fin_dep)
45
17
        : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) {
46
17
    DCHECK(_t_sink.__isset.iceberg_delete_sink);
47
17
}
48
49
10
Status VIcebergDeleteSink::init_properties(ObjectPool* pool) {
50
10
    const auto& delete_sink = _t_sink.iceberg_delete_sink;
51
52
10
    _delete_type = delete_sink.delete_type;
53
10
    if (_delete_type != TFileContent::POSITION_DELETES) {
54
1
        return Status::NotSupported("Iceberg delete only supports position delete files");
55
1
    }
56
57
    // Get file format settings
58
9
    if (delete_sink.__isset.file_format) {
59
9
        _file_format_type = delete_sink.file_format;
60
9
    }
61
62
9
    if (delete_sink.__isset.compress_type) {
63
9
        _compress_type = delete_sink.compress_type;
64
9
    }
65
66
    // Get output path and table location
67
9
    if (delete_sink.__isset.output_path) {
68
9
        _output_path = delete_sink.output_path;
69
9
    }
70
71
9
    if (delete_sink.__isset.table_location) {
72
9
        _table_location = delete_sink.table_location;
73
9
    }
74
75
    // Get Hadoop configuration
76
9
    if (delete_sink.__isset.hadoop_config) {
77
2
        _hadoop_conf.insert(delete_sink.hadoop_config.begin(), delete_sink.hadoop_config.end());
78
2
    }
79
80
9
    if (delete_sink.__isset.file_type) {
81
6
        _file_type = delete_sink.file_type;
82
6
    }
83
84
9
    if (delete_sink.__isset.broker_addresses) {
85
0
        _broker_addresses.assign(delete_sink.broker_addresses.begin(),
86
0
                                 delete_sink.broker_addresses.end());
87
0
    }
88
89
    // Get partition information
90
9
    if (delete_sink.__isset.partition_spec_id) {
91
7
        _partition_spec_id = delete_sink.partition_spec_id;
92
7
    }
93
94
9
    if (delete_sink.__isset.partition_data_json) {
95
1
        _partition_data_json = delete_sink.partition_data_json;
96
1
    }
97
98
9
    return Status::OK();
99
10
}
100
101
3
Status VIcebergDeleteSink::open(RuntimeState* state, RuntimeProfile* profile) {
102
3
    _state = state;
103
104
    // Initialize counters
105
3
    _written_rows_counter = ADD_COUNTER(profile, "RowsWritten", TUnit::UNIT);
106
3
    _send_data_timer = ADD_TIMER(profile, "SendDataTime");
107
3
    _write_delete_files_timer = ADD_TIMER(profile, "WriteDeleteFilesTime");
108
3
    _delete_file_count_counter = ADD_COUNTER(profile, "DeleteFileCount", TUnit::UNIT);
109
3
    _open_timer = ADD_TIMER(profile, "OpenTime");
110
3
    _close_timer = ADD_TIMER(profile, "CloseTime");
111
112
3
    SCOPED_TIMER(_open_timer);
113
114
3
    RETURN_IF_ERROR(_init_position_delete_output_exprs());
115
116
3
    LOG(INFO) << fmt::format("VIcebergDeleteSink opened: delete_type={}, output_path={}",
117
3
                             to_string(_delete_type), _output_path);
118
119
3
    return Status::OK();
120
3
}
121
122
0
Status VIcebergDeleteSink::write(RuntimeState* state, Block& block) {
123
0
    SCOPED_TIMER(_send_data_timer);
124
125
0
    if (block.rows() == 0) {
126
0
        return Status::OK();
127
0
    }
128
129
0
    _row_count += block.rows();
130
131
0
    if (_delete_type != TFileContent::POSITION_DELETES) {
132
0
        return Status::NotSupported("Iceberg delete only supports position delete files");
133
0
    }
134
135
    // Extract $row_id column and group by file_path
136
0
    RETURN_IF_ERROR(_collect_position_deletes(block, _file_deletions));
137
138
0
    if (_written_rows_counter) {
139
0
        COUNTER_UPDATE(_written_rows_counter, block.rows());
140
0
    }
141
142
0
    return Status::OK();
143
0
}
144
145
2
Status VIcebergDeleteSink::close(Status close_status) {
146
2
    SCOPED_TIMER(_close_timer);
147
148
2
    if (!close_status.ok()) {
149
0
        LOG(WARNING) << fmt::format("VIcebergDeleteSink close with error: {}",
150
0
                                    close_status.to_string());
151
0
        return close_status;
152
0
    }
153
154
2
    if (_delete_type == TFileContent::POSITION_DELETES && !_file_deletions.empty()) {
155
0
        SCOPED_TIMER(_write_delete_files_timer);
156
0
        RETURN_IF_ERROR(_write_position_delete_files(_file_deletions));
157
0
    }
158
159
    // Update counters
160
2
    if (_delete_file_count_counter) {
161
2
        COUNTER_UPDATE(_delete_file_count_counter, _delete_file_count);
162
2
    }
163
164
2
    LOG(INFO) << fmt::format("VIcebergDeleteSink closed: rows={}, delete_files={}", _row_count,
165
2
                             _delete_file_count);
166
167
2
    if (_state != nullptr) {
168
2
        for (const auto& commit_data : _commit_data_list) {
169
0
            _state->add_iceberg_commit_datas(commit_data);
170
0
        }
171
2
    }
172
173
2
    return Status::OK();
174
2
}
175
176
7
int VIcebergDeleteSink::_get_row_id_column_index(const Block& block) {
177
    // Find __DORIS_ICEBERG_ROWID_COL__ column in block
178
8
    for (size_t i = 0; i < block.columns(); ++i) {
179
8
        const auto& col_name = block.get_by_position(i).name;
180
8
        if (col_name == doris::BeConsts::ICEBERG_ROWID_COL) {
181
7
            return static_cast<int>(i);
182
7
        }
183
8
    }
184
0
    return -1;
185
7
}
186
187
Status VIcebergDeleteSink::_collect_position_deletes(
188
5
        const Block& block, std::map<std::string, IcebergFileDeletion>& file_deletions) {
189
    // Find row id column
190
5
    int row_id_col_idx = _get_row_id_column_index(block);
191
5
    if (row_id_col_idx < 0) {
192
0
        return Status::InternalError(
193
0
                "__DORIS_ICEBERG_ROWID_COL__ column not found in block for position delete");
194
0
    }
195
196
5
    const auto& row_id_col = block.get_by_position(row_id_col_idx);
197
5
    const IColumn* row_id_data = row_id_col.column.get();
198
5
    const IDataType* row_id_type = row_id_col.type.get();
199
5
    const auto* nullable_col = check_and_get_column<ColumnNullable>(row_id_data);
200
5
    if (nullable_col != nullptr) {
201
0
        row_id_data = nullable_col->get_nested_column_ptr().get();
202
0
    }
203
5
    const auto* nullable_type = check_and_get_data_type<DataTypeNullable>(row_id_type);
204
5
    if (nullable_type != nullptr) {
205
0
        row_id_type = nullable_type->get_nested_type().get();
206
0
    }
207
5
    const auto* struct_col = check_and_get_column<ColumnStruct>(row_id_data);
208
5
    const auto* struct_type = check_and_get_data_type<DataTypeStruct>(row_id_type);
209
5
    if (!struct_col || !struct_type) {
210
0
        return Status::InternalError("__DORIS_ICEBERG_ROWID_COL__ column is not a struct column");
211
0
    }
212
213
    // __DORIS_ICEBERG_ROWID_COL__ struct:
214
    // (file_path: STRING, row_position: BIGINT, partition_spec_id: INT, partition_data: STRING)
215
5
    size_t field_count = struct_col->tuple_size();
216
5
    if (field_count < 2) {
217
0
        return Status::InternalError(
218
0
                "__DORIS_ICEBERG_ROWID_COL__ struct must have at least 2 fields "
219
0
                "(file_path, row_position)");
220
0
    }
221
222
16
    auto normalize = [](const std::string& name) { return doris::to_lower(name); };
223
224
5
    int file_path_idx = -1;
225
5
    int row_position_idx = -1;
226
5
    int spec_id_idx = -1;
227
5
    int partition_data_idx = -1;
228
5
    const auto& field_names = struct_type->get_element_names();
229
21
    for (size_t i = 0; i < field_names.size(); ++i) {
230
16
        std::string name = normalize(field_names[i]);
231
16
        if (file_path_idx < 0 && name == "file_path") {
232
5
            file_path_idx = static_cast<int>(i);
233
11
        } else if (row_position_idx < 0 && name == "row_position") {
234
4
            row_position_idx = static_cast<int>(i);
235
7
        } else if (spec_id_idx < 0 && name == "partition_spec_id") {
236
2
            spec_id_idx = static_cast<int>(i);
237
5
        } else if (partition_data_idx < 0 && name == "partition_data") {
238
2
            partition_data_idx = static_cast<int>(i);
239
2
        }
240
16
    }
241
242
5
    if (file_path_idx < 0 || row_position_idx < 0) {
243
1
        return Status::InternalError(
244
1
                "__DORIS_ICEBERG_ROWID_COL__ must contain standard fields file_path and "
245
1
                "row_position");
246
1
    }
247
4
    if (field_count >= 3 && spec_id_idx < 0) {
248
0
        return Status::InternalError(
249
0
                "__DORIS_ICEBERG_ROWID_COL__ must use standard field name partition_spec_id");
250
0
    }
251
4
    if (field_count >= 4 && partition_data_idx < 0) {
252
0
        return Status::InternalError(
253
0
                "__DORIS_ICEBERG_ROWID_COL__ must use standard field name partition_data");
254
0
    }
255
256
4
    const auto* file_path_col = check_and_get_column<ColumnString>(
257
4
            remove_nullable(struct_col->get_column_ptr(file_path_idx)).get());
258
4
    const auto* row_position_col = check_and_get_column<ColumnVector<TYPE_BIGINT>>(
259
4
            remove_nullable(struct_col->get_column_ptr(row_position_idx)).get());
260
261
4
    if (!file_path_col || !row_position_col) {
262
0
        return Status::InternalError(
263
0
                "__DORIS_ICEBERG_ROWID_COL__ struct fields have incorrect types");
264
0
    }
265
266
4
    const ColumnVector<TYPE_INT>* spec_id_col = nullptr;
267
4
    const ColumnString* partition_data_col = nullptr;
268
4
    if (spec_id_idx >= 0 && spec_id_idx < static_cast<int>(field_count)) {
269
2
        spec_id_col = check_and_get_column<ColumnVector<TYPE_INT>>(
270
2
                remove_nullable(struct_col->get_column_ptr(spec_id_idx)).get());
271
2
        if (!spec_id_col) {
272
0
            return Status::InternalError(
273
0
                    "__DORIS_ICEBERG_ROWID_COL__ partition_spec_id has incorrect type");
274
0
        }
275
2
    }
276
4
    if (partition_data_idx >= 0 && partition_data_idx < static_cast<int>(field_count)) {
277
2
        partition_data_col = check_and_get_column<ColumnString>(
278
2
                remove_nullable(struct_col->get_column_ptr(partition_data_idx)).get());
279
2
        if (!partition_data_col) {
280
0
            return Status::InternalError(
281
0
                    "__DORIS_ICEBERG_ROWID_COL__ partition_data has incorrect type");
282
0
        }
283
2
    }
284
285
    // Group by file_path using roaring bitmap
286
9
    for (size_t i = 0; i < block.rows(); ++i) {
287
6
        std::string file_path = file_path_col->get_data_at(i).to_string();
288
6
        int64_t row_position = row_position_col->get_element(i);
289
6
        if (row_position < 0) {
290
1
            return Status::InternalError("Invalid row_position {} in row_id column", row_position);
291
1
        }
292
293
5
        int32_t partition_spec_id = _partition_spec_id;
294
5
        std::string partition_data_json = _partition_data_json;
295
5
        if (spec_id_col != nullptr) {
296
4
            partition_spec_id = spec_id_col->get_element(i);
297
4
        }
298
5
        if (partition_data_col != nullptr) {
299
4
            partition_data_json = partition_data_col->get_data_at(i).to_string();
300
4
        }
301
302
5
        auto [iter, inserted] = file_deletions.emplace(
303
5
                file_path, IcebergFileDeletion(partition_spec_id, partition_data_json));
304
5
        if (!inserted) {
305
1
            if (iter->second.partition_spec_id != partition_spec_id ||
306
1
                iter->second.partition_data_json != partition_data_json) {
307
0
                LOG(WARNING) << fmt::format(
308
0
                        "Mismatched partition info for file {}, existing spec_id={}, data={}, "
309
0
                        "new spec_id={}, data={}",
310
0
                        file_path, iter->second.partition_spec_id, iter->second.partition_data_json,
311
0
                        partition_spec_id, partition_data_json);
312
0
            }
313
1
        }
314
5
        iter->second.rows_to_delete.add(static_cast<uint64_t>(row_position));
315
5
    }
316
317
3
    return Status::OK();
318
4
}
319
320
Status VIcebergDeleteSink::_write_position_delete_files(
321
0
        const std::map<std::string, IcebergFileDeletion>& file_deletions) {
322
0
    constexpr size_t kBatchSize = 4096;
323
0
    for (const auto& [data_file_path, deletion] : file_deletions) {
324
0
        if (deletion.rows_to_delete.isEmpty()) {
325
0
            continue;
326
0
        }
327
        // Generate unique delete file path
328
0
        std::string delete_file_path = _generate_delete_file_path(data_file_path);
329
330
        // Create delete file writer
331
0
        auto writer = VIcebergDeleteFileWriterFactory::create_writer(
332
0
                TFileContent::POSITION_DELETES, delete_file_path, _file_format_type,
333
0
                _compress_type);
334
335
        // Build column names for position delete
336
0
        std::vector<std::string> column_names = {"file_path", "pos"};
337
338
0
        if (_position_delete_output_expr_ctxs.empty()) {
339
0
            RETURN_IF_ERROR(_init_position_delete_output_exprs());
340
0
        }
341
342
        // Open writer
343
0
        RETURN_IF_ERROR(writer->open(_state, _state->runtime_profile(),
344
0
                                     _position_delete_output_expr_ctxs, column_names, _hadoop_conf,
345
0
                                     _file_type, _broker_addresses));
346
347
        // Build block with (file_path, pos) columns
348
0
        std::vector<int64_t> positions;
349
0
        positions.reserve(kBatchSize);
350
0
        for (auto it = deletion.rows_to_delete.begin(); it != deletion.rows_to_delete.end(); ++it) {
351
0
            positions.push_back(static_cast<int64_t>(*it));
352
0
            if (positions.size() >= kBatchSize) {
353
0
                Block delete_block;
354
0
                RETURN_IF_ERROR(
355
0
                        _build_position_delete_block(data_file_path, positions, delete_block));
356
0
                RETURN_IF_ERROR(writer->write(delete_block));
357
0
                positions.clear();
358
0
            }
359
0
        }
360
0
        if (!positions.empty()) {
361
0
            Block delete_block;
362
0
            RETURN_IF_ERROR(_build_position_delete_block(data_file_path, positions, delete_block));
363
0
            RETURN_IF_ERROR(writer->write(delete_block));
364
0
        }
365
366
        // Set partition info on writer before close
367
0
        writer->set_partition_info(deletion.partition_spec_id, deletion.partition_data_json);
368
369
        // Close writer and collect commit data
370
0
        TIcebergCommitData commit_data;
371
0
        RETURN_IF_ERROR(writer->close(commit_data));
372
373
        // Set referenced data file path
374
0
        commit_data.__set_referenced_data_file_path(data_file_path);
375
376
0
        _commit_data_list.push_back(commit_data);
377
0
        _delete_file_count++;
378
379
0
        VLOG(1) << fmt::format("Written position delete file: path={}, rows={}, referenced_file={}",
380
0
                               delete_file_path, commit_data.row_count, data_file_path);
381
0
    }
382
383
0
    return Status::OK();
384
0
}
385
386
3
Status VIcebergDeleteSink::_init_position_delete_output_exprs() {
387
3
    if (!_position_delete_output_expr_ctxs.empty()) {
388
0
        return Status::OK();
389
0
    }
390
391
3
    std::vector<TExpr> texprs;
392
3
    texprs.reserve(2);
393
394
3
    std::string empty_string;
395
3
    TExprNode file_path_node =
396
3
            create_texpr_node_from(&empty_string, PrimitiveType::TYPE_STRING, 0, 0);
397
3
    file_path_node.__set_num_children(0);
398
3
    file_path_node.__set_output_scale(0);
399
3
    file_path_node.__set_is_nullable(false);
400
3
    TExpr file_path_expr;
401
3
    file_path_expr.nodes.emplace_back(std::move(file_path_node));
402
3
    texprs.emplace_back(std::move(file_path_expr));
403
404
3
    int64_t zero = 0;
405
3
    TExprNode pos_node = create_texpr_node_from(&zero, PrimitiveType::TYPE_BIGINT, 0, 0);
406
3
    pos_node.__set_num_children(0);
407
3
    pos_node.__set_output_scale(0);
408
3
    pos_node.__set_is_nullable(false);
409
3
    TExpr pos_expr;
410
3
    pos_expr.nodes.emplace_back(std::move(pos_node));
411
3
    texprs.emplace_back(std::move(pos_expr));
412
413
3
    RETURN_IF_ERROR(VExpr::create_expr_trees(texprs, _position_delete_output_expr_ctxs));
414
3
    return Status::OK();
415
3
}
416
417
Status VIcebergDeleteSink::_build_position_delete_block(const std::string& file_path,
418
                                                        const std::vector<int64_t>& positions,
419
1
                                                        Block& output_block) {
420
    // Create file_path column (repeated for each position)
421
1
    auto file_path_col = ColumnString::create();
422
5
    for (size_t i = 0; i < positions.size(); ++i) {
423
4
        file_path_col->insert_data(file_path.data(), file_path.size());
424
4
    }
425
426
    // Create pos column
427
1
    auto pos_col = ColumnVector<TYPE_BIGINT>::create();
428
1
    pos_col->get_data().assign(positions.begin(), positions.end());
429
430
    // Build block
431
1
    output_block.insert(ColumnWithTypeAndName(std::move(file_path_col),
432
1
                                              std::make_shared<DataTypeString>(), "file_path"));
433
1
    output_block.insert(
434
1
            ColumnWithTypeAndName(std::move(pos_col), std::make_shared<DataTypeInt64>(), "pos"));
435
436
1
    return Status::OK();
437
1
}
438
439
1
std::string VIcebergDeleteSink::_get_file_extension() const {
440
1
    std::string compress_name;
441
1
    switch (_compress_type) {
442
1
    case TFileCompressType::SNAPPYBLOCK: {
443
1
        compress_name = ".snappy";
444
1
        break;
445
0
    }
446
0
    case TFileCompressType::ZLIB: {
447
0
        compress_name = ".zlib";
448
0
        break;
449
0
    }
450
0
    case TFileCompressType::ZSTD: {
451
0
        compress_name = ".zstd";
452
0
        break;
453
0
    }
454
0
    default: {
455
0
        compress_name = "";
456
0
        break;
457
0
    }
458
1
    }
459
460
1
    std::string file_format_name;
461
1
    switch (_file_format_type) {
462
1
    case TFileFormatType::FORMAT_PARQUET: {
463
1
        file_format_name = ".parquet";
464
1
        break;
465
0
    }
466
0
    case TFileFormatType::FORMAT_ORC: {
467
0
        file_format_name = ".orc";
468
0
        break;
469
0
    }
470
0
    default: {
471
0
        file_format_name = "";
472
0
        break;
473
0
    }
474
1
    }
475
1
    return fmt::format("{}{}", compress_name, file_format_name);
476
1
}
477
478
std::string VIcebergDeleteSink::_generate_delete_file_path(
479
1
        const std::string& referenced_data_file) {
480
    // Generate unique delete file name using UUID
481
1
    std::string uuid = generate_uuid_string();
482
1
    std::string file_name;
483
484
1
    std::string file_extension = _get_file_extension();
485
1
    file_name =
486
1
            fmt::format("delete_pos_{}_{}{}", uuid,
487
1
                        std::hash<std::string> {}(referenced_data_file) % 10000000, file_extension);
488
489
    // Combine with output path or table location
490
1
    std::string base_path = _output_path.empty() ? _table_location : _output_path;
491
492
    // Ensure base path ends with /
493
1
    if (!base_path.empty() && base_path.back() != '/') {
494
1
        base_path += '/';
495
1
    }
496
497
    // Delete files are data files in Iceberg, write under data location
498
1
    return fmt::format("{}{}", base_path, file_name);
499
1
}
500
501
} // namespace doris