be/src/exec/sink/viceberg_merge_sink.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
#include "exec/sink/viceberg_merge_sink.h"

#include <fmt/format.h>

#include <utility>

#include "common/consts.h"
#include "common/exception.h"
#include "common/logging.h"
#include "core/block/block.h"
#include "core/column/column_nullable.h"
#include "core/column/column_vector.h"
#include "exec/sink/sink_common.h"
#include "exec/sink/viceberg_delete_sink.h"
#include "exec/sink/writer/iceberg/viceberg_table_writer.h"
#include "exprs/vexpr_context.h"
#include "format/table/iceberg/schema.h"
#include "format/table/iceberg/schema_parser.h"
#include "runtime/runtime_state.h"
#include "util/string_util.h"
36 | | |
37 | | namespace doris { |
38 | | |
39 | | namespace {} // namespace |
40 | | |
41 | | VIcebergMergeSink::VIcebergMergeSink(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, |
42 | | std::shared_ptr<Dependency> dep, |
43 | | std::shared_ptr<Dependency> fin_dep) |
44 | 646 | : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) { |
45 | 646 | DCHECK(_t_sink.__isset.iceberg_merge_sink); |
46 | 646 | } |
47 | | |
48 | 646 | VIcebergMergeSink::~VIcebergMergeSink() = default; |
49 | | |
50 | 646 | Status VIcebergMergeSink::init_properties(ObjectPool* pool, const RowDescriptor& row_desc) { |
51 | 646 | RETURN_IF_ERROR(_build_inner_sinks()); |
52 | | |
53 | 646 | _table_writer = std::make_unique<VIcebergTableWriter>(_table_sink, _table_output_expr_ctxs, |
54 | 646 | nullptr, nullptr); |
55 | 646 | _delete_writer = std::make_unique<VIcebergDeleteSink>(_delete_sink, _delete_output_expr_ctxs, |
56 | 646 | nullptr, nullptr); |
57 | 646 | RETURN_IF_ERROR(_table_writer->init_properties(pool, row_desc)); |
58 | 646 | RETURN_IF_ERROR(_delete_writer->init_properties(pool)); |
59 | 646 | return Status::OK(); |
60 | 646 | } |
61 | | |
62 | 646 | Status VIcebergMergeSink::open(RuntimeState* state, RuntimeProfile* profile) { |
63 | 646 | _state = state; |
64 | | |
65 | 646 | _written_rows_counter = ADD_COUNTER(profile, "RowsWritten", TUnit::UNIT); |
66 | 646 | _insert_rows_counter = ADD_COUNTER(profile, "InsertRows", TUnit::UNIT); |
67 | 646 | _delete_rows_counter = ADD_COUNTER(profile, "DeleteRows", TUnit::UNIT); |
68 | 646 | _send_data_timer = ADD_TIMER(profile, "SendDataTime"); |
69 | 646 | _open_timer = ADD_TIMER(profile, "OpenTime"); |
70 | 646 | _close_timer = ADD_TIMER(profile, "CloseTime"); |
71 | | |
72 | 646 | SCOPED_TIMER(_open_timer); |
73 | | |
74 | 646 | RETURN_IF_ERROR(_prepare_output_layout()); |
75 | | |
76 | 644 | RuntimeProfile* table_profile = profile->create_child("IcebergMergeTableWriter", true, true); |
77 | 644 | RuntimeProfile* delete_profile = profile->create_child("IcebergMergeDeleteWriter", true, true); |
78 | | |
79 | 644 | RETURN_IF_ERROR(_table_writer->open(state, table_profile)); |
80 | 643 | RETURN_IF_ERROR(_delete_writer->open(state, delete_profile)); |
81 | | |
82 | 643 | return Status::OK(); |
83 | 643 | } |
84 | | |
85 | 151 | Status VIcebergMergeSink::write(RuntimeState* state, Block& block) { |
86 | 151 | SCOPED_TIMER(_send_data_timer); |
87 | 151 | if (block.rows() == 0) { |
88 | 0 | return Status::OK(); |
89 | 0 | } |
90 | | |
91 | 151 | Block output_block; |
92 | 151 | RETURN_IF_ERROR(_projection_block(block, &output_block)); |
93 | 151 | if (output_block.rows() == 0) { |
94 | 0 | return Status::OK(); |
95 | 0 | } |
96 | | |
97 | 151 | _row_count += output_block.rows(); |
98 | | |
99 | 151 | if (_operation_idx < 0 || _row_id_idx < 0) { |
100 | 0 | return Status::InternalError("Iceberg merge sink missing operation/row_id columns"); |
101 | 0 | } |
102 | | |
103 | 151 | const auto& op_column = output_block.get_by_position(_operation_idx).column; |
104 | 151 | const auto* op_data = remove_nullable(op_column).get(); |
105 | | |
106 | 151 | IColumn::Filter delete_filter(output_block.rows(), 0); |
107 | 151 | IColumn::Filter insert_filter(output_block.rows(), 0); |
108 | 151 | bool has_delete = false; |
109 | 151 | bool has_insert = false; |
110 | 151 | size_t delete_rows = 0; |
111 | 151 | size_t insert_rows = 0; |
112 | | |
113 | 340 | for (size_t i = 0; i < output_block.rows(); ++i) { |
114 | 190 | int8_t op = static_cast<int8_t>(op_data->get_int(i)); |
115 | 190 | bool delete_op = is_delete_op(op); |
116 | 190 | bool insert_op = is_insert_op(op); |
117 | 190 | if (!delete_op && !insert_op) { |
118 | 1 | return Status::InternalError("Unknown Iceberg merge operation {}", op); |
119 | 1 | } |
120 | 189 | if (delete_op) { |
121 | 95 | delete_filter[i] = 1; |
122 | 95 | has_delete = true; |
123 | 95 | ++_delete_row_count; |
124 | 95 | ++delete_rows; |
125 | 95 | } |
126 | 189 | if (insert_op) { |
127 | 95 | insert_filter[i] = 1; |
128 | 95 | has_insert = true; |
129 | 95 | ++_insert_row_count; |
130 | 95 | ++insert_rows; |
131 | 95 | } |
132 | 189 | } |
133 | | |
134 | 150 | bool skip_io = false; |
135 | | #ifdef BE_TEST |
136 | | skip_io = _skip_io; |
137 | | #endif |
138 | | |
139 | 150 | if (has_delete && !skip_io) { |
140 | 84 | Block delete_block = output_block; |
141 | 84 | std::vector<int> delete_indices {_row_id_idx}; |
142 | 84 | delete_block.erase_not_in(delete_indices); |
143 | 84 | Block::filter_block_internal(&delete_block, delete_filter); |
144 | 84 | RETURN_IF_ERROR(_delete_writer->write(state, delete_block)); |
145 | 84 | } |
146 | | |
147 | 150 | if (has_insert && !skip_io) { |
148 | 84 | if (_data_column_indices.empty()) { |
149 | 0 | return Status::InternalError("Iceberg merge sink has no data columns for insert"); |
150 | 0 | } |
151 | 84 | Block insert_block = output_block; |
152 | 84 | insert_block.erase_not_in(_data_column_indices); |
153 | 84 | Block::filter_block_internal(&insert_block, insert_filter); |
154 | 84 | RETURN_IF_ERROR(_table_writer->write_prepared_block(insert_block)); |
155 | 84 | } |
156 | | |
157 | 150 | if (_written_rows_counter != nullptr) { |
158 | 150 | COUNTER_UPDATE(_written_rows_counter, output_block.rows()); |
159 | 150 | } |
160 | 150 | if (_insert_rows_counter != nullptr) { |
161 | 150 | COUNTER_UPDATE(_insert_rows_counter, insert_rows); |
162 | 150 | } |
163 | 150 | if (_delete_rows_counter != nullptr) { |
164 | 150 | COUNTER_UPDATE(_delete_rows_counter, delete_rows); |
165 | 150 | } |
166 | | |
167 | 150 | return Status::OK(); |
168 | 150 | } |
169 | | |
170 | 642 | Status VIcebergMergeSink::close(Status close_status) { |
171 | 642 | SCOPED_TIMER(_close_timer); |
172 | | |
173 | 642 | if (!close_status.ok()) { |
174 | 0 | LOG(WARNING) << fmt::format("VIcebergMergeSink close with error: {}", |
175 | 0 | close_status.to_string()); |
176 | 0 | if (_table_writer) { |
177 | 0 | static_cast<void>(_table_writer->close(close_status)); |
178 | 0 | } |
179 | 0 | if (_delete_writer) { |
180 | 0 | static_cast<void>(_delete_writer->close(close_status)); |
181 | 0 | } |
182 | 0 | return close_status; |
183 | 0 | } |
184 | | |
185 | 642 | Status table_status = Status::OK(); |
186 | 642 | Status delete_status = Status::OK(); |
187 | 642 | if (_table_writer) { |
188 | 642 | table_status = _table_writer->close(close_status); |
189 | 642 | } |
190 | 642 | if (_delete_writer) { |
191 | 642 | delete_status = _delete_writer->close(close_status); |
192 | 642 | } |
193 | | |
194 | 642 | if (_written_rows_counter != nullptr) { |
195 | 642 | COUNTER_SET(_written_rows_counter, static_cast<int64_t>(_row_count)); |
196 | 642 | } |
197 | 642 | if (_insert_rows_counter != nullptr) { |
198 | 642 | COUNTER_SET(_insert_rows_counter, static_cast<int64_t>(_insert_row_count)); |
199 | 642 | } |
200 | 642 | if (_delete_rows_counter != nullptr) { |
201 | 642 | COUNTER_SET(_delete_rows_counter, static_cast<int64_t>(_delete_row_count)); |
202 | 642 | } |
203 | | |
204 | 642 | if (!table_status.ok()) { |
205 | 0 | return table_status; |
206 | 0 | } |
207 | 642 | return delete_status; |
208 | 642 | } |
209 | | |
210 | 646 | Status VIcebergMergeSink::_build_inner_sinks() { |
211 | 646 | if (!_t_sink.__isset.iceberg_merge_sink) { |
212 | 0 | return Status::InternalError("Missing iceberg merge sink config"); |
213 | 0 | } |
214 | | |
215 | 646 | const auto& merge_sink = _t_sink.iceberg_merge_sink; |
216 | | |
217 | 646 | TIcebergTableSink table_sink; |
218 | 646 | if (merge_sink.__isset.db_name) { |
219 | 646 | table_sink.__set_db_name(merge_sink.db_name); |
220 | 646 | } |
221 | 646 | if (merge_sink.__isset.tb_name) { |
222 | 646 | table_sink.__set_tb_name(merge_sink.tb_name); |
223 | 646 | } |
224 | 646 | if (merge_sink.__isset.schema_json) { |
225 | 646 | table_sink.__set_schema_json(merge_sink.schema_json); |
226 | 646 | } |
227 | 646 | if (merge_sink.__isset.partition_specs_json) { |
228 | 160 | table_sink.__set_partition_specs_json(merge_sink.partition_specs_json); |
229 | 160 | } |
230 | 646 | if (merge_sink.__isset.partition_spec_id) { |
231 | 166 | table_sink.__set_partition_spec_id(merge_sink.partition_spec_id); |
232 | 166 | } |
233 | 646 | if (merge_sink.__isset.sort_fields) { |
234 | 0 | table_sink.__set_sort_fields(merge_sink.sort_fields); |
235 | 0 | } |
236 | 646 | if (merge_sink.__isset.file_format) { |
237 | 646 | table_sink.__set_file_format(merge_sink.file_format); |
238 | 646 | } |
239 | 646 | if (merge_sink.__isset.compression_type) { |
240 | 646 | table_sink.__set_compression_type(merge_sink.compression_type); |
241 | 646 | } |
242 | 646 | if (merge_sink.__isset.output_path) { |
243 | 646 | table_sink.__set_output_path(merge_sink.output_path); |
244 | 646 | } |
245 | 646 | if (merge_sink.__isset.original_output_path) { |
246 | 646 | table_sink.__set_original_output_path(merge_sink.original_output_path); |
247 | 646 | } |
248 | 646 | if (merge_sink.__isset.hadoop_config) { |
249 | 640 | table_sink.__set_hadoop_config(merge_sink.hadoop_config); |
250 | 640 | } |
251 | 646 | if (merge_sink.__isset.file_type) { |
252 | 646 | table_sink.__set_file_type(merge_sink.file_type); |
253 | 646 | } |
254 | 646 | if (merge_sink.__isset.broker_addresses) { |
255 | 0 | table_sink.__set_broker_addresses(merge_sink.broker_addresses); |
256 | 0 | } |
257 | 646 | _table_sink.__set_type(TDataSinkType::ICEBERG_TABLE_SINK); |
258 | 646 | _table_sink.__set_iceberg_table_sink(table_sink); |
259 | | |
260 | 646 | TIcebergDeleteSink delete_sink; |
261 | 646 | if (merge_sink.__isset.db_name) { |
262 | 646 | delete_sink.__set_db_name(merge_sink.db_name); |
263 | 646 | } |
264 | 646 | if (merge_sink.__isset.tb_name) { |
265 | 646 | delete_sink.__set_tb_name(merge_sink.tb_name); |
266 | 646 | } |
267 | 646 | if (merge_sink.__isset.delete_type) { |
268 | 646 | delete_sink.__set_delete_type(merge_sink.delete_type); |
269 | 646 | } |
270 | 646 | if (merge_sink.__isset.file_format) { |
271 | 646 | delete_sink.__set_file_format(merge_sink.file_format); |
272 | 646 | } |
273 | 646 | if (merge_sink.__isset.compression_type) { |
274 | 646 | delete_sink.__set_compress_type(merge_sink.compression_type); |
275 | 646 | } |
276 | 646 | if (merge_sink.__isset.output_path) { |
277 | 646 | delete_sink.__set_output_path(merge_sink.output_path); |
278 | 646 | } |
279 | 646 | if (merge_sink.__isset.table_location) { |
280 | 646 | delete_sink.__set_table_location(merge_sink.table_location); |
281 | 646 | } |
282 | 646 | if (merge_sink.__isset.hadoop_config) { |
283 | 640 | delete_sink.__set_hadoop_config(merge_sink.hadoop_config); |
284 | 640 | } |
285 | 646 | if (merge_sink.__isset.file_type) { |
286 | 646 | delete_sink.__set_file_type(merge_sink.file_type); |
287 | 646 | } |
288 | 646 | if (merge_sink.__isset.partition_spec_id_for_delete) { |
289 | 166 | delete_sink.__set_partition_spec_id(merge_sink.partition_spec_id_for_delete); |
290 | 166 | } |
291 | 646 | if (merge_sink.__isset.partition_data_json_for_delete) { |
292 | 0 | delete_sink.__set_partition_data_json(merge_sink.partition_data_json_for_delete); |
293 | 0 | } |
294 | 646 | if (merge_sink.__isset.broker_addresses) { |
295 | 0 | delete_sink.__set_broker_addresses(merge_sink.broker_addresses); |
296 | 0 | } |
297 | 646 | if (merge_sink.__isset.format_version) { |
298 | 640 | delete_sink.__set_format_version(merge_sink.format_version); |
299 | 640 | } |
300 | 646 | if (merge_sink.__isset.rewritable_delete_file_sets) { |
301 | 640 | delete_sink.__set_rewritable_delete_file_sets(merge_sink.rewritable_delete_file_sets); |
302 | 640 | } |
303 | 646 | _delete_sink.__set_type(TDataSinkType::ICEBERG_DELETE_SINK); |
304 | 646 | _delete_sink.__set_iceberg_delete_sink(delete_sink); |
305 | | |
306 | 646 | return Status::OK(); |
307 | 646 | } |
308 | | |
309 | 646 | Status VIcebergMergeSink::_prepare_output_layout() { |
310 | 646 | if (_vec_output_expr_ctxs.empty()) { |
311 | 0 | return Status::InternalError("Iceberg merge sink has empty output expressions"); |
312 | 0 | } |
313 | | |
314 | 646 | std::string row_id_name = doris::to_lower(BeConsts::ICEBERG_ROWID_COL); |
315 | 646 | std::string op_name = doris::to_lower(kOperationColumnName); |
316 | | |
317 | 646 | _operation_idx = -1; |
318 | 646 | _row_id_idx = -1; |
319 | 4.44k | for (size_t i = 0; i < _vec_output_expr_ctxs.size(); ++i) { |
320 | 3.79k | std::string expr_name = doris::to_lower(_vec_output_expr_ctxs[i]->expr_name()); |
321 | 3.79k | if (_operation_idx < 0 && expr_name == op_name) { |
322 | 645 | _operation_idx = static_cast<int>(i); |
323 | 3.15k | } else if (_row_id_idx < 0 && expr_name == row_id_name) { |
324 | 645 | _row_id_idx = static_cast<int>(i); |
325 | 645 | } |
326 | 3.79k | } |
327 | | |
328 | 646 | if (_operation_idx < 0) { |
329 | 1 | return Status::InternalError("Iceberg merge sink missing operation column"); |
330 | 1 | } |
331 | 645 | if (_row_id_idx < 0) { |
332 | 1 | return Status::InternalError("Iceberg merge sink missing row_id column"); |
333 | 1 | } |
334 | | |
335 | 644 | _data_column_indices.clear(); |
336 | 644 | _table_output_expr_ctxs.clear(); |
337 | 4.43k | for (size_t i = 0; i < _vec_output_expr_ctxs.size(); ++i) { |
338 | 3.78k | if (static_cast<int>(i) == _operation_idx || static_cast<int>(i) == _row_id_idx) { |
339 | 1.28k | continue; |
340 | 1.28k | } |
341 | 2.49k | _data_column_indices.push_back(static_cast<int>(i)); |
342 | 2.49k | _table_output_expr_ctxs.emplace_back(_vec_output_expr_ctxs[i]); |
343 | 2.49k | } |
344 | | |
345 | 644 | return Status::OK(); |
346 | 645 | } |
347 | | |
348 | | } // namespace doris |