Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/connector/skewed_partition_rebalancer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is ported from
18
// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java
19
// to cpp and modified by Doris
20
21
#include "exec/connector/skewed_partition_rebalancer.h"
22
23
#include <cmath>
24
25
namespace doris {
26
#include "common/compile_check_avoid_begin.h"
27
28
// Creates a rebalancer for `partition_count` partitions that are written by `task_count` tasks,
// each task being split into `task_bucket_count` buckets.
//
// The global rebalance threshold that is stored is the larger of the two thresholds passed in.
//
// Initial placement: partition p is assigned to task (p % task_count). Within a task, the bucket
// index advances by one for each partition the task receives and wraps at `task_bucket_count`.
// Every partition therefore starts with exactly one assigned TaskBucket.
SkewedPartitionRebalancer::SkewedPartitionRebalancer(
        int partition_count, int task_count, int task_bucket_count,
        long min_partition_data_processed_rebalance_threshold,
        long min_data_processed_rebalance_threshold)
        : _partition_count(partition_count),
          _task_count(task_count),
          _task_bucket_count(task_bucket_count),
          _min_partition_data_processed_rebalance_threshold(
                  min_partition_data_processed_rebalance_threshold),
          _min_data_processed_rebalance_threshold(
                  std::max(min_partition_data_processed_rebalance_threshold,
                           min_data_processed_rebalance_threshold)),
          _partition_row_count(partition_count, 0),
          _data_processed(0),
          _data_processed_at_last_rebalance(0),
          _partition_data_size(partition_count, 0),
          _partition_data_size_at_last_rebalance(partition_count, 0),
          _partition_data_size_since_last_rebalance_per_task(partition_count, 0),
          _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0),
          _partition_assignments(partition_count) {
    // Number of partitions handed to each task so far; drives the bucket rotation below.
    std::vector<int> partitions_given_to_task(task_count, 0);

    for (int partition = 0; partition < partition_count; ++partition) {
        const int owner_task = partition % task_count;
        const int owner_bucket = partitions_given_to_task[owner_task] % task_bucket_count;
        ++partitions_given_to_task[owner_task];
        _partition_assignments[partition].emplace_back(owner_task, owner_bucket,
                                                       task_bucket_count);
    }
}
57
58
137
int SkewedPartitionRebalancer::get_task_id(uint32_t partition_id, int64_t index) {
59
137
    const std::vector<TaskBucket>& task_ids = _partition_assignments[partition_id];
60
137
    return task_ids[index % task_ids.size()].task_id;
61
137
}
62
63
10
void SkewedPartitionRebalancer::add_data_processed(long data_size) {
64
10
    _data_processed += data_size;
65
10
}
66
67
33
void SkewedPartitionRebalancer::add_partition_row_count(int partition, long row_count) {
68
33
    _partition_row_count[partition] += row_count;
69
33
}
70
71
10
void SkewedPartitionRebalancer::rebalance() {
72
10
    long current_data_processed = _data_processed;
73
10
    if (_should_rebalance(current_data_processed)) {
74
8
        _rebalance_partitions(current_data_processed);
75
8
    }
76
10
}
77
78
8
void SkewedPartitionRebalancer::_calculate_partition_data_size(long data_processed) {
79
8
    long total_partition_row_count = 0;
80
35
    for (uint32_t partition = 0; partition < _partition_count; partition++) {
81
27
        total_partition_row_count += _partition_row_count[partition];
82
27
    }
83
84
35
    for (uint32_t partition = 0; partition < _partition_count; partition++) {
85
27
        _partition_data_size[partition] = std::max(
86
27
                (_partition_row_count[partition] * data_processed) / total_partition_row_count,
87
27
                _partition_data_size[partition]);
88
27
    }
89
8
}
90
91
long SkewedPartitionRebalancer::_calculate_task_bucket_data_size_since_last_rebalance(
92
        IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
93
57
                max_partitions) {
94
57
    long estimated_data_size_since_last_rebalance = 0;
95
57
    for (const auto& elem : max_partitions) {
96
32
        estimated_data_size_since_last_rebalance +=
97
32
                _partition_data_size_since_last_rebalance_per_task[elem];
98
32
    }
99
57
    return estimated_data_size_since_last_rebalance;
100
57
}
101
102
void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness(
103
        IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
104
                max_task_buckets,
105
        IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
106
                min_task_buckets,
107
        std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>&
108
8
                task_bucket_max_partitions) {
109
8
    std::vector<int> scaled_partitions;
110
49
    while (true) {
111
49
        std::optional<TaskBucket> max_task_bucket = max_task_buckets.poll();
112
49
        if (!max_task_bucket.has_value()) {
113
2
            break;
114
2
        }
115
116
47
        IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
117
47
                max_partitions = task_bucket_max_partitions[max_task_bucket->id];
118
47
        if (max_partitions.is_empty()) {
119
26
            continue;
120
26
        }
121
122
        // All `TaskBucket`s are skewed.
123
21
        std::vector<TaskBucket> min_skewed_task_buckets =
124
21
                _find_skewed_min_task_buckets(max_task_bucket.value(), min_task_buckets);
125
21
        if (min_skewed_task_buckets.empty()) {
126
6
            break;
127
6
        }
128
129
27
        while (true) {
130
27
            std::optional<int> max_partition = max_partitions.poll();
131
27
            if (!max_partition.has_value()) {
132
10
                break;
133
10
            }
134
17
            int max_partition_value = max_partition.value();
135
136
17
            if (std::find(scaled_partitions.begin(), scaled_partitions.end(),
137
17
                          max_partition_value) != scaled_partitions.end()) {
138
                // already scaled
139
1
                continue;
140
1
            }
141
142
16
            int total_assigned_tasks = _partition_assignments[max_partition_value].size();
143
16
            if (_partition_data_size[max_partition_value] >=
144
16
                (_min_partition_data_processed_rebalance_threshold * total_assigned_tasks)) {
145
15
                for (const TaskBucket& min_task_bucket : min_skewed_task_buckets) {
146
15
                    if (_rebalance_partition(max_partition_value, min_task_bucket, max_task_buckets,
147
15
                                             min_task_buckets)) {
148
10
                        scaled_partitions.push_back(max_partition_value);
149
10
                        break;
150
10
                    }
151
15
                }
152
11
            } else {
153
5
                break;
154
5
            }
155
16
        }
156
15
    }
157
8
}
158
159
std::vector<SkewedPartitionRebalancer::TaskBucket>
160
SkewedPartitionRebalancer::_find_skewed_min_task_buckets(
161
        const TaskBucket& max_task_bucket,
162
        const IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
163
21
                min_task_buckets) {
164
21
    std::vector<TaskBucket> min_skewed_task_buckets;
165
166
92
    for (const auto& min_task_bucket : min_task_buckets) {
167
92
        double skewness =
168
92
                static_cast<double>(
169
92
                        _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id] -
170
92
                        _estimated_task_bucket_data_size_since_last_rebalance[min_task_bucket.id]) /
171
92
                _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id];
172
92
        if (skewness <= TASK_BUCKET_SKEWNESS_THRESHOLD || std::isnan(skewness)) {
173
21
            break;
174
21
        }
175
71
        if (max_task_bucket.task_id != min_task_bucket.task_id) {
176
52
            min_skewed_task_buckets.push_back(min_task_bucket);
177
52
        }
178
71
    }
179
21
    return min_skewed_task_buckets;
180
21
}
181
182
bool SkewedPartitionRebalancer::_rebalance_partition(
183
        int partition_id, const TaskBucket& to_task_bucket,
184
        IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
185
                max_task_buckets,
186
        IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
187
15
                min_task_buckets) {
188
15
    std::vector<TaskBucket>& assignments = _partition_assignments[partition_id];
189
15
    if (std::any_of(assignments.begin(), assignments.end(),
190
22
                    [&to_task_bucket](const TaskBucket& task_bucket) {
191
22
                        return task_bucket.task_id == to_task_bucket.task_id;
192
22
                    })) {
193
5
        return false;
194
5
    }
195
196
10
    assignments.push_back(to_task_bucket);
197
198
10
    int new_task_count = assignments.size();
199
10
    int old_task_count = new_task_count - 1;
200
24
    for (const TaskBucket& task_bucket : assignments) {
201
24
        if (task_bucket == to_task_bucket) {
202
10
            _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] +=
203
10
                    (_partition_data_size_since_last_rebalance_per_task[partition_id] *
204
10
                     old_task_count) /
205
10
                    new_task_count;
206
14
        } else {
207
14
            _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] -=
208
14
                    _partition_data_size_since_last_rebalance_per_task[partition_id] /
209
14
                    new_task_count;
210
14
        }
211
24
        max_task_buckets.add_or_update(
212
24
                task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]);
213
24
        min_task_buckets.add_or_update(
214
24
                task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]);
215
24
    }
216
217
10
    return true;
218
15
}
219
220
18
bool SkewedPartitionRebalancer::_should_rebalance(long data_processed) {
221
18
    return (data_processed - _data_processed_at_last_rebalance) >=
222
18
           _min_data_processed_rebalance_threshold;
223
18
}
224
225
8
void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) {
226
8
    DCHECK(_should_rebalance(data_processed));
227
228
8
    _calculate_partition_data_size(data_processed);
229
230
35
    for (int partition = 0; partition < _partition_count; partition++) {
231
27
        long data_size = _partition_data_size[partition];
232
27
        auto delta = data_size - _partition_data_size_at_last_rebalance[partition];
233
27
        _partition_data_size_since_last_rebalance_per_task[partition] =
234
27
                delta / _partition_assignments[partition].size();
235
27
        _partition_data_size_at_last_rebalance[partition] = data_size;
236
27
    }
237
238
8
    std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>
239
8
            task_bucket_max_partitions;
240
65
    for (int i = 0; i < _task_count * _task_bucket_count; ++i) {
241
57
        task_bucket_max_partitions.emplace_back();
242
57
    }
243
244
35
    for (uint32_t partition = 0; partition < _partition_count; partition++) {
245
27
        auto& task_assignments = _partition_assignments[partition];
246
32
        for (const auto& task_bucket : task_assignments) {
247
32
            auto& queue = task_bucket_max_partitions[task_bucket.id];
248
32
            queue.add_or_update(partition,
249
32
                                _partition_data_size_since_last_rebalance_per_task[partition]);
250
32
        }
251
27
    }
252
253
8
    IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>
254
8
            max_task_buckets;
255
8
    IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>
256
8
            min_task_buckets;
257
258
32
    for (int task_id = 0; task_id < _task_count; task_id++) {
259
81
        for (int bucket_id = 0; bucket_id < _task_bucket_count; bucket_id++) {
260
57
            TaskBucket task_bucket1(task_id, bucket_id, _task_bucket_count);
261
57
            TaskBucket task_bucket2(task_id, bucket_id, _task_bucket_count);
262
57
            _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id] =
263
57
                    _calculate_task_bucket_data_size_since_last_rebalance(
264
57
                            task_bucket_max_partitions[task_bucket1.id]);
265
57
            max_task_buckets.add_or_update(
266
57
                    task_bucket1,
267
57
                    _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id]);
268
57
            min_task_buckets.add_or_update(
269
57
                    task_bucket2,
270
57
                    _estimated_task_bucket_data_size_since_last_rebalance[task_bucket2.id]);
271
57
        }
272
24
    }
273
274
8
    _rebalance_based_on_task_bucket_skewness(max_task_buckets, min_task_buckets,
275
8
                                             task_bucket_max_partitions);
276
8
    _data_processed_at_last_rebalance = data_processed;
277
8
}
278
#include "common/compile_check_avoid_end.h"
279
} // namespace doris