Coverage Report

Created: 2026-05-20 07:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/work_thread_pool.hpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <mutex>
21
22
#include "util/blocking_priority_queue.hpp"
23
#include "util/blocking_queue.hpp"
24
#include "util/thread.h"
25
#include "util/thread_group.h"
26
27
namespace doris {
28
29
// Simple threadpool which processes items (of type T) in parallel which were placed on a
30
// blocking queue by Offer(). Each item is processed by a single user-supplied method.
31
template <bool Priority = false>
32
class WorkThreadPool {
33
public:
34
    // Signature of a work-processing function. Takes the integer id of the thread which is
35
    // calling it (ids run from 0 to num_threads - 1) and a reference to the item to
36
    // process.
37
    using WorkFunction = std::function<void()>;
38
39
    struct Task {
40
    public:
41
        int priority;
42
        WorkFunction work_function;
43
0
        bool operator<(const Task& o) const { return priority < o.priority; }
44
45
234
        Task& operator++() {
46
234
            priority += 2;
47
234
            return *this;
48
234
        }
49
    };
50
51
    using WorkQueue =
52
            std::conditional_t<Priority, BlockingPriorityQueue<Task>, BlockingQueue<Task>>;
53
54
    // Creates a new thread pool and start num_threads threads.
55
    //  -- num_threads: how many threads are part of this pool
56
    //  -- queue_size: the maximum size of the queue on which work items are offered. If the
57
    //     queue exceeds this size, subsequent calls to Offer will block until there is
58
    //     capacity available.
59
    WorkThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name)
60
159
            : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) {
61
5.10k
        for (int i = 0; i < num_threads; ++i) {
62
4.94k
            _threads.create_thread(
63
4.94k
                    std::bind<void>(std::mem_fn(&WorkThreadPool::work_thread), this, i));
64
4.94k
        }
65
159
    }
_ZN5doris14WorkThreadPoolILb1EEC2EjjRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Line
Count
Source
60
113
            : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) {
61
340
        for (int i = 0; i < num_threads; ++i) {
62
227
            _threads.create_thread(
63
227
                    std::bind<void>(std::mem_fn(&WorkThreadPool::work_thread), this, i));
64
227
        }
65
113
    }
_ZN5doris14WorkThreadPoolILb0EEC2EjjRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Line
Count
Source
60
46
            : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) {
61
4.76k
        for (int i = 0; i < num_threads; ++i) {
62
4.72k
            _threads.create_thread(
63
4.72k
                    std::bind<void>(std::mem_fn(&WorkThreadPool::work_thread), this, i));
64
4.72k
        }
65
46
    }
66
67
    // Destructor ensures that all threads are terminated before this object is freed
68
    // (otherwise they may continue to run and reference member variables)
69
124
    virtual ~WorkThreadPool() {
70
124
        shutdown();
71
124
        join();
72
124
    }
_ZN5doris14WorkThreadPoolILb1EED2Ev
Line
Count
Source
69
90
    virtual ~WorkThreadPool() {
70
90
        shutdown();
71
90
        join();
72
90
    }
_ZN5doris14WorkThreadPoolILb0EED2Ev
Line
Count
Source
69
34
    virtual ~WorkThreadPool() {
70
34
        shutdown();
71
34
        join();
72
34
    }
73
74
    // Blocking operation that puts a work item on the queue. If the queue is full, blocks
75
    // until there is capacity available.
76
    //
77
    // 'work' is copied into the work queue, but may be referenced at any time in the
78
    // future. Therefore the caller needs to ensure that any data referenced by work (if T
79
    // is, e.g., a pointer type) remains valid until work has been processed, and it's up to
80
    // the caller to provide their own signalling mechanism to detect this (or to wait until
81
    // after DrainAndshutdown returns).
82
    //
83
    // Returns true if the work item was successfully added to the queue, false otherwise
84
    // (which typically means that the thread pool has already been shut down).
85
3
    virtual bool offer(Task task) { return _work_queue.blocking_put(task); }
_ZN5doris14WorkThreadPoolILb1EE5offerENS1_4TaskE
Line
Count
Source
85
3
    virtual bool offer(Task task) { return _work_queue.blocking_put(task); }
Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb0EE5offerENS1_4TaskE
86
87
120k
    virtual bool offer(WorkFunction func) {
88
120k
        WorkThreadPool::Task task = {0, func};
89
120k
        return _work_queue.blocking_put(task);
90
120k
    }
_ZN5doris14WorkThreadPoolILb1EE5offerESt8functionIFvvEE
Line
Count
Source
87
120k
    virtual bool offer(WorkFunction func) {
88
120k
        WorkThreadPool::Task task = {0, func};
89
120k
        return _work_queue.blocking_put(task);
90
120k
    }
Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb0EE5offerESt8functionIFvvEE
91
92
728k
    virtual bool try_offer(WorkFunction func) {
93
728k
        WorkThreadPool::Task task = {0, func};
94
728k
        return _work_queue.try_put(task);
95
728k
    }
Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb1EE9try_offerESt8functionIFvvEE
_ZN5doris14WorkThreadPoolILb0EE9try_offerESt8functionIFvvEE
Line
Count
Source
92
728k
    virtual bool try_offer(WorkFunction func) {
93
728k
        WorkThreadPool::Task task = {0, func};
94
728k
        return _work_queue.try_put(task);
95
728k
    }
96
97
    // Shuts the thread pool down, causing the work queue to cease accepting offered work
98
    // and the worker threads to terminate once they have processed their current work item.
99
    // Returns once the shutdown flag has been set, does not wait for the threads to
100
    // terminate.
101
207
    virtual void shutdown() {
102
207
        _shutdown = true;
103
207
        _work_queue.shutdown();
104
207
    }
_ZN5doris14WorkThreadPoolILb1EE8shutdownEv
Line
Count
Source
101
173
    virtual void shutdown() {
102
173
        _shutdown = true;
103
173
        _work_queue.shutdown();
104
173
    }
_ZN5doris14WorkThreadPoolILb0EE8shutdownEv
Line
Count
Source
101
34
    virtual void shutdown() {
102
34
        _shutdown = true;
103
34
        _work_queue.shutdown();
104
34
    }
105
106
    // Blocks until all threads are finished. shutdown does not need to have been called,
107
    // since it may be called on a separate thread.
108
195
    virtual void join() { static_cast<void>(_threads.join_all()); }
_ZN5doris14WorkThreadPoolILb1EE4joinEv
Line
Count
Source
108
161
    virtual void join() { static_cast<void>(_threads.join_all()); }
_ZN5doris14WorkThreadPoolILb0EE4joinEv
Line
Count
Source
108
34
    virtual void join() { static_cast<void>(_threads.join_all()); }
109
110
8.50k
    virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
_ZNK5doris14WorkThreadPoolILb1EE14get_queue_sizeEv
Line
Count
Source
110
419
    virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
_ZNK5doris14WorkThreadPoolILb0EE14get_queue_sizeEv
Line
Count
Source
110
8.08k
    virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
111
1.71k
    virtual uint32_t get_active_threads() const { return _active_threads; }
Unexecuted instantiation: _ZNK5doris14WorkThreadPoolILb1EE18get_active_threadsEv
_ZNK5doris14WorkThreadPoolILb0EE18get_active_threadsEv
Line
Count
Source
111
1.71k
    virtual uint32_t get_active_threads() const { return _active_threads; }
112
113
    // Blocks until the work queue is empty, and then calls shutdown to stop the worker
114
    // threads and Join to wait until they are finished.
115
    // Any work Offer()'ed during DrainAndshutdown may or may not be processed.
116
0
    virtual void drain_and_shutdown() {
117
0
        {
118
0
            std::unique_lock l(_lock);
119
0
            while (_work_queue.get_size() != 0) {
120
0
                _empty_cv.wait(l);
121
0
            }
122
0
        }
123
0
        shutdown();
124
0
        join();
125
0
    }
Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb1EE18drain_and_shutdownEv
Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb0EE18drain_and_shutdownEv
126
127
6.78k
    std::string get_info() const {
128
6.78k
        return (Priority ? "PriorityThreadPool" : "FifoThreadPool") +
129
6.78k
               fmt::format(
130
6.78k
                       "(name={}, queue_size={}/{}, active_thread={}/{}, "
131
6.78k
                       "total_get_wait_time={}, total_put_wait_time={}, is_shutdown={})",
132
6.78k
                       _name, get_queue_size(), _work_queue.get_capacity(), _active_threads,
133
6.78k
                       _threads.size(), _work_queue.total_get_wait_time(),
134
6.78k
                       _work_queue.total_put_wait_time(), is_shutdown());
135
6.78k
    }
_ZNK5doris14WorkThreadPoolILb1EE8get_infoB5cxx11Ev
Line
Count
Source
127
419
    std::string get_info() const {
128
419
        return (Priority ? "PriorityThreadPool" : "FifoThreadPool") +
129
419
               fmt::format(
130
419
                       "(name={}, queue_size={}/{}, active_thread={}/{}, "
131
419
                       "total_get_wait_time={}, total_put_wait_time={}, is_shutdown={})",
132
419
                       _name, get_queue_size(), _work_queue.get_capacity(), _active_threads,
133
419
                       _threads.size(), _work_queue.total_get_wait_time(),
134
419
                       _work_queue.total_put_wait_time(), is_shutdown());
135
419
    }
_ZNK5doris14WorkThreadPoolILb0EE8get_infoB5cxx11Ev
Line
Count
Source
127
6.36k
    std::string get_info() const {
128
6.36k
        return (Priority ? "PriorityThreadPool" : "FifoThreadPool") +
129
6.36k
               fmt::format(
130
6.36k
                       "(name={}, queue_size={}/{}, active_thread={}/{}, "
131
6.36k
                       "total_get_wait_time={}, total_put_wait_time={}, is_shutdown={})",
132
6.36k
                       _name, get_queue_size(), _work_queue.get_capacity(), _active_threads,
133
6.36k
                       _threads.size(), _work_queue.total_get_wait_time(),
134
6.36k
                       _work_queue.total_put_wait_time(), is_shutdown());
135
6.36k
    }
136
137
protected:
138
861k
    virtual bool is_shutdown() const { return _shutdown; }
_ZNK5doris14WorkThreadPoolILb1EE11is_shutdownEv
Line
Count
Source
138
121k
    virtual bool is_shutdown() const { return _shutdown; }
_ZNK5doris14WorkThreadPoolILb0EE11is_shutdownEv
Line
Count
Source
138
740k
    virtual bool is_shutdown() const { return _shutdown; }
139
140
    // Collection of worker threads that process work from the queue.
141
    ThreadGroup _threads;
142
143
    // Guards _empty_cv
144
    std::mutex _lock;
145
146
    // Signalled when the queue becomes empty
147
    std::condition_variable _empty_cv;
148
149
private:
150
    // Driver method for each thread in the pool. Continues to read work from the queue
151
    // until the pool is shutdown.
152
4.94k
    void work_thread(int thread_id) {
153
4.94k
        Thread::set_self_name(_name);
154
4.94k
        LOG(INFO) << "WorkThreadPool started: " << get_info();
155
857k
        while (!is_shutdown()) {
156
852k
            Task task;
157
852k
            if (_work_queue.blocking_get(&task)) {
158
849k
                _active_threads++;
159
849k
                task.work_function();
160
849k
                _active_threads--;
161
849k
            }
162
852k
            if (_work_queue.get_size() == 0) {
163
705k
                _empty_cv.notify_all();
164
705k
            }
165
852k
        }
166
        LOG(INFO) << "WorkThreadPool shutdown: " << get_info();
167
4.94k
    }
_ZN5doris14WorkThreadPoolILb1EE11work_threadEi
Line
Count
Source
152
227
    void work_thread(int thread_id) {
153
227
        Thread::set_self_name(_name);
154
227
        LOG(INFO) << "WorkThreadPool started: " << get_info();
155
120k
        while (!is_shutdown()) {
156
120k
            Task task;
157
120k
            if (_work_queue.blocking_get(&task)) {
158
120k
                _active_threads++;
159
120k
                task.work_function();
160
120k
                _active_threads--;
161
120k
            }
162
120k
            if (_work_queue.get_size() == 0) {
163
270
                _empty_cv.notify_all();
164
270
            }
165
120k
        }
166
        LOG(INFO) << "WorkThreadPool shutdown: " << get_info();
167
227
    }
_ZN5doris14WorkThreadPoolILb0EE11work_threadEi
Line
Count
Source
152
4.71k
    void work_thread(int thread_id) {
153
4.71k
        Thread::set_self_name(_name);
154
4.71k
        LOG(INFO) << "WorkThreadPool started: " << get_info();
155
737k
        while (!is_shutdown()) {
156
732k
            Task task;
157
732k
            if (_work_queue.blocking_get(&task)) {
158
729k
                _active_threads++;
159
729k
                task.work_function();
160
729k
                _active_threads--;
161
729k
            }
162
732k
            if (_work_queue.get_size() == 0) {
163
704k
                _empty_cv.notify_all();
164
704k
            }
165
732k
        }
166
        LOG(INFO) << "WorkThreadPool shutdown: " << get_info();
167
4.71k
    }
168
169
    WorkQueue _work_queue;
170
171
    // Set to true when threads should stop doing work and terminate.
172
    std::atomic<bool> _shutdown;
173
    std::string _name;
174
    std::atomic<int> _active_threads;
175
};
176
177
using PriorityThreadPool = WorkThreadPool<true>;
178
using FifoThreadPool = WorkThreadPool<false>;
179
180
} // namespace doris