be/src/util/blocking_queue.hpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/blocking-queue.hpp |
19 | | // and modified by Doris |
20 | | |
21 | | #pragma once |
22 | | |
23 | | #include <unistd.h> |
24 | | |
25 | | #include <atomic> |
26 | | #include <condition_variable> |
27 | | #include <list> |
28 | | #include <mutex> |
29 | | |
30 | | #include "common/logging.h" |
31 | | #include "util/stopwatch.hpp" |
32 | | |
33 | | namespace doris { |
34 | | #include "common/compile_check_begin.h" |
35 | | // Fixed-capacity FIFO queue: blocking_get blocks while the queue is empty and |
36 | | // blocking_put blocks while it is full. |
37 | | template <typename T> |
38 | | class BlockingQueue { |
39 | | public: |
40 | | BlockingQueue(uint32_t max_elements) |
41 | 132 | : _shutdown(false), |
42 | 132 | _max_elements(max_elements), |
43 | 132 | _total_get_wait_time(0), |
44 | 132 | _total_put_wait_time(0), |
45 | 132 | _get_waiting(0), |
46 | 132 | _put_waiting(0) {}_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEEC2Ej Line | Count | Source | 41 | 9 | : _shutdown(false), | 42 | 9 | _max_elements(max_elements), | 43 | 9 | _total_get_wait_time(0), | 44 | 9 | _total_put_wait_time(0), | 45 | 9 | _get_waiting(0), | 46 | 9 | _put_waiting(0) {} |
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEEC2Ej Line | Count | Source | 41 | 49 | : _shutdown(false), | 42 | 49 | _max_elements(max_elements), | 43 | 49 | _total_get_wait_time(0), | 44 | 49 | _total_put_wait_time(0), | 45 | 49 | _get_waiting(0), | 46 | 49 | _put_waiting(0) {} |
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEEC2Ej Line | Count | Source | 41 | 74 | : _shutdown(false), | 42 | 74 | _max_elements(max_elements), | 43 | 74 | _total_get_wait_time(0), | 44 | 74 | _total_put_wait_time(0), | 45 | 74 | _get_waiting(0), | 46 | 74 | _put_waiting(0) {} |
|
47 | | |
48 | | // Get an element from the queue, waiting indefinitely for one to become available. |
49 | | // Returns false if we were shut down prior to getting the element, and there |
50 | | // are no more elements available. |
51 | 695k | bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); }_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE12blocking_getEPS4_ Line | Count | Source | 51 | 7 | bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); } |
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE12blocking_getEPS3_ Line | Count | Source | 51 | 694k | bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); } |
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE12blocking_getEPS3_ Line | Count | Source | 51 | 74 | bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); } |
|
52 | | |
53 | | // blocking_get and blocking_put may deadlock, and the root cause has not |
54 | | // been found yet. As a temporary workaround, each condition-variable wait |
55 | | // uses a timeout so that a blocked caller periodically re-checks the queue state. |
56 | 695k | bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) { |
57 | 695k | MonotonicStopWatch timer; |
58 | 695k | timer.start(); |
59 | 695k | std::unique_lock<std::mutex> unique_lock(_lock); |
60 | 1.39M | while (!(_shutdown || !_list.empty())) { |
61 | 697k | ++_get_waiting; |
62 | 697k | if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == |
63 | 697k | std::cv_status::timeout) { |
64 | 514 | _get_waiting--; |
65 | 514 | } |
66 | 697k | } |
67 | 695k | _total_get_wait_time += timer.elapsed_time(); |
68 | | |
69 | 695k | if (!_list.empty()) { |
70 | 691k | *out = _list.front(); |
71 | 691k | _list.pop_front(); |
72 | 691k | if (_put_waiting > 0) { |
73 | 0 | --_put_waiting; |
74 | 0 | unique_lock.unlock(); |
75 | 0 | _put_cv.notify_one(); |
76 | 0 | } |
77 | 691k | return true; |
78 | 691k | } else { |
79 | 3.91k | assert(_shutdown); |
80 | 2.56k | return false; |
81 | 3.91k | } |
82 | 695k | } _ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE23controlled_blocking_getEPS4_l Line | Count | Source | 56 | 7 | bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) { | 57 | 7 | MonotonicStopWatch timer; | 58 | 7 | timer.start(); | 59 | 7 | std::unique_lock<std::mutex> unique_lock(_lock); | 60 | 10 | while (!(_shutdown || !_list.empty())) { | 61 | 3 | ++_get_waiting; | 62 | 3 | if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 63 | 3 | std::cv_status::timeout) { | 64 | 0 | _get_waiting--; | 65 | 0 | } | 66 | 3 | } | 67 | 7 | _total_get_wait_time += timer.elapsed_time(); | 68 | | | 69 | 7 | if (!_list.empty()) { | 70 | 7 | *out = _list.front(); | 71 | 7 | _list.pop_front(); | 72 | 7 | if (_put_waiting > 0) { | 73 | 0 | --_put_waiting; | 74 | 0 | unique_lock.unlock(); | 75 | 0 | _put_cv.notify_one(); | 76 | 0 | } | 77 | 7 | return true; | 78 | 7 | } else { | 79 | 0 | assert(_shutdown); | 80 | 0 | return false; | 81 | 0 | } | 82 | 7 | } |
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE23controlled_blocking_getEPS3_l Line | Count | Source | 56 | 694k | bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) { | 57 | 694k | MonotonicStopWatch timer; | 58 | 694k | timer.start(); | 59 | 694k | std::unique_lock<std::mutex> unique_lock(_lock); | 60 | 1.39M | while (!(_shutdown || !_list.empty())) { | 61 | 697k | ++_get_waiting; | 62 | 697k | if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 63 | 697k | std::cv_status::timeout) { | 64 | 509 | _get_waiting--; | 65 | 509 | } | 66 | 697k | } | 67 | 694k | _total_get_wait_time += timer.elapsed_time(); | 68 | | | 69 | 694k | if (!_list.empty()) { | 70 | 691k | *out = _list.front(); | 71 | 691k | _list.pop_front(); | 72 | 691k | if (_put_waiting > 0) { | 73 | 0 | --_put_waiting; | 74 | 0 | unique_lock.unlock(); | 75 | 0 | _put_cv.notify_one(); | 76 | 0 | } | 77 | 691k | return true; | 78 | 691k | } else { | 79 | 3.76k | assert(_shutdown); | 80 | 2.41k | return false; | 81 | 3.76k | } | 82 | 694k | } |
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE23controlled_blocking_getEPS3_l Line | Count | Source | 56 | 885 | bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) { | 57 | 885 | MonotonicStopWatch timer; | 58 | 885 | timer.start(); | 59 | 885 | std::unique_lock<std::mutex> unique_lock(_lock); | 60 | 1.09k | while (!(_shutdown || !_list.empty())) { | 61 | 214 | ++_get_waiting; | 62 | 214 | if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 63 | 214 | std::cv_status::timeout) { | 64 | 5 | _get_waiting--; | 65 | 5 | } | 66 | 214 | } | 67 | 885 | _total_get_wait_time += timer.elapsed_time(); | 68 | | | 69 | 885 | if (!_list.empty()) { | 70 | 738 | *out = _list.front(); | 71 | 738 | _list.pop_front(); | 72 | 738 | if (_put_waiting > 0) { | 73 | 0 | --_put_waiting; | 74 | 0 | unique_lock.unlock(); | 75 | 0 | _put_cv.notify_one(); | 76 | 0 | } | 77 | 738 | return true; | 78 | 738 | } else { | 79 | 147 | assert(_shutdown); | 80 | 147 | return false; | 81 | 147 | } | 82 | 885 | } |
|
83 | | |
84 | | // Puts an element into the queue, waiting indefinitely until there is space. |
85 | | // If the queue is shut down, returns false. |
86 | 12 | bool blocking_put(const T& val) { return controlled_blocking_put(val, MAX_CV_WAIT_TIMEOUT_MS); }_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE12blocking_putERKS4_ Line | Count | Source | 86 | 12 | bool blocking_put(const T& val) { return controlled_blocking_put(val, MAX_CV_WAIT_TIMEOUT_MS); } |
Unexecuted instantiation: _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE12blocking_putERKS3_ |
87 | | |
88 | | // blocking_get and blocking_put may deadlock, and the root cause has not |
89 | | // been found yet. As a temporary workaround, each condition-variable wait |
90 | | // uses a timeout so that a blocked caller periodically re-checks the queue state. |
91 | 751 | bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) { |
92 | 751 | MonotonicStopWatch timer; |
93 | 751 | timer.start(); |
94 | 751 | std::unique_lock<std::mutex> unique_lock(_lock); |
95 | 751 | while (!(_shutdown || _list.size() < _max_elements)) { |
96 | 0 | ++_put_waiting; |
97 | 0 | if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == |
98 | 0 | std::cv_status::timeout) { |
99 | 0 | _put_waiting--; |
100 | 0 | } |
101 | 0 | } |
102 | 751 | _total_put_wait_time += timer.elapsed_time(); |
103 | | |
104 | 751 | if (_shutdown) { |
105 | 1 | return false; |
106 | 1 | } |
107 | | |
108 | 750 | _list.push_back(val); |
109 | 750 | if (_get_waiting > 0) { |
110 | 147 | --_get_waiting; |
111 | 147 | unique_lock.unlock(); |
112 | 147 | _get_cv.notify_one(); |
113 | 147 | } |
114 | 750 | return true; |
115 | 751 | } _ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE23controlled_blocking_putERKS4_l Line | Count | Source | 91 | 12 | bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) { | 92 | 12 | MonotonicStopWatch timer; | 93 | 12 | timer.start(); | 94 | 12 | std::unique_lock<std::mutex> unique_lock(_lock); | 95 | 12 | while (!(_shutdown || _list.size() < _max_elements)) { | 96 | 0 | ++_put_waiting; | 97 | 0 | if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 98 | 0 | std::cv_status::timeout) { | 99 | 0 | _put_waiting--; | 100 | 0 | } | 101 | 0 | } | 102 | 12 | _total_put_wait_time += timer.elapsed_time(); | 103 | | | 104 | 12 | if (_shutdown) { | 105 | 0 | return false; | 106 | 0 | } | 107 | | | 108 | 12 | _list.push_back(val); | 109 | 12 | if (_get_waiting > 0) { | 110 | 3 | --_get_waiting; | 111 | 3 | unique_lock.unlock(); | 112 | 3 | _get_cv.notify_one(); | 113 | 3 | } | 114 | 12 | return true; | 115 | 12 | } |
Unexecuted instantiation: _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE23controlled_blocking_putERKS3_l _ZN5doris13BlockingQueueIPN7RdKafka7MessageEE23controlled_blocking_putERKS3_l Line | Count | Source | 91 | 739 | bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) { | 92 | 739 | MonotonicStopWatch timer; | 93 | 739 | timer.start(); | 94 | 739 | std::unique_lock<std::mutex> unique_lock(_lock); | 95 | 739 | while (!(_shutdown || _list.size() < _max_elements)) { | 96 | 0 | ++_put_waiting; | 97 | 0 | if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 98 | 0 | std::cv_status::timeout) { | 99 | 0 | _put_waiting--; | 100 | 0 | } | 101 | 0 | } | 102 | 739 | _total_put_wait_time += timer.elapsed_time(); | 103 | | | 104 | 739 | if (_shutdown) { | 105 | 1 | return false; | 106 | 1 | } | 107 | | | 108 | 738 | _list.push_back(val); | 109 | 738 | if (_get_waiting > 0) { | 110 | 144 | --_get_waiting; | 111 | 144 | unique_lock.unlock(); | 112 | 144 | _get_cv.notify_one(); | 113 | 144 | } | 114 | 738 | return true; | 115 | 739 | } |
|
116 | | |
117 | | // Returns false if the queue is full or has been shut down. |
118 | 690k | bool try_put(const T& val) { |
119 | 690k | if (_shutdown || _list.size() >= _max_elements) { |
120 | 0 | return false; |
121 | 0 | } |
122 | | |
123 | 690k | MonotonicStopWatch timer; |
124 | 690k | timer.start(); |
125 | 690k | std::unique_lock<std::mutex> unique_lock(_lock); |
126 | 690k | _total_put_wait_time += timer.elapsed_time(); |
127 | | |
128 | 691k | if (_shutdown || _list.size() >= _max_elements) { |
129 | 0 | return false; |
130 | 0 | } |
131 | | |
132 | 690k | _list.push_back(val); |
133 | 691k | if (_get_waiting > 0) { |
134 | 691k | --_get_waiting; |
135 | 691k | unique_lock.unlock(); |
136 | 691k | _get_cv.notify_one(); |
137 | 691k | } |
138 | 690k | return true; |
139 | 690k | } |
140 | | |
141 | | // Shuts down the queue. Wakes up all threads waiting in blocking_get or blocking_put. |
142 | 264 | void shutdown() { |
143 | 264 | { |
144 | 264 | std::lock_guard<std::mutex> guard(_lock); |
145 | 264 | _shutdown = true; |
146 | 264 | } |
147 | | |
148 | 264 | _get_cv.notify_all(); |
149 | 264 | _put_cv.notify_all(); |
150 | 264 | } _ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE8shutdownEv Line | Count | Source | 142 | 5 | void shutdown() { | 143 | 5 | { | 144 | 5 | std::lock_guard<std::mutex> guard(_lock); | 145 | 5 | _shutdown = true; | 146 | 5 | } | 147 | | | 148 | 5 | _get_cv.notify_all(); | 149 | 5 | _put_cv.notify_all(); | 150 | 5 | } |
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE8shutdownEv Line | Count | Source | 142 | 37 | void shutdown() { | 143 | 37 | { | 144 | 37 | std::lock_guard<std::mutex> guard(_lock); | 145 | 37 | _shutdown = true; | 146 | 37 | } | 147 | | | 148 | 37 | _get_cv.notify_all(); | 149 | 37 | _put_cv.notify_all(); | 150 | 37 | } |
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE8shutdownEv Line | Count | Source | 142 | 222 | void shutdown() { | 143 | 222 | { | 144 | 222 | std::lock_guard<std::mutex> guard(_lock); | 145 | 222 | _shutdown = true; | 146 | 222 | } | 147 | | | 148 | 222 | _get_cv.notify_all(); | 149 | 222 | _put_cv.notify_all(); | 150 | 222 | } |
|
151 | | |
152 | 701k | uint32_t get_size() const { |
153 | 701k | std::lock_guard<std::mutex> l(_lock); |
154 | 701k | return static_cast<uint32_t>(_list.size()); |
155 | 701k | } _ZNK5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE8get_sizeEv Line | Count | Source | 152 | 8 | uint32_t get_size() const { | 153 | 8 | std::lock_guard<std::mutex> l(_lock); | 154 | 8 | return static_cast<uint32_t>(_list.size()); | 155 | 8 | } |
_ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE8get_sizeEv Line | Count | Source | 152 | 701k | uint32_t get_size() const { | 153 | 701k | std::lock_guard<std::mutex> l(_lock); | 154 | 701k | return static_cast<uint32_t>(_list.size()); | 155 | 701k | } |
_ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE8get_sizeEv Line | Count | Source | 152 | 74 | uint32_t get_size() const { | 153 | 74 | std::lock_guard<std::mutex> l(_lock); | 154 | 74 | return static_cast<uint32_t>(_list.size()); | 155 | 74 | } |
|
156 | | |
157 | 7.90k | uint32_t get_capacity() const { return _max_elements; } |
158 | | |
159 | | // Returns the total amount of time threads have blocked in blocking_get. |
160 | 7.97k | uint64_t total_get_wait_time() const { return _total_get_wait_time; }_ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE19total_get_wait_timeEv Line | Count | Source | 160 | 7.90k | uint64_t total_get_wait_time() const { return _total_get_wait_time; } |
_ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE19total_get_wait_timeEv Line | Count | Source | 160 | 74 | uint64_t total_get_wait_time() const { return _total_get_wait_time; } |
|
161 | | |
162 | | // Returns the total amount of time threads have spent waiting in blocking_put and try_put. |
163 | 7.97k | uint64_t total_put_wait_time() const { return _total_put_wait_time; }_ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE19total_put_wait_timeEv Line | Count | Source | 163 | 7.90k | uint64_t total_put_wait_time() const { return _total_put_wait_time; } |
_ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE19total_put_wait_timeEv Line | Count | Source | 163 | 74 | uint64_t total_put_wait_time() const { return _total_put_wait_time; } |
|
164 | | |
165 | | private: |
166 | | static constexpr int64_t MAX_CV_WAIT_TIMEOUT_MS = 60 * 60 * 1000; // 1 hour |
167 | | bool _shutdown; |
168 | | const int _max_elements; |
169 | | std::condition_variable _get_cv; // 'get' callers wait on this |
170 | | std::condition_variable _put_cv; // 'put' callers wait on this |
171 | | // _lock guards access to _list; _total_get_wait_time and _total_put_wait_time are updated while it is held but, being atomic, are read without it |
172 | | mutable std::mutex _lock; |
173 | | std::list<T> _list; |
174 | | std::atomic<uint64_t> _total_get_wait_time; |
175 | | std::atomic<uint64_t> _total_put_wait_time; |
176 | | size_t _get_waiting; |
177 | | size_t _put_waiting; |
178 | | }; |
179 | | #include "common/compile_check_end.h" |
180 | | } // namespace doris |