Coverage Report

Created: 2025-05-12 20:32

/root/doris/be/src/util/threadpool.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/threadpool.cc
19
// and modified by Doris
20
21
#include "util/threadpool.h"
22
23
#include <algorithm>
24
#include <cstdint>
25
#include <limits>
26
#include <ostream>
27
#include <thread>
28
#include <utility>
29
30
#include "absl/strings/substitute.h"
31
#include "common/exception.h"
32
#include "common/logging.h"
33
#include "util/debug/sanitizer_scopes.h"
34
#include "util/debug_points.h"
35
#include "util/doris_metrics.h"
36
#include "util/metrics.h"
37
#include "util/scoped_cleanup.h"
38
#include "util/stopwatch.hpp"
39
#include "util/thread.h"
40
41
namespace doris {
42
// The names of these variables will be used as metric names in Prometheus.
43
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_active_threads, MetricUnit::NOUNIT);
44
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT);
45
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_queue_size, MetricUnit::NOUNIT);
46
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_threads, MetricUnit::NOUNIT);
47
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT);
48
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_execution_time_ns_total,
49
                                     MetricUnit::NANOSECONDS);
50
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_execution_count_total, MetricUnit::NOUNIT);
51
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_wait_worker_time_ns_total,
52
                                     MetricUnit::NANOSECONDS);
53
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_wait_worker_count_total, MetricUnit::NOUNIT);
54
using namespace ErrorCode;
55
56
using std::string;
57
58
class FunctionRunnable : public Runnable {
59
public:
60
10.7k
    explicit FunctionRunnable(std::function<void()> func) : _func(std::move(func)) {}
61
62
5.09k
    void run() override { _func(); }
63
64
private:
65
    std::function<void()> _func;
66
};
67
68
ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group)
69
        : _name(std::move(name)),
70
          _workload_group(std::move(workload_group)),
71
          _min_threads(0),
72
          _max_threads(std::thread::hardware_concurrency()),
73
          _max_queue_size(std::numeric_limits<int>::max()),
74
327
          _idle_timeout(std::chrono::milliseconds(500)) {}
75
76
291
ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
77
291
    CHECK_GE(min_threads, 0);
78
291
    _min_threads = min_threads;
79
291
    return *this;
80
291
}
81
82
299
ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
83
299
    CHECK_GT(max_threads, 0);
84
299
    _max_threads = max_threads;
85
299
    return *this;
86
299
}
87
88
96
ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
89
96
    _max_queue_size = max_queue_size;
90
96
    return *this;
91
96
}
92
93
ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(
94
1
        std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl) {
95
1
    _cgroup_cpu_ctl = cgroup_cpu_ctl;
96
1
    return *this;
97
1
}
98
99
ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode,
100
                                 int max_concurrency)
101
        : _mode(mode),
102
          _pool(pool),
103
          _state(State::IDLE),
104
          _active_threads(0),
105
          _max_concurrency(max_concurrency),
106
          _num_submitted_tasks(0),
107
2.48k
          _num_unsubmitted_tasks(0) {
108
2.48k
    if (max_concurrency == 1 && mode != ThreadPool::ExecutionMode::SERIAL) {
109
1
        _mode = ThreadPool::ExecutionMode::SERIAL;
110
1
    }
111
2.48k
}
112
113
2.46k
ThreadPoolToken::~ThreadPoolToken() {
114
2.46k
    shutdown();
115
2.46k
    _pool->release_token(this);
116
2.46k
}
117
118
9.37k
Status ThreadPoolToken::submit(std::shared_ptr<Runnable> r) {
119
9.37k
    return _pool->do_submit(std::move(r), this);
120
9.37k
}
121
122
9.40k
Status ThreadPoolToken::submit_func(std::function<void()> f) {
123
9.40k
    return submit(std::make_shared<FunctionRunnable>(std::move(f)));
124
9.40k
}
125
126
3.72k
void ThreadPoolToken::shutdown() {
127
3.72k
    std::unique_lock<std::mutex> l(_pool->_lock);
128
3.72k
    _pool->check_not_pool_thread_unlocked();
129
130
    // Clear the queue under the lock, but defer the releasing of the tasks
131
    // outside the lock, in case there are concurrent threads wanting to access
132
    // the ThreadPool. The task's destructors may acquire locks, etc, so this
133
    // also prevents lock inversions.
134
3.72k
    std::deque<ThreadPool::Task> to_release = std::move(_entries);
135
3.72k
    _pool->_total_queued_tasks -= to_release.size();
136
137
3.72k
    switch (state()) {
138
920
    case State::IDLE:
139
        // There were no tasks outstanding; we can quiesce the token immediately.
140
920
        transition(State::QUIESCED);
141
920
        break;
142
1.19k
    case State::RUNNING:
143
        // There were outstanding tasks. If any are still running, switch to
144
        // QUIESCING and wait for them to finish (the worker thread executing
145
        // the token's last task will switch the token to QUIESCED). Otherwise,
146
        // we can quiesce the token immediately.
147
148
        // Note: this is an O(n) operation, but it's expected to be infrequent.
149
        // Plus doing it this way (rather than switching to QUIESCING and waiting
150
        // for a worker thread to process the queue entry) helps retain state
151
        // transition symmetry with ThreadPool::shutdown.
152
14.5k
        for (auto it = _pool->_queue.begin(); it != _pool->_queue.end();) {
153
13.3k
            if (*it == this) {
154
1.20k
                it = _pool->_queue.erase(it);
155
12.1k
            } else {
156
12.1k
                it++;
157
12.1k
            }
158
13.3k
        }
159
160
1.19k
        if (_active_threads == 0) {
161
511
            transition(State::QUIESCED);
162
511
            break;
163
511
        }
164
681
        transition(State::QUIESCING);
165
681
        [[fallthrough]];
166
704
    case State::QUIESCING:
167
        // The token is already quiescing. Just wait for a worker thread to
168
        // switch it to QUIESCED.
169
1.40k
        _not_running_cond.wait(l, [this]() { return state() == State::QUIESCED; });
170
704
        break;
171
1.58k
    default:
172
1.58k
        break;
173
3.72k
    }
174
3.72k
}
175
176
730
void ThreadPoolToken::wait() {
177
730
    std::unique_lock<std::mutex> l(_pool->_lock);
178
730
    _pool->check_not_pool_thread_unlocked();
179
952
    _not_running_cond.wait(l, [this]() { return !is_active(); });
180
730
}
181
182
7.88k
void ThreadPoolToken::transition(State new_state) {
183
7.88k
#ifndef NDEBUG
184
7.88k
    CHECK_NE(_state, new_state);
185
186
7.88k
    switch (_state) {
187
4.22k
    case State::IDLE:
188
4.22k
        CHECK(new_state == State::RUNNING || new_state == State::QUIESCED);
189
4.22k
        if (new_state == State::RUNNING) {
190
2.96k
            CHECK(!_entries.empty());
191
2.96k
        } else {
192
1.26k
            CHECK(_entries.empty());
193
1.26k
            CHECK_EQ(_active_threads, 0);
194
1.26k
        }
195
4.22k
        break;
196
2.96k
    case State::RUNNING:
197
2.96k
        CHECK(new_state == State::IDLE || new_state == State::QUIESCING ||
198
2.96k
              new_state == State::QUIESCED);
199
2.96k
        CHECK(_entries.empty());
200
2.96k
        if (new_state == State::QUIESCING) {
201
691
            CHECK_GT(_active_threads, 0);
202
691
        }
203
2.96k
        break;
204
691
    case State::QUIESCING:
205
691
        CHECK(new_state == State::QUIESCED);
206
691
        CHECK_EQ(_active_threads, 0);
207
691
        break;
208
0
    case State::QUIESCED:
209
0
        CHECK(false); // QUIESCED is a terminal state
210
0
        break;
211
0
    default:
212
0
        throw Exception(Status::FatalError("Unknown token state: {}", _state));
213
7.88k
    }
214
7.88k
#endif
215
216
    // Take actions based on the state we're entering.
217
7.88k
    switch (new_state) {
218
1.76k
    case State::IDLE:
219
4.22k
    case State::QUIESCED:
220
4.22k
        _not_running_cond.notify_all();
221
4.22k
        break;
222
3.65k
    default:
223
3.65k
        break;
224
7.88k
    }
225
226
7.88k
    _state = new_state;
227
7.88k
}
228
229
0
const char* ThreadPoolToken::state_to_string(State s) {
230
0
    switch (s) {
231
0
    case State::IDLE:
232
0
        return "IDLE";
233
0
        break;
234
0
    case State::RUNNING:
235
0
        return "RUNNING";
236
0
        break;
237
0
    case State::QUIESCING:
238
0
        return "QUIESCING";
239
0
        break;
240
0
    case State::QUIESCED:
241
0
        return "QUIESCED";
242
0
        break;
243
0
    }
244
0
    return "<cannot reach here>";
245
0
}
246
247
7.74k
bool ThreadPoolToken::need_dispatch() {
248
7.74k
    return _state == ThreadPoolToken::State::IDLE ||
249
7.74k
           (_mode == ThreadPool::ExecutionMode::CONCURRENT &&
250
4.78k
            _num_submitted_tasks < _max_concurrency);
251
7.74k
}
252
253
ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
254
        : _name(builder._name),
255
          _workload_group(builder._workload_group),
256
          _min_threads(builder._min_threads),
257
          _max_threads(builder._max_threads),
258
          _max_queue_size(builder._max_queue_size),
259
          _idle_timeout(builder._idle_timeout),
260
          _pool_status(Status::Uninitialized("The pool was not initialized.")),
261
          _num_threads(0),
262
          _num_threads_pending_start(0),
263
          _active_threads(0),
264
          _total_queued_tasks(0),
265
          _cgroup_cpu_ctl(builder._cgroup_cpu_ctl),
266
          _tokenless(new_token(ExecutionMode::CONCURRENT)),
267
322
          _id(UniqueId::gen_uid()) {}
268
269
310
ThreadPool::~ThreadPool() {
270
    // There should only be one live token: the one used in tokenless submission.
271
310
    CHECK_EQ(1, _tokens.size()) << absl::Substitute(
272
0
            "Threadpool $0 destroyed with $1 allocated tokens", _name, _tokens.size());
273
310
    shutdown();
274
310
}
275
276
327
Status ThreadPool::try_create_thread(int thread_num, std::lock_guard<std::mutex>&) {
277
1.35k
    for (int i = 0; i < thread_num; i++) {
278
1.03k
        Status status = create_thread();
279
1.03k
        if (status.ok()) {
280
1.03k
            _num_threads_pending_start++;
281
1.03k
        } else {
282
0
            LOG(WARNING) << "Thread pool " << _name << " failed to create thread: " << status;
283
0
            return status;
284
0
        }
285
1.03k
    }
286
327
    return Status::OK();
287
327
}
288
289
322
Status ThreadPool::init() {
290
322
    if (!_pool_status.is<UNINITIALIZED>()) {
291
0
        return Status::NotSupported("The thread pool {} is already initialized", _name);
292
0
    }
293
322
    _pool_status = Status::OK();
294
295
322
    {
296
322
        std::lock_guard<std::mutex> l(_lock);
297
        // A failure to create threads should not cause thread pool init to fail,
298
        // because threads can be created later, e.g. when a task is submitted.
299
322
        static_cast<void>(try_create_thread(_min_threads, l));
300
322
    }
301
302
    // _id of thread pool is used to make sure when we create thread pool with same name, we can
303
    // get different _metric_entity
304
    // If not, we will have problem when we deregister entity and register hook.
305
322
    _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
306
322
            fmt::format("thread_pool_{}", _name), {{"thread_pool_name", _name},
307
322
                                                   {"workload_group", _workload_group},
308
322
                                                   {"id", _id.to_string()}});
309
310
322
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_active_threads);
311
322
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_threads);
312
322
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size);
313
322
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_queue_size);
314
322
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_execution_time_ns_total);
315
322
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_execution_count_total);
316
322
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_wait_worker_time_ns_total);
317
322
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_wait_worker_count_total);
318
322
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed);
319
320
322
    _metric_entity->register_hook("update", [this]() {
321
0
        {
322
0
            std::lock_guard<std::mutex> l(_lock);
323
0
            if (!_pool_status.ok()) {
324
0
                return;
325
0
            }
326
0
        }
327
328
0
        thread_pool_active_threads->set_value(num_active_threads());
329
0
        thread_pool_queue_size->set_value(get_queue_size());
330
0
        thread_pool_max_queue_size->set_value(get_max_queue_size());
331
0
        thread_pool_max_threads->set_value(max_threads());
332
0
    });
333
322
    return Status::OK();
334
322
}
335
336
533
void ThreadPool::shutdown() {
337
    // Why is access to DorisMetrics safe here?
338
    // Since DorisMetrics is a singleton, it will be destroyed only after doris_main is exited.
339
    // The shutdown/destroy of ThreadPool is guaranteed to take place before doris_main exits by
340
    // ExecEnv::destroy().
341
533
    DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity);
342
533
    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
343
533
    std::unique_lock<std::mutex> l(_lock);
344
533
    check_not_pool_thread_unlocked();
345
346
    // Note: this is the same error seen at submission if the pool is at
347
    // capacity, so clients can't tell them apart. This isn't really a practical
348
    // concern though because shutting down a pool typically requires clients to
349
    // be quiesced first, so there's no danger of a client getting confused.
350
    // Do not print a stack trace here.
351
533
    _pool_status = Status::Error<SERVICE_UNAVAILABLE, false>(
352
533
            "The thread pool {} has been shut down.", _name);
353
354
    // Clear the various queues under the lock, but defer the releasing
355
    // of the tasks outside the lock, in case there are concurrent threads
356
    // wanting to access the ThreadPool. The task's destructors may acquire
357
    // locks, etc, so this also prevents lock inversions.
358
533
    _queue.clear();
359
360
533
    std::deque<std::deque<Task>> to_release;
361
595
    for (auto* t : _tokens) {
362
595
        if (!t->_entries.empty()) {
363
3
            to_release.emplace_back(std::move(t->_entries));
364
3
        }
365
595
        switch (t->state()) {
366
346
        case ThreadPoolToken::State::IDLE:
367
            // The token is idle; we can quiesce it immediately.
368
346
            t->transition(ThreadPoolToken::State::QUIESCED);
369
346
            break;
370
10
        case ThreadPoolToken::State::RUNNING:
371
            // The token has tasks associated with it. If they're merely queued
372
            // (i.e. there are no active threads), the tasks will have been removed
373
            // above and we can quiesce immediately. Otherwise, we need to wait for
374
            // the threads to finish.
375
10
            t->transition(t->_active_threads > 0 ? ThreadPoolToken::State::QUIESCING
376
10
                                                 : ThreadPoolToken::State::QUIESCED);
377
10
            break;
378
239
        default:
379
239
            break;
380
595
        }
381
595
    }
382
383
    // The queues are empty. Wake any sleeping worker threads and wait for all
384
    // of them to exit. Some worker threads will exit immediately upon waking,
385
    // while others will exit after they finish executing an outstanding task.
386
533
    _total_queued_tasks = 0;
387
1.63k
    while (!_idle_threads.empty()) {
388
1.09k
        _idle_threads.front().not_empty.notify_one();
389
1.09k
        _idle_threads.pop_front();
390
1.09k
    }
391
392
821
    _no_threads_cond.wait(l, [this]() { return _num_threads + _num_threads_pending_start == 0; });
393
394
    // All the threads have exited. Check the state of each token.
395
595
    for (auto* t : _tokens) {
396
595
        DCHECK(t->state() == ThreadPoolToken::State::IDLE ||
397
595
               t->state() == ThreadPoolToken::State::QUIESCED);
398
595
    }
399
533
}
400
401
2.48k
std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode, int max_concurrency) {
402
2.48k
    std::lock_guard<std::mutex> l(_lock);
403
2.48k
    std::unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode, max_concurrency));
404
2.48k
    if (!_tokens.insert(t.get()).second) {
405
0
        throw Exception(Status::InternalError("duplicate token"));
406
0
    }
407
2.48k
    return t;
408
2.48k
}
409
410
2.46k
void ThreadPool::release_token(ThreadPoolToken* t) {
411
2.46k
    std::lock_guard<std::mutex> l(_lock);
412
2.46k
    CHECK(!t->is_active()) << absl::Substitute("Token with state $0 may not be released",
413
0
                                               ThreadPoolToken::state_to_string(t->state()));
414
2.46k
    CHECK_EQ(1, _tokens.erase(t));
415
2.46k
}
416
417
1.59k
Status ThreadPool::submit(std::shared_ptr<Runnable> r) {
418
1.59k
    return do_submit(std::move(r), _tokenless.get());
419
1.59k
}
420
421
1.36k
Status ThreadPool::submit_func(std::function<void()> f) {
422
1.36k
    return submit(std::make_shared<FunctionRunnable>(std::move(f)));
423
1.36k
}
424
425
11.0k
Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token) {
426
11.0k
    DCHECK(token);
427
428
11.0k
    std::unique_lock<std::mutex> l(_lock);
429
11.0k
    if (PREDICT_FALSE(!_pool_status.ok())) {
430
1
        return _pool_status;
431
1
    }
432
433
11.0k
    if (PREDICT_FALSE(!token->may_submit_new_tasks())) {
434
3.32k
        return Status::Error<SERVICE_UNAVAILABLE>("Thread pool({}) token was shut down", _name);
435
3.32k
    }
436
437
    // Size limit check.
438
7.71k
    int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - _active_threads +
439
7.71k
                                 static_cast<int64_t>(_max_queue_size) - _total_queued_tasks;
440
7.71k
    if (capacity_remaining < 1) {
441
4
        thread_pool_submit_failed->increment(1);
442
4
        return Status::Error<SERVICE_UNAVAILABLE>(
443
4
                "Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name,
444
4
                _num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks,
445
4
                _max_queue_size);
446
4
    }
447
448
    // Should we create another thread?
449
450
    // We assume that each current inactive thread will grab one item from the
451
    // queue.  If it seems like we'll need another thread, we create one.
452
    //
453
    // Rather than creating the thread here, while holding the lock, we defer
454
    // it to down below. This is because thread creation can be rather slow
455
    // (hundreds of milliseconds in some cases) and we'd like to allow the
456
    // existing threads to continue to process tasks while we do so.
457
    //
458
    // In theory, a currently active thread could finish immediately after this
459
    // calculation but before our new worker starts running. This would mean we
460
    // created a thread we didn't really need. However, this race is unavoidable
461
    // and harmless.
462
    //
463
    // Of course, we never create more than _max_threads threads no matter what.
464
7.71k
    int threads_from_this_submit =
465
7.71k
            token->is_active() && token->mode() == ExecutionMode::SERIAL ? 0 : 1;
466
7.71k
    int inactive_threads = _num_threads + _num_threads_pending_start - _active_threads;
467
7.71k
    int additional_threads =
468
7.71k
            static_cast<int>(_queue.size()) + threads_from_this_submit - inactive_threads;
469
7.71k
    bool need_a_thread = false;
470
7.71k
    if (additional_threads > 0 && _num_threads + _num_threads_pending_start < _max_threads) {
471
626
        need_a_thread = true;
472
626
        _num_threads_pending_start++;
473
626
    }
474
475
7.71k
    Task task;
476
7.71k
    task.runnable = std::move(r);
477
7.71k
    task.submit_time_wather.start();
478
479
    // Add the task to the token's queue.
480
7.71k
    ThreadPoolToken::State state = token->state();
481
7.71k
    DCHECK(state == ThreadPoolToken::State::IDLE || state == ThreadPoolToken::State::RUNNING);
482
7.71k
    token->_entries.emplace_back(std::move(task));
483
    // When we need to execute the task in the token, we submit the token object to the queue.
484
    // There are currently two places where tokens will be submitted to the queue:
485
    // 1. When submitting a new task, if the token is still in the IDLE state,
486
    //    or the concurrency of the token has not reached its upper limit, it will be added to the queue.
487
    // 2. When the dispatch thread finishes executing a task:
488
    //    1. If it is a SERIAL token, and there are unsubmitted tasks, submit them to the queue.
489
    //    2. If it is a CONCURRENT token, and there are still unsubmitted tasks, and the upper limit of concurrency is not reached,
490
    //       it is submitted to the queue.
491
7.71k
    if (token->need_dispatch()) {
492
5.71k
        _queue.emplace_back(token);
493
5.71k
        ++token->_num_submitted_tasks;
494
5.71k
        if (state == ThreadPoolToken::State::IDLE) {
495
2.96k
            token->transition(ThreadPoolToken::State::RUNNING);
496
2.96k
        }
497
5.71k
    } else {
498
2.00k
        ++token->_num_unsubmitted_tasks;
499
2.00k
    }
500
7.71k
    _total_queued_tasks++;
501
502
    // Wake up an idle thread for this task. Choosing the thread at the front of
503
    // the list ensures LIFO semantics as idling threads are also added to the front.
504
    //
505
    // If there are no idle threads, the new task remains on the queue and is
506
    // processed by an active thread (or a thread we're about to create) at some
507
    // point in the future.
508
7.71k
    if (!_idle_threads.empty()) {
509
1.58k
        _idle_threads.front().not_empty.notify_one();
510
1.58k
        _idle_threads.pop_front();
511
1.58k
    }
512
7.71k
    l.unlock();
513
514
7.71k
    if (need_a_thread) {
515
626
        Status status = create_thread();
516
626
        if (!status.ok()) {
517
0
            l.lock();
518
0
            _num_threads_pending_start--;
519
0
            if (_num_threads + _num_threads_pending_start == 0) {
520
                // If we have no threads, we can't do any work.
521
0
                return status;
522
0
            }
523
            // If we failed to create a thread, but there are still some other
524
            // worker threads, log a warning message and continue.
525
0
            LOG(WARNING) << "Thread pool " << _name
526
0
                         << " failed to create thread: " << status.to_string();
527
0
        }
528
626
    }
529
530
7.71k
    return Status::OK();
531
7.71k
}
532
533
120
void ThreadPool::wait() {
534
120
    std::unique_lock<std::mutex> l(_lock);
535
120
    check_not_pool_thread_unlocked();
536
239
    _idle_cond.wait(l, [this]() { return _total_queued_tasks == 0 && _active_threads == 0; });
537
120
}
538
539
1.65k
void ThreadPool::dispatch_thread() {
540
1.65k
    std::unique_lock<std::mutex> l(_lock);
541
1.65k
    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
542
1.65k
    if (!_threads.insert(Thread::current_thread()).second) {
543
0
        throw Exception(Status::InternalError("duplicate token"));
544
0
    }
545
1.65k
    DCHECK_GT(_num_threads_pending_start, 0);
546
1.65k
    _num_threads++;
547
1.65k
    _num_threads_pending_start--;
548
549
1.65k
    if (std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_sptr = _cgroup_cpu_ctl.lock()) {
550
0
        static_cast<void>(cg_cpu_ctl_sptr->add_thread_to_cgroup());
551
0
    }
552
553
    // Owned by this worker thread and added/removed from _idle_threads as needed.
554
1.65k
    IdleThread me;
555
556
270k
    while (true) {
557
        // Note: a non-OK _pool_status (set by shutdown()) indicates normal shutdown.
558
270k
        if (!_pool_status.ok()) {
559
1.13k
            VLOG_CRITICAL << "DispatchThread exiting: " << _pool_status.to_string();
560
1.13k
            break;
561
1.13k
        }
562
563
269k
        if (_num_threads + _num_threads_pending_start > _max_threads) {
564
2
            break;
565
2
        }
566
567
269k
        if (_queue.empty()) {
568
            // There's no work to do, let's go idle.
569
            //
570
            // Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
571
265k
            _idle_threads.push_front(me);
572
265k
            SCOPED_CLEANUP({
573
                // For some wake ups (i.e. shutdown or do_submit) this thread is
574
                // guaranteed to be unlinked after being awakened. In others (i.e.
575
                // spurious wake-up or Wait timeout), it'll still be linked.
576
265k
                if (me.is_linked()) {
577
265k
                    _idle_threads.erase(_idle_threads.iterator_to(me));
578
265k
                }
579
265k
            });
580
265k
            if (me.not_empty.wait_for(l, _idle_timeout) == std::cv_status::timeout) {
581
                // After much investigation, it appears that pthread condition variables have
582
                // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
583
                // another thread did in fact signal. Apparently after a timeout there is some
584
                // brief period during which another thread may actually grab the internal mutex
585
                // protecting the state, signal, and release again before we get the mutex. So,
586
                // we'll recheck the empty queue case regardless.
587
263k
                if (_queue.empty() && _num_threads + _num_threads_pending_start > _min_threads) {
588
507
                    VLOG_NOTICE << "Releasing worker thread from pool " << _name << " after "
589
0
                                << std::chrono::duration_cast<std::chrono::milliseconds>(
590
0
                                           _idle_timeout)
591
0
                                           .count()
592
0
                                << "ms of idle time.";
593
507
                    break;
594
507
                }
595
263k
            }
596
264k
            continue;
597
265k
        }
598
599
4.36k
        MonotonicStopWatch task_execution_time_watch;
600
4.36k
        task_execution_time_watch.start();
601
        // Get the next token and task to execute.
602
4.36k
        ThreadPoolToken* token = _queue.front();
603
4.36k
        _queue.pop_front();
604
4.36k
        DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
605
4.36k
        DCHECK(!token->_entries.empty());
606
4.36k
        Task task = std::move(token->_entries.front());
607
4.36k
        thread_pool_task_wait_worker_time_ns_total->increment(
608
4.36k
                task.submit_time_wather.elapsed_time());
609
4.36k
        thread_pool_task_wait_worker_count_total->increment(1);
610
4.36k
        token->_entries.pop_front();
611
4.36k
        token->_active_threads++;
612
4.36k
        --_total_queued_tasks;
613
4.36k
        ++_active_threads;
614
615
4.36k
        l.unlock();
616
617
        // Execute the task
618
4.36k
        task.runnable->run();
619
        // Destruct the task while we do not hold the lock.
620
        //
621
        // The task's destructor may be expensive if it has a lot of bound
622
        // objects, and we don't want to block submission of the threadpool.
623
        // In the worst case, the destructor might even try to do something
624
        // with this threadpool, and produce a deadlock.
625
4.36k
        task.runnable.reset();
626
4.36k
        l.lock();
627
4.36k
        thread_pool_task_execution_time_ns_total->increment(
628
4.36k
                task_execution_time_watch.elapsed_time());
629
4.36k
        thread_pool_task_execution_count_total->increment(1);
630
        // Possible states:
631
        // 1. The token was shut down while we ran its task. Transition to QUIESCED.
632
        // 2. The token has no more queued tasks. Transition back to IDLE.
633
        // 3. The token has more tasks. Requeue it and leave it in the RUNNING state.
634
4.36k
        ThreadPoolToken::State state = token->state();
635
4.36k
        DCHECK(state == ThreadPoolToken::State::RUNNING ||
636
4.36k
               state == ThreadPoolToken::State::QUIESCING);
637
4.36k
        --token->_active_threads;
638
4.36k
        --token->_num_submitted_tasks;
639
640
        // handle shutdown && idle
641
4.36k
        if (token->_active_threads == 0) {
642
3.59k
            if (state == ThreadPoolToken::State::QUIESCING) {
643
691
                DCHECK(token->_entries.empty());
644
691
                token->transition(ThreadPoolToken::State::QUIESCED);
645
2.90k
            } else if (token->_entries.empty()) {
646
1.76k
                token->transition(ThreadPoolToken::State::IDLE);
647
1.76k
            }
648
3.59k
        }
649
650
        // We decrease _num_submitted_tasks holding lock, so the following DCHECK works.
651
4.36k
        DCHECK(token->_num_submitted_tasks < token->_max_concurrency);
652
653
        // If token->state is running and there are unsubmitted tasks in the token, we put
654
        // the token back.
655
4.36k
        if (token->_num_unsubmitted_tasks > 0 && state == ThreadPoolToken::State::RUNNING) {
656
            // SERIAL: if _entries is not empty, then num_unsubmitted_tasks must be greater than 0.
657
            // CONCURRENT: we have to check _num_unsubmitted_tasks because there may be at least 2
658
            // threads running for the token.
659
849
            _queue.emplace_back(token);
660
849
            ++token->_num_submitted_tasks;
661
849
            --token->_num_unsubmitted_tasks;
662
849
        }
663
664
4.36k
        if (--_active_threads == 0) {
665
917
            _idle_cond.notify_all();
666
917
        }
667
4.36k
    }
668
669
    // It's important that we hold the lock between exiting the loop and dropping
670
    // _num_threads. Otherwise it's possible someone else could come along here
671
    // and add a new task just as the last running thread is about to exit.
672
1.65k
    CHECK(l.owns_lock());
673
674
1.65k
    CHECK_EQ(_threads.erase(Thread::current_thread()), 1);
675
1.65k
    _num_threads--;
676
1.65k
    if (_num_threads + _num_threads_pending_start == 0) {
677
787
        _no_threads_cond.notify_all();
678
679
        // Sanity check: if we're the last thread exiting, the queue ought to be
680
        // empty. Otherwise it will never get processed.
681
787
        CHECK(_queue.empty());
682
787
        DCHECK_EQ(0, _total_queued_tasks);
683
787
    }
684
1.65k
}
685
686
1.65k
Status ThreadPool::create_thread() {
687
1.65k
    return Thread::create("thread pool", absl::Substitute("$0 [worker]", _name),
688
1.65k
                          &ThreadPool::dispatch_thread, this, nullptr);
689
1.65k
}
690
691
5.10k
void ThreadPool::check_not_pool_thread_unlocked() {
692
5.10k
    Thread* current = Thread::current_thread();
693
5.10k
    if (_threads.contains(current)) {
694
0
        throw Exception(
695
0
                Status::FatalError("Thread belonging to thread pool {} with "
696
0
                                   "name {} called pool function that would result in deadlock",
697
0
                                   _name, current->name()));
698
0
    }
699
5.10k
}
700
701
4
Status ThreadPool::set_min_threads(int min_threads) {
702
4
    std::lock_guard<std::mutex> l(_lock);
703
4
    if (min_threads > _max_threads) {
704
        // min threads can not be set greater than max threads
705
1
        return Status::InternalError("set thread pool {} min_threads failed", _name);
706
1
    }
707
3
    _min_threads = min_threads;
708
3
    if (min_threads > _num_threads + _num_threads_pending_start) {
709
0
        int addition_threads = min_threads - _num_threads - _num_threads_pending_start;
710
0
        RETURN_IF_ERROR(try_create_thread(addition_threads, l));
711
0
    }
712
3
    return Status::OK();
713
3
}
714
715
8
Status ThreadPool::set_max_threads(int max_threads) {
716
8
    std::lock_guard<std::mutex> l(_lock);
717
8
    DBUG_EXECUTE_IF("ThreadPool.set_max_threads.force_set", {
718
8
        _max_threads = max_threads;
719
8
        return Status::OK();
720
8
    })
721
8
    if (_min_threads > max_threads) {
722
        // max threads can not be set less than min threads
723
1
        return Status::InternalError("set thread pool {} max_threads failed", _name);
724
1
    }
725
726
7
    _max_threads = max_threads;
727
7
    if (_max_threads > _num_threads + _num_threads_pending_start) {
728
5
        int addition_threads = _max_threads - _num_threads - _num_threads_pending_start;
729
5
        addition_threads = std::min(addition_threads, _total_queued_tasks);
730
5
        RETURN_IF_ERROR(try_create_thread(addition_threads, l));
731
5
    }
732
7
    return Status::OK();
733
7
}
734
735
0
std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
736
0
    return o << ThreadPoolToken::state_to_string(s);
737
0
}
738
739
} // namespace doris