/root/doris/be/src/util/threadpool.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/threadpool.h |
19 | | // and modified by Doris |
20 | | |
21 | | #pragma once |
22 | | |
23 | | #include <gen_cpp/Types_types.h> |
24 | | |
25 | | #include <boost/intrusive/detail/algo_type.hpp> |
26 | | #include <boost/intrusive/list.hpp> |
27 | | #include <boost/intrusive/list_hook.hpp> |
28 | | #include <climits> |
29 | | #include <cstddef> |
30 | | // IWYU pragma: no_include <bits/chrono.h> |
31 | | #include <chrono> // IWYU pragma: keep |
32 | | #include <condition_variable> |
33 | | #include <deque> |
34 | | #include <functional> |
35 | | #include <iosfwd> |
36 | | #include <memory> |
37 | | #include <mutex> |
38 | | #include <string> |
39 | | #include <unordered_set> |
40 | | |
41 | | #include "agent/cgroup_cpu_ctl.h" |
42 | | #include "common/status.h" |
43 | | #include "util/metrics.h" |
44 | | #include "util/uid_util.h" |
45 | | #include "util/work_thread_pool.hpp" |
46 | | |
47 | | namespace doris { |
48 | | |
49 | | class Thread; |
50 | | class ThreadPool; |
51 | | class ThreadPoolToken; |
52 | | |
53 | | class Runnable { |
54 | | public: |
55 | | virtual void run() = 0; |
56 | 11.8k | virtual ~Runnable() = default; |
57 | | }; |
58 | | |
59 | | // ThreadPool takes a lot of arguments. We provide sane defaults with a builder. |
60 | | // |
61 | | // name: Used for debugging output and default names of the worker threads. |
62 | | // Since thread names are limited to 16 characters on Linux, it's good to |
63 | | // choose a short name here. |
64 | | // Required. |
65 | | // |
66 | | // trace_metric_prefix: used to prefix the names of TraceMetric counters. |
67 | | // When a task on a thread pool has an associated trace, the thread pool |
68 | | // implementation will increment TraceMetric counters to indicate the |
69 | | // amount of time spent waiting in the queue as well as the amount of wall |
70 | | // and CPU time spent executing. By default, these counters are prefixed |
71 | | // with the name of the thread pool. For example, if the pool is named |
72 | | // 'apply', then counters such as 'apply.queue_time_us' will be |
73 | | // incremented. |
74 | | // |
75 | | // The TraceMetrics implementation relies on the number of distinct counter |
76 | | // names being small. Thus, if the thread pool name itself is dynamically |
77 | | // generated, the default behavior described above would result in an |
78 | | // unbounded number of distinct counter names. The 'trace_metric_prefix' |
79 | | // setting can be used to override the prefix used in generating the trace |
80 | | // metric names. |
81 | | // |
82 | | // For example, the Raft thread pools are named "<tablet id>-raft" which |
83 | | // has unbounded cardinality (a server may have thousands of different |
84 | | // tablet IDs over its lifetime). In that case, setting the prefix to |
85 | | // "raft" will avoid any issues. |
86 | | // |
87 | | // min_threads: Minimum number of threads we'll have at any time. |
88 | | // Default: 0. |
89 | | // |
90 | | // max_threads: Maximum number of threads we'll have at any time. |
91 | | // Default: Number of CPUs detected on the system. |
92 | | // |
93 | | // max_queue_size: Maximum number of items to enqueue before returning a |
94 | | // Status::ServiceUnavailable status from submit(). |
95 | | // Default: INT_MAX. |
96 | | // |
97 | | // idle_timeout: How long we'll keep around an idle thread before timing it out. |
98 | | // We always keep at least min_threads. |
99 | | // Default: 500 milliseconds. |
100 | | // |
101 | | // metrics: Histograms, counters, etc. to update on various threadpool events. |
102 | | // Default: not set. |
103 | | // |
104 | | class ThreadPoolBuilder { |
105 | | public: |
106 | | explicit ThreadPoolBuilder(std::string name, std::string workload_group = ""); |
107 | | |
108 | | // Note: We violate the style guide by returning mutable references here |
109 | | // in order to provide traditional Builder pattern conveniences. |
110 | | ThreadPoolBuilder& set_min_threads(int min_threads); |
111 | | ThreadPoolBuilder& set_max_threads(int max_threads); |
112 | | ThreadPoolBuilder& set_max_queue_size(int max_queue_size); |
113 | | ThreadPoolBuilder& set_cgroup_cpu_ctl(std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl); |
114 | | template <class Rep, class Period> |
115 | | ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep, Period>& idle_timeout) { |
116 | | _idle_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout); |
117 | | return *this; |
118 | | } |
119 | | // Instantiate a new ThreadPool with the existing builder arguments. |
120 | | template <typename ThreadPoolType> |
121 | 321 | Status build(std::unique_ptr<ThreadPoolType>* pool) const { |
122 | 321 | if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) { |
123 | 5 | pool->reset(new ThreadPoolType(*this)); |
124 | 316 | RETURN_IF_ERROR((*pool)->init()); |
125 | 316 | } else if constexpr (std::is_same_v<ThreadPoolType, PriorityThreadPool>) { |
126 | 5 | pool->reset(new ThreadPoolType(_max_threads, _max_queue_size, _name)); |
127 | 5 | } else { |
128 | 316 | static_assert(always_false_v<ThreadPoolType>, "Unsupported ThreadPoolType"); |
129 | 316 | } |
130 | 316 | return Status::OK(); |
131 | 321 | } |
132 | | |
133 | | ThreadPoolBuilder(const ThreadPoolBuilder&) = delete; |
134 | | void operator=(const ThreadPoolBuilder&) = delete; |
135 | | |
136 | | private: |
137 | | friend class ThreadPool; |
138 | | const std::string _name; |
139 | | const std::string _workload_group; |
140 | | int _min_threads; |
141 | | int _max_threads; |
142 | | int _max_queue_size; |
143 | | std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl; |
144 | | std::chrono::milliseconds _idle_timeout; |
145 | | |
146 | | template <typename T> |
147 | | static constexpr bool always_false_v = false; |
148 | | }; |
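
The options documented above combine as shown in the minimal sketch below, written against the declarations in this header; the helper name build_example_pools is illustrative only (not part of Doris), and the RETURN_IF_ERROR/Status plumbing is the usual one from common/status.h:

```cpp
#include <chrono>
#include <memory>

#include "util/threadpool.h"       // ThreadPool, ThreadPoolBuilder
#include "util/work_thread_pool.hpp" // PriorityThreadPool (WorkThreadPool<true>)

namespace doris {

// Hypothetical helper, not part of this header: builds a dynamically sized
// pool from the builder options documented above.
Status build_example_pools() {
    using namespace std::chrono_literals;

    std::unique_ptr<ThreadPool> pool;
    RETURN_IF_ERROR(ThreadPoolBuilder("example")       // short name: Linux caps thread names at 16 chars
                            .set_min_threads(0)        // shrink to zero workers when idle
                            .set_max_threads(8)        // grow up to 8 workers under load
                            .set_max_queue_size(1024)  // reject submissions beyond 1024 queued tasks
                            .set_idle_timeout(500ms)   // reap idle workers after 500 ms
                            .build(&pool));            // build<ThreadPool>() deduced from the pointer type

    // The same build() template also constructs a PriorityThreadPool; per the
    // if-constexpr branch above, only _max_threads, _max_queue_size and _name
    // are used in that case.
    std::unique_ptr<PriorityThreadPool> prio_pool;
    RETURN_IF_ERROR(ThreadPoolBuilder("prio")
                            .set_max_threads(4)
                            .set_max_queue_size(256)
                            .build(&prio_pool));
    return Status::OK();
}

} // namespace doris
```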
149 | | |
150 | | // Thread pool with a variable number of threads. |
151 | | // |
152 | | // Tasks submitted directly to the thread pool enter a FIFO queue and are |
153 | | // dispatched to a worker thread when one becomes free. Tasks may also be |
154 | | // submitted via ThreadPoolTokens. The token wait() and shutdown() functions |
155 | | // can then be used to block on logical groups of tasks. |
156 | | // |
157 | | // A token operates in one of two ExecutionModes, determined at token |
158 | | // construction time: |
159 | | // 1. SERIAL: submitted tasks are run one at a time. |
160 | | // 2. CONCURRENT: submitted tasks may be run in parallel. This isn't unlike |
161 | | // tasks submitted without a token, but the logical grouping that tokens |
162 | | // impart can be useful when a pool is shared by many contexts (e.g. to |
163 | | // safely shut down one context, to derive context-specific metrics, etc.). |
164 | | // |
165 | | // Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are |
166 | | // processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are |
167 | | // processed in a round-robin fashion, one task at a time. This prevents them |
168 | | // from starving one another. However, tokenless (and CONCURRENT token-based) |
169 | | // tasks can starve SERIAL token-based tasks. |
170 | | // |
171 | | // Usage Example: |
172 | | // static void Func(int n) { ... } |
173 | | // class Task : public Runnable { ... } |
174 | | // |
175 | | // std::unique_ptr<ThreadPool> thread_pool; |
176 | | // CHECK_OK( |
177 | | // ThreadPoolBuilder("my_pool") |
178 | | // .set_min_threads(0) |
179 | | // .set_max_threads(5) |
180 | | // .set_max_queue_size(10) |
181 | | //             .set_idle_timeout(2000ms) |
182 | | //             .build(&thread_pool)); |
183 | | // thread_pool->submit(std::make_shared<Task>()); |
184 | | // thread_pool->submit_func(std::bind(&Func, 10)); |
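
The SERIAL/CONCURRENT distinction described above is easiest to see with tokens. The sketch below is hypothetical client code against the declarations in this header (the function name token_example is illustrative, not a Doris API):

```cpp
#include <memory>

#include "util/threadpool.h" // ThreadPool, ThreadPoolBuilder, ThreadPoolToken

namespace doris {

// Hypothetical sketch, not part of this header: grouping work with tokens.
Status token_example() {
    std::unique_ptr<ThreadPool> pool;
    RETURN_IF_ERROR(ThreadPoolBuilder("work").set_max_threads(4).build(&pool));

    // SERIAL: this token's tasks run one at a time, in submission order.
    std::unique_ptr<ThreadPoolToken> serial =
            pool->new_token(ThreadPool::ExecutionMode::SERIAL);
    // CONCURRENT: tasks may run in parallel, here capped at two at a time.
    std::unique_ptr<ThreadPoolToken> concurrent =
            pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, /*max_concurrency=*/2);

    RETURN_IF_ERROR(serial->submit_func([] { /* step 1 */ }));
    RETURN_IF_ERROR(serial->submit_func([] { /* step 2, runs after step 1 */ }));
    RETURN_IF_ERROR(concurrent->submit_func([] { /* independent work */ }));

    serial->wait();         // block on this logical group only, not the whole pool
    concurrent->shutdown(); // reject new tasks, drop queued ones, wait for in-flight ones

    // Tokens must not outlive the pool: destroy them before the pool goes away.
    serial.reset();
    concurrent.reset();
    pool->shutdown();
    return Status::OK();
}

} // namespace doris
```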
185 | | class ThreadPool { |
186 | | public: |
187 | | ~ThreadPool(); |
188 | | |
189 | | // Waits for the running tasks to complete and then shuts down the threads. |
190 | | // All other pending tasks in the queue are discarded. |
191 | | // NOTE: The user may implement an external abort mechanism for the |
192 | | // runnables and invoke it before shutdown() if the system needs to be |
193 | | // notified of the non-execution of these tasks, or if the runnables |
194 | | // require an explicit "abort" notification to exit their run loop. |
195 | | void shutdown(); |
196 | | |
197 | | // Submits a Runnable class. |
198 | | Status submit(std::shared_ptr<Runnable> r); |
199 | | |
200 | | // Submits a function bound using std::bind(&FuncName, args...). |
201 | | Status submit_func(std::function<void()> f); |
202 | | |
203 | | // Waits until all the tasks are completed. |
204 | | void wait(); |
205 | | |
206 | | // Waits for the pool to reach the idle state, or until 'delta' time elapses. |
207 | | // Returns true if the pool reached the idle state, false otherwise. |
208 | | template <class Rep, class Period> |
209 | | bool wait_for(const std::chrono::duration<Rep, Period>& delta) { |
210 | | std::unique_lock<std::mutex> l(_lock); |
211 | | check_not_pool_thread_unlocked(); |
212 | | return _idle_cond.wait_for( |
213 | | l, delta, [&]() { return _total_queued_tasks <= 0 && _active_threads <= 0; }); |
214 | | } |
215 | | Status set_min_threads(int min_threads); |
216 | | Status set_max_threads(int max_threads); |
217 | | |
218 | | // Allocates a new token for use in token-based task submission. All tokens |
219 | | // must be destroyed before their ThreadPool is destroyed. |
220 | | // |
221 | | // There is no limit on the number of tokens that may be allocated. |
222 | | enum class ExecutionMode { |
223 | | // Tasks submitted via this token will be executed serially. |
224 | | SERIAL, |
225 | | |
226 | | // Tasks submitted via this token may be executed concurrently. |
227 | | CONCURRENT |
228 | | }; |
229 | | std::unique_ptr<ThreadPoolToken> new_token(ExecutionMode mode, int max_concurrency = INT_MAX); |
230 | | |
231 | | // Return the number of threads currently running (or in the process of starting up) |
232 | | // for this thread pool. |
233 | 24 | int num_threads() const { |
234 | 24 | std::lock_guard<std::mutex> l(_lock); |
235 | 24 | return _num_threads + _num_threads_pending_start; |
236 | 24 | } |
237 | | |
238 | 13 | int max_threads() const { |
239 | 13 | std::lock_guard<std::mutex> l(_lock); |
240 | 13 | return _max_threads; |
241 | 13 | } |
242 | | |
243 | 4 | int min_threads() const { |
244 | 4 | std::lock_guard<std::mutex> l(_lock); |
245 | 4 | return _min_threads; |
246 | 4 | } |
247 | | |
248 | 0 | int num_threads_pending_start() const { |
249 | 0 | std::lock_guard<std::mutex> l(_lock); |
250 | 0 | return _num_threads_pending_start; |
251 | 0 | } |
252 | | |
253 | 0 | int num_active_threads() const { |
254 | 0 | std::lock_guard<std::mutex> l(_lock); |
255 | 0 | return _active_threads; |
256 | 0 | } |
257 | | |
258 | 0 | int get_queue_size() const { |
259 | 0 | std::lock_guard<std::mutex> l(_lock); |
260 | 0 | return _total_queued_tasks; |
261 | 0 | } |
262 | | |
263 | 0 | int get_max_queue_size() const { |
264 | 0 | std::lock_guard<std::mutex> l(_lock); |
265 | 0 | return _max_queue_size; |
266 | 0 | } |
267 | | |
268 | 0 | std::vector<int> debug_info() const { |
269 | 0 | std::lock_guard<std::mutex> l(_lock); |
270 | 0 | std::vector<int> arr = {_num_threads, static_cast<int>(_threads.size()), _min_threads, |
271 | 0 | _max_threads}; |
272 | 0 | return arr; |
273 | 0 | } |
274 | | |
275 | 0 | std::string get_info() const { |
276 | 0 | std::lock_guard<std::mutex> l(_lock); |
277 | 0 | return fmt::format("ThreadPool(name={}, threads(active/pending)=({}/{}), queued_task={})", |
278 | 0 | _name, _active_threads, _num_threads_pending_start, _total_queued_tasks); |
279 | 0 | } |
280 | | |
281 | | ThreadPool(const ThreadPool&) = delete; |
282 | | void operator=(const ThreadPool&) = delete; |
283 | | |
284 | | private: |
285 | | friend class ThreadPoolBuilder; |
286 | | friend class ThreadPoolToken; |
287 | | |
288 | | // Client-provided task to be executed by this pool. |
289 | | struct Task { |
290 | | std::shared_ptr<Runnable> runnable; |
291 | | |
292 | | // Time at which the entry was submitted to the pool. |
293 | | MonotonicStopWatch submit_time_wather; |
294 | | }; |
295 | | |
296 | | // Creates a new thread pool using a builder. |
297 | | explicit ThreadPool(const ThreadPoolBuilder& builder); |
298 | | |
299 | | // Initializes the thread pool by starting the minimum number of threads. |
300 | | Status init(); |
301 | | |
302 | | // Dispatcher responsible for dequeueing and executing the tasks |
303 | | void dispatch_thread(); |
304 | | |
305 | | // Create new thread. |
306 | | // |
307 | | // REQUIRES: caller has incremented '_num_threads_pending_start' ahead of this call. |
308 | | // NOTE: For performance reasons, _lock should not be held. |
309 | | Status create_thread(); |
310 | | |
311 | | // Aborts if the current thread is a member of this thread pool. |
312 | | void check_not_pool_thread_unlocked(); |
313 | | |
314 | | // Submits a task to be run via token. |
315 | | Status do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token); |
316 | | |
317 | | // Releases token 't' and invalidates it. |
318 | | void release_token(ThreadPoolToken* t); |
319 | | |
320 | | // NOTE: Not thread-safe; the caller must hold '_lock'. |
321 | | Status try_create_thread(int thread_num, std::lock_guard<std::mutex>&); |
322 | | |
323 | | const std::string _name; |
324 | | const std::string _workload_group; |
325 | | int _min_threads; |
326 | | int _max_threads; |
327 | | const int _max_queue_size; |
328 | | const std::chrono::milliseconds _idle_timeout; |
329 | | |
330 | | // Overall status of the pool. Set to an error when the pool is shut down. |
331 | | // |
332 | | // Protected by '_lock'. |
333 | | Status _pool_status; |
334 | | |
335 | | // Synchronizes many of the members of the pool and all of its |
336 | | // condition variables. |
337 | | mutable std::mutex _lock; |
338 | | |
339 | | // Condition variable for "pool is idling". Waiters wake up when both |
340 | | // _active_threads and _total_queued_tasks reach zero. |
341 | | std::condition_variable _idle_cond; |
342 | | |
343 | | // Condition variable for "pool has no threads". Waiters wake up when |
344 | | // _num_threads and _num_threads_pending_start are both 0. |
345 | | std::condition_variable _no_threads_cond; |
346 | | |
347 | | // Number of threads currently running. |
348 | | // |
349 | | // Protected by _lock. |
350 | | int _num_threads; |
351 | | |
352 | | // Number of threads which are in the process of starting. |
353 | | // When these threads start, they will decrement this counter and |
354 | | // accordingly increment '_num_threads'. |
355 | | // |
356 | | // Protected by _lock. |
357 | | int _num_threads_pending_start; |
358 | | |
359 | | // Number of threads currently running and executing client tasks. |
360 | | // |
361 | | // Protected by _lock. |
362 | | int _active_threads; |
363 | | |
364 | | // Total number of client tasks queued, either directly (_queue) or |
365 | | // indirectly (_tokens). |
366 | | // |
367 | | // Protected by _lock. |
368 | | int _total_queued_tasks; |
369 | | |
370 | | std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl; |
371 | | |
372 | | // All allocated tokens. |
373 | | // |
374 | | // Protected by _lock. |
375 | | std::unordered_set<ThreadPoolToken*> _tokens; |
376 | | |
377 | | // FIFO of tokens from which tasks should be executed. Does not own the |
378 | | // tokens; they are owned by clients and are removed from the FIFO on shutdown. |
379 | | // |
380 | | // Protected by _lock. |
381 | | std::deque<ThreadPoolToken*> _queue; |
382 | | |
383 | | // Pointers to all running threads. Raw pointers are safe because a Thread |
384 | | // may only go out of scope after being removed from _threads. |
385 | | // |
386 | | // Protected by _lock. |
387 | | std::unordered_set<Thread*> _threads; |
388 | | |
389 | | // List of all threads currently waiting for work. |
390 | | // |
391 | | // A thread is added to the front of the list when it goes idle and is |
392 | | // removed from the front and signaled when new work arrives. This produces a |
393 | | // LIFO usage pattern that is more efficient than idling on a single |
394 | | // condition variable (which would yield FIFO wakeup semantics). |
395 | | // Protected by _lock. |
396 | | struct IdleThread : public boost::intrusive::list_base_hook<> { |
397 | 1.60k | explicit IdleThread() = default; |
398 | | |
399 | | // Condition variable for "queue is not empty". Waiters wake up when a new |
400 | | // task is queued. |
401 | | std::condition_variable not_empty; |
402 | | IdleThread(const IdleThread&) = delete; |
403 | | void operator=(const IdleThread&) = delete; |
404 | | }; |
405 | | boost::intrusive::list<IdleThread> _idle_threads; // NOLINT(build/include_what_you_use) |
406 | | |
407 | | // ExecutionMode::CONCURRENT token used by the pool for tokenless submission. |
408 | | std::unique_ptr<ThreadPoolToken> _tokenless; |
409 | | const UniqueId _id; |
410 | | |
411 | | std::shared_ptr<MetricEntity> _metric_entity; |
412 | | IntGauge* thread_pool_active_threads = nullptr; |
413 | | IntGauge* thread_pool_queue_size = nullptr; |
414 | | IntGauge* thread_pool_max_queue_size = nullptr; |
415 | | IntGauge* thread_pool_max_threads = nullptr; |
416 | | IntCounter* thread_pool_task_execution_time_ns_total = nullptr; |
417 | | IntCounter* thread_pool_task_execution_count_total = nullptr; |
418 | | IntCounter* thread_pool_task_wait_worker_time_ns_total = nullptr; |
419 | | IntCounter* thread_pool_task_wait_worker_count_total = nullptr; |
420 | | |
421 | | IntCounter* thread_pool_submit_failed = nullptr; |
422 | | }; |
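
For completeness, a small hedged sketch of the bounded queue and the idle wait documented above, assuming a rejected submission surfaces as a non-OK Status (per the builder docs, Status::ServiceUnavailable); the helper name backpressure_example is illustrative only:

```cpp
#include <chrono>
#include <memory>
#include <thread>

#include "util/threadpool.h"

namespace doris {

// Hypothetical sketch, not part of this header: backpressure and wait_for().
Status backpressure_example() {
    using namespace std::chrono_literals;

    std::unique_ptr<ThreadPool> pool;
    RETURN_IF_ERROR(ThreadPoolBuilder("tiny")
                            .set_max_threads(1)
                            .set_max_queue_size(1) // deliberately tiny queue
                            .build(&pool));

    // Once the single worker is busy and the one queue slot is taken, further
    // submissions fail instead of queueing without bound.
    int rejected = 0;
    for (int i = 0; i < 8; ++i) {
        Status st = pool->submit_func([] { std::this_thread::sleep_for(10ms); });
        if (!st.ok()) {
            ++rejected; // expected to be a ServiceUnavailable status
        }
    }

    // Wait up to one second for the pool to become idle; returns false on timeout.
    bool drained = pool->wait_for(1s);
    (void)drained;
    (void)rejected;

    pool->shutdown();
    return Status::OK();
}

} // namespace doris
```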
423 | | |
424 | | // Entry point for token-based task submission and blocking for a particular |
425 | | // thread pool. Tokens can only be created via ThreadPool::new_token(). |
426 | | // |
427 | | // All functions are thread-safe. Mutable members are protected via the |
428 | | // ThreadPool's lock. |
429 | | class ThreadPoolToken { |
430 | | public: |
431 | | // Destroys the token. |
432 | | // |
433 | | // May be called on a token with outstanding tasks, as Shutdown() will be |
434 | | // called first to take care of them. |
435 | | ~ThreadPoolToken(); |
436 | | |
437 | | // Submits a Runnable class. |
438 | | Status submit(std::shared_ptr<Runnable> r); |
439 | | |
440 | | // Submits a function bound using std::bind(&FuncName, args...). |
441 | | Status submit_func(std::function<void()> f); |
442 | | |
443 | | // Marks the token as unusable for future submissions. Any queued tasks not |
444 | | // yet running are destroyed. If tasks are in flight, shutdown() will wait |
445 | | // for their completion before returning. |
446 | | void shutdown(); |
447 | | |
448 | | // Waits until all the tasks submitted via this token are completed. |
449 | | void wait(); |
450 | | |
451 | | // Waits until all submissions using this token are complete, or until |
452 | | // 'delta' time elapses. |
453 | | // |
454 | | // Returns true if all submissions are complete, false otherwise. |
455 | | template <class Rep, class Period> |
456 | | bool wait_for(const std::chrono::duration<Rep, Period>& delta) { |
457 | | std::unique_lock<std::mutex> l(_pool->_lock); |
458 | | _pool->check_not_pool_thread_unlocked(); |
459 | | return _not_running_cond.wait_for(l, delta, [&]() { return !is_active(); }); |
460 | | } |
461 | | |
462 | | bool need_dispatch(); |
463 | | |
464 | 34 | size_t num_tasks() { |
465 | 34 | std::lock_guard<std::mutex> l(_pool->_lock); |
466 | 34 | return _entries.size(); |
467 | 34 | } |
468 | | |
469 | | ThreadPoolToken(const ThreadPoolToken&) = delete; |
470 | | void operator=(const ThreadPoolToken&) = delete; |
471 | | |
472 | | private: |
473 | | // All possible token states. Legal state transitions: |
474 | | // IDLE -> RUNNING: task is submitted via token |
475 | | // IDLE -> QUIESCED: token or pool is shut down |
476 | | // RUNNING -> IDLE: worker thread finishes executing a task and |
477 | | // there are no more tasks queued to the token |
478 | | // RUNNING -> QUIESCING: token or pool is shut down while worker thread |
479 | | // is executing a task |
480 | | // RUNNING -> QUIESCED: token or pool is shut down |
481 | | // QUIESCING -> QUIESCED: worker thread finishes executing a task |
482 | | // belonging to a shut down token or pool |
483 | | enum class State { |
484 | | // Token has no queued tasks. |
485 | | IDLE, |
486 | | |
487 | | // A worker thread is running one of the token's previously queued tasks. |
488 | | RUNNING, |
489 | | |
490 | | // No new tasks may be submitted to the token. A worker thread is still |
491 | | // running a previously queued task. |
492 | | QUIESCING, |
493 | | |
494 | | // No new tasks may be submitted to the token. There are no active tasks |
495 | | // either. At this state, the token may only be destroyed. |
496 | | QUIESCED, |
497 | | }; |
498 | | |
499 | | // Writes a textual representation of the token state in 's' to 'o'. |
500 | | friend std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s); |
501 | | |
502 | | friend class ThreadPool; |
503 | | |
504 | | // Returns a textual representation of 's' suitable for debugging. |
505 | | static const char* state_to_string(State s); |
506 | | |
507 | | // Constructs a new token. |
508 | | // |
509 | | // The token may not outlive its thread pool ('pool'). |
510 | | ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode, |
511 | | int max_concurrency = INT_MAX); |
512 | | |
513 | | // Changes this token's state to 'new_state' taking actions as needed. |
514 | | void transition(State new_state); |
515 | | |
516 | | // Returns true if this token has a task queued and ready to run, or if a |
517 | | // task belonging to this token is already running. |
518 | 11.5k | bool is_active() const { return _state == State::RUNNING || _state == State::QUIESCING; } |
519 | | |
520 | | // Returns true if new tasks may be submitted to this token. |
521 | 11.9k | bool may_submit_new_tasks() const { |
522 | 11.9k | return _state != State::QUIESCING && _state != State::QUIESCED; |
523 | 11.9k | } |
524 | | |
525 | 25.8k | State state() const { return _state; } |
526 | 5.06k | ThreadPool::ExecutionMode mode() const { return _mode; } |
527 | | |
528 | | // Token's configured execution mode. |
529 | | ThreadPool::ExecutionMode _mode; |
530 | | |
531 | | // Pointer to the token's thread pool. |
532 | | ThreadPool* _pool = nullptr; |
533 | | |
534 | | // Token state machine. |
535 | | State _state; |
536 | | |
537 | | // Queued client tasks. |
538 | | std::deque<ThreadPool::Task> _entries; |
539 | | |
540 | | // Condition variable for "token is idle". Waiters wake up when the token |
541 | | // transitions to IDLE or QUIESCED. |
542 | | std::condition_variable _not_running_cond; |
543 | | |
544 | | // Number of worker threads currently executing tasks belonging to this |
545 | | // token. |
546 | | int _active_threads; |
547 | | // The maximum number of tasks that can run concurrently. This limits the |
548 | | // concurrency of a thread pool token; the default is INT_MAX (no limit). |
549 | | int _max_concurrency; |
550 | | // Number of tasks that have been submitted to the thread pool's queue. |
551 | | int _num_submitted_tasks; |
552 | | // Number of tasks that have not yet been submitted to the thread pool's queue. |
553 | | int _num_unsubmitted_tasks; |
554 | | }; |
555 | | |
556 | | } // namespace doris |