Coverage Report

Created: 2026-04-15 18:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/transformer/merge_partitioner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/transformer/merge_partitioner.h"
19
20
#include <algorithm>
21
#include <cstdint>
22
23
#include "common/cast_set.h"
24
#include "common/config.h"
25
#include "common/logging.h"
26
#include "common/status.h"
27
#include "core/block/block.h"
28
#include "core/column/column_const.h"
29
#include "core/column/column_nullable.h"
30
#include "core/column/column_vector.h"
31
#include "exec/sink/sink_common.h"
32
#include "format/transformer/iceberg_partition_function.h"
33
34
namespace doris {
35
36
namespace {
37
4
int64_t scale_threshold_by_task(int64_t value, int task_num) {
38
4
    if (task_num <= 0) {
39
4
        return value;
40
4
    }
41
0
    int64_t scaled = value / task_num;
42
0
    return scaled == 0 ? value : scaled;
43
4
}
44
} // namespace
45
46
MergePartitioner::MergePartitioner(size_t partition_count, const TMergePartitionInfo& merge_info,
47
                                   bool use_new_shuffle_hash_method)
48
10
        : PartitionerBase(static_cast<HashValType>(partition_count)),
49
10
          _merge_info(merge_info),
50
10
          _use_new_shuffle_hash_method(use_new_shuffle_hash_method),
51
10
          _insert_random(merge_info.insert_random) {}
52
53
10
Status MergePartitioner::init(const std::vector<TExpr>& /*texprs*/) {
54
10
    VExprContextSPtr op_ctx;
55
10
    RETURN_IF_ERROR(VExpr::create_expr_tree(_merge_info.operation_expr, op_ctx));
56
10
    _operation_expr_ctxs.emplace_back(std::move(op_ctx));
57
58
10
    std::vector<TExpr> insert_exprs;
59
10
    std::vector<TIcebergPartitionField> insert_fields;
60
10
    if (_merge_info.__isset.insert_partition_exprs) {
61
2
        insert_exprs = _merge_info.insert_partition_exprs;
62
2
    }
63
10
    if (_merge_info.__isset.insert_partition_fields) {
64
4
        insert_fields = _merge_info.insert_partition_fields;
65
4
    }
66
10
    if (!insert_exprs.empty() || !insert_fields.empty()) {
67
6
        _insert_partition_function = std::make_unique<IcebergInsertPartitionFunction>(
68
6
                _partition_count, _hash_method(), std::move(insert_exprs),
69
6
                std::move(insert_fields));
70
6
        RETURN_IF_ERROR(_insert_partition_function->init({}));
71
6
    }
72
73
10
    if (_merge_info.__isset.delete_partition_exprs && !_merge_info.delete_partition_exprs.empty()) {
74
2
        _delete_partition_function = std::make_unique<IcebergDeletePartitionFunction>(
75
2
                _partition_count, _hash_method(), _merge_info.delete_partition_exprs);
76
2
        RETURN_IF_ERROR(_delete_partition_function->init({}));
77
2
    }
78
10
    return Status::OK();
79
10
}
80
81
10
Status MergePartitioner::prepare(RuntimeState* state, const RowDescriptor& row_desc) {
82
10
    RETURN_IF_ERROR(VExpr::prepare(_operation_expr_ctxs, state, row_desc));
83
10
    if (_insert_partition_function != nullptr) {
84
6
        RETURN_IF_ERROR(_insert_partition_function->prepare(state, row_desc));
85
6
    }
86
10
    if (_delete_partition_function != nullptr) {
87
2
        RETURN_IF_ERROR(_delete_partition_function->prepare(state, row_desc));
88
2
    }
89
10
    return Status::OK();
90
10
}
91
92
10
Status MergePartitioner::open(RuntimeState* state) {
93
10
    RETURN_IF_ERROR(VExpr::open(_operation_expr_ctxs, state));
94
10
    if (_insert_partition_function != nullptr) {
95
6
        RETURN_IF_ERROR(_insert_partition_function->open(state));
96
6
        if (auto* insert_function =
97
6
                    dynamic_cast<IcebergInsertPartitionFunction*>(_insert_partition_function.get());
98
6
            insert_function != nullptr && insert_function->fallback_to_random()) {
99
2
            _insert_random = true;
100
2
        }
101
6
    }
102
10
    if (_delete_partition_function != nullptr) {
103
2
        RETURN_IF_ERROR(_delete_partition_function->open(state));
104
2
    }
105
10
    _init_insert_scaling(state);
106
10
    return Status::OK();
107
10
}
108
109
10
Status MergePartitioner::close(RuntimeState* /*state*/) {
110
10
    return Status::OK();
111
10
}
112
113
10
Status MergePartitioner::do_partitioning(RuntimeState* state, Block* block) const {
114
10
    const size_t rows = block->rows();
115
10
    if (rows == 0) {
116
0
        _channel_ids.clear();
117
0
        return Status::OK();
118
0
    }
119
120
10
    const size_t column_to_keep = block->columns();
121
10
    if (_operation_expr_ctxs.empty()) {
122
0
        return Status::InternalError("Merge partitioning missing operation expression");
123
0
    }
124
125
10
    int op_idx = -1;
126
10
    RETURN_IF_ERROR(_operation_expr_ctxs[0]->execute(block, &op_idx));
127
10
    if (op_idx < 0 || op_idx >= block->columns()) {
128
0
        return Status::InternalError("Merge partitioning missing operation column");
129
0
    }
130
10
    if (op_idx >= cast_set<int>(column_to_keep)) {
131
0
        return Status::InternalError("Merge partitioning requires operation column in input block");
132
0
    }
133
134
10
    const auto& op_column = block->get_by_position(op_idx).column;
135
10
    const auto* op_data = remove_nullable(op_column).get();
136
10
    std::vector<int8_t> ops(rows);
137
10
    bool has_insert = false;
138
10
    bool has_delete = false;
139
10
    bool has_update = false;
140
36
    for (size_t i = 0; i < rows; ++i) {
141
26
        int8_t op = static_cast<int8_t>(op_data->get_int(i));
142
26
        ops[i] = op;
143
26
        if (is_insert_op(op)) {
144
20
            has_insert = true;
145
20
        }
146
26
        if (is_delete_op(op)) {
147
8
            has_delete = true;
148
8
        }
149
26
        if (op == kUpdateOperation) {
150
2
            has_update = true;
151
2
        }
152
26
    }
153
154
10
    if (has_insert && !_insert_random && _insert_partition_function == nullptr) {
155
2
        return Status::InternalError("Merge partitioning insert exprs are empty");
156
2
    }
157
8
    if (has_delete && _delete_partition_function == nullptr) {
158
2
        return Status::InternalError("Merge partitioning delete exprs are empty");
159
2
    }
160
161
6
    std::vector<uint32_t> insert_hashes;
162
6
    std::vector<uint32_t> delete_hashes;
163
6
    const size_t insert_partition_count =
164
6
            _enable_insert_rebalance ? _insert_partition_count : _partition_count;
165
6
    if (has_insert && !_insert_random) {
166
4
        RETURN_IF_ERROR(_insert_partition_function->get_partitions(
167
4
                state, block, insert_partition_count, insert_hashes));
168
4
    }
169
6
    if (has_delete) {
170
2
        RETURN_IF_ERROR(_delete_partition_function->get_partitions(state, block, _partition_count,
171
2
                                                                   delete_hashes));
172
2
    }
173
6
    if (has_insert) {
174
6
        if (_insert_random) {
175
2
            if (_non_partition_scaling_threshold > 0) {
176
0
                _insert_data_processed += static_cast<int64_t>(block->bytes());
177
0
                if (_insert_writer_count < static_cast<int>(_partition_count) &&
178
0
                    _insert_data_processed >=
179
0
                            _insert_writer_count * _non_partition_scaling_threshold) {
180
0
                    _insert_writer_count++;
181
0
                }
182
2
            } else {
183
2
                _insert_writer_count = static_cast<int>(_partition_count);
184
2
            }
185
4
        } else if (_enable_insert_rebalance) {
186
2
            _apply_insert_rebalance(ops, insert_hashes, block->bytes());
187
2
        }
188
6
    }
189
190
6
    Block::erase_useless_column(block, column_to_keep);
191
192
6
    _channel_ids.resize(rows);
193
28
    for (size_t i = 0; i < rows; ++i) {
194
22
        const int8_t op = ops[i];
195
22
        if (op == kUpdateOperation) {
196
2
            _channel_ids[i] = delete_hashes[i];
197
2
            continue;
198
2
        }
199
20
        if (is_insert_op(op)) {
200
16
            _channel_ids[i] = _insert_random ? _next_rr_channel() : insert_hashes[i];
201
16
        } else if (is_delete_op(op)) {
202
4
            _channel_ids[i] = delete_hashes[i];
203
4
        } else {
204
0
            return Status::InternalError("Unknown Iceberg merge operation {}", op);
205
0
        }
206
20
    }
207
208
6
    if (has_update) {
209
10
        for (size_t col_idx = 0; col_idx < block->columns(); ++col_idx) {
210
8
            block->replace_by_position_if_const(col_idx);
211
8
        }
212
213
2
        MutableColumns mutable_columns = block->mutate_columns();
214
2
        MutableColumnPtr& op_mut = mutable_columns[op_idx];
215
2
        ColumnInt8* op_values_col = nullptr;
216
2
        if (auto* nullable_col = check_and_get_column<ColumnNullable>(op_mut.get())) {
217
0
            op_values_col =
218
0
                    check_and_get_column<ColumnInt8>(nullable_col->get_nested_column_ptr().get());
219
2
        } else {
220
2
            op_values_col = check_and_get_column<ColumnInt8>(op_mut.get());
221
2
        }
222
2
        if (op_values_col == nullptr) {
223
0
            block->set_columns(std::move(mutable_columns));
224
0
            return Status::InternalError("Merge operation column must be tinyint");
225
0
        }
226
2
        auto& op_values = op_values_col->get_data();
227
        // First pass: collect update row indices and mark original rows as DELETE.
228
2
        std::vector<size_t> update_rows;
229
12
        for (size_t row = 0; row < rows; ++row) {
230
10
            if (ops[row] != kUpdateOperation) {
231
8
                continue;
232
8
            }
233
2
            op_values[row] = kUpdateDeleteOperation;
234
2
            update_rows.push_back(row);
235
2
        }
236
        // Second pass: extract only the update rows into a temporary column,
237
        // then batch-append from it. This avoids cloning the entire column.
238
10
        for (size_t col_idx = 0; col_idx < mutable_columns.size(); ++col_idx) {
239
8
            auto tmp = mutable_columns[col_idx]->clone_empty();
240
8
            for (size_t row : update_rows) {
241
8
                tmp->insert_from(*mutable_columns[col_idx], row);
242
8
            }
243
8
            mutable_columns[col_idx]->insert_range_from(*tmp, 0, tmp->size());
244
8
        }
245
        // Mark the newly appended rows as INSERT and assign their channels.
246
2
        DCHECK(_insert_random || !insert_hashes.empty());
247
2
        const size_t appended_update_begin = rows;
248
4
        for (size_t idx = 0; idx < update_rows.size(); ++idx) {
249
2
            const size_t row = update_rows[idx];
250
2
            op_values[appended_update_begin + idx] = kUpdateInsertOperation;
251
2
            const uint32_t insert_channel =
252
2
                    _insert_random ? _next_rr_channel() : insert_hashes[row];
253
2
            _channel_ids.push_back(insert_channel);
254
2
        }
255
2
        block->set_columns(std::move(mutable_columns));
256
2
    }
257
258
6
    return Status::OK();
259
6
}
260
261
0
Status MergePartitioner::clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) {
262
0
    auto* new_partitioner =
263
0
            new MergePartitioner(_partition_count, _merge_info, _use_new_shuffle_hash_method);
264
0
    partitioner.reset(new_partitioner);
265
0
    RETURN_IF_ERROR(
266
0
            _clone_expr_ctxs(state, _operation_expr_ctxs, new_partitioner->_operation_expr_ctxs));
267
0
    if (_insert_partition_function != nullptr) {
268
0
        RETURN_IF_ERROR(_insert_partition_function->clone(
269
0
                state, new_partitioner->_insert_partition_function));
270
0
    }
271
0
    if (_delete_partition_function != nullptr) {
272
0
        RETURN_IF_ERROR(_delete_partition_function->clone(
273
0
                state, new_partitioner->_delete_partition_function));
274
0
    }
275
0
    new_partitioner->_insert_random = _insert_random;
276
0
    new_partitioner->_rr_offset = _rr_offset;
277
0
    return Status::OK();
278
0
}
279
280
void MergePartitioner::_apply_insert_rebalance(const std::vector<int8_t>& ops,
281
                                               std::vector<uint32_t>& insert_hashes,
282
2
                                               size_t block_bytes) const {
283
2
    if (!_enable_insert_rebalance || _insert_writer_assigner == nullptr) {
284
0
        return;
285
0
    }
286
2
    if (insert_hashes.empty() || _insert_partition_count == 0) {
287
0
        return;
288
0
    }
289
2
    std::vector<uint8_t> mask(ops.size(), 0);
290
12
    for (size_t i = 0; i < ops.size(); ++i) {
291
10
        if (is_insert_op(ops[i])) {
292
6
            mask[i] = 1;
293
6
        }
294
10
    }
295
2
    _insert_writer_assigner->assign(insert_hashes, &mask, ops.size(), block_bytes, insert_hashes);
296
2
}
297
298
10
void MergePartitioner::_init_insert_scaling(RuntimeState* state) {
299
10
    _enable_insert_rebalance = false;
300
10
    _insert_partition_count = 0;
301
10
    _insert_data_processed = 0;
302
10
    _insert_writer_count = 1;
303
10
    _insert_writer_assigner.reset();
304
10
    _non_partition_scaling_threshold =
305
10
            config::table_sink_non_partition_write_scaling_data_processed_threshold;
306
307
10
    if (_partition_count == 0) {
308
0
        return;
309
0
    }
310
10
    if (_insert_random) {
311
4
        return;
312
4
    }
313
6
    if (_insert_partition_function == nullptr) {
314
2
        return;
315
2
    }
316
317
4
    int max_partitions_per_writer =
318
4
            config::table_sink_partition_write_max_partition_nums_per_writer;
319
4
    if (max_partitions_per_writer <= 0) {
320
2
        return;
321
2
    }
322
2
    _insert_partition_count = _partition_count * max_partitions_per_writer;
323
2
    if (_insert_partition_count == 0) {
324
0
        return;
325
0
    }
326
327
2
    int task_num = state == nullptr ? 0 : state->task_num();
328
2
    int64_t min_partition_threshold = scale_threshold_by_task(
329
2
            config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold,
330
2
            task_num);
331
2
    int64_t min_data_threshold = scale_threshold_by_task(
332
2
            config::table_sink_partition_write_min_data_processed_rebalance_threshold, task_num);
333
334
2
    _insert_writer_assigner = std::make_unique<SkewedWriterAssigner>(
335
2
            static_cast<int>(_insert_partition_count), static_cast<int>(_partition_count), 1,
336
2
            min_partition_threshold, min_data_threshold);
337
2
    _enable_insert_rebalance = true;
338
2
}
339
340
8
uint32_t MergePartitioner::_next_rr_channel() const {
341
8
    uint32_t writer_count = static_cast<uint32_t>(_partition_count);
342
8
    if (_insert_random && _insert_writer_count > 0) {
343
8
        writer_count = std::min<uint32_t>(static_cast<uint32_t>(_partition_count),
344
8
                                          static_cast<uint32_t>(_insert_writer_count));
345
8
    }
346
8
    if (writer_count == 0) {
347
0
        return 0;
348
0
    }
349
8
    const uint32_t channel = _rr_offset % writer_count;
350
8
    _rr_offset = (_rr_offset + 1) % writer_count;
351
8
    return channel;
352
8
}
353
354
Status MergePartitioner::_clone_expr_ctxs(RuntimeState* state, const VExprContextSPtrs& src,
355
0
                                          VExprContextSPtrs& dst) const {
356
0
    dst.resize(src.size());
357
0
    for (size_t i = 0; i < src.size(); ++i) {
358
0
        RETURN_IF_ERROR(src[i]->clone(state, dst[i]));
359
0
    }
360
0
    return Status::OK();
361
0
}
362
363
} // namespace doris