Coverage Report

Created: 2024-11-22 20:18

/root/doris/be/src/util/threadpool.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/threadpool.h
19
// and modified by Doris
20
21
#pragma once
22
23
#include <limits.h>
24
#include <stddef.h>
25
26
#include <boost/intrusive/detail/algo_type.hpp>
27
#include <boost/intrusive/list.hpp>
28
#include <boost/intrusive/list_hook.hpp>
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <chrono> // IWYU pragma: keep
31
#include <condition_variable>
32
#include <deque>
33
#include <functional>
34
#include <iosfwd>
35
#include <memory>
36
#include <mutex>
37
#include <string>
38
#include <unordered_set>
39
40
#include "agent/cgroup_cpu_ctl.h"
41
#include "common/status.h"
42
#include "util/work_thread_pool.hpp"
43
44
namespace doris {
45
46
class Thread;
47
class ThreadPool;
48
class ThreadPoolToken;
49
50
class Runnable {
51
public:
52
    virtual void run() = 0;
53
7.91k
    virtual ~Runnable() {}
54
};
55
56
// ThreadPool takes a lot of arguments. We provide sane defaults with a builder.
57
//
58
// name: Used for debugging output and default names of the worker threads.
59
//    Since thread names are limited to 16 characters on Linux, it's good to
60
//    choose a short name here.
61
//    Required.
62
//
63
// trace_metric_prefix: used to prefix the names of TraceMetric counters.
64
//    When a task on a thread pool has an associated trace, the thread pool
65
//    implementation will increment TraceMetric counters to indicate the
66
//    amount of time spent waiting in the queue as well as the amount of wall
67
//    and CPU time spent executing. By default, these counters are prefixed
68
//    with the name of the thread pool. For example, if the pool is named
69
//    'apply', then counters such as 'apply.queue_time_us' will be
70
//    incremented.
71
//
72
//    The TraceMetrics implementation relies on the number of distinct counter
73
//    names being small. Thus, if the thread pool name itself is dynamically
74
//    generated, the default behavior described above would result in an
75
//    unbounded number of distinct counter names. The 'trace_metric_prefix'
76
//    setting can be used to override the prefix used in generating the trace
77
//    metric names.
78
//
79
//    For example, the Raft thread pools are named "<tablet id>-raft" which
80
//    has unbounded cardinality (a server may have thousands of different
81
//    tablet IDs over its lifetime). In that case, setting the prefix to
82
//    "raft" will avoid any issues.
83
//
84
// min_threads: Minimum number of threads we'll have at any time.
85
//    Default: 0.
86
//
87
// max_threads: Maximum number of threads we'll have at any time.
88
//    Default: Number of CPUs detected on the system.
89
//
90
// max_queue_size: Maximum number of items to enqueue before returning a
91
//    Status::ServiceUnavailable message from Submit().
92
//    Default: INT_MAX.
93
//
94
// idle_timeout: How long we'll keep around an idle thread before timing it out.
95
//    We always keep at least min_threads.
96
//    Default: 500 milliseconds.
97
//
98
// metrics: Histograms, counters, etc. to update on various threadpool events.
99
//    Default: not set.
100
//
101
class ThreadPoolBuilder {
102
public:
103
    explicit ThreadPoolBuilder(std::string name);
104
105
    // Note: We violate the style guide by returning mutable references here
106
    // in order to provide traditional Builder pattern conveniences.
107
    ThreadPoolBuilder& set_min_threads(int min_threads);
108
    ThreadPoolBuilder& set_max_threads(int max_threads);
109
    ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
110
    ThreadPoolBuilder& set_cgroup_cpu_ctl(std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl);
111
    template <class Rep, class Period>
112
7
    ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep, Period>& idle_timeout) {
113
7
        _idle_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout);
114
7
        return *this;
115
7
    }
_ZN5doris17ThreadPoolBuilder16set_idle_timeoutIlSt5ratioILl1ELl1000EEEERS0_RKNSt6chrono8durationIT_T0_EE
Line
Count
Source
112
6
    ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep, Period>& idle_timeout) {
113
6
        _idle_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout);
114
6
        return *this;
115
6
    }
_ZN5doris17ThreadPoolBuilder16set_idle_timeoutIlSt5ratioILl1ELl1000000EEEERS0_RKNSt6chrono8durationIT_T0_EE
Line
Count
Source
112
1
    ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep, Period>& idle_timeout) {
113
1
        _idle_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout);
114
1
        return *this;
115
1
    }
116
    // Instantiate a new ThreadPool with the existing builder arguments.
117
    template <typename ThreadPoolType>
118
242
    Status build(std::unique_ptr<ThreadPoolType>* pool) const {
119
242
        if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) {
120
5
            pool->reset(new ThreadPoolType(*this));
121
237
            RETURN_IF_ERROR((*pool)->init());
122
237
        } else if constexpr (std::is_same_v<ThreadPoolType, PriorityThreadPool>) {
123
5
            pool->reset(new ThreadPoolType(_max_threads, _max_queue_size, _name));
124
5
        } else {
125
237
            static_assert(always_false_v<ThreadPoolType>, "Unsupported ThreadPoolType");
126
237
        }
127
237
        return Status::OK();
128
242
    }
_ZNK5doris17ThreadPoolBuilder5buildINS_10ThreadPoolEEENS_6StatusEPSt10unique_ptrIT_St14default_deleteIS5_EE
Line
Count
Source
118
237
    Status build(std::unique_ptr<ThreadPoolType>* pool) const {
119
237
        if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) {
120
237
            pool->reset(new ThreadPoolType(*this));
121
237
            RETURN_IF_ERROR((*pool)->init());
122
237
        } else if constexpr (std::is_same_v<ThreadPoolType, PriorityThreadPool>) {
123
237
            pool->reset(new ThreadPoolType(_max_threads, _max_queue_size, _name));
124
237
        } else {
125
237
            static_assert(always_false_v<ThreadPoolType>, "Unsupported ThreadPoolType");
126
237
        }
127
237
        return Status::OK();
128
237
    }
_ZNK5doris17ThreadPoolBuilder5buildINS_14WorkThreadPoolILb1EEEEENS_6StatusEPSt10unique_ptrIT_St14default_deleteIS6_EE
Line
Count
Source
118
5
    Status build(std::unique_ptr<ThreadPoolType>* pool) const {
119
5
        if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) {
120
5
            pool->reset(new ThreadPoolType(*this));
121
5
            RETURN_IF_ERROR((*pool)->init());
122
5
        } else if constexpr (std::is_same_v<ThreadPoolType, PriorityThreadPool>) {
123
5
            pool->reset(new ThreadPoolType(_max_threads, _max_queue_size, _name));
124
5
        } else {
125
5
            static_assert(always_false_v<ThreadPoolType>, "Unsupported ThreadPoolType");
126
5
        }
127
5
        return Status::OK();
128
5
    }
129
130
private:
131
    friend class ThreadPool;
132
    const std::string _name;
133
    int _min_threads;
134
    int _max_threads;
135
    int _max_queue_size;
136
    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
137
    std::chrono::milliseconds _idle_timeout;
138
139
    ThreadPoolBuilder(const ThreadPoolBuilder&) = delete;
140
    void operator=(const ThreadPoolBuilder&) = delete;
141
142
    template <typename T>
143
    static constexpr bool always_false_v = false;
144
};
145
146
// Thread pool with a variable number of threads.
147
//
148
// Tasks submitted directly to the thread pool enter a FIFO queue and are
149
// dispatched to a worker thread when one becomes free. Tasks may also be
150
// submitted via ThreadPoolTokens. The token Wait() and Shutdown() functions
151
// can then be used to block on logical groups of tasks.
152
//
153
// A token operates in one of two ExecutionModes, determined at token
154
// construction time:
155
// 1. SERIAL: submitted tasks are run one at a time.
156
// 2. CONCURRENT: submitted tasks may be run in parallel. This isn't unlike
157
//    tasks submitted without a token, but the logical grouping that tokens
158
//    impart can be useful when a pool is shared by many contexts (e.g. to
159
//    safely shut down one context, to derive context-specific metrics, etc.).
160
//
161
// Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are
162
// processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are
163
// processed in a round-robin fashion, one task at a time. This prevents them
164
// from starving one another. However, tokenless (and CONCURRENT token-based)
165
// tasks can starve SERIAL token-based tasks.
166
//
167
// Usage Example:
168
//    static void Func(int n) { ... }
169
//    class Task : public Runnable { ... }
170
//
171
//    std::unique_ptr<ThreadPool> thread_pool;
172
//    CHECK_OK(
173
//        ThreadPoolBuilder("my_pool")
174
//            .set_min_threads(0)
175
//            .set_max_threads(5)
176
//            .set_max_queue_size(10)
177
//            .set_idle_timeout(2000ms)
178
//            .build(&thread_pool));
179
//    thread_pool->submit(std::shared_ptr<Runnable>(new Task()));
180
//    thread_pool->submit_func(std::bind(&Func, 10));
181
class ThreadPool {
182
public:
183
    ~ThreadPool();
184
185
    // Wait for the running tasks to complete and then shutdown the threads.
186
    // All the other pending tasks in the queue will be removed.
187
    // NOTE: The user may implement external abort logic for the
188
    //       runnables, that must be called before Shutdown(), if the system
189
    //       should know about the non-execution of these tasks, or the runnable
190
    //       require an explicit "abort" notification to exit from the run loop.
191
    void shutdown();
192
193
    // Submits a Runnable class.
194
    Status submit(std::shared_ptr<Runnable> r);
195
196
    // Submits a function bound using std::bind(&FuncName, args...).
197
    Status submit_func(std::function<void()> f);
198
199
    // Waits until all the tasks are completed.
200
    void wait();
201
202
    // Waits for the pool to reach the idle state, or until 'delta' time elapses.
203
    // Returns true if the pool reached the idle state, false otherwise.
204
    template <class Rep, class Period>
205
    bool wait_for(const std::chrono::duration<Rep, Period>& delta) {
206
        std::unique_lock<std::mutex> l(_lock);
207
        check_not_pool_thread_unlocked();
208
        return _idle_cond.wait_for(
209
                l, delta, [&]() { return _total_queued_tasks <= 0 && _active_threads <= 0; });
210
    }
211
    Status set_min_threads(int min_threads);
212
    Status set_max_threads(int max_threads);
213
214
    // Allocates a new token for use in token-based task submission. All tokens
215
    // must be destroyed before their ThreadPool is destroyed.
216
    //
217
    // There is no limit on the number of tokens that may be allocated.
218
    enum class ExecutionMode {
219
        // Tasks submitted via this token will be executed serially.
220
        SERIAL,
221
222
        // Tasks submitted via this token may be executed concurrently.
223
        CONCURRENT
224
    };
225
    std::unique_ptr<ThreadPoolToken> new_token(ExecutionMode mode, int max_concurrency = INT_MAX);
226
227
    // Return the number of threads currently running (or in the process of starting up)
228
    // for this thread pool.
229
24
    int num_threads() const {
230
24
        std::lock_guard<std::mutex> l(_lock);
231
24
        return _num_threads + _num_threads_pending_start;
232
24
    }
233
234
7
    int max_threads() const {
235
7
        std::lock_guard<std::mutex> l(_lock);
236
7
        return _max_threads;
237
7
    }
238
239
4
    int min_threads() const {
240
4
        std::lock_guard<std::mutex> l(_lock);
241
4
        return _min_threads;
242
4
    }
243
244
0
    int num_threads_pending_start() const {
245
0
        std::lock_guard<std::mutex> l(_lock);
246
0
        return _num_threads_pending_start;
247
0
    }
248
249
0
    int num_active_threads() const {
250
0
        std::lock_guard<std::mutex> l(_lock);
251
0
        return _active_threads;
252
0
    }
253
254
0
    int get_queue_size() const {
255
0
        std::lock_guard<std::mutex> l(_lock);
256
0
        return _total_queued_tasks;
257
0
    }
258
259
0
    std::vector<int> debug_info() {
260
0
        std::lock_guard<std::mutex> l(_lock);
261
0
        std::vector<int> arr = {_num_threads, static_cast<int>(_threads.size()), _min_threads,
262
0
                                _max_threads};
263
0
        return arr;
264
0
    }
265
266
private:
267
    friend class ThreadPoolBuilder;
268
    friend class ThreadPoolToken;
269
270
    // Client-provided task to be executed by this pool.
271
    struct Task {
272
        std::shared_ptr<Runnable> runnable;
273
274
        // Time at which the entry was submitted to the pool.
275
        std::chrono::time_point<std::chrono::system_clock> submit_time;
276
    };
277
278
    // Creates a new thread pool using a builder.
279
    explicit ThreadPool(const ThreadPoolBuilder& builder);
280
281
    // Initializes the thread pool by starting the minimum number of threads.
282
    Status init();
283
284
    // Dispatcher responsible for dequeueing and executing the tasks
285
    void dispatch_thread();
286
287
    // Create new thread.
288
    //
289
    // REQUIRES: caller has incremented '_num_threads_pending_start' ahead of this call.
290
    // NOTE: For performance reasons, _lock should not be held.
291
    Status create_thread();
292
293
    // Aborts if the current thread is a member of this thread pool.
294
    void check_not_pool_thread_unlocked();
295
296
    // Submits a task to be run via token.
297
    Status do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token);
298
299
    // Releases token 't' and invalidates it.
300
    void release_token(ThreadPoolToken* t);
301
302
    const std::string _name;
303
    int _min_threads;
304
    int _max_threads;
305
    const int _max_queue_size;
306
    const std::chrono::milliseconds _idle_timeout;
307
308
    // Overall status of the pool. Set to an error when the pool is shut down.
309
    //
310
    // Protected by '_lock'.
311
    Status _pool_status;
312
313
    // Synchronizes many of the members of the pool and all of its
314
    // condition variables.
315
    mutable std::mutex _lock;
316
317
    // Condition variable for "pool is idling". Waiters wake up when
318
    // _active_threads reaches zero.
319
    std::condition_variable _idle_cond;
320
321
    // Condition variable for "pool has no threads". Waiters wake up when
322
    // _num_threads and _num_threads_pending_start are both 0.
323
    std::condition_variable _no_threads_cond;
324
325
    // Number of threads currently running.
326
    //
327
    // Protected by _lock.
328
    int _num_threads;
329
330
    // Number of threads which are in the process of starting.
331
    // When these threads start, they will decrement this counter and
332
    // accordingly increment '_num_threads'.
333
    //
334
    // Protected by _lock.
335
    int _num_threads_pending_start;
336
337
    // Number of threads currently running and executing client tasks.
338
    //
339
    // Protected by _lock.
340
    int _active_threads;
341
342
    // Total number of client tasks queued, either directly (_queue) or
343
    // indirectly (_tokens).
344
    //
345
    // Protected by _lock.
346
    int _total_queued_tasks;
347
348
    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
349
350
    // All allocated tokens.
351
    //
352
    // Protected by _lock.
353
    std::unordered_set<ThreadPoolToken*> _tokens;
354
355
    // FIFO of tokens from which tasks should be executed. Does not own the
356
    // tokens; they are owned by clients and are removed from the FIFO on shutdown.
357
    //
358
    // Protected by _lock.
359
    std::deque<ThreadPoolToken*> _queue;
360
361
    // Pointers to all running threads. Raw pointers are safe because a Thread
362
    // may only go out of scope after being removed from _threads.
363
    //
364
    // Protected by _lock.
365
    std::unordered_set<Thread*> _threads;
366
367
    // List of all threads currently waiting for work.
368
    //
369
    // A thread is added to the front of the list when it goes idle and is
370
    // removed from the front and signaled when new work arrives. This produces a
371
    // LIFO usage pattern that is more efficient than idling on a single
    // shared condition variable.
372
    //
373
    // Protected by _lock.
374
    struct IdleThread : public boost::intrusive::list_base_hook<> {
375
1.40k
        explicit IdleThread() {}
376
377
        // Condition variable for "queue is not empty". Waiters wake up when a new
378
        // task is queued.
379
        std::condition_variable not_empty;
380
        IdleThread(const IdleThread&) = delete;
381
        void operator=(const IdleThread&) = delete;
382
    };
383
    boost::intrusive::list<IdleThread> _idle_threads; // NOLINT(build/include_what_you_use)
384
385
    // ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
386
    std::unique_ptr<ThreadPoolToken> _tokenless;
387
388
    ThreadPool(const ThreadPool&) = delete;
389
    void operator=(const ThreadPool&) = delete;
390
};
391
392
// Entry point for token-based task submission and blocking for a particular
393
// thread pool. Tokens can only be created via ThreadPool::new_token().
394
//
395
// All functions are thread-safe. Mutable members are protected via the
396
// ThreadPool's lock.
397
class ThreadPoolToken {
398
public:
399
    // Destroys the token.
400
    //
401
    // May be called on a token with outstanding tasks, as Shutdown() will be
402
    // called first to take care of them.
403
    ~ThreadPoolToken();
404
405
    // Submits a Runnable class.
406
    Status submit(std::shared_ptr<Runnable> r);
407
408
    // Submits a function bound using std::bind(&FuncName, args...).
409
    Status submit_func(std::function<void()> f);
410
411
    // Marks the token as unusable for future submissions. Any queued tasks not
412
    // yet running are destroyed. If tasks are in flight, Shutdown() will wait
413
    // on their completion before returning.
414
    void shutdown();
415
416
    // Waits until all the tasks submitted via this token are completed.
417
    void wait();
418
419
    // Waits until all submissions using this token are complete, or until 'delta'
420
    // time elapses.
421
    //
422
    // Returns true if all submissions are complete, false otherwise.
423
    template <class Rep, class Period>
424
    bool wait_for(const std::chrono::duration<Rep, Period>& delta) {
425
        std::unique_lock<std::mutex> l(_pool->_lock);
426
        _pool->check_not_pool_thread_unlocked();
427
        return _not_running_cond.wait_for(l, delta, [&]() { return !is_active(); });
428
    }
429
430
    bool need_dispatch();
431
432
34
    size_t num_tasks() {
433
34
        std::lock_guard<std::mutex> l(_pool->_lock);
434
34
        return _entries.size();
435
34
    }
436
437
private:
438
    // All possible token states. Legal state transitions:
439
    //   IDLE      -> RUNNING: task is submitted via token
440
    //   IDLE      -> QUIESCED: token or pool is shut down
441
    //   RUNNING   -> IDLE: worker thread finishes executing a task and
442
    //                      there are no more tasks queued to the token
443
    //   RUNNING   -> QUIESCING: token or pool is shut down while worker thread
444
    //                           is executing a task
445
    //   RUNNING   -> QUIESCED: token or pool is shut down
446
    //   QUIESCING -> QUIESCED:  worker thread finishes executing a task
447
    //                           belonging to a shut down token or pool
448
    enum class State {
449
        // Token has no queued tasks.
450
        IDLE,
451
452
        // A worker thread is running one of the token's previously queued tasks.
453
        RUNNING,
454
455
        // No new tasks may be submitted to the token. A worker thread is still
456
        // running a previously queued task.
457
        QUIESCING,
458
459
        // No new tasks may be submitted to the token. There are no active tasks
460
        // either. At this state, the token may only be destroyed.
461
        QUIESCED,
462
    };
463
464
    // Writes a textual representation of the token state in 's' to 'o'.
465
    friend std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s);
466
467
    friend class ThreadPool;
468
469
    // Returns a textual representation of 's' suitable for debugging.
470
    static const char* state_to_string(State s);
471
472
    // Constructs a new token.
473
    //
474
    // The token may not outlive its thread pool ('pool').
475
    ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode,
476
                    int max_concurrency = INT_MAX);
477
478
    // Changes this token's state to 'new_state' taking actions as needed.
479
    void transition(State new_state);
480
481
    // Returns true if this token has a task queued and ready to run, or if a
482
    // task belonging to this token is already running.
483
8.72k
    bool is_active() const { return _state == State::RUNNING || _state == State::QUIESCING; }
484
485
    // Returns true if new tasks may be submitted to this token.
486
7.92k
    bool may_submit_new_tasks() const {
487
7.92k
        return _state != State::QUIESCING && _state != State::QUIESCED;
488
7.92k
    }
489
490
19.5k
    State state() const { return _state; }
491
2.75k
    ThreadPool::ExecutionMode mode() const { return _mode; }
492
493
    // Token's configured execution mode.
494
    ThreadPool::ExecutionMode _mode;
495
496
    // Pointer to the token's thread pool.
497
    ThreadPool* _pool = nullptr;
498
499
    // Token state machine.
500
    State _state;
501
502
    // Queued client tasks.
503
    std::deque<ThreadPool::Task> _entries;
504
505
    // Condition variable for "token is idle". Waiters wake up when the token
506
    // transitions to IDLE or QUIESCED.
507
    std::condition_variable _not_running_cond;
508
509
    // Number of worker threads currently executing tasks belonging to this
510
    // token.
511
    int _active_threads;
512
    // The max number of tasks that can be run concurrently. This is to limit
513
    // the concurrency of a thread pool token; the default is INT_MAX (no limit).
514
    int _max_concurrency;
515
    // Number of tasks which have been submitted to the thread pool's queue.
516
    int _num_submitted_tasks;
517
    // Number of tasks which have not been submitted to the thread pool's queue.
518
    int _num_unsubmitted_tasks;
519
520
    ThreadPoolToken(const ThreadPoolToken&) = delete;
521
    void operator=(const ThreadPoolToken&) = delete;
522
};
523
524
} // namespace doris