Coverage Report

Created: 2026-04-14 20:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/blocking_queue.hpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/blocking-queue.hpp
19
// and modified by Doris
20
21
#pragma once
22
23
#include <unistd.h>
24
25
#include <atomic>
26
#include <condition_variable>
27
#include <list>
28
#include <mutex>
29
30
#include "common/logging.h"
31
#include "util/stopwatch.hpp"
32
33
namespace doris {
34
// Fixed capacity FIFO queue, where both BlockingGet and BlockingPut operations block
35
// if the queue is empty or full, respectively.
36
template <typename T>
37
class BlockingQueue {
38
public:
39
    BlockingQueue(uint32_t max_elements)
40
38
            : _shutdown(false),
41
38
              _max_elements(max_elements),
42
38
              _total_get_wait_time(0),
43
38
              _total_put_wait_time(0),
44
38
              _get_waiting(0),
45
38
              _put_waiting(0) {}
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEEC2Ej
Line
Count
Source
40
28
            : _shutdown(false),
41
28
              _max_elements(max_elements),
42
28
              _total_get_wait_time(0),
43
28
              _total_put_wait_time(0),
44
28
              _get_waiting(0),
45
28
              _put_waiting(0) {}
_ZN5doris13BlockingQueueIiEC2Ej
Line
Count
Source
40
2
            : _shutdown(false),
41
2
              _max_elements(max_elements),
42
2
              _total_get_wait_time(0),
43
2
              _total_put_wait_time(0),
44
2
              _get_waiting(0),
45
2
              _put_waiting(0) {}
_ZN5doris13BlockingQueueIlEC2Ej
Line
Count
Source
40
1
            : _shutdown(false),
41
1
              _max_elements(max_elements),
42
1
              _total_get_wait_time(0),
43
1
              _total_put_wait_time(0),
44
1
              _get_waiting(0),
45
1
              _put_waiting(0) {}
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEEC2Ej
Line
Count
Source
40
6
            : _shutdown(false),
41
6
              _max_elements(max_elements),
42
6
              _total_get_wait_time(0),
43
6
              _total_put_wait_time(0),
44
6
              _get_waiting(0),
45
6
              _put_waiting(0) {}
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEEC2Ej
Line
Count
Source
40
1
            : _shutdown(false),
41
1
              _max_elements(max_elements),
42
1
              _total_get_wait_time(0),
43
1
              _total_put_wait_time(0),
44
1
              _get_waiting(0),
45
1
              _put_waiting(0) {}
46
47
    // Get an element from the queue, waiting indefinitely for one to become available.
48
    // Returns false if we were shut down prior to getting the element, and there
49
    // are no more elements available.
50
59.7k
    bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); }
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE12blocking_getEPS3_
Line
Count
Source
50
141
    bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); }
_ZN5doris13BlockingQueueIiE12blocking_getEPi
Line
Count
Source
50
59.6k
    bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); }
_ZN5doris13BlockingQueueIlE12blocking_getEPl
Line
Count
Source
50
2
    bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); }
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE12blocking_getEPS4_
Line
Count
Source
50
2
    bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); }
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE12blocking_getEPS3_
Line
Count
Source
50
1
    bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); }
51
52
    // Blocking_get and blocking_put may cause deadlock,
53
    // but we still don't find root cause,
54
    // introduce condition variable wait timeout to avoid blocking queue deadlock temporarily.
55
59.5k
    bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) {
56
59.5k
        MonotonicStopWatch timer;
57
59.5k
        timer.start();
58
59.5k
        std::unique_lock<std::mutex> unique_lock(_lock);
59
59.6k
        while (!(_shutdown || !_list.empty())) {
60
147
            ++_get_waiting;
61
147
            if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
62
147
                std::cv_status::timeout) {
63
5
                _get_waiting--;
64
5
            }
65
147
        }
66
59.5k
        _total_get_wait_time += timer.elapsed_time();
67
68
59.5k
        if (!_list.empty()) {
69
50.0k
            *out = _list.front();
70
50.0k
            _list.pop_front();
71
50.0k
            if (_put_waiting > 0) {
72
630
                --_put_waiting;
73
630
                unique_lock.unlock();
74
630
                _put_cv.notify_one();
75
630
            }
76
50.0k
            return true;
77
50.0k
        } else {
78
9.46k
            assert(_shutdown);
79
10.1k
            return false;
80
9.46k
        }
81
59.5k
    }
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE23controlled_blocking_getEPS3_l
Line
Count
Source
55
141
    bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) {
56
141
        MonotonicStopWatch timer;
57
141
        timer.start();
58
141
        std::unique_lock<std::mutex> unique_lock(_lock);
59
282
        while (!(_shutdown || !_list.empty())) {
60
141
            ++_get_waiting;
61
141
            if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
62
141
                std::cv_status::timeout) {
63
0
                _get_waiting--;
64
0
            }
65
141
        }
66
141
        _total_get_wait_time += timer.elapsed_time();
67
68
141
        if (!_list.empty()) {
69
29
            *out = _list.front();
70
29
            _list.pop_front();
71
29
            if (_put_waiting > 0) {
72
0
                --_put_waiting;
73
0
                unique_lock.unlock();
74
0
                _put_cv.notify_one();
75
0
            }
76
29
            return true;
77
112
        } else {
78
112
            assert(_shutdown);
79
112
            return false;
80
112
        }
81
141
    }
_ZN5doris13BlockingQueueIiE23controlled_blocking_getEPil
Line
Count
Source
55
59.3k
    bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) {
56
59.3k
        MonotonicStopWatch timer;
57
59.3k
        timer.start();
58
59.3k
        std::unique_lock<std::mutex> unique_lock(_lock);
59
59.3k
        while (!(_shutdown || !_list.empty())) {
60
0
            ++_get_waiting;
61
0
            if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
62
0
                std::cv_status::timeout) {
63
0
                _get_waiting--;
64
0
            }
65
0
        }
66
59.3k
        _total_get_wait_time += timer.elapsed_time();
67
68
59.3k
        if (!_list.empty()) {
69
50.0k
            *out = _list.front();
70
50.0k
            _list.pop_front();
71
50.0k
            if (_put_waiting > 0) {
72
630
                --_put_waiting;
73
630
                unique_lock.unlock();
74
630
                _put_cv.notify_one();
75
630
            }
76
50.0k
            return true;
77
50.0k
        } else {
78
9.35k
            assert(_shutdown);
79
10.0k
            return false;
80
9.35k
        }
81
59.3k
    }
_ZN5doris13BlockingQueueIlE23controlled_blocking_getEPll
Line
Count
Source
55
2
    bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) {
56
2
        MonotonicStopWatch timer;
57
2
        timer.start();
58
2
        std::unique_lock<std::mutex> unique_lock(_lock);
59
2
        while (!(_shutdown || !_list.empty())) {
60
0
            ++_get_waiting;
61
0
            if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
62
0
                std::cv_status::timeout) {
63
0
                _get_waiting--;
64
0
            }
65
0
        }
66
2
        _total_get_wait_time += timer.elapsed_time();
67
68
2
        if (!_list.empty()) {
69
1
            *out = _list.front();
70
1
            _list.pop_front();
71
1
            if (_put_waiting > 0) {
72
0
                --_put_waiting;
73
0
                unique_lock.unlock();
74
0
                _put_cv.notify_one();
75
0
            }
76
1
            return true;
77
1
        } else {
78
1
            assert(_shutdown);
79
1
            return false;
80
1
        }
81
2
    }
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE23controlled_blocking_getEPS4_l
Line
Count
Source
55
2
    bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) {
56
2
        MonotonicStopWatch timer;
57
2
        timer.start();
58
2
        std::unique_lock<std::mutex> unique_lock(_lock);
59
2
        while (!(_shutdown || !_list.empty())) {
60
0
            ++_get_waiting;
61
0
            if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
62
0
                std::cv_status::timeout) {
63
0
                _get_waiting--;
64
0
            }
65
0
        }
66
2
        _total_get_wait_time += timer.elapsed_time();
67
68
2
        if (!_list.empty()) {
69
2
            *out = _list.front();
70
2
            _list.pop_front();
71
2
            if (_put_waiting > 0) {
72
0
                --_put_waiting;
73
0
                unique_lock.unlock();
74
0
                _put_cv.notify_one();
75
0
            }
76
2
            return true;
77
2
        } else {
78
0
            assert(_shutdown);
79
0
            return false;
80
0
        }
81
2
    }
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE23controlled_blocking_getEPS3_l
Line
Count
Source
55
2
    bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) {
56
2
        MonotonicStopWatch timer;
57
2
        timer.start();
58
2
        std::unique_lock<std::mutex> unique_lock(_lock);
59
8
        while (!(_shutdown || !_list.empty())) {
60
6
            ++_get_waiting;
61
6
            if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
62
6
                std::cv_status::timeout) {
63
5
                _get_waiting--;
64
5
            }
65
6
        }
66
2
        _total_get_wait_time += timer.elapsed_time();
67
68
2
        if (!_list.empty()) {
69
0
            *out = _list.front();
70
0
            _list.pop_front();
71
0
            if (_put_waiting > 0) {
72
0
                --_put_waiting;
73
0
                unique_lock.unlock();
74
0
                _put_cv.notify_one();
75
0
            }
76
0
            return true;
77
2
        } else {
78
2
            assert(_shutdown);
79
2
            return false;
80
2
        }
81
2
    }
82
83
    // Puts an element into the queue, waiting indefinitely until there is space.
84
    // If the queue is shut down, returns false.
85
49.8k
    bool blocking_put(const T& val) { return controlled_blocking_put(val, MAX_CV_WAIT_TIMEOUT_MS); }
Unexecuted instantiation: _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE12blocking_putERKS3_
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE12blocking_putERKS4_
Line
Count
Source
85
4
    bool blocking_put(const T& val) { return controlled_blocking_put(val, MAX_CV_WAIT_TIMEOUT_MS); }
_ZN5doris13BlockingQueueIiE12blocking_putERKi
Line
Count
Source
85
49.8k
    bool blocking_put(const T& val) { return controlled_blocking_put(val, MAX_CV_WAIT_TIMEOUT_MS); }
_ZN5doris13BlockingQueueIlE12blocking_putERKl
Line
Count
Source
85
2
    bool blocking_put(const T& val) { return controlled_blocking_put(val, MAX_CV_WAIT_TIMEOUT_MS); }
86
87
    // Blocking_get and blocking_put may cause deadlock,
88
    // but we still don't find root cause,
89
    // introduce condition variable wait timeout to avoid blocking queue deadlock temporarily.
90
49.7k
    bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) {
91
49.7k
        MonotonicStopWatch timer;
92
49.7k
        timer.start();
93
49.7k
        std::unique_lock<std::mutex> unique_lock(_lock);
94
50.6k
        while (!(_shutdown || _list.size() < _max_elements)) {
95
630
            ++_put_waiting;
96
630
            if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
97
630
                std::cv_status::timeout) {
98
0
                _put_waiting--;
99
0
            }
100
630
        }
101
49.7k
        _total_put_wait_time += timer.elapsed_time();
102
103
49.7k
        if (_shutdown) {
104
1
            return false;
105
1
        }
106
107
49.7k
        _list.push_back(val);
108
49.7k
        if (_get_waiting > 0) {
109
0
            --_get_waiting;
110
0
            unique_lock.unlock();
111
0
            _get_cv.notify_one();
112
0
        }
113
49.7k
        return true;
114
49.7k
    }
Unexecuted instantiation: _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE23controlled_blocking_putERKS3_l
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE23controlled_blocking_putERKS4_l
Line
Count
Source
90
4
    bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) {
91
4
        MonotonicStopWatch timer;
92
4
        timer.start();
93
4
        std::unique_lock<std::mutex> unique_lock(_lock);
94
4
        while (!(_shutdown || _list.size() < _max_elements)) {
95
0
            ++_put_waiting;
96
0
            if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
97
0
                std::cv_status::timeout) {
98
0
                _put_waiting--;
99
0
            }
100
0
        }
101
4
        _total_put_wait_time += timer.elapsed_time();
102
103
4
        if (_shutdown) {
104
0
            return false;
105
0
        }
106
107
4
        _list.push_back(val);
108
4
        if (_get_waiting > 0) {
109
0
            --_get_waiting;
110
0
            unique_lock.unlock();
111
0
            _get_cv.notify_one();
112
0
        }
113
4
        return true;
114
4
    }
_ZN5doris13BlockingQueueIiE23controlled_blocking_putERKil
Line
Count
Source
90
49.7k
    bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) {
91
49.7k
        MonotonicStopWatch timer;
92
49.7k
        timer.start();
93
49.7k
        std::unique_lock<std::mutex> unique_lock(_lock);
94
50.6k
        while (!(_shutdown || _list.size() < _max_elements)) {
95
630
            ++_put_waiting;
96
630
            if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
97
630
                std::cv_status::timeout) {
98
0
                _put_waiting--;
99
0
            }
100
630
        }
101
49.7k
        _total_put_wait_time += timer.elapsed_time();
102
103
49.7k
        if (_shutdown) {
104
0
            return false;
105
0
        }
106
107
49.7k
        _list.push_back(val);
108
49.7k
        if (_get_waiting > 0) {
109
0
            --_get_waiting;
110
0
            unique_lock.unlock();
111
0
            _get_cv.notify_one();
112
0
        }
113
49.7k
        return true;
114
49.7k
    }
_ZN5doris13BlockingQueueIlE23controlled_blocking_putERKll
Line
Count
Source
90
2
    bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) {
91
2
        MonotonicStopWatch timer;
92
2
        timer.start();
93
2
        std::unique_lock<std::mutex> unique_lock(_lock);
94
2
        while (!(_shutdown || _list.size() < _max_elements)) {
95
0
            ++_put_waiting;
96
0
            if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
97
0
                std::cv_status::timeout) {
98
0
                _put_waiting--;
99
0
            }
100
0
        }
101
2
        _total_put_wait_time += timer.elapsed_time();
102
103
2
        if (_shutdown) {
104
1
            return false;
105
1
        }
106
107
1
        _list.push_back(val);
108
1
        if (_get_waiting > 0) {
109
0
            --_get_waiting;
110
0
            unique_lock.unlock();
111
0
            _get_cv.notify_one();
112
0
        }
113
1
        return true;
114
2
    }
Unexecuted instantiation: _ZN5doris13BlockingQueueIPN7RdKafka7MessageEE23controlled_blocking_putERKS3_l
115
116
    // Return false if queue full or has been shutdown.
117
29
    bool try_put(const T& val) {
118
29
        if (_shutdown || _list.size() >= _max_elements) {
119
0
            return false;
120
0
        }
121
122
29
        MonotonicStopWatch timer;
123
29
        timer.start();
124
29
        std::unique_lock<std::mutex> unique_lock(_lock);
125
29
        _total_put_wait_time += timer.elapsed_time();
126
127
29
        if (_shutdown || _list.size() >= _max_elements) {
128
0
            return false;
129
0
        }
130
131
29
        _list.push_back(val);
132
29
        if (_get_waiting > 0) {
133
29
            --_get_waiting;
134
29
            unique_lock.unlock();
135
29
            _get_cv.notify_one();
136
29
        }
137
29
        return true;
138
29
    }
139
140
    // Shut down the queue. Wakes up all threads waiting on BlockingGet or BlockingPut.
141
35
    void shutdown() {
142
35
        {
143
35
            std::lock_guard<std::mutex> guard(_lock);
144
35
            _shutdown = true;
145
35
        }
146
147
35
        _get_cv.notify_all();
148
35
        _put_cv.notify_all();
149
35
    }
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE8shutdownEv
Line
Count
Source
141
28
    void shutdown() {
142
28
        {
143
28
            std::lock_guard<std::mutex> guard(_lock);
144
28
            _shutdown = true;
145
28
        }
146
147
28
        _get_cv.notify_all();
148
28
        _put_cv.notify_all();
149
28
    }
_ZN5doris13BlockingQueueIlE8shutdownEv
Line
Count
Source
141
1
    void shutdown() {
142
1
        {
143
1
            std::lock_guard<std::mutex> guard(_lock);
144
1
            _shutdown = true;
145
1
        }
146
147
1
        _get_cv.notify_all();
148
1
        _put_cv.notify_all();
149
1
    }
_ZN5doris13BlockingQueueIiE8shutdownEv
Line
Count
Source
141
1
    void shutdown() {
142
1
        {
143
1
            std::lock_guard<std::mutex> guard(_lock);
144
1
            _shutdown = true;
145
1
        }
146
147
1
        _get_cv.notify_all();
148
1
        _put_cv.notify_all();
149
1
    }
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE8shutdownEv
Line
Count
Source
141
2
    void shutdown() {
142
2
        {
143
2
            std::lock_guard<std::mutex> guard(_lock);
144
2
            _shutdown = true;
145
2
        }
146
147
2
        _get_cv.notify_all();
148
2
        _put_cv.notify_all();
149
2
    }
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE8shutdownEv
Line
Count
Source
141
3
    void shutdown() {
142
3
        {
143
3
            std::lock_guard<std::mutex> guard(_lock);
144
3
            _shutdown = true;
145
3
        }
146
147
3
        _get_cv.notify_all();
148
3
        _put_cv.notify_all();
149
3
    }
150
151
362
    uint32_t get_size() const {
152
362
        std::lock_guard<std::mutex> l(_lock);
153
362
        return static_cast<uint32_t>(_list.size());
154
362
    }
_ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE8get_sizeEv
Line
Count
Source
151
361
    uint32_t get_size() const {
152
361
        std::lock_guard<std::mutex> l(_lock);
153
361
        return static_cast<uint32_t>(_list.size());
154
361
    }
Unexecuted instantiation: _ZNK5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE8get_sizeEv
_ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE8get_sizeEv
Line
Count
Source
151
1
    uint32_t get_size() const {
152
1
        std::lock_guard<std::mutex> l(_lock);
153
1
        return static_cast<uint32_t>(_list.size());
154
1
    }
155
156
224
    uint32_t get_capacity() const { return _max_elements; }
157
158
    // Returns the total amount of time threads have blocked in BlockingGet.
159
224
    uint64_t total_get_wait_time() const { return _total_get_wait_time; }
_ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE19total_get_wait_timeEv
Line
Count
Source
159
223
    uint64_t total_get_wait_time() const { return _total_get_wait_time; }
_ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE19total_get_wait_timeEv
Line
Count
Source
159
1
    uint64_t total_get_wait_time() const { return _total_get_wait_time; }
160
161
    // Returns the total amount of time threads have blocked in BlockingPut.
162
225
    uint64_t total_put_wait_time() const { return _total_put_wait_time; }
_ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE19total_put_wait_timeEv
Line
Count
Source
162
224
    uint64_t total_put_wait_time() const { return _total_put_wait_time; }
_ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE19total_put_wait_timeEv
Line
Count
Source
162
1
    uint64_t total_put_wait_time() const { return _total_put_wait_time; }
163
164
private:
165
    static constexpr int64_t MAX_CV_WAIT_TIMEOUT_MS = 60 * 60 * 1000; // 1 hour
166
    bool _shutdown;
167
    const int _max_elements;
168
    std::condition_variable _get_cv; // 'get' callers wait on this
169
    std::condition_variable _put_cv; // 'put' callers wait on this
170
    // _lock guards access to _list, total_get_wait_time, and total_put_wait_time
171
    mutable std::mutex _lock;
172
    std::list<T> _list;
173
    std::atomic<uint64_t> _total_get_wait_time;
174
    std::atomic<uint64_t> _total_put_wait_time;
175
    size_t _get_waiting;
176
    size_t _put_waiting;
177
};
178
} // namespace doris