Coverage Report

Created: 2026-03-12 16:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/work_thread_pool.hpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <mutex>
21
22
#include "util/blocking_priority_queue.hpp"
23
#include "util/blocking_queue.hpp"
24
#include "util/thread.h"
25
#include "util/thread_group.h"
26
27
namespace doris {
28
29
// Simple threadpool which processes items (of type T) in parallel which were placed on a
30
// blocking queue by Offer(). Each item is processed by a single user-supplied method.
31
template <bool Priority = false>
32
class WorkThreadPool {
33
public:
34
    // Signature of a work-processing function. Takes the integer id of the thread which is
35
    // calling it (ids run from 0 to num_threads - 1) and a reference to the item to
36
    // process.
37
    using WorkFunction = std::function<void()>;
38
39
    struct Task {
40
    public:
41
        int priority;
42
        WorkFunction work_function;
43
0
        bool operator<(const Task& o) const { return priority < o.priority; }
44
45
202
        Task& operator++() {
46
202
            priority += 2;
47
202
            return *this;
48
202
        }
49
    };
50
51
    using WorkQueue =
52
            std::conditional_t<Priority, BlockingPriorityQueue<Task>, BlockingQueue<Task>>;
53
54
    // Creates a new thread pool and start num_threads threads.
55
    //  -- num_threads: how many threads are part of this pool
56
    //  -- queue_size: the maximum size of the queue on which work items are offered. If the
57
    //     queue exceeds this size, subsequent calls to Offer will block until there is
58
    //     capacity available.
59
    WorkThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name)
60
168
            : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) {
61
5.89k
        for (int i = 0; i < num_threads; ++i) {
62
5.72k
            _threads.create_thread(
63
5.72k
                    std::bind<void>(std::mem_fn(&WorkThreadPool::work_thread), this, i));
64
5.72k
        }
65
168
    }
_ZN5doris14WorkThreadPoolILb1EEC2EjjRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Line
Count
Source
60
119
            : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) {
61
356
        for (int i = 0; i < num_threads; ++i) {
62
237
            _threads.create_thread(
63
237
                    std::bind<void>(std::mem_fn(&WorkThreadPool::work_thread), this, i));
64
237
        }
65
119
    }
_ZN5doris14WorkThreadPoolILb0EEC2EjjRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Line
Count
Source
60
49
            : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) {
61
5.53k
        for (int i = 0; i < num_threads; ++i) {
62
5.48k
            _threads.create_thread(
63
5.48k
                    std::bind<void>(std::mem_fn(&WorkThreadPool::work_thread), this, i));
64
5.48k
        }
65
49
    }
66
67
    // Destructor ensures that all threads are terminated before this object is freed
68
    // (otherwise they may continue to run and reference member variables)
69
133
    virtual ~WorkThreadPool() {
70
133
        shutdown();
71
133
        join();
72
133
    }
_ZN5doris14WorkThreadPoolILb1EED2Ev
Line
Count
Source
69
96
    virtual ~WorkThreadPool() {
70
96
        shutdown();
71
96
        join();
72
96
    }
_ZN5doris14WorkThreadPoolILb0EED2Ev
Line
Count
Source
69
37
    virtual ~WorkThreadPool() {
70
37
        shutdown();
71
37
        join();
72
37
    }
73
74
    // Blocking operation that puts a work item on the queue. If the queue is full, blocks
75
    // until there is capacity available.
76
    //
77
    // 'work' is copied into the work queue, but may be referenced at any time in the
78
    // future. Therefore the caller needs to ensure that any data referenced by work (if T
79
    // is, e.g., a pointer type) remains valid until work has been processed, and it's up to
80
    // the caller to provide their own signalling mechanism to detect this (or to wait until
81
    // after DrainAndshutdown returns).
82
    //
83
    // Returns true if the work item was successfully added to the queue, false otherwise
84
    // (which typically means that the thread pool has already been shut down).
85
3
    virtual bool offer(Task task) { return _work_queue.blocking_put(task); }
_ZN5doris14WorkThreadPoolILb1EE5offerENS1_4TaskE
Line
Count
Source
85
3
    virtual bool offer(Task task) { return _work_queue.blocking_put(task); }
Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb0EE5offerENS1_4TaskE
86
87
104k
    virtual bool offer(WorkFunction func) {
88
104k
        WorkThreadPool::Task task = {0, func};
89
104k
        return _work_queue.blocking_put(task);
90
104k
    }
_ZN5doris14WorkThreadPoolILb1EE5offerESt8functionIFvvEE
Line
Count
Source
87
104k
    virtual bool offer(WorkFunction func) {
88
104k
        WorkThreadPool::Task task = {0, func};
89
104k
        return _work_queue.blocking_put(task);
90
104k
    }
Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb0EE5offerESt8functionIFvvEE
91
92
685k
    virtual bool try_offer(WorkFunction func) {
93
685k
        WorkThreadPool::Task task = {0, func};
94
685k
        return _work_queue.try_put(task);
95
685k
    }
Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb1EE9try_offerESt8functionIFvvEE
_ZN5doris14WorkThreadPoolILb0EE9try_offerESt8functionIFvvEE
Line
Count
Source
92
685k
    virtual bool try_offer(WorkFunction func) {
93
685k
        WorkThreadPool::Task task = {0, func};
94
685k
        return _work_queue.try_put(task);
95
685k
    }
96
97
    // Shuts the thread pool down, causing the work queue to cease accepting offered work
98
    // and the worker threads to terminate once they have processed their current work item.
99
    // Returns once the shutdown flag has been set, does not wait for the threads to
100
    // terminate.
101
222
    virtual void shutdown() {
102
222
        _shutdown = true;
103
222
        _work_queue.shutdown();
104
222
    }
_ZN5doris14WorkThreadPoolILb1EE8shutdownEv
Line
Count
Source
101
185
    virtual void shutdown() {
102
185
        _shutdown = true;
103
185
        _work_queue.shutdown();
104
185
    }
_ZN5doris14WorkThreadPoolILb0EE8shutdownEv
Line
Count
Source
101
37
    virtual void shutdown() {
102
37
        _shutdown = true;
103
37
        _work_queue.shutdown();
104
37
    }
105
106
    // Blocks until all threads are finished. shutdown does not need to have been called,
107
    // since it may be called on a separate thread.
108
204
    virtual void join() { static_cast<void>(_threads.join_all()); }
_ZN5doris14WorkThreadPoolILb1EE4joinEv
Line
Count
Source
108
167
    virtual void join() { static_cast<void>(_threads.join_all()); }
_ZN5doris14WorkThreadPoolILb0EE4joinEv
Line
Count
Source
108
37
    virtual void join() { static_cast<void>(_threads.join_all()); }
109
110
9.53k
    virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
_ZNK5doris14WorkThreadPoolILb1EE14get_queue_sizeEv
Line
Count
Source
110
439
    virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
_ZNK5doris14WorkThreadPoolILb0EE14get_queue_sizeEv
Line
Count
Source
110
9.09k
    virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
111
1.19k
    virtual uint32_t get_active_threads() const { return _active_threads; }
Unexecuted instantiation: _ZNK5doris14WorkThreadPoolILb1EE18get_active_threadsEv
_ZNK5doris14WorkThreadPoolILb0EE18get_active_threadsEv
Line
Count
Source
111
1.19k
    virtual uint32_t get_active_threads() const { return _active_threads; }
112
113
    // Blocks until the work queue is empty, and then calls shutdown to stop the worker
114
    // threads and Join to wait until they are finished.
115
    // Any work Offer()'ed during DrainAndshutdown may or may not be processed.
116
0
    virtual void drain_and_shutdown() {
117
0
        {
118
0
            std::unique_lock l(_lock);
119
0
            while (_work_queue.get_size() != 0) {
120
0
                _empty_cv.wait(l);
121
0
            }
122
0
        }
123
0
        shutdown();
124
0
        join();
125
0
    }
Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb1EE18drain_and_shutdownEv
Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb0EE18drain_and_shutdownEv
126
127
8.34k
    std::string get_info() const {
128
8.34k
        return (Priority ? "PriorityThreadPool" : "FifoThreadPool") +
129
8.34k
               fmt::format(
130
8.34k
                       "(name={}, queue_size={}/{}, active_thread={}/{}, "
131
8.34k
                       "total_get_wait_time={}, total_put_wait_time={}, is_shutdown={})",
132
8.34k
                       _name, get_queue_size(), _work_queue.get_capacity(), _active_threads,
133
8.34k
                       _threads.size(), _work_queue.total_get_wait_time(),
134
8.34k
                       _work_queue.total_put_wait_time(), is_shutdown());
135
8.34k
    }
_ZNK5doris14WorkThreadPoolILb1EE8get_infoB5cxx11Ev
Line
Count
Source
127
439
    std::string get_info() const {
128
439
        return (Priority ? "PriorityThreadPool" : "FifoThreadPool") +
129
439
               fmt::format(
130
439
                       "(name={}, queue_size={}/{}, active_thread={}/{}, "
131
439
                       "total_get_wait_time={}, total_put_wait_time={}, is_shutdown={})",
132
439
                       _name, get_queue_size(), _work_queue.get_capacity(), _active_threads,
133
439
                       _threads.size(), _work_queue.total_get_wait_time(),
134
439
                       _work_queue.total_put_wait_time(), is_shutdown());
135
439
    }
_ZNK5doris14WorkThreadPoolILb0EE8get_infoB5cxx11Ev
Line
Count
Source
127
7.90k
    std::string get_info() const {
128
7.90k
        return (Priority ? "PriorityThreadPool" : "FifoThreadPool") +
129
7.90k
               fmt::format(
130
7.90k
                       "(name={}, queue_size={}/{}, active_thread={}/{}, "
131
7.90k
                       "total_get_wait_time={}, total_put_wait_time={}, is_shutdown={})",
132
7.90k
                       _name, get_queue_size(), _work_queue.get_capacity(), _active_threads,
133
7.90k
                       _threads.size(), _work_queue.total_get_wait_time(),
134
7.90k
                       _work_queue.total_put_wait_time(), is_shutdown());
135
7.90k
    }
136
137
protected:
138
805k
    virtual bool is_shutdown() const { return _shutdown; }
_ZNK5doris14WorkThreadPoolILb1EE11is_shutdownEv
Line
Count
Source
138
105k
    virtual bool is_shutdown() const { return _shutdown; }
_ZNK5doris14WorkThreadPoolILb0EE11is_shutdownEv
Line
Count
Source
138
700k
    virtual bool is_shutdown() const { return _shutdown; }
139
140
    // Collection of worker threads that process work from the queue.
141
    ThreadGroup _threads;
142
143
    // Guards _empty_cv
144
    std::mutex _lock;
145
146
    // Signalled when the queue becomes empty
147
    std::condition_variable _empty_cv;
148
149
private:
150
    // Driver method for each thread in the pool. Continues to read work from the queue
151
    // until the pool is shutdown.
152
5.72k
    void work_thread(int thread_id) {
153
5.72k
        Thread::set_self_name(_name);
154
5.72k
        LOG(INFO) << "WorkThreadPool started: " << get_info();
155
799k
        while (!is_shutdown()) {
156
794k
            Task task;
157
794k
            if (_work_queue.blocking_get(&task)) {
158
791k
                _active_threads++;
159
791k
                task.work_function();
160
791k
                _active_threads--;
161
791k
            }
162
794k
            if (_work_queue.get_size() == 0) {
163
661k
                _empty_cv.notify_all();
164
661k
            }
165
794k
        }
166
        LOG(INFO) << "WorkThreadPool shutdown: " << get_info();
167
5.72k
    }
_ZN5doris14WorkThreadPoolILb1EE11work_threadEi
Line
Count
Source
152
236
    void work_thread(int thread_id) {
153
236
        Thread::set_self_name(_name);
154
236
        LOG(INFO) << "WorkThreadPool started: " << get_info();
155
104k
        while (!is_shutdown()) {
156
104k
            Task task;
157
104k
            if (_work_queue.blocking_get(&task)) {
158
104k
                _active_threads++;
159
104k
                task.work_function();
160
104k
                _active_threads--;
161
104k
            }
162
104k
            if (_work_queue.get_size() == 0) {
163
281
                _empty_cv.notify_all();
164
281
            }
165
104k
        }
166
        LOG(INFO) << "WorkThreadPool shutdown: " << get_info();
167
236
    }
_ZN5doris14WorkThreadPoolILb0EE11work_threadEi
Line
Count
Source
152
5.48k
    void work_thread(int thread_id) {
153
5.48k
        Thread::set_self_name(_name);
154
5.48k
        LOG(INFO) << "WorkThreadPool started: " << get_info();
155
694k
        while (!is_shutdown()) {
156
689k
            Task task;
157
689k
            if (_work_queue.blocking_get(&task)) {
158
686k
                _active_threads++;
159
686k
                task.work_function();
160
686k
                _active_threads--;
161
686k
            }
162
689k
            if (_work_queue.get_size() == 0) {
163
660k
                _empty_cv.notify_all();
164
660k
            }
165
689k
        }
166
        LOG(INFO) << "WorkThreadPool shutdown: " << get_info();
167
5.48k
    }
168
169
    WorkQueue _work_queue;
170
171
    // Set to true when threads should stop doing work and terminate.
172
    std::atomic<bool> _shutdown;
173
    std::string _name;
174
    std::atomic<int> _active_threads;
175
};
176
177
using PriorityThreadPool = WorkThreadPool<true>;
178
using FifoThreadPool = WorkThreadPool<false>;
179
180
} // namespace doris