be/src/util/blocking_priority_queue.hpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/blocking-priority-queue.hpp |
19 | | // and modified by Doris |
20 | | |
21 | | #pragma once |
22 | | |
23 | | #include <unistd.h> |
24 | | |
25 | | #include <cassert> |
26 | | #include <condition_variable> |
27 | | #include <cstdint> |
28 | | #include <mutex> |
29 | | #include <queue> |
30 | | |
31 | | #include "common/config.h" |
32 | | #include "util/stopwatch.hpp" |
33 | | |
34 | | namespace doris { |
35 | | // Fixed capacity FIFO queue, where both blocking_get and blocking_put operations block |
36 | | // if the queue is empty or full, respectively. |
37 | | template <typename T> |
38 | | class BlockingPriorityQueue { |
39 | | public: |
40 | | BlockingPriorityQueue(uint32_t max_elements) |
41 | 119 | : _shutdown(false), |
42 | 119 | _max_element(max_elements), |
43 | 119 | _upgrade_counter(0), |
44 | 119 | _total_get_wait_time(0), |
45 | 119 | _total_put_wait_time(0), |
46 | 119 | _get_waiting(0), |
47 | 119 | _put_waiting(0) {} |
48 | | |
49 | | // Get an element from the queue, waiting indefinitely (or until timeout) for one to become available. |
50 | | // Returns false if we were shut down prior to getting the element, and there |
51 | | // are no more elements available. |
52 | | // -- timeout_ms: 0 means wait indefinitely |
53 | 123k | bool blocking_get(T* out, uint32_t timeout_ms = 0) { |
54 | 123k | MonotonicStopWatch timer; |
55 | 123k | timer.start(); |
56 | 123k | std::unique_lock unique_lock(_lock); |
57 | 123k | bool wait_successful = false; |
58 | 123k | if (timeout_ms > 0) { |
59 | 0 | while (!(_shutdown || !_queue.empty())) { |
60 | 0 | ++_get_waiting; |
61 | 0 | if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms)) == |
62 | 0 | std::cv_status::timeout) { |
63 | | // timeout |
64 | 0 | wait_successful = _shutdown || !_queue.empty(); |
65 | 0 | break; |
66 | 0 | } |
67 | 0 | } |
68 | 123k | } else { |
69 | 123k | while (!(_shutdown || !_queue.empty())) { |
70 | 306 | ++_get_waiting; |
71 | 306 | _get_cv.wait(unique_lock); |
72 | 306 | } |
73 | 123k | wait_successful = true; |
74 | 123k | } |
75 | 123k | _total_get_wait_time += timer.elapsed_time(); |
76 | 123k | if (wait_successful) { |
77 | 123k | if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) { |
78 | 240 | std::priority_queue<T> tmp_queue; |
79 | 480 | while (!_queue.empty()) { |
80 | 240 | T v = _queue.top(); |
81 | 240 | _queue.pop(); |
82 | 240 | ++v; |
83 | 240 | tmp_queue.push(v); |
84 | 240 | } |
85 | 240 | swap(_queue, tmp_queue); |
86 | 240 | _upgrade_counter = 0; |
87 | 240 | } |
88 | 123k | if (!_queue.empty()) { |
89 | 123k | *out = _queue.top(); |
90 | 123k | _queue.pop(); |
91 | 123k | ++_upgrade_counter; |
92 | 123k | if (_put_waiting > 0) { |
93 | 0 | --_put_waiting; |
94 | 0 | unique_lock.unlock(); |
95 | 0 | _put_cv.notify_one(); |
96 | 0 | } |
97 | 123k | return true; |
98 | 123k | } else { |
99 | 202 | assert(_shutdown); |
100 | 202 | return false; |
101 | 202 | } |
102 | 123k | } else { |
103 | | //time out |
104 | 35 | assert(!_shutdown); |
105 | 0 | return false; |
106 | 35 | } |
107 | 123k | } |
108 | | |
109 | | bool non_blocking_get(T* out) { |
110 | | MonotonicStopWatch timer; |
111 | | timer.start(); |
112 | | std::unique_lock unique_lock(_lock); |
113 | | |
114 | | if (!_queue.empty()) { |
115 | | // 定期提高队列中残留的任务优先级 |
116 | | // 保证优先级较低的大查询不至于完全饿死 |
117 | | if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) { |
118 | | std::priority_queue<T> tmp_queue; |
119 | | while (!_queue.empty()) { |
120 | | T v = _queue.top(); |
121 | | _queue.pop(); |
122 | | ++v; |
123 | | tmp_queue.push(v); |
124 | | } |
125 | | swap(_queue, tmp_queue); |
126 | | _upgrade_counter = 0; |
127 | | } |
128 | | *out = _queue.top(); |
129 | | _queue.pop(); |
130 | | ++_upgrade_counter; |
131 | | _total_get_wait_time += timer.elapsed_time(); |
132 | | if (_put_waiting > 0) { |
133 | | --_put_waiting; |
134 | | unique_lock.unlock(); |
135 | | _put_cv.notify_one(); |
136 | | } |
137 | | return true; |
138 | | } |
139 | | |
140 | | return false; |
141 | | } |
142 | | |
143 | | // Puts an element into the queue, waiting indefinitely until there is space. |
144 | | // If the queue is shut down, returns false. |
145 | 123k | bool blocking_put(const T& val) { |
146 | 123k | MonotonicStopWatch timer; |
147 | 123k | timer.start(); |
148 | 123k | std::unique_lock unique_lock(_lock); |
149 | 123k | while (!(_shutdown || _queue.size() < _max_element)) { |
150 | 0 | ++_put_waiting; |
151 | 0 | _put_cv.wait(unique_lock); |
152 | 0 | } |
153 | 123k | _total_put_wait_time += timer.elapsed_time(); |
154 | | |
155 | 123k | if (_shutdown) { |
156 | 0 | return false; |
157 | 0 | } |
158 | | |
159 | 123k | _queue.push(val); |
160 | 123k | if (_get_waiting > 0) { |
161 | 69 | --_get_waiting; |
162 | 69 | unique_lock.unlock(); |
163 | 69 | _get_cv.notify_one(); |
164 | 69 | } |
165 | 123k | return true; |
166 | 123k | } |
167 | | |
168 | | // Return false if queue full or has been shutdown. |
169 | 0 | bool try_put(const T& val) { |
170 | 0 | std::unique_lock unique_lock(_lock); |
171 | 0 | if (_queue.size() < _max_element && !_shutdown) { |
172 | 0 | _queue.push(val); |
173 | 0 | if (_get_waiting > 0) { |
174 | 0 | --_get_waiting; |
175 | 0 | unique_lock.unlock(); |
176 | 0 | _get_cv.notify_one(); |
177 | 0 | } |
178 | 0 | return true; |
179 | 0 | } |
180 | 0 | return false; |
181 | 0 | } |
182 | | |
183 | | // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put. |
184 | 185 | void shutdown() { |
185 | 185 | { |
186 | 185 | std::lock_guard l(_lock); |
187 | 185 | _shutdown = true; |
188 | 185 | } |
189 | 185 | _get_cv.notify_all(); |
190 | 185 | _put_cv.notify_all(); |
191 | 185 | } |
192 | | |
193 | 123k | uint32_t get_size() const { |
194 | 123k | std::lock_guard l(_lock); |
195 | 123k | return static_cast<uint32_t>(_queue.size()); |
196 | 123k | } |
197 | | |
198 | 439 | uint32_t get_capacity() const { return _max_element; } |
199 | | |
200 | | // Returns the total amount of time threads have blocked in blocking_get. |
201 | 439 | uint64_t total_get_wait_time() const { return _total_get_wait_time; } |
202 | | |
203 | | // Returns the total amount of time threads have blocked in blocking_put. |
204 | 439 | uint64_t total_put_wait_time() const { return _total_put_wait_time; } |
205 | | |
206 | | private: |
207 | | bool _shutdown; |
208 | | const int _max_element; |
209 | | std::condition_variable _get_cv; // 'get' callers wait on this |
210 | | std::condition_variable _put_cv; // 'put' callers wait on this |
211 | | // _lock guards access to _queue, total_get_wait_time, and total_put_wait_time |
212 | | mutable std::mutex _lock; |
213 | | std::priority_queue<T> _queue; |
214 | | int _upgrade_counter; |
215 | | std::atomic<uint64_t> _total_get_wait_time; |
216 | | std::atomic<uint64_t> _total_put_wait_time; |
217 | | size_t _get_waiting; |
218 | | size_t _put_waiting; |
219 | | }; |
220 | | } // namespace doris |