Coverage Report

Created: 2024-11-22 12:06

/root/doris/be/src/util/blocking_queue.hpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/blocking-queue.hpp
19
// and modified by Doris
20
21
#pragma once
22
23
#include <unistd.h>
24
25
#include <atomic>
26
#include <condition_variable>
27
#include <list>
28
#include <mutex>
29
30
#include "common/logging.h"
31
#include "util/stopwatch.hpp"
32
33
namespace doris {
34
35
// Fixed capacity FIFO queue, where both blocking_get and blocking_put operations block
36
// if the queue is empty or full, respectively.
37
template <typename T>
38
class BlockingQueue {
39
public:
40
    BlockingQueue(size_t max_elements)
41
            : _shutdown(false),
42
              _max_elements(max_elements),
43
              _total_get_wait_time(0),
44
              _total_put_wait_time(0),
45
              _get_waiting(0),
46
38
              _put_waiting(0) {}
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEEC2Em
Line
Count
Source
46
28
              _put_waiting(0) {}
_ZN5doris13BlockingQueueIiEC2Em
Line
Count
Source
46
2
              _put_waiting(0) {}
_ZN5doris13BlockingQueueIlEC2Em
Line
Count
Source
46
1
              _put_waiting(0) {}
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEEC2Em
Line
Count
Source
46
6
              _put_waiting(0) {}
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEEC2Em
Line
Count
Source
46
1
              _put_waiting(0) {}
47
48
    // Get an element from the queue, waiting indefinitely for one to become available.
49
    // Returns false if we were shut down prior to getting the element, and there
50
    // are no more elements available.
51
59.9k
    bool blocking_get(T* out) {
52
59.9k
        MonotonicStopWatch timer;
53
59.9k
        timer.start();
54
59.9k
        std::unique_lock<std::mutex> unique_lock(_lock);
55
60.0k
        while (!(_shutdown || !_list.empty())) {
56
143
            ++_get_waiting;
57
143
            _get_cv.wait(unique_lock);
58
143
        }
59
59.9k
        _total_get_wait_time += timer.elapsed_time();
60
61
59.9k
        if (!_list.empty()) {
62
50.0k
            *out = _list.front();
63
50.0k
            _list.pop_front();
64
50.0k
            if (_put_waiting > 0) {
65
7.10k
                --_put_waiting;
66
7.10k
                unique_lock.unlock();
67
7.10k
                _put_cv.notify_one();
68
7.10k
            }
69
50.0k
            return true;
70
50.0k
        } else {
71
9.87k
            assert(_shutdown);
72
0
            return false;
73
9.87k
        }
74
59.9k
    }
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE12blocking_getEPS3_
Line
Count
Source
51
142
    bool blocking_get(T* out) {
52
142
        MonotonicStopWatch timer;
53
142
        timer.start();
54
142
        std::unique_lock<std::mutex> unique_lock(_lock);
55
284
        while (!(_shutdown || !_list.empty())) {
56
142
            ++_get_waiting;
57
142
            _get_cv.wait(unique_lock);
58
142
        }
59
142
        _total_get_wait_time += timer.elapsed_time();
60
61
142
        if (!_list.empty()) {
62
30
            *out = _list.front();
63
30
            _list.pop_front();
64
30
            if (_put_waiting > 0) {
65
0
                --_put_waiting;
66
0
                unique_lock.unlock();
67
0
                _put_cv.notify_one();
68
0
            }
69
30
            return true;
70
112
        } else {
71
112
            assert(_shutdown);
72
0
            return false;
73
112
        }
74
142
    }
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE12blocking_getEPS4_
Line
Count
Source
51
2
    bool blocking_get(T* out) {
52
2
        MonotonicStopWatch timer;
53
2
        timer.start();
54
2
        std::unique_lock<std::mutex> unique_lock(_lock);
55
2
        while (!(_shutdown || !_list.empty())) {
56
0
            ++_get_waiting;
57
0
            _get_cv.wait(unique_lock);
58
0
        }
59
2
        _total_get_wait_time += timer.elapsed_time();
60
61
2
        if (!_list.empty()) {
62
2
            *out = _list.front();
63
2
            _list.pop_front();
64
2
            if (_put_waiting > 0) {
65
0
                --_put_waiting;
66
0
                unique_lock.unlock();
67
0
                _put_cv.notify_one();
68
0
            }
69
2
            return true;
70
2
        } else {
71
0
            assert(_shutdown);
72
0
            return false;
73
0
        }
74
2
    }
_ZN5doris13BlockingQueueIiE12blocking_getEPi
Line
Count
Source
51
59.7k
    bool blocking_get(T* out) {
52
59.7k
        MonotonicStopWatch timer;
53
59.7k
        timer.start();
54
59.7k
        std::unique_lock<std::mutex> unique_lock(_lock);
55
59.7k
        while (!(_shutdown || !_list.empty())) {
56
0
            ++_get_waiting;
57
0
            _get_cv.wait(unique_lock);
58
0
        }
59
59.7k
        _total_get_wait_time += timer.elapsed_time();
60
61
59.7k
        if (!_list.empty()) {
62
50.0k
            *out = _list.front();
63
50.0k
            _list.pop_front();
64
50.0k
            if (_put_waiting > 0) {
65
7.10k
                --_put_waiting;
66
7.10k
                unique_lock.unlock();
67
7.10k
                _put_cv.notify_one();
68
7.10k
            }
69
50.0k
            return true;
70
50.0k
        } else {
71
9.75k
            assert(_shutdown);
72
0
            return false;
73
9.75k
        }
74
59.7k
    }
_ZN5doris13BlockingQueueIlE12blocking_getEPl
Line
Count
Source
51
2
    bool blocking_get(T* out) {
52
2
        MonotonicStopWatch timer;
53
2
        timer.start();
54
2
        std::unique_lock<std::mutex> unique_lock(_lock);
55
2
        while (!(_shutdown || !_list.empty())) {
56
0
            ++_get_waiting;
57
0
            _get_cv.wait(unique_lock);
58
0
        }
59
2
        _total_get_wait_time += timer.elapsed_time();
60
61
2
        if (!_list.empty()) {
62
1
            *out = _list.front();
63
1
            _list.pop_front();
64
1
            if (_put_waiting > 0) {
65
0
                --_put_waiting;
66
0
                unique_lock.unlock();
67
0
                _put_cv.notify_one();
68
0
            }
69
1
            return true;
70
1
        } else {
71
1
            assert(_shutdown);
72
0
            return false;
73
1
        }
74
2
    }
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE12blocking_getEPS3_
Line
Count
Source
51
2
    bool blocking_get(T* out) {
52
2
        MonotonicStopWatch timer;
53
2
        timer.start();
54
2
        std::unique_lock<std::mutex> unique_lock(_lock);
55
3
        while (!(_shutdown || !_list.empty())) {
56
1
            ++_get_waiting;
57
1
            _get_cv.wait(unique_lock);
58
1
        }
59
2
        _total_get_wait_time += timer.elapsed_time();
60
61
2
        if (!_list.empty()) {
62
0
            *out = _list.front();
63
0
            _list.pop_front();
64
0
            if (_put_waiting > 0) {
65
0
                --_put_waiting;
66
0
                unique_lock.unlock();
67
0
                _put_cv.notify_one();
68
0
            }
69
0
            return true;
70
2
        } else {
71
2
            assert(_shutdown);
72
0
            return false;
73
2
        }
74
2
    }
75
76
    // Puts an element into the queue, waiting indefinitely until there is space.
77
    // If the queue is shut down, returns false.
78
49.9k
    bool blocking_put(const T& val) {
79
49.9k
        MonotonicStopWatch timer;
80
49.9k
        timer.start();
81
49.9k
        std::unique_lock<std::mutex> unique_lock(_lock);
82
57.1k
        while (!(_shutdown || _list.size() < _max_elements)) {
83
7.10k
            ++_put_waiting;
84
7.10k
            _put_cv.wait(unique_lock);
85
7.10k
        }
86
49.9k
        _total_put_wait_time += timer.elapsed_time();
87
88
49.9k
        if (_shutdown) {
89
1
            return false;
90
1
        }
91
92
49.9k
        _list.push_back(val);
93
49.9k
        if (_get_waiting > 0) {
94
0
            --_get_waiting;
95
0
            unique_lock.unlock();
96
0
            _get_cv.notify_one();
97
0
        }
98
49.9k
        return true;
99
49.9k
    }
Unexecuted instantiation: _ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE12blocking_putERKS3_
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE12blocking_putERKS4_
Line
Count
Source
78
4
    bool blocking_put(const T& val) {
79
4
        MonotonicStopWatch timer;
80
4
        timer.start();
81
4
        std::unique_lock<std::mutex> unique_lock(_lock);
82
4
        while (!(_shutdown || _list.size() < _max_elements)) {
83
0
            ++_put_waiting;
84
0
            _put_cv.wait(unique_lock);
85
0
        }
86
4
        _total_put_wait_time += timer.elapsed_time();
87
88
4
        if (_shutdown) {
89
0
            return false;
90
0
        }
91
92
4
        _list.push_back(val);
93
4
        if (_get_waiting > 0) {
94
0
            --_get_waiting;
95
0
            unique_lock.unlock();
96
0
            _get_cv.notify_one();
97
0
        }
98
4
        return true;
99
4
    }
_ZN5doris13BlockingQueueIiE12blocking_putERKi
Line
Count
Source
78
49.8k
    bool blocking_put(const T& val) {
79
49.8k
        MonotonicStopWatch timer;
80
49.8k
        timer.start();
81
49.8k
        std::unique_lock<std::mutex> unique_lock(_lock);
82
57.1k
        while (!(_shutdown || _list.size() < _max_elements)) {
83
7.10k
            ++_put_waiting;
84
7.10k
            _put_cv.wait(unique_lock);
85
7.10k
        }
86
49.8k
        _total_put_wait_time += timer.elapsed_time();
87
88
49.8k
        if (_shutdown) {
89
0
            return false;
90
0
        }
91
92
49.8k
        _list.push_back(val);
93
49.8k
        if (_get_waiting > 0) {
94
0
            --_get_waiting;
95
0
            unique_lock.unlock();
96
0
            _get_cv.notify_one();
97
0
        }
98
49.8k
        return true;
99
49.8k
    }
_ZN5doris13BlockingQueueIlE12blocking_putERKl
Line
Count
Source
78
2
    bool blocking_put(const T& val) {
79
2
        MonotonicStopWatch timer;
80
2
        timer.start();
81
2
        std::unique_lock<std::mutex> unique_lock(_lock);
82
2
        while (!(_shutdown || _list.size() < _max_elements)) {
83
0
            ++_put_waiting;
84
0
            _put_cv.wait(unique_lock);
85
0
        }
86
2
        _total_put_wait_time += timer.elapsed_time();
87
88
2
        if (_shutdown) {
89
1
            return false;
90
1
        }
91
92
1
        _list.push_back(val);
93
1
        if (_get_waiting > 0) {
94
0
            --_get_waiting;
95
0
            unique_lock.unlock();
96
0
            _get_cv.notify_one();
97
0
        }
98
1
        return true;
99
2
    }
Unexecuted instantiation: _ZN5doris13BlockingQueueIPN7RdKafka7MessageEE12blocking_putERKS3_
100
101
    // Returns false if the queue is full or has been shut down.
102
30
    bool try_put(const T& val) {
103
30
        if (_shutdown || _list.size() >= _max_elements) {
104
0
            return false;
105
0
        }
106
107
30
        MonotonicStopWatch timer;
108
30
        timer.start();
109
30
        std::unique_lock<std::mutex> unique_lock(_lock);
110
30
        _total_put_wait_time += timer.elapsed_time();
111
112
30
        if (_shutdown || _list.size() >= _max_elements) {
113
0
            return false;
114
0
        }
115
116
30
        _list.push_back(val);
117
30
        if (_get_waiting > 0) {
118
30
            --_get_waiting;
119
30
            unique_lock.unlock();
120
30
            _get_cv.notify_one();
121
30
        }
122
30
        return true;
123
30
    }
124
125
    // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put.
126
35
    void shutdown() {
127
35
        {
128
35
            std::lock_guard<std::mutex> guard(_lock);
129
35
            _shutdown = true;
130
35
        }
131
132
35
        _get_cv.notify_all();
133
35
        _put_cv.notify_all();
134
35
    }
_ZN5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE8shutdownEv
Line
Count
Source
126
28
    void shutdown() {
127
28
        {
128
28
            std::lock_guard<std::mutex> guard(_lock);
129
28
            _shutdown = true;
130
28
        }
131
132
28
        _get_cv.notify_all();
133
28
        _put_cv.notify_all();
134
28
    }
_ZN5doris13BlockingQueueIiE8shutdownEv
Line
Count
Source
126
1
    void shutdown() {
127
1
        {
128
1
            std::lock_guard<std::mutex> guard(_lock);
129
1
            _shutdown = true;
130
1
        }
131
132
1
        _get_cv.notify_all();
133
1
        _put_cv.notify_all();
134
1
    }
_ZN5doris13BlockingQueueIlE8shutdownEv
Line
Count
Source
126
1
    void shutdown() {
127
1
        {
128
1
            std::lock_guard<std::mutex> guard(_lock);
129
1
            _shutdown = true;
130
1
        }
131
132
1
        _get_cv.notify_all();
133
1
        _put_cv.notify_all();
134
1
    }
_ZN5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE8shutdownEv
Line
Count
Source
126
2
    void shutdown() {
127
2
        {
128
2
            std::lock_guard<std::mutex> guard(_lock);
129
2
            _shutdown = true;
130
2
        }
131
132
2
        _get_cv.notify_all();
133
2
        _put_cv.notify_all();
134
2
    }
_ZN5doris13BlockingQueueIPN7RdKafka7MessageEE8shutdownEv
Line
Count
Source
126
3
    void shutdown() {
127
3
        {
128
3
            std::lock_guard<std::mutex> guard(_lock);
129
3
            _shutdown = true;
130
3
        }
131
132
3
        _get_cv.notify_all();
133
3
        _put_cv.notify_all();
134
3
    }
135
136
143
    uint32_t get_size() const {
137
143
        std::lock_guard<std::mutex> l(_lock);
138
143
        return _list.size();
139
143
    }
_ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE8get_sizeEv
Line
Count
Source
136
142
    uint32_t get_size() const {
137
142
        std::lock_guard<std::mutex> l(_lock);
138
142
        return _list.size();
139
142
    }
Unexecuted instantiation: _ZNK5doris13BlockingQueueISt10shared_ptrIN5arrow11RecordBatchEEE8get_sizeEv
_ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE8get_sizeEv
Line
Count
Source
136
1
    uint32_t get_size() const {
137
1
        std::lock_guard<std::mutex> l(_lock);
138
1
        return _list.size();
139
1
    }
140
141
0
    uint32_t get_capacity() const { return _max_elements; }
142
143
    // Returns the total amount of time threads have blocked in blocking_get.
144
1
    uint64_t total_get_wait_time() const { return _total_get_wait_time; }
_ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE19total_get_wait_timeEv
Line
Count
Source
144
1
    uint64_t total_get_wait_time() const { return _total_get_wait_time; }
Unexecuted instantiation: _ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE19total_get_wait_timeEv
145
146
    // Returns the total amount of time threads have blocked in blocking_put.
147
1
    uint64_t total_put_wait_time() const { return _total_put_wait_time; }
_ZNK5doris13BlockingQueueIPN7RdKafka7MessageEE19total_put_wait_timeEv
Line
Count
Source
147
1
    uint64_t total_put_wait_time() const { return _total_put_wait_time; }
Unexecuted instantiation: _ZNK5doris13BlockingQueueINS_14WorkThreadPoolILb0EE4TaskEE19total_put_wait_timeEv
148
149
private:
150
    bool _shutdown;
151
    const int _max_elements;
152
    std::condition_variable _get_cv; // 'get' callers wait on this
153
    std::condition_variable _put_cv; // 'put' callers wait on this
154
    // _lock guards access to _list, _total_get_wait_time, and _total_put_wait_time
155
    mutable std::mutex _lock;
156
    std::list<T> _list;
157
    std::atomic<uint64_t> _total_get_wait_time;
158
    std::atomic<uint64_t> _total_put_wait_time;
159
    size_t _get_waiting;
160
    size_t _put_waiting;
161
};
162
163
} // namespace doris