be/src/util/work_thread_pool.hpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <mutex> |
21 | | |
22 | | #include "util/blocking_priority_queue.hpp" |
23 | | #include "util/blocking_queue.hpp" |
24 | | #include "util/thread.h" |
25 | | #include "util/thread_group.h" |
26 | | |
27 | | namespace doris { |
28 | | |
29 | | // Simple thread pool which runs tasks in parallel. Tasks are placed on a blocking |
30 | | // queue by offer()/try_offer(); each task carries its own work function. |
31 | | template <bool Priority = false> |
32 | | class WorkThreadPool { |
33 | | public: |
34 | | // Signature of a work-processing function. It takes no arguments and returns |
35 | | // nothing; any state the work needs must be carried by the callable itself (for |
36 | | // example through a lambda capture or std::bind). |
37 | | using WorkFunction = std::function<void()>; |
38 | | |
39 | | struct Task { |
40 | | public: |
41 | | int priority; |
42 | | WorkFunction work_function; |
43 | 0 | bool operator<(const Task& o) const { return priority < o.priority; } |
44 | | |
45 | 202 | Task& operator++() { |
46 | 202 | priority += 2; |
47 | 202 | return *this; |
48 | 202 | } |
49 | | }; |
50 | | |
51 | | using WorkQueue = |
52 | | std::conditional_t<Priority, BlockingPriorityQueue<Task>, BlockingQueue<Task>>; |
53 | | |
54 | | // Creates a new thread pool and starts num_threads worker threads. |
55 | | // -- num_threads: how many threads are part of this pool |
56 | | // -- queue_size: the maximum size of the queue on which work items are offered. If the |
57 | | // queue exceeds this size, subsequent calls to offer() will block until there is |
58 | | // capacity available. |
59 | | WorkThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name) |
60 | 168 | : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) { |
61 | 5.89k | for (int i = 0; i < num_threads; ++i) { |
62 | 5.72k | _threads.create_thread( |
63 | 5.72k | std::bind<void>(std::mem_fn(&WorkThreadPool::work_thread), this, i)); |
64 | 5.72k | } |
65 | 168 | } _ZN5doris14WorkThreadPoolILb1EEC2EjjRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Line | Count | Source | 60 | 119 | : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) { | 61 | 356 | for (int i = 0; i < num_threads; ++i) { | 62 | 237 | _threads.create_thread( | 63 | 237 | std::bind<void>(std::mem_fn(&WorkThreadPool::work_thread), this, i)); | 64 | 237 | } | 65 | 119 | } |
_ZN5doris14WorkThreadPoolILb0EEC2EjjRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Line | Count | Source | 60 | 49 | : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) { | 61 | 5.53k | for (int i = 0; i < num_threads; ++i) { | 62 | 5.48k | _threads.create_thread( | 63 | 5.48k | std::bind<void>(std::mem_fn(&WorkThreadPool::work_thread), this, i)); | 64 | 5.48k | } | 65 | 49 | } |
|
66 | | |
67 | | // Destructor ensures that all threads are terminated before this object is freed |
68 | | // (otherwise they may continue to run and reference member variables) |
69 | 133 | virtual ~WorkThreadPool() { |
70 | 133 | shutdown(); |
71 | 133 | join(); |
72 | 133 | } _ZN5doris14WorkThreadPoolILb1EED2Ev Line | Count | Source | 69 | 96 | virtual ~WorkThreadPool() { | 70 | 96 | shutdown(); | 71 | 96 | join(); | 72 | 96 | } |
_ZN5doris14WorkThreadPoolILb0EED2Ev Line | Count | Source | 69 | 37 | virtual ~WorkThreadPool() { | 70 | 37 | shutdown(); | 71 | 37 | join(); | 72 | 37 | } |
|
73 | | |
74 | | // Blocking operation that puts a work item on the queue. If the queue is full, blocks |
75 | | // until there is capacity available. |
76 | | // |
77 | | // 'task' is copied into the work queue, but may be executed at any time in the |
78 | | // future. Therefore the caller needs to ensure that any data referenced by its work |
79 | | // function (e.g. captured pointers or references) remains valid until the task has |
80 | | // been processed, and it's up to the caller to provide their own signalling mechanism |
81 | | // to detect this (or to wait until after drain_and_shutdown() returns). |
82 | | // |
83 | | // Returns true if the work item was successfully added to the queue, false otherwise |
84 | | // (which typically means that the thread pool has already been shut down). |
85 | 3 | virtual bool offer(Task task) { return _work_queue.blocking_put(task); }_ZN5doris14WorkThreadPoolILb1EE5offerENS1_4TaskE Line | Count | Source | 85 | 3 | virtual bool offer(Task task) { return _work_queue.blocking_put(task); } |
Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb0EE5offerENS1_4TaskE |
86 | | |
87 | 104k | virtual bool offer(WorkFunction func) { |
88 | 104k | WorkThreadPool::Task task = {0, func}; |
89 | 104k | return _work_queue.blocking_put(task); |
90 | 104k | } _ZN5doris14WorkThreadPoolILb1EE5offerESt8functionIFvvEE Line | Count | Source | 87 | 104k | virtual bool offer(WorkFunction func) { | 88 | 104k | WorkThreadPool::Task task = {0, func}; | 89 | 104k | return _work_queue.blocking_put(task); | 90 | 104k | } |
Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb0EE5offerESt8functionIFvvEE |
91 | | |
92 | 685k | virtual bool try_offer(WorkFunction func) { |
93 | 685k | WorkThreadPool::Task task = {0, func}; |
94 | 685k | return _work_queue.try_put(task); |
95 | 685k | } Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb1EE9try_offerESt8functionIFvvEE _ZN5doris14WorkThreadPoolILb0EE9try_offerESt8functionIFvvEE Line | Count | Source | 92 | 685k | virtual bool try_offer(WorkFunction func) { | 93 | 685k | WorkThreadPool::Task task = {0, func}; | 94 | 685k | return _work_queue.try_put(task); | 95 | 685k | } |
|
96 | | |
97 | | // Shuts the thread pool down, causing the work queue to cease accepting offered work |
98 | | // and the worker threads to terminate once they have processed their current work item. |
99 | | // Returns once the shutdown flag has been set, does not wait for the threads to |
100 | | // terminate. |
101 | 222 | virtual void shutdown() { |
102 | 222 | _shutdown = true; |
103 | 222 | _work_queue.shutdown(); |
104 | 222 | } _ZN5doris14WorkThreadPoolILb1EE8shutdownEv Line | Count | Source | 101 | 185 | virtual void shutdown() { | 102 | 185 | _shutdown = true; | 103 | 185 | _work_queue.shutdown(); | 104 | 185 | } |
_ZN5doris14WorkThreadPoolILb0EE8shutdownEv Line | Count | Source | 101 | 37 | virtual void shutdown() { | 102 | 37 | _shutdown = true; | 103 | 37 | _work_queue.shutdown(); | 104 | 37 | } |
|
105 | | |
106 | | // Blocks until all threads are finished. shutdown does not need to have been called, |
107 | | // since it may be called on a separate thread. |
108 | 204 | virtual void join() { static_cast<void>(_threads.join_all()); }_ZN5doris14WorkThreadPoolILb1EE4joinEv Line | Count | Source | 108 | 167 | virtual void join() { static_cast<void>(_threads.join_all()); } |
_ZN5doris14WorkThreadPoolILb0EE4joinEv Line | Count | Source | 108 | 37 | virtual void join() { static_cast<void>(_threads.join_all()); } |
|
109 | | |
110 | 9.53k | virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }_ZNK5doris14WorkThreadPoolILb1EE14get_queue_sizeEv Line | Count | Source | 110 | 439 | virtual uint32_t get_queue_size() const { return _work_queue.get_size(); } |
_ZNK5doris14WorkThreadPoolILb0EE14get_queue_sizeEv Line | Count | Source | 110 | 9.09k | virtual uint32_t get_queue_size() const { return _work_queue.get_size(); } |
|
111 | 1.19k | virtual uint32_t get_active_threads() const { return _active_threads; }Unexecuted instantiation: _ZNK5doris14WorkThreadPoolILb1EE18get_active_threadsEv _ZNK5doris14WorkThreadPoolILb0EE18get_active_threadsEv Line | Count | Source | 111 | 1.19k | virtual uint32_t get_active_threads() const { return _active_threads; } |
|
112 | | |
113 | | // Blocks until the work queue is empty, and then calls shutdown() to stop the worker |
114 | | // threads and join() to wait until they are finished. |
115 | | // Any work offered during drain_and_shutdown() may or may not be processed. |
116 | 0 | virtual void drain_and_shutdown() { |
117 | 0 | { |
118 | 0 | std::unique_lock l(_lock); |
119 | 0 | while (_work_queue.get_size() != 0) { |
120 | 0 | _empty_cv.wait(l); |
121 | 0 | } |
122 | 0 | } |
123 | 0 | shutdown(); |
124 | 0 | join(); |
125 | 0 | } Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb1EE18drain_and_shutdownEv Unexecuted instantiation: _ZN5doris14WorkThreadPoolILb0EE18drain_and_shutdownEv |
126 | | |
127 | 8.34k | std::string get_info() const { |
128 | 8.34k | return (Priority ? "PriorityThreadPool" : "FifoThreadPool") + |
129 | 8.34k | fmt::format( |
130 | 8.34k | "(name={}, queue_size={}/{}, active_thread={}/{}, " |
131 | 8.34k | "total_get_wait_time={}, total_put_wait_time={}, is_shutdown={})", |
132 | 8.34k | _name, get_queue_size(), _work_queue.get_capacity(), _active_threads, |
133 | 8.34k | _threads.size(), _work_queue.total_get_wait_time(), |
134 | 8.34k | _work_queue.total_put_wait_time(), is_shutdown()); |
135 | 8.34k | } _ZNK5doris14WorkThreadPoolILb1EE8get_infoB5cxx11Ev Line | Count | Source | 127 | 439 | std::string get_info() const { | 128 | 439 | return (Priority ? "PriorityThreadPool" : "FifoThreadPool") + | 129 | 439 | fmt::format( | 130 | 439 | "(name={}, queue_size={}/{}, active_thread={}/{}, " | 131 | 439 | "total_get_wait_time={}, total_put_wait_time={}, is_shutdown={})", | 132 | 439 | _name, get_queue_size(), _work_queue.get_capacity(), _active_threads, | 133 | 439 | _threads.size(), _work_queue.total_get_wait_time(), | 134 | 439 | _work_queue.total_put_wait_time(), is_shutdown()); | 135 | 439 | } |
_ZNK5doris14WorkThreadPoolILb0EE8get_infoB5cxx11Ev Line | Count | Source | 127 | 7.90k | std::string get_info() const { | 128 | 7.90k | return (Priority ? "PriorityThreadPool" : "FifoThreadPool") + | 129 | 7.90k | fmt::format( | 130 | 7.90k | "(name={}, queue_size={}/{}, active_thread={}/{}, " | 131 | 7.90k | "total_get_wait_time={}, total_put_wait_time={}, is_shutdown={})", | 132 | 7.90k | _name, get_queue_size(), _work_queue.get_capacity(), _active_threads, | 133 | 7.90k | _threads.size(), _work_queue.total_get_wait_time(), | 134 | 7.90k | _work_queue.total_put_wait_time(), is_shutdown()); | 135 | 7.90k | } |
|
136 | | |
137 | | protected: |
138 | 805k | virtual bool is_shutdown() const { return _shutdown; }_ZNK5doris14WorkThreadPoolILb1EE11is_shutdownEv Line | Count | Source | 138 | 105k | virtual bool is_shutdown() const { return _shutdown; } |
_ZNK5doris14WorkThreadPoolILb0EE11is_shutdownEv Line | Count | Source | 138 | 700k | virtual bool is_shutdown() const { return _shutdown; } |
|
139 | | |
140 | | // Collection of worker threads that process work from the queue. |
141 | | ThreadGroup _threads; |
142 | | |
143 | | // Guards _empty_cv |
144 | | std::mutex _lock; |
145 | | |
146 | | // Signalled when the queue becomes empty |
147 | | std::condition_variable _empty_cv; |
148 | | |
149 | | private: |
150 | | // Driver method for each thread in the pool. Continues to read work from the queue |
151 | | // until the pool is shut down. |
152 | 5.72k | void work_thread(int thread_id) { |
153 | 5.72k | Thread::set_self_name(_name); |
154 | 5.72k | LOG(INFO) << "WorkThreadPool started: " << get_info(); |
155 | 799k | while (!is_shutdown()) { |
156 | 794k | Task task; |
157 | 794k | if (_work_queue.blocking_get(&task)) { |
158 | 791k | _active_threads++; |
159 | 791k | task.work_function(); |
160 | 791k | _active_threads--; |
161 | 791k | } |
162 | 794k | if (_work_queue.get_size() == 0) { |
163 | 661k | _empty_cv.notify_all(); |
164 | 661k | } |
165 | 794k | } |
166 | | LOG(INFO) << "WorkThreadPool shutdown: " << get_info(); |
167 | 5.72k | } _ZN5doris14WorkThreadPoolILb1EE11work_threadEi Line | Count | Source | 152 | 236 | void work_thread(int thread_id) { | 153 | 236 | Thread::set_self_name(_name); | 154 | 236 | LOG(INFO) << "WorkThreadPool started: " << get_info(); | 155 | 104k | while (!is_shutdown()) { | 156 | 104k | Task task; | 157 | 104k | if (_work_queue.blocking_get(&task)) { | 158 | 104k | _active_threads++; | 159 | 104k | task.work_function(); | 160 | 104k | _active_threads--; | 161 | 104k | } | 162 | 104k | if (_work_queue.get_size() == 0) { | 163 | 281 | _empty_cv.notify_all(); | 164 | 281 | } | 165 | 104k | } | 166 | | LOG(INFO) << "WorkThreadPool shutdown: " << get_info(); | 167 | 236 | } |
_ZN5doris14WorkThreadPoolILb0EE11work_threadEi Line | Count | Source | 152 | 5.48k | void work_thread(int thread_id) { | 153 | 5.48k | Thread::set_self_name(_name); | 154 | 5.48k | LOG(INFO) << "WorkThreadPool started: " << get_info(); | 155 | 694k | while (!is_shutdown()) { | 156 | 689k | Task task; | 157 | 689k | if (_work_queue.blocking_get(&task)) { | 158 | 686k | _active_threads++; | 159 | 686k | task.work_function(); | 160 | 686k | _active_threads--; | 161 | 686k | } | 162 | 689k | if (_work_queue.get_size() == 0) { | 163 | 660k | _empty_cv.notify_all(); | 164 | 660k | } | 165 | 689k | } | 166 | | LOG(INFO) << "WorkThreadPool shutdown: " << get_info(); | 167 | 5.48k | } |
|
168 | | |
169 | | WorkQueue _work_queue; |
170 | | |
171 | | // Set to true when threads should stop doing work and terminate. |
172 | | std::atomic<bool> _shutdown; |
173 | | std::string _name; |
174 | | std::atomic<int> _active_threads; |
175 | | }; |
176 | | |
177 | | using PriorityThreadPool = WorkThreadPool<true>; |
178 | | using FifoThreadPool = WorkThreadPool<false>; |
179 | | |
180 | | } // namespace doris |