Coverage Report

Created: 2026-03-31 12:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/viceberg_delete_sink.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
23
#include <map>
24
#include <memory>
25
#include <string>
26
#include <vector>
27
28
#include "common/status.h"
29
#include "core/block/block.h"
30
#include "exec/sink/writer/async_result_writer.h"
31
#include "exec/sink/writer/iceberg/viceberg_delete_file_writer.h"
32
#include "exprs/vexpr_fwd.h"
33
#include "roaring/roaring64map.hh"
34
#include "runtime/runtime_profile.h"
35
36
namespace doris {
37
38
class ObjectPool;
39
class RuntimeState;
40
class Dependency;
41
42
class VIcebergDeleteFileWriter;
43
44
// Plain aggregate bundling a partition spec id, a partition-data JSON string
// and a 64-bit roaring bitmap of rows to delete. In this header it is used as
// the mapped value of std::map<std::string, IcebergFileDeletion>
// (presumably keyed by data file path — confirm against the .cpp).
// NOTE(review): uses std::move and int32_t, but <utility> / <cstdint> are not
// among the directly visible includes; they are presumably pulled in transitively.
struct IcebergFileDeletion {
45
    // Default state: spec id 0, empty JSON string, default-constructed bitmap.
    IcebergFileDeletion() = default;
46
    // Sets the spec id and moves `data_json` into partition_data_json;
    // rows_to_delete is left default-constructed.
    IcebergFileDeletion(int32_t spec_id, std::string data_json)
47
119
            : partition_spec_id(spec_id), partition_data_json(std::move(data_json)) {}
48
49
    // Partition spec id associated with this entry (defaults to 0).
    int32_t partition_spec_id = 0;
50
    // Partition data carried as a JSON string (exact schema not visible here).
    std::string partition_data_json;
51
    // 64-bit roaring bitmap; presumably holds row positions to delete — TODO confirm.
    roaring::Roaring64Map rows_to_delete;
52
};
53
54
/**
55
 * Sink for writing Iceberg position delete files.
56
 *
57
 * This sink receives blocks containing a $row_id column with
58
 * (file_path, row_position, partition_spec_id, partition_data).
59
 * It groups delete records by file_path and writes position delete files.
60
 */
61
// Final subclass of AsyncResultWriter; overrides open(), write() and close().
// Only declarations are visible in this header; behavior lives in the .cpp.
class VIcebergDeleteSink final : public AsyncResultWriter {
62
public:
63
    // Takes the TDataSink description, the output expression contexts and two
    // shared Dependency handles.
    // NOTE(review): `dep` / `fin_dep` are presumably forwarded to
    // AsyncResultWriter — confirm in the .cpp.
    VIcebergDeleteSink(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
64
                       std::shared_ptr<Dependency> dep, std::shared_ptr<Dependency> fin_dep);
65
66
818
    ~VIcebergDeleteSink() override = default;
67
68
    // Not an override; declared only on this class.
    // NOTE(review): presumably populates the configuration members below from
    // _t_sink — the definition is not visible here.
    Status init_properties(ObjectPool* pool);
69
70
    // AsyncResultWriter override.
    Status open(RuntimeState* state, RuntimeProfile* profile) override;
71
72
    // AsyncResultWriter override; receives one block per call.
    Status write(RuntimeState* state, Block& block) override;
73
74
    // AsyncResultWriter override; the Status parameter is unnamed in this declaration.
    Status close(Status) override;
75
76
private:
77
    /**
78
     * Extract $row_id column from block and group by file_path.
79
     */
80
    Status _collect_position_deletes(const Block& block,
81
                                     std::map<std::string, IcebergFileDeletion>& file_deletions);
82
83
    /**
84
     * Write grouped position deletes to delete files
85
     */
86
    Status _write_position_delete_files(
87
            const std::map<std::string, IcebergFileDeletion>& file_deletions);
88
89
    // Takes the same grouped map as _write_position_delete_files.
    // NOTE(review): presumably the deletion-vector counterpart selected by
    // _format_version / _delete_type — confirm in the .cpp.
    Status _write_deletion_vector_files(
90
            const std::map<std::string, IcebergFileDeletion>& file_deletions);
91
92
    // Per-data-file record used by the Puffin helpers declared below.
    struct DeletionVectorBlob {
93
        // Path of the data file this blob refers to (by name; confirm usage).
        std::string referenced_data_file;
94
        int32_t partition_spec_id = 0;
95
        std::string partition_data_json;
96
        int64_t delete_count = 0; // The number of rows deleted in this delete operation.
97
        int64_t merged_count =
98
                0; // The number of rows after merging the old deletion vector and position delete.
99
        // NOTE(review): presumably the blob's byte offset and length inside the
        // Puffin file — confirm where these are assigned.
        int64_t content_offset = 0;
100
        int64_t content_size_in_bytes = 0;
101
        // Raw bytes of the blob (serialization format not visible here).
        std::vector<char> blob_data;
102
    };
103
104
    // `blobs` is passed by mutable pointer and `out_file_size` is an out-pointer.
    // NOTE(review): presumably fills in each blob's content_offset /
    // content_size_in_bytes while writing — confirm in the .cpp.
    Status _write_puffin_file(const std::string& puffin_path,
105
                              std::vector<DeletionVectorBlob>* blobs, int64_t* out_file_size);
106
107
    // Returns a string built from `blobs`; by name, the JSON footer of a Puffin file.
    std::string _build_puffin_footer_json(const std::vector<DeletionVectorBlob>& blobs);
108
109
    // Returns a path string for a Puffin file (naming scheme not visible here).
    std::string _generate_puffin_file_path();
110
111
    /**
112
     * Generate unique delete file path
113
     */
114
    std::string _generate_delete_file_path(const std::string& referenced_data_file = "");
115
116
    /**
117
     * Get $row_id column index from block
118
     */
119
    int _get_row_id_column_index(const Block& block);
120
121
    /**
122
     * Build a block for position delete (file_path, pos)
123
     */
124
    Status _build_position_delete_block(const std::string& file_path,
125
                                        const std::vector<int64_t>& positions, Block& output_block);
126
    // NOTE(review): presumably initializes _position_delete_output_expr_ctxs — confirm.
    Status _init_position_delete_output_exprs();
127
    // Const accessor returning a file extension string.
    // NOTE(review): likely derived from _file_format_type / _compress_type — confirm.
    std::string _get_file_extension() const;
128
129
    // Stored by value (a TDataSink object, not a reference or pointer).
    TDataSink _t_sink;
130
    // Non-owning raw pointer; null until assigned (assignment site not visible here).
    RuntimeState* _state = nullptr;
131
132
    // Defaults to 2.
    int32_t _format_version = 2;
133
    // Defaults to position deletes.
    TFileContent::type _delete_type = TFileContent::POSITION_DELETES;
134
135
    // Writers for delete files
136
    std::vector<std::unique_ptr<VIcebergDeleteFileWriter>> _writers;
137
138
    // Collected commit data from all writers
139
    std::vector<TIcebergCommitData> _commit_data_list;
140
    // TODO: All deletions are held in memory until close(). Consider flushing
141
    //  per-file when the upstream guarantees file_path ordering, or flushing
142
    //  when estimated memory exceeds a threshold, to reduce peak memory usage.
143
    std::map<std::string, IcebergFileDeletion> _file_deletions;
144
    // NOTE(review): presumably existing delete files, keyed by data file path,
    // that may be rewritten/merged — confirm against the .cpp.
    std::map<std::string, std::vector<TIcebergDeleteFileDesc>> _rewritable_delete_files;
145
146
    // Hadoop configuration
147
    std::map<std::string, std::string> _hadoop_conf;
148
149
    // File format settings
150
    TFileFormatType::type _file_format_type = TFileFormatType::FORMAT_PARQUET;
151
    TFileCompressType::type _compress_type = TFileCompressType::SNAPPYBLOCK;
152
153
    // Output directory for delete files
154
    std::string _output_path;
155
    std::string _table_location;
156
157
    // Defaults to HDFS.
    TFileType::type _file_type = TFileType::FILE_HDFS;
158
    std::vector<TNetworkAddress> _broker_addresses;
159
160
    // Partition information
161
    int32_t _partition_spec_id = 0;
162
    std::string _partition_data_json;
163
164
    // Counters
165
    size_t _row_count = 0;
166
    size_t _delete_file_count = 0;
167
168
    // Profile counters
169
    RuntimeProfile::Counter* _written_rows_counter = nullptr;
170
    RuntimeProfile::Counter* _send_data_timer = nullptr;
171
    RuntimeProfile::Counter* _write_delete_files_timer = nullptr;
172
    RuntimeProfile::Counter* _delete_file_count_counter = nullptr;
173
    RuntimeProfile::Counter* _open_timer = nullptr;
174
    RuntimeProfile::Counter* _close_timer = nullptr;
175
176
    // Expression contexts, by name those used for position-delete output
    // (see _init_position_delete_output_exprs).
    VExprContextSPtrs _position_delete_output_expr_ctxs;
177
};
178
179
} // namespace doris