/root/doris/be/src/util/blocking_queue.hpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/blocking-queue.hpp |
19 | | // and modified by Doris |
20 | | |
21 | | #pragma once |
22 | | |
23 | | #include <unistd.h> |
24 | | |
25 | | #include <atomic> |
26 | | #include <condition_variable> |
27 | | #include <list> |
28 | | #include <mutex> |
29 | | |
30 | | #include "common/logging.h" |
31 | | #include "util/stopwatch.hpp" |
32 | | |
33 | | namespace doris { |
34 | | |
35 | | // Fixed capacity FIFO queue, where both BlockingGet and BlockingPut operations block |
36 | | // if the queue is empty or full, respectively. |
37 | | template <typename T> |
38 | | class BlockingQueue { |
39 | | public: |
40 | | BlockingQueue(size_t max_elements) |
41 | | : _shutdown(false), |
42 | | _max_elements(max_elements), |
43 | | _total_get_wait_time(0), |
44 | | _total_put_wait_time(0), |
45 | | _get_waiting(0), |
46 | 38 | _put_waiting(0) {} _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEEC2Em Line | Count | Source | 46 | 28 | _put_waiting(0) {} |
_ZN5doris13BlockingQueueIiEC2Em Line | Count | Source | 46 | 2 | _put_waiting(0) {} |
_ZN5doris13BlockingQueueIlEC2Em Line | Count | Source | 46 | 1 | _put_waiting(0) {} |
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEEC2Em Line | Count | Source | 46 | 6 | _put_waiting(0) {} |
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEEC2Em Line | Count | Source | 46 | 1 | _put_waiting(0) {} |
|
47 | | |
48 | | // Get an element from the queue, waiting indefinitely for one to become available. |
49 | | // Returns false if we were shut down prior to getting the element, and there |
50 | | // are no more elements available. |
51 | 59.9k | bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); } _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE12blocking_getEPS3_ Line | Count | Source | 51 | 139 | bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); } |
_ZN5doris13BlockingQueueIiE12blocking_getEPi Line | Count | Source | 51 | 59.8k | bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); } |
_ZN5doris13BlockingQueueIlE12blocking_getEPl Line | Count | Source | 51 | 2 | bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); } |
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE12blocking_getEPS4_ Line | Count | Source | 51 | 2 | bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); } |
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE12blocking_getEPS3_ Line | Count | Source | 51 | 1 | bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); } |
|
52 | | |
53 | | // Blocking_get and blocking_put may cause deadlock, |
54 | | // but we still don't find root cause, |
55 | | // introduce condition variable wait timeout to avoid blocking queue deadlock temporarily. |
56 | 59.9k | bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) { |
57 | 59.9k | MonotonicStopWatch timer; |
58 | 59.9k | timer.start(); |
59 | 59.9k | std::unique_lock<std::mutex> unique_lock(_lock); |
60 | 60.0k | while (!(_shutdown || !_list.empty())) { |
61 | 147 | ++_get_waiting; |
62 | 147 | if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == |
63 | 147 | std::cv_status::timeout) { |
64 | 5 | _get_waiting--; |
65 | 5 | } |
66 | 147 | } |
67 | 59.9k | _total_get_wait_time += timer.elapsed_time(); |
68 | | |
69 | 59.9k | if (!_list.empty()) { |
70 | 50.0k | *out = _list.front(); |
71 | 50.0k | _list.pop_front(); |
72 | 50.0k | if (_put_waiting > 0) { |
73 | 3.77k | --_put_waiting; |
74 | 3.77k | unique_lock.unlock(); |
75 | 3.77k | _put_cv.notify_one(); |
76 | 3.77k | } |
77 | 50.0k | return true; |
78 | 50.0k | } else { |
79 | 9.87k | assert(_shutdown); |
80 | 0 | return false; |
81 | 9.87k | } |
82 | 59.9k | } _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE23controlled_blocking_getEPS3_l Line | Count | Source | 56 | 139 | bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) { | 57 | 139 | MonotonicStopWatch timer; | 58 | 139 | timer.start(); | 59 | 139 | std::unique_lock<std::mutex> unique_lock(_lock); | 60 | 280 | while (!(_shutdown || !_list.empty())) { | 61 | 141 | ++_get_waiting; | 62 | 141 | if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 63 | 141 | std::cv_status::timeout) { | 64 | 0 | _get_waiting--; | 65 | 0 | } | 66 | 141 | } | 67 | 139 | _total_get_wait_time += timer.elapsed_time(); | 68 | | | 69 | 139 | if (!_list.empty()) { | 70 | 29 | *out = _list.front(); | 71 | 29 | _list.pop_front(); | 72 | 29 | if (_put_waiting > 0) { | 73 | 0 | --_put_waiting; | 74 | 0 | unique_lock.unlock(); | 75 | 0 | _put_cv.notify_one(); | 76 | 0 | } | 77 | 29 | return true; | 78 | 110 | } else { | 79 | 110 | assert(_shutdown); | 80 | 0 | return false; | 81 | 110 | } | 82 | 139 | } |
_ZN5doris13BlockingQueueIiE23controlled_blocking_getEPil Line | Count | Source | 56 | 59.7k | bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) { | 57 | 59.7k | MonotonicStopWatch timer; | 58 | 59.7k | timer.start(); | 59 | 59.7k | std::unique_lock<std::mutex> unique_lock(_lock); | 60 | 59.7k | while (!(_shutdown || !_list.empty())) { | 61 | 0 | ++_get_waiting; | 62 | 0 | if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 63 | 0 | std::cv_status::timeout) { | 64 | 0 | _get_waiting--; | 65 | 0 | } | 66 | 0 | } | 67 | 59.7k | _total_get_wait_time += timer.elapsed_time(); | 68 | | | 69 | 59.7k | if (!_list.empty()) { | 70 | 50.0k | *out = _list.front(); | 71 | 50.0k | _list.pop_front(); | 72 | 50.0k | if (_put_waiting > 0) { | 73 | 3.77k | --_put_waiting; | 74 | 3.77k | unique_lock.unlock(); | 75 | 3.77k | _put_cv.notify_one(); | 76 | 3.77k | } | 77 | 50.0k | return true; | 78 | 50.0k | } else { | 79 | 9.76k | assert(_shutdown); | 80 | 0 | return false; | 81 | 9.76k | } | 82 | 59.7k | } |
_ZN5doris13BlockingQueueIlE23controlled_blocking_getEPll Line | Count | Source | 56 | 2 | bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) { | 57 | 2 | MonotonicStopWatch timer; | 58 | 2 | timer.start(); | 59 | 2 | std::unique_lock<std::mutex> unique_lock(_lock); | 60 | 2 | while (!(_shutdown || !_list.empty())) { | 61 | 0 | ++_get_waiting; | 62 | 0 | if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 63 | 0 | std::cv_status::timeout) { | 64 | 0 | _get_waiting--; | 65 | 0 | } | 66 | 0 | } | 67 | 2 | _total_get_wait_time += timer.elapsed_time(); | 68 | | | 69 | 2 | if (!_list.empty()) { | 70 | 1 | *out = _list.front(); | 71 | 1 | _list.pop_front(); | 72 | 1 | if (_put_waiting > 0) { | 73 | 0 | --_put_waiting; | 74 | 0 | unique_lock.unlock(); | 75 | 0 | _put_cv.notify_one(); | 76 | 0 | } | 77 | 1 | return true; | 78 | 1 | } else { | 79 | 1 | assert(_shutdown); | 80 | 0 | return false; | 81 | 1 | } | 82 | 2 | } |
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE23controlled_blocking_getEPS4_l Line | Count | Source | 56 | 2 | bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) { | 57 | 2 | MonotonicStopWatch timer; | 58 | 2 | timer.start(); | 59 | 2 | std::unique_lock<std::mutex> unique_lock(_lock); | 60 | 2 | while (!(_shutdown || !_list.empty())) { | 61 | 0 | ++_get_waiting; | 62 | 0 | if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 63 | 0 | std::cv_status::timeout) { | 64 | 0 | _get_waiting--; | 65 | 0 | } | 66 | 0 | } | 67 | 2 | _total_get_wait_time += timer.elapsed_time(); | 68 | | | 69 | 2 | if (!_list.empty()) { | 70 | 2 | *out = _list.front(); | 71 | 2 | _list.pop_front(); | 72 | 2 | if (_put_waiting > 0) { | 73 | 0 | --_put_waiting; | 74 | 0 | unique_lock.unlock(); | 75 | 0 | _put_cv.notify_one(); | 76 | 0 | } | 77 | 2 | return true; | 78 | 2 | } else { | 79 | 0 | assert(_shutdown); | 80 | 0 | return false; | 81 | 0 | } | 82 | 2 | } |
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE23controlled_blocking_getEPS3_l Line | Count | Source | 56 | 2 | bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) { | 57 | 2 | MonotonicStopWatch timer; | 58 | 2 | timer.start(); | 59 | 2 | std::unique_lock<std::mutex> unique_lock(_lock); | 60 | 8 | while (!(_shutdown || !_list.empty())) { | 61 | 6 | ++_get_waiting; | 62 | 6 | if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 63 | 6 | std::cv_status::timeout) { | 64 | 5 | _get_waiting--; | 65 | 5 | } | 66 | 6 | } | 67 | 2 | _total_get_wait_time += timer.elapsed_time(); | 68 | | | 69 | 2 | if (!_list.empty()) { | 70 | 0 | *out = _list.front(); | 71 | 0 | _list.pop_front(); | 72 | 0 | if (_put_waiting > 0) { | 73 | 0 | --_put_waiting; | 74 | 0 | unique_lock.unlock(); | 75 | 0 | _put_cv.notify_one(); | 76 | 0 | } | 77 | 0 | return true; | 78 | 2 | } else { | 79 | 2 | assert(_shutdown); | 80 | 0 | return false; | 81 | 2 | } | 82 | 2 | } |
|
83 | | |
84 | | // Puts an element into the queue, waiting indefinitely until there is space. |
85 | | // If the queue is shut down, returns false. |
86 | 49.9k | bool blocking_put(const T& val) { return controlled_blocking_put(val, MAX_CV_WAIT_TIMEOUT_MS); } Unexecuted instantiation: _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE12blocking_putERKS3_ _ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE12blocking_putERKS4_ Line | Count | Source | 86 | 4 | bool blocking_put(const T& val) { return controlled_blocking_put(val, MAX_CV_WAIT_TIMEOUT_MS); } |
_ZN5doris13BlockingQueueIiE12blocking_putERKi Line | Count | Source | 86 | 49.9k | bool blocking_put(const T& val) { return controlled_blocking_put(val, MAX_CV_WAIT_TIMEOUT_MS); } |
_ZN5doris13BlockingQueueIlE12blocking_putERKl Line | Count | Source | 86 | 2 | bool blocking_put(const T& val) { return controlled_blocking_put(val, MAX_CV_WAIT_TIMEOUT_MS); } |
|
87 | | |
88 | | // Blocking_get and blocking_put may cause deadlock, |
89 | | // but we still don't find root cause, |
90 | | // introduce condition variable wait timeout to avoid blocking queue deadlock temporarily. |
91 | 49.8k | bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) { |
92 | 49.8k | MonotonicStopWatch timer; |
93 | 49.8k | timer.start(); |
94 | 49.8k | std::unique_lock<std::mutex> unique_lock(_lock); |
95 | 53.7k | while (!(_shutdown || _list.size() < _max_elements)) { |
96 | 3.77k | ++_put_waiting; |
97 | 3.77k | if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == |
98 | 3.77k | std::cv_status::timeout) { |
99 | 0 | _put_waiting--; |
100 | 0 | } |
101 | 3.77k | } |
102 | 49.8k | _total_put_wait_time += timer.elapsed_time(); |
103 | | |
104 | 49.8k | if (_shutdown) { |
105 | 1 | return false; |
106 | 1 | } |
107 | | |
108 | 49.8k | _list.push_back(val); |
109 | 49.8k | if (_get_waiting > 0) { |
110 | 0 | --_get_waiting; |
111 | 0 | unique_lock.unlock(); |
112 | 0 | _get_cv.notify_one(); |
113 | 0 | } |
114 | 49.8k | return true; |
115 | 49.8k | } Unexecuted instantiation: _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE23controlled_blocking_putERKS3_l _ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE23controlled_blocking_putERKS4_l Line | Count | Source | 91 | 4 | bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) { | 92 | 4 | MonotonicStopWatch timer; | 93 | 4 | timer.start(); | 94 | 4 | std::unique_lock<std::mutex> unique_lock(_lock); | 95 | 4 | while (!(_shutdown || _list.size() < _max_elements)) { | 96 | 0 | ++_put_waiting; | 97 | 0 | if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 98 | 0 | std::cv_status::timeout) { | 99 | 0 | _put_waiting--; | 100 | 0 | } | 101 | 0 | } | 102 | 4 | _total_put_wait_time += timer.elapsed_time(); | 103 | | | 104 | 4 | if (_shutdown) { | 105 | 0 | return false; | 106 | 0 | } | 107 | | | 108 | 4 | _list.push_back(val); | 109 | 4 | if (_get_waiting > 0) { | 110 | 0 | --_get_waiting; | 111 | 0 | unique_lock.unlock(); | 112 | 0 | _get_cv.notify_one(); | 113 | 0 | } | 114 | 4 | return true; | 115 | 4 | } |
_ZN5doris13BlockingQueueIiE23controlled_blocking_putERKil Line | Count | Source | 91 | 49.8k | bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) { | 92 | 49.8k | MonotonicStopWatch timer; | 93 | 49.8k | timer.start(); | 94 | 49.8k | std::unique_lock<std::mutex> unique_lock(_lock); | 95 | 53.7k | while (!(_shutdown || _list.size() < _max_elements)) { | 96 | 3.77k | ++_put_waiting; | 97 | 3.77k | if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 98 | 3.77k | std::cv_status::timeout) { | 99 | 0 | _put_waiting--; | 100 | 0 | } | 101 | 3.77k | } | 102 | 49.8k | _total_put_wait_time += timer.elapsed_time(); | 103 | | | 104 | 49.8k | if (_shutdown) { | 105 | 0 | return false; | 106 | 0 | } | 107 | | | 108 | 49.8k | _list.push_back(val); | 109 | 49.8k | if (_get_waiting > 0) { | 110 | 0 | --_get_waiting; | 111 | 0 | unique_lock.unlock(); | 112 | 0 | _get_cv.notify_one(); | 113 | 0 | } | 114 | 49.8k | return true; | 115 | 49.8k | } |
_ZN5doris13BlockingQueueIlE23controlled_blocking_putERKll Line | Count | Source | 91 | 2 | bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) { | 92 | 2 | MonotonicStopWatch timer; | 93 | 2 | timer.start(); | 94 | 2 | std::unique_lock<std::mutex> unique_lock(_lock); | 95 | 2 | while (!(_shutdown || _list.size() < _max_elements)) { | 96 | 0 | ++_put_waiting; | 97 | 0 | if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) == | 98 | 0 | std::cv_status::timeout) { | 99 | 0 | _put_waiting--; | 100 | 0 | } | 101 | 0 | } | 102 | 2 | _total_put_wait_time += timer.elapsed_time(); | 103 | | | 104 | 2 | if (_shutdown) { | 105 | 1 | return false; | 106 | 1 | } | 107 | | | 108 | 1 | _list.push_back(val); | 109 | 1 | if (_get_waiting > 0) { | 110 | 0 | --_get_waiting; | 111 | 0 | unique_lock.unlock(); | 112 | 0 | _get_cv.notify_one(); | 113 | 0 | } | 114 | 1 | return true; | 115 | 2 | } |
Unexecuted instantiation: _ZN5doris13BlockingQueueIPN7RdKafka7MessageEE23controlled_blocking_putERKS3_l |
116 | | |
117 | | // Return false if queue full or has been shutdown. |
118 | 29 | bool try_put(const T& val) { |
119 | 29 | if (_shutdown || _list.size() >= _max_elements) { |
120 | 0 | return false; |
121 | 0 | } |
122 | | |
123 | 29 | MonotonicStopWatch timer; |
124 | 29 | timer.start(); |
125 | 29 | std::unique_lock<std::mutex> unique_lock(_lock); |
126 | 29 | _total_put_wait_time += timer.elapsed_time(); |
127 | | |
128 | 29 | if (_shutdown || _list.size() >= _max_elements) { |
129 | 0 | return false; |
130 | 0 | } |
131 | | |
132 | 29 | _list.push_back(val); |
133 | 29 | if (_get_waiting > 0) { |
134 | 29 | --_get_waiting; |
135 | 29 | unique_lock.unlock(); |
136 | 29 | _get_cv.notify_one(); |
137 | 29 | } |
138 | 29 | return true; |
139 | 29 | } |
140 | | |
141 | | // Shut down the queue. Wakes up all threads waiting on BlockingGet or BlockingPut. |
142 | 35 | void shutdown() { |
143 | 35 | { |
144 | 35 | std::lock_guard<std::mutex> guard(_lock); |
145 | 35 | _shutdown = true; |
146 | 35 | } |
147 | | |
148 | 35 | _get_cv.notify_all(); |
149 | 35 | _put_cv.notify_all(); |
150 | 35 | } _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE8shutdownEv Line | Count | Source | 142 | 28 | void shutdown() { | 143 | 28 | { | 144 | 28 | std::lock_guard<std::mutex> guard(_lock); | 145 | 28 | _shutdown = true; | 146 | 28 | } | 147 | | | 148 | 28 | _get_cv.notify_all(); | 149 | 28 | _put_cv.notify_all(); | 150 | 28 | } |
_ZN5doris13BlockingQueueIiE8shutdownEv Line | Count | Source | 142 | 1 | void shutdown() { | 143 | 1 | { | 144 | 1 | std::lock_guard<std::mutex> guard(_lock); | 145 | 1 | _shutdown = true; | 146 | 1 | } | 147 | | | 148 | 1 | _get_cv.notify_all(); | 149 | 1 | _put_cv.notify_all(); | 150 | 1 | } |
_ZN5doris13BlockingQueueIlE8shutdownEv Line | Count | Source | 142 | 1 | void shutdown() { | 143 | 1 | { | 144 | 1 | std::lock_guard<std::mutex> guard(_lock); | 145 | 1 | _shutdown = true; | 146 | 1 | } | 147 | | | 148 | 1 | _get_cv.notify_all(); | 149 | 1 | _put_cv.notify_all(); | 150 | 1 | } |
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE8shutdownEv Line | Count | Source | 142 | 2 | void shutdown() { | 143 | 2 | { | 144 | 2 | std::lock_guard<std::mutex> guard(_lock); | 145 | 2 | _shutdown = true; | 146 | 2 | } | 147 | | | 148 | 2 | _get_cv.notify_all(); | 149 | 2 | _put_cv.notify_all(); | 150 | 2 | } |
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE8shutdownEv Line | Count | Source | 142 | 3 | void shutdown() { | 143 | 3 | { | 144 | 3 | std::lock_guard<std::mutex> guard(_lock); | 145 | 3 | _shutdown = true; | 146 | 3 | } | 147 | | | 148 | 3 | _get_cv.notify_all(); | 149 | 3 | _put_cv.notify_all(); | 150 | 3 | } |
|
151 | | |
152 | 142 | uint32_t get_size() const { |
153 | 142 | std::lock_guard<std::mutex> l(_lock); |
154 | 142 | return _list.size(); |
155 | 142 | } _ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE8get_sizeEv Line | Count | Source | 152 | 141 | uint32_t get_size() const { | 153 | 141 | std::lock_guard<std::mutex> l(_lock); | 154 | 141 | return _list.size(); | 155 | 141 | } |
Unexecuted instantiation: _ZNK5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE8get_sizeEv _ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE8get_sizeEv Line | Count | Source | 152 | 1 | uint32_t get_size() const { | 153 | 1 | std::lock_guard<std::mutex> l(_lock); | 154 | 1 | return _list.size(); | 155 | 1 | } |
|
156 | | |
157 | 0 | uint32_t get_capacity() const { return _max_elements; } |
158 | | |
159 | | // Returns the total amount of time threads have blocked in BlockingGet. |
160 | 1 | uint64_t total_get_wait_time() const { return _total_get_wait_time; } _ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE19total_get_wait_timeEv Line | Count | Source | 160 | 1 | uint64_t total_get_wait_time() const { return _total_get_wait_time; } |
Unexecuted instantiation: _ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE19total_get_wait_timeEv |
161 | | |
162 | | // Returns the total amount of time threads have blocked in BlockingPut. |
163 | 1 | uint64_t total_put_wait_time() const { return _total_put_wait_time; } _ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE19total_put_wait_timeEv Line | Count | Source | 163 | 1 | uint64_t total_put_wait_time() const { return _total_put_wait_time; } |
Unexecuted instantiation: _ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE19total_put_wait_timeEv |
164 | | |
165 | | private: |
166 | | static constexpr int64_t MAX_CV_WAIT_TIMEOUT_MS = 60 * 60 * 1000; // 1 hour |
167 | | bool _shutdown; |
168 | | const int _max_elements; |
169 | | std::condition_variable _get_cv; // 'get' callers wait on this |
170 | | std::condition_variable _put_cv; // 'put' callers wait on this |
171 | | // _lock guards access to _list, total_get_wait_time, and total_put_wait_time |
172 | | mutable std::mutex _lock; |
173 | | std::list<T> _list; |
174 | | std::atomic<uint64_t> _total_get_wait_time; |
175 | | std::atomic<uint64_t> _total_put_wait_time; |
176 | | size_t _get_waiting; |
177 | | size_t _put_waiting; |
178 | | }; |
179 | | |
180 | | } // namespace doris |