be/src/exec/connector/skewed_partition_rebalancer.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is ported from |
18 | | // https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java |
19 | | // to cpp and modified by Doris |
20 | | |
21 | | /** |
22 | | * Helps in distributing big or skewed partitions across available tasks to improve the performance of |
23 | | * partitioned writes. |
24 | | * <p> |
25 | | * This rebalancer initializes a bunch of buckets for each task based on a given taskBucketCount and then tries to |
26 | | * uniformly distribute partitions across those buckets. This helps to mitigate two problems: |
27 | | * 1. Mitigate skewness across tasks. |
28 | | * 2. Scale a few big partitions across tasks even if there's no skewness among them. This will essentially speed up |
29 | | * local scaling without much impact on overall resource utilization. |
30 | | * <p> |
31 | | * Example: |
32 | | * <p> |
33 | | * Before: 3 tasks, 3 buckets per task, and 2 skewed partitions |
34 | | * Task1 Task2 Task3 |
35 | | * Bucket1 (Part 1) Bucket1 (Part 2) Bucket1 |
36 | | * Bucket2 Bucket2 Bucket2 |
37 | | * Bucket3 Bucket3 Bucket3 |
38 | | * <p> |
39 | | * After rebalancing: |
40 | | * Task1 Task2 Task3 |
41 | | * Bucket1 (Part 1) Bucket1 (Part 2) Bucket1 (Part 1) |
42 | | * Bucket2 (Part 2) Bucket2 (Part 1) Bucket2 (Part 2) |
43 | | * Bucket3 Bucket3 Bucket3 |
44 | | */ |
45 | | |
46 | | #pragma once |
47 | | |
48 | | #include <glog/logging.h> |
49 | | |
50 | | #include <vector> |
51 | | |
52 | | #include "util/indexed_priority_queue.hpp" |
53 | | |
54 | | namespace doris { |
// SkewedPartitionRebalancer: per the file header comment, spreads big or skewed
// partitions across the available tasks (through per-task buckets) to speed up
// partitioned writes. Only declarations are visible in this header; the member
// functions other than TaskBucket's are not defined here, so the notes below are
// limited to what the signatures show, and anything inferred from names is hedged.
// NOTE(review): uint32_t / int64_t are used below, but <cstdint> is not among the
// includes visible in this header; it presumably arrives transitively - confirm.
55 | | class SkewedPartitionRebalancer { |
56 | | private: |
// One bucket belonging to one task. `id` is the flattened bucket index computed in
// the constructor as task_id_ * task_bucket_count_ + bucket_id_.
57 | | struct TaskBucket { |
58 | | // `task_bucket_count_` is always 1. |
59 | | int task_id; |
60 | | int id; |
61 | | |
// Stores task_id_ and derives `id`. The glog DCHECK_LT (a debug-build check)
// requires bucket_id_ < task_bucket_count_.
62 | | TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_) |
63 | 135 | : task_id(task_id_), id(task_id_ * task_bucket_count_ + bucket_id_) { |
64 | 135 | DCHECK_LT(bucket_id_, task_bucket_count_); |
65 | 135 | } |
66 | | |
// Equality and ordering compare `id` only; `task_id` is never compared on its own.
67 | 24 | bool operator==(const TaskBucket& other) const { return id == other.id; } |
68 | | |
69 | 1.33k | bool operator<(const TaskBucket& other) const { return id < other.id; } |
70 | | |
71 | 0 | bool operator>(const TaskBucket& other) const { return id > other.id; } |
72 | | }; |
73 | | |
74 | | public: |
// Constructor. Presumably the two thresholds are minimum amounts of processed data
// (per partition / overall) required before a rebalance is attempted - TODO confirm
// against the implementation file.
// NOTE(review): partition_count is taken as int but stored in the uint32_t member
// _partition_count; confirm negative values cannot reach here.
75 | | SkewedPartitionRebalancer(int partition_count, int task_count, int task_bucket_count, |
76 | | long min_partition_data_processed_rebalance_threshold, |
77 | | long min_data_processed_rebalance_threshold); |
78 | | |
// Returns a task id for partition_id. Presumably `index` selects among the task
// buckets currently assigned to that partition - TODO confirm.
79 | | int get_task_id(uint32_t partition_id, int64_t index); |
// Accounting inputs (total data processed, per-partition row counts) and the
// rebalance entry point. Exact semantics are not visible in this header.
80 | | void add_data_processed(long data_size); |
81 | | void add_partition_row_count(int partition, long row_count); |
82 | | void rebalance(); |
83 | | |
84 | | private: |
// Private helpers, presumably the internal steps of a rebalance pass - TODO confirm.
// They operate on IndexedPriorityQueue instances keyed either by partition id (int)
// or by TaskBucket; queues named "max" use HIGH_TO_LOW ordering and queues named
// "min" use LOW_TO_HIGH ordering, as the template arguments show.
85 | | void _calculate_partition_data_size(long data_processed); |
86 | | long _calculate_task_bucket_data_size_since_last_rebalance( |
87 | | IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& |
88 | | max_partitions); |
89 | | void _rebalance_based_on_task_bucket_skewness( |
90 | | IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& |
91 | | max_task_buckets, |
92 | | IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& |
93 | | min_task_buckets, |
94 | | std::vector< |
95 | | IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>& |
96 | | task_bucket_max_partitions); |
// Takes the min queue by const reference and returns the selected buckets by value.
97 | | std::vector<TaskBucket> _find_skewed_min_task_buckets( |
98 | | const TaskBucket& max_task_bucket, |
99 | | const IndexedPriorityQueue<TaskBucket, |
100 | | IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& |
101 | | min_task_buckets); |
// Presumably returns true when partition_id was actually reassigned to
// to_task_bucket - TODO confirm.
102 | | bool _rebalance_partition( |
103 | | int partition_id, const TaskBucket& to_task_bucket, |
104 | | IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& |
105 | | max_task_buckets, |
106 | | IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& |
107 | | min_task_buckets); |
108 | | |
109 | | bool _should_rebalance(long data_processed); |
110 | | void _rebalance_partitions(long data_processed); |
111 | | |
// Skewness ratio constant (0.7); how it is applied is not visible in this header.
112 | | static constexpr double TASK_BUCKET_SKEWNESS_THRESHOLD = 0.7; |
113 | | |
114 | | // One or more tasks in one partition. `_task_count` equals to the number of channels and `_task_bucket_count` is always 1. |
115 | | const uint32_t _partition_count; |
116 | | const int _task_count; |
117 | | const int _task_bucket_count; |
// NOTE(review): the counters below are `long`, which is 32 bits wide on LLP64
// platforms (e.g. Windows); confirm the supported targets make this 64-bit.
118 | | long _min_partition_data_processed_rebalance_threshold; |
119 | | long _min_data_processed_rebalance_threshold; |
// Presumably the per-partition vectors are indexed by partition id and the
// task-bucket vector by TaskBucket::id - TODO confirm.
120 | | std::vector<long> _partition_row_count; |
121 | | long _data_processed; |
122 | | long _data_processed_at_last_rebalance; |
123 | | std::vector<long> _partition_data_size; |
124 | | std::vector<long> _partition_data_size_at_last_rebalance; |
125 | | std::vector<long> _partition_data_size_since_last_rebalance_per_task; |
126 | | std::vector<long> _estimated_task_bucket_data_size_since_last_rebalance; |
127 | | |
// One list of TaskBuckets per outer element; presumably the outer index is the
// partition id and the list holds the buckets currently serving it - TODO confirm.
128 | | std::vector<std::vector<TaskBucket>> _partition_assignments; |
129 | | }; |
130 | | } // namespace doris |