be/src/format/transformer/merge_partitioner.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format/transformer/merge_partitioner.h" |
19 | | |
20 | | #include <algorithm> |
21 | | #include <cstdint> |
22 | | |
23 | | #include "common/cast_set.h" |
24 | | #include "common/config.h" |
25 | | #include "common/logging.h" |
26 | | #include "common/status.h" |
27 | | #include "core/block/block.h" |
28 | | #include "core/column/column_const.h" |
29 | | #include "core/column/column_nullable.h" |
30 | | #include "core/column/column_vector.h" |
31 | | #include "exec/sink/sink_common.h" |
32 | | #include "format/transformer/iceberg_partition_function.h" |
33 | | |
34 | | namespace doris { |
35 | | #include "common/compile_check_begin.h" |
36 | | |
namespace {
// Divides a configured threshold evenly across `task_num` parallel tasks.
// Falls back to the unscaled value when task_num is non-positive, or when
// integer division would round the per-task share down to zero (so the
// threshold never silently vanishes).
int64_t scale_threshold_by_task(int64_t value, int task_num) {
    if (task_num <= 0) {
        return value;
    }
    const int64_t per_task = value / task_num;
    return per_task != 0 ? per_task : value;
}
} // namespace
46 | | |
// Constructs a merge partitioner that routes rows to `partition_count`
// downstream channels. `merge_info` carries the Thrift-provided operation
// expression and the optional insert/delete partition specs; its
// `insert_random` flag selects round-robin (rather than hashed) routing for
// INSERT rows.
MergePartitioner::MergePartitioner(size_t partition_count, const TMergePartitionInfo& merge_info,
                                   bool use_new_shuffle_hash_method)
        : PartitionerBase(static_cast<HashValType>(partition_count)),
          _merge_info(merge_info),
          _use_new_shuffle_hash_method(use_new_shuffle_hash_method),
          _insert_random(merge_info.insert_random) {}
53 | | |
54 | 5 | Status MergePartitioner::init(const std::vector<TExpr>& /*texprs*/) { |
55 | 5 | VExprContextSPtr op_ctx; |
56 | 5 | RETURN_IF_ERROR(VExpr::create_expr_tree(_merge_info.operation_expr, op_ctx)); |
57 | 5 | _operation_expr_ctxs.emplace_back(std::move(op_ctx)); |
58 | | |
59 | 5 | std::vector<TExpr> insert_exprs; |
60 | 5 | std::vector<TIcebergPartitionField> insert_fields; |
61 | 5 | if (_merge_info.__isset.insert_partition_exprs) { |
62 | 1 | insert_exprs = _merge_info.insert_partition_exprs; |
63 | 1 | } |
64 | 5 | if (_merge_info.__isset.insert_partition_fields) { |
65 | 2 | insert_fields = _merge_info.insert_partition_fields; |
66 | 2 | } |
67 | 5 | if (!insert_exprs.empty() || !insert_fields.empty()) { |
68 | 3 | _insert_partition_function = std::make_unique<IcebergInsertPartitionFunction>( |
69 | 3 | _partition_count, _hash_method(), std::move(insert_exprs), |
70 | 3 | std::move(insert_fields)); |
71 | 3 | RETURN_IF_ERROR(_insert_partition_function->init({})); |
72 | 3 | } |
73 | | |
74 | 5 | if (_merge_info.__isset.delete_partition_exprs && !_merge_info.delete_partition_exprs.empty()) { |
75 | 1 | _delete_partition_function = std::make_unique<IcebergDeletePartitionFunction>( |
76 | 1 | _partition_count, _hash_method(), _merge_info.delete_partition_exprs); |
77 | 1 | RETURN_IF_ERROR(_delete_partition_function->init({})); |
78 | 1 | } |
79 | 5 | return Status::OK(); |
80 | 5 | } |
81 | | |
82 | 5 | Status MergePartitioner::prepare(RuntimeState* state, const RowDescriptor& row_desc) { |
83 | 5 | RETURN_IF_ERROR(VExpr::prepare(_operation_expr_ctxs, state, row_desc)); |
84 | 5 | if (_insert_partition_function != nullptr) { |
85 | 3 | RETURN_IF_ERROR(_insert_partition_function->prepare(state, row_desc)); |
86 | 3 | } |
87 | 5 | if (_delete_partition_function != nullptr) { |
88 | 1 | RETURN_IF_ERROR(_delete_partition_function->prepare(state, row_desc)); |
89 | 1 | } |
90 | 5 | return Status::OK(); |
91 | 5 | } |
92 | | |
93 | 5 | Status MergePartitioner::open(RuntimeState* state) { |
94 | 5 | RETURN_IF_ERROR(VExpr::open(_operation_expr_ctxs, state)); |
95 | 5 | if (_insert_partition_function != nullptr) { |
96 | 3 | RETURN_IF_ERROR(_insert_partition_function->open(state)); |
97 | 3 | if (auto* insert_function = |
98 | 3 | dynamic_cast<IcebergInsertPartitionFunction*>(_insert_partition_function.get()); |
99 | 3 | insert_function != nullptr && insert_function->fallback_to_random()) { |
100 | 1 | _insert_random = true; |
101 | 1 | } |
102 | 3 | } |
103 | 5 | if (_delete_partition_function != nullptr) { |
104 | 1 | RETURN_IF_ERROR(_delete_partition_function->open(state)); |
105 | 1 | } |
106 | 5 | _init_insert_scaling(state); |
107 | 5 | return Status::OK(); |
108 | 5 | } |
109 | | |
110 | 5 | Status MergePartitioner::close(RuntimeState* /*state*/) { |
111 | 5 | return Status::OK(); |
112 | 5 | } |
113 | | |
// Computes, for every row of `block`, the downstream channel it is sent to,
// based on the row's merge operation:
//   * INSERT rows go to a hashed partition, or round-robin when
//     _insert_random is set;
//   * DELETE rows go to the partition derived from the delete exprs;
//   * UPDATE rows are split: the original row is rewritten in place as the
//     delete half, and a copy of it is appended to the block as the insert
//     half, each routed independently.
// Channel assignments are left in _channel_ids (one entry per output row);
// the block grows when UPDATE rows are present.
Status MergePartitioner::do_partitioning(RuntimeState* state, Block* block) const {
    const size_t rows = block->rows();
    if (rows == 0) {
        _channel_ids.clear();
        return Status::OK();
    }

    const size_t column_to_keep = block->columns();
    if (_operation_expr_ctxs.empty()) {
        return Status::InternalError("Merge partitioning missing operation expression");
    }

    // Evaluate the operation expression; it must resolve to a column that was
    // already part of the input block (index below column_to_keep), not a
    // temporary appended by expression evaluation.
    int op_idx = -1;
    RETURN_IF_ERROR(_operation_expr_ctxs[0]->execute(block, &op_idx));
    if (op_idx < 0 || op_idx >= block->columns()) {
        return Status::InternalError("Merge partitioning missing operation column");
    }
    if (op_idx >= cast_set<int>(column_to_keep)) {
        return Status::InternalError("Merge partitioning requires operation column in input block");
    }

    // Single pass over the operation column: record each row's op code and
    // detect which op kinds occur (this drives which partition functions run).
    const auto& op_column = block->get_by_position(op_idx).column;
    const auto* op_data = remove_nullable(op_column).get();
    std::vector<int8_t> ops(rows);
    bool has_insert = false;
    bool has_delete = false;
    bool has_update = false;
    for (size_t i = 0; i < rows; ++i) {
        int8_t op = static_cast<int8_t>(op_data->get_int(i));
        ops[i] = op;
        if (is_insert_op(op)) {
            has_insert = true;
        }
        if (is_delete_op(op)) {
            has_delete = true;
        }
        if (op == kUpdateOperation) {
            has_update = true;
        }
    }

    if (has_insert && !_insert_random && _insert_partition_function == nullptr) {
        return Status::InternalError("Merge partitioning insert exprs are empty");
    }
    if (has_delete && _delete_partition_function == nullptr) {
        return Status::InternalError("Merge partitioning delete exprs are empty");
    }

    // Compute per-row hash partitions. When rebalance is enabled, inserts are
    // hashed into the finer-grained virtual space (_insert_partition_count)
    // and remapped onto real writers below.
    std::vector<uint32_t> insert_hashes;
    std::vector<uint32_t> delete_hashes;
    const size_t insert_partition_count =
            _enable_insert_rebalance ? _insert_partition_count : _partition_count;
    if (has_insert && !_insert_random) {
        RETURN_IF_ERROR(_insert_partition_function->get_partitions(
                state, block, insert_partition_count, insert_hashes));
    }
    if (has_delete) {
        RETURN_IF_ERROR(_delete_partition_function->get_partitions(state, block, _partition_count,
                                                                   delete_hashes));
    }
    if (has_insert) {
        if (_insert_random) {
            // Round-robin inserts: optionally grow the active writer count as
            // more bytes are observed (non-partitioned write scaling).
            // _insert_data_processed / _insert_writer_count are mutable state.
            if (_non_partition_scaling_threshold > 0) {
                _insert_data_processed += static_cast<int64_t>(block->bytes());
                if (_insert_writer_count < static_cast<int>(_partition_count) &&
                    _insert_data_processed >=
                            _insert_writer_count * _non_partition_scaling_threshold) {
                    _insert_writer_count++;
                }
            } else {
                _insert_writer_count = static_cast<int>(_partition_count);
            }
        } else if (_enable_insert_rebalance) {
            // Remap the virtual insert partitions onto real writers by load.
            _apply_insert_rebalance(ops, insert_hashes, block->bytes());
        }
    }

    // Drop temporary columns appended by expression evaluation.
    Block::erase_useless_column(block, column_to_keep);

    _channel_ids.resize(rows);
    for (size_t i = 0; i < rows; ++i) {
        const int8_t op = ops[i];
        if (op == kUpdateOperation) {
            // The in-place row becomes the delete half of the update; its
            // insert half is appended and routed in the has_update block.
            _channel_ids[i] = delete_hashes[i];
            continue;
        }
        if (is_insert_op(op)) {
            _channel_ids[i] = _insert_random ? _next_rr_channel() : insert_hashes[i];
        } else if (is_delete_op(op)) {
            _channel_ids[i] = delete_hashes[i];
        } else {
            return Status::InternalError("Unknown Iceberg merge operation {}", op);
        }
    }

    if (has_update) {
        // Materialize const columns so rows can be appended uniformly below.
        for (size_t col_idx = 0; col_idx < block->columns(); ++col_idx) {
            block->replace_by_position_if_const(col_idx);
        }

        MutableColumns mutable_columns = block->mutate_columns();
        MutableColumnPtr& op_mut = mutable_columns[op_idx];
        // The operation column may be wrapped in a Nullable; unwrap to Int8.
        ColumnInt8* op_values_col = nullptr;
        if (auto* nullable_col = check_and_get_column<ColumnNullable>(op_mut.get())) {
            op_values_col =
                    check_and_get_column<ColumnInt8>(nullable_col->get_nested_column_ptr().get());
        } else {
            op_values_col = check_and_get_column<ColumnInt8>(op_mut.get());
        }
        if (op_values_col == nullptr) {
            // Hand the columns back to the block before failing so it stays
            // in a consistent state.
            block->set_columns(std::move(mutable_columns));
            return Status::InternalError("Merge operation column must be tinyint");
        }
        auto& op_values = op_values_col->get_data();
        // First pass: collect update row indices and mark original rows as DELETE.
        std::vector<size_t> update_rows;
        for (size_t row = 0; row < rows; ++row) {
            if (ops[row] != kUpdateOperation) {
                continue;
            }
            op_values[row] = kUpdateDeleteOperation;
            update_rows.push_back(row);
        }
        // Second pass: extract only the update rows into a temporary column,
        // then batch-append from it. This avoids cloning the entire column.
        for (size_t col_idx = 0; col_idx < mutable_columns.size(); ++col_idx) {
            auto tmp = mutable_columns[col_idx]->clone_empty();
            for (size_t row : update_rows) {
                tmp->insert_from(*mutable_columns[col_idx], row);
            }
            mutable_columns[col_idx]->insert_range_from(*tmp, 0, tmp->size());
        }
        // Mark the newly appended rows as INSERT and assign their channels.
        // insert_hashes is indexed by the ORIGINAL row position of the update.
        DCHECK(_insert_random || !insert_hashes.empty());
        const size_t appended_update_begin = rows;
        for (size_t idx = 0; idx < update_rows.size(); ++idx) {
            const size_t row = update_rows[idx];
            op_values[appended_update_begin + idx] = kUpdateInsertOperation;
            const uint32_t insert_channel =
                    _insert_random ? _next_rr_channel() : insert_hashes[row];
            _channel_ids.push_back(insert_channel);
        }
        block->set_columns(std::move(mutable_columns));
    }

    return Status::OK();
}
261 | | |
262 | 0 | Status MergePartitioner::clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) { |
263 | 0 | auto* new_partitioner = |
264 | 0 | new MergePartitioner(_partition_count, _merge_info, _use_new_shuffle_hash_method); |
265 | 0 | partitioner.reset(new_partitioner); |
266 | 0 | RETURN_IF_ERROR( |
267 | 0 | _clone_expr_ctxs(state, _operation_expr_ctxs, new_partitioner->_operation_expr_ctxs)); |
268 | 0 | if (_insert_partition_function != nullptr) { |
269 | 0 | RETURN_IF_ERROR(_insert_partition_function->clone( |
270 | 0 | state, new_partitioner->_insert_partition_function)); |
271 | 0 | } |
272 | 0 | if (_delete_partition_function != nullptr) { |
273 | 0 | RETURN_IF_ERROR(_delete_partition_function->clone( |
274 | 0 | state, new_partitioner->_delete_partition_function)); |
275 | 0 | } |
276 | 0 | new_partitioner->_insert_random = _insert_random; |
277 | 0 | new_partitioner->_rr_offset = _rr_offset; |
278 | 0 | return Status::OK(); |
279 | 0 | } |
280 | | |
281 | | void MergePartitioner::_apply_insert_rebalance(const std::vector<int8_t>& ops, |
282 | | std::vector<uint32_t>& insert_hashes, |
283 | 1 | size_t block_bytes) const { |
284 | 1 | if (!_enable_insert_rebalance || _insert_writer_assigner == nullptr) { |
285 | 0 | return; |
286 | 0 | } |
287 | 1 | if (insert_hashes.empty() || _insert_partition_count == 0) { |
288 | 0 | return; |
289 | 0 | } |
290 | 1 | std::vector<uint8_t> mask(ops.size(), 0); |
291 | 6 | for (size_t i = 0; i < ops.size(); ++i) { |
292 | 5 | if (is_insert_op(ops[i])) { |
293 | 3 | mask[i] = 1; |
294 | 3 | } |
295 | 5 | } |
296 | 1 | _insert_writer_assigner->assign(insert_hashes, &mask, ops.size(), block_bytes, insert_hashes); |
297 | 1 | } |
298 | | |
299 | 5 | void MergePartitioner::_init_insert_scaling(RuntimeState* state) { |
300 | 5 | _enable_insert_rebalance = false; |
301 | 5 | _insert_partition_count = 0; |
302 | 5 | _insert_data_processed = 0; |
303 | 5 | _insert_writer_count = 1; |
304 | 5 | _insert_writer_assigner.reset(); |
305 | 5 | _non_partition_scaling_threshold = |
306 | 5 | config::table_sink_non_partition_write_scaling_data_processed_threshold; |
307 | | |
308 | 5 | if (_partition_count == 0) { |
309 | 0 | return; |
310 | 0 | } |
311 | 5 | if (_insert_random) { |
312 | 2 | return; |
313 | 2 | } |
314 | 3 | if (_insert_partition_function == nullptr) { |
315 | 1 | return; |
316 | 1 | } |
317 | | |
318 | 2 | int max_partitions_per_writer = |
319 | 2 | config::table_sink_partition_write_max_partition_nums_per_writer; |
320 | 2 | if (max_partitions_per_writer <= 0) { |
321 | 1 | return; |
322 | 1 | } |
323 | 1 | _insert_partition_count = _partition_count * max_partitions_per_writer; |
324 | 1 | if (_insert_partition_count == 0) { |
325 | 0 | return; |
326 | 0 | } |
327 | | |
328 | 1 | int task_num = state == nullptr ? 0 : state->task_num(); |
329 | 1 | int64_t min_partition_threshold = scale_threshold_by_task( |
330 | 1 | config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold, |
331 | 1 | task_num); |
332 | 1 | int64_t min_data_threshold = scale_threshold_by_task( |
333 | 1 | config::table_sink_partition_write_min_data_processed_rebalance_threshold, task_num); |
334 | | |
335 | 1 | _insert_writer_assigner = std::make_unique<SkewedWriterAssigner>( |
336 | 1 | static_cast<int>(_insert_partition_count), static_cast<int>(_partition_count), 1, |
337 | 1 | min_partition_threshold, min_data_threshold); |
338 | 1 | _enable_insert_rebalance = true; |
339 | 1 | } |
340 | | |
341 | 4 | uint32_t MergePartitioner::_next_rr_channel() const { |
342 | 4 | uint32_t writer_count = static_cast<uint32_t>(_partition_count); |
343 | 4 | if (_insert_random && _insert_writer_count > 0) { |
344 | 4 | writer_count = std::min<uint32_t>(static_cast<uint32_t>(_partition_count), |
345 | 4 | static_cast<uint32_t>(_insert_writer_count)); |
346 | 4 | } |
347 | 4 | if (writer_count == 0) { |
348 | 0 | return 0; |
349 | 0 | } |
350 | 4 | const uint32_t channel = _rr_offset % writer_count; |
351 | 4 | _rr_offset = (_rr_offset + 1) % writer_count; |
352 | 4 | return channel; |
353 | 4 | } |
354 | | |
355 | | Status MergePartitioner::_clone_expr_ctxs(RuntimeState* state, const VExprContextSPtrs& src, |
356 | 0 | VExprContextSPtrs& dst) const { |
357 | 0 | dst.resize(src.size()); |
358 | 0 | for (size_t i = 0; i < src.size(); ++i) { |
359 | 0 | RETURN_IF_ERROR(src[i]->clone(state, dst[i])); |
360 | 0 | } |
361 | 0 | return Status::OK(); |
362 | 0 | } |
363 | | |
364 | | #include "common/compile_check_end.h" |
365 | | } // namespace doris |