Coverage Report

Created: 2025-05-23 06:40

/root/doris/be/src/util/threadpool.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/threadpool.cc
19
// and modified by Doris
20
21
#include "util/threadpool.h"
22
23
#include <algorithm>
24
#include <cstdint>
25
#include <limits>
26
#include <ostream>
27
#include <thread>
28
#include <utility>
29
30
#include "absl/strings/substitute.h"
31
#include "common/exception.h"
32
#include "common/logging.h"
33
#include "util/debug_points.h"
34
#include "util/doris_metrics.h"
35
#include "util/metrics.h"
36
#include "util/scoped_cleanup.h"
37
#include "util/stopwatch.hpp"
38
#include "util/thread.h"
39
40
namespace doris {
41
// The names of these variables are used as metric names in Prometheus.
//
// Gauges: sampled by the "update" hook that ThreadPool::init() registers on the
// pool's metric entity.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_active_threads, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_threads, MetricUnit::NOUNIT);
// Counters: incremented by do_submit() (submit_failed) and dispatch_thread()
// (execution time/count, time tasks waited for a worker and how many waited).
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_execution_time_ns_total,
                                     MetricUnit::NANOSECONDS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_execution_count_total, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_wait_worker_time_ns_total,
                                     MetricUnit::NANOSECONDS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_wait_worker_count_total, MetricUnit::NOUNIT);
using namespace ErrorCode;

using std::string;
56
57
class FunctionRunnable : public Runnable {
58
public:
59
5.07M
    explicit FunctionRunnable(std::function<void()> func) : _func(std::move(func)) {}
60
61
5.07M
    void run() override { _func(); }
62
63
private:
64
    std::function<void()> _func;
65
};
66
67
ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group)
68
        : _name(std::move(name)),
69
          _workload_group(std::move(workload_group)),
70
          _min_threads(0),
71
          _max_threads(std::thread::hardware_concurrency()),
72
          _max_queue_size(std::numeric_limits<int>::max()),
73
585
          _idle_timeout(std::chrono::milliseconds(500)) {}
74
75
546
// Sets how many worker threads the pool keeps alive even when idle.
// A negative value is a programming error and aborts.
ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
    CHECK_GE(min_threads, 0);
    this->_min_threads = min_threads;
    return *this;
}
80
81
557
// Sets the upper bound on worker threads; it must be strictly positive
// (violations abort).
ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
    CHECK_GT(max_threads, 0);
    this->_max_threads = max_threads;
    return *this;
}
86
87
193
// Sets the queue-size term of the pool's capacity check. The value is stored
// as given; no validation is performed here.
ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
    this->_max_queue_size = max_queue_size;
    return *this;
}
91
92
// Attaches an (optional, non-owning) cgroup CPU controller. Worker threads try
// to lock it at startup and, if it is still alive, add themselves to the cgroup.
ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(
        std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl) {
    // The parameter is a by-value sink: move it into place rather than copying,
    // which avoids an extra atomic weak-count increment/decrement pair.
    _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
    return *this;
}
97
98
// Creates an IDLE token bound to `pool`. A token allowed only one concurrent
// task is forced into SERIAL mode regardless of the requested mode.
ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode,
                                 int max_concurrency)
        : _mode(mode),
          _pool(pool),
          _state(State::IDLE),
          _active_threads(0),
          _max_concurrency(max_concurrency),
          _num_submitted_tasks(0),
          _num_unsubmitted_tasks(0) {
    // Assigning SERIAL when the mode is already SERIAL is a no-op, so the mode
    // check of the original condition is not needed.
    if (max_concurrency == 1) {
        _mode = ThreadPool::ExecutionMode::SERIAL;
    }
}
111
112
813k
// Quiesces the token (dropping its queued tasks and waiting for running ones),
// then unregisters it from the owning pool.
ThreadPoolToken::~ThreadPoolToken() {
    this->shutdown();
    ThreadPool* const owner = _pool;
    owner->release_token(this);
}
116
117
2.73M
// Submits `r` to the owning pool, tagged with this token so that the token's
// execution mode and concurrency limit apply.
Status ThreadPoolToken::submit(std::shared_ptr<Runnable> r) {
    ThreadPoolToken* const self = this;
    return _pool->do_submit(std::move(r), self);
}
120
121
2.74M
// Convenience overload: wraps `f` in a FunctionRunnable and submits it through
// this token.
Status ThreadPoolToken::submit_func(std::function<void()> f) {
    auto runnable = std::make_shared<FunctionRunnable>(std::move(f));
    return submit(std::move(runnable));
}
124
125
1.13M
void ThreadPoolToken::shutdown() {
126
1.13M
    std::unique_lock<std::mutex> l(_pool->_lock);
127
1.13M
    _pool->check_not_pool_thread_unlocked();
128
129
    // Clear the queue under the lock, but defer the releasing of the tasks
130
    // outside the lock, in case there are concurrent threads wanting to access
131
    // the ThreadPool. The task's destructors may acquire locks, etc, so this
132
    // also prevents lock inversions.
133
1.13M
    std::deque<ThreadPool::Task> to_release = std::move(_entries);
134
1.13M
    _pool->_total_queued_tasks -= to_release.size();
135
136
1.13M
    switch (state()) {
137
810k
    case State::IDLE:
138
        // There were no tasks outstanding; we can quiesce the token immediately.
139
810k
        transition(State::QUIESCED);
140
810k
        break;
141
2.46k
    case State::RUNNING:
142
        // There were outstanding tasks. If any are still running, switch to
143
        // QUIESCING and wait for them to finish (the worker thread executing
144
        // the token's last task will switch the token to QUIESCED). Otherwise,
145
        // we can quiesce the token immediately.
146
147
        // Note: this is an O(n) operation, but it's expected to be infrequent.
148
        // Plus doing it this way (rather than switching to QUIESCING and waiting
149
        // for a worker thread to process the queue entry) helps retain state
150
        // transition symmetry with ThreadPool::shutdown.
151
14.4k
        for (auto it = _pool->_queue.begin(); it != _pool->_queue.end();) {
152
11.9k
            if (*it == this) {
153
1.14k
                it = _pool->_queue.erase(it);
154
10.8k
            } else {
155
10.8k
                it++;
156
10.8k
            }
157
11.9k
        }
158
159
2.46k
        if (_active_threads == 0) {
160
530
            transition(State::QUIESCED);
161
530
            break;
162
530
        }
163
1.93k
        transition(State::QUIESCING);
164
1.93k
        [[fallthrough]];
165
1.94k
    case State::QUIESCING:
166
        // The token is already quiescing. Just wait for a worker thread to
167
        // switch it to QUIESCED.
168
3.89k
        _not_running_cond.wait(l, [this]() { return state() == State::QUIESCED; });
169
1.94k
        break;
170
326k
    default:
171
326k
        break;
172
1.13M
    }
173
1.13M
}
174
175
558k
void ThreadPoolToken::wait() {
176
558k
    std::unique_lock<std::mutex> l(_pool->_lock);
177
558k
    _pool->check_not_pool_thread_unlocked();
178
608k
    _not_running_cond.wait(l, [this]() { return !is_active(); });
179
558k
}
180
181
3.42M
// Moves the token to `new_state`; the caller holds the pool lock. Debug builds
// validate the transition first. Entering IDLE or QUIESCED wakes everyone
// waiting on _not_running_cond.
void ThreadPoolToken::transition(State new_state) {
#ifndef NDEBUG
    // Allowed edges of the token state machine:
    //   IDLE      -> RUNNING (entries queued) | QUIESCED (no entries, no active threads)
    //   RUNNING   -> IDLE | QUIESCING (active threads > 0) | QUIESCED; entries must be empty
    //   QUIESCING -> QUIESCED (no active threads)
    //   QUIESCED  -> none (terminal)
    CHECK_NE(_state, new_state);

    switch (_state) {
    case State::IDLE:
        CHECK(new_state == State::RUNNING || new_state == State::QUIESCED);
        if (new_state == State::QUIESCED) {
            CHECK(_entries.empty());
            CHECK_EQ(_active_threads, 0);
        } else {
            CHECK(!_entries.empty());
        }
        break;
    case State::RUNNING:
        CHECK(new_state == State::IDLE || new_state == State::QUIESCING ||
              new_state == State::QUIESCED);
        CHECK(_entries.empty());
        if (new_state == State::QUIESCING) {
            CHECK_GT(_active_threads, 0);
        }
        break;
    case State::QUIESCING:
        CHECK(new_state == State::QUIESCED);
        CHECK_EQ(_active_threads, 0);
        break;
    case State::QUIESCED:
        CHECK(false); // QUIESCED is a terminal state
        break;
    default:
        throw Exception(Status::FatalError("Unknown token state: {}", _state));
    }
#endif

    // A token that stops running (IDLE or QUIESCED) releases the callers
    // blocked in wait() and shutdown().
    const bool stopped_running = new_state == State::IDLE || new_state == State::QUIESCED;
    if (stopped_running) {
        _not_running_cond.notify_all();
    }

    _state = new_state;
}
227
228
0
// Returns the symbolic name of a token state, as used in CHECK messages and by
// operator<<.
const char* ThreadPoolToken::state_to_string(State s) {
    switch (s) {
    case State::IDLE:
        return "IDLE";
    case State::RUNNING:
        return "RUNNING";
    case State::QUIESCING:
        return "QUIESCING";
    case State::QUIESCED:
        return "QUIESCED";
    }
    return "<cannot reach here>";
}
245
246
5.22M
bool ThreadPoolToken::need_dispatch() {
247
5.22M
    return _state == ThreadPoolToken::State::IDLE ||
248
5.22M
           (_mode == ThreadPool::ExecutionMode::CONCURRENT &&
249
3.91M
            _num_submitted_tasks < _max_concurrency);
250
5.22M
}
251
252
// Copies the builder's settings. The pool starts with an Uninitialized status,
// so do_submit() rejects work until init() succeeds.
// NOTE: `_tokenless` is created through new_token(), which locks `_lock` and
// inserts into `_tokens`; those members must therefore be declared (and thus
// constructed) before `_tokenless` in the class definition.
ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
        : _name(builder._name),
          _workload_group(builder._workload_group),
          _min_threads(builder._min_threads),
          _max_threads(builder._max_threads),
          _max_queue_size(builder._max_queue_size),
          _idle_timeout(builder._idle_timeout),
          _pool_status(Status::Uninitialized("The pool was not initialized.")),
          _num_threads(0),
          _num_threads_pending_start(0),
          _active_threads(0),
          _total_queued_tasks(0),
          _cgroup_cpu_ctl(builder._cgroup_cpu_ctl),
          // Internal token used by ThreadPool::submit()/submit_func().
          _tokenless(new_token(ExecutionMode::CONCURRENT)),
          // Unique per instance; distinguishes metric entities of same-named pools.
          _id(UniqueId::gen_uid()) {}
267
268
397
// Destroys the pool. Every user-created token must already be gone; only the
// internal token used for tokenless submission may remain. Shuts the pool down.
ThreadPool::~ThreadPool() {
    const auto live_tokens = _tokens.size();
    CHECK_EQ(1, live_tokens) << absl::Substitute(
            "Threadpool $0 destroyed with $1 allocated tokens", _name, live_tokens);
    shutdown();
}
274
275
569
// Spawns up to `thread_num` workers, stopping at the first failure (which is
// logged and returned). The lock_guard parameter documents that the caller
// already holds `_lock`.
Status ThreadPool::try_create_thread(int thread_num, std::lock_guard<std::mutex>&) {
    for (int created = 0; created < thread_num; ++created) {
        Status status = create_thread();
        if (!status.ok()) {
            LOG(WARNING) << "Thread pool " << _name << " failed to create thread: " << status;
            return status;
        }
        // Counted as pending until the new worker registers in dispatch_thread().
        _num_threads_pending_start++;
    }
    return Status::OK();
}
287
288
562
// One-time initialization: marks the pool usable, pre-spawns `_min_threads`
// workers (best effort) and registers the pool's metric entity plus the hook
// that refreshes its gauges. Returns NotSupported if called more than once.
Status ThreadPool::init() {
    if (!_pool_status.is<UNINITIALIZED>()) {
        return Status::NotSupported("The thread pool {} is already initialized", _name);
    }
    _pool_status = Status::OK();

    {
        std::lock_guard<std::mutex> guard(_lock);
        // Failing to create threads here must not fail init: threads can still
        // be created later, e.g. when a task is submitted.
        static_cast<void>(try_create_thread(_min_threads, guard));
    }

    // `_id` keeps the metric entity unique even when several pools share a name;
    // otherwise deregistering one pool's entity/hook would clash with another's.
    _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
            fmt::format("thread_pool_{}", _name), {{"thread_pool_name", _name},
                                                   {"workload_group", _workload_group},
                                                   {"id", _id.to_string()}});

    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_active_threads);
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_threads);
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size);
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_queue_size);
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_execution_time_ns_total);
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_execution_count_total);
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_wait_worker_time_ns_total);
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_wait_worker_count_total);
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed);

    _metric_entity->register_hook("update", [this]() {
        // Skip sampling once the pool status is no longer OK (e.g. after shutdown).
        bool pool_ok = false;
        {
            std::lock_guard<std::mutex> guard(_lock);
            pool_ok = _pool_status.ok();
        }
        if (!pool_ok) {
            return;
        }

        thread_pool_active_threads->set_value(num_active_threads());
        thread_pool_queue_size->set_value(get_queue_size());
        thread_pool_max_queue_size->set_value(get_max_queue_size());
        thread_pool_max_threads->set_value(max_threads());
    });
    return Status::OK();
}
334
335
715
void ThreadPool::shutdown() {
336
    // Why access to doris_metrics is safe here?
337
    // Since DorisMetrics is a singleton, it will be destroyed only after doris_main is exited.
338
    // The shutdown/destroy of ThreadPool is guaranteed to take place before doris_main exits by
339
    // ExecEnv::destroy().
340
715
    DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity);
341
715
    std::unique_lock<std::mutex> l(_lock);
342
715
    check_not_pool_thread_unlocked();
343
344
    // Note: this is the same error seen at submission if the pool is at
345
    // capacity, so clients can't tell them apart. This isn't really a practical
346
    // concern though because shutting down a pool typically requires clients to
347
    // be quiesced first, so there's no danger of a client getting confused.
348
    // Not print stack trace here
349
715
    _pool_status = Status::Error<SERVICE_UNAVAILABLE, false>(
350
715
            "The thread pool {} has been shut down.", _name);
351
352
    // Clear the various queues under the lock, but defer the releasing
353
    // of the tasks outside the lock, in case there are concurrent threads
354
    // wanting to access the ThreadPool. The task's destructors may acquire
355
    // locks, etc, so this also prevents lock inversions.
356
715
    _queue.clear();
357
358
715
    std::deque<std::deque<Task>> to_release;
359
717
    for (auto* t : _tokens) {
360
717
        if (!t->_entries.empty()) {
361
3
            to_release.emplace_back(std::move(t->_entries));
362
3
        }
363
717
        switch (t->state()) {
364
385
        case ThreadPoolToken::State::IDLE:
365
            // The token is idle; we can quiesce it immediately.
366
385
            t->transition(ThreadPoolToken::State::QUIESCED);
367
385
            break;
368
14
        case ThreadPoolToken::State::RUNNING:
369
            // The token has tasks associated with it. If they're merely queued
370
            // (i.e. there are no active threads), the tasks will have been removed
371
            // above and we can quiesce immediately. Otherwise, we need to wait for
372
            // the threads to finish.
373
14
            t->transition(t->_active_threads > 0 ? ThreadPoolToken::State::QUIESCING
374
14
                                                 : ThreadPoolToken::State::QUIESCED);
375
14
            break;
376
318
        default:
377
318
            break;
378
717
        }
379
717
    }
380
381
    // The queues are empty. Wake any sleeping worker threads and wait for all
382
    // of them to exit. Some worker threads will exit immediately upon waking,
383
    // while others will exit after they finish executing an outstanding task.
384
715
    _total_queued_tasks = 0;
385
3.04k
    while (!_idle_threads.empty()) {
386
2.32k
        _idle_threads.front().not_empty.notify_one();
387
2.32k
        _idle_threads.pop_front();
388
2.32k
    }
389
390
1.08k
    _no_threads_cond.wait(l, [this]() { return _num_threads + _num_threads_pending_start == 0; });
391
392
    // All the threads have exited. Check the state of each token.
393
717
    for (auto* t : _tokens) {
394
717
        DCHECK(t->state() == ThreadPoolToken::State::IDLE ||
395
717
               t->state() == ThreadPoolToken::State::QUIESCED);
396
717
    }
397
715
}
398
399
813k
// Creates a token bound to this pool and registers it in `_tokens`; the caller
// owns the returned token. Throws if the token pointer was already registered.
std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode, int max_concurrency) {
    std::lock_guard<std::mutex> guard(_lock);
    std::unique_ptr<ThreadPoolToken> token(new ThreadPoolToken(this, mode, max_concurrency));
    const bool inserted = _tokens.insert(token.get()).second;
    if (!inserted) {
        throw Exception(Status::InternalError("duplicate token"));
    }
    return token;
}
407
408
813k
// Unregisters `t` from the pool. The token must not be active, and it must
// have been registered exactly once.
void ThreadPool::release_token(ThreadPoolToken* t) {
    std::lock_guard<std::mutex> guard(_lock);
    CHECK(!t->is_active()) << absl::Substitute("Token with state $0 may not be released",
                                               ThreadPoolToken::state_to_string(t->state()));
    const auto erased = _tokens.erase(t);
    CHECK_EQ(1, erased);
}
414
415
2.48M
// Submits `r` without an explicit token; it runs under the pool's internal
// CONCURRENT token.
Status ThreadPool::submit(std::shared_ptr<Runnable> r) {
    ThreadPoolToken* const token = _tokenless.get();
    return do_submit(std::move(r), token);
}
418
419
2.33M
// Convenience overload: wraps `f` in a FunctionRunnable and submits it without
// an explicit token.
Status ThreadPool::submit_func(std::function<void()> f) {
    auto runnable = std::make_shared<FunctionRunnable>(std::move(f));
    return submit(std::move(runnable));
}
422
423
5.22M
// Core submission path shared by ThreadPool::submit and ThreadPoolToken::submit.
// Rejects the task when the pool status is not OK, the token no longer accepts
// tasks, or capacity is exhausted. Otherwise it queues the task on `token`,
// wakes an idle worker if there is one, and may spawn one new worker (outside
// the lock).
Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token) {
    DCHECK(token);

    std::unique_lock<std::mutex> l(_lock);
    if (PREDICT_FALSE(!_pool_status.ok())) {
        return _pool_status;
    }

    if (PREDICT_FALSE(!token->may_submit_new_tasks())) {
        return Status::Error<SERVICE_UNAVAILABLE>("Thread pool({}) token was shut down", _name);
    }

    // Size limit check.
    // Capacity = (max threads - active threads) + (max queue size - queued tasks).
    // The int64_t arithmetic presumably guards against int overflow, since the
    // builder's default max queue size is INT_MAX.
    int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - _active_threads +
                                 static_cast<int64_t>(_max_queue_size) - _total_queued_tasks;
    if (capacity_remaining < 1) {
        thread_pool_submit_failed->increment(1);
        // NOTE(review): the "tasks running" figure reported below is the thread
        // count (live + pending), not `_active_threads` as used in the check above.
        return Status::Error<SERVICE_UNAVAILABLE>(
                "Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name,
                _num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks,
                _max_queue_size);
    }

    // Should we create another thread?

    // We assume that each current inactive thread will grab one item from the
    // queue.  If it seems like we'll need another thread, we create one.
    //
    // Rather than creating the thread here, while holding the lock, we defer
    // it to down below. This is because thread creation can be rather slow
    // (hundreds of milliseconds in some cases) and we'd like to allow the
    // existing threads to continue to process tasks while we do so.
    //
    // In theory, a currently active thread could finish immediately after this
    // calculation but before our new worker starts running. This would mean we
    // created a thread we didn't really need. However, this race is unavoidable
    // and harmless.
    //
    // Of course, we never create more than _max_threads threads no matter what.
    //
    // An already-active SERIAL token does not demand a new thread: its task will
    // be queued behind the one currently running.
    int threads_from_this_submit =
            token->is_active() && token->mode() == ExecutionMode::SERIAL ? 0 : 1;
    int inactive_threads = _num_threads + _num_threads_pending_start - _active_threads;
    int additional_threads =
            static_cast<int>(_queue.size()) + threads_from_this_submit - inactive_threads;
    bool need_a_thread = false;
    if (additional_threads > 0 && _num_threads + _num_threads_pending_start < _max_threads) {
        need_a_thread = true;
        // Reserve the slot now, under the lock, so concurrent submitters see it.
        _num_threads_pending_start++;
    }

    Task task;
    task.runnable = std::move(r);
    // Started here; dispatch_thread() reports the elapsed time as time spent
    // waiting for a worker.
    task.submit_time_wather.start();

    // Add the task to the token's queue.
    ThreadPoolToken::State state = token->state();
    DCHECK(state == ThreadPoolToken::State::IDLE || state == ThreadPoolToken::State::RUNNING);
    token->_entries.emplace_back(std::move(task));
    // When we need to execute the task in the token, we submit the token object to the queue.
    // There are currently two places where tokens will be submitted to the queue:
    // 1. When submitting a new task, if the token is still in the IDLE state,
    //    or the concurrency of the token has not reached the online level, it will be added to the queue.
    // 2. When the dispatch thread finishes executing a task:
    //    1. If it is a SERIAL token, and there are unsubmitted tasks, submit them to the queue.
    //    2. If it is a CONCURRENT token, and there are still unsubmitted tasks, and the upper limit of concurrency is not reached,
    //       then submitted to the queue.
    if (token->need_dispatch()) {
        _queue.emplace_back(token);
        ++token->_num_submitted_tasks;
        if (state == ThreadPoolToken::State::IDLE) {
            token->transition(ThreadPoolToken::State::RUNNING);
        }
    } else {
        ++token->_num_unsubmitted_tasks;
    }
    _total_queued_tasks++;

    // Wake up an idle thread for this task. Choosing the thread at the front of
    // the list ensures LIFO semantics as idling threads are also added to the front.
    //
    // If there are no idle threads, the new task remains on the queue and is
    // processed by an active thread (or a thread we're about to create) at some
    // point in the future.
    if (!_idle_threads.empty()) {
        _idle_threads.front().not_empty.notify_one();
        _idle_threads.pop_front();
    }
    l.unlock();

    if (need_a_thread) {
        Status status = create_thread();
        if (!status.ok()) {
            l.lock();
            _num_threads_pending_start--;
            if (_num_threads + _num_threads_pending_start == 0) {
                // If we have no threads, we can't do any work.
                // NOTE(review): the task was already appended to the token above
                // (and possibly to _queue / _total_queued_tasks) and is not rolled
                // back, so the caller sees a failure while the task stays queued
                // and may still run if a worker is created later. Confirm intended.
                return status;
            }
            // If we failed to create a thread, but there are still some other
            // worker threads, log a warning message and continue.
            LOG(WARNING) << "Thread pool " << _name
                         << " failed to create thread: " << status.to_string();
        }
    }

    return Status::OK();
}
530
531
173
void ThreadPool::wait() {
532
173
    std::unique_lock<std::mutex> l(_lock);
533
173
    check_not_pool_thread_unlocked();
534
295
    _idle_cond.wait(l, [this]() { return _total_queued_tasks == 0 && _active_threads == 0; });
535
173
}
536
537
32.3k
// Body of every worker thread. It registers itself with the pool, then loops:
// it takes the next token from `_queue`, runs that token's front task outside
// the lock, and updates token/pool bookkeeping. The worker exits when the pool
// status is not OK, when the pool has more threads than `_max_threads`, or
// after an idle timeout with an empty queue and more than `_min_threads` threads.
void ThreadPool::dispatch_thread() {
    std::unique_lock<std::mutex> l(_lock);
    // NOTE(review): the error text says "duplicate token", but this set tracks threads.
    if (!_threads.insert(Thread::current_thread()).second) {
        throw Exception(Status::InternalError("duplicate token"));
    }
    // The creator reserved a pending-start slot; convert it into a live thread.
    DCHECK_GT(_num_threads_pending_start, 0);
    _num_threads++;
    _num_threads_pending_start--;

    // If a cgroup CPU controller was configured and is still alive, join it
    // (the returned status is deliberately ignored).
    if (std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_sptr = _cgroup_cpu_ctl.lock()) {
        static_cast<void>(cg_cpu_ctl_sptr->add_thread_to_cgroup());
    }

    // Owned by this worker thread and added/removed from _idle_threads as needed.
    IdleThread me;

    while (true) {
        // Any non-OK pool status makes the worker exit; shutdown() sets
        // SERVICE_UNAVAILABLE to signal a normal shutdown.
        if (!_pool_status.ok()) {
            VLOG_CRITICAL << "DispatchThread exiting: " << _pool_status.to_string();
            break;
        }

        // Too many threads (e.g. after set_max_threads() lowered the limit): exit.
        if (_num_threads + _num_threads_pending_start > _max_threads) {
            break;
        }

        if (_queue.empty()) {
            // There's no work to do, let's go idle.
            //
            // Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
            _idle_threads.push_front(me);
            SCOPED_CLEANUP({
                // For some wake ups (i.e. shutdown or do_submit) this thread is
                // guaranteed to be unlinked after being awakened. In others (i.e.
                // spurious wake-up or Wait timeout), it'll still be linked.
                if (me.is_linked()) {
                    _idle_threads.erase(_idle_threads.iterator_to(me));
                }
            });
            if (me.not_empty.wait_for(l, _idle_timeout) == std::cv_status::timeout) {
                // After much investigation, it appears that pthread condition variables have
                // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
                // another thread did in fact signal. Apparently after a timeout there is some
                // brief period during which another thread may actually grab the internal mutex
                // protecting the state, signal, and release again before we get the mutex. So,
                // we'll recheck the empty queue case regardless.
                if (_queue.empty() && _num_threads + _num_threads_pending_start > _min_threads) {
                    VLOG_NOTICE << "Releasing worker thread from pool " << _name << " after "
                                << std::chrono::duration_cast<std::chrono::milliseconds>(
                                           _idle_timeout)
                                           .count()
                                << "ms of idle time.";
                    break;
                }
            }
            continue;
        }

        MonotonicStopWatch task_execution_time_watch;
        task_execution_time_watch.start();
        // Get the next token and task to execute.
        ThreadPoolToken* token = _queue.front();
        _queue.pop_front();
        DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
        DCHECK(!token->_entries.empty());
        Task task = std::move(token->_entries.front());
        // Time since do_submit() started the watch = time spent waiting for a worker.
        thread_pool_task_wait_worker_time_ns_total->increment(
                task.submit_time_wather.elapsed_time());
        thread_pool_task_wait_worker_count_total->increment(1);
        token->_entries.pop_front();
        token->_active_threads++;
        --_total_queued_tasks;
        ++_active_threads;

        l.unlock();

        // Execute the task
        task.runnable->run();
        // Destruct the task while we do not hold the lock.
        //
        // The task's destructor may be expensive if it has a lot of bound
        // objects, and we don't want to block submission of the threadpool.
        // In the worst case, the destructor might even try to do something
        // with this threadpool, and produce a deadlock.
        task.runnable.reset();
        l.lock();
        thread_pool_task_execution_time_ns_total->increment(
                task_execution_time_watch.elapsed_time());
        thread_pool_task_execution_count_total->increment(1);
        // Possible states:
        // 1. The token was shut down while we ran its task. Transition to QUIESCED.
        // 2. The token has no more queued tasks. Transition back to IDLE.
        // 3. The token has more tasks. Requeue it; the token stays RUNNING.
        ThreadPoolToken::State state = token->state();
        DCHECK(state == ThreadPoolToken::State::RUNNING ||
               state == ThreadPoolToken::State::QUIESCING);
        --token->_active_threads;
        --token->_num_submitted_tasks;

        // handle shutdown && idle
        if (token->_active_threads == 0) {
            if (state == ThreadPoolToken::State::QUIESCING) {
                DCHECK(token->_entries.empty());
                token->transition(ThreadPoolToken::State::QUIESCED);
            } else if (token->_entries.empty()) {
                token->transition(ThreadPoolToken::State::IDLE);
            }
        }

        // We decrease _num_submitted_tasks holding lock, so the following DCHECK works.
        DCHECK(token->_num_submitted_tasks < token->_max_concurrency);

        // If token->state is running and there are unsubmitted tasks in the token, we put
        // the token back.
        if (token->_num_unsubmitted_tasks > 0 && state == ThreadPoolToken::State::RUNNING) {
            // SERIAL: if _entries is not empty, then num_unsubmitted_tasks must be greater than 0.
            // CONCURRENT: we have to check _num_unsubmitted_tasks because there may be at least 2
            // threads are running for the token.
            _queue.emplace_back(token);
            ++token->_num_submitted_tasks;
            --token->_num_unsubmitted_tasks;
        }

        // The last busy worker wakes ThreadPool::wait() callers.
        if (--_active_threads == 0) {
            _idle_cond.notify_all();
        }
    }

    // It's important that we hold the lock between exiting the loop and dropping
    // _num_threads. Otherwise it's possible someone else could come along here
    // and add a new task just as the last running thread is about to exit.
    CHECK(l.owns_lock());

    CHECK_EQ(_threads.erase(Thread::current_thread()), 1);
    _num_threads--;
    if (_num_threads + _num_threads_pending_start == 0) {
        // Wakes ThreadPool::shutdown(), which waits for all threads to exit.
        _no_threads_cond.notify_all();

        // Sanity check: if we're the last thread exiting, the queue ought to be
        // empty. Otherwise it will never get processed.
        CHECK(_queue.empty());
        DCHECK_EQ(0, _total_queued_tasks);
    }
}
682
683
32.3k
// Starts one worker thread that runs dispatch_thread() on this pool. The worker
// is named "<pool name> [worker]" in the "thread pool" category.
Status ThreadPool::create_thread() {
    const std::string worker_name = absl::Substitute("$0 [worker]", _name);
    return Thread::create("thread pool", worker_name, &ThreadPool::dispatch_thread, this,
                          nullptr);
}
687
688
1.69M
void ThreadPool::check_not_pool_thread_unlocked() {
689
1.69M
    Thread* current = Thread::current_thread();
690
1.69M
    if (_threads.contains(current)) {
691
0
        throw Exception(
692
0
                Status::FatalError("Thread belonging to thread pool {} with "
693
0
                                   "name {} called pool function that would result in deadlock",
694
0
                                   _name, current->name()));
695
0
    }
696
1.69M
}
697
698
6
// Changes the number of workers kept alive when idle. Fails if the new minimum
// exceeds the current maximum; otherwise it eagerly spawns any missing workers.
Status ThreadPool::set_min_threads(int min_threads) {
    std::lock_guard<std::mutex> guard(_lock);
    // min threads can not be set greater than max threads
    if (min_threads > _max_threads) {
        return Status::InternalError("set thread pool {} min_threads failed", _name);
    }
    _min_threads = min_threads;

    const int current_threads = _num_threads + _num_threads_pending_start;
    if (current_threads < min_threads) {
        RETURN_IF_ERROR(try_create_thread(min_threads - current_threads, guard));
    }
    return Status::OK();
}
711
712
10
// Changes the upper bound on worker threads. Fails if the value is not positive
// or is below the current minimum. When the limit grows, it spawns new workers,
// at most one per queued task. Shrinking is handled lazily: surplus workers
// exit from dispatch_thread().
Status ThreadPool::set_max_threads(int max_threads) {
    std::lock_guard<std::mutex> l(_lock);
    DBUG_EXECUTE_IF("ThreadPool.set_max_threads.force_set", {
        _max_threads = max_threads;
        return Status::OK();
    })
    // A pool needs at least one worker (ThreadPoolBuilder::set_max_threads enforces
    // CHECK_GT(max_threads, 0)). With a limit of 0 every worker exits via the
    // `> _max_threads` check in dispatch_thread(), queued tasks are never run, and
    // the last exiting worker's CHECK(_queue.empty()) can abort the process.
    if (max_threads <= 0) {
        return Status::InternalError("set thread pool {} max_threads failed", _name);
    }
    if (_min_threads > max_threads) {
        // max threads can not be set less than min threads
        return Status::InternalError("set thread pool {} max_threads failed", _name);
    }

    _max_threads = max_threads;
    if (_max_threads > _num_threads + _num_threads_pending_start) {
        int addition_threads = _max_threads - _num_threads - _num_threads_pending_start;
        // Never spawn more new workers than there are tasks waiting for one.
        addition_threads = std::min(addition_threads, _total_queued_tasks);
        RETURN_IF_ERROR(try_create_thread(addition_threads, l));
    }
    return Status::OK();
}
731
732
0
std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
733
0
    return o << ThreadPoolToken::state_to_string(s);
734
0
}
735
736
} // namespace doris