Coverage Report

Created: 2026-03-27 10:51

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/transformer/merge_partitioner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/transformer/merge_partitioner.h"
19
20
#include <algorithm>
21
#include <cstdint>
22
23
#include "common/cast_set.h"
24
#include "common/config.h"
25
#include "common/logging.h"
26
#include "common/status.h"
27
#include "core/block/block.h"
28
#include "core/column/column_const.h"
29
#include "core/column/column_nullable.h"
30
#include "core/column/column_vector.h"
31
#include "exec/sink/sink_common.h"
32
#include "format/transformer/iceberg_partition_function.h"
33
34
namespace doris {
35
#include "common/compile_check_begin.h"
36
37
namespace {
// Splits a global data-size threshold across `task_num` parallel tasks.
// A non-positive task count, or a quotient that truncates to zero, leaves
// the original threshold untouched so the result is never degenerate.
int64_t scale_threshold_by_task(int64_t value, int task_num) {
    if (task_num > 0) {
        const int64_t per_task = value / task_num;
        if (per_task != 0) {
            return per_task;
        }
    }
    return value;
}
} // namespace
46
47
// Constructs a merge partitioner that routes each row to one of
// `partition_count` downstream channels based on the row's merge operation
// (insert / delete / update) described by `merge_info`.
// `use_new_shuffle_hash_method` is stored and presumably selects the hash
// flavor used by the partition functions built in init() — confirm against
// _hash_method(). `merge_info.insert_random` requests round-robin placement
// for insert rows instead of hash partitioning.
MergePartitioner::MergePartitioner(size_t partition_count, const TMergePartitionInfo& merge_info,
                                   bool use_new_shuffle_hash_method)
        : PartitionerBase(static_cast<HashValType>(partition_count)),
          _merge_info(merge_info),
          _use_new_shuffle_hash_method(use_new_shuffle_hash_method),
          _insert_random(merge_info.insert_random) {}
53
54
5
// Builds the operation-column expression tree plus the optional insert- and
// delete-side partition functions from `_merge_info`. The `texprs` argument
// is intentionally ignored: all expressions come from the thrift merge info.
Status MergePartitioner::init(const std::vector<TExpr>& /*texprs*/) {
    VExprContextSPtr operation_ctx;
    RETURN_IF_ERROR(VExpr::create_expr_tree(_merge_info.operation_expr, operation_ctx));
    _operation_expr_ctxs.emplace_back(std::move(operation_ctx));

    // Collect whichever insert specifications (exprs and/or Iceberg fields)
    // are present; either one alone is enough to enable hashed inserts.
    std::vector<TExpr> insert_exprs;
    if (_merge_info.__isset.insert_partition_exprs) {
        insert_exprs = _merge_info.insert_partition_exprs;
    }
    std::vector<TIcebergPartitionField> insert_fields;
    if (_merge_info.__isset.insert_partition_fields) {
        insert_fields = _merge_info.insert_partition_fields;
    }
    const bool has_insert_spec = !insert_exprs.empty() || !insert_fields.empty();
    if (has_insert_spec) {
        _insert_partition_function = std::make_unique<IcebergInsertPartitionFunction>(
                _partition_count, _hash_method(), std::move(insert_exprs),
                std::move(insert_fields));
        RETURN_IF_ERROR(_insert_partition_function->init({}));
    }

    const bool has_delete_spec = _merge_info.__isset.delete_partition_exprs &&
                                 !_merge_info.delete_partition_exprs.empty();
    if (has_delete_spec) {
        _delete_partition_function = std::make_unique<IcebergDeletePartitionFunction>(
                _partition_count, _hash_method(), _merge_info.delete_partition_exprs);
        RETURN_IF_ERROR(_delete_partition_function->init({}));
    }
    return Status::OK();
}
81
82
5
// Prepares the operation expression and whichever partition functions were
// created in init() against the sender's row descriptor.
Status MergePartitioner::prepare(RuntimeState* state, const RowDescriptor& row_desc) {
    RETURN_IF_ERROR(VExpr::prepare(_operation_expr_ctxs, state, row_desc));
    if (_insert_partition_function) {
        RETURN_IF_ERROR(_insert_partition_function->prepare(state, row_desc));
    }
    if (_delete_partition_function) {
        RETURN_IF_ERROR(_delete_partition_function->prepare(state, row_desc));
    }
    return Status::OK();
}
92
93
5
// Opens all expressions and partition functions, then finalizes insert
// routing: if the insert partition function reports it fell back to random
// distribution, this partitioner switches to round-robin inserts before the
// scaling parameters are derived in _init_insert_scaling().
Status MergePartitioner::open(RuntimeState* state) {
    RETURN_IF_ERROR(VExpr::open(_operation_expr_ctxs, state));
    if (_insert_partition_function) {
        RETURN_IF_ERROR(_insert_partition_function->open(state));
        auto* as_insert_fn =
                dynamic_cast<IcebergInsertPartitionFunction*>(_insert_partition_function.get());
        if (as_insert_fn != nullptr && as_insert_fn->fallback_to_random()) {
            _insert_random = true;
        }
    }
    if (_delete_partition_function) {
        RETURN_IF_ERROR(_delete_partition_function->open(state));
    }
    // Must run after the fallback check above so scaling sees the final
    // value of _insert_random.
    _init_insert_scaling(state);
    return Status::OK();
}
109
110
5
// No partitioner-owned resources need explicit teardown here; expression
// contexts and partition functions are presumably released by their owners
// elsewhere — confirm against the sink's lifecycle.
Status MergePartitioner::close(RuntimeState* /*state*/) {
    return Status::OK();
}
113
114
5
// Computes a channel id per row of `block` into _channel_ids, routing by the
// row's merge operation:
//   - insert rows  -> insert hash (or round-robin when _insert_random)
//   - delete rows  -> delete hash
//   - update rows  -> original row goes to the delete channel and is
//                     re-marked as an update-delete; a copy of the row is
//                     appended to the block, marked update-insert, and routed
//                     to the insert channel.
// Statement order matters throughout: hashes are computed while the
// operation column is still in the block, and only afterwards are the
// temporary expression-result columns erased.
Status MergePartitioner::do_partitioning(RuntimeState* state, Block* block) const {
    const size_t rows = block->rows();
    if (rows == 0) {
        _channel_ids.clear();
        return Status::OK();
    }

    // Remember the caller's column count so any columns appended by
    // expression evaluation below can be erased before returning.
    const size_t column_to_keep = block->columns();
    if (_operation_expr_ctxs.empty()) {
        return Status::InternalError("Merge partitioning missing operation expression");
    }

    // Evaluate the operation expression; op_idx is the resulting column's
    // position inside the block.
    int op_idx = -1;
    RETURN_IF_ERROR(_operation_expr_ctxs[0]->execute(block, &op_idx));
    if (op_idx < 0 || op_idx >= block->columns()) {
        return Status::InternalError("Merge partitioning missing operation column");
    }
    // The operation column must be one of the caller's original columns, not
    // a freshly appended expression result, because it is mutated in place
    // for the update-duplication step below.
    if (op_idx >= cast_set<int>(column_to_keep)) {
        return Status::InternalError("Merge partitioning requires operation column in input block");
    }

    // Materialize per-row operation codes and note which operation classes
    // occur, so the hash computations below can be skipped when unused.
    const auto& op_column = block->get_by_position(op_idx).column;
    const auto* op_data = remove_nullable(op_column).get();
    std::vector<int8_t> ops(rows);
    bool has_insert = false;
    bool has_delete = false;
    bool has_update = false;
    for (size_t i = 0; i < rows; ++i) {
        int8_t op = static_cast<int8_t>(op_data->get_int(i));
        ops[i] = op;
        if (is_insert_op(op)) {
            has_insert = true;
        }
        if (is_delete_op(op)) {
            has_delete = true;
        }
        if (op == kUpdateOperation) {
            has_update = true;
        }
    }

    if (has_insert && !_insert_random && _insert_partition_function == nullptr) {
        return Status::InternalError("Merge partitioning insert exprs are empty");
    }
    if (has_delete && _delete_partition_function == nullptr) {
        return Status::InternalError("Merge partitioning delete exprs are empty");
    }

    std::vector<uint32_t> insert_hashes;
    std::vector<uint32_t> delete_hashes;
    // When rebalancing is enabled, inserts are first hashed into the larger
    // virtual partition space (_insert_partition_count) and then remapped to
    // writers by _apply_insert_rebalance().
    const size_t insert_partition_count =
            _enable_insert_rebalance ? _insert_partition_count : _partition_count;
    if (has_insert && !_insert_random) {
        RETURN_IF_ERROR(_insert_partition_function->get_partitions(
                state, block, insert_partition_count, insert_hashes));
    }
    if (has_delete) {
        RETURN_IF_ERROR(_delete_partition_function->get_partitions(state, block, _partition_count,
                                                                   delete_hashes));
    }
    if (has_insert) {
        if (_insert_random) {
            // Random (round-robin) inserts: grow the active writer count as
            // data volume crosses multiples of the scaling threshold, or use
            // all channels at once when no threshold is configured.
            if (_non_partition_scaling_threshold > 0) {
                _insert_data_processed += static_cast<int64_t>(block->bytes());
                if (_insert_writer_count < static_cast<int>(_partition_count) &&
                    _insert_data_processed >=
                            _insert_writer_count * _non_partition_scaling_threshold) {
                    _insert_writer_count++;
                }
            } else {
                _insert_writer_count = static_cast<int>(_partition_count);
            }
        } else if (_enable_insert_rebalance) {
            // Rewrites insert_hashes in place from virtual partitions to
            // actual writer channels.
            _apply_insert_rebalance(ops, insert_hashes, block->bytes());
        }
    }

    // Drop the expression-result columns appended by execute() above.
    Block::erase_useless_column(block, column_to_keep);

    _channel_ids.resize(rows);
    for (size_t i = 0; i < rows; ++i) {
        const int8_t op = ops[i];
        if (op == kUpdateOperation) {
            // The original update row travels with the delete; its insert
            // counterpart is appended in the has_update branch below.
            _channel_ids[i] = delete_hashes[i];
            continue;
        }
        if (is_insert_op(op)) {
            _channel_ids[i] = _insert_random ? _next_rr_channel() : insert_hashes[i];
        } else if (is_delete_op(op)) {
            _channel_ids[i] = delete_hashes[i];
        } else {
            return Status::InternalError("Unknown Iceberg merge operation {}", op);
        }
    }

    if (has_update) {
        // Const columns cannot be appended to row-by-row; materialize them.
        for (size_t col_idx = 0; col_idx < block->columns(); ++col_idx) {
            block->replace_by_position_if_const(col_idx);
        }

        MutableColumns mutable_columns = block->mutate_columns();
        MutableColumnPtr& op_mut = mutable_columns[op_idx];
        // Locate the raw int8 storage of the operation column, unwrapping an
        // outer nullable wrapper if present.
        ColumnInt8* op_values_col = nullptr;
        if (auto* nullable_col = check_and_get_column<ColumnNullable>(op_mut.get())) {
            op_values_col =
                    check_and_get_column<ColumnInt8>(nullable_col->get_nested_column_ptr().get());
        } else {
            op_values_col = check_and_get_column<ColumnInt8>(op_mut.get());
        }
        if (op_values_col == nullptr) {
            // Hand the columns back before erroring so the block stays valid.
            block->set_columns(std::move(mutable_columns));
            return Status::InternalError("Merge operation column must be tinyint");
        }
        auto& op_values = op_values_col->get_data();
        // First pass: collect update row indices and mark original rows as DELETE.
        std::vector<size_t> update_rows;
        for (size_t row = 0; row < rows; ++row) {
            if (ops[row] != kUpdateOperation) {
                continue;
            }
            op_values[row] = kUpdateDeleteOperation;
            update_rows.push_back(row);
        }
        // Second pass: extract only the update rows into a temporary column,
        // then batch-append from it. This avoids cloning the entire column.
        for (size_t col_idx = 0; col_idx < mutable_columns.size(); ++col_idx) {
            auto tmp = mutable_columns[col_idx]->clone_empty();
            for (size_t row : update_rows) {
                tmp->insert_from(*mutable_columns[col_idx], row);
            }
            mutable_columns[col_idx]->insert_range_from(*tmp, 0, tmp->size());
        }
        // Mark the newly appended rows as INSERT and assign their channels.
        // NOTE(review): this indexes insert_hashes[row] for update rows when
        // not in random mode, which assumes update rows were included in the
        // insert hash computation (i.e. is_insert_op covers updates, or the
        // block also contained inserts) — confirm against is_insert_op.
        DCHECK(_insert_random || !insert_hashes.empty());
        const size_t appended_update_begin = rows;
        for (size_t idx = 0; idx < update_rows.size(); ++idx) {
            const size_t row = update_rows[idx];
            op_values[appended_update_begin + idx] = kUpdateInsertOperation;
            const uint32_t insert_channel =
                    _insert_random ? _next_rr_channel() : insert_hashes[row];
            _channel_ids.push_back(insert_channel);
        }
        block->set_columns(std::move(mutable_columns));
    }

    return Status::OK();
}
261
262
0
// Produces an independent copy sharing the same merge configuration.
// Expression contexts and partition functions are deep-cloned; the
// random-insert flag and round-robin cursor carry over so the clone keeps
// the same distribution pattern. Insert-scaling state is not copied — it is
// presumably re-derived when open() runs on the clone; confirm with callers.
Status MergePartitioner::clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) {
    // Install the copy first (matching the original error behavior: on
    // failure the caller still holds the partially-cloned instance).
    auto* copy = new MergePartitioner(_partition_count, _merge_info, _use_new_shuffle_hash_method);
    partitioner.reset(copy);

    RETURN_IF_ERROR(_clone_expr_ctxs(state, _operation_expr_ctxs, copy->_operation_expr_ctxs));
    if (_insert_partition_function != nullptr) {
        RETURN_IF_ERROR(_insert_partition_function->clone(state, copy->_insert_partition_function));
    }
    if (_delete_partition_function != nullptr) {
        RETURN_IF_ERROR(_delete_partition_function->clone(state, copy->_delete_partition_function));
    }
    copy->_insert_random = _insert_random;
    copy->_rr_offset = _rr_offset;
    return Status::OK();
}
280
281
void MergePartitioner::_apply_insert_rebalance(const std::vector<int8_t>& ops,
282
                                               std::vector<uint32_t>& insert_hashes,
283
1
                                               size_t block_bytes) const {
284
1
    if (!_enable_insert_rebalance || _insert_writer_assigner == nullptr) {
285
0
        return;
286
0
    }
287
1
    if (insert_hashes.empty() || _insert_partition_count == 0) {
288
0
        return;
289
0
    }
290
1
    std::vector<uint8_t> mask(ops.size(), 0);
291
6
    for (size_t i = 0; i < ops.size(); ++i) {
292
5
        if (is_insert_op(ops[i])) {
293
3
            mask[i] = 1;
294
3
        }
295
5
    }
296
1
    _insert_writer_assigner->assign(insert_hashes, &mask, ops.size(), block_bytes, insert_hashes);
297
1
}
298
299
5
// Resets and (re)derives the insert-side scaling configuration: the data
// threshold used to grow the writer count under random inserts, and the
// skew-aware writer assigner used to rebalance hash-partitioned inserts.
// Rebalancing stays disabled unless every precondition below holds.
void MergePartitioner::_init_insert_scaling(RuntimeState* state) {
    _enable_insert_rebalance = false;
    _insert_partition_count = 0;
    _insert_data_processed = 0;
    _insert_writer_count = 1;
    _insert_writer_assigner.reset();
    _non_partition_scaling_threshold =
            config::table_sink_non_partition_write_scaling_data_processed_threshold;

    // Rebalancing only applies to hash-partitioned inserts with at least one
    // output channel.
    if (_partition_count == 0 || _insert_random || _insert_partition_function == nullptr) {
        return;
    }

    const int partitions_per_writer =
            config::table_sink_partition_write_max_partition_nums_per_writer;
    if (partitions_per_writer <= 0) {
        return;
    }
    // Virtual partition space: each writer is fanned out over several virtual
    // partitions so the assigner can rebalance skewed ones independently.
    _insert_partition_count = _partition_count * partitions_per_writer;
    if (_insert_partition_count == 0) {
        return;
    }

    // Thresholds are configured globally, so divide them across the parallel
    // tasks of this query when the task count is known.
    const int task_num = state == nullptr ? 0 : state->task_num();
    const int64_t partition_threshold = scale_threshold_by_task(
            config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold,
            task_num);
    const int64_t data_threshold = scale_threshold_by_task(
            config::table_sink_partition_write_min_data_processed_rebalance_threshold, task_num);

    _insert_writer_assigner = std::make_unique<SkewedWriterAssigner>(
            static_cast<int>(_insert_partition_count), static_cast<int>(_partition_count), 1,
            partition_threshold, data_threshold);
    _enable_insert_rebalance = true;
}
340
341
4
uint32_t MergePartitioner::_next_rr_channel() const {
342
4
    uint32_t writer_count = static_cast<uint32_t>(_partition_count);
343
4
    if (_insert_random && _insert_writer_count > 0) {
344
4
        writer_count = std::min<uint32_t>(static_cast<uint32_t>(_partition_count),
345
4
                                          static_cast<uint32_t>(_insert_writer_count));
346
4
    }
347
4
    if (writer_count == 0) {
348
0
        return 0;
349
0
    }
350
4
    const uint32_t channel = _rr_offset % writer_count;
351
4
    _rr_offset = (_rr_offset + 1) % writer_count;
352
4
    return channel;
353
4
}
354
355
// Deep-copies each expression context in `src` into the matching slot of
// `dst`, giving a cloned partitioner its own independent contexts.
Status MergePartitioner::_clone_expr_ctxs(RuntimeState* state, const VExprContextSPtrs& src,
                                          VExprContextSPtrs& dst) const {
    dst.resize(src.size());
    size_t slot = 0;
    for (const auto& ctx : src) {
        RETURN_IF_ERROR(ctx->clone(state, dst[slot]));
        ++slot;
    }
    return Status::OK();
}
363
364
#include "common/compile_check_end.h"
365
} // namespace doris