Coverage Report

Created: 2025-04-22 12:18

/root/doris/be/src/util/threadpool.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/threadpool.cc
19
// and modified by Doris
20
21
#include "util/threadpool.h"
22
23
#include <algorithm>
24
#include <cstdint>
25
#include <limits>
26
#include <ostream>
27
#include <thread>
28
#include <utility>
29
30
#include "common/exception.h"
31
#include "common/logging.h"
32
#include "gutil/port.h"
33
#include "gutil/strings/substitute.h"
34
#include "util/debug/sanitizer_scopes.h"
35
#include "util/debug_points.h"
36
#include "util/doris_metrics.h"
37
#include "util/metrics.h"
38
#include "util/scoped_cleanup.h"
39
#include "util/stopwatch.hpp"
40
#include "util/thread.h"
41
42
namespace doris {
43
// The names of these variables are used as metric names in Prometheus.
// Each prototype is registered per pool in ThreadPool::init(); the gauges are
// refreshed by the "update" hook registered there, the counters are bumped
// from do_submit() and dispatch_thread().
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_active_threads, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_threads, MetricUnit::NOUNIT);
// Incremented when a submission is rejected because the pool is at capacity.
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT);
// Cumulative wall time spent running tasks (measured around Runnable::run()).
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_execution_time_ns_total,
                                     MetricUnit::NANOSECONDS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_execution_count_total, MetricUnit::NOUNIT);
// Cumulative time tasks spent queued between submission and pickup by a worker.
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_wait_worker_time_ns_total,
                                     MetricUnit::NANOSECONDS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_wait_worker_count_total, MetricUnit::NOUNIT);
using namespace ErrorCode;

using std::string;
using strings::Substitute;
59
60
class FunctionRunnable : public Runnable {
61
public:
62
34.3k
    explicit FunctionRunnable(std::function<void()> func) : _func(std::move(func)) {}
63
64
28.9k
    void run() override { _func(); }
65
66
private:
67
    std::function<void()> _func;
68
};
69
70
// Creates a builder with default settings: no minimum number of threads, at
// most one thread per hardware thread, an effectively unbounded queue
// (INT_MAX) and a 500ms idle timeout for surplus workers.
//
// std::thread::hardware_concurrency() is allowed to return 0 when the value is
// not computable; a pool with zero max threads could never run a task (and the
// set_max_threads() setter itself requires a positive value), so clamp to 1.
ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group)
        : _name(std::move(name)),
          _workload_group(std::move(workload_group)),
          _min_threads(0),
          _max_threads(std::max(1U, std::thread::hardware_concurrency())),
          _max_queue_size(std::numeric_limits<int>::max()),
          _idle_timeout(std::chrono::milliseconds(500)) {}
77
78
477
// Sets how many worker threads the pool keeps alive even when idle.
// A negative value is a programming error and aborts.
ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
    CHECK_GE(min_threads, 0);
    this->_min_threads = min_threads;
    return *this;
}
83
84
487
// Sets the upper bound on the number of worker threads.
// The bound must be strictly positive; anything else aborts.
ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
    CHECK_GT(max_threads, 0);
    this->_max_threads = max_threads;
    return *this;
}
89
90
172
// Sets the maximum number of queued (not yet running) tasks the pool accepts.
// The value is stored as given; no validation happens here.
ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
    this->_max_queue_size = max_queue_size;
    return *this;
}
94
95
// Attaches an optional cgroup CPU controller. Worker threads lock this weak
// reference when they start and, if it is still alive, add themselves to the
// cgroup (see dispatch_thread()).
ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(
        std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl) {
    // The parameter is a by-value sink: move it into place rather than copying,
    // which would cost an extra atomic weak-count increment/decrement.
    _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
    return *this;
}
100
101
// Creates an IDLE token bound to `pool` with the given execution mode and
// per-token concurrency limit.
ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode,
                                 int max_concurrency)
        : _mode(mode),
          _pool(pool),
          _state(State::IDLE),
          _active_threads(0),
          _max_concurrency(max_concurrency),
          _num_submitted_tasks(0),
          _num_unsubmitted_tasks(0) {
    // A concurrency limit of one is serial execution whatever mode was asked
    // for. Re-assigning SERIAL when the mode already is SERIAL is a no-op.
    if (max_concurrency == 1) {
        _mode = ThreadPool::ExecutionMode::SERIAL;
    }
}
114
115
2.54k
// Quiesces the token (dropping queued tasks and waiting for running ones),
// then unregisters it from the owning pool.
ThreadPoolToken::~ThreadPoolToken() {
    this->shutdown();
    ThreadPool* const owner = _pool;
    owner->release_token(this);
}
119
120
8.98k
// Submits `r` to the owning pool, associated with this token.
Status ThreadPoolToken::submit(std::shared_ptr<Runnable> r) {
    ThreadPool* const owner = _pool;
    return owner->do_submit(std::move(r), this);
}
123
124
9.01k
// Wraps `f` in a FunctionRunnable and submits it through this token.
Status ThreadPoolToken::submit_func(std::function<void()> f) {
    auto runnable = std::make_shared<FunctionRunnable>(std::move(f));
    return submit(std::move(runnable));
}
127
128
3.84k
void ThreadPoolToken::shutdown() {
129
3.84k
    std::unique_lock<std::mutex> l(_pool->_lock);
130
3.84k
    _pool->check_not_pool_thread_unlocked();
131
132
    // Clear the queue under the lock, but defer the releasing of the tasks
133
    // outside the lock, in case there are concurrent threads wanting to access
134
    // the ThreadPool. The task's destructors may acquire locks, etc, so this
135
    // also prevents lock inversions.
136
3.84k
    std::deque<ThreadPool::Task> to_release = std::move(_entries);
137
3.84k
    _pool->_total_queued_tasks -= to_release.size();
138
139
3.84k
    switch (state()) {
140
962
    case State::IDLE:
141
        // There were no tasks outstanding; we can quiesce the token immediately.
142
962
        transition(State::QUIESCED);
143
962
        break;
144
1.12k
    case State::RUNNING:
145
        // There were outstanding tasks. If any are still running, switch to
146
        // QUIESCING and wait for them to finish (the worker thread executing
147
        // the token's last task will switch the token to QUIESCED). Otherwise,
148
        // we can quiesce the token immediately.
149
150
        // Note: this is an O(n) operation, but it's expected to be infrequent.
151
        // Plus doing it this way (rather than switching to QUIESCING and waiting
152
        // for a worker thread to process the queue entry) helps retain state
153
        // transition symmetry with ThreadPool::shutdown.
154
11.9k
        for (auto it = _pool->_queue.begin(); it != _pool->_queue.end();) {
155
10.8k
            if (*it == this) {
156
1.02k
                it = _pool->_queue.erase(it);
157
9.82k
            } else {
158
9.82k
                it++;
159
9.82k
            }
160
10.8k
        }
161
162
1.12k
        if (_active_threads == 0) {
163
469
            transition(State::QUIESCED);
164
469
            break;
165
469
        }
166
660
        transition(State::QUIESCING);
167
660
        [[fallthrough]];
168
668
    case State::QUIESCING:
169
        // The token is already quiescing. Just wait for a worker thread to
170
        // switch it to QUIESCED.
171
1.33k
        _not_running_cond.wait(l, [this]() { return state() == State::QUIESCED; });
172
668
        break;
173
1.74k
    default:
174
1.74k
        break;
175
3.84k
    }
176
3.84k
}
177
178
784
void ThreadPoolToken::wait() {
179
784
    std::unique_lock<std::mutex> l(_pool->_lock);
180
784
    _pool->check_not_pool_thread_unlocked();
181
1.00k
    _not_running_cond.wait(l, [this]() { return !is_active(); });
182
784
}
183
184
8.08k
// Moves the token to `new_state`. All visible callers hold the pool lock.
//
// Debug builds validate the edge against the token state machine:
//   IDLE      -> RUNNING | QUIESCED
//   RUNNING   -> IDLE | QUIESCING | QUIESCED
//   QUIESCING -> QUIESCED
//   QUIESCED  is terminal.
// Entering IDLE or QUIESCED wakes every waiter on _not_running_cond.
void ThreadPoolToken::transition(State new_state) {
#ifndef NDEBUG
    CHECK_NE(_state, new_state);

    switch (_state) {
    case State::IDLE:
        CHECK(new_state == State::RUNNING || new_state == State::QUIESCED);
        if (new_state != State::RUNNING) {
            // Quiescing straight from IDLE: nothing may be queued or running.
            CHECK(_entries.empty());
            CHECK_EQ(_active_threads, 0);
        } else {
            // Starting to run requires at least one queued task.
            CHECK(!_entries.empty());
        }
        break;
    case State::RUNNING:
        CHECK(new_state == State::IDLE || new_state == State::QUIESCING ||
              new_state == State::QUIESCED);
        CHECK(_entries.empty());
        if (new_state == State::QUIESCING) {
            CHECK_GT(_active_threads, 0);
        }
        break;
    case State::QUIESCING:
        CHECK(new_state == State::QUIESCED);
        CHECK_EQ(_active_threads, 0);
        break;
    case State::QUIESCED:
        CHECK(false); // QUIESCED is a terminal state
        break;
    default:
        throw Exception(Status::FatalError("Unknown token state: {}", _state));
    }
#endif

    // No task of this token runs any more: wake threads blocked in wait()/shutdown().
    const bool stops_running = new_state == State::IDLE || new_state == State::QUIESCED;
    if (stops_running) {
        _not_running_cond.notify_all();
    }

    _state = new_state;
}
230
231
0
// Returns a static, human-readable name for a token state.
const char* ThreadPoolToken::state_to_string(State s) {
    switch (s) {
    case State::IDLE:
        return "IDLE";
    case State::RUNNING:
        return "RUNNING";
    case State::QUIESCING:
        return "QUIESCING";
    case State::QUIESCED:
        return "QUIESCED";
    }
    // Only reachable if `s` holds a value outside the declared enumerators.
    return "<cannot reach here>";
}
248
249
31.3k
bool ThreadPoolToken::need_dispatch() {
250
31.3k
    return _state == ThreadPoolToken::State::IDLE ||
251
31.3k
           (_mode == ThreadPool::ExecutionMode::CONCURRENT &&
252
28.3k
            _num_submitted_tasks < _max_concurrency);
253
31.3k
}
254
255
// Constructs a pool from the builder's settings. No worker threads are started
// here; the pool stays in the UNINITIALIZED status until init() is called.
ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
        : _name(builder._name),
          _workload_group(builder._workload_group),
          _min_threads(builder._min_threads),
          _max_threads(builder._max_threads),
          _max_queue_size(builder._max_queue_size),
          _idle_timeout(builder._idle_timeout),
          // init() checks for this status to reject double initialization.
          _pool_status(Status::Uninitialized("The pool was not initialized.")),
          _num_threads(0),
          _num_threads_pending_start(0),
          _active_threads(0),
          _total_queued_tasks(0),
          _cgroup_cpu_ctl(builder._cgroup_cpu_ctl),
          // new_token() locks _lock and inserts into _tokens, so this relies on
          // those members being declared (and thus initialized) before
          // _tokenless. NOTE(review): confirm the declaration order in the header.
          _tokenless(new_token(ExecutionMode::CONCURRENT)),
          // Distinguishes metric entities of pools that share the same name.
          _id(UniqueId::gen_uid()) {}
270
271
396
// Destroys the pool. Every token handed out by new_token() must already be
// gone. The only token allowed to remain is _tokenless, the one behind
// ThreadPool::submit(). The pool is then shut down.
ThreadPool::~ThreadPool() {
    CHECK_EQ(1, _tokens.size()) << strings::Substitute(
            "Threadpool $0 destroyed with $1 allocated tokens", _name, _tokens.size());
    this->shutdown();
}
277
278
503
// Starts up to `thread_num` worker threads. The unnamed lock_guard parameter
// documents that the caller already holds _lock. Each thread that starts is
// counted as pending until its dispatch_thread() takes the lock. Stops at the
// first failure and returns that error; threads started before it keep running.
Status ThreadPool::try_create_thread(int thread_num, std::lock_guard<std::mutex>&) {
    for (int started = 0; started < thread_num; ++started) {
        Status st = create_thread();
        if (!st.ok()) {
            LOG(WARNING) << "Thread pool " << _name << " failed to create thread: " << st;
            return st;
        }
        ++_num_threads_pending_start;
    }
    return Status::OK();
}
290
291
498
// Initializes the pool:
//  1. marks it usable;
//  2. best-effort pre-starts _min_threads workers;
//  3. registers its Prometheus metric entity and an "update" hook that
//     refreshes the gauges.
// Returns NotSupported if the pool was already initialized.
Status ThreadPool::init() {
    if (!_pool_status.is<UNINITIALIZED>()) {
        return Status::NotSupported("The thread pool {} is already initialized", _name);
    }
    _pool_status = Status::OK();

    {
        std::lock_guard<std::mutex> guard(_lock);
        // Failing to pre-start threads must not fail init(): threads can still
        // be created later, e.g. when a task is submitted.
        static_cast<void>(try_create_thread(_min_threads, guard));
    }

    // _id makes the entity unique even when several pools share the same name;
    // otherwise deregistering one pool's entity and registering another's hook
    // would interfere with each other.
    _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
            fmt::format("thread_pool_{}", _name), {{"thread_pool_name", _name},
                                                   {"workload_group", _workload_group},
                                                   {"id", _id.to_string()}});

    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_active_threads);
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_threads);
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size);
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_queue_size);
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_execution_time_ns_total);
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_execution_count_total);
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_wait_worker_time_ns_total);
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_wait_worker_count_total);
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed);

    // Gauges are only refreshed while the pool status is OK (i.e. not shut down).
    auto refresh_gauges = [this]() {
        bool pool_usable = false;
        {
            std::lock_guard<std::mutex> guard(_lock);
            pool_usable = _pool_status.ok();
        }
        if (!pool_usable) {
            return;
        }

        thread_pool_active_threads->set_value(num_active_threads());
        thread_pool_queue_size->set_value(get_queue_size());
        thread_pool_max_queue_size->set_value(get_max_queue_size());
        thread_pool_max_threads->set_value(max_threads());
    };
    _metric_entity->register_hook("update", std::move(refresh_gauges));
    return Status::OK();
}
337
338
719
void ThreadPool::shutdown() {
339
    // Why access to doris_metrics is safe here?
340
    // Since DorisMetrics is a singleton, it will be destroyed only after doris_main is exited.
341
    // The shutdown/destroy of ThreadPool is guaranteed to take place before doris_main exits by
342
    // ExecEnv::destroy().
343
719
    DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity);
344
719
    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
345
719
    std::unique_lock<std::mutex> l(_lock);
346
719
    check_not_pool_thread_unlocked();
347
348
    // Note: this is the same error seen at submission if the pool is at
349
    // capacity, so clients can't tell them apart. This isn't really a practical
350
    // concern though because shutting down a pool typically requires clients to
351
    // be quiesced first, so there's no danger of a client getting confused.
352
    // Not print stack trace here
353
719
    _pool_status = Status::Error<SERVICE_UNAVAILABLE, false>(
354
719
            "The thread pool {} has been shut down.", _name);
355
356
    // Clear the various queues under the lock, but defer the releasing
357
    // of the tasks outside the lock, in case there are concurrent threads
358
    // wanting to access the ThreadPool. The task's destructors may acquire
359
    // locks, etc, so this also prevents lock inversions.
360
719
    _queue.clear();
361
362
719
    std::deque<std::deque<Task>> to_release;
363
791
    for (auto* t : _tokens) {
364
791
        if (!t->_entries.empty()) {
365
2
            to_release.emplace_back(std::move(t->_entries));
366
2
        }
367
791
        switch (t->state()) {
368
434
        case ThreadPoolToken::State::IDLE:
369
            // The token is idle; we can quiesce it immediately.
370
434
            t->transition(ThreadPoolToken::State::QUIESCED);
371
434
            break;
372
18
        case ThreadPoolToken::State::RUNNING:
373
            // The token has tasks associated with it. If they're merely queued
374
            // (i.e. there are no active threads), the tasks will have been removed
375
            // above and we can quiesce immediately. Otherwise, we need to wait for
376
            // the threads to finish.
377
18
            t->transition(t->_active_threads > 0 ? ThreadPoolToken::State::QUIESCING
378
18
                                                 : ThreadPoolToken::State::QUIESCED);
379
18
            break;
380
339
        default:
381
339
            break;
382
791
        }
383
791
    }
384
385
    // The queues are empty. Wake any sleeping worker threads and wait for all
386
    // of them to exit. Some worker threads will exit immediately upon waking,
387
    // while others will exit after they finish executing an outstanding task.
388
719
    _total_queued_tasks = 0;
389
3.14k
    while (!_idle_threads.empty()) {
390
2.42k
        _idle_threads.front().not_empty.notify_one();
391
2.42k
        _idle_threads.pop_front();
392
2.42k
    }
393
394
1.09k
    _no_threads_cond.wait(l, [this]() { return _num_threads + _num_threads_pending_start == 0; });
395
396
    // All the threads have exited. Check the state of each token.
397
791
    for (auto* t : _tokens) {
398
791
        DCHECK(t->state() == ThreadPoolToken::State::IDLE ||
399
791
               t->state() == ThreadPoolToken::State::QUIESCED);
400
791
    }
401
719
}
402
403
2.64k
// Creates a token bound to this pool and records it in _tokens so shutdown()
// and the destructor can account for it. Throws if the pointer is already
// registered.
std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode, int max_concurrency) {
    std::lock_guard<std::mutex> guard(_lock);
    std::unique_ptr<ThreadPoolToken> token(new ThreadPoolToken(this, mode, max_concurrency));
    const bool inserted = _tokens.insert(token.get()).second;
    if (!inserted) {
        throw Exception(Status::InternalError("duplicate token"));
    }
    return token;
}
411
412
2.54k
// Unregisters a token from this pool. The token must no longer be active
// (see ThreadPoolToken::is_active) and must have been registered exactly once.
void ThreadPool::release_token(ThreadPoolToken* t) {
    std::lock_guard<std::mutex> guard(_lock);
    CHECK(!t->is_active()) << strings::Substitute("Token with state $0 may not be released",
                                                  ThreadPoolToken::state_to_string(t->state()));
    CHECK_EQ(1, _tokens.erase(t));
}
418
419
25.5k
// Submits `r` without an explicit token; it runs under the pool's internal
// CONCURRENT token (_tokenless).
Status ThreadPool::submit(std::shared_ptr<Runnable> r) {
    ThreadPoolToken* const default_token = _tokenless.get();
    return do_submit(std::move(r), default_token);
}
422
423
25.2k
// Wraps `f` in a FunctionRunnable and submits it without an explicit token.
Status ThreadPool::submit_func(std::function<void()> f) {
    auto runnable = std::make_shared<FunctionRunnable>(std::move(f));
    return submit(std::move(runnable));
}
426
427
34.5k
// Enqueues `r` on `token` and, if needed, starts an extra worker thread.
// Returns the pool status if the pool is not usable, SERVICE_UNAVAILABLE if the
// token was shut down or the pool is at capacity, or the thread-creation error
// if no worker thread exists at all afterwards.
Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token) {
    DCHECK(token);

    std::unique_lock<std::mutex> l(_lock);
    if (PREDICT_FALSE(!_pool_status.ok())) {
        return _pool_status;
    }

    if (PREDICT_FALSE(!token->may_submit_new_tasks())) {
        return Status::Error<SERVICE_UNAVAILABLE>("Thread pool({}) token was shut down", _name);
    }

    // Size limit check: remaining room is (max threads - running tasks) plus
    // (max queue size - queued tasks).
    int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - _active_threads +
                                 static_cast<int64_t>(_max_queue_size) - _total_queued_tasks;
    if (capacity_remaining < 1) {
        thread_pool_submit_failed->increment(1);
        return Status::Error<SERVICE_UNAVAILABLE>(
                "Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name,
                _num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks,
                _max_queue_size);
    }

    // Should we create another thread?

    // We assume that each current inactive thread will grab one item from the
    // queue.  If it seems like we'll need another thread, we create one.
    //
    // Rather than creating the thread here, while holding the lock, we defer
    // it to down below. This is because thread creation can be rather slow
    // (hundreds of milliseconds in some cases) and we'd like to allow the
    // existing threads to continue to process tasks while we do so.
    //
    // In theory, a currently active thread could finish immediately after this
    // calculation but before our new worker starts running. This would mean we
    // created a thread we didn't really need. However, this race is unavoidable
    // and harmless.
    //
    // Of course, we never create more than _max_threads threads no matter what.
    int threads_from_this_submit =
            token->is_active() && token->mode() == ExecutionMode::SERIAL ? 0 : 1;
    int inactive_threads = _num_threads + _num_threads_pending_start - _active_threads;
    int additional_threads =
            static_cast<int>(_queue.size()) + threads_from_this_submit - inactive_threads;
    bool need_a_thread = false;
    if (additional_threads > 0 && _num_threads + _num_threads_pending_start < _max_threads) {
        need_a_thread = true;
        // Reserve the slot now so concurrent submitters account for it.
        _num_threads_pending_start++;
    }

    Task task;
    task.runnable = std::move(r);
    task.submit_time_wather.start();

    // Add the task to the token's queue.
    ThreadPoolToken::State state = token->state();
    DCHECK(state == ThreadPoolToken::State::IDLE || state == ThreadPoolToken::State::RUNNING);
    token->_entries.emplace_back(std::move(task));
    // When we need to execute the task in the token, we submit the token object to the queue.
    // There are currently two places where tokens will be submitted to the queue:
    // 1. When submitting a new task, if the token is still in the IDLE state,
    //    or the concurrency of the token has not reached the upper limit, it will be added to
    //    the queue.
    // 2. When the dispatch thread finishes executing a task:
    //    1. If it is a SERIAL token, and there are unsubmitted tasks, submit them to the queue.
    //    2. If it is a CONCURRENT token, and there are still unsubmitted tasks, and the upper
    //       limit of concurrency is not reached, then submit them to the queue.
    if (token->need_dispatch()) {
        _queue.emplace_back(token);
        ++token->_num_submitted_tasks;
        if (state == ThreadPoolToken::State::IDLE) {
            token->transition(ThreadPoolToken::State::RUNNING);
        }
    } else {
        ++token->_num_unsubmitted_tasks;
    }
    _total_queued_tasks++;

    // Wake up an idle thread for this task. Choosing the thread at the front of
    // the list ensures LIFO semantics as idling threads are also added to the front.
    //
    // If there are no idle threads, the new task remains on the queue and is
    // processed by an active thread (or a thread we're about to create) at some
    // point in the future.
    if (!_idle_threads.empty()) {
        _idle_threads.front().not_empty.notify_one();
        _idle_threads.pop_front();
    }
    l.unlock();

    if (need_a_thread) {
        Status status = create_thread();
        if (!status.ok()) {
            l.lock();
            _num_threads_pending_start--;
            if (_num_threads + _num_threads_pending_start == 0) {
                // Dropping our reservation may have just satisfied the condition a
                // concurrent shutdown() is blocked on. No worker thread exists to
                // signal it, so wake it here or it would wait forever.
                _no_threads_cond.notify_all();
                // If we have no threads, we can't do any work.
                // NOTE(review): the task enqueued above stays queued even though an
                // error is returned; it only runs if a worker is created later.
                return status;
            }
            // If we failed to create a thread, but there are still some other
            // worker threads, log a warning message and continue.
            LOG(WARNING) << "Thread pool " << _name
                         << " failed to create thread: " << status.to_string();
        }
    }

    return Status::OK();
}
534
535
184
void ThreadPool::wait() {
536
184
    std::unique_lock<std::mutex> l(_lock);
537
184
    check_not_pool_thread_unlocked();
538
311
    _idle_cond.wait(l, [this]() { return _total_queued_tasks == 0 && _active_threads == 0; });
539
184
}
540
541
4.87k
// Main loop of every worker thread. It runs until one of these happens:
//  - the pool status becomes non-OK (shutdown);
//  - the pool has more threads than _max_threads;
//  - it has idled for _idle_timeout while the pool is above _min_threads.
// Each iteration pops one token from _queue, runs one task of that token with
// _lock released, then updates the token's state and requeues the token if it
// still has unsubmitted tasks.
void ThreadPool::dispatch_thread() {
    std::unique_lock<std::mutex> l(_lock);
    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
    // NOTE(review): the message says "token" but this detects a thread that is
    // registered twice in _threads.
    if (!_threads.insert(Thread::current_thread()).second) {
        throw Exception(Status::InternalError("duplicate token"));
    }
    // Convert the reservation made by the creator (do_submit/try_create_thread)
    // into a running thread.
    DCHECK_GT(_num_threads_pending_start, 0);
    _num_threads++;
    _num_threads_pending_start--;

    // If a cgroup CPU controller is still alive, move this thread into it.
    if (std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_sptr = _cgroup_cpu_ctl.lock()) {
        static_cast<void>(cg_cpu_ctl_sptr->add_thread_to_cgroup());
    }

    // Owned by this worker thread and added/removed from _idle_threads as needed.
    IdleThread me;

    while (true) {
        // Any non-OK status means the pool is shutting down (shutdown() sets
        // SERVICE_UNAVAILABLE); exit.
        if (!_pool_status.ok()) {
            VLOG_CRITICAL << "DispatchThread exiting: " << _pool_status.to_string();
            break;
        }

        // set_max_threads() may have lowered the limit; surplus workers exit.
        if (_num_threads + _num_threads_pending_start > _max_threads) {
            break;
        }

        if (_queue.empty()) {
            // There's no work to do, let's go idle.
            //
            // Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
            _idle_threads.push_front(me);
            SCOPED_CLEANUP({
                // For some wake ups (i.e. shutdown or do_submit) this thread is
                // guaranteed to be unlinked after being awakened. In others (i.e.
                // spurious wake-up or Wait timeout), it'll still be linked.
                if (me.is_linked()) {
                    _idle_threads.erase(_idle_threads.iterator_to(me));
                }
            });
            if (me.not_empty.wait_for(l, _idle_timeout) == std::cv_status::timeout) {
                // After much investigation, it appears that pthread condition variables have
                // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
                // another thread did in fact signal. Apparently after a timeout there is some
                // brief period during which another thread may actually grab the internal mutex
                // protecting the state, signal, and release again before we get the mutex. So,
                // we'll recheck the empty queue case regardless.
                if (_queue.empty() && _num_threads + _num_threads_pending_start > _min_threads) {
                    VLOG_NOTICE << "Releasing worker thread from pool " << _name << " after "
                                << std::chrono::duration_cast<std::chrono::milliseconds>(
                                           _idle_timeout)
                                           .count()
                                << "ms of idle time.";
                    break;
                }
            }
            continue;
        }

        MonotonicStopWatch task_execution_time_watch;
        task_execution_time_watch.start();
        // Get the next token and task to execute.
        ThreadPoolToken* token = _queue.front();
        _queue.pop_front();
        DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
        DCHECK(!token->_entries.empty());
        Task task = std::move(token->_entries.front());
        // Time between submission (do_submit started the watch) and pickup here.
        thread_pool_task_wait_worker_time_ns_total->increment(
                task.submit_time_wather.elapsed_time());
        thread_pool_task_wait_worker_count_total->increment(1);
        token->_entries.pop_front();
        token->_active_threads++;
        --_total_queued_tasks;
        ++_active_threads;

        l.unlock();

        // Execute the task
        task.runnable->run();
        // Destruct the task while we do not hold the lock.
        //
        // The task's destructor may be expensive if it has a lot of bound
        // objects, and we don't want to block submission of the threadpool.
        // In the worst case, the destructor might even try to do something
        // with this threadpool, and produce a deadlock.
        task.runnable.reset();
        l.lock();
        thread_pool_task_execution_time_ns_total->increment(
                task_execution_time_watch.elapsed_time());
        thread_pool_task_execution_count_total->increment(1);
        // Possible states:
        // 1. The token was shut down while we ran its task. Transition to QUIESCED.
        // 2. The token has no more queued tasks. Transition back to IDLE.
        // 3. The token has more tasks. Requeue it; it stays RUNNING.
        ThreadPoolToken::State state = token->state();
        DCHECK(state == ThreadPoolToken::State::RUNNING ||
               state == ThreadPoolToken::State::QUIESCING);
        --token->_active_threads;
        --token->_num_submitted_tasks;

        // handle shutdown && idle
        if (token->_active_threads == 0) {
            if (state == ThreadPoolToken::State::QUIESCING) {
                DCHECK(token->_entries.empty());
                token->transition(ThreadPoolToken::State::QUIESCED);
            } else if (token->_entries.empty()) {
                token->transition(ThreadPoolToken::State::IDLE);
            }
        }

        // We decrease _num_submitted_tasks holding lock, so the following DCHECK works.
        DCHECK(token->_num_submitted_tasks < token->_max_concurrency);

        // If token->state is running and there are unsubmitted tasks in the token, we put
        // the token back.
        if (token->_num_unsubmitted_tasks > 0 && state == ThreadPoolToken::State::RUNNING) {
            // SERIAL: if _entries is not empty, then num_unsubmitted_tasks must be greater than 0.
            // CONCURRENT: we have to check _num_unsubmitted_tasks because there may be at least 2
            // threads are running for the token.
            _queue.emplace_back(token);
            ++token->_num_submitted_tasks;
            --token->_num_unsubmitted_tasks;
        }

        // Last running task finished: wake ThreadPool::wait() callers.
        if (--_active_threads == 0) {
            _idle_cond.notify_all();
        }
    }

    // It's important that we hold the lock between exiting the loop and dropping
    // _num_threads. Otherwise it's possible someone else could come along here
    // and add a new task just as the last running thread is about to exit.
    CHECK(l.owns_lock());

    CHECK_EQ(_threads.erase(Thread::current_thread()), 1);
    _num_threads--;
    if (_num_threads + _num_threads_pending_start == 0) {
        // Wakes shutdown(), which waits for exactly this condition.
        _no_threads_cond.notify_all();

        // Sanity check: if we're the last thread exiting, the queue ought to be
        // empty. Otherwise it will never get processed.
        CHECK(_queue.empty());
        DCHECK_EQ(0, _total_queued_tasks);
    }
}
687
688
4.88k
// Spawns one worker thread running dispatch_thread() on this pool. The thread
// is named "<pool name> [worker]" in the "thread pool" category.
Status ThreadPool::create_thread() {
    std::string worker_name = strings::Substitute("$0 [worker]", _name);
    return Thread::create("thread pool", worker_name, &ThreadPool::dispatch_thread, this,
                          nullptr);
}
692
693
5.52k
void ThreadPool::check_not_pool_thread_unlocked() {
694
5.52k
    Thread* current = Thread::current_thread();
695
5.52k
    if (_threads.contains(current)) {
696
0
        throw Exception(
697
0
                Status::FatalError("Thread belonging to thread pool {} with "
698
0
                                   "name {} called pool function that would result in deadlock",
699
0
                                   _name, current->name()));
700
0
    }
701
5.52k
}
702
703
6
// Changes the number of threads kept alive while idle, starting extra threads
// immediately if the pool currently has fewer than `min_threads`.
// Returns InternalError if `min_threads` is negative (the builder rejects that
// too via CHECK_GE) or greater than the current max threads. Returns the
// thread-creation error if starting a thread fails.
Status ThreadPool::set_min_threads(int min_threads) {
    std::lock_guard<std::mutex> l(_lock);
    if (min_threads < 0 || min_threads > _max_threads) {
        // min threads must be non-negative and can not be set greater than max threads
        return Status::InternalError("set thread pool {} min_threads failed", _name);
    }
    _min_threads = min_threads;
    if (min_threads > _num_threads + _num_threads_pending_start) {
        int addition_threads = min_threads - _num_threads - _num_threads_pending_start;
        RETURN_IF_ERROR(try_create_thread(addition_threads, l));
    }
    return Status::OK();
}
716
717
10
// Changes the maximum number of worker threads. When the limit grows, it
// immediately starts up to min(new headroom, queued tasks) extra threads.
// When it shrinks, surplus workers exit on their own in dispatch_thread().
// Returns InternalError if `max_threads` is not positive (the builder enforces
// CHECK_GT(max_threads, 0); a zero limit would leave submitted tasks unrun) or
// is below the current min threads.
Status ThreadPool::set_max_threads(int max_threads) {
    std::lock_guard<std::mutex> l(_lock);
    DBUG_EXECUTE_IF("ThreadPool.set_max_threads.force_set", {
        _max_threads = max_threads;
        return Status::OK();
    })
    if (max_threads <= 0 || _min_threads > max_threads) {
        // max threads must be positive and can not be set less than min threads
        return Status::InternalError("set thread pool {} max_threads failed", _name);
    }

    _max_threads = max_threads;
    if (_max_threads > _num_threads + _num_threads_pending_start) {
        int addition_threads = _max_threads - _num_threads - _num_threads_pending_start;
        // No point starting more threads than there are queued tasks to run.
        addition_threads = std::min(addition_threads, _total_queued_tasks);
        RETURN_IF_ERROR(try_create_thread(addition_threads, l));
    }
    return Status::OK();
}
736
737
0
std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
738
0
    return o << ThreadPoolToken::state_to_string(s);
739
0
}
740
741
} // namespace doris