/root/doris/be/src/util/blocking_queue.hpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/blocking-queue.hpp |
19 | | // and modified by Doris |
20 | | |
21 | | #pragma once |
22 | | |
23 | | #include <unistd.h> |
24 | | |
25 | | #include <atomic> |
26 | | #include <condition_variable> |
27 | | #include <list> |
28 | | #include <mutex> |
29 | | |
30 | | #include "common/logging.h" |
31 | | #include "util/stopwatch.hpp" |
32 | | |
33 | | namespace doris { |
34 | | |
35 | | // Fixed capacity FIFO queue, where both BlockingGet and BlockingPut operations block |
36 | | // if the queue is empty or full, respectively. |
37 | | template <typename T> |
38 | | class BlockingQueue { |
39 | | public: |
40 | | BlockingQueue(size_t max_elements) |
41 | | : _shutdown(false), |
42 | | _max_elements(max_elements), |
43 | | _total_get_wait_time(0), |
44 | | _total_put_wait_time(0), |
45 | | _get_waiting(0), |
46 | 10 | _put_waiting(0) {} _ZN5doris13BlockingQueueIiEC2Em Line | Count | Source | 46 | 2 | _put_waiting(0) {} |
_ZN5doris13BlockingQueueIlEC2Em Line | Count | Source | 46 | 1 | _put_waiting(0) {} |
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEEC2Em Line | Count | Source | 46 | 6 | _put_waiting(0) {} |
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEEC2Em Line | Count | Source | 46 | 1 | _put_waiting(0) {} |
Unexecuted instantiation: _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEEC2Em Unexecuted instantiation: _ZN5doris13BlockingQueueIPNS_10vectorized14ScannerContextEEC2Em |
47 | | |
48 | | // Get an element from the queue, waiting indefinitely for one to become available. |
49 | | // Returns false if we were shut down prior to getting the element, and there |
50 | | // are no more elements available. |
51 | 59.9k | bool blocking_get(T* out) { |
52 | 59.9k | MonotonicStopWatch timer; |
53 | 59.9k | timer.start(); |
54 | 59.9k | std::unique_lock<std::mutex> unique_lock(_lock); |
55 | 59.9k | while (!(_shutdown || !_list.empty())) { |
56 | 1 | ++_get_waiting; |
57 | 1 | _get_cv.wait(unique_lock); |
58 | 1 | } |
59 | 59.9k | _total_get_wait_time += timer.elapsed_time(); |
60 | | |
61 | 59.9k | if (!_list.empty()) { |
62 | 50.0k | *out = _list.front(); |
63 | 50.0k | _list.pop_front(); |
64 | 50.0k | if (_put_waiting > 0) { |
65 | 5.83k | --_put_waiting; |
66 | 5.83k | unique_lock.unlock(); |
67 | 5.83k | _put_cv.notify_one(); |
68 | 5.83k | } |
69 | 50.0k | return true; |
70 | 50.0k | } else { |
71 | 9.90k | assert(_shutdown); |
72 | 0 | return false; |
73 | 9.90k | } |
74 | 59.9k | } _ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE12blocking_getEPS4_ Line | Count | Source | 51 | 2 | bool blocking_get(T* out) { | 52 | 2 | MonotonicStopWatch timer; | 53 | 2 | timer.start(); | 54 | 2 | std::unique_lock<std::mutex> unique_lock(_lock); | 55 | 2 | while (!(_shutdown || !_list.empty())) { | 56 | 0 | ++_get_waiting; | 57 | 0 | _get_cv.wait(unique_lock); | 58 | 0 | } | 59 | 2 | _total_get_wait_time += timer.elapsed_time(); | 60 | | | 61 | 2 | if (!_list.empty()) { | 62 | 2 | *out = _list.front(); | 63 | 2 | _list.pop_front(); | 64 | 2 | if (_put_waiting > 0) { | 65 | 0 | --_put_waiting; | 66 | 0 | unique_lock.unlock(); | 67 | 0 | _put_cv.notify_one(); | 68 | 0 | } | 69 | 2 | return true; | 70 | 2 | } else { | 71 | 0 | assert(_shutdown); | 72 | 0 | return false; | 73 | 0 | } | 74 | 2 | } |
_ZN5doris13BlockingQueueIiE12blocking_getEPi Line | Count | Source | 51 | 59.9k | bool blocking_get(T* out) { | 52 | 59.9k | MonotonicStopWatch timer; | 53 | 59.9k | timer.start(); | 54 | 59.9k | std::unique_lock<std::mutex> unique_lock(_lock); | 55 | 59.9k | while (!(_shutdown || !_list.empty())) { | 56 | 0 | ++_get_waiting; | 57 | 0 | _get_cv.wait(unique_lock); | 58 | 0 | } | 59 | 59.9k | _total_get_wait_time += timer.elapsed_time(); | 60 | | | 61 | 59.9k | if (!_list.empty()) { | 62 | 50.0k | *out = _list.front(); | 63 | 50.0k | _list.pop_front(); | 64 | 50.0k | if (_put_waiting > 0) { | 65 | 5.83k | --_put_waiting; | 66 | 5.83k | unique_lock.unlock(); | 67 | 5.83k | _put_cv.notify_one(); | 68 | 5.83k | } | 69 | 50.0k | return true; | 70 | 50.0k | } else { | 71 | 9.90k | assert(_shutdown); | 72 | 0 | return false; | 73 | 9.90k | } | 74 | 59.9k | } |
_ZN5doris13BlockingQueueIlE12blocking_getEPl Line | Count | Source | 51 | 2 | bool blocking_get(T* out) { | 52 | 2 | MonotonicStopWatch timer; | 53 | 2 | timer.start(); | 54 | 2 | std::unique_lock<std::mutex> unique_lock(_lock); | 55 | 2 | while (!(_shutdown || !_list.empty())) { | 56 | 0 | ++_get_waiting; | 57 | 0 | _get_cv.wait(unique_lock); | 58 | 0 | } | 59 | 2 | _total_get_wait_time += timer.elapsed_time(); | 60 | | | 61 | 2 | if (!_list.empty()) { | 62 | 1 | *out = _list.front(); | 63 | 1 | _list.pop_front(); | 64 | 1 | if (_put_waiting > 0) { | 65 | 0 | --_put_waiting; | 66 | 0 | unique_lock.unlock(); | 67 | 0 | _put_cv.notify_one(); | 68 | 0 | } | 69 | 1 | return true; | 70 | 1 | } else { | 71 | 1 | assert(_shutdown); | 72 | 0 | return false; | 73 | 1 | } | 74 | 2 | } |
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE12blocking_getEPS3_ Line | Count | Source | 51 | 2 | bool blocking_get(T* out) { | 52 | 2 | MonotonicStopWatch timer; | 53 | 2 | timer.start(); | 54 | 2 | std::unique_lock<std::mutex> unique_lock(_lock); | 55 | 3 | while (!(_shutdown || !_list.empty())) { | 56 | 1 | ++_get_waiting; | 57 | 1 | _get_cv.wait(unique_lock); | 58 | 1 | } | 59 | 2 | _total_get_wait_time += timer.elapsed_time(); | 60 | | | 61 | 2 | if (!_list.empty()) { | 62 | 0 | *out = _list.front(); | 63 | 0 | _list.pop_front(); | 64 | 0 | if (_put_waiting > 0) { | 65 | 0 | --_put_waiting; | 66 | 0 | unique_lock.unlock(); | 67 | 0 | _put_cv.notify_one(); | 68 | 0 | } | 69 | 0 | return true; | 70 | 2 | } else { | 71 | 2 | assert(_shutdown); | 72 | 0 | return false; | 73 | 2 | } | 74 | 2 | } |
Unexecuted instantiation: _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE12blocking_getEPS3_ Unexecuted instantiation: _ZN5doris13BlockingQueueIPNS_10vectorized14ScannerContextEE12blocking_getEPS3_ |
75 | | |
76 | | // Puts an element into the queue, waiting indefinitely until there is space. |
77 | | // If the queue is shut down, returns false. |
78 | 49.9k | bool blocking_put(const T& val) { |
79 | 49.9k | MonotonicStopWatch timer; |
80 | 49.9k | timer.start(); |
81 | 49.9k | std::unique_lock<std::mutex> unique_lock(_lock); |
82 | 55.8k | while (!(_shutdown || _list.size() < _max_elements)) { |
83 | 5.83k | ++_put_waiting; |
84 | 5.83k | _put_cv.wait(unique_lock); |
85 | 5.83k | } |
86 | 49.9k | _total_put_wait_time += timer.elapsed_time(); |
87 | | |
88 | 49.9k | if (_shutdown) { |
89 | 1 | return false; |
90 | 1 | } |
91 | | |
92 | 49.9k | _list.push_back(val); |
93 | 49.9k | if (_get_waiting > 0) { |
94 | 0 | --_get_waiting; |
95 | 0 | unique_lock.unlock(); |
96 | 0 | _get_cv.notify_one(); |
97 | 0 | } |
98 | 49.9k | return true; |
99 | 49.9k | } _ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE12blocking_putERKS4_ Line | Count | Source | 78 | 4 | bool blocking_put(const T& val) { | 79 | 4 | MonotonicStopWatch timer; | 80 | 4 | timer.start(); | 81 | 4 | std::unique_lock<std::mutex> unique_lock(_lock); | 82 | 4 | while (!(_shutdown || _list.size() < _max_elements)) { | 83 | 0 | ++_put_waiting; | 84 | 0 | _put_cv.wait(unique_lock); | 85 | 0 | } | 86 | 4 | _total_put_wait_time += timer.elapsed_time(); | 87 | | | 88 | 4 | if (_shutdown) { | 89 | 0 | return false; | 90 | 0 | } | 91 | | | 92 | 4 | _list.push_back(val); | 93 | 4 | if (_get_waiting > 0) { | 94 | 0 | --_get_waiting; | 95 | 0 | unique_lock.unlock(); | 96 | 0 | _get_cv.notify_one(); | 97 | 0 | } | 98 | 4 | return true; | 99 | 4 | } |
_ZN5doris13BlockingQueueIiE12blocking_putERKi Line | Count | Source | 78 | 49.9k | bool blocking_put(const T& val) { | 79 | 49.9k | MonotonicStopWatch timer; | 80 | 49.9k | timer.start(); | 81 | 49.9k | std::unique_lock<std::mutex> unique_lock(_lock); | 82 | 55.8k | while (!(_shutdown || _list.size() < _max_elements)) { | 83 | 5.83k | ++_put_waiting; | 84 | 5.83k | _put_cv.wait(unique_lock); | 85 | 5.83k | } | 86 | 49.9k | _total_put_wait_time += timer.elapsed_time(); | 87 | | | 88 | 49.9k | if (_shutdown) { | 89 | 0 | return false; | 90 | 0 | } | 91 | | | 92 | 49.9k | _list.push_back(val); | 93 | 49.9k | if (_get_waiting > 0) { | 94 | 0 | --_get_waiting; | 95 | 0 | unique_lock.unlock(); | 96 | 0 | _get_cv.notify_one(); | 97 | 0 | } | 98 | 49.9k | return true; | 99 | 49.9k | } |
_ZN5doris13BlockingQueueIlE12blocking_putERKl Line | Count | Source | 78 | 2 | bool blocking_put(const T& val) { | 79 | 2 | MonotonicStopWatch timer; | 80 | 2 | timer.start(); | 81 | 2 | std::unique_lock<std::mutex> unique_lock(_lock); | 82 | 2 | while (!(_shutdown || _list.size() < _max_elements)) { | 83 | 0 | ++_put_waiting; | 84 | 0 | _put_cv.wait(unique_lock); | 85 | 0 | } | 86 | 2 | _total_put_wait_time += timer.elapsed_time(); | 87 | | | 88 | 2 | if (_shutdown) { | 89 | 1 | return false; | 90 | 1 | } | 91 | | | 92 | 1 | _list.push_back(val); | 93 | 1 | if (_get_waiting > 0) { | 94 | 0 | --_get_waiting; | 95 | 0 | unique_lock.unlock(); | 96 | 0 | _get_cv.notify_one(); | 97 | 0 | } | 98 | 1 | return true; | 99 | 2 | } |
Unexecuted instantiation: _ZN5doris13BlockingQueueIPN7RdKafka7MessageEE12blocking_putERKS3_ Unexecuted instantiation: _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE12blocking_putERKS3_ Unexecuted instantiation: _ZN5doris13BlockingQueueIPNS_10vectorized14ScannerContextEE12blocking_putERKS3_ |
100 | | |
101 | | // Return false if queue full or has been shutdown. |
102 | 0 | bool try_put(const T& val) { |
103 | 0 | if (_shutdown || _list.size() >= _max_elements) { |
104 | 0 | return false; |
105 | 0 | } |
106 | | |
107 | 0 | MonotonicStopWatch timer; |
108 | 0 | timer.start(); |
109 | 0 | std::unique_lock<std::mutex> unique_lock(_lock); |
110 | 0 | _total_put_wait_time += timer.elapsed_time(); |
111 | |
|
112 | 0 | if (_shutdown || _list.size() >= _max_elements) { |
113 | 0 | return false; |
114 | 0 | } |
115 | | |
116 | 0 | _list.push_back(val); |
117 | 0 | if (_get_waiting > 0) { |
118 | 0 | --_get_waiting; |
119 | 0 | unique_lock.unlock(); |
120 | 0 | _get_cv.notify_one(); |
121 | 0 | } |
122 | 0 | return true; |
123 | 0 | } |
124 | | |
125 | | // Shut down the queue. Wakes up all threads waiting on BlockingGet or BlockingPut. |
126 | 7 | void shutdown() { |
127 | 7 | { |
128 | 7 | std::lock_guard<std::mutex> guard(_lock); |
129 | 7 | _shutdown = true; |
130 | 7 | } |
131 | | |
132 | 7 | _get_cv.notify_all(); |
133 | 7 | _put_cv.notify_all(); |
134 | 7 | } _ZN5doris13BlockingQueueIiE8shutdownEv Line | Count | Source | 126 | 1 | void shutdown() { | 127 | 1 | { | 128 | 1 | std::lock_guard<std::mutex> guard(_lock); | 129 | 1 | _shutdown = true; | 130 | 1 | } | 131 | | | 132 | 1 | _get_cv.notify_all(); | 133 | 1 | _put_cv.notify_all(); | 134 | 1 | } |
_ZN5doris13BlockingQueueIlE8shutdownEv Line | Count | Source | 126 | 1 | void shutdown() { | 127 | 1 | { | 128 | 1 | std::lock_guard<std::mutex> guard(_lock); | 129 | 1 | _shutdown = true; | 130 | 1 | } | 131 | | | 132 | 1 | _get_cv.notify_all(); | 133 | 1 | _put_cv.notify_all(); | 134 | 1 | } |
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE8shutdownEv Line | Count | Source | 126 | 2 | void shutdown() { | 127 | 2 | { | 128 | 2 | std::lock_guard<std::mutex> guard(_lock); | 129 | 2 | _shutdown = true; | 130 | 2 | } | 131 | | | 132 | 2 | _get_cv.notify_all(); | 133 | 2 | _put_cv.notify_all(); | 134 | 2 | } |
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE8shutdownEv Line | Count | Source | 126 | 3 | void shutdown() { | 127 | 3 | { | 128 | 3 | std::lock_guard<std::mutex> guard(_lock); | 129 | 3 | _shutdown = true; | 130 | 3 | } | 131 | | | 132 | 3 | _get_cv.notify_all(); | 133 | 3 | _put_cv.notify_all(); | 134 | 3 | } |
Unexecuted instantiation: _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE8shutdownEv Unexecuted instantiation: _ZN5doris13BlockingQueueIPNS_10vectorized14ScannerContextEE8shutdownEv |
135 | | |
136 | 1 | uint32_t get_size() const { |
137 | 1 | std::lock_guard<std::mutex> l(_lock); |
138 | 1 | return _list.size(); |
139 | 1 | } _ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE8get_sizeEv Line | Count | Source | 136 | 1 | uint32_t get_size() const { | 137 | 1 | std::lock_guard<std::mutex> l(_lock); | 138 | 1 | return _list.size(); | 139 | 1 | } |
Unexecuted instantiation: _ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE8get_sizeEv |
140 | | |
141 | 0 | uint32_t get_capacity() const { return _max_elements; } |
142 | | |
143 | | // Returns the total amount of time threads have blocked in BlockingGet. |
144 | 1 | uint64_t total_get_wait_time() const { return _total_get_wait_time; } _ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE19total_get_wait_timeEv Line | Count | Source | 144 | 1 | uint64_t total_get_wait_time() const { return _total_get_wait_time; } |
Unexecuted instantiation: _ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE19total_get_wait_timeEv |
145 | | |
146 | | // Returns the total amount of time threads have blocked in BlockingPut. |
147 | 1 | uint64_t total_put_wait_time() const { return _total_put_wait_time; } _ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE19total_put_wait_timeEv Line | Count | Source | 147 | 1 | uint64_t total_put_wait_time() const { return _total_put_wait_time; } |
Unexecuted instantiation: _ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE19total_put_wait_timeEv |
148 | | |
149 | | private: |
150 | | bool _shutdown; |
151 | | const int _max_elements; |
152 | | std::condition_variable _get_cv; // 'get' callers wait on this |
153 | | std::condition_variable _put_cv; // 'put' callers wait on this |
154 | | // _lock guards access to _list, total_get_wait_time, and total_put_wait_time |
155 | | mutable std::mutex _lock; |
156 | | std::list<T> _list; |
157 | | std::atomic<uint64_t> _total_get_wait_time; |
158 | | std::atomic<uint64_t> _total_put_wait_time; |
159 | | size_t _get_waiting; |
160 | | size_t _put_waiting; |
161 | | }; |
162 | | |
163 | | } // namespace doris |