be/src/util/blocking_queue.hpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/blocking-queue.hpp |
19 | | // and modified by Doris |
20 | | |
21 | | #pragma once |
22 | | |
#include <unistd.h>

#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <list>
#include <mutex>
#include <utility>

#include "common/logging.h"
#include "util/stopwatch.hpp"
32 | | |
33 | | namespace doris { |
34 | | // Fixed capacity FIFO queue, where both BlockingGet and BlockingPut operations block |
35 | | // if the queue is empty or full, respectively. |
36 | | template <typename T> |
37 | | class BlockingQueue { |
38 | | public: |
39 | | BlockingQueue(uint32_t max_elements) |
40 | 129 | : _shutdown(false), |
41 | 129 | _max_elements(max_elements), |
42 | 129 | _total_get_wait_time(0), |
43 | 129 | _total_put_wait_time(0), |
44 | 129 | _get_waiting(0), |
45 | 129 | _put_waiting(0) {}_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEEC2Ej Line | Count | Source | 40 | 9 | : _shutdown(false), | 41 | 9 | _max_elements(max_elements), | 42 | 9 | _total_get_wait_time(0), | 43 | 9 | _total_put_wait_time(0), | 44 | 9 | _get_waiting(0), | 45 | 9 | _put_waiting(0) {} |
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEEC2Ej Line | Count | Source | 40 | 49 | : _shutdown(false), | 41 | 49 | _max_elements(max_elements), | 42 | 49 | _total_get_wait_time(0), | 43 | 49 | _total_put_wait_time(0), | 44 | 49 | _get_waiting(0), | 45 | 49 | _put_waiting(0) {} |
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEEC2Ej Line | Count | Source | 40 | 71 | : _shutdown(false), | 41 | 71 | _max_elements(max_elements), | 42 | 71 | _total_get_wait_time(0), | 43 | 71 | _total_put_wait_time(0), | 44 | 71 | _get_waiting(0), | 45 | 71 | _put_waiting(0) {} |
|
46 | | |
47 | | // Get an element from the queue, waiting indefinitely for one to become available. |
48 | | // Returns false if we were shut down prior to getting the element, and there |
49 | | // are no more elements available. |
50 | 654k | bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); }_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE12blocking_getEPS4_ Line | Count | Source | 50 | 7 | bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); } |
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE12blocking_getEPS3_ Line | Count | Source | 50 | 654k | bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); } |
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE12blocking_getEPS3_ Line | Count | Source | 50 | 80 | bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); } |
|
51 | | |
52 | | // Blocking_get and blocking_put may cause deadlock, |
53 | | // but we still don't find root cause, |
54 | | // introduce condition variable wait timeout to avoid blocking queue deadlock temporarily. |
55 | 654k | bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) { |
56 | 654k | MonotonicStopWatch timer; |
57 | 654k | timer.start(); |
58 | 654k | std::unique_lock<std::mutex> unique_lock(_lock); |
59 | 1.31M | while (!(_shutdown || !_list.empty())) { |
60 | 656k | ++_get_waiting; |
61 | 656k | if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == |
62 | 656k | std::cv_status::timeout) { |
63 | 514 | _get_waiting--; |
64 | 514 | } |
65 | 656k | } |
66 | 654k | _total_get_wait_time += timer.elapsed_time(); |
67 | | |
68 | 654k | if (!_list.empty()) { |
69 | 651k | *out = _list.front(); |
70 | 651k | _list.pop_front(); |
71 | 651k | if (_put_waiting > 0) { |
72 | 0 | --_put_waiting; |
73 | 0 | unique_lock.unlock(); |
74 | 0 | _put_cv.notify_one(); |
75 | 0 | } |
76 | 651k | return true; |
77 | 651k | } else { |
78 | 3.87k | assert(_shutdown); |
79 | 2.55k | return false; |
80 | 3.87k | } |
81 | 654k | } _ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE23controlled_blocking_getEPS4_l Line | Count | Source | 55 | 7 | bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) { | 56 | 7 | MonotonicStopWatch timer; | 57 | 7 | timer.start(); | 58 | 7 | std::unique_lock<std::mutex> unique_lock(_lock); | 59 | 10 | while (!(_shutdown || !_list.empty())) { | 60 | 3 | ++_get_waiting; | 61 | 3 | if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 62 | 3 | std::cv_status::timeout) { | 63 | 0 | _get_waiting--; | 64 | 0 | } | 65 | 3 | } | 66 | 7 | _total_get_wait_time += timer.elapsed_time(); | 67 | | | 68 | 7 | if (!_list.empty()) { | 69 | 7 | *out = _list.front(); | 70 | 7 | _list.pop_front(); | 71 | 7 | if (_put_waiting > 0) { | 72 | 0 | --_put_waiting; | 73 | 0 | unique_lock.unlock(); | 74 | 0 | _put_cv.notify_one(); | 75 | 0 | } | 76 | 7 | return true; | 77 | 7 | } else { | 78 | 0 | assert(_shutdown); | 79 | 0 | return false; | 80 | 0 | } | 81 | 7 | } |
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE23controlled_blocking_getEPS3_l Line | Count | Source | 55 | 654k | bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) { | 56 | 654k | MonotonicStopWatch timer; | 57 | 654k | timer.start(); | 58 | 654k | std::unique_lock<std::mutex> unique_lock(_lock); | 59 | 1.31M | while (!(_shutdown || !_list.empty())) { | 60 | 656k | ++_get_waiting; | 61 | 656k | if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 62 | 656k | std::cv_status::timeout) { | 63 | 509 | _get_waiting--; | 64 | 509 | } | 65 | 656k | } | 66 | 654k | _total_get_wait_time += timer.elapsed_time(); | 67 | | | 68 | 654k | if (!_list.empty()) { | 69 | 650k | *out = _list.front(); | 70 | 650k | _list.pop_front(); | 71 | 650k | if (_put_waiting > 0) { | 72 | 0 | --_put_waiting; | 73 | 0 | unique_lock.unlock(); | 74 | 0 | _put_cv.notify_one(); | 75 | 0 | } | 76 | 650k | return true; | 77 | 650k | } else { | 78 | 3.73k | assert(_shutdown); | 79 | 2.41k | return false; | 80 | 3.73k | } | 81 | 654k | } |
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE23controlled_blocking_getEPS3_l Line | Count | Source | 55 | 876 | bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) { | 56 | 876 | MonotonicStopWatch timer; | 57 | 876 | timer.start(); | 58 | 876 | std::unique_lock<std::mutex> unique_lock(_lock); | 59 | 1.07k | while (!(_shutdown || !_list.empty())) { | 60 | 194 | ++_get_waiting; | 61 | 194 | if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 62 | 194 | std::cv_status::timeout) { | 63 | 5 | _get_waiting--; | 64 | 5 | } | 65 | 194 | } | 66 | 876 | _total_get_wait_time += timer.elapsed_time(); | 67 | | | 68 | 876 | if (!_list.empty()) { | 69 | 735 | *out = _list.front(); | 70 | 735 | _list.pop_front(); | 71 | 735 | if (_put_waiting > 0) { | 72 | 0 | --_put_waiting; | 73 | 0 | unique_lock.unlock(); | 74 | 0 | _put_cv.notify_one(); | 75 | 0 | } | 76 | 735 | return true; | 77 | 735 | } else { | 78 | 141 | assert(_shutdown); | 79 | 141 | return false; | 80 | 141 | } | 81 | 876 | } |
|
82 | | |
83 | | // Puts an element into the queue, waiting indefinitely until there is space. |
84 | | // If the queue is shut down, returns false. |
85 | 12 | bool blocking_put(const T& val) { return controlled_blocking_put(val, MAX_CV_WAIT_TIMEOUT_MS); }_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE12blocking_putERKS4_ Line | Count | Source | 85 | 12 | bool blocking_put(const T& val) { return controlled_blocking_put(val, MAX_CV_WAIT_TIMEOUT_MS); } |
Unexecuted instantiation: _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE12blocking_putERKS3_ |
86 | | |
87 | | // Blocking_get and blocking_put may cause deadlock, |
88 | | // but we still don't find root cause, |
89 | | // introduce condition variable wait timeout to avoid blocking queue deadlock temporarily. |
90 | 747 | bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) { |
91 | 747 | MonotonicStopWatch timer; |
92 | 747 | timer.start(); |
93 | 747 | std::unique_lock<std::mutex> unique_lock(_lock); |
94 | 747 | while (!(_shutdown || _list.size() < _max_elements)) { |
95 | 0 | ++_put_waiting; |
96 | 0 | if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == |
97 | 0 | std::cv_status::timeout) { |
98 | 0 | _put_waiting--; |
99 | 0 | } |
100 | 0 | } |
101 | 747 | _total_put_wait_time += timer.elapsed_time(); |
102 | | |
103 | 747 | if (_shutdown) { |
104 | 0 | return false; |
105 | 0 | } |
106 | | |
107 | 747 | _list.push_back(val); |
108 | 747 | if (_get_waiting > 0) { |
109 | 134 | --_get_waiting; |
110 | 134 | unique_lock.unlock(); |
111 | 134 | _get_cv.notify_one(); |
112 | 134 | } |
113 | 747 | return true; |
114 | 747 | } _ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE23controlled_blocking_putERKS4_l Line | Count | Source | 90 | 12 | bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) { | 91 | 12 | MonotonicStopWatch timer; | 92 | 12 | timer.start(); | 93 | 12 | std::unique_lock<std::mutex> unique_lock(_lock); | 94 | 12 | while (!(_shutdown || _list.size() < _max_elements)) { | 95 | 0 | ++_put_waiting; | 96 | 0 | if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 97 | 0 | std::cv_status::timeout) { | 98 | 0 | _put_waiting--; | 99 | 0 | } | 100 | 0 | } | 101 | 12 | _total_put_wait_time += timer.elapsed_time(); | 102 | | | 103 | 12 | if (_shutdown) { | 104 | 0 | return false; | 105 | 0 | } | 106 | | | 107 | 12 | _list.push_back(val); | 108 | 12 | if (_get_waiting > 0) { | 109 | 3 | --_get_waiting; | 110 | 3 | unique_lock.unlock(); | 111 | 3 | _get_cv.notify_one(); | 112 | 3 | } | 113 | 12 | return true; | 114 | 12 | } |
Unexecuted instantiation: _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE23controlled_blocking_putERKS3_l _ZN5doris13BlockingQueueIPN7RdKafka7MessageEE23controlled_blocking_putERKS3_l Line | Count | Source | 90 | 735 | bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) { | 91 | 735 | MonotonicStopWatch timer; | 92 | 735 | timer.start(); | 93 | 735 | std::unique_lock<std::mutex> unique_lock(_lock); | 94 | 735 | while (!(_shutdown || _list.size() < _max_elements)) { | 95 | 0 | ++_put_waiting; | 96 | 0 | if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 97 | 0 | std::cv_status::timeout) { | 98 | 0 | _put_waiting--; | 99 | 0 | } | 100 | 0 | } | 101 | 735 | _total_put_wait_time += timer.elapsed_time(); | 102 | | | 103 | 735 | if (_shutdown) { | 104 | 0 | return false; | 105 | 0 | } | 106 | | | 107 | 735 | _list.push_back(val); | 108 | 735 | if (_get_waiting > 0) { | 109 | 131 | --_get_waiting; | 110 | 131 | unique_lock.unlock(); | 111 | 131 | _get_cv.notify_one(); | 112 | 131 | } | 113 | 735 | return true; | 114 | 735 | } |
|
115 | | |
116 | | // Return false if queue full or has been shutdown. |
117 | 649k | bool try_put(const T& val) { |
118 | 649k | if (_shutdown || _list.size() >= _max_elements) { |
119 | 0 | return false; |
120 | 0 | } |
121 | | |
122 | 649k | MonotonicStopWatch timer; |
123 | 649k | timer.start(); |
124 | 649k | std::unique_lock<std::mutex> unique_lock(_lock); |
125 | 649k | _total_put_wait_time += timer.elapsed_time(); |
126 | | |
127 | 650k | if (_shutdown || _list.size() >= _max_elements) { |
128 | 0 | return false; |
129 | 0 | } |
130 | | |
131 | 649k | _list.push_back(val); |
132 | 650k | if (_get_waiting > 0) { |
133 | 650k | --_get_waiting; |
134 | 650k | unique_lock.unlock(); |
135 | 650k | _get_cv.notify_one(); |
136 | 650k | } |
137 | 649k | return true; |
138 | 649k | } |
139 | | |
140 | | // Shut down the queue. Wakes up all threads waiting on BlockingGet or BlockingPut. |
141 | 255 | void shutdown() { |
142 | 255 | { |
143 | 255 | std::lock_guard<std::mutex> guard(_lock); |
144 | 255 | _shutdown = true; |
145 | 255 | } |
146 | | |
147 | 255 | _get_cv.notify_all(); |
148 | 255 | _put_cv.notify_all(); |
149 | 255 | } _ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE8shutdownEv Line | Count | Source | 141 | 5 | void shutdown() { | 142 | 5 | { | 143 | 5 | std::lock_guard<std::mutex> guard(_lock); | 144 | 5 | _shutdown = true; | 145 | 5 | } | 146 | | | 147 | 5 | _get_cv.notify_all(); | 148 | 5 | _put_cv.notify_all(); | 149 | 5 | } |
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE8shutdownEv Line | Count | Source | 141 | 37 | void shutdown() { | 142 | 37 | { | 143 | 37 | std::lock_guard<std::mutex> guard(_lock); | 144 | 37 | _shutdown = true; | 145 | 37 | } | 146 | | | 147 | 37 | _get_cv.notify_all(); | 148 | 37 | _put_cv.notify_all(); | 149 | 37 | } |
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE8shutdownEv Line | Count | Source | 141 | 213 | void shutdown() { | 142 | 213 | { | 143 | 213 | std::lock_guard<std::mutex> guard(_lock); | 144 | 213 | _shutdown = true; | 145 | 213 | } | 146 | | | 147 | 213 | _get_cv.notify_all(); | 148 | 213 | _put_cv.notify_all(); | 149 | 213 | } |
|
150 | | |
151 | 661k | uint32_t get_size() const { |
152 | 661k | std::lock_guard<std::mutex> l(_lock); |
153 | 661k | return static_cast<uint32_t>(_list.size()); |
154 | 661k | } _ZNK5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE8get_sizeEv Line | Count | Source | 151 | 8 | uint32_t get_size() const { | 152 | 8 | std::lock_guard<std::mutex> l(_lock); | 153 | 8 | return static_cast<uint32_t>(_list.size()); | 154 | 8 | } |
_ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE8get_sizeEv Line | Count | Source | 151 | 660k | uint32_t get_size() const { | 152 | 660k | std::lock_guard<std::mutex> l(_lock); | 153 | 660k | return static_cast<uint32_t>(_list.size()); | 154 | 660k | } |
_ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE8get_sizeEv Line | Count | Source | 151 | 71 | uint32_t get_size() const { | 152 | 71 | std::lock_guard<std::mutex> l(_lock); | 153 | 71 | return static_cast<uint32_t>(_list.size()); | 154 | 71 | } |
|
155 | | |
156 | 7.90k | uint32_t get_capacity() const { return _max_elements; } |
157 | | |
158 | | // Returns the total amount of time threads have blocked in BlockingGet. |
159 | 7.97k | uint64_t total_get_wait_time() const { return _total_get_wait_time; }_ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE19total_get_wait_timeEv Line | Count | Source | 159 | 7.89k | uint64_t total_get_wait_time() const { return _total_get_wait_time; } |
_ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE19total_get_wait_timeEv Line | Count | Source | 159 | 71 | uint64_t total_get_wait_time() const { return _total_get_wait_time; } |
|
160 | | |
161 | | // Returns the total amount of time threads have blocked in BlockingPut. |
162 | 7.97k | uint64_t total_put_wait_time() const { return _total_put_wait_time; }_ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE19total_put_wait_timeEv Line | Count | Source | 162 | 7.90k | uint64_t total_put_wait_time() const { return _total_put_wait_time; } |
_ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE19total_put_wait_timeEv Line | Count | Source | 162 | 71 | uint64_t total_put_wait_time() const { return _total_put_wait_time; } |
|
163 | | |
164 | | private: |
165 | | static constexpr int64_t MAX_CV_WAIT_TIMEOUT_MS = 60 * 60 * 1000; // 1 hour |
166 | | bool _shutdown; |
167 | | const int _max_elements; |
168 | | std::condition_variable _get_cv; // 'get' callers wait on this |
169 | | std::condition_variable _put_cv; // 'put' callers wait on this |
170 | | // _lock guards access to _list, total_get_wait_time, and total_put_wait_time |
171 | | mutable std::mutex _lock; |
172 | | std::list<T> _list; |
173 | | std::atomic<uint64_t> _total_get_wait_time; |
174 | | std::atomic<uint64_t> _total_put_wait_time; |
175 | | size_t _get_waiting; |
176 | | size_t _put_waiting; |
177 | | }; |
178 | | } // namespace doris |