be/src/util/work_thread_pool.hpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <mutex> |
21 | | |
22 | | #include "util/blocking_priority_queue.hpp" |
23 | | #include "util/blocking_queue.hpp" |
24 | | #include "util/thread.h" |
25 | | #include "util/thread_group.h" |
26 | | |
27 | | namespace doris { |
28 | | |
29 | | // Simple thread pool whose worker threads run, in parallel, the tasks placed on a |
30 | | // blocking queue by offer(). Each Task carries its own work function and a priority. |
31 | | template <bool Priority = false> |
32 | | class WorkThreadPool { |
33 | | public: |
34 | | // Signature of a work-processing function: a callable that takes no arguments and |
35 | | // returns nothing. Any state the work needs must be bound into the callable by the |
36 | | // caller before it is offered to the pool. |
37 | | using WorkFunction = std::function<void()>; |
38 | | |
39 | | struct Task { |
40 | | public: |
41 | | int priority; |
42 | | WorkFunction work_function; |
43 | 0 | bool operator<(const Task& o) const { return priority < o.priority; } |
44 | | |
45 | 0 | Task& operator++() { |
46 | 0 | priority += 2; |
47 | 0 | return *this; |
48 | 0 | } |
49 | | }; |
50 | | |
51 | | using WorkQueue = |
52 | | std::conditional_t<Priority, BlockingPriorityQueue<Task>, BlockingQueue<Task>>; |
53 | | |
54 | | // Creates a new thread pool and starts num_threads worker threads. |
55 | | // -- num_threads: how many threads are part of this pool |
56 | | // -- queue_size: the maximum size of the queue on which work items are offered. If the |
57 | | // queue is full, subsequent calls to offer() will block until there is capacity. |
58 | | // -- name: name given to every worker thread and reported by get_info(). |
59 | | WorkThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name) |
60 | 36 | : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) { |
61 | 250 | for (int i = 0; i < num_threads; ++i) { |
62 | 214 | _threads.create_thread( |
63 | 214 | std::bind<void>(std::mem_fn(&WorkThreadPool::work_thread), this, i)); |
64 | 214 | } |
65 | 36 | } _ZN5doris14WorkThreadPoolILb0EEC2EjjRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Line | Count | Source | 60 | 28 | : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) { | 61 | 140 | for (int i = 0; i < num_threads; ++i) { | 62 | 112 | _threads.create_thread( | 63 | 112 | std::bind<void>(std::mem_fn(&WorkThreadPool::work_thread), this, i)); | 64 | 112 | } | 65 | 28 | } |
_ZN5doris14WorkThreadPoolILb1EEC2EjjRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Line | Count | Source | 60 | 8 | : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) { | 61 | 110 | for (int i = 0; i < num_threads; ++i) { | 62 | 102 | _threads.create_thread( | 63 | 102 | std::bind<void>(std::mem_fn(&WorkThreadPool::work_thread), this, i)); | 64 | 102 | } | 65 | 8 | } |
|
66 | | |
67 | | // Destructor ensures that all threads are terminated before this object is freed |
68 | | // (otherwise they may continue to run and reference member variables) |
69 | 36 | virtual ~WorkThreadPool() { |
70 | 36 | shutdown(); |
71 | 36 | join(); |
72 | 36 | } _ZN5doris14WorkThreadPoolILb0EED2Ev Line | Count | Source | 69 | 28 | virtual ~WorkThreadPool() { | 70 | 28 | shutdown(); | 71 | 28 | join(); | 72 | 28 | } |
_ZN5doris14WorkThreadPoolILb1EED2Ev Line | Count | Source | 69 | 8 | virtual ~WorkThreadPool() { | 70 | 8 | shutdown(); | 71 | 8 | join(); | 72 | 8 | } |
|
73 | | |
74 | | // Blocking operation that puts a work item on the queue. If the queue is full, blocks |
75 | | // until there is capacity available. |
76 | | // |
77 | | // The task is copied into the work queue, but may be run at any time in the |
78 | | // future. Therefore the caller needs to ensure that any data referenced by the task |
79 | | // (e.g. pointers or references captured by its work function) remains valid until the |
80 | | // task has been processed, and it's up to the caller to provide their own signalling |
81 | | // mechanism to detect this (or to wait until after drain_and_shutdown() returns). |
82 | | // |
83 | | // Returns true if the work item was successfully added to the queue, false otherwise |
84 | | // (which typically means that the thread pool has already been shut down). |
85 | 3 | virtual bool offer(Task task) { return _work_queue.blocking_put(task); }Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb0EE5offerENS1_4TaskE _ZN5doris14WorkThreadPoolILb1EE5offerENS1_4TaskE Line | Count | Source | 85 | 3 | virtual bool offer(Task task) { return _work_queue.blocking_put(task); } |
|
86 | | |
87 | 6 | virtual bool offer(WorkFunction func) { |
88 | 6 | WorkThreadPool::Task task = {0, func}; |
89 | 6 | return _work_queue.blocking_put(task); |
90 | 6 | } Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb0EE5offerESt8functionIFvvEE _ZN5doris14WorkThreadPoolILb1EE5offerESt8functionIFvvEE Line | Count | Source | 87 | 6 | virtual bool offer(WorkFunction func) { | 88 | 6 | WorkThreadPool::Task task = {0, func}; | 89 | 6 | return _work_queue.blocking_put(task); | 90 | 6 | } |
|
91 | | |
92 | 30 | virtual bool try_offer(WorkFunction func) { |
93 | 30 | WorkThreadPool::Task task = {0, func}; |
94 | 30 | return _work_queue.try_put(task); |
95 | 30 | } _ZN5doris14WorkThreadPoolILb0EE9try_offerESt8functionIFvvEE Line | Count | Source | 92 | 30 | virtual bool try_offer(WorkFunction func) { | 93 | 30 | WorkThreadPool::Task task = {0, func}; | 94 | 30 | return _work_queue.try_put(task); | 95 | 30 | } |
Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb1EE9try_offerESt8functionIFvvEE |
96 | | |
97 | | // Shuts the thread pool down, causing the work queue to cease accepting offered work |
98 | | // and the worker threads to terminate once they have processed their current work item. |
99 | | // Returns once the shutdown flag has been set, does not wait for the threads to |
100 | | // terminate. |
101 | 37 | virtual void shutdown() { |
102 | 37 | _shutdown = true; |
103 | 37 | _work_queue.shutdown(); |
104 | 37 | } _ZN5doris14WorkThreadPoolILb0EE8shutdownEv Line | Count | Source | 101 | 28 | virtual void shutdown() { | 102 | 28 | _shutdown = true; | 103 | 28 | _work_queue.shutdown(); | 104 | 28 | } |
_ZN5doris14WorkThreadPoolILb1EE8shutdownEv Line | Count | Source | 101 | 9 | virtual void shutdown() { | 102 | 9 | _shutdown = true; | 103 | 9 | _work_queue.shutdown(); | 104 | 9 | } |
|
105 | | |
106 | | // Blocks until all threads are finished. shutdown does not need to have been called, |
107 | | // since it may be called on a separate thread. |
108 | 37 | virtual void join() { static_cast<void>(_threads.join_all()); }_ZN5doris14WorkThreadPoolILb0EE4joinEv Line | Count | Source | 108 | 28 | virtual void join() { static_cast<void>(_threads.join_all()); } |
_ZN5doris14WorkThreadPoolILb1EE4joinEv Line | Count | Source | 108 | 9 | virtual void join() { static_cast<void>(_threads.join_all()); } |
|
109 | | |
110 | 428 | virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }_ZNK5doris14WorkThreadPoolILb0EE14get_queue_sizeEv Line | Count | Source | 110 | 224 | virtual uint32_t get_queue_size() const { return _work_queue.get_size(); } |
_ZNK5doris14WorkThreadPoolILb1EE14get_queue_sizeEv Line | Count | Source | 110 | 204 | virtual uint32_t get_queue_size() const { return _work_queue.get_size(); } |
|
111 | 0 | virtual uint32_t get_active_threads() const { return _active_threads; }Unexecuted instantiation: _ZNK5doris14WorkThreadPoolILb0EE18get_active_threadsEv Unexecuted instantiation: _ZNK5doris14WorkThreadPoolILb1EE18get_active_threadsEv |
112 | | |
113 | | // Blocks until the work queue is empty, and then calls shutdown() to stop the worker |
114 | | // threads and join() to wait until they are finished. |
115 | | // Any work passed to offer() during drain_and_shutdown() may or may not be processed. |
116 | 0 | virtual void drain_and_shutdown() { |
117 | 0 | { |
118 | 0 | std::unique_lock l(_lock); |
119 | 0 | while (_work_queue.get_size() != 0) { |
120 | 0 | _empty_cv.wait(l); |
121 | 0 | } |
122 | 0 | } |
123 | 0 | shutdown(); |
124 | 0 | join(); |
125 | 0 | } Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb0EE18drain_and_shutdownEv Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb1EE18drain_and_shutdownEv |
126 | | |
127 | 428 | std::string get_info() const { |
128 | 428 | return (Priority ? "PriorityThreadPool" : "FifoThreadPool") + |
129 | 428 | fmt::format( |
130 | 428 | "(name={}, queue_size={}/{}, active_thread={}/{}, " |
131 | 428 | "total_get_wait_time={}, total_put_wait_time={}, is_shutdown={})", |
132 | 428 | _name, get_queue_size(), _work_queue.get_capacity(), _active_threads, |
133 | 428 | _threads.size(), _work_queue.total_get_wait_time(), |
134 | 428 | _work_queue.total_put_wait_time(), is_shutdown()); |
135 | 428 | } _ZNK5doris14WorkThreadPoolILb0EE8get_infoB5cxx11Ev Line | Count | Source | 127 | 224 | std::string get_info() const { | 128 | 224 | return (Priority ? "PriorityThreadPool" : "FifoThreadPool") + | 129 | 224 | fmt::format( | 130 | 224 | "(name={}, queue_size={}/{}, active_thread={}/{}, " | 131 | 224 | "total_get_wait_time={}, total_put_wait_time={}, is_shutdown={})", | 132 | 224 | _name, get_queue_size(), _work_queue.get_capacity(), _active_threads, | 133 | 224 | _threads.size(), _work_queue.total_get_wait_time(), | 134 | 224 | _work_queue.total_put_wait_time(), is_shutdown()); | 135 | 224 | } |
_ZNK5doris14WorkThreadPoolILb1EE8get_infoB5cxx11Ev Line | Count | Source | 127 | 204 | std::string get_info() const { | 128 | 204 | return (Priority ? "PriorityThreadPool" : "FifoThreadPool") + | 129 | 204 | fmt::format( | 130 | 204 | "(name={}, queue_size={}/{}, active_thread={}/{}, " | 131 | 204 | "total_get_wait_time={}, total_put_wait_time={}, is_shutdown={})", | 132 | 204 | _name, get_queue_size(), _work_queue.get_capacity(), _active_threads, | 133 | 204 | _threads.size(), _work_queue.total_get_wait_time(), | 134 | 204 | _work_queue.total_put_wait_time(), is_shutdown()); | 135 | 204 | } |
|
136 | | |
137 | | protected: |
138 | 893 | virtual bool is_shutdown() const { return _shutdown; }_ZNK5doris14WorkThreadPoolILb0EE11is_shutdownEv Line | Count | Source | 138 | 477 | virtual bool is_shutdown() const { return _shutdown; } |
_ZNK5doris14WorkThreadPoolILb1EE11is_shutdownEv Line | Count | Source | 138 | 416 | virtual bool is_shutdown() const { return _shutdown; } |
|
139 | | |
140 | | // Collection of worker threads that process work from the queue. |
141 | | ThreadGroup _threads; |
142 | | |
143 | | // Guards _empty_cv |
144 | | std::mutex _lock; |
145 | | |
146 | | // Signalled when the queue becomes empty |
147 | | std::condition_variable _empty_cv; |
148 | | |
149 | | private: |
150 | | // Driver method for each thread in the pool. Keeps taking tasks from the queue and |
151 | | // running them until the pool is shut down. |
152 | 214 | void work_thread(int thread_id) { |
153 | 214 | Thread::set_self_name(_name); |
154 | 214 | LOG(INFO) << "WorkThreadPool started: " << get_info(); |
155 | 467 | while (!is_shutdown()) { |
156 | 253 | Task task; |
157 | 253 | if (_work_queue.blocking_get(&task)) { |
158 | 39 | _active_threads++; |
159 | 39 | task.work_function(); |
160 | 39 | _active_threads--; |
161 | 39 | } |
162 | 253 | if (_work_queue.get_size() == 0) { |
163 | 250 | _empty_cv.notify_all(); |
164 | 250 | } |
165 | 253 | } |
166 | | LOG(INFO) << "WorkThreadPool shutdown: " << get_info(); |
167 | 214 | } _ZN5doris14WorkThreadPoolILb0EE11work_threadEi Line | Count | Source | 152 | 112 | void work_thread(int thread_id) { | 153 | 112 | Thread::set_self_name(_name); | 154 | 112 | LOG(INFO) << "WorkThreadPool started: " << get_info(); | 155 | 254 | while (!is_shutdown()) { | 156 | 142 | Task task; | 157 | 142 | if (_work_queue.blocking_get(&task)) { | 158 | 30 | _active_threads++; | 159 | 30 | task.work_function(); | 160 | 30 | _active_threads--; | 161 | 30 | } | 162 | 142 | if (_work_queue.get_size() == 0) { | 163 | 142 | _empty_cv.notify_all(); | 164 | 142 | } | 165 | 142 | } | 166 | | LOG(INFO) << "WorkThreadPool shutdown: " << get_info(); | 167 | 112 | } |
_ZN5doris14WorkThreadPoolILb1EE11work_threadEi Line | Count | Source | 152 | 102 | void work_thread(int thread_id) { | 153 | 102 | Thread::set_self_name(_name); | 154 | 102 | LOG(INFO) << "WorkThreadPool started: " << get_info(); | 155 | 213 | while (!is_shutdown()) { | 156 | 111 | Task task; | 157 | 111 | if (_work_queue.blocking_get(&task)) { | 158 | 9 | _active_threads++; | 159 | 9 | task.work_function(); | 160 | 9 | _active_threads--; | 161 | 9 | } | 162 | 111 | if (_work_queue.get_size() == 0) { | 163 | 108 | _empty_cv.notify_all(); | 164 | 108 | } | 165 | 111 | } | 166 | | LOG(INFO) << "WorkThreadPool shutdown: " << get_info(); | 167 | 102 | } |
|
168 | | |
169 | | WorkQueue _work_queue; |
170 | | |
171 | | // Set to true when threads should stop doing work and terminate. |
172 | | std::atomic<bool> _shutdown; |
173 | | std::string _name; |
174 | | std::atomic<int> _active_threads; |
175 | | }; |
176 | | |
177 | | using PriorityThreadPool = WorkThreadPool<true>; |
178 | | using FifoThreadPool = WorkThreadPool<false>; |
179 | | |
180 | | } // namespace doris |