Coverage Report

Created: 2026-06-24 17:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/threadpool.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/threadpool.cc
19
// and modified by Doris
20
21
#include "util/threadpool.h"
22
23
#include <algorithm>
24
#include <cstdint>
25
#include <limits>
26
#include <ostream>
27
#include <thread>
28
#include <utility>
29
30
#include "absl/strings/substitute.h"
31
#include "common/exception.h"
32
#include "common/logging.h"
33
#include "common/metrics/doris_metrics.h"
34
#include "common/metrics/metrics.h"
35
#include "util/debug_points.h"
36
#include "util/stopwatch.hpp"
37
#include "util/thread.h"
38
39
namespace doris {
40
// The name of these varialbs will be useds as metric name in prometheus.
41
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_active_threads, MetricUnit::NOUNIT);
42
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT);
43
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_queue_size, MetricUnit::NOUNIT);
44
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_threads, MetricUnit::NOUNIT);
45
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT);
46
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_execution_time_ns_total,
47
                                     MetricUnit::NANOSECONDS);
48
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_execution_count_total, MetricUnit::NOUNIT);
49
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_wait_worker_time_ns_total,
50
                                     MetricUnit::NANOSECONDS);
51
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_wait_worker_count_total, MetricUnit::NOUNIT);
52
using namespace ErrorCode;
53
54
using std::string;
55
56
class FunctionRunnable : public Runnable {
57
public:
58
1.41M
    explicit FunctionRunnable(std::function<void()> func) : _func(std::move(func)) {}
59
60
1.41M
    void run() override { _func(); }
61
62
private:
63
    std::function<void()> _func;
64
};
65
66
// Creates a builder with default settings: no pre-spawned workers, one worker
// slot per hardware thread, an effectively unbounded queue and a 500ms idle
// timeout after which surplus workers exit.
ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group)
        : _name(std::move(name)),
          _workload_group(std::move(workload_group)),
          _min_threads(0),
          // std::thread::hardware_concurrency() returns 0 when the value is not
          // well defined or not computable. A pool with _max_threads == 0 never
          // spawns a worker, so clamp the default to at least one thread.
          _max_threads(static_cast<int>(std::max(1U, std::thread::hardware_concurrency()))),
          _max_queue_size(std::numeric_limits<int>::max()),
          _idle_timeout(std::chrono::milliseconds(500)) {}
73
74
853
// Sets how many workers the pool keeps alive even when idle. Must be >= 0.
ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
    CHECK_GE(min_threads, 0); // a negative worker floor is a programming error
    _min_threads = min_threads;
    return *this;
}
79
80
874
// Sets the upper bound on concurrently existing workers. Must be > 0.
ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
    CHECK_GT(max_threads, 0); // a pool with no worker slots could never run a task
    _max_threads = max_threads;
    return *this;
}
85
86
160
// Sets how many tasks may wait in the queue; ThreadPool::do_submit() uses it in
// its capacity check. The value is stored as given, without validation.
ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
    ThreadPoolBuilder& self = *this;
    self._max_queue_size = max_queue_size;
    return self;
}
90
91
// Associates the pool with a cgroup CPU controller. Each worker attaches itself
// to it when it starts (see ThreadPool::dispatch_thread). Only a weak reference
// is kept, so the controller may be destroyed independently of the pool.
ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(
        std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl) {
    // Sink parameter taken by value: move it into place rather than paying for
    // another atomic weak-count increment/decrement pair.
    _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
    return *this;
}
96
97
// Creates an IDLE token bound to `pool`. At most `max_concurrency` of its tasks
// are dispatched to the pool queue at once (see need_dispatch()).
ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode,
                                 int max_concurrency)
        : _mode(mode),
          _pool(pool),
          _state(State::IDLE),
          _active_threads(0),
          _max_concurrency(max_concurrency),
          _num_submitted_tasks(0),
          _num_unsubmitted_tasks(0) {
    // A concurrency limit of one is serial execution, whatever mode was asked
    // for; normalise it so the rest of the pool only needs to look at _mode.
    if (max_concurrency == 1) {
        _mode = ThreadPool::ExecutionMode::SERIAL;
    }
}
110
111
381k
// Quiesces the token and unregisters it from its pool.
ThreadPoolToken::~ThreadPoolToken() {
    // shutdown() must come first: release_token() CHECKs that the token is no
    // longer active.
    shutdown();
    ThreadPool* const owner = _pool;
    owner->release_token(this);
}
115
116
557k
// Submits `r` to the owning pool, scheduled under this token's execution mode.
Status ThreadPoolToken::submit(std::shared_ptr<Runnable> r) {
    ThreadPool* const owner = _pool;
    return owner->do_submit(std::move(r), this);
}
119
120
559k
// Convenience overload: wraps `f` in a FunctionRunnable and submits it.
Status ThreadPoolToken::submit_func(std::function<void()> f) {
    auto runnable = std::make_shared<FunctionRunnable>(std::move(f));
    return submit(std::move(runnable));
}
123
124
733k
void ThreadPoolToken::shutdown() {
125
733k
    std::unique_lock<std::mutex> l(_pool->_lock);
126
733k
    _pool->check_not_pool_thread_unlocked();
127
128
    // Clear the queue under the lock, but defer the releasing of the tasks
129
    // outside the lock, in case there are concurrent threads wanting to access
130
    // the ThreadPool. The task's destructors may acquire locks, etc, so this
131
    // also prevents lock inversions.
132
733k
    std::deque<ThreadPool::Task> to_release = std::move(_entries);
133
733k
    _pool->_total_queued_tasks -= to_release.size();
134
135
733k
    switch (state()) {
136
379k
    case State::IDLE:
137
        // There were no tasks outstanding; we can quiesce the token immediately.
138
379k
        transition(State::QUIESCED);
139
379k
        break;
140
1.10k
    case State::RUNNING:
141
        // There were outstanding tasks. If any are still running, switch to
142
        // QUIESCING and wait for them to finish (the worker thread executing
143
        // the token's last task will switch the token to QUIESCED). Otherwise,
144
        // we can quiesce the token immediately.
145
146
        // Note: this is an O(n) operation, but it's expected to be infrequent.
147
        // Plus doing it this way (rather than switching to QUIESCING and waiting
148
        // for a worker thread to process the queue entry) helps retain state
149
        // transition symmetry with ThreadPool::shutdown.
150
9.22k
        for (auto it = _pool->_queue.begin(); it != _pool->_queue.end();) {
151
8.11k
            if (*it == this) {
152
654
                it = _pool->_queue.erase(it);
153
7.46k
            } else {
154
7.46k
                it++;
155
7.46k
            }
156
8.11k
        }
157
158
1.10k
        if (_active_threads == 0) {
159
261
            transition(State::QUIESCED);
160
261
            break;
161
261
        }
162
847
        transition(State::QUIESCING);
163
847
        [[fallthrough]];
164
864
    case State::QUIESCING:
165
        // The token is already quiescing. Just wait for a worker thread to
166
        // switch it to QUIESCED.
167
1.72k
        _not_running_cond.wait(l, [this]() { return state() == State::QUIESCED; });
168
864
        break;
169
353k
    default:
170
353k
        break;
171
733k
    }
172
733k
}
173
174
238k
void ThreadPoolToken::wait() {
175
238k
    std::unique_lock<std::mutex> l(_pool->_lock);
176
238k
    _pool->check_not_pool_thread_unlocked();
177
273k
    _not_running_cond.wait(l, [this]() { return !is_active(); });
178
238k
}
179
180
1.83M
// Moves the token to `new_state`. In debug builds the transition is validated
// against the token state machine: IDLE -> {RUNNING, QUIESCED},
// RUNNING -> {IDLE, QUIESCING, QUIESCED}, QUIESCING -> QUIESCED; QUIESCED is
// terminal. Callers hold the pool lock.
void ThreadPoolToken::transition(State new_state) {
#ifndef NDEBUG
    CHECK_NE(_state, new_state);

    switch (_state) {
    case State::IDLE:
        CHECK(new_state == State::RUNNING || new_state == State::QUIESCED);
        if (new_state != State::RUNNING) {
            // Quiescing an idle token: nothing may be queued or running.
            CHECK(_entries.empty());
            CHECK_EQ(_active_threads, 0);
        } else {
            // Becoming RUNNING requires at least one queued task.
            CHECK(!_entries.empty());
        }
        break;
    case State::RUNNING:
        CHECK(new_state == State::IDLE || new_state == State::QUIESCING ||
              new_state == State::QUIESCED);
        CHECK(_entries.empty());
        if (new_state == State::QUIESCING) {
            CHECK_GT(_active_threads, 0);
        }
        break;
    case State::QUIESCING:
        CHECK(new_state == State::QUIESCED);
        CHECK_EQ(_active_threads, 0);
        break;
    case State::QUIESCED:
        CHECK(false); // QUIESCED is a terminal state
        break;
    default:
        throw Exception(Status::FatalError("Unknown token state: {}", _state));
    }
#endif

    // Entering IDLE or QUIESCED wakes the waiters on _not_running_cond
    // (ThreadPoolToken::wait and ThreadPoolToken::shutdown).
    const bool wakes_waiters = new_state == State::IDLE || new_state == State::QUIESCED;
    if (wakes_waiters) {
        _not_running_cond.notify_all();
    }

    _state = new_state;
}
226
227
0
// Returns a static, human-readable name for `s`.
const char* ThreadPoolToken::state_to_string(State s) {
    const char* label = "<cannot reach here>";
    switch (s) {
    case State::IDLE:
        label = "IDLE";
        break;
    case State::RUNNING:
        label = "RUNNING";
        break;
    case State::QUIESCING:
        label = "QUIESCING";
        break;
    case State::QUIESCED:
        label = "QUIESCED";
        break;
    }
    return label;
}
244
245
1.48M
bool ThreadPoolToken::need_dispatch() {
246
1.48M
    return _state == ThreadPoolToken::State::IDLE ||
247
1.48M
           (_mode == ThreadPool::ExecutionMode::CONCURRENT &&
248
760k
            _num_submitted_tasks < _max_concurrency);
249
1.48M
}
250
251
// Copies the builder's configuration. No worker is started here; init() must be
// called before use. Until then _pool_status is Uninitialized, which is what
// init() checks to reject double initialisation.
ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
        : _name(builder._name),
          _workload_group(builder._workload_group),
          _min_threads(builder._min_threads),
          _max_threads(builder._max_threads),
          _max_queue_size(builder._max_queue_size),
          _idle_timeout(builder._idle_timeout),
          _pool_status(Status::Uninitialized("The pool was not initialized.")),
          // Runtime counters all start at zero.
          _num_threads(0),
          _num_threads_pending_start(0),
          _active_threads(0),
          _total_queued_tasks(0),
          _cgroup_cpu_ctl(builder._cgroup_cpu_ctl),
          // Token backing submit()/submit_func(), i.e. submissions without a token.
          _tokenless(new_token(ExecutionMode::CONCURRENT)),
          // Distinguishes pools sharing a name in the metric registry (see init()).
          _id(UniqueId::gen_uid()) {}
266
267
594
// Shuts the pool down. Every token except the internal tokenless one must have
// been destroyed already, because each token's destructor calls back into its pool.
ThreadPool::~ThreadPool() {
    const auto live_tokens = _tokens.size();
    CHECK_EQ(1, live_tokens) << absl::Substitute(
            "Threadpool $0 destroyed with $1 allocated tokens", _name, _tokens.size());
    shutdown();
    VLOG_DEBUG << fmt::format("Thread pool {} destroyed", _name);
}
274
275
878
// Starts up to `thread_num` workers, stopping at the first failure. That failure
// is logged and returned. The unnamed lock_guard parameter is not used; the call
// sites pass the guard they hold on _lock.
Status ThreadPool::try_create_thread(int thread_num, std::lock_guard<std::mutex>&) {
    for (int created = 0; created < thread_num; ++created) {
        Status status = create_thread();
        if (!status.ok()) {
            LOG(WARNING) << "Thread pool " << _name << " failed to create thread: " << status;
            return status;
        }
        // The new worker moves itself from "pending" to "running" once it
        // enters dispatch_thread().
        _num_threads_pending_start++;
    }
    return Status::OK();
}
287
288
857
// Marks the pool as usable, pre-spawns _min_threads workers (best effort) and
// registers the pool's metrics. Returns NotSupported if called more than once.
Status ThreadPool::init() {
    if (!_pool_status.is<UNINITIALIZED>()) {
        return Status::NotSupported("The thread pool {} is already initialized", _name);
    }
    _pool_status = Status::OK();

    {
        std::lock_guard<std::mutex> guard(_lock);
        // A failure here is deliberately ignored: workers are also created on
        // demand by do_submit(), so the pool remains usable.
        static_cast<void>(try_create_thread(_min_threads, guard));
    }

    // The unique _id is part of the entity labels so that pools created with the
    // same name still get distinct metric entities. Otherwise deregistering one
    // pool's entity or hook would interfere with the other's.
    _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
            fmt::format("thread_pool_{}", _name), {{"thread_pool_name", _name},
                                                   {"workload_group", _workload_group},
                                                   {"id", _id.to_string()}});

    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_active_threads);
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_threads);
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size);
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_queue_size);
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_execution_time_ns_total);
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_execution_count_total);
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_wait_worker_time_ns_total);
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_wait_worker_count_total);
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed);

    // Refresh the gauges on every metrics update, unless the pool has left the OK
    // state (e.g. after shutdown()).
    _metric_entity->register_hook("update", [this]() {
        bool pool_ok = false;
        {
            std::lock_guard<std::mutex> guard(_lock);
            pool_ok = _pool_status.ok();
        }
        if (!pool_ok) {
            return;
        }

        thread_pool_active_threads->set_value(num_active_threads());
        thread_pool_queue_size->set_value(get_queue_size());
        thread_pool_max_queue_size->set_value(get_max_queue_size());
        thread_pool_max_threads->set_value(max_threads());
    });
    return Status::OK();
}
334
335
1.06k
void ThreadPool::shutdown() {
336
1.06k
    VLOG_DEBUG << fmt::format("Shutting down thread pool {}", _name);
337
    // Why access to doris_metrics is safe here?
338
    // Since DorisMetrics is a singleton, it will be destroyed only after doris_main is exited.
339
    // The shutdown/destroy of ThreadPool is guaranteed to take place before doris_main exits by
340
    // ExecEnv::destroy().
341
1.06k
    DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity);
342
1.06k
    std::unique_lock<std::mutex> l(_lock);
343
1.06k
    check_not_pool_thread_unlocked();
344
345
    // Note: this is the same error seen at submission if the pool is at
346
    // capacity, so clients can't tell them apart. This isn't really a practical
347
    // concern though because shutting down a pool typically requires clients to
348
    // be quiesced first, so there's no danger of a client getting confused.
349
    // Not print stack trace here
350
1.06k
    _pool_status = Status::Error<SERVICE_UNAVAILABLE, false>(
351
1.06k
            "The thread pool {} has been shut down.", _name);
352
353
    // Clear the various queues under the lock, but defer the releasing
354
    // of the tasks outside the lock, in case there are concurrent threads
355
    // wanting to access the ThreadPool. The task's destructors may acquire
356
    // locks, etc, so this also prevents lock inversions.
357
1.06k
    _queue.clear();
358
359
1.06k
    std::deque<std::deque<Task>> to_release;
360
1.15k
    for (auto* t : _tokens) {
361
1.15k
        if (!t->_entries.empty()) {
362
3
            to_release.emplace_back(std::move(t->_entries));
363
3
        }
364
1.15k
        switch (t->state()) {
365
628
        case ThreadPoolToken::State::IDLE:
366
            // The token is idle; we can quiesce it immediately.
367
628
            t->transition(ThreadPoolToken::State::QUIESCED);
368
628
            break;
369
36
        case ThreadPoolToken::State::RUNNING:
370
            // The token has tasks associated with it. If they're merely queued
371
            // (i.e. there are no active threads), the tasks will have been removed
372
            // above and we can quiesce immediately. Otherwise, we need to wait for
373
            // the threads to finish.
374
36
            t->transition(t->_active_threads > 0 ? ThreadPoolToken::State::QUIESCING
375
36
                                                 : ThreadPoolToken::State::QUIESCED);
376
36
            break;
377
489
        default:
378
489
            break;
379
1.15k
        }
380
1.15k
    }
381
382
    // The queues are empty. Wake any sleeping worker threads and wait for all
383
    // of them to exit. Some worker threads will exit immediately upon waking,
384
    // while others will exit after they finish executing an outstanding task.
385
1.06k
    _total_queued_tasks = 0;
386
4.51k
    while (!_idle_threads.empty()) {
387
3.45k
        _idle_threads.front().not_empty.notify_one();
388
3.45k
        _idle_threads.pop_front();
389
3.45k
    }
390
391
1.62k
    _no_threads_cond.wait(l, [this]() { return _num_threads + _num_threads_pending_start == 0; });
392
393
    // All the threads have exited. Check the state of each token.
394
1.15k
    for (auto* t : _tokens) {
395
1.15k
        DCHECK(t->state() == ThreadPoolToken::State::IDLE ||
396
1.15k
               t->state() == ThreadPoolToken::State::QUIESCED);
397
1.15k
    }
398
1.06k
}
399
400
380k
// Creates a token bound to this pool and records it in _tokens. Ownership goes
// to the caller; the token unregisters itself on destruction via release_token().
std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode, int max_concurrency) {
    std::lock_guard<std::mutex> l(_lock);
    auto token = std::unique_ptr<ThreadPoolToken>(new ThreadPoolToken(this, mode, max_concurrency));
    const bool registered = _tokens.insert(token.get()).second;
    if (!registered) {
        throw Exception(Status::InternalError("duplicate token"));
    }
    return token;
}
408
409
381k
// Unregisters `t`. It must be inactive and must currently be registered.
void ThreadPool::release_token(ThreadPoolToken* t) {
    std::lock_guard<std::mutex> l(_lock);
    CHECK(!t->is_active()) << absl::Substitute("Token with state $0 may not be released",
                                               ThreadPoolToken::state_to_string(t->state()));
    const auto removed = _tokens.erase(t);
    CHECK_EQ(1, removed);
}
415
416
928k
// Submits `r` without an explicit token; it runs under the pool's internal
// concurrent token.
Status ThreadPool::submit(std::shared_ptr<Runnable> r) {
    ThreadPoolToken* const token = _tokenless.get();
    return do_submit(std::move(r), token);
}
419
420
855k
// Convenience overload: wraps `f` in a FunctionRunnable and submits it.
Status ThreadPool::submit_func(std::function<void()> f) {
    auto runnable = std::make_shared<FunctionRunnable>(std::move(f));
    return submit(std::move(runnable));
}
423
424
1.48M
// Common submission path for tokenless and token-based submits. Validates pool
// and token state, enforces the capacity limit, enqueues the task on `token`,
// wakes an idle worker and, if needed, starts a new worker.
// Returns SERVICE_UNAVAILABLE when the pool/token is shut down or at capacity,
// or the thread-creation error if no worker exists to run the task.
Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token) {
    DCHECK(token);

    std::unique_lock<std::mutex> l(_lock);
    if (!_pool_status.ok()) [[unlikely]] {
        return _pool_status;
    }

    if (!token->may_submit_new_tasks()) [[unlikely]] {
        return Status::Error<SERVICE_UNAVAILABLE>("Thread pool({}) token was shut down", _name);
    }

    // Size limit check: a task is accepted while there is room either on a worker
    // (_max_threads - _active_threads) or in the queue (_max_queue_size - queued).
    int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - _active_threads +
                                 static_cast<int64_t>(_max_queue_size) - _total_queued_tasks;
    if (capacity_remaining < 1) {
        thread_pool_submit_failed->increment(1);
        // Report the quantities the check above actually uses: tasks currently
        // executing (_active_threads), not the number of worker threads, which
        // also includes idle and pending workers.
        return Status::Error<SERVICE_UNAVAILABLE>(
                "Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name,
                _active_threads, _max_threads, _total_queued_tasks, _max_queue_size);
    }

    // Should we create another thread?

    // We assume that each current inactive thread will grab one item from the
    // queue.  If it seems like we'll need another thread, we create one.
    //
    // Rather than creating the thread here, while holding the lock, we defer
    // it to down below. This is because thread creation can be rather slow
    // (hundreds of milliseconds in some cases) and we'd like to allow the
    // existing threads to continue to process tasks while we do so.
    //
    // In theory, a currently active thread could finish immediately after this
    // calculation but before our new worker starts running. This would mean we
    // created a thread we didn't really need. However, this race is unavoidable
    // and harmless.
    //
    // Of course, we never create more than _max_threads threads no matter what.
    int threads_from_this_submit =
            token->is_active() && token->mode() == ExecutionMode::SERIAL ? 0 : 1;
    int inactive_threads = _num_threads + _num_threads_pending_start - _active_threads;
    int additional_threads =
            static_cast<int>(_queue.size()) + threads_from_this_submit - inactive_threads;
    bool need_a_thread = false;
    if (additional_threads > 0 && _num_threads + _num_threads_pending_start < _max_threads) {
        need_a_thread = true;
        _num_threads_pending_start++;
    }

    // If this would be the pool's only worker, create it synchronously under the
    // lock so that a failure can be reported before the task is enqueued.
    if (need_a_thread && _num_threads + _num_threads_pending_start == 1) {
        Status status = create_thread();
        if (!status.ok()) {
            _num_threads_pending_start--;
            thread_pool_submit_failed->increment(1);
            return status;
        }
        need_a_thread = false;
    }

    Task task;
    task.runnable = std::move(r);
    task.submit_time_wather.start();

    // Add the task to the token's queue.
    ThreadPoolToken::State state = token->state();
    DCHECK(state == ThreadPoolToken::State::IDLE || state == ThreadPoolToken::State::RUNNING);
    token->_entries.emplace_back(std::move(task));
    // When we need to execute the task in the token, we submit the token object to the queue.
    // There are currently two places where tokens will be submitted to the queue:
    // 1. When submitting a new task, if the token is still in the IDLE state,
    //    or the concurrency of the token has not reached the upper limit, it will be added to the queue.
    // 2. When the dispatch thread finishes executing a task:
    //    1. If it is a SERIAL token, and there are unsubmitted tasks, submit them to the queue.
    //    2. If it is a CONCURRENT token, and there are still unsubmitted tasks, and the upper limit of concurrency is not reached,
    //       then submitted to the queue.
    if (token->need_dispatch()) {
        _queue.emplace_back(token);
        ++token->_num_submitted_tasks;
        if (state == ThreadPoolToken::State::IDLE) {
            token->transition(ThreadPoolToken::State::RUNNING);
        }
    } else {
        ++token->_num_unsubmitted_tasks;
    }
    _total_queued_tasks++;

    // Wake up an idle thread for this task. Choosing the thread at the front of
    // the list ensures LIFO semantics as idling threads are also added to the front.
    //
    // If there are no idle threads, the new task remains on the queue and is
    // processed by an active thread (or a thread we're about to create) at some
    // point in the future.
    if (!_idle_threads.empty()) {
        _idle_threads.front().not_empty.notify_one();
        _idle_threads.pop_front();
    }
    l.unlock();

    if (need_a_thread) {
        Status status = create_thread();
        if (!status.ok()) {
            l.lock();
            _num_threads_pending_start--;
            if (_num_threads + _num_threads_pending_start == 0) {
                // If we have no threads, we can't do any work. Count it as a failed
                // submit, like the other failure paths above.
                // NOTE(review): the task has already been enqueued on the token and
                // is not removed here, so it may still run if a worker is started
                // later even though the caller sees an error -- confirm callers do
                // not blindly retry.
                thread_pool_submit_failed->increment(1);
                return status;
            }
            // If we failed to create a thread, but there are still some other
            // worker threads, log a warning message and continue.
            LOG(WARNING) << "Thread pool " << _name
                         << " failed to create thread: " << status.to_string();
        }
    }

    return Status::OK();
}
541
542
115
void ThreadPool::wait() {
543
115
    std::unique_lock<std::mutex> l(_lock);
544
115
    check_not_pool_thread_unlocked();
545
193
    _idle_cond.wait(l, [this]() { return _total_queued_tasks == 0 && _active_threads == 0; });
546
115
}
547
548
48.1k
void ThreadPool::dispatch_thread() {
549
48.1k
    std::unique_lock<std::mutex> l(_lock);
550
48.1k
    if (!_threads.insert(Thread::current_thread()).second) {
551
0
        throw Exception(Status::InternalError("duplicate token"));
552
0
    }
553
48.1k
    DCHECK_GT(_num_threads_pending_start, 0);
554
48.1k
    _num_threads++;
555
48.1k
    _num_threads_pending_start--;
556
557
48.1k
    if (std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_sptr = _cgroup_cpu_ctl.lock()) {
558
0
        static_cast<void>(cg_cpu_ctl_sptr->add_thread_to_cgroup());
559
0
    }
560
561
    // Owned by this worker thread and added/removed from _idle_threads as needed.
562
48.1k
    IdleThread me;
563
564
14.2M
    while (true) {
565
        // Note: Status::Aborted() is used to indicate normal shutdown.
566
14.2M
        if (!_pool_status.ok()) {
567
4.25k
            VLOG_CRITICAL << "DispatchThread exiting: " << _pool_status.to_string();
568
4.25k
            break;
569
4.25k
        }
570
571
14.2M
        if (_num_threads + _num_threads_pending_start > _max_threads) {
572
2
            break;
573
2
        }
574
575
14.2M
        if (_queue.empty()) {
576
            // There's no work to do, let's go idle.
577
            //
578
            // Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
579
12.7M
            _idle_threads.push_front(me);
580
12.7M
            Defer defer = [&] {
581
                // For some wake ups (i.e. shutdown or do_submit) this thread is
582
                // guaranteed to be unlinked after being awakened. In others (i.e.
583
                // spurious wake-up or Wait timeout), it'll still be linked.
584
12.7M
                if (me.is_linked()) {
585
11.4M
                    _idle_threads.erase(_idle_threads.iterator_to(me));
586
11.4M
                }
587
12.7M
            };
588
12.7M
            if (me.not_empty.wait_for(l, _idle_timeout) == std::cv_status::timeout) {
589
                // After much investigation, it appears that pthread condition variables have
590
                // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
591
                // another thread did in fact signal. Apparently after a timeout there is some
592
                // brief period during which another thread may actually grab the internal mutex
593
                // protecting the state, signal, and release again before we get the mutex. So,
594
                // we'll recheck the empty queue case regardless.
595
11.4M
                if (_queue.empty() && _num_threads + _num_threads_pending_start > _min_threads) {
596
39.3k
                    VLOG_NOTICE << "Releasing worker thread from pool " << _name << " after "
597
504
                                << std::chrono::duration_cast<std::chrono::milliseconds>(
598
504
                                           _idle_timeout)
599
504
                                           .count()
600
504
                                << "ms of idle time.";
601
39.3k
                    break;
602
39.3k
                }
603
11.4M
            }
604
12.7M
            continue;
605
12.7M
        }
606
607
1.48M
        MonotonicStopWatch task_execution_time_watch;
608
1.48M
        task_execution_time_watch.start();
609
        // Get the next token and task to execute.
610
1.48M
        ThreadPoolToken* token = _queue.front();
611
1.48M
        _queue.pop_front();
612
1.48M
        DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
613
1.48M
        DCHECK(!token->_entries.empty());
614
1.48M
        Task task = std::move(token->_entries.front());
615
1.48M
        thread_pool_task_wait_worker_time_ns_total->increment(
616
1.48M
                task.submit_time_wather.elapsed_time());
617
1.48M
        thread_pool_task_wait_worker_count_total->increment(1);
618
1.48M
        token->_entries.pop_front();
619
1.48M
        token->_active_threads++;
620
1.48M
        --_total_queued_tasks;
621
1.48M
        ++_active_threads;
622
623
1.48M
        l.unlock();
624
625
        // Execute the task
626
1.48M
        task.runnable->run();
627
        // Destruct the task while we do not hold the lock.
628
        //
629
        // The task's destructor may be expensive if it has a lot of bound
630
        // objects, and we don't want to block submission of the threadpool.
631
        // In the worst case, the destructor might even try to do something
632
        // with this threadpool, and produce a deadlock.
633
1.48M
        task.runnable.reset();
634
1.48M
        l.lock();
635
1.48M
        thread_pool_task_execution_time_ns_total->increment(
636
1.48M
                task_execution_time_watch.elapsed_time());
637
1.48M
        thread_pool_task_execution_count_total->increment(1);
638
        // Possible states:
639
        // 1. The token was shut down while we ran its task. Transition to QUIESCED.
640
        // 2. The token has no more queued tasks. Transition back to IDLE.
641
        // 3. The token has more tasks. Requeue it and transition back to RUNNABLE.
642
1.48M
        ThreadPoolToken::State state = token->state();
643
1.48M
        DCHECK(state == ThreadPoolToken::State::RUNNING ||
644
1.48M
               state == ThreadPoolToken::State::QUIESCING);
645
1.48M
        --token->_active_threads;
646
1.48M
        --token->_num_submitted_tasks;
647
648
        // handle shutdown && idle
649
1.48M
        if (token->_active_threads == 0) {
650
823k
            if (state == ThreadPoolToken::State::QUIESCING) {
651
883
                DCHECK(token->_entries.empty());
652
883
                token->transition(ThreadPoolToken::State::QUIESCED);
653
822k
            } else if (token->_entries.empty()) {
654
725k
                token->transition(ThreadPoolToken::State::IDLE);
655
725k
            }
656
823k
        }
657
658
        // We decrease _num_submitted_tasks holding lock, so the following DCHECK works.
659
1.48M
        DCHECK(token->_num_submitted_tasks < token->_max_concurrency);
660
661
        // If token->state is running and there are unsubmitted tasks in the token, we put
662
        // the token back.
663
1.48M
        if (token->_num_unsubmitted_tasks > 0 && state == ThreadPoolToken::State::RUNNING) {
664
            // SERIAL: if _entries is not empty, then num_unsubmitted_tasks must be greater than 0.
665
            // CONCURRENT: we have to check _num_unsubmitted_tasks because there may be at least 2
666
            // threads are running for the token.
667
89.9k
            _queue.emplace_back(token);
668
89.9k
            ++token->_num_submitted_tasks;
669
89.9k
            --token->_num_unsubmitted_tasks;
670
89.9k
        }
671
672
1.48M
        if (--_active_threads == 0) {
673
687k
            _idle_cond.notify_all();
674
687k
        }
675
1.48M
    }
676
677
    // It's important that we hold the lock between exiting the loop and dropping
678
    // _num_threads. Otherwise it's possible someone else could come along here
679
    // and add a new task just as the last running thread is about to exit.
680
48.1k
    CHECK(l.owns_lock());
681
682
48.1k
    CHECK_EQ(_threads.erase(Thread::current_thread()), 1);
683
48.1k
    _num_threads--;
684
48.1k
    if (_num_threads + _num_threads_pending_start == 0) {
685
1.40k
        _no_threads_cond.notify_all();
686
687
        // Sanity check: if we're the last thread exiting, the queue ought to be
688
        // empty. Otherwise it will never get processed.
689
1.40k
        CHECK(_queue.empty());
690
1.40k
        DCHECK_EQ(0, _total_queued_tasks);
691
1.40k
    }
692
48.1k
}
693
694
48.1k
// Spawns one worker thread running dispatch_thread(). Accounting of
// _num_threads_pending_start is left to the caller.
Status ThreadPool::create_thread() {
    // Debug point for tests: fail thread creation, optionally only for the pool
    // whose name matches the "name" parameter.
    DBUG_EXECUTE_IF("ThreadPool.create_thread.inject_error", {
        auto pool_name = dp->param<std::string>("name", "");
        if (pool_name.empty() || pool_name == _name) {
            return Status::InternalError("ThreadPool.create_thread.inject_error");
        }
    });
    const std::string worker_name = absl::Substitute("$0 [worker]", _name);
    return Thread::create("thread pool", worker_name, &ThreadPool::dispatch_thread, this,
                          nullptr);
}
704
705
973k
void ThreadPool::check_not_pool_thread_unlocked() {
706
973k
    Thread* current = Thread::current_thread();
707
973k
    if (_threads.contains(current)) {
708
0
        throw Exception(
709
0
                Status::FatalError("Thread belonging to thread pool {} with "
710
0
                                   "name {} called pool function that would result in deadlock",
711
0
                                   _name, current->name()));
712
0
    }
713
973k
}
714
715
28
// Changes the number of workers kept alive while idle, starting extra workers
// if the pool currently has fewer than `min_threads`. Fails if `min_threads` is
// negative or exceeds _max_threads.
Status ThreadPool::set_min_threads(int min_threads) {
    std::lock_guard<std::mutex> l(_lock);
    if (min_threads < 0) {
        // Mirror ThreadPoolBuilder::set_min_threads, which rejects negative values.
        return Status::InvalidArgument("set thread pool {} min_threads failed: {} is negative",
                                       _name, min_threads);
    }
    if (min_threads > _max_threads) {
        // min threads can not be set greater than max threads
        return Status::InternalError("set thread pool {} min_threads failed", _name);
    }
    _min_threads = min_threads;
    if (min_threads > _num_threads + _num_threads_pending_start) {
        int addition_threads = min_threads - _num_threads - _num_threads_pending_start;
        RETURN_IF_ERROR(try_create_thread(addition_threads, l));
    }
    return Status::OK();
}
728
729
32
// Changes the upper bound on workers. When raised, starts up to one extra worker
// per queued task (capped by the new limit). Surplus workers retire on their own
// in dispatch_thread(). Fails if `max_threads` is not positive or is below
// _min_threads.
Status ThreadPool::set_max_threads(int max_threads) {
    std::lock_guard<std::mutex> l(_lock);
    // Debug-point escape hatch: lets tests force an arbitrary value, bypassing
    // the validation below.
    DBUG_EXECUTE_IF("ThreadPool.set_max_threads.force_set", {
        _max_threads = max_threads;
        return Status::OK();
    })
    if (max_threads <= 0) {
        // With no worker slots, dispatch threads exit (_num_threads > _max_threads)
        // and do_submit() never spawns new ones, so queued tasks would never run.
        // Reject it, as ThreadPoolBuilder::set_max_threads does.
        return Status::InvalidArgument("set thread pool {} max_threads failed: {} is not positive",
                                       _name, max_threads);
    }
    if (_min_threads > max_threads) {
        // max threads can not be set less than min threads
        return Status::InternalError("set thread pool {} max_threads failed", _name);
    }

    _max_threads = max_threads;
    if (_max_threads > _num_threads + _num_threads_pending_start) {
        int addition_threads = _max_threads - _num_threads - _num_threads_pending_start;
        // Only start as many extra workers as there are queued tasks to pick up.
        addition_threads = std::min(addition_threads, _total_queued_tasks);
        RETURN_IF_ERROR(try_create_thread(addition_threads, l));
    }
    return Status::OK();
}
748
749
0
std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
750
0
    return o << ThreadPoolToken::state_to_string(s);
751
0
}
752
753
} // namespace doris