Coverage Report

Created: 2025-04-23 18:22

/root/doris/be/src/util/threadpool.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/threadpool.h
19
// and modified by Doris
20
21
#pragma once
22
23
#include <gen_cpp/Types_types.h>
24
25
#include <boost/intrusive/detail/algo_type.hpp>
26
#include <boost/intrusive/list.hpp>
27
#include <boost/intrusive/list_hook.hpp>
28
#include <climits>
29
#include <cstddef>
30
// IWYU pragma: no_include <bits/chrono.h>
31
#include <chrono> // IWYU pragma: keep
32
#include <condition_variable>
33
#include <deque>
34
#include <functional>
35
#include <iosfwd>
36
#include <memory>
37
#include <mutex>
38
#include <string>
39
#include <unordered_set>
40
41
#include "agent/cgroup_cpu_ctl.h"
42
#include "common/status.h"
43
#include "util/metrics.h"
44
#include "util/uid_util.h"
45
#include "util/work_thread_pool.hpp"
46
47
namespace doris {
48
49
class Thread;
50
class ThreadPool;
51
class ThreadPoolToken;
52
53
class Runnable {
54
public:
55
    virtual void run() = 0;
56
11.8k
    virtual ~Runnable() = default;
57
};
58
59
// ThreadPool takes a lot of arguments. We provide sane defaults with a builder.
60
//
61
// name: Used for debugging output and default names of the worker threads.
62
//    Since thread names are limited to 16 characters on Linux, it's good to
63
//    choose a short name here.
64
//    Required.
65
//
66
// trace_metric_prefix: used to prefix the names of TraceMetric counters.
67
//    When a task on a thread pool has an associated trace, the thread pool
68
//    implementation will increment TraceMetric counters to indicate the
69
//    amount of time spent waiting in the queue as well as the amount of wall
70
//    and CPU time spent executing. By default, these counters are prefixed
71
//    with the name of the thread pool. For example, if the pool is named
72
//    'apply', then counters such as 'apply.queue_time_us' will be
73
//    incremented.
74
//
75
//    The TraceMetrics implementation relies on the number of distinct counter
76
//    names being small. Thus, if the thread pool name itself is dynamically
77
//    generated, the default behavior described above would result in an
78
//    unbounded number of distinct counter names. The 'trace_metric_prefix'
79
//    setting can be used to override the prefix used in generating the trace
80
//    metric names.
81
//
82
//    For example, the Raft thread pools are named "<tablet id>-raft" which
83
//    has unbounded cardinality (a server may have thousands of different
84
//    tablet IDs over its lifetime). In that case, setting the prefix to
85
//    "raft" will avoid any issues.
86
//
87
// min_threads: Minimum number of threads we'll have at any time.
88
//    Default: 0.
89
//
90
// max_threads: Maximum number of threads we'll have at any time.
91
//    Default: Number of CPUs detected on the system.
92
//
93
// max_queue_size: Maximum number of items to enqueue before returning a
94
//    Status::ServiceUnavailable message from Submit().
95
//    Default: INT_MAX.
96
//
97
// idle_timeout: How long we'll keep around an idle thread before timing it out.
98
//    We always keep at least min_threads.
99
//    Default: 500 milliseconds.
100
//
101
// metrics: Histograms, counters, etc. to update on various threadpool events.
102
//    Default: not set.
103
//
104
class ThreadPoolBuilder {
105
public:
106
    explicit ThreadPoolBuilder(std::string name, std::string workload_group = "");
107
108
    // Note: We violate the style guide by returning mutable references here
109
    // in order to provide traditional Builder pattern conveniences.
110
    ThreadPoolBuilder& set_min_threads(int min_threads);
111
    ThreadPoolBuilder& set_max_threads(int max_threads);
112
    ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
113
    ThreadPoolBuilder& set_cgroup_cpu_ctl(std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl);
114
    template <class Rep, class Period>
115
    ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep, Period>& idle_timeout) {
116
        _idle_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout);
117
        return *this;
118
    }
119
    // Instantiate a new ThreadPool with the existing builder arguments.
120
    template <typename ThreadPoolType>
121
321
    Status build(std::unique_ptr<ThreadPoolType>* pool) const {
122
321
        if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) {
123
5
            pool->reset(new ThreadPoolType(*this));
124
316
            RETURN_IF_ERROR((*pool)->init());
125
316
        } else if constexpr (std::is_same_v<ThreadPoolType, PriorityThreadPool>) {
126
5
            pool->reset(new ThreadPoolType(_max_threads, _max_queue_size, _name));
127
5
        } else {
128
316
            static_assert(always_false_v<ThreadPoolType>, "Unsupported ThreadPoolType");
129
316
        }
130
316
        return Status::OK();
131
321
    }
_ZNK5doris17ThreadPoolBuilder5buildINS_10ThreadPoolEEENS_6StatusEPSt10unique_ptrIT_St14default_deleteIS5_EE
Line
Count
Source
121
316
    Status build(std::unique_ptr<ThreadPoolType>* pool) const {
122
316
        if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) {
123
316
            pool->reset(new ThreadPoolType(*this));
124
316
            RETURN_IF_ERROR((*pool)->init());
125
316
        } else if constexpr (std::is_same_v<ThreadPoolType, PriorityThreadPool>) {
126
316
            pool->reset(new ThreadPoolType(_max_threads, _max_queue_size, _name));
127
316
        } else {
128
316
            static_assert(always_false_v<ThreadPoolType>, "Unsupported ThreadPoolType");
129
316
        }
130
316
        return Status::OK();
131
316
    }
_ZNK5doris17ThreadPoolBuilder5buildINS_14WorkThreadPoolILb1EEEEENS_6StatusEPSt10unique_ptrIT_St14default_deleteIS6_EE
Line
Count
Source
121
5
    Status build(std::unique_ptr<ThreadPoolType>* pool) const {
122
5
        if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) {
123
5
            pool->reset(new ThreadPoolType(*this));
124
5
            RETURN_IF_ERROR((*pool)->init());
125
5
        } else if constexpr (std::is_same_v<ThreadPoolType, PriorityThreadPool>) {
126
5
            pool->reset(new ThreadPoolType(_max_threads, _max_queue_size, _name));
127
5
        } else {
128
5
            static_assert(always_false_v<ThreadPoolType>, "Unsupported ThreadPoolType");
129
5
        }
130
5
        return Status::OK();
131
5
    }
132
133
    ThreadPoolBuilder(const ThreadPoolBuilder&) = delete;
134
    void operator=(const ThreadPoolBuilder&) = delete;
135
136
private:
137
    friend class ThreadPool;
138
    const std::string _name;
139
    const std::string _workload_group;
140
    int _min_threads;
141
    int _max_threads;
142
    int _max_queue_size;
143
    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
144
    std::chrono::milliseconds _idle_timeout;
145
146
    template <typename T>
147
    static constexpr bool always_false_v = false;
148
};
149
150
// Thread pool with a variable number of threads.
151
//
152
// Tasks submitted directly to the thread pool enter a FIFO queue and are
153
// dispatched to a worker thread when one becomes free. Tasks may also be
154
// submitted via ThreadPoolTokens. The token Wait() and Shutdown() functions
155
// can then be used to block on logical groups of tasks.
156
//
157
// A token operates in one of two ExecutionModes, determined at token
158
// construction time:
159
// 1. SERIAL: submitted tasks are run one at a time.
160
// 2. CONCURRENT: submitted tasks may be run in parallel. This isn't unlike
161
//    tasks submitted without a token, but the logical grouping that tokens
162
//    impart can be useful when a pool is shared by many contexts (e.g. to
163
//    safely shut down one context, to derive context-specific metrics, etc.).
164
//
165
// Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are
166
// processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are
167
// processed in a round-robin fashion, one task at a time. This prevents them
168
// from starving one another. However, tokenless (and CONCURRENT token-based)
169
// tasks can starve SERIAL token-based tasks.
170
//
171
// Usage Example:
172
//    static void Func(int n) { ... }
173
//    class Task : public Runnable { ... }
174
//
175
//    std::unique_ptr<ThreadPool> thread_pool;
176
//    CHECK_OK(
177
//        ThreadPoolBuilder("my_pool")
178
//            .set_min_threads(0)
179
//            .set_max_threads(5)
180
//            .set_max_queue_size(10)
181
//            .set_idle_timeout(2000ms)
182
//            .build(&thread_pool));
183
//    thread_pool->submit(std::shared_ptr<Runnable>(new Task()));
184
//    thread_pool->submit_func(std::bind(&Func, 10));
185
class ThreadPool {
186
public:
187
    ~ThreadPool();
188
189
    // Wait for the running tasks to complete and then shutdown the threads.
190
    // All the other pending tasks in the queue will be removed.
191
    // NOTE: The user may implement external abort logic for the
192
    //       runnables, that must be called before Shutdown(), if the system
193
    //       should know about the non-execution of these tasks, or the runnable
194
    //       require an explicit "abort" notification to exit from the run loop.
195
    void shutdown();
196
197
    // Submits a Runnable class.
198
    Status submit(std::shared_ptr<Runnable> r);
199
200
    // Submits a function bound using std::bind(&FuncName, args...).
201
    Status submit_func(std::function<void()> f);
202
203
    // Waits until all the tasks are completed.
204
    void wait();
205
206
    // Waits for the pool to reach the idle state, or until 'delta' time elapses.
207
    // Returns true if the pool reached the idle state, false otherwise.
208
    template <class Rep, class Period>
209
    bool wait_for(const std::chrono::duration<Rep, Period>& delta) {
210
        std::unique_lock<std::mutex> l(_lock);
211
        check_not_pool_thread_unlocked();
212
        return _idle_cond.wait_for(
213
                l, delta, [&]() { return _total_queued_tasks <= 0 && _active_threads <= 0; });
214
    }
215
    Status set_min_threads(int min_threads);
216
    Status set_max_threads(int max_threads);
217
218
    // Allocates a new token for use in token-based task submission. All tokens
219
    // must be destroyed before their ThreadPool is destroyed.
220
    //
221
    // There is no limit on the number of tokens that may be allocated.
222
    enum class ExecutionMode {
223
        // Tasks submitted via this token will be executed serially.
224
        SERIAL,
225
226
        // Tasks submitted via this token may be executed concurrently.
227
        CONCURRENT
228
    };
229
    std::unique_ptr<ThreadPoolToken> new_token(ExecutionMode mode, int max_concurrency = INT_MAX);
230
231
    // Return the number of threads currently running (or in the process of starting up)
232
    // for this thread pool.
233
24
    int num_threads() const {
234
24
        std::lock_guard<std::mutex> l(_lock);
235
24
        return _num_threads + _num_threads_pending_start;
236
24
    }
237
238
13
    int max_threads() const {
239
13
        std::lock_guard<std::mutex> l(_lock);
240
13
        return _max_threads;
241
13
    }
242
243
4
    int min_threads() const {
244
4
        std::lock_guard<std::mutex> l(_lock);
245
4
        return _min_threads;
246
4
    }
247
248
0
    int num_threads_pending_start() const {
249
0
        std::lock_guard<std::mutex> l(_lock);
250
0
        return _num_threads_pending_start;
251
0
    }
252
253
0
    int num_active_threads() const {
254
0
        std::lock_guard<std::mutex> l(_lock);
255
0
        return _active_threads;
256
0
    }
257
258
0
    int get_queue_size() const {
259
0
        std::lock_guard<std::mutex> l(_lock);
260
0
        return _total_queued_tasks;
261
0
    }
262
263
0
    int get_max_queue_size() const {
264
0
        std::lock_guard<std::mutex> l(_lock);
265
0
        return _max_queue_size;
266
0
    }
267
268
0
    std::vector<int> debug_info() const {
269
0
        std::lock_guard<std::mutex> l(_lock);
270
0
        std::vector<int> arr = {_num_threads, static_cast<int>(_threads.size()), _min_threads,
271
0
                                _max_threads};
272
0
        return arr;
273
0
    }
274
275
0
    std::string get_info() const {
276
0
        std::lock_guard<std::mutex> l(_lock);
277
0
        return fmt::format("ThreadPool(name={}, threads(active/pending)=({}/{}), queued_task={})",
278
0
                           _name, _active_threads, _num_threads_pending_start, _total_queued_tasks);
279
0
    }
280
281
    ThreadPool(const ThreadPool&) = delete;
282
    void operator=(const ThreadPool&) = delete;
283
284
private:
285
    friend class ThreadPoolBuilder;
286
    friend class ThreadPoolToken;
287
288
    // Client-provided task to be executed by this pool.
289
    struct Task {
290
        std::shared_ptr<Runnable> runnable;
291
292
        // Time at which the entry was submitted to the pool.
293
        MonotonicStopWatch submit_time_wather;
294
    };
295
296
    // Creates a new thread pool using a builder.
297
    explicit ThreadPool(const ThreadPoolBuilder& builder);
298
299
    // Initializes the thread pool by starting the minimum number of threads.
300
    Status init();
301
302
    // Dispatcher responsible for dequeueing and executing the tasks
303
    void dispatch_thread();
304
305
    // Create new thread.
306
    //
307
    // REQUIRES: caller has incremented '_num_threads_pending_start' ahead of this call.
308
    // NOTE: For performance reasons, _lock should not be held.
309
    Status create_thread();
310
311
    // Aborts if the current thread is a member of this thread pool.
312
    void check_not_pool_thread_unlocked();
313
314
    // Submits a task to be run via token.
315
    Status do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token);
316
317
    // Releases token 't' and invalidates it.
318
    void release_token(ThreadPoolToken* t);
319
320
    //NOTE: not thread safe, caller should keep it thread-safe by using lock
321
    Status try_create_thread(int thread_num, std::lock_guard<std::mutex>&);
322
323
    const std::string _name;
324
    const std::string _workload_group;
325
    int _min_threads;
326
    int _max_threads;
327
    const int _max_queue_size;
328
    const std::chrono::milliseconds _idle_timeout;
329
330
    // Overall status of the pool. Set to an error when the pool is shut down.
331
    //
332
    // Protected by '_lock'.
333
    Status _pool_status;
334
335
    // Synchronizes many of the members of the pool and all of its
336
    // condition variables.
337
    mutable std::mutex _lock;
338
339
    // Condition variable for "pool is idling". Waiters wake up when
340
    // _active_threads reaches zero.
341
    std::condition_variable _idle_cond;
342
343
    // Condition variable for "pool has no threads". Waiters wake up when
344
    // _num_threads and _num_threads_pending_start are both 0.
345
    std::condition_variable _no_threads_cond;
346
347
    // Number of threads currently running.
348
    //
349
    // Protected by _lock.
350
    int _num_threads;
351
352
    // Number of threads which are in the process of starting.
353
    // When these threads start, they will decrement this counter and
354
    // accordingly increment '_num_threads'.
355
    //
356
    // Protected by _lock.
357
    int _num_threads_pending_start;
358
359
    // Number of threads currently running and executing client tasks.
360
    //
361
    // Protected by _lock.
362
    int _active_threads;
363
364
    // Total number of client tasks queued, either directly (_queue) or
365
    // indirectly (_tokens).
366
    //
367
    // Protected by _lock.
368
    int _total_queued_tasks;
369
370
    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
371
372
    // All allocated tokens.
373
    //
374
    // Protected by _lock.
375
    std::unordered_set<ThreadPoolToken*> _tokens;
376
377
    // FIFO of tokens from which tasks should be executed. Does not own the
378
    // tokens; they are owned by clients and are removed from the FIFO on shutdown.
379
    //
380
    // Protected by _lock.
381
    std::deque<ThreadPoolToken*> _queue;
382
383
    // Pointers to all running threads. Raw pointers are safe because a Thread
384
    // may only go out of scope after being removed from _threads.
385
    //
386
    // Protected by _lock.
387
    std::unordered_set<Thread*> _threads;
388
389
    // List of all threads currently waiting for work.
390
    //
391
    // A thread is added to the front of the list when it goes idle and is
392
    // removed from the front and signaled when new work arrives. This produces a
393
    // LIFO usage pattern that is more efficient than idling on a single shared condition variable.
394
    //
395
    // Protected by _lock.
396
    struct IdleThread : public boost::intrusive::list_base_hook<> {
397
1.60k
        explicit IdleThread() = default;
398
399
        // Condition variable for "queue is not empty". Waiters wake up when a new
400
        // task is queued.
401
        std::condition_variable not_empty;
402
        IdleThread(const IdleThread&) = delete;
403
        void operator=(const IdleThread&) = delete;
404
    };
405
    boost::intrusive::list<IdleThread> _idle_threads; // NOLINT(build/include_what_you_use)
406
407
    // ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
408
    std::unique_ptr<ThreadPoolToken> _tokenless;
409
    const UniqueId _id;
410
411
    std::shared_ptr<MetricEntity> _metric_entity;
412
    IntGauge* thread_pool_active_threads = nullptr;
413
    IntGauge* thread_pool_queue_size = nullptr;
414
    IntGauge* thread_pool_max_queue_size = nullptr;
415
    IntGauge* thread_pool_max_threads = nullptr;
416
    IntCounter* thread_pool_task_execution_time_ns_total = nullptr;
417
    IntCounter* thread_pool_task_execution_count_total = nullptr;
418
    IntCounter* thread_pool_task_wait_worker_time_ns_total = nullptr;
419
    IntCounter* thread_pool_task_wait_worker_count_total = nullptr;
420
421
    IntCounter* thread_pool_submit_failed = nullptr;
422
};
423
424
// Entry point for token-based task submission and blocking for a particular
425
// thread pool. Tokens can only be created via ThreadPool::new_token().
426
//
427
// All functions are thread-safe. Mutable members are protected via the
428
// ThreadPool's lock.
429
class ThreadPoolToken {
430
public:
431
    // Destroys the token.
432
    //
433
    // May be called on a token with outstanding tasks, as Shutdown() will be
434
    // called first to take care of them.
435
    ~ThreadPoolToken();
436
437
    // Submits a Runnable class.
438
    Status submit(std::shared_ptr<Runnable> r);
439
440
    // Submits a function bound using std::bind(&FuncName, args...).
441
    Status submit_func(std::function<void()> f);
442
443
    // Marks the token as unusable for future submissions. Any queued tasks not
444
    // yet running are destroyed. If tasks are in flight, Shutdown() will wait
445
    // on their completion before returning.
446
    void shutdown();
447
448
    // Waits until all the tasks submitted via this token are completed.
449
    void wait();
450
451
    // Waits until all submissions using this token are complete, or until 'delta'
452
    // time elapses.
453
    //
454
    // Returns true if all submissions are complete, false otherwise.
455
    template <class Rep, class Period>
456
    bool wait_for(const std::chrono::duration<Rep, Period>& delta) {
457
        std::unique_lock<std::mutex> l(_pool->_lock);
458
        _pool->check_not_pool_thread_unlocked();
459
        return _not_running_cond.wait_for(l, delta, [&]() { return !is_active(); });
460
    }
461
462
    bool need_dispatch();
463
464
34
    size_t num_tasks() {
465
34
        std::lock_guard<std::mutex> l(_pool->_lock);
466
34
        return _entries.size();
467
34
    }
468
469
    ThreadPoolToken(const ThreadPoolToken&) = delete;
470
    void operator=(const ThreadPoolToken&) = delete;
471
472
private:
473
    // All possible token states. Legal state transitions:
474
    //   IDLE      -> RUNNING: task is submitted via token
475
    //   IDLE      -> QUIESCED: token or pool is shut down
476
    //   RUNNING   -> IDLE: worker thread finishes executing a task and
477
    //                      there are no more tasks queued to the token
478
    //   RUNNING   -> QUIESCING: token or pool is shut down while worker thread
479
    //                           is executing a task
480
    //   RUNNING   -> QUIESCED: token or pool is shut down
481
    //   QUIESCING -> QUIESCED:  worker thread finishes executing a task
482
    //                           belonging to a shut down token or pool
483
    enum class State {
484
        // Token has no queued tasks.
485
        IDLE,
486
487
        // A worker thread is running one of the token's previously queued tasks.
488
        RUNNING,
489
490
        // No new tasks may be submitted to the token. A worker thread is still
491
        // running a previously queued task.
492
        QUIESCING,
493
494
        // No new tasks may be submitted to the token. There are no active tasks
495
        // either. At this state, the token may only be destroyed.
496
        QUIESCED,
497
    };
498
499
    // Writes a textual representation of the token state in 's' to 'o'.
500
    friend std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s);
501
502
    friend class ThreadPool;
503
504
    // Returns a textual representation of 's' suitable for debugging.
505
    static const char* state_to_string(State s);
506
507
    // Constructs a new token.
508
    //
509
    // The token may not outlive its thread pool ('pool').
510
    ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode,
511
                    int max_concurrency = INT_MAX);
512
513
    // Changes this token's state to 'new_state' taking actions as needed.
514
    void transition(State new_state);
515
516
    // Returns true if this token has a task queued and ready to run, or if a
517
    // task belonging to this token is already running.
518
11.5k
    bool is_active() const { return _state == State::RUNNING || _state == State::QUIESCING; }
519
520
    // Returns true if new tasks may be submitted to this token.
521
11.9k
    bool may_submit_new_tasks() const {
522
11.9k
        return _state != State::QUIESCING && _state != State::QUIESCED;
523
11.9k
    }
524
525
25.8k
    State state() const { return _state; }
526
5.06k
    ThreadPool::ExecutionMode mode() const { return _mode; }
527
528
    // Token's configured execution mode.
529
    ThreadPool::ExecutionMode _mode;
530
531
    // Pointer to the token's thread pool.
532
    ThreadPool* _pool = nullptr;
533
534
    // Token state machine.
535
    State _state;
536
537
    // Queued client tasks.
538
    std::deque<ThreadPool::Task> _entries;
539
540
    // Condition variable for "token is idle". Waiters wake up when the token
541
    // transitions to IDLE or QUIESCED.
542
    std::condition_variable _not_running_cond;
543
544
    // Number of worker threads currently executing tasks belonging to this
545
    // token.
546
    int _active_threads;
547
    // The max number of tasks that can be run concurrently. This is to limit
548
    // the concurrency of a thread pool token; the default is INT_MAX (no limit).
549
    int _max_concurrency;
550
    // Number of tasks which have been submitted to the thread pool's queue.
551
    int _num_submitted_tasks;
552
    // Number of tasks which have not been submitted to the thread pool's queue.
553
    int _num_unsubmitted_tasks;
554
};
555
556
} // namespace doris