Coverage Report

Created: 2026-03-26 19:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/viceberg_delete_sink.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/DataSinks_types.h>
21
22
#include <map>
23
#include <memory>
24
#include <string>
25
#include <vector>
26
27
#include "common/status.h"
28
#include "core/block/block.h"
29
#include "core/value/bitmap_value.h"
30
#include "exec/sink/writer/async_result_writer.h"
31
#include "exec/sink/writer/iceberg/viceberg_delete_file_writer.h"
32
#include "exprs/vexpr_fwd.h"
33
#include "runtime/runtime_profile.h"
34
35
namespace doris {
36
37
class ObjectPool;
38
class RuntimeState;
39
class Dependency;
40
41
class VIcebergDeleteFileWriter;
42
43
struct IcebergFileDeletion { // Value type of the per-file_path deletion map held by VIcebergDeleteSink (_file_deletions).
44
    IcebergFileDeletion() = default;
45
    IcebergFileDeletion(int32_t spec_id, std::string data_json)
46
5
            : partition_spec_id(spec_id), partition_data_json(std::move(data_json)) {} // data_json is taken by value and moved in; rows_to_delete is left default-constructed.
47
48
    int32_t partition_spec_id = 0; // Partition spec id recorded for this entry; defaults to 0.
49
    std::string partition_data_json; // Partition data as a JSON string (format not visible in this header).
50
    doris::detail::Roaring64Map rows_to_delete; // 64-bit roaring bitmap; presumably the row positions to delete in the file — confirm in the .cpp.
51
};
52
53
/**
54
 * Sink for writing Iceberg position delete files.
55
 *
56
 * This sink receives blocks containing a $row_id column with
57
 * (file_path, row_position, partition_spec_id, partition_data).
58
 * It groups delete records by file_path and writes position delete files. Per the TODO on _file_deletions, all deletions are currently held in memory until close().
59
 */
60
class VIcebergDeleteSink final : public AsyncResultWriter {
61
public:
62
    VIcebergDeleteSink(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
63
                       std::shared_ptr<Dependency> dep, std::shared_ptr<Dependency> fin_dep); // NOTE(review): dep / fin_dep are presumably forwarded to AsyncResultWriter — confirm in the .cpp.
64
65
17
    ~VIcebergDeleteSink() override = default;
66
67
    Status init_properties(ObjectPool* pool); // NOTE(review): presumably unpacks _t_sink into the settings members below — confirm in the .cpp.
68
69
    Status open(RuntimeState* state, RuntimeProfile* profile) override; // AsyncResultWriter override.
70
71
    Status write(RuntimeState* state, Block& block) override; // AsyncResultWriter override; consumes one input block.
72
73
    Status close(Status) override; // AsyncResultWriter override; the Status parameter is unnamed in this declaration.
74
75
private:
76
    /**
77
     * Extract $row_id column from block and group by file_path. Results are written into the caller-supplied file_deletions map, keyed by file_path.
78
     */
79
    Status _collect_position_deletes(const Block& block,
80
                                     std::map<std::string, IcebergFileDeletion>& file_deletions);
81
82
    /**
83
     * Write grouped position deletes to delete files. Takes the file_path-keyed map by const reference.
84
     */
85
    Status _write_position_delete_files(
86
            const std::map<std::string, IcebergFileDeletion>& file_deletions);
87
88
    /**
89
     * Generate unique delete file path. referenced_data_file defaults to an empty string; how it influences the path is not visible in this header.
90
     */
91
    std::string _generate_delete_file_path(const std::string& referenced_data_file = "");
92
93
    /**
94
     * Get $row_id column index from block. NOTE(review): the value returned when the column is absent is not visible here — confirm in the .cpp.
95
     */
96
    int _get_row_id_column_index(const Block& block);
97
98
    /**
99
     * Build a block for position delete (file_path, pos). The result is written to output_block; positions are passed as 64-bit integers.
100
     */
101
    Status _build_position_delete_block(const std::string& file_path,
102
                                        const std::vector<int64_t>& positions, Block& output_block);
103
    Status _init_position_delete_output_exprs(); // NOTE(review): presumably populates _position_delete_output_expr_ctxs — confirm in the .cpp.
104
    std::string _get_file_extension() const; // NOTE(review): presumably derived from _file_format_type / _compress_type — confirm in the .cpp.
105
106
    TDataSink _t_sink; // Stored by value (the constructor receives it by const reference).
107
    RuntimeState* _state = nullptr; // Raw pointer, null until assigned.
108
109
    TFileContent::type _delete_type = TFileContent::POSITION_DELETES; // Defaults to position deletes.
110
111
    // Writers for delete files (owned via unique_ptr)
112
    std::vector<std::unique_ptr<VIcebergDeleteFileWriter>> _writers;
113
114
    // Collected commit data from all writers
115
    std::vector<TIcebergCommitData> _commit_data_list;
116
    // TODO: All deletions are held in memory until close(). Consider flushing
117
    //  per-file when the upstream guarantees file_path ordering, or flushing
118
    //  when estimated memory exceeds a threshold, to reduce peak memory usage.
119
    std::map<std::string, IcebergFileDeletion> _file_deletions; // Keyed by file_path (see _collect_position_deletes).
120
121
    // Hadoop configuration (string key/value pairs)
122
    std::map<std::string, std::string> _hadoop_conf;
123
124
    // File format settings (defaults: Parquet with SNAPPYBLOCK compression)
125
    TFileFormatType::type _file_format_type = TFileFormatType::FORMAT_PARQUET;
126
    TFileCompressType::type _compress_type = TFileCompressType::SNAPPYBLOCK;
127
128
    // Output directory for delete files
129
    std::string _output_path;
130
    std::string _table_location;
131
132
    TFileType::type _file_type = TFileType::FILE_HDFS; // Defaults to HDFS.
133
    std::vector<TNetworkAddress> _broker_addresses;
134
135
    // Partition information (sink-level; each IcebergFileDeletion also carries its own copy of these fields)
136
    int32_t _partition_spec_id = 0;
137
    std::string _partition_data_json;
138
139
    // Counters (plain members, both start at 0)
140
    size_t _row_count = 0;
141
    size_t _delete_file_count = 0;
142
143
    // Profile counters (all null until assigned; presumably registered in open() — confirm in the .cpp)
144
    RuntimeProfile::Counter* _written_rows_counter = nullptr;
145
    RuntimeProfile::Counter* _send_data_timer = nullptr;
146
    RuntimeProfile::Counter* _write_delete_files_timer = nullptr;
147
    RuntimeProfile::Counter* _delete_file_count_counter = nullptr;
148
    RuntimeProfile::Counter* _open_timer = nullptr;
149
    RuntimeProfile::Counter* _close_timer = nullptr;
150
151
    VExprContextSPtrs _position_delete_output_expr_ctxs; // Expression contexts for the position-delete output; see _init_position_delete_output_exprs().
152
};
153
154
} // namespace doris