be/src/util/indexed_priority_queue.hpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is porting from |
18 | | // https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java |
19 | | // to cpp and modified by Doris |
20 | | |
21 | | #pragma once |
22 | | |
23 | | #pragma once |
24 | | |
25 | | #include <functional> |
26 | | #include <iostream> |
27 | | #include <map> |
28 | | #include <optional> |
29 | | #include <set> |
30 | | |
31 | | /** |
32 | | * A priority queue with logarithmic time contains(E) and remove(E) (both go through an ordered std::map index). |
33 | | * Ties are broken by insertion order. |
34 | | * LOW_TO_HIGH is the priority order from low to high, |
35 | | * HIGH_TO_LOW is the priority order from high to low. |
36 | | * Those with the same priority are arranged in order of insertion. |
37 | | */ |
38 | | |
namespace doris {

/**
 * A single element held by IndexedPriorityQueue.
 *
 * value      - the user supplied element.
 * priority   - the priority the element was queued (or last updated) with.
 * generation - value of the queue's insertion counter at the time the entry
 *              was (re)inserted; the comparator uses it to break priority ties.
 */
template <typename T>
struct IndexedPriorityQueueEntry {
    T value;
    long priority;
    long generation;

    IndexedPriorityQueueEntry(T val, long prio, long gen)
            : value(std::move(val)), priority(prio), generation(gen) {}
};

// Direction in which priorities are ordered when iterating / polling the queue.
enum class IndexedPriorityQueuePriorityOrdering { LOW_TO_HIGH, HIGH_TO_LOW };

/**
 * Strict weak ordering over queue entries.
 *
 * Entries are ordered by priority first (ascending for LOW_TO_HIGH, descending
 * for HIGH_TO_LOW). Entries with equal priority are ordered by ascending
 * generation, i.e. the entry that was inserted earlier sorts first.
 */
template <typename T, IndexedPriorityQueuePriorityOrdering priority_ordering>
struct IndexedPriorityQueueComparator {
    bool operator()(const IndexedPriorityQueueEntry<T>& lhs,
                    const IndexedPriorityQueueEntry<T>& rhs) const {
        if (lhs.priority == rhs.priority) {
            // Same priority: the earlier inserted entry wins for both orderings.
            return lhs.generation < rhs.generation;
        }
        if constexpr (priority_ordering == IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH) {
            return lhs.priority < rhs.priority;
        } else {
            return lhs.priority > rhs.priority;
        }
    }
};

/**
 * Priority queue that additionally keeps an index from element to its entry,
 * so that an element can be looked up, re-prioritized or removed by value.
 *
 * Two containers are kept in sync:
 *   _queue - std::set of entries ordered by IndexedPriorityQueueComparator.
 *   _index - std::map from element to a copy of the entry stored in _queue;
 *            the copy is used as the key to erase the entry from _queue.
 *
 * T must be copyable and usable as a std::map key (operator<).
 */
template <typename T, IndexedPriorityQueuePriorityOrdering priority_ordering =
                              IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>
class IndexedPriorityQueue {
private:
    using Entry = IndexedPriorityQueueEntry<T>;
    using EntrySet = std::set<Entry, IndexedPriorityQueueComparator<T, priority_ordering>>;

public:
    // Element together with its priority, as returned by peek().
    struct Prioritized {
        T value;
        long priority;
    };

    IndexedPriorityQueue() = default;

    /**
     * Inserts `element` with `priority`, or changes the priority of an element
     * that is already queued.
     *
     * Returns false when the element is already present with exactly this
     * priority (nothing changes); returns true otherwise. An updated element
     * receives a fresh generation, so among equal priorities it sorts after
     * the entries that were already there.
     */
    bool add_or_update(T element, long priority) {
        auto it = _index.find(element);
        if (it != _index.end()) {
            Entry& indexed = it->second;
            if (indexed.priority == priority) {
                return false;
            }
            // Re-key the queue entry and keep the indexed copy identical to it.
            // (std::map::insert would NOT overwrite the existing mapping, which
            // would leave the index pointing at an entry no longer in _queue.)
            _queue.erase(indexed);
            indexed.priority = priority;
            indexed.generation = generation++;
            _queue.insert(indexed);
            return true;
        }

        Entry entry {std::move(element), priority, generation++};
        // Copy into the index first, then hand the entry over to the queue, so
        // that `entry` is never read after it has been moved from.
        _index.emplace(entry.value, entry);
        _queue.insert(std::move(entry));
        return true;
    }

    // Returns true when `element` is currently queued.
    bool contains(const T& element) const { return _index.find(element) != _index.end(); }

    // Removes `element` from the queue. Returns true when it was present.
    bool remove(const T& element) {
        auto it = _index.find(element);
        if (it == _index.end()) {
            return false;
        }
        _queue.erase(it->second);
        _index.erase(it);
        return true;
    }

    // Removes and returns the first element in queue order, or std::nullopt
    // when the queue is empty.
    std::optional<T> poll() {
        if (_queue.empty()) {
            return std::nullopt;
        }
        auto head = _queue.begin();
        T value = head->value;
        _index.erase(value);
        _queue.erase(head);
        return value;
    }

    // Returns (without removing) the first element in queue order together
    // with its priority, or std::nullopt when the queue is empty.
    std::optional<Prioritized> peek() const {
        if (_queue.empty()) {
            return std::nullopt;
        }
        const Entry& entry = *_queue.begin();
        return Prioritized {entry.value, entry.priority};
    }

    // Number of queued elements. The return type stays `int` for existing
    // callers; the conversion from the container's size type is explicit.
    int size() const { return static_cast<int>(_queue.size()); }

    bool is_empty() const { return _queue.empty(); }

    /**
     * Read-only forward iterator over the queued elements in queue order.
     * Dereferencing yields the element only (not its priority).
     */
    class Iterator {
    public:
        using iterator_category = std::forward_iterator_tag;
        using value_type = T;
        using difference_type = std::ptrdiff_t;
        // Elements are exposed read-only, matching operator* / operator->.
        using pointer = const T*;
        using reference = const T&;

        Iterator() : _iter() {}
        explicit Iterator(typename EntrySet::const_iterator iter) : _iter(iter) {}

        const T& operator*() const { return _iter->value; }

        const T* operator->() const { return &(_iter->value); }

        Iterator& operator++() {
            ++_iter;
            return *this;
        }

        Iterator operator++(int) {
            Iterator tmp = *this;
            ++(*this);
            return tmp;
        }

        bool operator==(const Iterator& other) const { return _iter == other._iter; }

        bool operator!=(const Iterator& other) const { return !(*this == other); }

    private:
        typename EntrySet::const_iterator _iter;
    };

    Iterator begin() const { return Iterator(_queue.begin()); }

    Iterator end() const { return Iterator(_queue.end()); }

private:
    // element -> copy of the entry currently stored in _queue for that element.
    std::map<T, Entry> _index;
    // All entries, ordered by priority and then by generation.
    EntrySet _queue;

    // Counter handed out to every inserted / updated entry for tie breaking.
    long generation = 0;
};

} // namespace doris