/root/doris/be/src/util/threadpool.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/threadpool.h |
19 | | // and modified by Doris |
20 | | |
21 | | #pragma once |
22 | | |
23 | | #include <gen_cpp/Types_types.h> |
24 | | |
25 | | #include <boost/intrusive/detail/algo_type.hpp> |
26 | | #include <boost/intrusive/list.hpp> |
27 | | #include <boost/intrusive/list_hook.hpp> |
28 | | #include <climits> |
29 | | #include <cstddef> |
30 | | // IWYU pragma: no_include <bits/chrono.h> |
31 | | #include <chrono> // IWYU pragma: keep |
32 | | #include <condition_variable> |
33 | | #include <deque> |
34 | | #include <functional> |
35 | | #include <iosfwd> |
36 | | #include <memory> |
37 | | #include <mutex> |
38 | | #include <string> |
39 | | #include <unordered_set> |
40 | | |
41 | | #include "agent/cgroup_cpu_ctl.h" |
42 | | #include "common/status.h" |
43 | | #include "util/interval_histogram.h" |
44 | | #include "util/metrics.h" |
45 | | #include "util/uid_util.h" |
46 | | #include "util/work_thread_pool.hpp" |
47 | | |
48 | | namespace doris { |
49 | | |
50 | | class Thread; |
51 | | class ThreadPool; |
52 | | class ThreadPoolToken; |
53 | | |
54 | | class Runnable { |
55 | | public: |
56 | | virtual void run() = 0; |
57 | 10.0k | virtual ~Runnable() = default; |
58 | | }; |
59 | | |
60 | | // ThreadPool takes a lot of arguments. We provide sane defaults with a builder. |
61 | | // |
62 | | // name: Used for debugging output and default names of the worker threads. |
63 | | // Since thread names are limited to 16 characters on Linux, it's good to |
64 | | // choose a short name here. |
65 | | // Required. |
66 | | // |
67 | | // trace_metric_prefix: used to prefix the names of TraceMetric counters. |
68 | | // When a task on a thread pool has an associated trace, the thread pool |
69 | | // implementation will increment TraceMetric counters to indicate the |
70 | | // amount of time spent waiting in the queue as well as the amount of wall |
71 | | // and CPU time spent executing. By default, these counters are prefixed |
72 | | // with the name of the thread pool. For example, if the pool is named |
73 | | // 'apply', then counters such as 'apply.queue_time_us' will be |
74 | | // incremented. |
75 | | // |
76 | | // The TraceMetrics implementation relies on the number of distinct counter |
77 | | // names being small. Thus, if the thread pool name itself is dynamically |
78 | | // generated, the default behavior described above would result in an |
79 | | // unbounded number of distinct counter names. The 'trace_metric_prefix' |
80 | | // setting can be used to override the prefix used in generating the trace |
81 | | // metric names. |
82 | | // |
83 | | // For example, the Raft thread pools are named "<tablet id>-raft" which |
84 | | // has unbounded cardinality (a server may have thousands of different |
85 | | // tablet IDs over its lifetime). In that case, setting the prefix to |
86 | | // "raft" will avoid any issues. |
87 | | // |
88 | | // min_threads: Minimum number of threads we'll have at any time. |
89 | | // Default: 0. |
90 | | // |
91 | | // max_threads: Maximum number of threads we'll have at any time. |
92 | | // Default: Number of CPUs detected on the system. |
93 | | // |
94 | | // max_queue_size: Maximum number of items to enqueue before returning a |
95 |  | //    Status::ServiceUnavailable error from submit(). |
96 | | // Default: INT_MAX. |
97 | | // |
98 | | // idle_timeout: How long we'll keep around an idle thread before timing it out. |
99 | | // We always keep at least min_threads. |
100 | | // Default: 500 milliseconds. |
101 | | // |
102 | | // metrics: Histograms, counters, etc. to update on various threadpool events. |
103 | | // Default: not set. |
104 | | // |
105 | | class ThreadPoolBuilder { |
106 | | public: |
107 | | explicit ThreadPoolBuilder(std::string name, std::string workload_group = ""); |
108 | | |
109 | | // Note: We violate the style guide by returning mutable references here |
110 | | // in order to provide traditional Builder pattern conveniences. |
111 | | ThreadPoolBuilder& set_min_threads(int min_threads); |
112 | | ThreadPoolBuilder& set_max_threads(int max_threads); |
113 | | ThreadPoolBuilder& set_max_queue_size(int max_queue_size); |
114 | | ThreadPoolBuilder& set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl); |
115 | | template <class Rep, class Period> |
116 | 7 | ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep, Period>& idle_timeout) { |
117 | 7 | _idle_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout); |
118 | 7 | return *this; |
119 | 7 | } |
120 | | // Instantiate a new ThreadPool with the existing builder arguments. |
121 | | template <typename ThreadPoolType> |
122 | 246 | Status build(std::unique_ptr<ThreadPoolType>* pool) const { |
123 | 246 | if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) { |
124 | 5 | pool->reset(new ThreadPoolType(*this)); |
125 | 241 | RETURN_IF_ERROR((*pool)->init()); |
126 | 241 | } else if constexpr (std::is_same_v<ThreadPoolType, PriorityThreadPool>) { |
127 | 5 | pool->reset(new ThreadPoolType(_max_threads, _max_queue_size, _name)); |
128 | 5 | } else { |
129 | 241 | static_assert(always_false_v<ThreadPoolType>, "Unsupported ThreadPoolType"); |
130 | 241 | } |
131 | 241 | return Status::OK(); |
132 | 246 | } |
133 | | |
134 | | ThreadPoolBuilder(const ThreadPoolBuilder&) = delete; |
135 | | void operator=(const ThreadPoolBuilder&) = delete; |
136 | | |
137 | | private: |
138 | | friend class ThreadPool; |
139 | | const std::string _name; |
140 | | const std::string _workload_group; |
141 | | int _min_threads; |
142 | | int _max_threads; |
143 | | int _max_queue_size; |
144 | | CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; |
145 | | std::chrono::milliseconds _idle_timeout; |
146 | | |
147 | | template <typename T> |
148 | | static constexpr bool always_false_v = false; |
149 | | }; |
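
A minimal builder-usage sketch (editor's addition, not part of the covered source). It follows the build<T>() dispatch shown above; the function name make_example_pools and the chosen sizes are illustrative, and PriorityThreadPool is assumed to be the type provided by util/work_thread_pool.hpp, which this header already includes.

    #include <chrono>
    #include <memory>

    #include "util/threadpool.h"          // ThreadPoolBuilder, ThreadPool
    #include "util/work_thread_pool.hpp"  // PriorityThreadPool (assumed)

    doris::Status make_example_pools() {
        // ThreadPool path: build() allocates the pool and then calls init().
        std::unique_ptr<doris::ThreadPool> pool;
        RETURN_IF_ERROR(doris::ThreadPoolBuilder("example")
                                .set_min_threads(0)
                                .set_max_threads(4)
                                .set_max_queue_size(1024)
                                .set_idle_timeout(std::chrono::milliseconds(500))
                                .build(&pool));

        // PriorityThreadPool path: per build() above, only max_threads,
        // max_queue_size, and the pool name are consumed.
        std::unique_ptr<doris::PriorityThreadPool> prio_pool;
        RETURN_IF_ERROR(doris::ThreadPoolBuilder("example_prio")
                                .set_max_threads(4)
                                .set_max_queue_size(1024)
                                .build(&prio_pool));
        return doris::Status::OK();
    }
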
150 | | |
151 | | // Thread pool with a variable number of threads. |
152 | | // |
153 | | // Tasks submitted directly to the thread pool enter a FIFO queue and are |
154 | | // dispatched to a worker thread when one becomes free. Tasks may also be |
155 | | // submitted via ThreadPoolTokens. The token Wait() and Shutdown() functions |
156 | | // can then be used to block on logical groups of tasks. |
157 | | // |
158 | | // A token operates in one of two ExecutionModes, determined at token |
159 | | // construction time: |
160 | | // 1. SERIAL: submitted tasks are run one at a time. |
161 | | // 2. CONCURRENT: submitted tasks may be run in parallel. This isn't unlike |
162 | | // tasks submitted without a token, but the logical grouping that tokens |
163 | | // impart can be useful when a pool is shared by many contexts (e.g. to |
164 | | // safely shut down one context, to derive context-specific metrics, etc.). |
165 | | // |
166 | | // Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are |
167 | | // processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are |
168 | | // processed in a round-robin fashion, one task at a time. This prevents them |
169 | | // from starving one another. However, tokenless (and CONCURRENT token-based) |
170 | | // tasks can starve SERIAL token-based tasks. |
171 | | // |
172 | | // Usage Example: |
173 | | // static void Func(int n) { ... } |
174 | | // class Task : public Runnable { ... } |
175 | | // |
176 | | // std::unique_ptr<ThreadPool> thread_pool; |
177 | | // CHECK_OK( |
178 | | // ThreadPoolBuilder("my_pool") |
179 | | // .set_min_threads(0) |
180 | | // .set_max_threads(5) |
181 | | // .set_max_queue_size(10) |
183 |  | //            .set_idle_timeout(2000ms) |
184 |  | //            .build(&thread_pool)); |
185 |  | //    thread_pool->submit(std::shared_ptr<Runnable>(new Task())); |
186 |  | //    thread_pool->submit_func(std::bind(&Func, 10)); |
186 | | class ThreadPool { |
187 | | public: |
188 | | ~ThreadPool(); |
189 | | |
190 |  | // Waits for the running tasks to complete and then shuts down the threads. |
191 |  | // All other pending tasks in the queue will be removed. |
192 |  | // NOTE: The user may implement external abort logic for the runnables; it |
193 |  | // must be invoked before shutdown() if the system needs to know about the |
194 |  | // non-execution of these tasks, or if a runnable requires an explicit |
195 |  | // "abort" notification to exit its run loop. |
196 | | void shutdown(); |
197 | | |
198 | | // Submits a Runnable class. |
199 | | Status submit(std::shared_ptr<Runnable> r); |
200 | | |
201 | | // Submits a function bound using std::bind(&FuncName, args...). |
202 | | Status submit_func(std::function<void()> f); |
203 | | |
204 | | // Waits until all the tasks are completed. |
205 | | void wait(); |
206 | | |
207 | | // Waits for the pool to reach the idle state, or until 'delta' time elapses. |
208 | | // Returns true if the pool reached the idle state, false otherwise. |
209 | | template <class Rep, class Period> |
210 | | bool wait_for(const std::chrono::duration<Rep, Period>& delta) { |
211 | | std::unique_lock<std::mutex> l(_lock); |
212 | | check_not_pool_thread_unlocked(); |
213 | | return _idle_cond.wait_for( |
214 | | l, delta, [&]() { return _total_queued_tasks <= 0 && _active_threads <= 0; }); |
215 | | } |
216 | | Status set_min_threads(int min_threads); |
217 | | Status set_max_threads(int max_threads); |
218 | | |
219 | | // Allocates a new token for use in token-based task submission. All tokens |
220 | | // must be destroyed before their ThreadPool is destroyed. |
221 | | // |
222 | | // There is no limit on the number of tokens that may be allocated. |
223 | | enum class ExecutionMode { |
224 | | // Tasks submitted via this token will be executed serially. |
225 | | SERIAL, |
226 | | |
227 | | // Tasks submitted via this token may be executed concurrently. |
228 | | CONCURRENT |
229 | | }; |
230 | | std::unique_ptr<ThreadPoolToken> new_token(ExecutionMode mode, int max_concurrency = INT_MAX); |
231 | | |
232 | | // Return the number of threads currently running (or in the process of starting up) |
233 | | // for this thread pool. |
234 | 24 | int num_threads() const { |
235 | 24 | std::lock_guard<std::mutex> l(_lock); |
236 | 24 | return _num_threads + _num_threads_pending_start; |
237 | 24 | } |
238 | | |
239 | 13 | int max_threads() const { |
240 | 13 | std::lock_guard<std::mutex> l(_lock); |
241 | 13 | return _max_threads; |
242 | 13 | } |
243 | | |
244 | 4 | int min_threads() const { |
245 | 4 | std::lock_guard<std::mutex> l(_lock); |
246 | 4 | return _min_threads; |
247 | 4 | } |
248 | | |
249 | 0 | int num_threads_pending_start() const { |
250 | 0 | std::lock_guard<std::mutex> l(_lock); |
251 | 0 | return _num_threads_pending_start; |
252 | 0 | } |
253 | | |
254 | 0 | int num_active_threads() const { |
255 | 0 | std::lock_guard<std::mutex> l(_lock); |
256 | 0 | return _active_threads; |
257 | 0 | } |
258 | | |
259 | 0 | int get_queue_size() const { |
260 | 0 | std::lock_guard<std::mutex> l(_lock); |
261 | 0 | return _total_queued_tasks; |
262 | 0 | } |
263 | | |
264 | 0 | int get_max_queue_size() const { |
265 | 0 | std::lock_guard<std::mutex> l(_lock); |
266 | 0 | return _max_queue_size; |
267 | 0 | } |
268 | | |
269 | 0 | std::vector<int> debug_info() const { |
270 | 0 | std::lock_guard<std::mutex> l(_lock); |
271 | 0 | std::vector<int> arr = {_num_threads, static_cast<int>(_threads.size()), _min_threads, |
272 | 0 | _max_threads}; |
273 | 0 | return arr; |
274 | 0 | } |
275 | | |
276 | 0 | std::string get_info() const { |
277 | 0 | std::lock_guard<std::mutex> l(_lock); |
278 | 0 | return fmt::format("ThreadPool(name={}, threads(active/pending)=({}/{}), queued_task={})", |
279 | 0 | _name, _active_threads, _num_threads_pending_start, _total_queued_tasks); |
280 | 0 | } |
281 | | |
282 | | ThreadPool(const ThreadPool&) = delete; |
283 | | void operator=(const ThreadPool&) = delete; |
284 | | |
285 | | private: |
286 | | friend class ThreadPoolBuilder; |
287 | | friend class ThreadPoolToken; |
288 | | |
289 | | // Client-provided task to be executed by this pool. |
290 | | struct Task { |
291 | | std::shared_ptr<Runnable> runnable; |
292 | | |
293 | | // Time at which the entry was submitted to the pool. |
294 | | MonotonicStopWatch submit_time_wather; |
295 | | }; |
296 | | |
297 | | // Creates a new thread pool using a builder. |
298 | | explicit ThreadPool(const ThreadPoolBuilder& builder); |
299 | | |
300 | | // Initializes the thread pool by starting the minimum number of threads. |
301 | | Status init(); |
302 | | |
303 | | // Dispatcher responsible for dequeueing and executing the tasks |
304 | | void dispatch_thread(); |
305 | | |
306 |  | // Creates a new thread. |
307 | | // |
308 | | // REQUIRES: caller has incremented '_num_threads_pending_start' ahead of this call. |
309 | | // NOTE: For performance reasons, _lock should not be held. |
310 | | Status create_thread(); |
311 | | |
312 | | // Aborts if the current thread is a member of this thread pool. |
313 | | void check_not_pool_thread_unlocked(); |
314 | | |
315 | | // Submits a task to be run via token. |
316 | | Status do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token); |
317 | | |
318 | | // Releases token 't' and invalidates it. |
319 | | void release_token(ThreadPoolToken* t); |
320 | | |
321 | | const std::string _name; |
322 | | const std::string _workload_group; |
323 | | int _min_threads; |
324 | | int _max_threads; |
325 | | const int _max_queue_size; |
326 | | const std::chrono::milliseconds _idle_timeout; |
327 | | |
328 | | // Overall status of the pool. Set to an error when the pool is shut down. |
329 | | // |
330 | | // Protected by '_lock'. |
331 | | Status _pool_status; |
332 | | |
333 | | // Synchronizes many of the members of the pool and all of its |
334 | | // condition variables. |
335 | | mutable std::mutex _lock; |
336 | | |
337 | | // Condition variable for "pool is idling". Waiters wake up when |
338 | | // _active_threads reaches zero. |
339 | | std::condition_variable _idle_cond; |
340 | | |
341 | | // Condition variable for "pool has no threads". Waiters wake up when |
342 |  | // _num_threads and _num_threads_pending_start are both 0. |
343 | | std::condition_variable _no_threads_cond; |
344 | | |
345 | | // Number of threads currently running. |
346 | | // |
347 | | // Protected by _lock. |
348 | | int _num_threads; |
349 | | |
350 | | // Number of threads which are in the process of starting. |
351 | | // When these threads start, they will decrement this counter and |
352 | | // accordingly increment '_num_threads'. |
353 | | // |
354 | | // Protected by _lock. |
355 | | int _num_threads_pending_start; |
356 | | |
357 | | // Number of threads currently running and executing client tasks. |
358 | | // |
359 | | // Protected by _lock. |
360 | | int _active_threads; |
361 | | |
362 | | // Total number of client tasks queued, either directly (_queue) or |
363 | | // indirectly (_tokens). |
364 | | // |
365 | | // Protected by _lock. |
366 | | int _total_queued_tasks; |
367 | | |
368 | | CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; |
369 | | |
370 | | // All allocated tokens. |
371 | | // |
372 | | // Protected by _lock. |
373 | | std::unordered_set<ThreadPoolToken*> _tokens; |
374 | | |
375 | | // FIFO of tokens from which tasks should be executed. Does not own the |
376 | | // tokens; they are owned by clients and are removed from the FIFO on shutdown. |
377 | | // |
378 | | // Protected by _lock. |
379 | | std::deque<ThreadPoolToken*> _queue; |
380 | | |
381 | | // Pointers to all running threads. Raw pointers are safe because a Thread |
382 | | // may only go out of scope after being removed from _threads. |
383 | | // |
384 | | // Protected by _lock. |
385 | | std::unordered_set<Thread*> _threads; |
386 | | |
387 | | // List of all threads currently waiting for work. |
388 | | // |
389 | | // A thread is added to the front of the list when it goes idle and is |
390 | | // removed from the front and signaled when new work arrives. This produces a |
391 |  | // LIFO usage pattern that is more efficient than idling on a single |
392 |  | // condition variable (which would yield FIFO semantics). |
393 | | // Protected by _lock. |
394 | | struct IdleThread : public boost::intrusive::list_base_hook<> { |
395 | 1.42k | explicit IdleThread() = default; |
396 | | |
397 | | // Condition variable for "queue is not empty". Waiters wake up when a new |
398 | | // task is queued. |
399 | | std::condition_variable not_empty; |
400 | | IdleThread(const IdleThread&) = delete; |
401 | | void operator=(const IdleThread&) = delete; |
402 | | }; |
403 | | boost::intrusive::list<IdleThread> _idle_threads; // NOLINT(build/include_what_you_use) |
404 | | |
405 | | // ExecutionMode::CONCURRENT token used by the pool for tokenless submission. |
406 | | std::unique_ptr<ThreadPoolToken> _tokenless; |
407 | | const UniqueId _id; |
408 | | |
409 | | std::shared_ptr<MetricEntity> _metric_entity; |
410 | | IntGauge* thread_pool_active_threads = nullptr; |
411 | | IntGauge* thread_pool_queue_size = nullptr; |
412 | | IntGauge* thread_pool_max_queue_size = nullptr; |
413 | | IntGauge* thread_pool_max_threads = nullptr; |
414 | | IntGauge* task_execution_time_ns_avg_in_last_1000_times = nullptr; |
415 | | IntGauge* task_wait_worker_ns_avg_in_last_1000_times = nullptr; |
416 | | |
417 | | IntervalHistogramStat<int64_t> _task_execution_time_ns_statistic {1000}; |
418 | | IntervalHistogramStat<int64_t> _task_wait_worker_time_ns_statistic {1000}; |
419 | | |
420 | | IntCounter* thread_pool_submit_failed = nullptr; |
421 | | }; |
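
A direct-submission sketch (editor's addition) exercising only the public ThreadPool API declared above; run_direct_tasks and the pool name are illustrative.

    #include <atomic>
    #include <chrono>
    #include <memory>

    #include "util/threadpool.h"

    doris::Status run_direct_tasks() {
        std::unique_ptr<doris::ThreadPool> pool;
        RETURN_IF_ERROR(doris::ThreadPoolBuilder("direct").set_max_threads(2).build(&pool));

        std::atomic<int> done {0};
        for (int i = 0; i < 8; ++i) {
            // Queue a small task; submit_func() accepts any std::function<void()>.
            RETURN_IF_ERROR(pool->submit_func([&done]() { ++done; }));
        }

        // Wait for the pool to drain, giving up after one second; fall back to
        // an unbounded wait() if the deadline passes.
        if (!pool->wait_for(std::chrono::seconds(1))) {
            pool->wait();
        }
        pool->shutdown();
        return doris::Status::OK();
    }
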
422 | | |
423 | | // Entry point for token-based task submission and blocking for a particular |
424 | | // thread pool. Tokens can only be created via ThreadPool::new_token(). |
425 | | // |
426 | | // All functions are thread-safe. Mutable members are protected via the |
427 | | // ThreadPool's lock. |
428 | | class ThreadPoolToken { |
429 | | public: |
430 | | // Destroys the token. |
431 | | // |
432 | | // May be called on a token with outstanding tasks, as Shutdown() will be |
433 | | // called first to take care of them. |
434 | | ~ThreadPoolToken(); |
435 | | |
436 | | // Submits a Runnable class. |
437 | | Status submit(std::shared_ptr<Runnable> r); |
438 | | |
439 | | // Submits a function bound using std::bind(&FuncName, args...). |
440 | | Status submit_func(std::function<void()> f); |
441 | | |
442 | | // Marks the token as unusable for future submissions. Any queued tasks not |
443 |  | // yet running are destroyed. If tasks are in flight, shutdown() will wait |
444 | | // on their completion before returning. |
445 | | void shutdown(); |
446 | | |
447 | | // Waits until all the tasks submitted via this token are completed. |
448 | | void wait(); |
449 | | |
450 |  | // Waits until all submissions using this token are complete, or until 'delta' |
451 | | // time elapses. |
452 | | // |
453 | | // Returns true if all submissions are complete, false otherwise. |
454 | | template <class Rep, class Period> |
455 | | bool wait_for(const std::chrono::duration<Rep, Period>& delta) { |
456 | | std::unique_lock<std::mutex> l(_pool->_lock); |
457 | | _pool->check_not_pool_thread_unlocked(); |
458 | | return _not_running_cond.wait_for(l, delta, [&]() { return !is_active(); }); |
459 | | } |
460 | | |
461 | | bool need_dispatch(); |
462 | | |
463 | 34 | size_t num_tasks() { |
464 | 34 | std::lock_guard<std::mutex> l(_pool->_lock); |
465 | 34 | return _entries.size(); |
466 | 34 | } |
467 | | |
468 | | ThreadPoolToken(const ThreadPoolToken&) = delete; |
469 | | void operator=(const ThreadPoolToken&) = delete; |
470 | | |
471 | | private: |
472 | | // All possible token states. Legal state transitions: |
473 | | // IDLE -> RUNNING: task is submitted via token |
474 | | // IDLE -> QUIESCED: token or pool is shut down |
475 | | // RUNNING -> IDLE: worker thread finishes executing a task and |
476 | | // there are no more tasks queued to the token |
477 | | // RUNNING -> QUIESCING: token or pool is shut down while worker thread |
478 | | // is executing a task |
479 | | // RUNNING -> QUIESCED: token or pool is shut down |
480 | | // QUIESCING -> QUIESCED: worker thread finishes executing a task |
481 | | // belonging to a shut down token or pool |
482 | | enum class State { |
483 | | // Token has no queued tasks. |
484 | | IDLE, |
485 | | |
486 | | // A worker thread is running one of the token's previously queued tasks. |
487 | | RUNNING, |
488 | | |
489 | | // No new tasks may be submitted to the token. A worker thread is still |
490 | | // running a previously queued task. |
491 | | QUIESCING, |
492 | | |
493 | | // No new tasks may be submitted to the token. There are no active tasks |
494 | | // either. At this state, the token may only be destroyed. |
495 | | QUIESCED, |
496 | | }; |
497 | | |
498 | | // Writes a textual representation of the token state in 's' to 'o'. |
499 | | friend std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s); |
500 | | |
501 | | friend class ThreadPool; |
502 | | |
503 | | // Returns a textual representation of 's' suitable for debugging. |
504 | | static const char* state_to_string(State s); |
505 | | |
506 | | // Constructs a new token. |
507 | | // |
508 | | // The token may not outlive its thread pool ('pool'). |
509 | | ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode, |
510 | | int max_concurrency = INT_MAX); |
511 | | |
512 | | // Changes this token's state to 'new_state' taking actions as needed. |
513 | | void transition(State new_state); |
514 | | |
515 | | // Returns true if this token has a task queued and ready to run, or if a |
516 | | // task belonging to this token is already running. |
517 | 10.4k | bool is_active() const { return _state == State::RUNNING || _state == State::QUIESCING; } |
518 | | |
519 | | // Returns true if new tasks may be submitted to this token. |
520 | 10.0k | bool may_submit_new_tasks() const { |
521 | 10.0k | return _state != State::QUIESCING && _state != State::QUIESCED; |
522 | 10.0k | } |
523 | | |
524 | 23.4k | State state() const { return _state; } |
525 | 4.17k | ThreadPool::ExecutionMode mode() const { return _mode; } |
526 | | |
527 | | // Token's configured execution mode. |
528 | | ThreadPool::ExecutionMode _mode; |
529 | | |
530 | | // Pointer to the token's thread pool. |
531 | | ThreadPool* _pool = nullptr; |
532 | | |
533 | | // Token state machine. |
534 | | State _state; |
535 | | |
536 | | // Queued client tasks. |
537 | | std::deque<ThreadPool::Task> _entries; |
538 | | |
539 | | // Condition variable for "token is idle". Waiters wake up when the token |
540 | | // transitions to IDLE or QUIESCED. |
541 | | std::condition_variable _not_running_cond; |
542 | | |
543 | | // Number of worker threads currently executing tasks belonging to this |
544 | | // token. |
545 | | int _active_threads; |
546 |  | // The maximum number of tasks that can run concurrently. This limits the |
547 |  | // concurrency of a thread pool token; the default is INT_MAX (no limit). |
548 | | int _max_concurrency; |
549 |  | // Number of tasks which have been submitted to the thread pool's queue. |
550 | | int _num_submitted_tasks; |
551 |  | // Number of tasks which have not been submitted to the thread pool's queue. |
552 | | int _num_unsubmitted_tasks; |
553 | | }; |
554 | | |
555 | | } // namespace doris |
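
A token-based sketch (editor's addition) showing the ThreadPoolToken API described above; run_grouped_tasks is illustrative and the task bodies are placeholders.

    #include <memory>

    #include "util/threadpool.h"

    doris::Status run_grouped_tasks(doris::ThreadPool* shared_pool) {
        // SERIAL token: its tasks run one at a time, in submission order.
        std::unique_ptr<doris::ThreadPoolToken> serial =
                shared_pool->new_token(doris::ThreadPool::ExecutionMode::SERIAL);
        RETURN_IF_ERROR(serial->submit_func([]() { /* step 1 */ }));
        RETURN_IF_ERROR(serial->submit_func([]() { /* step 2, after step 1 */ }));

        // CONCURRENT token with max_concurrency = 2: at most two of these
        // tasks occupy worker threads at any moment.
        std::unique_ptr<doris::ThreadPoolToken> concurrent =
                shared_pool->new_token(doris::ThreadPool::ExecutionMode::CONCURRENT, 2);
        for (int i = 0; i < 4; ++i) {
            RETURN_IF_ERROR(concurrent->submit_func([]() { /* independent work */ }));
        }

        // Block on each logical group. Tokens must be destroyed before the pool.
        serial->wait();
        concurrent->wait();
        return doris::Status::OK();
    }
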