be/src/util/blocking_priority_queue.hpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/blocking-priority-queue.hpp |
19 | | // and modified by Doris |
20 | | |
21 | | #pragma once |
22 | | |
23 | | #include <unistd.h> |
24 | | |
25 | | #include <cassert> |
26 | | #include <condition_variable> |
27 | | #include <cstdint> |
28 | | #include <mutex> |
29 | | #include <queue> |
30 | | |
31 | | #include "common/config.h" |
32 | | #include "util/stopwatch.hpp" |
33 | | |
34 | | namespace doris { |
35 | | #include "common/compile_check_begin.h" |
36 | | // Fixed capacity FIFO queue, where both blocking_get and blocking_put operations block |
37 | | // if the queue is empty or full, respectively. |
38 | | template <typename T> |
39 | | class BlockingPriorityQueue { |
40 | | public: |
41 | | BlockingPriorityQueue(uint32_t max_elements) |
42 | 119 | : _shutdown(false), |
43 | 119 | _max_element(max_elements), |
44 | 119 | _upgrade_counter(0), |
45 | 119 | _total_get_wait_time(0), |
46 | 119 | _total_put_wait_time(0), |
47 | 119 | _get_waiting(0), |
48 | 119 | _put_waiting(0) {} |
49 | | |
50 | | // Get an element from the queue, waiting indefinitely (or until timeout) for one to become available. |
51 | | // Returns false if we were shut down prior to getting the element, and there |
52 | | // are no more elements available. |
53 | | // -- timeout_ms: 0 means wait indefinitely |
54 | 123k | bool blocking_get(T* out, uint32_t timeout_ms = 0) { |
55 | 123k | MonotonicStopWatch timer; |
56 | 123k | timer.start(); |
57 | 123k | std::unique_lock unique_lock(_lock); |
58 | 123k | bool wait_successful = false; |
59 | 123k | if (timeout_ms > 0) { |
60 | 0 | while (!(_shutdown || !_queue.empty())) { |
61 | 0 | ++_get_waiting; |
62 | 0 | if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms)) == |
63 | 0 | std::cv_status::timeout) { |
64 | | // timeout |
65 | 0 | wait_successful = _shutdown || !_queue.empty(); |
66 | 0 | break; |
67 | 0 | } |
68 | 0 | } |
69 | 123k | } else { |
70 | 123k | while (!(_shutdown || !_queue.empty())) { |
71 | 308 | ++_get_waiting; |
72 | 308 | _get_cv.wait(unique_lock); |
73 | 308 | } |
74 | 123k | wait_successful = true; |
75 | 123k | } |
76 | 123k | _total_get_wait_time += timer.elapsed_time(); |
77 | 123k | if (wait_successful) { |
78 | 123k | if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) { |
79 | 238 | std::priority_queue<T> tmp_queue; |
80 | 476 | while (!_queue.empty()) { |
81 | 238 | T v = _queue.top(); |
82 | 238 | _queue.pop(); |
83 | 238 | ++v; |
84 | 238 | tmp_queue.push(v); |
85 | 238 | } |
86 | 238 | swap(_queue, tmp_queue); |
87 | 238 | _upgrade_counter = 0; |
88 | 238 | } |
89 | 123k | if (!_queue.empty()) { |
90 | 122k | *out = _queue.top(); |
91 | 122k | _queue.pop(); |
92 | 122k | ++_upgrade_counter; |
93 | 122k | if (_put_waiting > 0) { |
94 | 0 | --_put_waiting; |
95 | 0 | unique_lock.unlock(); |
96 | 0 | _put_cv.notify_one(); |
97 | 0 | } |
98 | 122k | return true; |
99 | 122k | } else { |
100 | 202 | assert(_shutdown); |
101 | 202 | return false; |
102 | 202 | } |
103 | 123k | } else { |
104 | | //time out |
105 | 35 | assert(!_shutdown); |
106 | 0 | return false; |
107 | 35 | } |
108 | 123k | } |
109 | | |
110 | | bool non_blocking_get(T* out) { |
111 | | MonotonicStopWatch timer; |
112 | | timer.start(); |
113 | | std::unique_lock unique_lock(_lock); |
114 | | |
115 | | if (!_queue.empty()) { |
116 | | // 定期提高队列中残留的任务优先级 |
117 | | // 保证优先级较低的大查询不至于完全饿死 |
118 | | if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) { |
119 | | std::priority_queue<T> tmp_queue; |
120 | | while (!_queue.empty()) { |
121 | | T v = _queue.top(); |
122 | | _queue.pop(); |
123 | | ++v; |
124 | | tmp_queue.push(v); |
125 | | } |
126 | | swap(_queue, tmp_queue); |
127 | | _upgrade_counter = 0; |
128 | | } |
129 | | *out = _queue.top(); |
130 | | _queue.pop(); |
131 | | ++_upgrade_counter; |
132 | | _total_get_wait_time += timer.elapsed_time(); |
133 | | if (_put_waiting > 0) { |
134 | | --_put_waiting; |
135 | | unique_lock.unlock(); |
136 | | _put_cv.notify_one(); |
137 | | } |
138 | | return true; |
139 | | } |
140 | | |
141 | | return false; |
142 | | } |
143 | | |
144 | | // Puts an element into the queue, waiting indefinitely until there is space. |
145 | | // If the queue is shut down, returns false. |
146 | 122k | bool blocking_put(const T& val) { |
147 | 122k | MonotonicStopWatch timer; |
148 | 122k | timer.start(); |
149 | 122k | std::unique_lock unique_lock(_lock); |
150 | 122k | while (!(_shutdown || _queue.size() < _max_element)) { |
151 | 0 | ++_put_waiting; |
152 | 0 | _put_cv.wait(unique_lock); |
153 | 0 | } |
154 | 122k | _total_put_wait_time += timer.elapsed_time(); |
155 | | |
156 | 122k | if (_shutdown) { |
157 | 0 | return false; |
158 | 0 | } |
159 | | |
160 | 122k | _queue.push(val); |
161 | 122k | if (_get_waiting > 0) { |
162 | 71 | --_get_waiting; |
163 | 71 | unique_lock.unlock(); |
164 | 71 | _get_cv.notify_one(); |
165 | 71 | } |
166 | 122k | return true; |
167 | 122k | } |
168 | | |
169 | | // Return false if queue full or has been shutdown. |
170 | 0 | bool try_put(const T& val) { |
171 | 0 | std::unique_lock unique_lock(_lock); |
172 | 0 | if (_queue.size() < _max_element && !_shutdown) { |
173 | 0 | _queue.push(val); |
174 | 0 | if (_get_waiting > 0) { |
175 | 0 | --_get_waiting; |
176 | 0 | unique_lock.unlock(); |
177 | 0 | _get_cv.notify_one(); |
178 | 0 | } |
179 | 0 | return true; |
180 | 0 | } |
181 | 0 | return false; |
182 | 0 | } |
183 | | |
184 | | // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put. |
185 | 185 | void shutdown() { |
186 | 185 | { |
187 | 185 | std::lock_guard l(_lock); |
188 | 185 | _shutdown = true; |
189 | 185 | } |
190 | 185 | _get_cv.notify_all(); |
191 | 185 | _put_cv.notify_all(); |
192 | 185 | } |
193 | | |
194 | 123k | uint32_t get_size() const { |
195 | 123k | std::lock_guard l(_lock); |
196 | 123k | return static_cast<uint32_t>(_queue.size()); |
197 | 123k | } |
198 | | |
199 | 438 | uint32_t get_capacity() const { return _max_element; } |
200 | | |
201 | | // Returns the total amount of time threads have blocked in blocking_get. |
202 | 439 | uint64_t total_get_wait_time() const { return _total_get_wait_time; } |
203 | | |
204 | | // Returns the total amount of time threads have blocked in blocking_put. |
205 | 439 | uint64_t total_put_wait_time() const { return _total_put_wait_time; } |
206 | | |
207 | | private: |
208 | | bool _shutdown; |
209 | | const int _max_element; |
210 | | std::condition_variable _get_cv; // 'get' callers wait on this |
211 | | std::condition_variable _put_cv; // 'put' callers wait on this |
212 | | // _lock guards access to _queue, total_get_wait_time, and total_put_wait_time |
213 | | mutable std::mutex _lock; |
214 | | std::priority_queue<T> _queue; |
215 | | int _upgrade_counter; |
216 | | std::atomic<uint64_t> _total_get_wait_time; |
217 | | std::atomic<uint64_t> _total_put_wait_time; |
218 | | size_t _get_waiting; |
219 | | size_t _put_waiting; |
220 | | }; |
221 | | #include "common/compile_check_end.h" |
222 | | } // namespace doris |