Coverage Report

Created: 2026-04-10 12:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/viceberg_merge_sink.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/viceberg_merge_sink.h"
19
20
#include <fmt/format.h>
21
22
#include "common/consts.h"
23
#include "common/exception.h"
24
#include "common/logging.h"
25
#include "core/block/block.h"
26
#include "core/column/column_nullable.h"
27
#include "core/column/column_vector.h"
28
#include "exec/sink/sink_common.h"
29
#include "exec/sink/viceberg_delete_sink.h"
30
#include "exec/sink/writer/iceberg/viceberg_table_writer.h"
31
#include "exprs/vexpr_context.h"
32
#include "format/table/iceberg/schema.h"
33
#include "format/table/iceberg/schema_parser.h"
34
#include "runtime/runtime_state.h"
35
#include "util/string_util.h"
36
37
namespace doris {
38
39
namespace {} // namespace
40
41
VIcebergMergeSink::VIcebergMergeSink(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
42
                                     std::shared_ptr<Dependency> dep,
43
                                     std::shared_ptr<Dependency> fin_dep)
44
646
        : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) {
45
646
    DCHECK(_t_sink.__isset.iceberg_merge_sink);
46
646
}
47
48
646
VIcebergMergeSink::~VIcebergMergeSink() = default;
49
50
646
Status VIcebergMergeSink::init_properties(ObjectPool* pool, const RowDescriptor& row_desc) {
51
646
    RETURN_IF_ERROR(_build_inner_sinks());
52
53
646
    _table_writer = std::make_unique<VIcebergTableWriter>(_table_sink, _table_output_expr_ctxs,
54
646
                                                          nullptr, nullptr);
55
646
    _delete_writer = std::make_unique<VIcebergDeleteSink>(_delete_sink, _delete_output_expr_ctxs,
56
646
                                                          nullptr, nullptr);
57
646
    RETURN_IF_ERROR(_table_writer->init_properties(pool, row_desc));
58
646
    RETURN_IF_ERROR(_delete_writer->init_properties(pool));
59
646
    return Status::OK();
60
646
}
61
62
646
Status VIcebergMergeSink::open(RuntimeState* state, RuntimeProfile* profile) {
63
646
    _state = state;
64
65
646
    _written_rows_counter = ADD_COUNTER(profile, "RowsWritten", TUnit::UNIT);
66
646
    _insert_rows_counter = ADD_COUNTER(profile, "InsertRows", TUnit::UNIT);
67
646
    _delete_rows_counter = ADD_COUNTER(profile, "DeleteRows", TUnit::UNIT);
68
646
    _send_data_timer = ADD_TIMER(profile, "SendDataTime");
69
646
    _open_timer = ADD_TIMER(profile, "OpenTime");
70
646
    _close_timer = ADD_TIMER(profile, "CloseTime");
71
72
646
    SCOPED_TIMER(_open_timer);
73
74
646
    RETURN_IF_ERROR(_prepare_output_layout());
75
76
644
    RuntimeProfile* table_profile = profile->create_child("IcebergMergeTableWriter", true, true);
77
644
    RuntimeProfile* delete_profile = profile->create_child("IcebergMergeDeleteWriter", true, true);
78
79
644
    RETURN_IF_ERROR(_table_writer->open(state, table_profile));
80
643
    RETURN_IF_ERROR(_delete_writer->open(state, delete_profile));
81
82
643
    return Status::OK();
83
643
}
84
85
151
Status VIcebergMergeSink::write(RuntimeState* state, Block& block) {
86
151
    SCOPED_TIMER(_send_data_timer);
87
151
    if (block.rows() == 0) {
88
0
        return Status::OK();
89
0
    }
90
91
151
    Block output_block;
92
151
    RETURN_IF_ERROR(_projection_block(block, &output_block));
93
151
    if (output_block.rows() == 0) {
94
0
        return Status::OK();
95
0
    }
96
97
151
    _row_count += output_block.rows();
98
99
151
    if (_operation_idx < 0 || _row_id_idx < 0) {
100
0
        return Status::InternalError("Iceberg merge sink missing operation/row_id columns");
101
0
    }
102
103
151
    const auto& op_column = output_block.get_by_position(_operation_idx).column;
104
151
    const auto* op_data = remove_nullable(op_column).get();
105
106
151
    IColumn::Filter delete_filter(output_block.rows(), 0);
107
151
    IColumn::Filter insert_filter(output_block.rows(), 0);
108
151
    bool has_delete = false;
109
151
    bool has_insert = false;
110
151
    size_t delete_rows = 0;
111
151
    size_t insert_rows = 0;
112
113
340
    for (size_t i = 0; i < output_block.rows(); ++i) {
114
190
        int8_t op = static_cast<int8_t>(op_data->get_int(i));
115
190
        bool delete_op = is_delete_op(op);
116
190
        bool insert_op = is_insert_op(op);
117
190
        if (!delete_op && !insert_op) {
118
1
            return Status::InternalError("Unknown Iceberg merge operation {}", op);
119
1
        }
120
189
        if (delete_op) {
121
95
            delete_filter[i] = 1;
122
95
            has_delete = true;
123
95
            ++_delete_row_count;
124
95
            ++delete_rows;
125
95
        }
126
189
        if (insert_op) {
127
95
            insert_filter[i] = 1;
128
95
            has_insert = true;
129
95
            ++_insert_row_count;
130
95
            ++insert_rows;
131
95
        }
132
189
    }
133
134
150
    bool skip_io = false;
135
#ifdef BE_TEST
136
    skip_io = _skip_io;
137
#endif
138
139
150
    if (has_delete && !skip_io) {
140
84
        Block delete_block = output_block;
141
84
        std::vector<int> delete_indices {_row_id_idx};
142
84
        delete_block.erase_not_in(delete_indices);
143
84
        Block::filter_block_internal(&delete_block, delete_filter);
144
84
        RETURN_IF_ERROR(_delete_writer->write(state, delete_block));
145
84
    }
146
147
150
    if (has_insert && !skip_io) {
148
84
        if (_data_column_indices.empty()) {
149
0
            return Status::InternalError("Iceberg merge sink has no data columns for insert");
150
0
        }
151
84
        Block insert_block = output_block;
152
84
        insert_block.erase_not_in(_data_column_indices);
153
84
        Block::filter_block_internal(&insert_block, insert_filter);
154
84
        RETURN_IF_ERROR(_table_writer->write_prepared_block(insert_block));
155
84
    }
156
157
150
    if (_written_rows_counter != nullptr) {
158
150
        COUNTER_UPDATE(_written_rows_counter, output_block.rows());
159
150
    }
160
150
    if (_insert_rows_counter != nullptr) {
161
150
        COUNTER_UPDATE(_insert_rows_counter, insert_rows);
162
150
    }
163
150
    if (_delete_rows_counter != nullptr) {
164
150
        COUNTER_UPDATE(_delete_rows_counter, delete_rows);
165
150
    }
166
167
150
    return Status::OK();
168
150
}
169
170
642
Status VIcebergMergeSink::close(Status close_status) {
171
642
    SCOPED_TIMER(_close_timer);
172
173
642
    if (!close_status.ok()) {
174
0
        LOG(WARNING) << fmt::format("VIcebergMergeSink close with error: {}",
175
0
                                    close_status.to_string());
176
0
        if (_table_writer) {
177
0
            static_cast<void>(_table_writer->close(close_status));
178
0
        }
179
0
        if (_delete_writer) {
180
0
            static_cast<void>(_delete_writer->close(close_status));
181
0
        }
182
0
        return close_status;
183
0
    }
184
185
642
    Status table_status = Status::OK();
186
642
    Status delete_status = Status::OK();
187
642
    if (_table_writer) {
188
642
        table_status = _table_writer->close(close_status);
189
642
    }
190
642
    if (_delete_writer) {
191
642
        delete_status = _delete_writer->close(close_status);
192
642
    }
193
194
642
    if (_written_rows_counter != nullptr) {
195
642
        COUNTER_SET(_written_rows_counter, static_cast<int64_t>(_row_count));
196
642
    }
197
642
    if (_insert_rows_counter != nullptr) {
198
642
        COUNTER_SET(_insert_rows_counter, static_cast<int64_t>(_insert_row_count));
199
642
    }
200
642
    if (_delete_rows_counter != nullptr) {
201
642
        COUNTER_SET(_delete_rows_counter, static_cast<int64_t>(_delete_row_count));
202
642
    }
203
204
642
    if (!table_status.ok()) {
205
0
        return table_status;
206
0
    }
207
642
    return delete_status;
208
642
}
209
210
646
Status VIcebergMergeSink::_build_inner_sinks() {
211
646
    if (!_t_sink.__isset.iceberg_merge_sink) {
212
0
        return Status::InternalError("Missing iceberg merge sink config");
213
0
    }
214
215
646
    const auto& merge_sink = _t_sink.iceberg_merge_sink;
216
217
646
    TIcebergTableSink table_sink;
218
646
    if (merge_sink.__isset.db_name) {
219
646
        table_sink.__set_db_name(merge_sink.db_name);
220
646
    }
221
646
    if (merge_sink.__isset.tb_name) {
222
646
        table_sink.__set_tb_name(merge_sink.tb_name);
223
646
    }
224
646
    if (merge_sink.__isset.schema_json) {
225
646
        table_sink.__set_schema_json(merge_sink.schema_json);
226
646
    }
227
646
    if (merge_sink.__isset.partition_specs_json) {
228
160
        table_sink.__set_partition_specs_json(merge_sink.partition_specs_json);
229
160
    }
230
646
    if (merge_sink.__isset.partition_spec_id) {
231
166
        table_sink.__set_partition_spec_id(merge_sink.partition_spec_id);
232
166
    }
233
646
    if (merge_sink.__isset.sort_fields) {
234
0
        table_sink.__set_sort_fields(merge_sink.sort_fields);
235
0
    }
236
646
    if (merge_sink.__isset.file_format) {
237
646
        table_sink.__set_file_format(merge_sink.file_format);
238
646
    }
239
646
    if (merge_sink.__isset.compression_type) {
240
646
        table_sink.__set_compression_type(merge_sink.compression_type);
241
646
    }
242
646
    if (merge_sink.__isset.output_path) {
243
646
        table_sink.__set_output_path(merge_sink.output_path);
244
646
    }
245
646
    if (merge_sink.__isset.original_output_path) {
246
646
        table_sink.__set_original_output_path(merge_sink.original_output_path);
247
646
    }
248
646
    if (merge_sink.__isset.hadoop_config) {
249
640
        table_sink.__set_hadoop_config(merge_sink.hadoop_config);
250
640
    }
251
646
    if (merge_sink.__isset.file_type) {
252
646
        table_sink.__set_file_type(merge_sink.file_type);
253
646
    }
254
646
    if (merge_sink.__isset.broker_addresses) {
255
0
        table_sink.__set_broker_addresses(merge_sink.broker_addresses);
256
0
    }
257
646
    _table_sink.__set_type(TDataSinkType::ICEBERG_TABLE_SINK);
258
646
    _table_sink.__set_iceberg_table_sink(table_sink);
259
260
646
    TIcebergDeleteSink delete_sink;
261
646
    if (merge_sink.__isset.db_name) {
262
646
        delete_sink.__set_db_name(merge_sink.db_name);
263
646
    }
264
646
    if (merge_sink.__isset.tb_name) {
265
646
        delete_sink.__set_tb_name(merge_sink.tb_name);
266
646
    }
267
646
    if (merge_sink.__isset.delete_type) {
268
646
        delete_sink.__set_delete_type(merge_sink.delete_type);
269
646
    }
270
646
    if (merge_sink.__isset.file_format) {
271
646
        delete_sink.__set_file_format(merge_sink.file_format);
272
646
    }
273
646
    if (merge_sink.__isset.compression_type) {
274
646
        delete_sink.__set_compress_type(merge_sink.compression_type);
275
646
    }
276
646
    if (merge_sink.__isset.output_path) {
277
646
        delete_sink.__set_output_path(merge_sink.output_path);
278
646
    }
279
646
    if (merge_sink.__isset.table_location) {
280
646
        delete_sink.__set_table_location(merge_sink.table_location);
281
646
    }
282
646
    if (merge_sink.__isset.hadoop_config) {
283
640
        delete_sink.__set_hadoop_config(merge_sink.hadoop_config);
284
640
    }
285
646
    if (merge_sink.__isset.file_type) {
286
646
        delete_sink.__set_file_type(merge_sink.file_type);
287
646
    }
288
646
    if (merge_sink.__isset.partition_spec_id_for_delete) {
289
166
        delete_sink.__set_partition_spec_id(merge_sink.partition_spec_id_for_delete);
290
166
    }
291
646
    if (merge_sink.__isset.partition_data_json_for_delete) {
292
0
        delete_sink.__set_partition_data_json(merge_sink.partition_data_json_for_delete);
293
0
    }
294
646
    if (merge_sink.__isset.broker_addresses) {
295
0
        delete_sink.__set_broker_addresses(merge_sink.broker_addresses);
296
0
    }
297
646
    if (merge_sink.__isset.format_version) {
298
640
        delete_sink.__set_format_version(merge_sink.format_version);
299
640
    }
300
646
    if (merge_sink.__isset.rewritable_delete_file_sets) {
301
640
        delete_sink.__set_rewritable_delete_file_sets(merge_sink.rewritable_delete_file_sets);
302
640
    }
303
646
    _delete_sink.__set_type(TDataSinkType::ICEBERG_DELETE_SINK);
304
646
    _delete_sink.__set_iceberg_delete_sink(delete_sink);
305
306
646
    return Status::OK();
307
646
}
308
309
646
Status VIcebergMergeSink::_prepare_output_layout() {
310
646
    if (_vec_output_expr_ctxs.empty()) {
311
0
        return Status::InternalError("Iceberg merge sink has empty output expressions");
312
0
    }
313
314
646
    std::string row_id_name = doris::to_lower(BeConsts::ICEBERG_ROWID_COL);
315
646
    std::string op_name = doris::to_lower(kOperationColumnName);
316
317
646
    _operation_idx = -1;
318
646
    _row_id_idx = -1;
319
4.44k
    for (size_t i = 0; i < _vec_output_expr_ctxs.size(); ++i) {
320
3.79k
        std::string expr_name = doris::to_lower(_vec_output_expr_ctxs[i]->expr_name());
321
3.79k
        if (_operation_idx < 0 && expr_name == op_name) {
322
645
            _operation_idx = static_cast<int>(i);
323
3.15k
        } else if (_row_id_idx < 0 && expr_name == row_id_name) {
324
645
            _row_id_idx = static_cast<int>(i);
325
645
        }
326
3.79k
    }
327
328
646
    if (_operation_idx < 0) {
329
1
        return Status::InternalError("Iceberg merge sink missing operation column");
330
1
    }
331
645
    if (_row_id_idx < 0) {
332
1
        return Status::InternalError("Iceberg merge sink missing row_id column");
333
1
    }
334
335
644
    _data_column_indices.clear();
336
644
    _table_output_expr_ctxs.clear();
337
4.43k
    for (size_t i = 0; i < _vec_output_expr_ctxs.size(); ++i) {
338
3.78k
        if (static_cast<int>(i) == _operation_idx || static_cast<int>(i) == _row_id_idx) {
339
1.28k
            continue;
340
1.28k
        }
341
2.49k
        _data_column_indices.push_back(static_cast<int>(i));
342
2.49k
        _table_output_expr_ctxs.emplace_back(_vec_output_expr_ctxs[i]);
343
2.49k
    }
344
345
644
    return Status::OK();
346
645
}
347
348
} // namespace doris