Coverage Report

Created: 2026-03-12 17:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/blocking_priority_queue.hpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/blocking-priority-queue.hpp
19
// and modified by Doris
20
21
#pragma once
22
23
#include <unistd.h>
24
25
#include <cassert>
26
#include <condition_variable>
27
#include <cstdint>
28
#include <mutex>
29
#include <queue>
30
31
#include "common/config.h"
32
#include "util/stopwatch.hpp"
33
34
namespace doris {
35
#include "common/compile_check_begin.h"
36
// Fixed capacity FIFO queue, where both blocking_get and blocking_put operations block
37
// if the queue is empty or full, respectively.
38
template <typename T>
39
class BlockingPriorityQueue {
40
public:
41
    BlockingPriorityQueue(uint32_t max_elements)
42
119
            : _shutdown(false),
43
119
              _max_element(max_elements),
44
119
              _upgrade_counter(0),
45
119
              _total_get_wait_time(0),
46
119
              _total_put_wait_time(0),
47
119
              _get_waiting(0),
48
119
              _put_waiting(0) {}
49
50
    // Get an element from the queue, waiting indefinitely (or until timeout) for one to become available.
51
    // Returns false if we were shut down prior to getting the element, and there
52
    // are no more elements available.
53
    // -- timeout_ms: 0 means wait indefinitely
54
123k
    bool blocking_get(T* out, uint32_t timeout_ms = 0) {
55
123k
        MonotonicStopWatch timer;
56
123k
        timer.start();
57
123k
        std::unique_lock unique_lock(_lock);
58
123k
        bool wait_successful = false;
59
123k
        if (timeout_ms > 0) {
60
0
            while (!(_shutdown || !_queue.empty())) {
61
0
                ++_get_waiting;
62
0
                if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms)) ==
63
0
                    std::cv_status::timeout) {
64
                    // timeout
65
0
                    wait_successful = _shutdown || !_queue.empty();
66
0
                    break;
67
0
                }
68
0
            }
69
123k
        } else {
70
123k
            while (!(_shutdown || !_queue.empty())) {
71
308
                ++_get_waiting;
72
308
                _get_cv.wait(unique_lock);
73
308
            }
74
123k
            wait_successful = true;
75
123k
        }
76
123k
        _total_get_wait_time += timer.elapsed_time();
77
123k
        if (wait_successful) {
78
123k
            if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) {
79
238
                std::priority_queue<T> tmp_queue;
80
476
                while (!_queue.empty()) {
81
238
                    T v = _queue.top();
82
238
                    _queue.pop();
83
238
                    ++v;
84
238
                    tmp_queue.push(v);
85
238
                }
86
238
                swap(_queue, tmp_queue);
87
238
                _upgrade_counter = 0;
88
238
            }
89
123k
            if (!_queue.empty()) {
90
122k
                *out = _queue.top();
91
122k
                _queue.pop();
92
122k
                ++_upgrade_counter;
93
122k
                if (_put_waiting > 0) {
94
0
                    --_put_waiting;
95
0
                    unique_lock.unlock();
96
0
                    _put_cv.notify_one();
97
0
                }
98
122k
                return true;
99
122k
            } else {
100
202
                assert(_shutdown);
101
202
                return false;
102
202
            }
103
123k
        } else {
104
            //time out
105
35
            assert(!_shutdown);
106
0
            return false;
107
35
        }
108
123k
    }
109
110
    bool non_blocking_get(T* out) {
111
        MonotonicStopWatch timer;
112
        timer.start();
113
        std::unique_lock unique_lock(_lock);
114
115
        if (!_queue.empty()) {
116
            // 定期提高队列中残留的任务优先级
117
            // 保证优先级较低的大查询不至于完全饿死
118
            if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) {
119
                std::priority_queue<T> tmp_queue;
120
                while (!_queue.empty()) {
121
                    T v = _queue.top();
122
                    _queue.pop();
123
                    ++v;
124
                    tmp_queue.push(v);
125
                }
126
                swap(_queue, tmp_queue);
127
                _upgrade_counter = 0;
128
            }
129
            *out = _queue.top();
130
            _queue.pop();
131
            ++_upgrade_counter;
132
            _total_get_wait_time += timer.elapsed_time();
133
            if (_put_waiting > 0) {
134
                --_put_waiting;
135
                unique_lock.unlock();
136
                _put_cv.notify_one();
137
            }
138
            return true;
139
        }
140
141
        return false;
142
    }
143
144
    // Puts an element into the queue, waiting indefinitely until there is space.
145
    // If the queue is shut down, returns false.
146
122k
    bool blocking_put(const T& val) {
147
122k
        MonotonicStopWatch timer;
148
122k
        timer.start();
149
122k
        std::unique_lock unique_lock(_lock);
150
122k
        while (!(_shutdown || _queue.size() < _max_element)) {
151
0
            ++_put_waiting;
152
0
            _put_cv.wait(unique_lock);
153
0
        }
154
122k
        _total_put_wait_time += timer.elapsed_time();
155
156
122k
        if (_shutdown) {
157
0
            return false;
158
0
        }
159
160
122k
        _queue.push(val);
161
122k
        if (_get_waiting > 0) {
162
71
            --_get_waiting;
163
71
            unique_lock.unlock();
164
71
            _get_cv.notify_one();
165
71
        }
166
122k
        return true;
167
122k
    }
168
169
    // Return false if queue full or has been shutdown.
170
0
    bool try_put(const T& val) {
171
0
        std::unique_lock unique_lock(_lock);
172
0
        if (_queue.size() < _max_element && !_shutdown) {
173
0
            _queue.push(val);
174
0
            if (_get_waiting > 0) {
175
0
                --_get_waiting;
176
0
                unique_lock.unlock();
177
0
                _get_cv.notify_one();
178
0
            }
179
0
            return true;
180
0
        }
181
0
        return false;
182
0
    }
183
184
    // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put.
185
185
    void shutdown() {
186
185
        {
187
185
            std::lock_guard l(_lock);
188
185
            _shutdown = true;
189
185
        }
190
185
        _get_cv.notify_all();
191
185
        _put_cv.notify_all();
192
185
    }
193
194
123k
    uint32_t get_size() const {
195
123k
        std::lock_guard l(_lock);
196
123k
        return static_cast<uint32_t>(_queue.size());
197
123k
    }
198
199
438
    uint32_t get_capacity() const { return _max_element; }
200
201
    // Returns the total amount of time threads have blocked in blocking_get.
202
439
    uint64_t total_get_wait_time() const { return _total_get_wait_time; }
203
204
    // Returns the total amount of time threads have blocked in blocking_put.
205
439
    uint64_t total_put_wait_time() const { return _total_put_wait_time; }
206
207
private:
208
    bool _shutdown;
209
    const int _max_element;
210
    std::condition_variable _get_cv; // 'get' callers wait on this
211
    std::condition_variable _put_cv; // 'put' callers wait on this
212
    // _lock guards access to _queue, total_get_wait_time, and total_put_wait_time
213
    mutable std::mutex _lock;
214
    std::priority_queue<T> _queue;
215
    int _upgrade_counter;
216
    std::atomic<uint64_t> _total_get_wait_time;
217
    std::atomic<uint64_t> _total_put_wait_time;
218
    size_t _get_waiting;
219
    size_t _put_waiting;
220
};
221
#include "common/compile_check_end.h"
222
} // namespace doris