/root/doris/be/src/util/threadpool.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/threadpool.h |
19 | | // and modified by Doris |
20 | | |
21 | | #pragma once |
22 | | |
23 | | #include <limits.h> |
24 | | #include <stddef.h> |
25 | | |
26 | | #include <boost/intrusive/detail/algo_type.hpp> |
27 | | #include <boost/intrusive/list.hpp> |
28 | | #include <boost/intrusive/list_hook.hpp> |
29 | | // IWYU pragma: no_include <bits/chrono.h> |
30 | | #include <chrono> // IWYU pragma: keep |
31 | | #include <condition_variable> |
32 | | #include <deque> |
33 | | #include <functional> |
34 | | #include <iosfwd> |
35 | | #include <memory> |
36 | | #include <mutex> |
37 | | #include <string> |
38 | | #include <unordered_set> |
39 | | |
40 | | #include "agent/cgroup_cpu_ctl.h" |
41 | | #include "common/status.h" |
42 | | #include "util/work_thread_pool.hpp" |
43 | | |
44 | | namespace doris { |
45 | | |
46 | | class Thread; |
47 | | class ThreadPool; |
48 | | class ThreadPoolToken; |
49 | | |
50 | | class Runnable { |
51 | | public: |
52 | | virtual void run() = 0; |
53 | 7.92k | virtual ~Runnable() {} |
54 | | }; |
55 | | |
56 | | // ThreadPool takes a lot of arguments. We provide sane defaults with a builder. |
57 | | // |
58 | | // name: Used for debugging output and default names of the worker threads. |
59 | | // Since thread names are limited to 16 characters on Linux, it's good to |
60 | | // choose a short name here. |
61 | | // Required. |
62 | | // |
63 | | // trace_metric_prefix: used to prefix the names of TraceMetric counters. |
64 | | // When a task on a thread pool has an associated trace, the thread pool |
65 | | // implementation will increment TraceMetric counters to indicate the |
66 | | // amount of time spent waiting in the queue as well as the amount of wall |
67 | | // and CPU time spent executing. By default, these counters are prefixed |
68 | | // with the name of the thread pool. For example, if the pool is named |
69 | | // 'apply', then counters such as 'apply.queue_time_us' will be |
70 | | // incremented. |
71 | | // |
72 | | // The TraceMetrics implementation relies on the number of distinct counter |
73 | | // names being small. Thus, if the thread pool name itself is dynamically |
74 | | // generated, the default behavior described above would result in an |
75 | | // unbounded number of distinct counter names. The 'trace_metric_prefix' |
76 | | // setting can be used to override the prefix used in generating the trace |
77 | | // metric names. |
78 | | // |
79 | | // For example, the Raft thread pools are named "<tablet id>-raft" which |
80 | | // has unbounded cardinality (a server may have thousands of different |
81 | | // tablet IDs over its lifetime). In that case, setting the prefix to |
82 | | // "raft" will avoid any issues. |
83 | | // |
84 | | // min_threads: Minimum number of threads we'll have at any time. |
85 | | // Default: 0. |
86 | | // |
87 | | // max_threads: Maximum number of threads we'll have at any time. |
88 | | // Default: Number of CPUs detected on the system. |
89 | | // |
90 | | // max_queue_size: Maximum number of tasks that may be queued before submit() |
91 | | // starts rejecting new ones with a Status::ServiceUnavailable error. |
92 | | // Default: INT_MAX. |
93 | | // |
94 | | // idle_timeout: How long we'll keep around an idle thread before timing it out. |
95 | | // We always keep at least min_threads. |
96 | | // Default: 500 milliseconds. |
97 | | // |
98 | | // metrics: Histograms, counters, etc. to update on various threadpool events. |
99 | | // Default: not set. |
100 | | // |
101 | | class ThreadPoolBuilder { |
102 | | public: |
103 | | explicit ThreadPoolBuilder(std::string name); |
104 | | |
105 | | // Note: We violate the style guide by returning mutable references here |
106 | | // in order to provide traditional Builder pattern conveniences. |
107 | | ThreadPoolBuilder& set_min_threads(int min_threads); |
108 | | ThreadPoolBuilder& set_max_threads(int max_threads); |
109 | | ThreadPoolBuilder& set_max_queue_size(int max_queue_size); |
110 | | ThreadPoolBuilder& set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl); |
111 | | template <class Rep, class Period> |
112 | 7 | ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep, Period>& idle_timeout) { |
113 | 7 | _idle_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout); |
114 | 7 | return *this; |
115 | 7 | } _ZN5doris17ThreadPoolBuilder16set_idle_timeoutIlSt5ratioILl1ELl1000EEEERS0_RKNSt6chrono8durationIT_T0_EE Line | Count | Source | 112 | 6 | ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep, Period>& idle_timeout) { | 113 | 6 | _idle_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout); | 114 | 6 | return *this; | 115 | 6 | } |
_ZN5doris17ThreadPoolBuilder16set_idle_timeoutIlSt5ratioILl1ELl1000000EEEERS0_RKNSt6chrono8durationIT_T0_EE Line | Count | Source | 112 | 1 | ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep, Period>& idle_timeout) { | 113 | 1 | _idle_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout); | 114 | 1 | return *this; | 115 | 1 | } |
|
116 | | // Instantiate a new ThreadPool with the existing builder arguments. |
117 | | template <typename ThreadPoolType> |
118 | 358 | Status build(std::unique_ptr<ThreadPoolType>* pool) const { |
119 | 358 | if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) { |
120 | 22 | pool->reset(new ThreadPoolType(*this)); |
121 | 336 | RETURN_IF_ERROR((*pool)->init()); |
122 | 336 | } else if constexpr (std::is_same_v<ThreadPoolType, PriorityThreadPool>) { |
123 | 22 | pool->reset(new ThreadPoolType(_max_threads, _max_queue_size, _name)); |
124 | 22 | } else { |
125 | 336 | static_assert(always_false_v<ThreadPoolType>, "Unsupported ThreadPoolType"); |
126 | 336 | } |
127 | 336 | return Status::OK(); |
128 | 358 | } _ZNK5doris17ThreadPoolBuilder5buildINS_10ThreadPoolEEENS_6StatusEPSt10unique_ptrIT_St14default_deleteIS5_EE Line | Count | Source | 118 | 336 | Status build(std::unique_ptr<ThreadPoolType>* pool) const { | 119 | 336 | if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) { | 120 | 336 | pool->reset(new ThreadPoolType(*this)); | 121 | 336 | RETURN_IF_ERROR((*pool)->init()); | 122 | 336 | } else if constexpr (std::is_same_v<ThreadPoolType, PriorityThreadPool>) { | 123 | 336 | pool->reset(new ThreadPoolType(_max_threads, _max_queue_size, _name)); | 124 | 336 | } else { | 125 | 336 | static_assert(always_false_v<ThreadPoolType>, "Unsupported ThreadPoolType"); | 126 | 336 | } | 127 | 336 | return Status::OK(); | 128 | 336 | } |
_ZNK5doris17ThreadPoolBuilder5buildINS_14WorkThreadPoolILb1EEEEENS_6StatusEPSt10unique_ptrIT_St14default_deleteIS6_EE Line | Count | Source | 118 | 22 | Status build(std::unique_ptr<ThreadPoolType>* pool) const { | 119 | 22 | if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) { | 120 | 22 | pool->reset(new ThreadPoolType(*this)); | 121 | 22 | RETURN_IF_ERROR((*pool)->init()); | 122 | 22 | } else if constexpr (std::is_same_v<ThreadPoolType, PriorityThreadPool>) { | 123 | 22 | pool->reset(new ThreadPoolType(_max_threads, _max_queue_size, _name)); | 124 | 22 | } else { | 125 | 22 | static_assert(always_false_v<ThreadPoolType>, "Unsupported ThreadPoolType"); | 126 | 22 | } | 127 | 22 | return Status::OK(); | 128 | 22 | } |
|
129 | | |
130 | | private: |
131 | | friend class ThreadPool; |
132 | | const std::string _name; |
133 | | int _min_threads; |
134 | | int _max_threads; |
135 | | int _max_queue_size; |
136 | | CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; |
137 | | std::chrono::milliseconds _idle_timeout; |
138 | | |
139 | | ThreadPoolBuilder(const ThreadPoolBuilder&) = delete; |
140 | | void operator=(const ThreadPoolBuilder&) = delete; |
141 | | |
142 | | template <typename T> |
143 | | static constexpr bool always_false_v = false; |
144 | | }; |
145 | | |
146 | | // Thread pool with a variable number of threads. |
147 | | // |
148 | | // Tasks submitted directly to the thread pool enter a FIFO queue and are |
149 | | // dispatched to a worker thread when one becomes free. Tasks may also be |
150 | | // submitted via ThreadPoolTokens. The token's wait() and shutdown() functions |
151 | | // can then be used to block on logical groups of tasks. |
152 | | // |
153 | | // A token operates in one of two ExecutionModes, determined at token |
154 | | // construction time: |
155 | | // 1. SERIAL: submitted tasks are run one at a time. |
156 | | // 2. CONCURRENT: submitted tasks may be run in parallel. This is similar to |
157 | | // tasks submitted without a token, but the logical grouping that tokens |
158 | | // impart can be useful when a pool is shared by many contexts (e.g. to |
159 | | // safely shut down one context, to derive context-specific metrics, etc.). |
160 | | // |
161 | | // Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are |
162 | | // processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are |
163 | | // processed in a round-robin fashion, one task at a time. This prevents them |
164 | | // from starving one another. However, tokenless (and CONCURRENT token-based) |
165 | | // tasks can starve SERIAL token-based tasks. |
166 | | // |
167 | | // Usage Example: |
168 | | // static void Func(int n) { ... } |
169 | | // class Task : public Runnable { ... } |
170 | | // |
171 | | // std::unique_ptr<ThreadPool> thread_pool; |
172 | | // CHECK_OK( |
173 | | // ThreadPoolBuilder("my_pool") |
174 | | // .set_min_threads(0) |
175 | | // .set_max_threads(5) |
176 | | // .set_max_queue_size(10) |
177 | | // .set_idle_timeout(2000ms) |
178 | | // .build(&thread_pool)); |
179 | | // thread_pool->submit(std::make_shared<Task>()); |
180 | | // thread_pool->submit_func(std::bind(&Func, 10)); |
181 | | class ThreadPool { |
182 | | public: |
183 | | ~ThreadPool(); |
184 | | |
185 | | // Waits for the running tasks to complete and then shuts down the threads. |
186 | | // All other pending tasks in the queue will be removed. |
187 | | // NOTE: The user may implement external abort logic for the runnables, |
188 | | // which must be invoked before shutdown() if the system needs to know |
189 | | // that these tasks were not executed, or if a runnable requires an |
190 | | // explicit "abort" notification to exit from its run loop. |
191 | | void shutdown(); |
192 | | |
193 | | // Submits a Runnable class. |
194 | | Status submit(std::shared_ptr<Runnable> r); |
195 | | |
196 | | // Submits a function bound using std::bind(&FuncName, args...). |
197 | | Status submit_func(std::function<void()> f); |
198 | | |
199 | | // Waits until all the tasks are completed. |
200 | | void wait(); |
201 | | |
202 | | // Waits for the pool to reach the idle state, or until 'delta' time elapses. |
203 | | // Returns true if the pool reached the idle state, false otherwise. |
204 | | template <class Rep, class Period> |
205 | | bool wait_for(const std::chrono::duration<Rep, Period>& delta) { |
206 | | std::unique_lock<std::mutex> l(_lock); |
207 | | check_not_pool_thread_unlocked(); |
208 | | return _idle_cond.wait_for( |
209 | | l, delta, [&]() { return _total_queued_tasks <= 0 && _active_threads <= 0; }); |
210 | | } |
211 | | Status set_min_threads(int min_threads); |
212 | | Status set_max_threads(int max_threads); |
213 | | |
214 | | // Allocates a new token for use in token-based task submission. All tokens |
215 | | // must be destroyed before their ThreadPool is destroyed. |
216 | | // |
217 | | // There is no limit on the number of tokens that may be allocated. |
218 | | enum class ExecutionMode { |
219 | | // Tasks submitted via this token will be executed serially. |
220 | | SERIAL, |
221 | | |
222 | | // Tasks submitted via this token may be executed concurrently. |
223 | | CONCURRENT |
224 | | }; |
225 | | std::unique_ptr<ThreadPoolToken> new_token(ExecutionMode mode, int max_concurrency = INT_MAX); |
226 | | |
227 | | // Return the number of threads currently running (or in the process of starting up) |
228 | | // for this thread pool. |
229 | 24 | int num_threads() const { |
230 | 24 | std::lock_guard<std::mutex> l(_lock); |
231 | 24 | return _num_threads + _num_threads_pending_start; |
232 | 24 | } |
233 | | |
234 | 121 | int max_threads() const { |
235 | 121 | std::lock_guard<std::mutex> l(_lock); |
236 | 121 | return _max_threads; |
237 | 121 | } |
238 | | |
239 | 118 | int min_threads() const { |
240 | 118 | std::lock_guard<std::mutex> l(_lock); |
241 | 118 | return _min_threads; |
242 | 118 | } |
243 | | |
244 | 0 | int num_threads_pending_start() const { |
245 | 0 | std::lock_guard<std::mutex> l(_lock); |
246 | 0 | return _num_threads_pending_start; |
247 | 0 | } |
248 | | |
249 | 0 | int num_active_threads() const { |
250 | 0 | std::lock_guard<std::mutex> l(_lock); |
251 | 0 | return _active_threads; |
252 | 0 | } |
253 | | |
254 | 0 | int get_queue_size() const { |
255 | 0 | std::lock_guard<std::mutex> l(_lock); |
256 | 0 | return _total_queued_tasks; |
257 | 0 | } |
258 | | |
259 | 0 | std::vector<int> debug_info() { |
260 | 0 | std::lock_guard<std::mutex> l(_lock); |
261 | 0 | std::vector<int> arr = {_num_threads, static_cast<int>(_threads.size()), _min_threads, |
262 | 0 | _max_threads}; |
263 | 0 | return arr; |
264 | 0 | } |
265 | | |
266 | | private: |
267 | | friend class ThreadPoolBuilder; |
268 | | friend class ThreadPoolToken; |
269 | | |
270 | | // Client-provided task to be executed by this pool. |
271 | | struct Task { |
272 | | std::shared_ptr<Runnable> runnable; |
273 | | |
274 | | // Time at which the entry was submitted to the pool. |
275 | | std::chrono::time_point<std::chrono::system_clock> submit_time; |
276 | | }; |
277 | | |
278 | | // Creates a new thread pool using a builder. |
279 | | explicit ThreadPool(const ThreadPoolBuilder& builder); |
280 | | |
281 | | // Initializes the thread pool by starting the minimum number of threads. |
282 | | Status init(); |
283 | | |
284 | | // Dispatcher responsible for dequeueing and executing the tasks |
285 | | void dispatch_thread(); |
286 | | |
287 | | // Create new thread. |
288 | | // |
289 | | // REQUIRES: caller has incremented '_num_threads_pending_start' ahead of this call. |
290 | | // NOTE: For performance reasons, _lock should not be held. |
291 | | Status create_thread(); |
292 | | |
293 | | // Aborts if the current thread is a member of this thread pool. |
294 | | void check_not_pool_thread_unlocked(); |
295 | | |
296 | | // Submits a task to be run via token. |
297 | | Status do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token); |
298 | | |
299 | | // Releases token 't' and invalidates it. |
300 | | void release_token(ThreadPoolToken* t); |
301 | | |
302 | | const std::string _name; |
303 | | int _min_threads; |
304 | | int _max_threads; |
305 | | const int _max_queue_size; |
306 | | const std::chrono::milliseconds _idle_timeout; |
307 | | |
308 | | // Overall status of the pool. Set to an error when the pool is shut down. |
309 | | // |
310 | | // Protected by '_lock'. |
311 | | Status _pool_status; |
312 | | |
313 | | // Synchronizes many of the members of the pool and all of its |
314 | | // condition variables. |
315 | | mutable std::mutex _lock; |
316 | | |
317 | | // Condition variable for "pool is idling". Waiters wake up when |
318 | | // _active_threads reaches zero. |
319 | | std::condition_variable _idle_cond; |
320 | | |
321 | | // Condition variable for "pool has no threads". Waiters wake up when |
322 | | // _num_threads and _num_threads_pending_start are both 0. |
323 | | std::condition_variable _no_threads_cond; |
324 | | |
325 | | // Number of threads currently running. |
326 | | // |
327 | | // Protected by _lock. |
328 | | int _num_threads; |
329 | | |
330 | | // Number of threads which are in the process of starting. |
331 | | // When these threads start, they will decrement this counter and |
332 | | // accordingly increment '_num_threads'. |
333 | | // |
334 | | // Protected by _lock. |
335 | | int _num_threads_pending_start; |
336 | | |
337 | | // Number of threads currently running and executing client tasks. |
338 | | // |
339 | | // Protected by _lock. |
340 | | int _active_threads; |
341 | | |
342 | | // Total number of client tasks queued, either directly (_queue) or |
343 | | // indirectly (_tokens). |
344 | | // |
345 | | // Protected by _lock. |
346 | | int _total_queued_tasks; |
347 | | |
348 | | CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; |
349 | | |
350 | | // All allocated tokens. |
351 | | // |
352 | | // Protected by _lock. |
353 | | std::unordered_set<ThreadPoolToken*> _tokens; |
354 | | |
355 | | // FIFO of tokens from which tasks should be executed. Does not own the |
356 | | // tokens; they are owned by clients and are removed from the FIFO on shutdown. |
357 | | // |
358 | | // Protected by _lock. |
359 | | std::deque<ThreadPoolToken*> _queue; |
360 | | |
361 | | // Pointers to all running threads. Raw pointers are safe because a Thread |
362 | | // may only go out of scope after being removed from _threads. |
363 | | // |
364 | | // Protected by _lock. |
365 | | std::unordered_set<Thread*> _threads; |
366 | | |
367 | | // List of all threads currently waiting for work. |
368 | | // |
369 | | // A thread is added to the front of the list when it goes idle, and is removed |
370 | | // from the front and signaled when new work arrives. This produces a LIFO usage |
371 | | // pattern that is more efficient than idling on a single shared condition variable. |
372 | | // |
373 | | // Protected by _lock. |
374 | | struct IdleThread : public boost::intrusive::list_base_hook<> { |
375 | 2.67k | explicit IdleThread() {} |
376 | | |
377 | | // Condition variable for "queue is not empty". Waiters wake up when a new |
378 | | // task is queued. |
379 | | std::condition_variable not_empty; |
380 | | IdleThread(const IdleThread&) = delete; |
381 | | void operator=(const IdleThread&) = delete; |
382 | | }; |
383 | | boost::intrusive::list<IdleThread> _idle_threads; // NOLINT(build/include_what_you_use) |
384 | | |
385 | | // ExecutionMode::CONCURRENT token used by the pool for tokenless submission. |
386 | | std::unique_ptr<ThreadPoolToken> _tokenless; |
387 | | |
388 | | ThreadPool(const ThreadPool&) = delete; |
389 | | void operator=(const ThreadPool&) = delete; |
390 | | }; |
391 | | |
392 | | // Entry point for token-based task submission and blocking for a particular |
393 | | // thread pool. Tokens can only be created via ThreadPool::new_token(). |
394 | | // |
395 | | // All functions are thread-safe. Mutable members are protected via the |
396 | | // ThreadPool's lock. |
397 | | class ThreadPoolToken { |
398 | | public: |
399 | | // Destroys the token. |
400 | | // |
401 | | // May be called on a token with outstanding tasks, as shutdown() will be |
402 | | // called first to take care of them. |
403 | | ~ThreadPoolToken(); |
404 | | |
405 | | // Submits a Runnable class. |
406 | | Status submit(std::shared_ptr<Runnable> r); |
407 | | |
408 | | // Submits a function bound using std::bind(&FuncName, args...). |
409 | | Status submit_func(std::function<void()> f); |
410 | | |
411 | | // Marks the token as unusable for future submissions. Any queued tasks not |
412 | | // yet running are destroyed. If tasks are in flight, shutdown() will wait |
413 | | // on their completion before returning. |
414 | | void shutdown(); |
415 | | |
416 | | // Waits until all the tasks submitted via this token are completed. |
417 | | void wait(); |
418 | | |
419 | | // Waits until all submissions using this token are complete, or until 'delta' |
420 | | // time elapses. |
421 | | // |
422 | | // Returns true if all submissions are complete, false otherwise. |
423 | | template <class Rep, class Period> |
424 | | bool wait_for(const std::chrono::duration<Rep, Period>& delta) { |
425 | | std::unique_lock<std::mutex> l(_pool->_lock); |
426 | | _pool->check_not_pool_thread_unlocked(); |
427 | | return _not_running_cond.wait_for(l, delta, [&]() { return !is_active(); }); |
428 | | } |
429 | | |
430 | | bool need_dispatch(); |
431 | | |
432 | 34 | size_t num_tasks() { |
433 | 34 | std::lock_guard<std::mutex> l(_pool->_lock); |
434 | 34 | return _entries.size(); |
435 | 34 | } |
436 | | |
437 | | private: |
438 | | // All possible token states. Legal state transitions: |
439 | | // IDLE -> RUNNING: task is submitted via token |
440 | | // IDLE -> QUIESCED: token or pool is shut down |
441 | | // RUNNING -> IDLE: worker thread finishes executing a task and |
442 | | // there are no more tasks queued to the token |
443 | | // RUNNING -> QUIESCING: token or pool is shut down while worker thread |
444 | | // is executing a task |
445 | | // RUNNING -> QUIESCED: token or pool is shut down |
446 | | // QUIESCING -> QUIESCED: worker thread finishes executing a task |
447 | | // belonging to a shut down token or pool |
448 | | enum class State { |
449 | | // Token has no queued tasks. |
450 | | IDLE, |
451 | | |
452 | | // A worker thread is running one of the token's previously queued tasks. |
453 | | RUNNING, |
454 | | |
455 | | // No new tasks may be submitted to the token. A worker thread is still |
456 | | // running a previously queued task. |
457 | | QUIESCING, |
458 | | |
459 | | // No new tasks may be submitted to the token. There are no active tasks |
460 | | // either. At this state, the token may only be destroyed. |
461 | | QUIESCED, |
462 | | }; |
463 | | |
464 | | // Writes a textual representation of the token state in 's' to 'o'. |
465 | | friend std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s); |
466 | | |
467 | | friend class ThreadPool; |
468 | | |
469 | | // Returns a textual representation of 's' suitable for debugging. |
470 | | static const char* state_to_string(State s); |
471 | | |
472 | | // Constructs a new token. |
473 | | // |
474 | | // The token may not outlive its thread pool ('pool'). |
475 | | ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode, |
476 | | int max_concurrency = INT_MAX); |
477 | | |
478 | | // Changes this token's state to 'new_state' taking actions as needed. |
479 | | void transition(State new_state); |
480 | | |
481 | | // Returns true if this token has a task queued and ready to run, or if a |
482 | | // task belonging to this token is already running. |
483 | 8.82k | bool is_active() const { return _state == State::RUNNING || _state == State::QUIESCING; } |
484 | | |
485 | | // Returns true if new tasks may be submitted to this token. |
486 | 7.93k | bool may_submit_new_tasks() const { |
487 | 7.93k | return _state != State::QUIESCING && _state != State::QUIESCED; |
488 | 7.93k | } |
489 | | |
490 | 20.1k | State state() const { return _state; } |
491 | 2.68k | ThreadPool::ExecutionMode mode() const { return _mode; } |
492 | | |
493 | | // Token's configured execution mode. |
494 | | ThreadPool::ExecutionMode _mode; |
495 | | |
496 | | // Pointer to the token's thread pool. |
497 | | ThreadPool* _pool = nullptr; |
498 | | |
499 | | // Token state machine. |
500 | | State _state; |
501 | | |
502 | | // Queued client tasks. |
503 | | std::deque<ThreadPool::Task> _entries; |
504 | | |
505 | | // Condition variable for "token is idle". Waiters wake up when the token |
506 | | // transitions to IDLE or QUIESCED. |
507 | | std::condition_variable _not_running_cond; |
508 | | |
509 | | // Number of worker threads currently executing tasks belonging to this |
510 | | // token. |
511 | | int _active_threads; |
512 | | // The maximum number of tasks that can be run concurrently. This limits |
513 | | // the concurrency of a thread pool token; the default is INT_MAX (no limit). |
514 | | int _max_concurrency; |
515 | | // Number of tasks that have been submitted to the thread pool's queue. |
516 | | int _num_submitted_tasks; |
517 | | // Number of tasks that have not yet been submitted to the thread pool's queue. |
518 | | int _num_unsubmitted_tasks; |
519 | | |
520 | | ThreadPoolToken(const ThreadPoolToken&) = delete; |
521 | | void operator=(const ThreadPoolToken&) = delete; |
522 | | }; |
523 | | |
524 | | } // namespace doris |