be/src/exec/connector/skewed_partition_rebalancer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is porting from |
18 | | // https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java |
19 | | // to cpp and modified by Doris |
20 | | |
21 | | #include "exec/connector/skewed_partition_rebalancer.h" |
22 | | |
23 | | #include <cmath> |
24 | | |
25 | | namespace doris { |
26 | | #include "common/compile_check_avoid_begin.h" |
27 | | |
28 | | SkewedPartitionRebalancer::SkewedPartitionRebalancer( |
29 | | int partition_count, int task_count, int task_bucket_count, |
30 | | long min_partition_data_processed_rebalance_threshold, |
31 | | long min_data_processed_rebalance_threshold) |
32 | 6 | : _partition_count(partition_count), |
33 | 6 | _task_count(task_count), |
34 | 6 | _task_bucket_count(task_bucket_count), |
35 | | _min_partition_data_processed_rebalance_threshold( |
36 | 6 | min_partition_data_processed_rebalance_threshold), |
37 | | _min_data_processed_rebalance_threshold( |
38 | 6 | std::max(min_partition_data_processed_rebalance_threshold, |
39 | 6 | min_data_processed_rebalance_threshold)), |
40 | 6 | _partition_row_count(partition_count, 0), |
41 | 6 | _data_processed(0), |
42 | 6 | _data_processed_at_last_rebalance(0), |
43 | 6 | _partition_data_size(partition_count, 0), |
44 | 6 | _partition_data_size_at_last_rebalance(partition_count, 0), |
45 | 6 | _partition_data_size_since_last_rebalance_per_task(partition_count, 0), |
46 | 6 | _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0), |
47 | 6 | _partition_assignments(partition_count) { |
48 | 6 | std::vector<int> task_bucket_ids(task_count, 0); |
49 | | |
50 | 27 | for (int partition = 0; partition < partition_count; partition++) { |
51 | 21 | int task_id = partition % task_count; |
52 | 21 | int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count; |
53 | 21 | TaskBucket task_bucket(task_id, bucket_id, task_bucket_count); |
54 | 21 | _partition_assignments[partition].emplace_back(task_bucket); |
55 | 21 | } |
56 | 6 | } |
57 | | |
58 | 137 | int SkewedPartitionRebalancer::get_task_id(uint32_t partition_id, int64_t index) { |
59 | 137 | const std::vector<TaskBucket>& task_ids = _partition_assignments[partition_id]; |
60 | 137 | return task_ids[index % task_ids.size()].task_id; |
61 | 137 | } |
62 | | |
63 | 10 | void SkewedPartitionRebalancer::add_data_processed(long data_size) { |
64 | 10 | _data_processed += data_size; |
65 | 10 | } |
66 | | |
67 | 33 | void SkewedPartitionRebalancer::add_partition_row_count(int partition, long row_count) { |
68 | 33 | _partition_row_count[partition] += row_count; |
69 | 33 | } |
70 | | |
71 | 10 | void SkewedPartitionRebalancer::rebalance() { |
72 | 10 | long current_data_processed = _data_processed; |
73 | 10 | if (_should_rebalance(current_data_processed)) { |
74 | 8 | _rebalance_partitions(current_data_processed); |
75 | 8 | } |
76 | 10 | } |
77 | | |
78 | 8 | void SkewedPartitionRebalancer::_calculate_partition_data_size(long data_processed) { |
79 | 8 | long total_partition_row_count = 0; |
80 | 35 | for (uint32_t partition = 0; partition < _partition_count; partition++) { |
81 | 27 | total_partition_row_count += _partition_row_count[partition]; |
82 | 27 | } |
83 | | |
84 | 35 | for (uint32_t partition = 0; partition < _partition_count; partition++) { |
85 | 27 | _partition_data_size[partition] = std::max( |
86 | 27 | (_partition_row_count[partition] * data_processed) / total_partition_row_count, |
87 | 27 | _partition_data_size[partition]); |
88 | 27 | } |
89 | 8 | } |
90 | | |
91 | | long SkewedPartitionRebalancer::_calculate_task_bucket_data_size_since_last_rebalance( |
92 | | IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& |
93 | 57 | max_partitions) { |
94 | 57 | long estimated_data_size_since_last_rebalance = 0; |
95 | 57 | for (const auto& elem : max_partitions) { |
96 | 32 | estimated_data_size_since_last_rebalance += |
97 | 32 | _partition_data_size_since_last_rebalance_per_task[elem]; |
98 | 32 | } |
99 | 57 | return estimated_data_size_since_last_rebalance; |
100 | 57 | } |
101 | | |
// Core skew-correction loop: repeatedly pop the most loaded task-bucket,
// find under-loaded ("min") buckets that are sufficiently skewed relative to
// it, and move that bucket's heaviest eligible partitions onto one of them.
// Terminates when either the max queue is exhausted or no skewed min bucket
// remains. Each partition is scaled at most once per invocation.
void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness(
        IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
                max_task_buckets,
        IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
                min_task_buckets,
        std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>&
                task_bucket_max_partitions) {
    // Partitions already given an extra task in this pass; never scale twice.
    std::vector<int> scaled_partitions;
    while (true) {
        std::optional<TaskBucket> max_task_bucket = max_task_buckets.poll();
        if (!max_task_bucket.has_value()) {
            // No more loaded buckets to examine.
            break;
        }

        IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
                max_partitions = task_bucket_max_partitions[max_task_bucket->id];
        if (max_partitions.is_empty()) {
            // This bucket has no candidate partitions left; try the next one.
            continue;
        }

        // All `TaskBucket`s are skewed.
        std::vector<TaskBucket> min_skewed_task_buckets =
                _find_skewed_min_task_buckets(max_task_bucket.value(), min_task_buckets);
        if (min_skewed_task_buckets.empty()) {
            // Even the least loaded bucket is not skewed enough — done.
            break;
        }

        while (true) {
            // Heaviest partition of the current max bucket.
            std::optional<int> max_partition = max_partitions.poll();
            if (!max_partition.has_value()) {
                break;
            }
            int max_partition_value = max_partition.value();

            if (std::find(scaled_partitions.begin(), scaled_partitions.end(),
                          max_partition_value) != scaled_partitions.end()) {
                // already scaled
                continue;
            }

            int total_assigned_tasks = _partition_assignments[max_partition_value].size();
            // Only scale a partition whose data size justifies yet another
            // task (threshold grows with the number of tasks it already has).
            if (_partition_data_size[max_partition_value] >=
                (_min_partition_data_processed_rebalance_threshold * total_assigned_tasks)) {
                for (const TaskBucket& min_task_bucket : min_skewed_task_buckets) {
                    // First min bucket that accepts the partition wins.
                    if (_rebalance_partition(max_partition_value, min_task_bucket, max_task_buckets,
                                             min_task_buckets)) {
                        scaled_partitions.push_back(max_partition_value);
                        break;
                    }
                }
            } else {
                // Partitions are polled heaviest-first, so the rest are
                // below threshold too.
                break;
            }
        }
    }
}
158 | | |
// Collect the lightly loaded task-buckets whose estimated load is skewed
// (relative difference above TASK_BUCKET_SKEWNESS_THRESHOLD) with respect to
// `max_task_bucket`, excluding buckets on the same task. Returns candidates
// in ascending-load order.
std::vector<SkewedPartitionRebalancer::TaskBucket>
SkewedPartitionRebalancer::_find_skewed_min_task_buckets(
        const TaskBucket& max_task_bucket,
        const IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
                min_task_buckets) {
    std::vector<TaskBucket> min_skewed_task_buckets;

    for (const auto& min_task_bucket : min_task_buckets) {
        // Relative gap between the heaviest bucket and this candidate;
        // NaN occurs when the max bucket's estimate is 0.
        double skewness =
                static_cast<double>(
                        _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id] -
                        _estimated_task_bucket_data_size_since_last_rebalance[min_task_bucket.id]) /
                _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id];
        if (skewness <= TASK_BUCKET_SKEWNESS_THRESHOLD || std::isnan(skewness)) {
            // Candidates are iterated lightest-first, so once one is not
            // skewed enough, none of the remaining ones can be — stop.
            break;
        }
        if (max_task_bucket.task_id != min_task_bucket.task_id) {
            // Moving load within the same task would not help; skip those.
            min_skewed_task_buckets.push_back(min_task_bucket);
        }
    }
    return min_skewed_task_buckets;
}
181 | | |
// Try to assign `partition_id` to `to_task_bucket` as an additional owner.
// Fails (returns false) if the partition already has an assignment on the
// same task. On success, redistributes the partition's per-task data-growth
// estimate across all its assignments and refreshes both priority queues.
bool SkewedPartitionRebalancer::_rebalance_partition(
        int partition_id, const TaskBucket& to_task_bucket,
        IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
                max_task_buckets,
        IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
                min_task_buckets) {
    std::vector<TaskBucket>& assignments = _partition_assignments[partition_id];
    // A partition may have at most one bucket per task.
    if (std::any_of(assignments.begin(), assignments.end(),
                    [&to_task_bucket](const TaskBucket& task_bucket) {
                        return task_bucket.task_id == to_task_bucket.task_id;
                    })) {
        return false;
    }

    assignments.push_back(to_task_bucket);

    int new_task_count = assignments.size();
    int old_task_count = new_task_count - 1;
    // Rebalance the load estimates: the new bucket absorbs the share the old
    // buckets give up, i.e. old/new of the per-task size goes to the new
    // bucket while each existing bucket sheds 1/new of it.
    for (const TaskBucket& task_bucket : assignments) {
        if (task_bucket == to_task_bucket) {
            _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] +=
                    (_partition_data_size_since_last_rebalance_per_task[partition_id] *
                     old_task_count) /
                    new_task_count;
        } else {
            _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] -=
                    _partition_data_size_since_last_rebalance_per_task[partition_id] /
                    new_task_count;
        }
        // Keep both queues in sync with the updated estimates.
        max_task_buckets.add_or_update(
                task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]);
        min_task_buckets.add_or_update(
                task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]);
    }

    return true;
}
219 | | |
220 | 18 | bool SkewedPartitionRebalancer::_should_rebalance(long data_processed) { |
221 | 18 | return (data_processed - _data_processed_at_last_rebalance) >= |
222 | 18 | _min_data_processed_rebalance_threshold; |
223 | 18 | } |
224 | | |
// Orchestrates one rebalance round:
//   1. refresh per-partition data-size estimates,
//   2. compute each partition's data growth per assigned task since the last
//      round,
//   3. build per-task-bucket priority queues of their partitions (heaviest
//      first) plus global max/min queues of bucket loads,
//   4. run the skew-based reassignment, then record the watermark.
void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) {
    DCHECK(_should_rebalance(data_processed));

    _calculate_partition_data_size(data_processed);

    for (int partition = 0; partition < _partition_count; partition++) {
        long data_size = _partition_data_size[partition];
        // Growth since the last rebalance, split evenly over the tasks the
        // partition is currently assigned to.
        auto delta = data_size - _partition_data_size_at_last_rebalance[partition];
        _partition_data_size_since_last_rebalance_per_task[partition] =
                delta / _partition_assignments[partition].size();
        _partition_data_size_at_last_rebalance[partition] = data_size;
    }

    // One heaviest-first partition queue per task-bucket.
    std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>
            task_bucket_max_partitions;
    for (int i = 0; i < _task_count * _task_bucket_count; ++i) {
        task_bucket_max_partitions.emplace_back();
    }

    for (uint32_t partition = 0; partition < _partition_count; partition++) {
        auto& task_assignments = _partition_assignments[partition];
        for (const auto& task_bucket : task_assignments) {
            auto& queue = task_bucket_max_partitions[task_bucket.id];
            queue.add_or_update(partition,
                                _partition_data_size_since_last_rebalance_per_task[partition]);
        }
    }

    // Global views of bucket load: max queue drives who sheds partitions,
    // min queue drives who receives them.
    IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>
            max_task_buckets;
    IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>
            min_task_buckets;

    for (int task_id = 0; task_id < _task_count; task_id++) {
        for (int bucket_id = 0; bucket_id < _task_bucket_count; bucket_id++) {
            // Two equal TaskBucket values: one keyed into each queue.
            TaskBucket task_bucket1(task_id, bucket_id, _task_bucket_count);
            TaskBucket task_bucket2(task_id, bucket_id, _task_bucket_count);
            _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id] =
                    _calculate_task_bucket_data_size_since_last_rebalance(
                            task_bucket_max_partitions[task_bucket1.id]);
            max_task_buckets.add_or_update(
                    task_bucket1,
                    _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id]);
            min_task_buckets.add_or_update(
                    task_bucket2,
                    _estimated_task_bucket_data_size_since_last_rebalance[task_bucket2.id]);
        }
    }

    _rebalance_based_on_task_bucket_skewness(max_task_buckets, min_task_buckets,
                                             task_bucket_max_partitions);
    // Watermark for the next _should_rebalance check.
    _data_processed_at_last_rebalance = data_processed;
}
278 | | #include "common/compile_check_avoid_end.h" |
279 | | } // namespace doris |