Coverage Report

Created: 2026-03-29 11:17

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/viceberg_merge_sink.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/viceberg_merge_sink.h"
19
20
#include <fmt/format.h>
21
22
#include "common/consts.h"
23
#include "common/exception.h"
24
#include "common/logging.h"
25
#include "core/block/block.h"
26
#include "core/column/column_nullable.h"
27
#include "core/column/column_vector.h"
28
#include "exec/sink/sink_common.h"
29
#include "exec/sink/viceberg_delete_sink.h"
30
#include "exec/sink/writer/iceberg/viceberg_table_writer.h"
31
#include "format/table/iceberg/schema.h"
32
#include "format/table/iceberg/schema_parser.h"
33
#include "runtime/runtime_state.h"
34
#include "util/string_util.h"
35
36
namespace doris {
37
38
namespace {} // namespace
39
40
VIcebergMergeSink::VIcebergMergeSink(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
41
                                     std::shared_ptr<Dependency> dep,
42
                                     std::shared_ptr<Dependency> fin_dep)
43
518
        : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) {
44
518
    DCHECK(_t_sink.__isset.iceberg_merge_sink);
45
518
}
46
47
518
VIcebergMergeSink::~VIcebergMergeSink() = default;
48
49
518
// Derives the inner thrift sinks from the merge sink config, then creates and initializes the two
// inner writers: a VIcebergTableWriter for inserted rows and a VIcebergDeleteSink for deleted rows.
//
// @param pool     object pool forwarded to both inner writers' init_properties().
// @param row_desc row descriptor forwarded to the table writer only.
// @return the first non-OK status from sink derivation or writer initialization, else OK.
Status VIcebergMergeSink::init_properties(ObjectPool* pool, const RowDescriptor& row_desc) {
    // Fills _table_sink and _delete_sink.
    RETURN_IF_ERROR(_build_inner_sinks());

    // Both inner writers are created without dependencies (nullptr, nullptr); this sink calls
    // them directly from write()/close().
    // NOTE(review): _table_output_expr_ctxs is still empty at this point -- it is populated by
    // _prepare_output_layout() during open(), before _table_writer->open() runs. That only works
    // if the writer observes this vector by reference rather than copying it; confirm.
    _table_writer = std::make_unique<VIcebergTableWriter>(_table_sink, _table_output_expr_ctxs,
                                                          nullptr, nullptr);
    _delete_writer = std::make_unique<VIcebergDeleteSink>(_delete_sink, _delete_output_expr_ctxs,
                                                          nullptr, nullptr);

    RETURN_IF_ERROR(_table_writer->init_properties(pool, row_desc));
    RETURN_IF_ERROR(_delete_writer->init_properties(pool));
    return Status::OK();
}
60
61
518
// Registers this sink's counters and timers on `profile`, resolves which output columns hold the
// operation code, the row id and the table data, and opens both inner writers under their own
// child profiles.
//
// @return an error if the output layout is invalid or either inner writer fails to open.
Status VIcebergMergeSink::open(RuntimeState* state, RuntimeProfile* profile) {
    _state = state;

    _written_rows_counter = ADD_COUNTER(profile, "RowsWritten", TUnit::UNIT);
    _insert_rows_counter = ADD_COUNTER(profile, "InsertRows", TUnit::UNIT);
    _delete_rows_counter = ADD_COUNTER(profile, "DeleteRows", TUnit::UNIT);
    _send_data_timer = ADD_TIMER(profile, "SendDataTime");
    _open_timer = ADD_TIMER(profile, "OpenTime");
    _close_timer = ADD_TIMER(profile, "CloseTime");

    // Everything below is attributed to OpenTime.
    SCOPED_TIMER(_open_timer);

    // Must run before the table writer opens: it fills _table_output_expr_ctxs, which the table
    // writer was constructed with in init_properties().
    RETURN_IF_ERROR(_prepare_output_layout());

    RuntimeProfile* data_writer_profile =
            profile->create_child("IcebergMergeTableWriter", true, true);
    RuntimeProfile* delete_writer_profile =
            profile->create_child("IcebergMergeDeleteWriter", true, true);

    RETURN_IF_ERROR(_table_writer->open(state, data_writer_profile));
    RETURN_IF_ERROR(_delete_writer->open(state, delete_writer_profile));
    return Status::OK();
}
83
84
125
// Routes one input block to the inner writers.
//
// The block is projected through the output expressions. Each row's operation value decides
// whether the row is sent to the delete writer (only the row-id column is kept), to the table
// writer (only the data columns are kept), or to both.
//
// @return OK for an empty block. Otherwise, returns an error if the layout was not resolved, if any
//         row has a NULL or unknown operation, or if an inner writer fails. A rejected batch leaves
//         the running row totals untouched.
Status VIcebergMergeSink::write(RuntimeState* state, Block& block) {
    SCOPED_TIMER(_send_data_timer);
    if (block.rows() == 0) {
        return Status::OK();
    }

    Block output_block;
    RETURN_IF_ERROR(_projection_block(block, &output_block));
    const size_t num_rows = output_block.rows();
    if (num_rows == 0) {
        return Status::OK();
    }

    // Both positions are resolved by _prepare_output_layout() during open(). Validate them before
    // any accounting so a misconfigured sink fails without side effects.
    if (_operation_idx < 0 || _row_id_idx < 0) {
        return Status::InternalError("Iceberg merge sink missing operation/row_id columns");
    }

    const auto& op_column = output_block.get_by_position(_operation_idx).column;
    // Keep the (possibly nested) value column alive for the duration of the loop.
    const auto op_values = remove_nullable(op_column);

    IColumn::Filter delete_filter(num_rows, 0);
    IColumn::Filter insert_filter(num_rows, 0);
    size_t delete_rows = 0;
    size_t insert_rows = 0;

    for (size_t i = 0; i < num_rows; ++i) {
        // remove_nullable() drops the null map. Without this check, a NULL operation would be
        // decoded from whatever placeholder value the nested column stores under the null flag.
        if (op_column->is_null_at(i)) {
            return Status::InternalError("Iceberg merge operation is NULL at row {}", i);
        }
        const auto op = static_cast<int8_t>(op_values->get_int(i));
        const bool delete_op = is_delete_op(op);
        const bool insert_op = is_insert_op(op);
        if (!delete_op && !insert_op) {
            return Status::InternalError("Unknown Iceberg merge operation {}", op);
        }
        // A row may be both (e.g. an update): it is then emitted to both writers.
        if (delete_op) {
            delete_filter[i] = 1;
            ++delete_rows;
        }
        if (insert_op) {
            insert_filter[i] = 1;
            ++insert_rows;
        }
    }

    // Commit the running totals only once the whole batch has been classified successfully.
    _row_count += num_rows;
    _delete_row_count += delete_rows;
    _insert_row_count += insert_rows;

    bool skip_io = false;
#ifdef BE_TEST
    // Test builds can bypass the inner writers entirely.
    skip_io = _skip_io;
#endif

    if (delete_rows > 0 && !skip_io) {
        // The delete writer only receives the row-id column of the rows being deleted.
        Block delete_block = output_block;
        std::vector<int> delete_indices {_row_id_idx};
        delete_block.erase_not_in(delete_indices);
        Block::filter_block_internal(&delete_block, delete_filter);
        RETURN_IF_ERROR(_delete_writer->write(state, delete_block));
    }

    if (insert_rows > 0 && !skip_io) {
        if (_data_column_indices.empty()) {
            return Status::InternalError("Iceberg merge sink has no data columns for insert");
        }
        // The table writer receives only the data columns of the rows being inserted.
        Block insert_block = output_block;
        insert_block.erase_not_in(_data_column_indices);
        Block::filter_block_internal(&insert_block, insert_filter);
        RETURN_IF_ERROR(_table_writer->write_prepared_block(insert_block));
    }

    if (_written_rows_counter != nullptr) {
        COUNTER_UPDATE(_written_rows_counter, num_rows);
    }
    if (_insert_rows_counter != nullptr) {
        COUNTER_UPDATE(_insert_rows_counter, insert_rows);
    }
    if (_delete_rows_counter != nullptr) {
        COUNTER_UPDATE(_delete_rows_counter, delete_rows);
    }

    return Status::OK();
}
168
169
514
// Closes both inner writers and publishes the final row totals to the profile counters.
//
// When close_status is already an error, both writers are closed with it (their results are
// ignored) and close_status is returned unchanged. Otherwise the table writer is closed first. The
// delete writer is then closed with the table writer's failure if there was one, and with
// close_status otherwise.
//
// @return close_status on the error path; otherwise the table writer's error if any, else the
//         delete writer's close status.
Status VIcebergMergeSink::close(Status close_status) {
    SCOPED_TIMER(_close_timer);

    if (!close_status.ok()) {
        LOG(WARNING) << fmt::format("VIcebergMergeSink close with error: {}",
                                    close_status.to_string());
        if (_table_writer) {
            static_cast<void>(_table_writer->close(close_status));
        }
        if (_delete_writer) {
            static_cast<void>(_delete_writer->close(close_status));
        }
        return close_status;
    }

    Status table_status = Status::OK();
    if (_table_writer) {
        table_status = _table_writer->close(close_status);
    }

    // Inserted rows go to the table writer and deleted rows to the delete writer. If the table
    // writer failed to close, the merge has already failed. Hand that failure to the delete writer,
    // the same way the error path above does, instead of letting it finalize as if all succeeded.
    Status delete_status = Status::OK();
    if (_delete_writer) {
        delete_status = _delete_writer->close(table_status.ok() ? close_status : table_status);
    }

    // Replace the per-batch increments made in write() with the authoritative totals.
    if (_written_rows_counter != nullptr) {
        COUNTER_SET(_written_rows_counter, static_cast<int64_t>(_row_count));
    }
    if (_insert_rows_counter != nullptr) {
        COUNTER_SET(_insert_rows_counter, static_cast<int64_t>(_insert_row_count));
    }
    if (_delete_rows_counter != nullptr) {
        COUNTER_SET(_delete_rows_counter, static_cast<int64_t>(_delete_row_count));
    }

    if (!table_status.ok()) {
        return table_status;
    }
    return delete_status;
}
208
209
518
// Translates the merge sink config into the two thrift sinks consumed by the inner writers:
// _table_sink (type ICEBERG_TABLE_SINK, used for inserted rows) and _delete_sink (type
// ICEBERG_DELETE_SINK, used for deleted rows). A field is copied only if it is set on the merge
// sink config.
//
// @return InternalError if the TDataSink has no iceberg_merge_sink payload, else OK.
Status VIcebergMergeSink::_build_inner_sinks() {
    if (!_t_sink.__isset.iceberg_merge_sink) {
        return Status::InternalError("Missing iceberg merge sink config");
    }
    const auto& merge = _t_sink.iceberg_merge_sink;

    // Config for the table (data-file) writer.
    const auto make_table_sink = [&merge]() {
        TIcebergTableSink sink;
        if (merge.__isset.db_name) {
            sink.__set_db_name(merge.db_name);
        }
        if (merge.__isset.tb_name) {
            sink.__set_tb_name(merge.tb_name);
        }
        if (merge.__isset.schema_json) {
            sink.__set_schema_json(merge.schema_json);
        }
        if (merge.__isset.partition_specs_json) {
            sink.__set_partition_specs_json(merge.partition_specs_json);
        }
        if (merge.__isset.partition_spec_id) {
            sink.__set_partition_spec_id(merge.partition_spec_id);
        }
        if (merge.__isset.sort_fields) {
            sink.__set_sort_fields(merge.sort_fields);
        }
        if (merge.__isset.file_format) {
            sink.__set_file_format(merge.file_format);
        }
        if (merge.__isset.compression_type) {
            sink.__set_compression_type(merge.compression_type);
        }
        if (merge.__isset.output_path) {
            sink.__set_output_path(merge.output_path);
        }
        if (merge.__isset.original_output_path) {
            sink.__set_original_output_path(merge.original_output_path);
        }
        if (merge.__isset.hadoop_config) {
            sink.__set_hadoop_config(merge.hadoop_config);
        }
        if (merge.__isset.file_type) {
            sink.__set_file_type(merge.file_type);
        }
        if (merge.__isset.broker_addresses) {
            sink.__set_broker_addresses(merge.broker_addresses);
        }
        return sink;
    };

    // Config for the delete writer. Several fields are renamed on the way:
    //   compression_type              -> compress_type
    //   partition_spec_id_for_delete  -> partition_spec_id
    //   partition_data_json_for_delete -> partition_data_json
    const auto make_delete_sink = [&merge]() {
        TIcebergDeleteSink sink;
        if (merge.__isset.db_name) {
            sink.__set_db_name(merge.db_name);
        }
        if (merge.__isset.tb_name) {
            sink.__set_tb_name(merge.tb_name);
        }
        if (merge.__isset.delete_type) {
            sink.__set_delete_type(merge.delete_type);
        }
        if (merge.__isset.file_format) {
            sink.__set_file_format(merge.file_format);
        }
        if (merge.__isset.compression_type) {
            sink.__set_compress_type(merge.compression_type);
        }
        if (merge.__isset.output_path) {
            sink.__set_output_path(merge.output_path);
        }
        if (merge.__isset.table_location) {
            sink.__set_table_location(merge.table_location);
        }
        if (merge.__isset.hadoop_config) {
            sink.__set_hadoop_config(merge.hadoop_config);
        }
        if (merge.__isset.file_type) {
            sink.__set_file_type(merge.file_type);
        }
        if (merge.__isset.partition_spec_id_for_delete) {
            sink.__set_partition_spec_id(merge.partition_spec_id_for_delete);
        }
        if (merge.__isset.partition_data_json_for_delete) {
            sink.__set_partition_data_json(merge.partition_data_json_for_delete);
        }
        if (merge.__isset.broker_addresses) {
            sink.__set_broker_addresses(merge.broker_addresses);
        }
        return sink;
    };

    _table_sink.__set_type(TDataSinkType::ICEBERG_TABLE_SINK);
    _table_sink.__set_iceberg_table_sink(make_table_sink());

    _delete_sink.__set_type(TDataSinkType::ICEBERG_DELETE_SINK);
    _delete_sink.__set_iceberg_delete_sink(make_delete_sink());

    return Status::OK();
}
301
302
516
// Resolves the column layout of the projected output block.
//
// The operation column and the row-id column are found by a case-insensitive match of each output
// expression's name; the first match of each wins. Every other output position is recorded in
// _data_column_indices, and its expression context in _table_output_expr_ctxs, for the insert path.
//
// @return InternalError if there are no output expressions, or if the operation or row-id column
//         is missing; otherwise OK.
Status VIcebergMergeSink::_prepare_output_layout() {
    const size_t expr_count = _vec_output_expr_ctxs.size();
    if (expr_count == 0) {
        return Status::InternalError("Iceberg merge sink has empty output expressions");
    }

    const std::string row_id_name = doris::to_lower(BeConsts::ICEBERG_ROWID_COL);
    const std::string op_name = doris::to_lower(kOperationColumnName);

    _operation_idx = -1;
    _row_id_idx = -1;
    for (size_t pos = 0; pos < expr_count; ++pos) {
        const std::string name = doris::to_lower(_vec_output_expr_ctxs[pos]->expr_name());
        if (_operation_idx < 0 && name == op_name) {
            _operation_idx = static_cast<int>(pos);
            continue;
        }
        if (_row_id_idx < 0 && name == row_id_name) {
            _row_id_idx = static_cast<int>(pos);
        }
    }

    if (_operation_idx < 0) {
        return Status::InternalError("Iceberg merge sink missing operation column");
    }
    if (_row_id_idx < 0) {
        return Status::InternalError("Iceberg merge sink missing row_id column");
    }

    // Everything that is neither the operation nor the row id is table data.
    _data_column_indices.clear();
    _table_output_expr_ctxs.clear();
    for (size_t pos = 0; pos < expr_count; ++pos) {
        const int column = static_cast<int>(pos);
        if (column != _operation_idx && column != _row_id_idx) {
            _data_column_indices.push_back(column);
            _table_output_expr_ctxs.emplace_back(_vec_output_expr_ctxs[pos]);
        }
    }

    return Status::OK();
}
340
341
} // namespace doris