Coverage Report

Created: 2026-06-24 17:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/threadpool.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/threadpool.h
19
// and modified by Doris
20
21
#pragma once
22
23
#include <gen_cpp/Types_types.h>
24
25
#include <boost/intrusive/detail/algo_type.hpp>
26
#include <boost/intrusive/list.hpp>
27
#include <boost/intrusive/list_hook.hpp>
28
#include <climits>
29
#include <cstddef>
30
// IWYU pragma: no_include <bits/chrono.h>
31
#include <chrono> // IWYU pragma: keep
32
#include <condition_variable>
33
#include <deque>
34
#include <functional>
35
#include <iosfwd>
36
#include <memory>
37
#include <mutex>
38
#include <string>
39
#include <unordered_set>
40
41
#include "agent/cgroup_cpu_ctl.h"
42
#include "common/metrics/metrics.h"
43
#include "common/status.h"
44
#include "util/uid_util.h"
45
#include "util/work_thread_pool.hpp"
46
47
namespace doris {
48
49
class Thread;
50
class ThreadPool;
51
class ThreadPoolToken;
52
53
class Runnable {
54
public:
55
    virtual void run() = 0;
56
1.47M
    virtual ~Runnable() = default;
57
};
58
59
// ThreadPool takes a lot of arguments. We provide sane defaults with a builder.
60
//
61
// name: Used for debugging output and default names of the worker threads.
62
//    Since thread names are limited to 16 characters on Linux, it's good to
63
//    choose a short name here.
64
//    Required.
65
//
66
// trace_metric_prefix: used to prefix the names of TraceMetric counters.
67
//    When a task on a thread pool has an associated trace, the thread pool
68
//    implementation will increment TraceMetric counters to indicate the
69
//    amount of time spent waiting in the queue as well as the amount of wall
70
//    and CPU time spent executing. By default, these counters are prefixed
71
//    with the name of the thread pool. For example, if the pool is named
72
//    'apply', then counters such as 'apply.queue_time_us' will be
73
//    incremented.
74
//
75
//    The TraceMetrics implementation relies on the number of distinct counter
76
//    names being small. Thus, if the thread pool name itself is dynamically
77
//    generated, the default behavior described above would result in an
78
//    unbounded number of distinct counter names. The 'trace_metric_prefix'
79
//    setting can be used to override the prefix used in generating the trace
80
//    metric names.
81
//
82
//    For example, the Raft thread pools are named "<tablet id>-raft" which
83
//    has unbounded cardinality (a server may have thousands of different
84
//    tablet IDs over its lifetime). In that case, setting the prefix to
85
//    "raft" will avoid any issues.
86
//
87
// min_threads: Minimum number of threads we'll have at any time.
88
//    Default: 0.
89
//
90
// max_threads: Maximum number of threads we'll have at any time.
91
//    Default: Number of CPUs detected on the system.
92
//
93
// max_queue_size: Maximum number of items to enqueue before returning a
94
//    Status::ServiceUnavailable message from Submit().
95
//    Default: INT_MAX.
96
//
97
// idle_timeout: How long we'll keep around an idle thread before timing it out.
98
//    We always keep at least min_threads.
99
//    Default: 500 milliseconds.
100
//
101
// metrics: Histograms, counters, etc. to update on various threadpool events.
102
//    Default: not set.
103
//
104
class ThreadPoolBuilder {
105
public:
106
    explicit ThreadPoolBuilder(std::string name, std::string workload_group = "");
107
108
    // Note: We violate the style guide by returning mutable references here
109
    // in order to provide traditional Builder pattern conveniences.
110
    ThreadPoolBuilder& set_min_threads(int min_threads);
111
    ThreadPoolBuilder& set_max_threads(int max_threads);
112
    ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
113
    ThreadPoolBuilder& set_cgroup_cpu_ctl(std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl);
114
    template <class Rep, class Period>
115
    ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep, Period>& idle_timeout) {
116
        _idle_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout);
117
        return *this;
118
    }
119
    // Instantiate a new ThreadPool with the existing builder arguments.
120
    template <typename ThreadPoolType>
121
903
    Status build(std::unique_ptr<ThreadPoolType>* pool) const {
122
903
        if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) {
123
857
            pool->reset(new ThreadPoolType(*this));
124
857
            RETURN_IF_ERROR((*pool)->init());
125
857
        } else if constexpr (std::is_same_v<ThreadPoolType, PriorityThreadPool>) {
126
46
            pool->reset(new ThreadPoolType(_max_threads, _max_queue_size, _name));
127
        } else {
128
            static_assert(always_false_v<ThreadPoolType>, "Unsupported ThreadPoolType");
129
        }
130
857
        return Status::OK();
131
903
    }
_ZNK5doris17ThreadPoolBuilder5buildINS_10ThreadPoolEEENS_6StatusEPSt10unique_ptrIT_St14default_deleteIS5_EE
Line
Count
Source
121
857
    Status build(std::unique_ptr<ThreadPoolType>* pool) const {
122
857
        if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) {
123
857
            pool->reset(new ThreadPoolType(*this));
124
857
            RETURN_IF_ERROR((*pool)->init());
125
        } else if constexpr (std::is_same_v<ThreadPoolType, PriorityThreadPool>) {
126
            pool->reset(new ThreadPoolType(_max_threads, _max_queue_size, _name));
127
        } else {
128
            static_assert(always_false_v<ThreadPoolType>, "Unsupported ThreadPoolType");
129
        }
130
857
        return Status::OK();
131
857
    }
_ZNK5doris17ThreadPoolBuilder5buildINS_14WorkThreadPoolILb1EEEEENS_6StatusEPSt10unique_ptrIT_St14default_deleteIS6_EE
Line
Count
Source
121
46
    Status build(std::unique_ptr<ThreadPoolType>* pool) const {
122
        if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) {
123
            pool->reset(new ThreadPoolType(*this));
124
            RETURN_IF_ERROR((*pool)->init());
125
46
        } else if constexpr (std::is_same_v<ThreadPoolType, PriorityThreadPool>) {
126
46
            pool->reset(new ThreadPoolType(_max_threads, _max_queue_size, _name));
127
        } else {
128
            static_assert(always_false_v<ThreadPoolType>, "Unsupported ThreadPoolType");
129
        }
130
46
        return Status::OK();
131
46
    }
132
133
    ThreadPoolBuilder(const ThreadPoolBuilder&) = delete;
134
    void operator=(const ThreadPoolBuilder&) = delete;
135
136
private:
137
    friend class ThreadPool;
138
    const std::string _name;
139
    const std::string _workload_group;
140
    int _min_threads;
141
    int _max_threads;
142
    int _max_queue_size;
143
    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
144
    std::chrono::milliseconds _idle_timeout;
145
146
    template <typename T>
147
    static constexpr bool always_false_v = false;
148
};
149
150
// Thread pool with a variable number of threads.
151
//
152
// Tasks submitted directly to the thread pool enter a FIFO queue and are
153
// dispatched to a worker thread when one becomes free. Tasks may also be
154
// submitted via ThreadPoolTokens. The token Wait() and Shutdown() functions
155
// can then be used to block on logical groups of tasks.
156
//
157
// A token operates in one of two ExecutionModes, determined at token
158
// construction time:
159
// 1. SERIAL: submitted tasks are run one at a time.
160
// 2. CONCURRENT: submitted tasks may be run in parallel. This isn't unlike
161
//    tasks submitted without a token, but the logical grouping that tokens
162
//    impart can be useful when a pool is shared by many contexts (e.g. to
163
//    safely shut down one context, to derive context-specific metrics, etc.).
164
//
165
// Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are
166
// processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are
167
// processed in a round-robin fashion, one task at a time. This prevents them
168
// from starving one another. However, tokenless (and CONCURRENT token-based)
169
// tasks can starve SERIAL token-based tasks.
170
//
171
// Usage Example:
172
//    static void Func(int n) { ... }
173
//    class Task : public Runnable { ... }
174
//
175
//    std::unique_ptr<ThreadPool> thread_pool;
176
//    CHECK_OK(
177
//        ThreadPoolBuilder("my_pool")
178
//            .set_min_threads(0)
179
//            .set_max_threads(5)
180
//            .set_max_queue_size(10)
181
//            .set_idle_timeout(2000ms)
182
//            .build(&thread_pool));
183
//    thread_pool->submit(shared_ptr<Runnable>(new Task()));
184
//    thread_pool->submit_func(std::bind(&Func, 10));
185
class ThreadPool {
186
public:
187
    ~ThreadPool();
188
189
    // Wait for the running tasks to complete and then shutdown the threads.
190
    // All the other pending tasks in the queue will be removed.
191
    // NOTE: The user may implement external abort logic for the runnables,
192
    //       which must be called before shutdown(), if the system should
193
    //       know about the non-execution of these tasks, or if a runnable
194
    //       requires an explicit "abort" notification to exit from its run loop.
195
    void shutdown();
196
197
    // Submits a Runnable class.
198
    Status submit(std::shared_ptr<Runnable> r);
199
200
    // Submits a function bound using std::bind(&FuncName, args...).
201
    Status submit_func(std::function<void()> f);
202
203
    // Waits until all the tasks are completed.
204
    void wait();
205
206
    // Waits for the pool to reach the idle state, or until 'delta' time elapses.
207
    // Returns true if the pool reached the idle state, false otherwise.
208
    template <class Rep, class Period>
209
    bool wait_for(const std::chrono::duration<Rep, Period>& delta) {
210
        std::unique_lock<std::mutex> l(_lock);
211
        check_not_pool_thread_unlocked();
212
        return _idle_cond.wait_for(
213
                l, delta, [&]() { return _total_queued_tasks <= 0 && _active_threads <= 0; });
214
    }
215
    Status set_min_threads(int min_threads);
216
    Status set_max_threads(int max_threads);
217
218
    // Allocates a new token for use in token-based task submission. All tokens
219
    // must be destroyed before their ThreadPool is destroyed.
220
    //
221
    // There is no limit on the number of tokens that may be allocated.
222
    enum class ExecutionMode {
223
        // Tasks submitted via this token will be executed serially.
224
        SERIAL,
225
226
        // Tasks submitted via this token may be executed concurrently.
227
        CONCURRENT
228
    };
229
    std::unique_ptr<ThreadPoolToken> new_token(ExecutionMode mode, int max_concurrency = INT_MAX);
230
231
    // Return the number of threads currently running (or in the process of starting up)
232
    // for this thread pool.
233
25
    int num_threads() const {
234
25
        std::lock_guard<std::mutex> l(_lock);
235
25
        return _num_threads + _num_threads_pending_start;
236
25
    }
237
238
374k
    int max_threads() const {
239
374k
        std::lock_guard<std::mutex> l(_lock);
240
374k
        return _max_threads;
241
374k
    }
242
243
317k
    int min_threads() const {
244
317k
        std::lock_guard<std::mutex> l(_lock);
245
317k
        return _min_threads;
246
317k
    }
247
248
1
    int num_threads_pending_start() const {
249
1
        std::lock_guard<std::mutex> l(_lock);
250
1
        return _num_threads_pending_start;
251
1
    }
252
253
49.7k
    int num_active_threads() const {
254
49.7k
        std::lock_guard<std::mutex> l(_lock);
255
49.7k
        return _active_threads;
256
49.7k
    }
257
258
102k
    int get_queue_size() const {
259
102k
        std::lock_guard<std::mutex> l(_lock);
260
102k
        return _total_queued_tasks;
261
102k
    }
262
263
49.7k
    int get_max_queue_size() const {
264
49.7k
        std::lock_guard<std::mutex> l(_lock);
265
49.7k
        return _max_queue_size;
266
49.7k
    }
267
268
225
    std::vector<int> debug_info() const {
269
225
        std::lock_guard<std::mutex> l(_lock);
270
225
        std::vector<int> arr = {_num_threads, static_cast<int>(_threads.size()), _min_threads,
271
225
                                _max_threads};
272
225
        return arr;
273
225
    }
274
275
0
    std::string get_info() const {
276
0
        std::lock_guard<std::mutex> l(_lock);
277
0
        return fmt::format("ThreadPool(name={}, threads(active/pending)=({}/{}), queued_task={})",
278
0
                           _name, _active_threads, _num_threads_pending_start, _total_queued_tasks);
279
0
    }
280
281
    ThreadPool(const ThreadPool&) = delete;
282
    void operator=(const ThreadPool&) = delete;
283
284
private:
285
    friend class ThreadPoolBuilder;
286
    friend class ThreadPoolToken;
287
288
    // Client-provided task to be executed by this pool.
289
    struct Task {
290
        std::shared_ptr<Runnable> runnable;
291
292
        // Time at which the entry was submitted to the pool.
293
        MonotonicStopWatch submit_time_wather;
294
    };
295
296
    // Creates a new thread pool using a builder.
297
    explicit ThreadPool(const ThreadPoolBuilder& builder);
298
299
    // Initializes the thread pool by starting the minimum number of threads.
300
    Status init();
301
302
    // Dispatcher responsible for dequeueing and executing the tasks
303
    void dispatch_thread();
304
305
    // Create new thread.
306
    //
307
    // REQUIRES: caller has incremented '_num_threads_pending_start' ahead of this call.
308
    // NOTE: For performance reasons, _lock should usually not be held.
309
    // do_submit() may hold it when creating the first worker, so a thread creation failure
310
    // cannot leave a submitted task queued without any worker.
311
    Status create_thread();
312
313
    // Aborts if the current thread is a member of this thread pool.
314
    void check_not_pool_thread_unlocked();
315
316
    // Submits a task to be run via token.
317
    Status do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token);
318
319
    // Releases token 't' and invalidates it.
320
    void release_token(ThreadPoolToken* t);
321
322
    // NOTE: not thread-safe by itself; the caller must ensure thread safety by holding a lock.
323
    Status try_create_thread(int thread_num, std::lock_guard<std::mutex>&);
324
325
    const std::string _name;
326
    const std::string _workload_group;
327
    int _min_threads;
328
    int _max_threads;
329
    const int _max_queue_size;
330
    const std::chrono::milliseconds _idle_timeout;
331
332
    // Overall status of the pool. Set to an error when the pool is shut down.
333
    //
334
    // Protected by '_lock'.
335
    Status _pool_status;
336
337
    // Synchronizes many of the members of the pool and all of its
338
    // condition variables.
339
    mutable std::mutex _lock;
340
341
    // Condition variable for "pool is idling". Waiters wake up when
342
    // _active_threads reaches zero.
343
    std::condition_variable _idle_cond;
344
345
    // Condition variable for "pool has no threads". Waiters wake up when
346
    // _num_threads and _num_threads_pending_start are both 0.
347
    std::condition_variable _no_threads_cond;
348
349
    // Number of threads currently running.
350
    //
351
    // Protected by _lock.
352
    int _num_threads;
353
354
    // Number of threads which are in the process of starting.
355
    // When these threads start, they will decrement this counter and
356
    // accordingly increment '_num_threads'.
357
    //
358
    // Protected by _lock.
359
    int _num_threads_pending_start;
360
361
    // Number of threads currently running and executing client tasks.
362
    //
363
    // Protected by _lock.
364
    int _active_threads;
365
366
    // Total number of client tasks queued, either directly (_queue) or
367
    // indirectly (_tokens).
368
    //
369
    // Protected by _lock.
370
    int _total_queued_tasks;
371
372
    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
373
374
    // All allocated tokens.
375
    //
376
    // Protected by _lock.
377
    std::unordered_set<ThreadPoolToken*> _tokens;
378
379
    // FIFO of tokens from which tasks should be executed. Does not own the
380
    // tokens; they are owned by clients and are removed from the FIFO on shutdown.
381
    //
382
    // Protected by _lock.
383
    std::deque<ThreadPoolToken*> _queue;
384
385
    // Pointers to all running threads. Raw pointers are safe because a Thread
386
    // may only go out of scope after being removed from _threads.
387
    //
388
    // Protected by _lock.
389
    std::unordered_set<Thread*> _threads;
390
391
    // List of all threads currently waiting for work.
392
    //
393
    // A thread is added to the front of the list when it goes idle and is
394
    // removed from the front and signaled when new work arrives. This produces a
395
    // LIFO usage pattern that is more efficient than idling on a single shared condition variable.
396
    //
397
    // Protected by _lock.
398
    struct IdleThread : public boost::intrusive::list_base_hook<> {
399
48.1k
        explicit IdleThread() = default;
400
401
        // Condition variable for "queue is not empty". Waiters wake up when a new
402
        // task is queued.
403
        std::condition_variable not_empty;
404
        IdleThread(const IdleThread&) = delete;
405
        void operator=(const IdleThread&) = delete;
406
    };
407
    boost::intrusive::list<IdleThread> _idle_threads; // NOLINT(build/include_what_you_use)
408
409
    // ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
410
    std::unique_ptr<ThreadPoolToken> _tokenless;
411
    const UniqueId _id;
412
413
    std::shared_ptr<MetricEntity> _metric_entity;
414
    IntGauge* thread_pool_active_threads = nullptr;
415
    IntGauge* thread_pool_queue_size = nullptr;
416
    IntGauge* thread_pool_max_queue_size = nullptr;
417
    IntGauge* thread_pool_max_threads = nullptr;
418
    IntCounter* thread_pool_task_execution_time_ns_total = nullptr;
419
    IntCounter* thread_pool_task_execution_count_total = nullptr;
420
    IntCounter* thread_pool_task_wait_worker_time_ns_total = nullptr;
421
    IntCounter* thread_pool_task_wait_worker_count_total = nullptr;
422
423
    IntCounter* thread_pool_submit_failed = nullptr;
424
};
425
426
// Entry point for token-based task submission and blocking for a particular
427
// thread pool. Tokens can only be created via ThreadPool::new_token().
428
//
429
// All functions are thread-safe. Mutable members are protected via the
430
// ThreadPool's lock.
431
class ThreadPoolToken {
432
public:
433
    // Destroys the token.
434
    //
435
    // May be called on a token with outstanding tasks, as Shutdown() will be
436
    // called first to take care of them.
437
    ~ThreadPoolToken();
438
439
    // Submits a Runnable class.
440
    Status submit(std::shared_ptr<Runnable> r);
441
442
    // Submits a function bound using std::bind(&FuncName, args...).
443
    Status submit_func(std::function<void()> f);
444
445
    // Marks the token as unusable for future submissions. Any queued tasks not
446
    // yet running are destroyed. If tasks are in flight, Shutdown() will wait
447
    // on their completion before returning.
448
    void shutdown();
449
450
    // Waits until all the tasks submitted via this token are completed.
451
    void wait();
452
453
    // Waits until all submissions using this token are complete, or until 'delta'
454
    // time elapses.
455
    //
456
    // Returns true if all submissions are complete, false otherwise.
457
    template <class Rep, class Period>
458
    bool wait_for(const std::chrono::duration<Rep, Period>& delta) {
459
        std::unique_lock<std::mutex> l(_pool->_lock);
460
        _pool->check_not_pool_thread_unlocked();
461
        return _not_running_cond.wait_for(l, delta, [&]() { return !is_active(); });
462
    }
463
464
    bool need_dispatch();
465
466
157k
    size_t num_tasks() {
467
157k
        std::lock_guard<std::mutex> l(_pool->_lock);
468
157k
        return _entries.size();
469
157k
    }
470
471
    ThreadPoolToken(const ThreadPoolToken&) = delete;
472
    void operator=(const ThreadPoolToken&) = delete;
473
474
private:
475
    // All possible token states. Legal state transitions:
476
    //   IDLE      -> RUNNING: task is submitted via token
477
    //   IDLE      -> QUIESCED: token or pool is shut down
478
    //   RUNNING   -> IDLE: worker thread finishes executing a task and
479
    //                      there are no more tasks queued to the token
480
    //   RUNNING   -> QUIESCING: token or pool is shut down while worker thread
481
    //                           is executing a task
482
    //   RUNNING   -> QUIESCED: token or pool is shut down
483
    //   QUIESCING -> QUIESCED:  worker thread finishes executing a task
484
    //                           belonging to a shut down token or pool
485
    enum class State {
486
        // Token has no queued tasks.
487
        IDLE,
488
489
        // A worker thread is running one of the token's previously queued tasks.
490
        RUNNING,
491
492
        // No new tasks may be submitted to the token. A worker thread is still
493
        // running a previously queued task.
494
        QUIESCING,
495
496
        // No new tasks may be submitted to the token. There are no active tasks
497
        // either. At this state, the token may only be destroyed.
498
        QUIESCED,
499
    };
500
501
    // Writes a textual representation of the token state in 's' to 'o'.
502
    friend std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s);
503
504
    friend class ThreadPool;
505
506
    // Returns a textual representation of 's' suitable for debugging.
507
    static const char* state_to_string(State s);
508
509
    // Constructs a new token.
510
    //
511
    // The token may not outlive its thread pool ('pool').
512
    ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode,
513
                    int max_concurrency = INT_MAX);
514
515
    // Changes this token's state to 'new_state' taking actions as needed.
516
    void transition(State new_state);
517
518
    // Returns true if this token has a task queued and ready to run, or if a
519
    // task belonging to this token is already running.
520
2.14M
    bool is_active() const { return _state == State::RUNNING || _state == State::QUIESCING; }
521
522
    // Returns true if new tasks may be submitted to this token.
523
1.49M
    bool may_submit_new_tasks() const {
524
1.49M
        return _state != State::QUIESCING && _state != State::QUIESCED;
525
1.49M
    }
526
527
5.19M
    State state() const { return _state; }
528
760k
    ThreadPool::ExecutionMode mode() const { return _mode; }
529
530
    // Token's configured execution mode.
531
    ThreadPool::ExecutionMode _mode;
532
533
    // Pointer to the token's thread pool.
534
    ThreadPool* _pool = nullptr;
535
536
    // Token state machine.
537
    State _state;
538
539
    // Queued client tasks.
540
    std::deque<ThreadPool::Task> _entries;
541
542
    // Condition variable for "token is idle". Waiters wake up when the token
543
    // transitions to IDLE or QUIESCED.
544
    std::condition_variable _not_running_cond;
545
546
    // Number of worker threads currently executing tasks belonging to this
547
    // token.
548
    int _active_threads;
549
    // The max number of tasks that can be run concurrently. This is to limit
550
    // the concurrency of a thread pool token; the default is INT_MAX (no limit).
551
    int _max_concurrency;
552
    // Number of tasks which have been submitted to the thread pool's queue.
553
    int _num_submitted_tasks;
554
    // Number of tasks which have not been submitted to the thread pool's queue.
555
    int _num_unsubmitted_tasks;
556
};
557
558
} // namespace doris