Coverage Report

Created: 2026-03-14 13:33

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/indexed_priority_queue.hpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is porting from
18
// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java
19
// to cpp and modified by Doris
20
21
#pragma once
22
23
#pragma once
24
25
#include <functional>
26
#include <iostream>
27
#include <map>
28
#include <optional>
29
#include <set>
30
31
/**
32
 * A priority queue with constant time contains(E) and log time remove(E)
33
 * Ties are broken by insertion order.
34
 * LOW_TO_HIGH is the priority order from low to high,
35
 * HIGH_TO_LOW is the priority order from high to low.
36
 * Those with the same priority are arranged in order of insertion.
37
 */
38
39
namespace doris {
40
41
template <typename T>
42
struct IndexedPriorityQueueEntry {
43
    T value;
44
    long priority;
45
    long generation;
46
47
    IndexedPriorityQueueEntry(T val, long prio, long gen)
48
208
            : value(std::move(val)), priority(prio), generation(gen) {}
49
};
50
51
enum class IndexedPriorityQueuePriorityOrdering { LOW_TO_HIGH, HIGH_TO_LOW };
52
53
template <typename T, IndexedPriorityQueuePriorityOrdering priority_ordering>
54
struct IndexedPriorityQueueComparator {
55
    bool operator()(const IndexedPriorityQueueEntry<T>& lhs,
56
913
                    const IndexedPriorityQueueEntry<T>& rhs) const {
57
913
        if constexpr (priority_ordering == IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH) {
58
460
            if (lhs.priority != rhs.priority) {
59
181
                return lhs.priority < rhs.priority;
60
181
            }
61
279
            return lhs.generation < rhs.generation;
62
460
        } else {
63
453
            if (lhs.priority != rhs.priority) {
64
175
                return lhs.priority > rhs.priority;
65
175
            }
66
278
            return lhs.generation < rhs.generation;
67
453
        }
68
913
    }
_ZNK5doris30IndexedPriorityQueueComparatorINS_25SkewedPartitionRebalancer10TaskBucketELNS_36IndexedPriorityQueuePriorityOrderingE1EEclERKNS_25IndexedPriorityQueueEntryIS2_EES8_
Line
Count
Source
56
417
                    const IndexedPriorityQueueEntry<T>& rhs) const {
57
        if constexpr (priority_ordering == IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH) {
58
            if (lhs.priority != rhs.priority) {
59
                return lhs.priority < rhs.priority;
60
            }
61
            return lhs.generation < rhs.generation;
62
417
        } else {
63
417
            if (lhs.priority != rhs.priority) {
64
150
                return lhs.priority > rhs.priority;
65
150
            }
66
267
            return lhs.generation < rhs.generation;
67
417
        }
68
417
    }
_ZNK5doris30IndexedPriorityQueueComparatorINS_25SkewedPartitionRebalancer10TaskBucketELNS_36IndexedPriorityQueuePriorityOrderingE0EEclERKNS_25IndexedPriorityQueueEntryIS2_EES8_
Line
Count
Source
56
460
                    const IndexedPriorityQueueEntry<T>& rhs) const {
57
460
        if constexpr (priority_ordering == IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH) {
58
460
            if (lhs.priority != rhs.priority) {
59
181
                return lhs.priority < rhs.priority;
60
181
            }
61
279
            return lhs.generation < rhs.generation;
62
        } else {
63
            if (lhs.priority != rhs.priority) {
64
                return lhs.priority > rhs.priority;
65
            }
66
            return lhs.generation < rhs.generation;
67
        }
68
460
    }
_ZNK5doris30IndexedPriorityQueueComparatorIiLNS_36IndexedPriorityQueuePriorityOrderingE1EEclERKNS_25IndexedPriorityQueueEntryIiEES6_
Line
Count
Source
56
36
                    const IndexedPriorityQueueEntry<T>& rhs) const {
57
        if constexpr (priority_ordering == IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH) {
58
            if (lhs.priority != rhs.priority) {
59
                return lhs.priority < rhs.priority;
60
            }
61
            return lhs.generation < rhs.generation;
62
36
        } else {
63
36
            if (lhs.priority != rhs.priority) {
64
25
                return lhs.priority > rhs.priority;
65
25
            }
66
11
            return lhs.generation < rhs.generation;
67
36
        }
68
36
    }
69
};
70
71
template <typename T, IndexedPriorityQueuePriorityOrdering priority_ordering =
72
                              IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>
73
class IndexedPriorityQueue {
74
public:
75
    struct Prioritized {
76
        T value;
77
        long priority;
78
    };
79
80
74
    IndexedPriorityQueue() = default;
_ZN5doris20IndexedPriorityQueueIiLNS_36IndexedPriorityQueuePriorityOrderingE1EEC2Ev
Line
Count
Source
80
58
    IndexedPriorityQueue() = default;
_ZN5doris20IndexedPriorityQueueINS_25SkewedPartitionRebalancer10TaskBucketELNS_36IndexedPriorityQueuePriorityOrderingE1EEC2Ev
Line
Count
Source
80
8
    IndexedPriorityQueue() = default;
_ZN5doris20IndexedPriorityQueueINS_25SkewedPartitionRebalancer10TaskBucketELNS_36IndexedPriorityQueuePriorityOrderingE0EEC2Ev
Line
Count
Source
80
8
    IndexedPriorityQueue() = default;
81
82
201
    bool add_or_update(T element, long priority) {
83
201
        auto it = _index.find(element);
84
201
        if (it != _index.end()) {
85
39
            if (it->second.priority == priority) {
86
0
                return false;
87
0
            }
88
39
            _queue.erase(it->second);
89
39
        }
90
201
        IndexedPriorityQueueEntry<T> entry {std::move(element), priority, generation++};
91
201
        _queue.insert(std::move(entry));
92
201
        _index.insert({entry.value, std::move(entry)});
93
201
        return true;
94
201
    }
_ZN5doris20IndexedPriorityQueueINS_25SkewedPartitionRebalancer10TaskBucketELNS_36IndexedPriorityQueuePriorityOrderingE1EE13add_or_updateES2_l
Line
Count
Source
82
81
    bool add_or_update(T element, long priority) {
83
81
        auto it = _index.find(element);
84
81
        if (it != _index.end()) {
85
14
            if (it->second.priority == priority) {
86
0
                return false;
87
0
            }
88
14
            _queue.erase(it->second);
89
14
        }
90
81
        IndexedPriorityQueueEntry<T> entry {std::move(element), priority, generation++};
91
81
        _queue.insert(std::move(entry));
92
81
        _index.insert({entry.value, std::move(entry)});
93
81
        return true;
94
81
    }
_ZN5doris20IndexedPriorityQueueINS_25SkewedPartitionRebalancer10TaskBucketELNS_36IndexedPriorityQueuePriorityOrderingE0EE13add_or_updateES2_l
Line
Count
Source
82
81
    bool add_or_update(T element, long priority) {
83
81
        auto it = _index.find(element);
84
81
        if (it != _index.end()) {
85
24
            if (it->second.priority == priority) {
86
0
                return false;
87
0
            }
88
24
            _queue.erase(it->second);
89
24
        }
90
81
        IndexedPriorityQueueEntry<T> entry {std::move(element), priority, generation++};
91
81
        _queue.insert(std::move(entry));
92
81
        _index.insert({entry.value, std::move(entry)});
93
81
        return true;
94
81
    }
_ZN5doris20IndexedPriorityQueueIiLNS_36IndexedPriorityQueuePriorityOrderingE1EE13add_or_updateEil
Line
Count
Source
82
39
    bool add_or_update(T element, long priority) {
83
39
        auto it = _index.find(element);
84
39
        if (it != _index.end()) {
85
1
            if (it->second.priority == priority) {
86
0
                return false;
87
0
            }
88
1
            _queue.erase(it->second);
89
1
        }
90
39
        IndexedPriorityQueueEntry<T> entry {std::move(element), priority, generation++};
91
39
        _queue.insert(std::move(entry));
92
39
        _index.insert({entry.value, std::move(entry)});
93
39
        return true;
94
39
    }
95
96
    bool contains(const T& element) const { return _index.find(element) != _index.end(); }
97
98
    bool remove(const T& element) {
99
        auto it = _index.find(element);
100
        if (it != _index.end()) {
101
            _queue.erase(it->second);
102
            _index.erase(it);
103
            return true;
104
        }
105
        return false;
106
    }
107
108
76
    std::optional<T> poll() {
109
76
        if (_queue.empty()) {
110
12
            return std::nullopt;
111
12
        }
112
64
        T value = _queue.begin()->value;
113
64
        _index.erase(value);
114
64
        _queue.erase(_queue.begin());
115
64
        return value;
116
76
    }
_ZN5doris20IndexedPriorityQueueINS_25SkewedPartitionRebalancer10TaskBucketELNS_36IndexedPriorityQueuePriorityOrderingE1EE4pollEv
Line
Count
Source
108
49
    std::optional<T> poll() {
109
49
        if (_queue.empty()) {
110
2
            return std::nullopt;
111
2
        }
112
47
        T value = _queue.begin()->value;
113
47
        _index.erase(value);
114
47
        _queue.erase(_queue.begin());
115
47
        return value;
116
49
    }
_ZN5doris20IndexedPriorityQueueIiLNS_36IndexedPriorityQueuePriorityOrderingE1EE4pollEv
Line
Count
Source
108
27
    std::optional<T> poll() {
109
27
        if (_queue.empty()) {
110
10
            return std::nullopt;
111
10
        }
112
17
        T value = _queue.begin()->value;
113
17
        _index.erase(value);
114
17
        _queue.erase(_queue.begin());
115
17
        return value;
116
27
    }
117
118
    std::optional<Prioritized> peek() const {
119
        if (_queue.empty()) {
120
            return std::nullopt;
121
        }
122
        const IndexedPriorityQueueEntry<T>& entry = *_queue.begin();
123
        return Prioritized {entry.value, entry.priority};
124
    }
125
126
    int size() const { return _queue.size(); }
127
128
47
    bool is_empty() const { return _queue.empty(); }
129
130
    class Iterator {
131
    public:
132
        using iterator_category = std::forward_iterator_tag;
133
        using value_type = T;
134
        using difference_type = std::ptrdiff_t;
135
        using pointer = T*;
136
        using reference = T&;
137
138
        Iterator() : _iter() {}
139
        explicit Iterator(
140
                typename std::set<
141
                        IndexedPriorityQueueEntry<T>,
142
                        IndexedPriorityQueueComparator<T, priority_ordering>>::const_iterator iter)
143
162
                : _iter(iter) {}
_ZN5doris20IndexedPriorityQueueIiLNS_36IndexedPriorityQueuePriorityOrderingE1EE8IteratorC2ESt23_Rb_tree_const_iteratorINS_25IndexedPriorityQueueEntryIiEEE
Line
Count
Source
143
120
                : _iter(iter) {}
_ZN5doris20IndexedPriorityQueueINS_25SkewedPartitionRebalancer10TaskBucketELNS_36IndexedPriorityQueuePriorityOrderingE0EE8IteratorC2ESt23_Rb_tree_const_iteratorINS_25IndexedPriorityQueueEntryIS2_EEE
Line
Count
Source
143
42
                : _iter(iter) {}
144
145
140
        const T& operator*() const { return _iter->value; }
_ZNK5doris20IndexedPriorityQueueIiLNS_36IndexedPriorityQueuePriorityOrderingE1EE8IteratordeEv
Line
Count
Source
145
48
        const T& operator*() const { return _iter->value; }
_ZNK5doris20IndexedPriorityQueueINS_25SkewedPartitionRebalancer10TaskBucketELNS_36IndexedPriorityQueuePriorityOrderingE0EE8IteratordeEv
Line
Count
Source
145
92
        const T& operator*() const { return _iter->value; }
146
147
        const T* operator->() const { return &(_iter->value); }
148
149
119
        Iterator& operator++() {
150
119
            ++_iter;
151
119
            return *this;
152
119
        }
_ZN5doris20IndexedPriorityQueueIiLNS_36IndexedPriorityQueuePriorityOrderingE1EE8IteratorppEv
Line
Count
Source
149
48
        Iterator& operator++() {
150
48
            ++_iter;
151
48
            return *this;
152
48
        }
_ZN5doris20IndexedPriorityQueueINS_25SkewedPartitionRebalancer10TaskBucketELNS_36IndexedPriorityQueuePriorityOrderingE0EE8IteratorppEv
Line
Count
Source
149
71
        Iterator& operator++() {
150
71
            ++_iter;
151
71
            return *this;
152
71
        }
153
154
        Iterator operator++(int) {
155
            Iterator tmp = *this;
156
            ++(*this);
157
            return tmp;
158
        }
159
160
200
        bool operator==(const Iterator& other) const { return _iter == other._iter; }
_ZNK5doris20IndexedPriorityQueueIiLNS_36IndexedPriorityQueuePriorityOrderingE1EE8IteratoreqERKS3_
Line
Count
Source
160
108
        bool operator==(const Iterator& other) const { return _iter == other._iter; }
_ZNK5doris20IndexedPriorityQueueINS_25SkewedPartitionRebalancer10TaskBucketELNS_36IndexedPriorityQueuePriorityOrderingE0EE8IteratoreqERKS5_
Line
Count
Source
160
92
        bool operator==(const Iterator& other) const { return _iter == other._iter; }
161
162
200
        bool operator!=(const Iterator& other) const { return !(*this == other); }
_ZNK5doris20IndexedPriorityQueueIiLNS_36IndexedPriorityQueuePriorityOrderingE1EE8IteratorneERKS3_
Line
Count
Source
162
108
        bool operator!=(const Iterator& other) const { return !(*this == other); }
_ZNK5doris20IndexedPriorityQueueINS_25SkewedPartitionRebalancer10TaskBucketELNS_36IndexedPriorityQueuePriorityOrderingE0EE8IteratorneERKS5_
Line
Count
Source
162
92
        bool operator!=(const Iterator& other) const { return !(*this == other); }
163
164
    private:
165
        typename std::set<IndexedPriorityQueueEntry<T>,
166
                          IndexedPriorityQueueComparator<T, priority_ordering>>::const_iterator
167
                _iter;
168
    };
169
170
81
    Iterator begin() const { return Iterator(_queue.begin()); }
_ZNK5doris20IndexedPriorityQueueIiLNS_36IndexedPriorityQueuePriorityOrderingE1EE5beginEv
Line
Count
Source
170
60
    Iterator begin() const { return Iterator(_queue.begin()); }
_ZNK5doris20IndexedPriorityQueueINS_25SkewedPartitionRebalancer10TaskBucketELNS_36IndexedPriorityQueuePriorityOrderingE0EE5beginEv
Line
Count
Source
170
21
    Iterator begin() const { return Iterator(_queue.begin()); }
171
172
81
    Iterator end() const { return Iterator(_queue.end()); }
_ZNK5doris20IndexedPriorityQueueIiLNS_36IndexedPriorityQueuePriorityOrderingE1EE3endEv
Line
Count
Source
172
60
    Iterator end() const { return Iterator(_queue.end()); }
_ZNK5doris20IndexedPriorityQueueINS_25SkewedPartitionRebalancer10TaskBucketELNS_36IndexedPriorityQueuePriorityOrderingE0EE3endEv
Line
Count
Source
172
21
    Iterator end() const { return Iterator(_queue.end()); }
173
174
private:
175
    std::map<T, IndexedPriorityQueueEntry<T>> _index;
176
    std::set<IndexedPriorityQueueEntry<T>, IndexedPriorityQueueComparator<T, priority_ordering>>
177
            _queue;
178
179
    long generation = 0;
180
};
181
182
} // namespace doris