Coverage Report

Created: 2025-04-15 14:42

/root/doris/be/src/util/threadpool.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/threadpool.cc
19
// and modified by Doris
20
21
#include "util/threadpool.h"
22
23
#include <algorithm>
24
#include <cstdint>
25
#include <limits>
26
#include <ostream>
27
#include <thread>
28
#include <utility>
29
30
#include "common/logging.h"
31
#include "gutil/map-util.h"
32
#include "gutil/port.h"
33
#include "gutil/strings/substitute.h"
34
#include "util/debug/sanitizer_scopes.h"
35
#include "util/doris_metrics.h"
36
#include "util/metrics.h"
37
#include "util/scoped_cleanup.h"
38
#include "util/stopwatch.hpp"
39
#include "util/thread.h"
40
41
namespace doris {
42
// The name of these varialbs will be useds as metric name in prometheus.
43
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_active_threads, MetricUnit::NOUNIT);
44
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT);
45
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_queue_size, MetricUnit::NOUNIT);
46
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_threads, MetricUnit::NOUNIT);
47
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT);
48
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_execution_time_ns_avg_in_last_1000_times,
49
                                   MetricUnit::NANOSECONDS);
50
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_wait_worker_ns_avg_in_last_1000_times,
51
                                   MetricUnit::NANOSECONDS);
52
using namespace ErrorCode;
53
54
using std::string;
55
using strings::Substitute;
56
57
class FunctionRunnable : public Runnable {
58
public:
59
7.37k
    explicit FunctionRunnable(std::function<void()> func) : _func(std::move(func)) {}
60
61
4.27k
    void run() override { _func(); }
62
63
private:
64
    std::function<void()> _func;
65
};
66
67
ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group)
68
        : _name(std::move(name)),
69
          _workload_group(std::move(workload_group)),
70
          _min_threads(0),
71
          _max_threads(std::thread::hardware_concurrency()),
72
          _max_queue_size(std::numeric_limits<int>::max()),
73
245
          _idle_timeout(std::chrono::milliseconds(500)) {}
74
75
209
// Sets the minimum number of worker threads; the value is copied into the pool at
// construction. Aborts (CHECK) on a negative value. Returns *this for chaining.
ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
    CHECK_GE(min_threads, 0);
    this->_min_threads = min_threads;
    return *this;
}
80
81
217
// Sets the maximum number of worker threads. Aborts (CHECK) unless the value is
// strictly positive. Returns *this for chaining.
ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
    CHECK_GT(max_threads, 0);
    this->_max_threads = max_threads;
    return *this;
}
86
87
17
// Sets the maximum number of queued tasks used in the pool's capacity check.
// The value is stored without validation. Returns *this for chaining.
ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
    this->_max_queue_size = max_queue_size;
    return *this;
}
91
92
0
// Attaches a cgroup CPU controller; each worker thread of the built pool calls
// add_thread_to_cgroup() on it when it starts. The pointer is stored as-is
// (NOTE(review): presumably not owned by the builder or the pool — confirm).
// Returns *this for chaining.
ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl) {
    this->_cgroup_cpu_ctl = cgroup_cpu_ctl;
    return *this;
}
96
97
// Creates an IDLE token bound to `pool`. A concurrency limit of exactly one is
// indistinguishable from serial execution, so such tokens are forced to SERIAL.
ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode,
                                 int max_concurrency)
        : _mode(max_concurrency == 1 ? ThreadPool::ExecutionMode::SERIAL : mode),
          _pool(pool),
          _state(State::IDLE),
          _active_threads(0),
          _max_concurrency(max_concurrency),
          _num_submitted_tasks(0),
          _num_unsubmitted_tasks(0) {}
110
111
2.11k
// Shuts the token down (dropping queued tasks and waiting for running ones) before
// unregistering it from the owning pool.
ThreadPoolToken::~ThreadPoolToken() {
    this->shutdown();
    ThreadPool* const owner = _pool;
    owner->release_token(this);
}
115
116
5.98k
// Submits `r` to the owning pool on behalf of this token.
Status ThreadPoolToken::submit(std::shared_ptr<Runnable> r) {
    ThreadPool* const owner = _pool;
    return owner->do_submit(std::move(r), this);
}
119
120
6.17k
// Wraps `f` in a FunctionRunnable and submits it through this token.
Status ThreadPoolToken::submit_func(std::function<void()> f) {
    auto wrapped = std::make_shared<FunctionRunnable>(std::move(f));
    return submit(std::move(wrapped));
}
123
124
3.26k
void ThreadPoolToken::shutdown() {
125
3.26k
    std::unique_lock<std::mutex> l(_pool->_lock);
126
3.26k
    _pool->check_not_pool_thread_unlocked();
127
128
    // Clear the queue under the lock, but defer the releasing of the tasks
129
    // outside the lock, in case there are concurrent threads wanting to access
130
    // the ThreadPool. The task's destructors may acquire locks, etc, so this
131
    // also prevents lock inversions.
132
3.26k
    std::deque<ThreadPool::Task> to_release = std::move(_entries);
133
3.26k
    _pool->_total_queued_tasks -= to_release.size();
134
135
3.26k
    switch (state()) {
136
1.04k
    case State::IDLE:
137
        // There were no tasks outstanding; we can quiesce the token immediately.
138
1.04k
        transition(State::QUIESCED);
139
1.04k
        break;
140
779
    case State::RUNNING:
141
        // There were outstanding tasks. If any are still running, switch to
142
        // QUIESCING and wait for them to finish (the worker thread executing
143
        // the token's last task will switch the token to QUIESCED). Otherwise,
144
        // we can quiesce the token immediately.
145
146
        // Note: this is an O(n) operation, but it's expected to be infrequent.
147
        // Plus doing it this way (rather than switching to QUIESCING and waiting
148
        // for a worker thread to process the queue entry) helps retain state
149
        // transition symmetry with ThreadPool::shutdown.
150
5.12k
        for (auto it = _pool->_queue.begin(); it != _pool->_queue.end();) {
151
4.34k
            if (*it == this) {
152
506
                it = _pool->_queue.erase(it);
153
3.83k
            } else {
154
3.83k
                it++;
155
3.83k
            }
156
4.34k
        }
157
158
779
        if (_active_threads == 0) {
159
259
            transition(State::QUIESCED);
160
259
            break;
161
259
        }
162
520
        transition(State::QUIESCING);
163
520
        [[fallthrough]];
164
531
    case State::QUIESCING:
165
        // The token is already quiescing. Just wait for a worker thread to
166
        // switch it to QUIESCED.
167
1.06k
        _not_running_cond.wait(l, [this]() { return state() == State::QUIESCED; });
168
531
        break;
169
1.43k
    default:
170
1.43k
        break;
171
3.26k
    }
172
3.26k
}
173
174
915
void ThreadPoolToken::wait() {
175
915
    std::unique_lock<std::mutex> l(_pool->_lock);
176
915
    _pool->check_not_pool_thread_unlocked();
177
1.14k
    _not_running_cond.wait(l, [this]() { return !is_active(); });
178
915
}
179
180
7.02k
// Moves the token to `new_state`, waking waiters when the token stops running.
// Debug builds validate the state machine:
//   IDLE      -> RUNNING (entries queued) | QUIESCED (no entries, no active threads)
//   RUNNING   -> IDLE | QUIESCING (active threads > 0) | QUIESCED; entries drained
//   QUIESCING -> QUIESCED (no active threads)
//   QUIESCED  -> terminal; any further transition is fatal
void ThreadPoolToken::transition(State new_state) {
#ifndef NDEBUG
    CHECK_NE(_state, new_state);

    if (_state == State::IDLE) {
        CHECK(new_state == State::RUNNING || new_state == State::QUIESCED);
        if (new_state != State::RUNNING) {
            CHECK(_entries.empty());
            CHECK_EQ(_active_threads, 0);
        } else {
            CHECK(!_entries.empty());
        }
    } else if (_state == State::RUNNING) {
        CHECK(new_state == State::IDLE || new_state == State::QUIESCING ||
              new_state == State::QUIESCED);
        CHECK(_entries.empty());
        if (new_state == State::QUIESCING) {
            CHECK_GT(_active_threads, 0);
        }
    } else if (_state == State::QUIESCING) {
        CHECK(new_state == State::QUIESCED);
        CHECK_EQ(_active_threads, 0);
    } else if (_state == State::QUIESCED) {
        CHECK(false); // QUIESCED is a terminal state
    } else {
        LOG(FATAL) << "Unknown token state: " << _state;
    }
#endif

    // Entering a non-running state releases anyone blocked on _not_running_cond
    // (ThreadPoolToken::wait and ThreadPoolToken::shutdown).
    const bool stops_running = new_state == State::IDLE || new_state == State::QUIESCED;
    if (stops_running) {
        _not_running_cond.notify_all();
    }

    _state = new_state;
}
226
227
0
// Returns a static, human-readable name for `s`.
const char* ThreadPoolToken::state_to_string(State s) {
    if (s == State::IDLE) {
        return "IDLE";
    }
    if (s == State::RUNNING) {
        return "RUNNING";
    }
    if (s == State::QUIESCING) {
        return "QUIESCING";
    }
    if (s == State::QUIESCED) {
        return "QUIESCED";
    }
    return "<cannot reach here>";
}
244
245
5.42k
bool ThreadPoolToken::need_dispatch() {
246
5.42k
    return _state == ThreadPoolToken::State::IDLE ||
247
5.42k
           (_mode == ThreadPool::ExecutionMode::CONCURRENT &&
248
2.83k
            _num_submitted_tasks < _max_concurrency);
249
5.42k
}
250
251
ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
252
        : _name(builder._name),
253
          _workload_group(builder._workload_group),
254
          _min_threads(builder._min_threads),
255
          _max_threads(builder._max_threads),
256
          _max_queue_size(builder._max_queue_size),
257
          _idle_timeout(builder._idle_timeout),
258
          _pool_status(Status::Uninitialized("The pool was not initialized.")),
259
          _num_threads(0),
260
          _num_threads_pending_start(0),
261
          _active_threads(0),
262
          _total_queued_tasks(0),
263
          _cgroup_cpu_ctl(builder._cgroup_cpu_ctl),
264
          _tokenless(new_token(ExecutionMode::CONCURRENT)),
265
240
          _id(UniqueId::gen_uid()) {}
266
267
240
// Destroys the pool. Only the internal token used for tokenless submission may
// still be registered; any other live token makes the CHECK below fail.
ThreadPool::~ThreadPool() {
    CHECK_EQ(1, _tokens.size()) << strings::Substitute(
            "Threadpool $0 destroyed with $1 allocated tokens", _name, _tokens.size());
    this->shutdown();
}
273
274
240
// Initializes the pool: marks it usable, starts `_min_threads` workers and
// registers the per-pool metric entity and its "update" hook.
// Returns NotSupported if called more than once. If a worker cannot be created,
// the pool is shut down and the creation error is returned.
Status ThreadPool::init() {
    if (!_pool_status.is<UNINITIALIZED>()) {
        return Status::NotSupported("The thread pool {} is already initialized", _name);
    }
    _pool_status = Status::OK();

    // Reserve all minimum workers up front. Each worker converts its reservation
    // into a live thread (_num_threads_pending_start -> _num_threads) in
    // dispatch_thread().
    _num_threads_pending_start = _min_threads;
    for (int i = 0; i < _min_threads; i++) {
        Status status = create_thread();
        if (!status.ok()) {
            {
                // Workers i .. _min_threads - 1 never started, so nothing will ever
                // give their reservations back. Without this rollback shutdown()
                // would wait forever for _num_threads + _num_threads_pending_start
                // to reach zero. The lock is needed because workers created in
                // earlier iterations may already be updating these counters.
                std::lock_guard<std::mutex> l(_lock);
                _num_threads_pending_start -= _min_threads - i;
            }
            shutdown();
            return status;
        }
    }
    // _id of thread pool is used to make sure when we create thread pool with same name, we can
    // get different _metric_entity
    // If not, we will have problem when we deregister entity and register hook.
    _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
            fmt::format("thread_pool_{}", _name), {{"thread_pool_name", _name},
                                                   {"workload_group", _workload_group},
                                                   {"id", _id.to_string()}});

    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_active_threads);
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_threads);
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size);
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_queue_size);
    INT_GAUGE_METRIC_REGISTER(_metric_entity, task_execution_time_ns_avg_in_last_1000_times);
    INT_GAUGE_METRIC_REGISTER(_metric_entity, task_wait_worker_ns_avg_in_last_1000_times);
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed);

    // Refreshes the gauges when the entity is updated; skipped once the pool is no
    // longer in an OK state (e.g. after shutdown()).
    _metric_entity->register_hook("update", [this]() {
        {
            std::lock_guard<std::mutex> l(_lock);
            if (!_pool_status.ok()) {
                return;
            }
        }

        thread_pool_active_threads->set_value(num_active_threads());
        thread_pool_queue_size->set_value(get_queue_size());
        thread_pool_max_queue_size->set_value(get_max_queue_size());
        thread_pool_max_threads->set_value(max_threads());
        task_execution_time_ns_avg_in_last_1000_times->set_value(
                _task_execution_time_ns_statistic.mean());
        task_wait_worker_ns_avg_in_last_1000_times->set_value(
                _task_wait_worker_time_ns_statistic.mean());
    });
    return Status::OK();
}
322
323
394
void ThreadPool::shutdown() {
324
    // Why access to doris_metrics is safe here?
325
    // Since DorisMetrics is a singleton, it will be destroyed only after doris_main is exited.
326
    // The shutdown/destroy of ThreadPool is guaranteed to take place before doris_main exits by
327
    // ExecEnv::destroy().
328
394
    DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity);
329
394
    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
330
394
    std::unique_lock<std::mutex> l(_lock);
331
394
    check_not_pool_thread_unlocked();
332
333
    // Note: this is the same error seen at submission if the pool is at
334
    // capacity, so clients can't tell them apart. This isn't really a practical
335
    // concern though because shutting down a pool typically requires clients to
336
    // be quiesced first, so there's no danger of a client getting confused.
337
    // Not print stack trace here
338
394
    _pool_status = Status::Error<SERVICE_UNAVAILABLE, false>(
339
394
            "The thread pool {} has been shut down.", _name);
340
341
    // Clear the various queues under the lock, but defer the releasing
342
    // of the tasks outside the lock, in case there are concurrent threads
343
    // wanting to access the ThreadPool. The task's destructors may acquire
344
    // locks, etc, so this also prevents lock inversions.
345
394
    _queue.clear();
346
347
394
    std::deque<std::deque<Task>> to_release;
348
474
    for (auto* t : _tokens) {
349
474
        if (!t->_entries.empty()) {
350
13
            to_release.emplace_back(std::move(t->_entries));
351
13
        }
352
474
        switch (t->state()) {
353
272
        case ThreadPoolToken::State::IDLE:
354
            // The token is idle; we can quiesce it immediately.
355
272
            t->transition(ThreadPoolToken::State::QUIESCED);
356
272
            break;
357
19
        case ThreadPoolToken::State::RUNNING:
358
            // The token has tasks associated with it. If they're merely queued
359
            // (i.e. there are no active threads), the tasks will have been removed
360
            // above and we can quiesce immediately. Otherwise, we need to wait for
361
            // the threads to finish.
362
19
            t->transition(t->_active_threads > 0 ? ThreadPoolToken::State::QUIESCING
363
19
                                                 : ThreadPoolToken::State::QUIESCED);
364
19
            break;
365
183
        default:
366
183
            break;
367
474
        }
368
474
    }
369
370
    // The queues are empty. Wake any sleeping worker threads and wait for all
371
    // of them to exit. Some worker threads will exit immediately upon waking,
372
    // while others will exit after they finish executing an outstanding task.
373
394
    _total_queued_tasks = 0;
374
1.28k
    while (!_idle_threads.empty()) {
375
887
        _idle_threads.front().not_empty.notify_one();
376
887
        _idle_threads.pop_front();
377
887
    }
378
379
612
    _no_threads_cond.wait(l, [this]() { return _num_threads + _num_threads_pending_start == 0; });
380
381
    // All the threads have exited. Check the state of each token.
382
474
    for (auto* t : _tokens) {
383
474
        DCHECK(t->state() == ThreadPoolToken::State::IDLE ||
384
474
               t->state() == ThreadPoolToken::State::QUIESCED);
385
474
    }
386
394
}
387
388
2.11k
// Creates a token bound to this pool and registers it in _tokens (aborting via
// InsertOrDie if it were somehow already present). The caller owns the result.
std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode, int max_concurrency) {
    std::lock_guard<std::mutex> guard(_lock);
    std::unique_ptr<ThreadPoolToken> token {new ThreadPoolToken(this, mode, max_concurrency)};
    ThreadPoolToken* const registered = token.get();
    InsertOrDie(&_tokens, registered);
    return token;
}
394
395
2.11k
// Unregisters `t` from this pool. Aborts if the token still reports is_active()
// or if it was not registered exactly once.
void ThreadPool::release_token(ThreadPoolToken* t) {
    std::lock_guard<std::mutex> guard(_lock);
    CHECK(!t->is_active()) << strings::Substitute(
            "Token with state $0 may not be released",
            ThreadPoolToken::state_to_string(t->state()));
    CHECK_EQ(1, _tokens.erase(t));
}
401
402
1.46k
// Submits `r` through the pool's internal CONCURRENT token.
Status ThreadPool::submit(std::shared_ptr<Runnable> r) {
    ThreadPoolToken* const token = _tokenless.get();
    return do_submit(std::move(r), token);
}
405
406
1.30k
// Wraps `f` in a FunctionRunnable and submits it without an explicit token.
Status ThreadPool::submit_func(std::function<void()> f) {
    auto wrapped = std::make_shared<FunctionRunnable>(std::move(f));
    return submit(std::move(wrapped));
}
409
410
7.66k
// Enqueues `r` on `token` and, when useful, starts one more worker thread.
// Returns the pool status if the pool is not running, SERVICE_UNAVAILABLE if the
// token was shut down or the pool is at capacity, and the thread-creation error
// if a required worker could not be started while the pool has no threads at all.
Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token) {
    DCHECK(token);

    std::unique_lock<std::mutex> l(_lock);
    if (PREDICT_FALSE(!_pool_status.ok())) {
        return _pool_status;
    }
    if (PREDICT_FALSE(!token->may_submit_new_tasks())) {
        return Status::Error<SERVICE_UNAVAILABLE>("Thread pool({}) token was shut down", _name);
    }

    // Capacity = idle worker slots + free queue slots; reject when nothing is left.
    const int64_t room = static_cast<int64_t>(_max_threads) - _active_threads +
                         static_cast<int64_t>(_max_queue_size) - _total_queued_tasks;
    if (room < 1) {
        thread_pool_submit_failed->increment(1);
        return Status::Error<SERVICE_UNAVAILABLE>(
                "Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name,
                _num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks,
                _max_queue_size);
    }

    // Decide whether another worker is needed, assuming every currently inactive
    // worker will pick up one queue entry. A task added to an already active SERIAL
    // token adds no parallel work. The thread is only reserved here and created
    // after the lock is dropped, because thread creation can be slow (hundreds of
    // milliseconds in some cases) and other workers should keep draining the queue
    // meanwhile. A worker may finish right after this estimate, making the new
    // thread unnecessary; that race is unavoidable and harmless. The pool never
    // grows beyond _max_threads.
    const bool adds_parallel_work =
            !(token->is_active() && token->mode() == ExecutionMode::SERIAL);
    const int live_threads = _num_threads + _num_threads_pending_start;
    const int idle_capacity = live_threads - _active_threads;
    const int shortfall =
            static_cast<int>(_queue.size()) + (adds_parallel_work ? 1 : 0) - idle_capacity;
    const bool need_a_thread = shortfall > 0 && live_threads < _max_threads;
    if (need_a_thread) {
        ++_num_threads_pending_start;
    }

    Task task;
    task.runnable = std::move(r);
    task.submit_time_wather.start();

    // Add the task to the token's own queue.
    ThreadPoolToken::State state = token->state();
    DCHECK(state == ThreadPoolToken::State::IDLE || state == ThreadPoolToken::State::RUNNING);
    token->_entries.emplace_back(std::move(task));

    // The token itself is placed on the pool queue whenever one of its tasks should
    // be dispatched. That happens in two places:
    // 1. Here, on submission, if the token is IDLE or, for a CONCURRENT token, its
    //    concurrency has not yet reached the upper limit.
    // 2. In dispatch_thread(), when a worker finishes one of the token's tasks:
    //    - SERIAL token: if unsubmitted tasks remain, the token is requeued;
    //    - CONCURRENT token: if unsubmitted tasks remain and the concurrency limit
    //      is not reached, the token is requeued.
    if (!token->need_dispatch()) {
        ++token->_num_unsubmitted_tasks;
    } else {
        _queue.emplace_back(token);
        ++token->_num_submitted_tasks;
        if (state == ThreadPoolToken::State::IDLE) {
            token->transition(ThreadPoolToken::State::RUNNING);
        }
    }
    ++_total_queued_tasks;

    // Wake the most recently idled worker (idle workers are pushed to the front,
    // giving LIFO semantics). With no idle worker the task simply waits in the
    // queue for a busy worker or the one we are about to create.
    if (!_idle_threads.empty()) {
        _idle_threads.front().not_empty.notify_one();
        _idle_threads.pop_front();
    }
    l.unlock();

    if (!need_a_thread) {
        return Status::OK();
    }
    Status status = create_thread();
    if (status.ok()) {
        return Status::OK();
    }

    // Give back the reservation made above.
    l.lock();
    _num_threads_pending_start--;
    if (_num_threads + _num_threads_pending_start == 0) {
        // If we have no threads, we can't do any work.
        return status;
    }
    // Other workers exist and will eventually process the task; just warn.
    LOG(WARNING) << "Thread pool " << _name
                 << " failed to create thread: " << status.to_string();
    return Status::OK();
}
517
518
61
void ThreadPool::wait() {
519
61
    std::unique_lock<std::mutex> l(_lock);
520
61
    check_not_pool_thread_unlocked();
521
127
    _idle_cond.wait(l, [this]() { return _total_queued_tasks == 0 && _active_threads == 0; });
522
61
}
523
524
1.42k
void ThreadPool::dispatch_thread() {
525
1.42k
    std::unique_lock<std::mutex> l(_lock);
526
1.42k
    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
527
1.42k
    InsertOrDie(&_threads, Thread::current_thread());
528
1.42k
    DCHECK_GT(_num_threads_pending_start, 0);
529
1.42k
    _num_threads++;
530
1.42k
    _num_threads_pending_start--;
531
532
1.42k
    if (_cgroup_cpu_ctl != nullptr) {
533
0
        static_cast<void>(_cgroup_cpu_ctl->add_thread_to_cgroup());
534
0
    }
535
536
    // Owned by this worker thread and added/removed from _idle_threads as needed.
537
1.42k
    IdleThread me;
538
539
55.0k
    while (true) {
540
        // Note: Status::Aborted() is used to indicate normal shutdown.
541
55.0k
        if (!_pool_status.ok()) {
542
914
            VLOG_CRITICAL << "DispatchThread exiting: " << _pool_status.to_string();
543
914
            break;
544
914
        }
545
546
54.1k
        if (_num_threads + _num_threads_pending_start > _max_threads) {
547
2
            break;
548
2
        }
549
550
54.1k
        if (_queue.empty()) {
551
            // There's no work to do, let's go idle.
552
            //
553
            // Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
554
49.7k
            _idle_threads.push_front(me);
555
49.7k
            SCOPED_CLEANUP({
556
                // For some wake ups (i.e. shutdown or do_submit) this thread is
557
                // guaranteed to be unlinked after being awakened. In others (i.e.
558
                // spurious wake-up or Wait timeout), it'll still be linked.
559
49.7k
                if (me.is_linked()) {
560
49.7k
                    _idle_threads.erase(_idle_threads.iterator_to(me));
561
49.7k
                }
562
49.7k
            });
563
49.7k
            if (me.not_empty.wait_for(l, _idle_timeout) == std::cv_status::timeout) {
564
                // After much investigation, it appears that pthread condition variables have
565
                // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
566
                // another thread did in fact signal. Apparently after a timeout there is some
567
                // brief period during which another thread may actually grab the internal mutex
568
                // protecting the state, signal, and release again before we get the mutex. So,
569
                // we'll recheck the empty queue case regardless.
570
47.0k
                if (_queue.empty() && _num_threads + _num_threads_pending_start > _min_threads) {
571
505
                    VLOG_NOTICE << "Releasing worker thread from pool " << _name << " after "
572
0
                                << std::chrono::duration_cast<std::chrono::milliseconds>(
573
0
                                           _idle_timeout)
574
0
                                           .count()
575
0
                                << "ms of idle time.";
576
505
                    break;
577
505
                }
578
47.0k
            }
579
49.2k
            continue;
580
49.7k
        }
581
582
4.45k
        MonotonicStopWatch task_execution_time_watch;
583
4.45k
        task_execution_time_watch.start();
584
        // Get the next token and task to execute.
585
4.45k
        ThreadPoolToken* token = _queue.front();
586
4.45k
        _queue.pop_front();
587
4.45k
        DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
588
4.45k
        DCHECK(!token->_entries.empty());
589
4.45k
        Task task = std::move(token->_entries.front());
590
4.45k
        _task_wait_worker_time_ns_statistic.add(task.submit_time_wather.elapsed_time());
591
4.45k
        token->_entries.pop_front();
592
4.45k
        token->_active_threads++;
593
4.45k
        --_total_queued_tasks;
594
4.45k
        ++_active_threads;
595
596
4.45k
        l.unlock();
597
598
        // Execute the task
599
4.45k
        task.runnable->run();
600
        // Destruct the task while we do not hold the lock.
601
        //
602
        // The task's destructor may be expensive if it has a lot of bound
603
        // objects, and we don't want to block submission of the threadpool.
604
        // In the worst case, the destructor might even try to do something
605
        // with this threadpool, and produce a deadlock.
606
4.45k
        task.runnable.reset();
607
4.45k
        l.lock();
608
4.45k
        _task_execution_time_ns_statistic.add(task_execution_time_watch.elapsed_time());
609
        // Possible states:
610
        // 1. The token was shut down while we ran its task. Transition to QUIESCED.
611
        // 2. The token has no more queued tasks. Transition back to IDLE.
612
        // 3. The token has more tasks. Requeue it and transition back to RUNNABLE.
613
4.45k
        ThreadPoolToken::State state = token->state();
614
4.45k
        DCHECK(state == ThreadPoolToken::State::RUNNING ||
615
4.45k
               state == ThreadPoolToken::State::QUIESCING);
616
4.45k
        --token->_active_threads;
617
4.45k
        --token->_num_submitted_tasks;
618
619
        // handle shutdown && idle
620
4.45k
        if (token->_active_threads == 0) {
621
3.08k
            if (state == ThreadPoolToken::State::QUIESCING) {
622
529
                DCHECK(token->_entries.empty());
623
529
                token->transition(ThreadPoolToken::State::QUIESCED);
624
2.55k
            } else if (token->_entries.empty()) {
625
1.79k
                token->transition(ThreadPoolToken::State::IDLE);
626
1.79k
            }
627
3.08k
        }
628
629
        // We decrease _num_submitted_tasks holding lock, so the following DCHECK works.
630
4.45k
        DCHECK(token->_num_submitted_tasks < token->_max_concurrency);
631
632
        // If token->state is running and there are unsubmitted tasks in the token, we put
633
        // the token back.
634
4.45k
        if (token->_num_unsubmitted_tasks > 0 && state == ThreadPoolToken::State::RUNNING) {
635
            // SERIAL: if _entries is not empty, then num_unsubmitted_tasks must be greater than 0.
636
            // CONCURRENT: we have to check _num_unsubmitted_tasks because there may be at least 2
637
            // threads are running for the token.
638
616
            _queue.emplace_back(token);
639
616
            ++token->_num_submitted_tasks;
640
616
            --token->_num_unsubmitted_tasks;
641
616
        }
642
643
4.45k
        if (--_active_threads == 0) {
644
817
            _idle_cond.notify_all();
645
817
        }
646
4.45k
    }
647
648
    // It's important that we hold the lock between exiting the loop and dropping
649
    // _num_threads. Otherwise it's possible someone else could come along here
650
    // and add a new task just as the last running thread is about to exit.
651
1.42k
    CHECK(l.owns_lock());
652
653
1.42k
    CHECK_EQ(_threads.erase(Thread::current_thread()), 1);
654
1.42k
    _num_threads--;
655
1.42k
    if (_num_threads + _num_threads_pending_start == 0) {
656
716
        _no_threads_cond.notify_all();
657
658
        // Sanity check: if we're the last thread exiting, the queue ought to be
659
        // empty. Otherwise it will never get processed.
660
716
        CHECK(_queue.empty());
661
716
        DCHECK_EQ(0, _total_queued_tasks);
662
716
    }
663
1.42k
}
664
665
1.42k
// Starts a new worker thread running dispatch_thread(), named "<pool> [worker]".
Status ThreadPool::create_thread() {
    const std::string worker_name = strings::Substitute("$0 [worker]", _name);
    return Thread::create("thread pool", worker_name, &ThreadPool::dispatch_thread, this,
                          nullptr);
}
669
670
4.63k
void ThreadPool::check_not_pool_thread_unlocked() {
671
4.63k
    Thread* current = Thread::current_thread();
672
4.63k
    if (ContainsKey(_threads, current)) {
673
0
        LOG(FATAL) << strings::Substitute(
674
0
                "Thread belonging to thread pool '$0' with "
675
0
                "name '$1' called pool function that would result in deadlock",
676
0
                _name, current->name());
677
0
    }
678
4.63k
}
679
680
4
// Changes the minimum number of worker threads, starting new workers immediately
// if the pool currently has fewer. Returns InternalError if `min_threads` exceeds
// the current maximum, or the thread-creation error if a worker cannot be started.
Status ThreadPool::set_min_threads(int min_threads) {
    std::lock_guard<std::mutex> l(_lock);
    if (min_threads > _max_threads) {
        // min threads can not be set greater than max threads
        return Status::InternalError("set thread pool {} min_threads failed", _name);
    }
    _min_threads = min_threads;
    if (min_threads > _num_threads + _num_threads_pending_start) {
        int addition_threads = min_threads - _num_threads - _num_threads_pending_start;
        // Reserve all new workers first. create_thread() runs with _lock held, so the
        // new workers block in dispatch_thread() until we return and cannot race
        // with the arithmetic below.
        _num_threads_pending_start += addition_threads;
        for (int i = 0; i < addition_threads; i++) {
            Status status = create_thread();
            if (!status.ok()) {
                // Workers i .. addition_threads - 1 never started; give back ALL of
                // their reservations, not just one. Otherwise the count never reaches
                // zero and shutdown() waits forever.
                _num_threads_pending_start -= addition_threads - i;
                if (_num_threads + _num_threads_pending_start == 0) {
                    _no_threads_cond.notify_all();
                }
                LOG(WARNING) << "Thread pool " << _name
                             << " failed to create thread: " << status.to_string();
                return status;
            }
        }
    }
    return Status::OK();
}
702
703
7
// Changes the maximum number of worker threads. When raised, new workers are
// started right away, but no more than the number of currently queued tasks.
// Returns InternalError if `max_threads` is below the current minimum, or the
// thread-creation error if a worker cannot be started.
Status ThreadPool::set_max_threads(int max_threads) {
    std::lock_guard<std::mutex> l(_lock);
    if (_min_threads > max_threads) {
        // max threads can not be set less than min threads
        return Status::InternalError("set thread pool {} max_threads failed", _name);
    }

    _max_threads = max_threads;
    if (_max_threads > _num_threads + _num_threads_pending_start) {
        int addition_threads = _max_threads - _num_threads - _num_threads_pending_start;
        addition_threads = std::min(addition_threads, _total_queued_tasks);
        // Reserve all new workers first. create_thread() runs with _lock held, so the
        // new workers block in dispatch_thread() until we return and cannot race
        // with the arithmetic below.
        _num_threads_pending_start += addition_threads;
        for (int i = 0; i < addition_threads; i++) {
            Status status = create_thread();
            if (!status.ok()) {
                // Workers i .. addition_threads - 1 never started; give back ALL of
                // their reservations, not just one. Otherwise the count never reaches
                // zero and shutdown() waits forever.
                _num_threads_pending_start -= addition_threads - i;
                if (_num_threads + _num_threads_pending_start == 0) {
                    _no_threads_cond.notify_all();
                }
                LOG(WARNING) << "Thread pool " << _name
                             << " failed to create thread: " << status.to_string();
                return status;
            }
        }
    }
    return Status::OK();
}
727
728
0
std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
729
0
    return o << ThreadPoolToken::state_to_string(s);
730
0
}
731
732
} // namespace doris