Coverage Report

Created: 2026-03-16 01:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/connector/skewed_partition_rebalancer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is ported from
18
// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java
19
// to cpp and modified by Doris
20
21
/**
22
 * Helps in distributing big or skewed partitions across available tasks to improve the performance of
23
 * partitioned writes.
24
 * <p>
25
 * This rebalancer initializes a set of buckets for each task based on a given taskBucketCount and then tries to
26
 * uniformly distribute partitions across those buckets. This helps to mitigate two problems:
27
 * 1. Mitigate skewness across tasks.
28
 * 2. Scale a few big partitions across tasks even if there's no skewness among them. This essentially speeds up the
29
 *    local scaling without impacting much overall resource utilization.
30
 * <p>
31
 * Example:
32
 * <p>
33
 * Before: 3 tasks, 3 buckets per task, and 2 skewed partitions
34
 * Task1                Task2               Task3
35
 * Bucket1 (Part 1)     Bucket1 (Part 2)    Bucket1
36
 * Bucket2              Bucket2             Bucket2
37
 * Bucket3              Bucket3             Bucket3
38
 * <p>
39
 * After rebalancing:
40
 * Task1                Task2               Task3
41
 * Bucket1 (Part 1)     Bucket1 (Part 2)    Bucket1 (Part 1)
42
 * Bucket2 (Part 2)     Bucket2 (Part 1)    Bucket2 (Part 2)
43
 * Bucket3              Bucket3             Bucket3
44
 */
45
46
#pragma once
47
48
#include <glog/logging.h>
49
50
#include <vector>
51
52
#include "util/indexed_priority_queue.hpp"
53
54
namespace doris {
55
class SkewedPartitionRebalancer {
56
private:
57
    struct TaskBucket {
58
        // `task_bucket_count_` is always 1.
59
        int task_id;
60
        int id;
61
62
        TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_)
63
135
                : task_id(task_id_), id(task_id_ * task_bucket_count_ + bucket_id_) {
64
135
            DCHECK_LT(bucket_id_, task_bucket_count_);
65
135
        }
66
67
24
        bool operator==(const TaskBucket& other) const { return id == other.id; }
68
69
1.33k
        bool operator<(const TaskBucket& other) const { return id < other.id; }
70
71
0
        bool operator>(const TaskBucket& other) const { return id > other.id; }
72
    };
73
74
public:
75
    SkewedPartitionRebalancer(int partition_count, int task_count, int task_bucket_count,
76
                              long min_partition_data_processed_rebalance_threshold,
77
                              long min_data_processed_rebalance_threshold);
78
79
    int get_task_id(uint32_t partition_id, int64_t index);
80
    void add_data_processed(long data_size);
81
    void add_partition_row_count(int partition, long row_count);
82
    void rebalance();
83
84
private:
85
    void _calculate_partition_data_size(long data_processed);
86
    long _calculate_task_bucket_data_size_since_last_rebalance(
87
            IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
88
                    max_partitions);
89
    void _rebalance_based_on_task_bucket_skewness(
90
            IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
91
                    max_task_buckets,
92
            IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
93
                    min_task_buckets,
94
            std::vector<
95
                    IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>&
96
                    task_bucket_max_partitions);
97
    std::vector<TaskBucket> _find_skewed_min_task_buckets(
98
            const TaskBucket& max_task_bucket,
99
            const IndexedPriorityQueue<TaskBucket,
100
                                       IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
101
                    min_task_buckets);
102
    bool _rebalance_partition(
103
            int partition_id, const TaskBucket& to_task_bucket,
104
            IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
105
                    max_task_buckets,
106
            IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
107
                    min_task_buckets);
108
109
    bool _should_rebalance(long data_processed);
110
    void _rebalance_partitions(long data_processed);
111
112
    static constexpr double TASK_BUCKET_SKEWNESS_THRESHOLD = 0.7;
113
114
    // One or more tasks in one partition. `_task_count` equals to the number of channels and `_task_bucket_count` is always 1.
115
    const uint32_t _partition_count;
116
    const int _task_count;
117
    const int _task_bucket_count;
118
    long _min_partition_data_processed_rebalance_threshold;
119
    long _min_data_processed_rebalance_threshold;
120
    std::vector<long> _partition_row_count;
121
    long _data_processed;
122
    long _data_processed_at_last_rebalance;
123
    std::vector<long> _partition_data_size;
124
    std::vector<long> _partition_data_size_at_last_rebalance;
125
    std::vector<long> _partition_data_size_since_last_rebalance_per_task;
126
    std::vector<long> _estimated_task_bucket_data_size_since_last_rebalance;
127
128
    std::vector<std::vector<TaskBucket>> _partition_assignments;
129
};
130
} // namespace doris