Coverage Report

Created: 2026-03-19 17:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/blocking_queue.hpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/blocking-queue.hpp
19
// and modified by Doris
20
21
#pragma once
22
23
#include <unistd.h>
24
25
#include <atomic>
26
#include <condition_variable>
27
#include <list>
28
#include <mutex>
29
30
#include "common/logging.h"
31
#include "util/stopwatch.hpp"
32
33
namespace doris {
34
#include "common/compile_check_begin.h"
35
// Fixed capacity FIFO queue, where both BlockingGet and BlockingPut operations block
36
// if the queue is empty or full, respectively.
37
template <typename T>
38
class BlockingQueue {
39
public:
40
    BlockingQueue(uint32_t max_elements)
41
130
            : _shutdown(false),
42
130
              _max_elements(max_elements),
43
130
              _total_get_wait_time(0),
44
130
              _total_put_wait_time(0),
45
130
              _get_waiting(0),
46
130
              _put_waiting(0) {}
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEEC2Ej
Line
Count
Source
41
9
            : _shutdown(false),
42
9
              _max_elements(max_elements),
43
9
              _total_get_wait_time(0),
44
9
              _total_put_wait_time(0),
45
9
              _get_waiting(0),
46
9
              _put_waiting(0) {}
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEEC2Ej
Line
Count
Source
41
49
            : _shutdown(false),
42
49
              _max_elements(max_elements),
43
49
              _total_get_wait_time(0),
44
49
              _total_put_wait_time(0),
45
49
              _get_waiting(0),
46
49
              _put_waiting(0) {}
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEEC2Ej
Line
Count
Source
41
72
            : _shutdown(false),
42
72
              _max_elements(max_elements),
43
72
              _total_get_wait_time(0),
44
72
              _total_put_wait_time(0),
45
72
              _get_waiting(0),
46
72
              _put_waiting(0) {}
47
48
    // Get an element from the queue, waiting indefinitely for one to become available.
49
    // Returns false if we were shut down prior to getting the element, and there
50
    // are no more elements available.
51
890k
    bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); }
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE12blocking_getEPS4_
Line
Count
Source
51
7
    bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); }
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE12blocking_getEPS3_
Line
Count
Source
51
890k
    bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); }
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE12blocking_getEPS3_
Line
Count
Source
51
72
    bool blocking_get(T* out) { return controlled_blocking_get(out, MAX_CV_WAIT_TIMEOUT_MS); }
52
53
    // Blocking_get and blocking_put may cause deadlock,
54
    // but we still don't find root cause,
55
    // introduce condition variable wait timeout to avoid blocking queue deadlock temporarily.
56
891k
    bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) {
57
891k
        MonotonicStopWatch timer;
58
891k
        timer.start();
59
891k
        std::unique_lock<std::mutex> unique_lock(_lock);
60
1.78M
        while (!(_shutdown || !_list.empty())) {
61
894k
            ++_get_waiting;
62
894k
            if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
63
894k
                std::cv_status::timeout) {
64
1.44k
                _get_waiting--;
65
1.44k
            }
66
894k
        }
67
891k
        _total_get_wait_time += timer.elapsed_time();
68
69
891k
        if (!_list.empty()) {
70
887k
            *out = _list.front();
71
887k
            _list.pop_front();
72
887k
            if (_put_waiting > 0) {
73
0
                --_put_waiting;
74
0
                unique_lock.unlock();
75
0
                _put_cv.notify_one();
76
0
            }
77
887k
            return true;
78
887k
        } else {
79
3.04k
            assert(_shutdown);
80
2.56k
            return false;
81
3.04k
        }
82
891k
    }
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE23controlled_blocking_getEPS4_l
Line
Count
Source
56
7
    bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) {
57
7
        MonotonicStopWatch timer;
58
7
        timer.start();
59
7
        std::unique_lock<std::mutex> unique_lock(_lock);
60
10
        while (!(_shutdown || !_list.empty())) {
61
3
            ++_get_waiting;
62
3
            if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
63
3
                std::cv_status::timeout) {
64
0
                _get_waiting--;
65
0
            }
66
3
        }
67
7
        _total_get_wait_time += timer.elapsed_time();
68
69
7
        if (!_list.empty()) {
70
7
            *out = _list.front();
71
7
            _list.pop_front();
72
7
            if (_put_waiting > 0) {
73
0
                --_put_waiting;
74
0
                unique_lock.unlock();
75
0
                _put_cv.notify_one();
76
0
            }
77
7
            return true;
78
7
        } else {
79
0
            assert(_shutdown);
80
0
            return false;
81
0
        }
82
7
    }
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE23controlled_blocking_getEPS3_l
Line
Count
Source
56
890k
    bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) {
57
890k
        MonotonicStopWatch timer;
58
890k
        timer.start();
59
890k
        std::unique_lock<std::mutex> unique_lock(_lock);
60
1.78M
        while (!(_shutdown || !_list.empty())) {
61
894k
            ++_get_waiting;
62
894k
            if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
63
894k
                std::cv_status::timeout) {
64
1.43k
                _get_waiting--;
65
1.43k
            }
66
894k
        }
67
890k
        _total_get_wait_time += timer.elapsed_time();
68
69
890k
        if (!_list.empty()) {
70
887k
            *out = _list.front();
71
887k
            _list.pop_front();
72
887k
            if (_put_waiting > 0) {
73
0
                --_put_waiting;
74
0
                unique_lock.unlock();
75
0
                _put_cv.notify_one();
76
0
            }
77
887k
            return true;
78
887k
        } else {
79
2.89k
            assert(_shutdown);
80
2.41k
            return false;
81
2.89k
        }
82
890k
    }
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE23controlled_blocking_getEPS3_l
Line
Count
Source
56
880
    bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) {
57
880
        MonotonicStopWatch timer;
58
880
        timer.start();
59
880
        std::unique_lock<std::mutex> unique_lock(_lock);
60
1.08k
        while (!(_shutdown || !_list.empty())) {
61
208
            ++_get_waiting;
62
208
            if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
63
208
                std::cv_status::timeout) {
64
5
                _get_waiting--;
65
5
            }
66
208
        }
67
880
        _total_get_wait_time += timer.elapsed_time();
68
69
880
        if (!_list.empty()) {
70
736
            *out = _list.front();
71
736
            _list.pop_front();
72
736
            if (_put_waiting > 0) {
73
0
                --_put_waiting;
74
0
                unique_lock.unlock();
75
0
                _put_cv.notify_one();
76
0
            }
77
736
            return true;
78
736
        } else {
79
144
            assert(_shutdown);
80
144
            return false;
81
144
        }
82
880
    }
83
84
    // Puts an element into the queue, waiting indefinitely until there is space.
85
    // If the queue is shut down, returns false.
86
12
    bool blocking_put(const T& val) { return controlled_blocking_put(val, MAX_CV_WAIT_TIMEOUT_MS); }
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE12blocking_putERKS4_
Line
Count
Source
86
12
    bool blocking_put(const T& val) { return controlled_blocking_put(val, MAX_CV_WAIT_TIMEOUT_MS); }
Unexecuted instantiation: _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE12blocking_putERKS3_
87
88
    // Blocking_get and blocking_put may cause deadlock,
89
    // but we still don't find root cause,
90
    // introduce condition variable wait timeout to avoid blocking queue deadlock temporarily.
91
748
    bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) {
92
748
        MonotonicStopWatch timer;
93
748
        timer.start();
94
748
        std::unique_lock<std::mutex> unique_lock(_lock);
95
748
        while (!(_shutdown || _list.size() < _max_elements)) {
96
0
            ++_put_waiting;
97
0
            if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
98
0
                std::cv_status::timeout) {
99
0
                _put_waiting--;
100
0
            }
101
0
        }
102
748
        _total_put_wait_time += timer.elapsed_time();
103
104
748
        if (_shutdown) {
105
0
            return false;
106
0
        }
107
108
748
        _list.push_back(val);
109
748
        if (_get_waiting > 0) {
110
139
            --_get_waiting;
111
139
            unique_lock.unlock();
112
139
            _get_cv.notify_one();
113
139
        }
114
748
        return true;
115
748
    }
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE23controlled_blocking_putERKS4_l
Line
Count
Source
91
12
    bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) {
92
12
        MonotonicStopWatch timer;
93
12
        timer.start();
94
12
        std::unique_lock<std::mutex> unique_lock(_lock);
95
12
        while (!(_shutdown || _list.size() < _max_elements)) {
96
0
            ++_put_waiting;
97
0
            if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
98
0
                std::cv_status::timeout) {
99
0
                _put_waiting--;
100
0
            }
101
0
        }
102
12
        _total_put_wait_time += timer.elapsed_time();
103
104
12
        if (_shutdown) {
105
0
            return false;
106
0
        }
107
108
12
        _list.push_back(val);
109
12
        if (_get_waiting > 0) {
110
3
            --_get_waiting;
111
3
            unique_lock.unlock();
112
3
            _get_cv.notify_one();
113
3
        }
114
12
        return true;
115
12
    }
Unexecuted instantiation: _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE23controlled_blocking_putERKS3_l
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE23controlled_blocking_putERKS3_l
Line
Count
Source
91
736
    bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) {
92
736
        MonotonicStopWatch timer;
93
736
        timer.start();
94
736
        std::unique_lock<std::mutex> unique_lock(_lock);
95
736
        while (!(_shutdown || _list.size() < _max_elements)) {
96
0
            ++_put_waiting;
97
0
            if (_put_cv.wait_for(unique_lock, std::chrono::milliseconds(cv_wait_timeout_ms)) ==
98
0
                std::cv_status::timeout) {
99
0
                _put_waiting--;
100
0
            }
101
0
        }
102
736
        _total_put_wait_time += timer.elapsed_time();
103
104
736
        if (_shutdown) {
105
0
            return false;
106
0
        }
107
108
736
        _list.push_back(val);
109
736
        if (_get_waiting > 0) {
110
136
            --_get_waiting;
111
136
            unique_lock.unlock();
112
136
            _get_cv.notify_one();
113
136
        }
114
736
        return true;
115
736
    }
116
117
    // Return false if queue full or has been shutdown.
118
885k
    bool try_put(const T& val) {
119
886k
        if (_shutdown || _list.size() >= _max_elements) {
120
0
            return false;
121
0
        }
122
123
885k
        MonotonicStopWatch timer;
124
885k
        timer.start();
125
885k
        std::unique_lock<std::mutex> unique_lock(_lock);
126
885k
        _total_put_wait_time += timer.elapsed_time();
127
128
887k
        if (_shutdown || _list.size() >= _max_elements) {
129
0
            return false;
130
0
        }
131
132
885k
        _list.push_back(val);
133
887k
        if (_get_waiting > 0) {
134
887k
            --_get_waiting;
135
887k
            unique_lock.unlock();
136
887k
            _get_cv.notify_one();
137
887k
        }
138
885k
        return true;
139
885k
    }
140
141
    // Shut down the queue. Wakes up all threads waiting on BlockingGet or BlockingPut.
142
258
    void shutdown() {
143
258
        {
144
258
            std::lock_guard<std::mutex> guard(_lock);
145
258
            _shutdown = true;
146
258
        }
147
148
258
        _get_cv.notify_all();
149
258
        _put_cv.notify_all();
150
258
    }
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE8shutdownEv
Line
Count
Source
142
5
    void shutdown() {
143
5
        {
144
5
            std::lock_guard<std::mutex> guard(_lock);
145
5
            _shutdown = true;
146
5
        }
147
148
5
        _get_cv.notify_all();
149
5
        _put_cv.notify_all();
150
5
    }
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE8shutdownEv
Line
Count
Source
142
37
    void shutdown() {
143
37
        {
144
37
            std::lock_guard<std::mutex> guard(_lock);
145
37
            _shutdown = true;
146
37
        }
147
148
37
        _get_cv.notify_all();
149
37
        _put_cv.notify_all();
150
37
    }
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE8shutdownEv
Line
Count
Source
142
216
    void shutdown() {
143
216
        {
144
216
            std::lock_guard<std::mutex> guard(_lock);
145
216
            _shutdown = true;
146
216
        }
147
148
216
        _get_cv.notify_all();
149
216
        _put_cv.notify_all();
150
216
    }
151
152
899k
    uint32_t get_size() const {
153
899k
        std::lock_guard<std::mutex> l(_lock);
154
899k
        return static_cast<uint32_t>(_list.size());
155
899k
    }
_ZNK5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE8get_sizeEv
Line
Count
Source
152
8
    uint32_t get_size() const {
153
8
        std::lock_guard<std::mutex> l(_lock);
154
8
        return static_cast<uint32_t>(_list.size());
155
8
    }
_ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE8get_sizeEv
Line
Count
Source
152
899k
    uint32_t get_size() const {
153
899k
        std::lock_guard<std::mutex> l(_lock);
154
899k
        return static_cast<uint32_t>(_list.size());
155
899k
    }
_ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE8get_sizeEv
Line
Count
Source
152
72
    uint32_t get_size() const {
153
72
        std::lock_guard<std::mutex> l(_lock);
154
72
        return static_cast<uint32_t>(_list.size());
155
72
    }
156
157
7.90k
    uint32_t get_capacity() const { return _max_elements; }
158
159
    // Returns the total amount of time threads have blocked in BlockingGet.
160
7.97k
    uint64_t total_get_wait_time() const { return _total_get_wait_time; }
_ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE19total_get_wait_timeEv
Line
Count
Source
160
7.90k
    uint64_t total_get_wait_time() const { return _total_get_wait_time; }
_ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE19total_get_wait_timeEv
Line
Count
Source
160
72
    uint64_t total_get_wait_time() const { return _total_get_wait_time; }
161
162
    // Returns the total amount of time threads have blocked in BlockingPut.
163
7.97k
    uint64_t total_put_wait_time() const { return _total_put_wait_time; }
_ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE19total_put_wait_timeEv
Line
Count
Source
163
7.90k
    uint64_t total_put_wait_time() const { return _total_put_wait_time; }
_ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE19total_put_wait_timeEv
Line
Count
Source
163
72
    uint64_t total_put_wait_time() const { return _total_put_wait_time; }
164
165
private:
166
    static constexpr int64_t MAX_CV_WAIT_TIMEOUT_MS = 60 * 60 * 1000; // 1 hour
167
    bool _shutdown;
168
    const int _max_elements;
169
    std::condition_variable _get_cv; // 'get' callers wait on this
170
    std::condition_variable _put_cv; // 'put' callers wait on this
171
    // _lock guards access to _list, total_get_wait_time, and total_put_wait_time
172
    mutable std::mutex _lock;
173
    std::list<T> _list;
174
    std::atomic<uint64_t> _total_get_wait_time;
175
    std::atomic<uint64_t> _total_put_wait_time;
176
    size_t _get_waiting;
177
    size_t _put_waiting;
178
};
179
#include "common/compile_check_end.h"
180
} // namespace doris