Coverage Report

Created: 2026-04-01 13:27

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/indexed_priority_queue.hpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is ported from
18
// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java
19
// to cpp and modified by Doris
20
21
#pragma once
22
23
#pragma once
24
25
#include <functional>
26
#include <iostream>
27
#include <map>
28
#include <optional>
29
#include <set>
30
31
/**
32
 * A priority queue with logarithmic time contains(E) and remove(E) (the index is a std::map).
33
 * Ties are broken by insertion order.
34
 * LOW_TO_HIGH is the priority order from low to high,
35
 * HIGH_TO_LOW is the priority order from high to low.
36
 * Those with the same priority are arranged in order of insertion.
37
 */
38
39
namespace doris {

// One stored element of an IndexedPriorityQueue: the payload together with
// the two keys the comparator orders by.
template <typename T>
struct IndexedPriorityQueueEntry {
    T value;
    long priority;
    // Sequence number handed out by the owning queue; used by the comparator
    // to break ties between entries of equal priority.
    long generation;

    IndexedPriorityQueueEntry(T val, long prio, long gen)
            : value(std::move(val)), priority(prio), generation(gen) {}
};

// Direction in which IndexedPriorityQueueComparator sorts priorities.
enum class IndexedPriorityQueuePriorityOrdering { LOW_TO_HIGH, HIGH_TO_LOW };

// Strict weak ordering over queue entries.
// Entries are ordered by priority first (ascending for LOW_TO_HIGH,
// descending for HIGH_TO_LOW); entries with equal priority are ordered by
// ascending generation.
template <typename T, IndexedPriorityQueuePriorityOrdering priority_ordering>
struct IndexedPriorityQueueComparator {
    bool operator()(const IndexedPriorityQueueEntry<T>& lhs,
                    const IndexedPriorityQueueEntry<T>& rhs) const {
        if (lhs.priority != rhs.priority) {
            if constexpr (priority_ordering == IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH) {
                return lhs.priority < rhs.priority;
            } else {
                return lhs.priority > rhs.priority;
            }
        }
        // Same priority: the tie-break is identical for both orderings.
        return lhs.generation < rhs.generation;
    }
};

// Priority queue that can also look up, update and remove arbitrary elements.
//
// Two containers are kept in sync:
//   * _queue: a std::set of entries sorted by IndexedPriorityQueueComparator,
//             so *_queue.begin() is always the head of the queue.
//   * _index: a std::map from element to a copy of its current entry, used to
//             locate the entry inside _queue.
// Every mutation must leave both containers describing the same set of
// entries; erasing from _queue is done by entry value, so a stale copy in
// _index would silently fail to erase anything.
//
// T must be copyable and usable as a std::map key (operator< or a
// std::less<T> specialization).
template <typename T, IndexedPriorityQueuePriorityOrdering priority_ordering =
                              IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>
class IndexedPriorityQueue {
private:
    using EntrySet = std::set<IndexedPriorityQueueEntry<T>,
                              IndexedPriorityQueueComparator<T, priority_ordering>>;

public:
    // Result type of peek(): the head element and its priority.
    struct Prioritized {
        T value;
        long priority;
    };

    IndexedPriorityQueue() = default;

    // Inserts `element` with `priority`, or changes the priority of an element
    // that is already queued.
    //
    // Returns false only when the element is already present with exactly this
    // priority (nothing changes). Returns true when the element was inserted
    // or its priority was changed; in both cases the entry receives a fresh
    // generation, so it sorts after existing entries of the same priority.
    bool add_or_update(T element, long priority) {
        auto it = _index.find(element);
        if (it != _index.end()) {
            IndexedPriorityQueueEntry<T>& indexed = it->second;
            if (indexed.priority == priority) {
                return false;
            }
            // Remove the old entry from the ordered set, then refresh the
            // indexed copy in place so that _index and _queue stay in sync.
            // (std::map::insert would not overwrite the existing key.)
            _queue.erase(indexed);
            indexed.priority = priority;
            indexed.generation = _generation++;
            _queue.insert(indexed);
            return true;
        }

        IndexedPriorityQueueEntry<T> entry {std::move(element), priority, _generation++};
        // Copy into the set first; `entry` must stay intact because its value
        // is still needed to build the index key below.
        _queue.insert(entry);
        T key = entry.value;
        _index.emplace(std::move(key), std::move(entry));
        return true;
    }

    // Returns true if `element` is currently queued.
    bool contains(const T& element) const { return _index.find(element) != _index.end(); }

    // Removes `element` from the queue. Returns false if it was not queued.
    bool remove(const T& element) {
        auto it = _index.find(element);
        if (it == _index.end()) {
            return false;
        }
        _queue.erase(it->second);
        _index.erase(it);
        return true;
    }

    // Removes and returns the head of the queue, or std::nullopt when empty.
    std::optional<T> poll() {
        if (_queue.empty()) {
            return std::nullopt;
        }
        // Extract the node so the payload can be moved out instead of copied.
        auto node = _queue.extract(_queue.begin());
        T value = std::move(node.value().value);
        _index.erase(value);
        return value;
    }

    // Returns a copy of the head element and its priority without removing
    // it, or std::nullopt when the queue is empty.
    std::optional<Prioritized> peek() const {
        if (_queue.empty()) {
            return std::nullopt;
        }
        const IndexedPriorityQueueEntry<T>& entry = *_queue.begin();
        return Prioritized {entry.value, entry.priority};
    }

    // Number of queued elements (narrowed to int to keep the original
    // signature).
    int size() const { return static_cast<int>(_queue.size()); }

    bool is_empty() const { return _queue.empty(); }

    // Read-only forward iterator that walks the elements in queue order
    // (head first) and exposes only the payload of each entry.
    class Iterator {
    public:
        using iterator_category = std::forward_iterator_tag;
        using value_type = T;
        using difference_type = std::ptrdiff_t;
        // Elements are immutable through the iterator, matching what
        // operator* and operator-> actually return.
        using pointer = const T*;
        using reference = const T&;

        Iterator() : _iter() {}
        explicit Iterator(typename EntrySet::const_iterator iter) : _iter(iter) {}

        const T& operator*() const { return _iter->value; }

        const T* operator->() const { return &(_iter->value); }

        Iterator& operator++() {
            ++_iter;
            return *this;
        }

        Iterator operator++(int) {
            Iterator tmp = *this;
            ++(*this);
            return tmp;
        }

        bool operator==(const Iterator& other) const { return _iter == other._iter; }

        bool operator!=(const Iterator& other) const { return !(*this == other); }

    private:
        typename EntrySet::const_iterator _iter;
    };

    Iterator begin() const { return Iterator(_queue.begin()); }

    Iterator end() const { return Iterator(_queue.end()); }

private:
    // element -> copy of the entry currently stored in _queue for it.
    std::map<T, IndexedPriorityQueueEntry<T>> _index;
    // Entries in queue order; begin() is the head.
    EntrySet _queue;

    // Next generation number to hand out; incremented on every insert and on
    // every priority change.
    long _generation = 0;
};

} // namespace doris