Coverage Report

Created: 2026-04-14 20:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/transformer/merge_partitioner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/transformer/merge_partitioner.h"
19
20
#include <algorithm>
21
#include <cstdint>
22
23
#include "common/cast_set.h"
24
#include "common/config.h"
25
#include "common/logging.h"
26
#include "common/status.h"
27
#include "core/block/block.h"
28
#include "core/column/column_const.h"
29
#include "core/column/column_nullable.h"
30
#include "core/column/column_vector.h"
31
#include "exec/sink/sink_common.h"
32
#include "format/transformer/iceberg_partition_function.h"
33
34
namespace doris {
35
36
namespace {
37
2
// Splits a global rebalance threshold evenly across `task_num` tasks.
// The unscaled `value` is returned when `task_num` is not positive, or when the
// per-task share truncates to zero, so the threshold never collapses to 0.
int64_t scale_threshold_by_task(int64_t value, int task_num) {
    if (task_num > 0) {
        if (const int64_t per_task = value / task_num; per_task != 0) {
            return per_task;
        }
    }
    return value;
}
44
} // namespace
45
46
MergePartitioner::MergePartitioner(size_t partition_count, const TMergePartitionInfo& merge_info,
47
                                   bool use_new_shuffle_hash_method)
48
5
        : PartitionerBase(static_cast<HashValType>(partition_count)),
49
5
          _merge_info(merge_info),
50
5
          _use_new_shuffle_hash_method(use_new_shuffle_hash_method),
51
5
          _insert_random(merge_info.insert_random) {}
52
53
5
// Builds the expression trees described by `_merge_info`; `texprs` is unused.
//  - The operation expression is always created. do_partitioning() evaluates it
//    to classify each row.
//  - An Iceberg insert partition function is created when the descriptor carries
//    insert partition exprs and/or insert partition fields.
//  - An Iceberg delete partition function is created only for a non-empty list
//    of delete partition exprs.
// Errors from expression creation or from the functions' init() are propagated.
Status MergePartitioner::init(const std::vector<TExpr>& /*texprs*/) {
    VExprContextSPtr operation_ctx;
    RETURN_IF_ERROR(VExpr::create_expr_tree(_merge_info.operation_expr, operation_ctx));
    _operation_expr_ctxs.emplace_back(std::move(operation_ctx));

    // Optional thrift fields: treat an unset field as an empty list.
    std::vector<TExpr> insert_exprs = _merge_info.__isset.insert_partition_exprs
                                              ? _merge_info.insert_partition_exprs
                                              : std::vector<TExpr> {};
    std::vector<TIcebergPartitionField> insert_fields =
            _merge_info.__isset.insert_partition_fields ? _merge_info.insert_partition_fields
                                                        : std::vector<TIcebergPartitionField> {};

    const bool has_insert_spec = !(insert_exprs.empty() && insert_fields.empty());
    if (has_insert_spec) {
        _insert_partition_function = std::make_unique<IcebergInsertPartitionFunction>(
                _partition_count, _hash_method(), std::move(insert_exprs),
                std::move(insert_fields));
        RETURN_IF_ERROR(_insert_partition_function->init({}));
    }

    const bool has_delete_spec = _merge_info.__isset.delete_partition_exprs &&
                                 !_merge_info.delete_partition_exprs.empty();
    if (has_delete_spec) {
        _delete_partition_function = std::make_unique<IcebergDeletePartitionFunction>(
                _partition_count, _hash_method(), _merge_info.delete_partition_exprs);
        RETURN_IF_ERROR(_delete_partition_function->init({}));
    }

    return Status::OK();
}
80
81
5
// Prepares the operation expression, then each partition function that init()
// created (insert side first, then delete side), against `row_desc`.
// The first failing step's Status is returned.
Status MergePartitioner::prepare(RuntimeState* state, const RowDescriptor& row_desc) {
    RETURN_IF_ERROR(VExpr::prepare(_operation_expr_ctxs, state, row_desc));

    // Partition functions are optional; a missing one is simply skipped.
    const auto prepare_function = [&](auto& function) -> Status {
        return function == nullptr ? Status::OK() : function->prepare(state, row_desc);
    };
    RETURN_IF_ERROR(prepare_function(_insert_partition_function));
    RETURN_IF_ERROR(prepare_function(_delete_partition_function));
    return Status::OK();
}
91
92
5
// Opens the operation expression and any configured partition functions, then
// (re)initializes the insert scaling/rebalancing state.
// If the insert function is an IcebergInsertPartitionFunction reporting
// fallback_to_random(), inserts are switched to round-robin routing
// (`_insert_random`), which do_partitioning() honours.
Status MergePartitioner::open(RuntimeState* state) {
    RETURN_IF_ERROR(VExpr::open(_operation_expr_ctxs, state));

    if (_insert_partition_function) {
        RETURN_IF_ERROR(_insert_partition_function->open(state));
        auto* iceberg_insert =
                dynamic_cast<IcebergInsertPartitionFunction*>(_insert_partition_function.get());
        const bool wants_random = iceberg_insert != nullptr && iceberg_insert->fallback_to_random();
        if (wants_random) {
            _insert_random = true;
        }
    }

    if (_delete_partition_function) {
        RETURN_IF_ERROR(_delete_partition_function->open(state));
    }

    // Must run after `_insert_random` is final: scaling setup depends on it.
    _init_insert_scaling(state);
    return Status::OK();
}
108
109
5
// Intentionally a no-op: this partitioner performs no explicit teardown on close.
Status MergePartitioner::close(RuntimeState* /*state*/) {
    return Status::OK();
}
112
113
5
// Assigns an output channel to every row of `block` and writes the result into
// `_channel_ids`.
//
// Each row is classified by the operation expression. Its result column must be
// one of the columns the block arrived with.
//   * insert rows -> insert partition hash, or a round-robin channel when
//                    `_insert_random` is set;
//   * delete rows -> delete partition hash;
//   * update rows -> split in two. The original row is rewritten to
//                    kUpdateDeleteOperation and routed by its delete hash. A
//                    copy is appended at the end of the block as
//                    kUpdateInsertOperation and routed like an insert.
// On success `_channel_ids` therefore has one entry per output row, which is one
// more per update row than the input row count. Columns beyond the original
// `column_to_keep` are dropped via Block::erase_useless_column().
//
// The insert scaling state (random-writer count, skewed-writer assignment) is
// also advanced for this block.
//
// Returns InternalError when:
//   * the operation column is missing or was not part of the input block;
//   * inserts or deletes are present but the matching partition function is not
//     configured;
//   * a row carries an unknown operation;
//   * updates must be split but the operation column is not TINYINT.
Status MergePartitioner::do_partitioning(RuntimeState* state, Block* block) const {
    const size_t rows = block->rows();
    if (rows == 0) {
        _channel_ids.clear();
        return Status::OK();
    }

    const size_t column_to_keep = block->columns();
    if (_operation_expr_ctxs.empty()) {
        return Status::InternalError("Merge partitioning missing operation expression");
    }

    int op_idx = -1;
    RETURN_IF_ERROR(_operation_expr_ctxs[0]->execute(block, &op_idx));
    // op_idx is known to be non-negative when the size_t comparison runs, which
    // avoids a signed/unsigned comparison.
    if (op_idx < 0 || static_cast<size_t>(op_idx) >= block->columns()) {
        return Status::InternalError("Merge partitioning missing operation column");
    }
    if (op_idx >= cast_set<int>(column_to_keep)) {
        return Status::InternalError("Merge partitioning requires operation column in input block");
    }

    // Hold the de-nullified column by value. remove_nullable() may build a new
    // column (e.g. when unwrapping a const column over a nullable one). Calling
    // `.get()` on the returned temporary would then leave `op_data` dangling
    // once the full-expression ends.
    const auto op_column = remove_nullable(block->get_by_position(op_idx).column);
    const auto* op_data = op_column.get();

    std::vector<int8_t> ops(rows);
    bool has_insert = false;
    bool has_delete = false;
    bool has_update = false;
    for (size_t i = 0; i < rows; ++i) {
        const auto op = static_cast<int8_t>(op_data->get_int(i));
        ops[i] = op;
        if (is_insert_op(op)) {
            has_insert = true;
        }
        if (is_delete_op(op)) {
            has_delete = true;
        }
        if (op == kUpdateOperation) {
            has_update = true;
        }
    }

    if (has_insert && !_insert_random && _insert_partition_function == nullptr) {
        return Status::InternalError("Merge partitioning insert exprs are empty");
    }
    if (has_delete && _delete_partition_function == nullptr) {
        return Status::InternalError("Merge partitioning delete exprs are empty");
    }

    std::vector<uint32_t> insert_hashes;
    std::vector<uint32_t> delete_hashes;
    // With rebalancing, inserts hash over a larger virtual partition space that
    // the skewed-writer assigner later maps onto real channels.
    const size_t insert_partition_count =
            _enable_insert_rebalance ? _insert_partition_count : _partition_count;
    if (has_insert && !_insert_random) {
        RETURN_IF_ERROR(_insert_partition_function->get_partitions(
                state, block, insert_partition_count, insert_hashes));
    }
    if (has_delete) {
        RETURN_IF_ERROR(_delete_partition_function->get_partitions(state, block, _partition_count,
                                                                   delete_hashes));
    }
    if (has_insert) {
        if (_insert_random) {
            if (_non_partition_scaling_threshold > 0) {
                // Grow the number of active random writers as data volume accumulates.
                _insert_data_processed += static_cast<int64_t>(block->bytes());
                if (_insert_writer_count < static_cast<int>(_partition_count) &&
                    _insert_data_processed >=
                            _insert_writer_count * _non_partition_scaling_threshold) {
                    _insert_writer_count++;
                }
            } else {
                _insert_writer_count = static_cast<int>(_partition_count);
            }
        } else if (_enable_insert_rebalance) {
            _apply_insert_rebalance(ops, insert_hashes, block->bytes());
        }
    }

    Block::erase_useless_column(block, column_to_keep);

    _channel_ids.resize(rows);
    for (size_t i = 0; i < rows; ++i) {
        const int8_t op = ops[i];
        if (op == kUpdateOperation) {
            // The original row becomes the delete half of the update.
            _channel_ids[i] = delete_hashes[i];
            continue;
        }
        if (is_insert_op(op)) {
            _channel_ids[i] = _insert_random ? _next_rr_channel() : insert_hashes[i];
        } else if (is_delete_op(op)) {
            _channel_ids[i] = delete_hashes[i];
        } else {
            return Status::InternalError("Unknown Iceberg merge operation {}", op);
        }
    }

    if (has_update) {
        for (size_t col_idx = 0; col_idx < block->columns(); ++col_idx) {
            block->replace_by_position_if_const(col_idx);
        }

        MutableColumns mutable_columns = block->mutate_columns();
        MutableColumnPtr& op_mut = mutable_columns[op_idx];
        ColumnInt8* op_values_col = nullptr;
        if (auto* nullable_col = check_and_get_column<ColumnNullable>(op_mut.get())) {
            op_values_col =
                    check_and_get_column<ColumnInt8>(nullable_col->get_nested_column_ptr().get());
        } else {
            op_values_col = check_and_get_column<ColumnInt8>(op_mut.get());
        }
        if (op_values_col == nullptr) {
            block->set_columns(std::move(mutable_columns));
            return Status::InternalError("Merge operation column must be tinyint");
        }
        auto& op_values = op_values_col->get_data();
        // First pass: collect update row indices and mark original rows as DELETE.
        std::vector<size_t> update_rows;
        for (size_t row = 0; row < rows; ++row) {
            if (ops[row] != kUpdateOperation) {
                continue;
            }
            op_values[row] = kUpdateDeleteOperation;
            update_rows.push_back(row);
        }
        // Second pass: extract only the update rows into a temporary column,
        // then batch-append from it. This avoids cloning the entire column.
        for (size_t col_idx = 0; col_idx < mutable_columns.size(); ++col_idx) {
            auto tmp = mutable_columns[col_idx]->clone_empty();
            for (size_t row : update_rows) {
                tmp->insert_from(*mutable_columns[col_idx], row);
            }
            mutable_columns[col_idx]->insert_range_from(*tmp, 0, tmp->size());
        }
        // Mark the newly appended rows as INSERT and assign their channels.
        DCHECK(_insert_random || !insert_hashes.empty());
        const size_t appended_update_begin = rows;
        for (size_t idx = 0; idx < update_rows.size(); ++idx) {
            const size_t row = update_rows[idx];
            op_values[appended_update_begin + idx] = kUpdateInsertOperation;
            const uint32_t insert_channel =
                    _insert_random ? _next_rr_channel() : insert_hashes[row];
            _channel_ids.push_back(insert_channel);
        }
        block->set_columns(std::move(mutable_columns));
    }

    return Status::OK();
}
260
261
0
// Creates an independent copy of this partitioner and stores it in `partitioner`.
//
// The operation expression contexts and any insert/delete partition functions
// are cloned through their own clone() methods. The random-insert flag (which
// open() may have switched on) and the round-robin cursor are copied. The insert
// scaling/rebalancing state is then rebuilt for the copy.
// NOTE(review): the sub-clones presumably require this instance to already be
// prepared/opened; confirm against callers.
Status MergePartitioner::clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) {
    auto* new_partitioner =
            new MergePartitioner(_partition_count, _merge_info, _use_new_shuffle_hash_method);
    // Ownership moves to the caller immediately, so the error returns below cannot leak.
    partitioner.reset(new_partitioner);
    RETURN_IF_ERROR(
            _clone_expr_ctxs(state, _operation_expr_ctxs, new_partitioner->_operation_expr_ctxs));
    if (_insert_partition_function != nullptr) {
        RETURN_IF_ERROR(_insert_partition_function->clone(
                state, new_partitioner->_insert_partition_function));
    }
    if (_delete_partition_function != nullptr) {
        RETURN_IF_ERROR(_delete_partition_function->clone(
                state, new_partitioner->_delete_partition_function));
    }
    new_partitioner->_insert_random = _insert_random;
    new_partitioner->_rr_offset = _rr_offset;
    // Only _init_insert_scaling() sets the scaling state (_enable_insert_rebalance,
    // _insert_partition_count, _insert_writer_assigner,
    // _non_partition_scaling_threshold, _insert_writer_count). open() calls it;
    // this function previously did not. Without this call, a clone used without
    // being opened would silently skip insert rebalancing and writer scaling.
    // The call resets all of that state, so a later open() on the clone stays
    // consistent. It must run after `_insert_random` and the insert partition
    // function are in place, because it reads both.
    new_partitioner->_init_insert_scaling(state);
    return Status::OK();
}
279
280
void MergePartitioner::_apply_insert_rebalance(const std::vector<int8_t>& ops,
281
                                               std::vector<uint32_t>& insert_hashes,
282
1
                                               size_t block_bytes) const {
283
1
    if (!_enable_insert_rebalance || _insert_writer_assigner == nullptr) {
284
0
        return;
285
0
    }
286
1
    if (insert_hashes.empty() || _insert_partition_count == 0) {
287
0
        return;
288
0
    }
289
1
    std::vector<uint8_t> mask(ops.size(), 0);
290
6
    for (size_t i = 0; i < ops.size(); ++i) {
291
5
        if (is_insert_op(ops[i])) {
292
3
            mask[i] = 1;
293
3
        }
294
5
    }
295
1
    _insert_writer_assigner->assign(insert_hashes, &mask, ops.size(), block_bytes, insert_hashes);
296
1
}
297
298
5
// Resets and (when applicable) enables the insert scaling state.
//
// All scaling members are first reset to a "no rebalancing, one writer"
// baseline, and the random-write scaling threshold is read from config.
// Skewed-writer rebalancing is then enabled only for hash-partitioned inserts:
// channels exist, `_insert_random` is off, an insert partition function exists,
// and the per-writer partition budget from config is positive. In that case
// the inserts hash over `_partition_count * max_partitions_per_writer` virtual
// partitions. The assigner's thresholds are scaled down by the task count.
void MergePartitioner::_init_insert_scaling(RuntimeState* state) {
    _enable_insert_rebalance = false;
    _insert_partition_count = 0;
    _insert_data_processed = 0;
    _insert_writer_count = 1;
    _insert_writer_assigner.reset();
    _non_partition_scaling_threshold =
            config::table_sink_non_partition_write_scaling_data_processed_threshold;

    const bool hash_partitioned_insert =
            _partition_count != 0 && !_insert_random && _insert_partition_function != nullptr;
    if (!hash_partitioned_insert) {
        return;
    }

    const int max_partitions_per_writer =
            config::table_sink_partition_write_max_partition_nums_per_writer;
    if (max_partitions_per_writer <= 0) {
        return;
    }
    _insert_partition_count = _partition_count * max_partitions_per_writer;
    if (_insert_partition_count == 0) {
        return;
    }

    const int task_num = (state != nullptr) ? state->task_num() : 0;
    const int64_t min_partition_threshold = scale_threshold_by_task(
            config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold,
            task_num);
    const int64_t min_data_threshold = scale_threshold_by_task(
            config::table_sink_partition_write_min_data_processed_rebalance_threshold, task_num);

    _insert_writer_assigner = std::make_unique<SkewedWriterAssigner>(
            static_cast<int>(_insert_partition_count), static_cast<int>(_partition_count), 1,
            min_partition_threshold, min_data_threshold);
    _enable_insert_rebalance = true;
}
339
340
4
uint32_t MergePartitioner::_next_rr_channel() const {
341
4
    uint32_t writer_count = static_cast<uint32_t>(_partition_count);
342
4
    if (_insert_random && _insert_writer_count > 0) {
343
4
        writer_count = std::min<uint32_t>(static_cast<uint32_t>(_partition_count),
344
4
                                          static_cast<uint32_t>(_insert_writer_count));
345
4
    }
346
4
    if (writer_count == 0) {
347
0
        return 0;
348
0
    }
349
4
    const uint32_t channel = _rr_offset % writer_count;
350
4
    _rr_offset = (_rr_offset + 1) % writer_count;
351
4
    return channel;
352
4
}
353
354
// Clones every expression context in `src` into the same position of `dst`.
// `dst` is resized to match `src`. The first clone failure is returned; slots
// after it are left as produced by the resize.
Status MergePartitioner::_clone_expr_ctxs(RuntimeState* state, const VExprContextSPtrs& src,
                                          VExprContextSPtrs& dst) const {
    dst.resize(src.size());
    auto target = dst.begin();
    for (const auto& source_ctx : src) {
        RETURN_IF_ERROR(source_ctx->clone(state, *target));
        ++target;
    }
    return Status::OK();
}
362
363
} // namespace doris