Coverage Report

Created: 2025-09-05 16:47

/root/doris/be/src/util/threadpool.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/threadpool.cc
19
// and modified by Doris
20
21
#include "util/threadpool.h"
22
23
#include <algorithm>
24
#include <cstdint>
25
#include <limits>
26
#include <ostream>
27
#include <thread>
28
#include <utility>
29
30
#include "common/logging.h"
31
#include "gutil/map-util.h"
32
#include "gutil/port.h"
33
#include "gutil/strings/substitute.h"
34
#include "util/debug/sanitizer_scopes.h"
35
#include "util/debug_points.h"
36
#include "util/doris_metrics.h"
37
#include "util/metrics.h"
38
#include "util/scoped_cleanup.h"
39
#include "util/stopwatch.hpp"
40
#include "util/thread.h"
41
42
namespace doris {
43
// The name of these varialbs will be useds as metric name in prometheus.
44
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_active_threads, MetricUnit::NOUNIT);
45
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT);
46
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_queue_size, MetricUnit::NOUNIT);
47
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_threads, MetricUnit::NOUNIT);
48
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT);
49
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_execution_time_ns_total,
50
                                     MetricUnit::NANOSECONDS);
51
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_execution_count_total, MetricUnit::NOUNIT);
52
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_wait_worker_time_ns_total,
53
                                     MetricUnit::NANOSECONDS);
54
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_wait_worker_count_total, MetricUnit::NOUNIT);
55
using namespace ErrorCode;
56
57
using std::string;
58
using strings::Substitute;
59
60
class FunctionRunnable : public Runnable {
61
public:
62
1.74M
    explicit FunctionRunnable(std::function<void()> func) : _func(std::move(func)) {}
63
64
1.74M
    void run() override { _func(); }
65
66
private:
67
    std::function<void()> _func;
68
};
69
70
ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group)
71
        : _name(std::move(name)),
72
          _workload_group(std::move(workload_group)),
73
          _min_threads(0),
74
          _max_threads(std::thread::hardware_concurrency()),
75
          _max_queue_size(std::numeric_limits<int>::max()),
76
772
          _idle_timeout(std::chrono::milliseconds(500)) {}
77
78
730
ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
79
730
    CHECK_GE(min_threads, 0);
80
730
    _min_threads = min_threads;
81
730
    return *this;
82
730
}
83
84
744
ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
85
744
    CHECK_GT(max_threads, 0);
86
744
    _max_threads = max_threads;
87
744
    return *this;
88
744
}
89
90
206
ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
91
206
    _max_queue_size = max_queue_size;
92
206
    return *this;
93
206
}
94
95
131
ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl) {
96
131
    _cgroup_cpu_ctl = cgroup_cpu_ctl;
97
131
    return *this;
98
131
}
99
100
ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode,
101
                                 int max_concurrency)
102
        : _mode(mode),
103
          _pool(pool),
104
          _state(State::IDLE),
105
          _active_threads(0),
106
          _max_concurrency(max_concurrency),
107
          _num_submitted_tasks(0),
108
294k
          _num_unsubmitted_tasks(0) {
109
294k
    if (max_concurrency == 1 && mode != ThreadPool::ExecutionMode::SERIAL) {
110
48.4k
        _mode = ThreadPool::ExecutionMode::SERIAL;
111
48.4k
    }
112
294k
}
113
114
293k
ThreadPoolToken::~ThreadPoolToken() {
115
293k
    shutdown();
116
293k
    _pool->release_token(this);
117
293k
}
118
119
133k
Status ThreadPoolToken::submit(std::shared_ptr<Runnable> r) {
120
133k
    return _pool->do_submit(std::move(r), this);
121
133k
}
122
123
133k
Status ThreadPoolToken::submit_func(std::function<void()> f) {
124
133k
    return submit(std::make_shared<FunctionRunnable>(std::move(f)));
125
133k
}
126
127
528k
void ThreadPoolToken::shutdown() {
128
528k
    std::unique_lock<std::mutex> l(_pool->_lock);
129
528k
    _pool->check_not_pool_thread_unlocked();
130
131
    // Clear the queue under the lock, but defer the releasing of the tasks
132
    // outside the lock, in case there are concurrent threads wanting to access
133
    // the ThreadPool. The task's destructors may acquire locks, etc, so this
134
    // also prevents lock inversions.
135
528k
    std::deque<ThreadPool::Task> to_release = std::move(_entries);
136
528k
    _pool->_total_queued_tasks -= to_release.size();
137
138
528k
    switch (state()) {
139
292k
    case State::IDLE:
140
        // There were no tasks outstanding; we can quiesce the token immediately.
141
292k
        transition(State::QUIESCED);
142
292k
        break;
143
785
    case State::RUNNING:
144
        // There were outstanding tasks. If any are still running, switch to
145
        // QUIESCING and wait for them to finish (the worker thread executing
146
        // the token's last task will switch the token to QUIESCED). Otherwise,
147
        // we can quiesce the token immediately.
148
149
        // Note: this is an O(n) operation, but it's expected to be infrequent.
150
        // Plus doing it this way (rather than switching to QUIESCING and waiting
151
        // for a worker thread to process the queue entry) helps retain state
152
        // transition symmetry with ThreadPool::shutdown.
153
2.11k
        for (auto it = _pool->_queue.begin(); it != _pool->_queue.end();) {
154
1.32k
            if (*it == this) {
155
170
                it = _pool->_queue.erase(it);
156
1.15k
            } else {
157
1.15k
                it++;
158
1.15k
            }
159
1.32k
        }
160
161
785
        if (_active_threads == 0) {
162
79
            transition(State::QUIESCED);
163
79
            break;
164
79
        }
165
706
        transition(State::QUIESCING);
166
706
        [[fallthrough]];
167
716
    case State::QUIESCING:
168
        // The token is already quiescing. Just wait for a worker thread to
169
        // switch it to QUIESCED.
170
1.43k
        _not_running_cond.wait(l, [this]() { return state() == State::QUIESCED; });
171
716
        break;
172
235k
    default:
173
235k
        break;
174
528k
    }
175
528k
}
176
177
114k
void ThreadPoolToken::wait() {
178
114k
    std::unique_lock<std::mutex> l(_pool->_lock);
179
114k
    _pool->check_not_pool_thread_unlocked();
180
129k
    _not_running_cond.wait(l, [this]() { return !is_active(); });
181
114k
}
182
183
1.06M
void ThreadPoolToken::transition(State new_state) {
184
1.06M
#ifndef NDEBUG
185
1.06M
    CHECK_NE(_state, new_state);
186
187
1.06M
    switch (_state) {
188
679k
    case State::IDLE:
189
679k
        CHECK(new_state == State::RUNNING || new_state == State::QUIESCED);
190
679k
        if (new_state == State::RUNNING) {
191
386k
            CHECK(!_entries.empty());
192
386k
        } else {
193
292k
            CHECK(_entries.empty());
194
292k
            CHECK_EQ(_active_threads, 0);
195
292k
        }
196
679k
        break;
197
386k
    case State::RUNNING:
198
386k
        CHECK(new_state == State::IDLE || new_state == State::QUIESCING ||
199
386k
              new_state == State::QUIESCED);
200
386k
        CHECK(_entries.empty());
201
386k
        if (new_state == State::QUIESCING) {
202
733
            CHECK_GT(_active_threads, 0);
203
733
        }
204
386k
        break;
205
733
    case State::QUIESCING:
206
733
        CHECK(new_state == State::QUIESCED);
207
733
        CHECK_EQ(_active_threads, 0);
208
733
        break;
209
0
    case State::QUIESCED:
210
0
        CHECK(false); // QUIESCED is a terminal state
211
0
        break;
212
0
    default:
213
0
        LOG(FATAL) << "Unknown token state: " << _state;
214
1.06M
    }
215
1.06M
#endif
216
217
    // Take actions based on the state we're entering.
218
1.06M
    switch (new_state) {
219
385k
    case State::IDLE:
220
679k
    case State::QUIESCED:
221
679k
        _not_running_cond.notify_all();
222
679k
        break;
223
387k
    default:
224
387k
        break;
225
1.06M
    }
226
227
1.06M
    _state = new_state;
228
1.06M
}
229
230
0
const char* ThreadPoolToken::state_to_string(State s) {
231
0
    switch (s) {
232
0
    case State::IDLE:
233
0
        return "IDLE";
234
0
        break;
235
0
    case State::RUNNING:
236
0
        return "RUNNING";
237
0
        break;
238
0
    case State::QUIESCING:
239
0
        return "QUIESCING";
240
0
        break;
241
0
    case State::QUIESCED:
242
0
        return "QUIESCED";
243
0
        break;
244
0
    }
245
0
    return "<cannot reach here>";
246
0
}
247
248
1.79M
bool ThreadPoolToken::need_dispatch() {
249
1.79M
    return _state == ThreadPoolToken::State::IDLE ||
250
1.79M
           (_mode == ThreadPool::ExecutionMode::CONCURRENT &&
251
1.41M
            _num_submitted_tasks < _max_concurrency);
252
1.79M
}
253
254
ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
255
        : _name(builder._name),
256
          _workload_group(builder._workload_group),
257
          _min_threads(builder._min_threads),
258
          _max_threads(builder._max_threads),
259
          _max_queue_size(builder._max_queue_size),
260
          _idle_timeout(builder._idle_timeout),
261
          _pool_status(Status::Uninitialized("The pool was not initialized.")),
262
          _num_threads(0),
263
          _num_threads_pending_start(0),
264
          _active_threads(0),
265
          _total_queued_tasks(0),
266
          _cgroup_cpu_ctl(builder._cgroup_cpu_ctl),
267
          _tokenless(new_token(ExecutionMode::CONCURRENT)),
268
726
          _id(UniqueId::gen_uid()) {}
269
270
465
ThreadPool::~ThreadPool() {
271
    // There should only be one live token: the one used in tokenless submission.
272
465
    CHECK_EQ(1, _tokens.size()) << strings::Substitute(
273
0
            "Threadpool $0 destroyed with $1 allocated tokens", _name, _tokens.size());
274
465
    shutdown();
275
465
}
276
277
726
Status ThreadPool::init() {
278
726
    if (!_pool_status.is<UNINITIALIZED>()) {
279
0
        return Status::NotSupported("The thread pool {} is already initialized", _name);
280
0
    }
281
726
    _pool_status = Status::OK();
282
726
    _num_threads_pending_start = _min_threads;
283
9.24k
    for (int i = 0; i < _min_threads; i++) {
284
8.51k
        Status status = create_thread();
285
8.51k
        if (!status.ok()) {
286
0
            shutdown();
287
0
            return status;
288
0
        }
289
8.51k
    }
290
    // _id of thread pool is used to make sure when we create thread pool with same name, we can
291
    // get different _metric_entity
292
    // If not, we will have problem when we deregister entity and register hook.
293
726
    _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
294
726
            fmt::format("thread_pool_{}", _name), {{"thread_pool_name", _name},
295
726
                                                   {"workload_group", _workload_group},
296
726
                                                   {"id", _id.to_string()}});
297
298
726
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_active_threads);
299
726
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_threads);
300
726
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size);
301
726
    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_queue_size);
302
726
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_execution_time_ns_total);
303
726
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_execution_count_total);
304
726
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_wait_worker_time_ns_total);
305
726
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_wait_worker_count_total);
306
726
    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed);
307
308
22.4k
    _metric_entity->register_hook("update", [this]() {
309
22.4k
        {
310
22.4k
            std::lock_guard<std::mutex> l(_lock);
311
22.4k
            if (!_pool_status.ok()) {
312
0
                return;
313
0
            }
314
22.4k
        }
315
316
22.4k
        thread_pool_active_threads->set_value(num_active_threads());
317
22.4k
        thread_pool_queue_size->set_value(get_queue_size());
318
22.4k
        thread_pool_max_queue_size->set_value(get_max_queue_size());
319
22.4k
        thread_pool_max_threads->set_value(max_threads());
320
22.4k
    });
321
726
    return Status::OK();
322
726
}
323
324
853
void ThreadPool::shutdown() {
325
    // Why access to doris_metrics is safe here?
326
    // Since DorisMetrics is a singleton, it will be destroyed only after doris_main is exited.
327
    // The shutdown/destroy of ThreadPool is guaranteed to take place before doris_main exits by
328
    // ExecEnv::destroy().
329
853
    DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity);
330
853
    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
331
853
    std::unique_lock<std::mutex> l(_lock);
332
853
    check_not_pool_thread_unlocked();
333
334
    // Note: this is the same error seen at submission if the pool is at
335
    // capacity, so clients can't tell them apart. This isn't really a practical
336
    // concern though because shutting down a pool typically requires clients to
337
    // be quiesced first, so there's no danger of a client getting confused.
338
    // Not print stack trace here
339
853
    _pool_status = Status::Error<SERVICE_UNAVAILABLE, false>(
340
853
            "The thread pool {} has been shut down.", _name);
341
342
    // Clear the various queues under the lock, but defer the releasing
343
    // of the tasks outside the lock, in case there are concurrent threads
344
    // wanting to access the ThreadPool. The task's destructors may acquire
345
    // locks, etc, so this also prevents lock inversions.
346
853
    _queue.clear();
347
348
853
    std::deque<std::deque<Task>> to_release;
349
938
    for (auto* t : _tokens) {
350
938
        if (!t->_entries.empty()) {
351
13
            to_release.emplace_back(std::move(t->_entries));
352
13
        }
353
938
        switch (t->state()) {
354
487
        case ThreadPoolToken::State::IDLE:
355
            // The token is idle; we can quiesce it immediately.
356
487
            t->transition(ThreadPoolToken::State::QUIESCED);
357
487
            break;
358
35
        case ThreadPoolToken::State::RUNNING:
359
            // The token has tasks associated with it. If they're merely queued
360
            // (i.e. there are no active threads), the tasks will have been removed
361
            // above and we can quiesce immediately. Otherwise, we need to wait for
362
            // the threads to finish.
363
35
            t->transition(t->_active_threads > 0 ? ThreadPoolToken::State::QUIESCING
364
35
                                                 : ThreadPoolToken::State::QUIESCED);
365
35
            break;
366
416
        default:
367
416
            break;
368
938
        }
369
938
    }
370
371
    // The queues are empty. Wake any sleeping worker threads and wait for all
372
    // of them to exit. Some worker threads will exit immediately upon waking,
373
    // while others will exit after they finish executing an outstanding task.
374
853
    _total_queued_tasks = 0;
375
4.82k
    while (!_idle_threads.empty()) {
376
3.96k
        _idle_threads.front().not_empty.notify_one();
377
3.96k
        _idle_threads.pop_front();
378
3.96k
    }
379
380
1.29k
    _no_threads_cond.wait(l, [this]() { return _num_threads + _num_threads_pending_start == 0; });
381
382
    // All the threads have exited. Check the state of each token.
383
938
    for (auto* t : _tokens) {
384
938
        DCHECK(t->state() == ThreadPoolToken::State::IDLE ||
385
938
               t->state() == ThreadPoolToken::State::QUIESCED);
386
938
    }
387
853
}
388
389
292k
std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode, int max_concurrency) {
390
292k
    std::lock_guard<std::mutex> l(_lock);
391
292k
    std::unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode, max_concurrency));
392
292k
    InsertOrDie(&_tokens, t.get());
393
292k
    return t;
394
292k
}
395
396
293k
void ThreadPool::release_token(ThreadPoolToken* t) {
397
293k
    std::lock_guard<std::mutex> l(_lock);
398
293k
    CHECK(!t->is_active()) << strings::Substitute("Token with state $0 may not be released",
399
2
                                                  ThreadPoolToken::state_to_string(t->state()));
400
293k
    CHECK_EQ(1, _tokens.erase(t));
401
293k
}
402
403
1.66M
Status ThreadPool::submit(std::shared_ptr<Runnable> r) {
404
1.66M
    return do_submit(std::move(r), _tokenless.get());
405
1.66M
}
406
407
1.61M
Status ThreadPool::submit_func(std::function<void()> f) {
408
1.61M
    return submit(std::make_shared<FunctionRunnable>(std::move(f)));
409
1.61M
}
410
411
1.79M
Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token) {
412
1.79M
    DCHECK(token);
413
414
1.79M
    std::unique_lock<std::mutex> l(_lock);
415
1.79M
    if (PREDICT_FALSE(!_pool_status.ok())) {
416
1
        return _pool_status;
417
1
    }
418
419
1.79M
    if (PREDICT_FALSE(!token->may_submit_new_tasks())) {
420
2.37k
        return Status::Error<SERVICE_UNAVAILABLE>("Thread pool({}) token was shut down", _name);
421
2.37k
    }
422
423
    // Size limit check.
424
1.79M
    int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - _active_threads +
425
1.79M
                                 static_cast<int64_t>(_max_queue_size) - _total_queued_tasks;
426
1.79M
    if (capacity_remaining < 1) {
427
4
        thread_pool_submit_failed->increment(1);
428
4
        return Status::Error<SERVICE_UNAVAILABLE>(
429
4
                "Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name,
430
4
                _num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks,
431
4
                _max_queue_size);
432
4
    }
433
434
    // Should we create another thread?
435
436
    // We assume that each current inactive thread will grab one item from the
437
    // queue.  If it seems like we'll need another thread, we create one.
438
    //
439
    // Rather than creating the thread here, while holding the lock, we defer
440
    // it to down below. This is because thread creation can be rather slow
441
    // (hundreds of milliseconds in some cases) and we'd like to allow the
442
    // existing threads to continue to process tasks while we do so.
443
    //
444
    // In theory, a currently active thread could finish immediately after this
445
    // calculation but before our new worker starts running. This would mean we
446
    // created a thread we didn't really need. However, this race is unavoidable
447
    // and harmless.
448
    //
449
    // Of course, we never create more than _max_threads threads no matter what.
450
1.79M
    int threads_from_this_submit =
451
1.79M
            token->is_active() && token->mode() == ExecutionMode::SERIAL ? 0 : 1;
452
1.79M
    int inactive_threads = _num_threads + _num_threads_pending_start - _active_threads;
453
1.79M
    int additional_threads =
454
1.79M
            static_cast<int>(_queue.size()) + threads_from_this_submit - inactive_threads;
455
1.79M
    bool need_a_thread = false;
456
1.79M
    if (additional_threads > 0 && _num_threads + _num_threads_pending_start < _max_threads) {
457
20.3k
        need_a_thread = true;
458
20.3k
        _num_threads_pending_start++;
459
20.3k
    }
460
461
1.79M
    Task task;
462
1.79M
    task.runnable = std::move(r);
463
1.79M
    task.submit_time_wather.start();
464
465
    // Add the task to the token's queue.
466
1.79M
    ThreadPoolToken::State state = token->state();
467
1.79M
    DCHECK(state == ThreadPoolToken::State::IDLE || state == ThreadPoolToken::State::RUNNING);
468
1.79M
    token->_entries.emplace_back(std::move(task));
469
    // When we need to execute the task in the token, we submit the token object to the queue.
470
    // There are currently two places where tokens will be submitted to the queue:
471
    // 1. When submitting a new task, if the token is still in the IDLE state,
472
    //    or the concurrency of the token has not reached the online level, it will be added to the queue.
473
    // 2. When the dispatch thread finishes executing a task:
474
    //    1. If it is a SERIAL token, and there are unsubmitted tasks, submit them to the queue.
475
    //    2. If it is a CONCURRENT token, and there are still unsubmitted tasks, and the upper limit of concurrency is not reached,
476
    //       then submitted to the queue.
477
1.79M
    if (token->need_dispatch()) {
478
1.79M
        _queue.emplace_back(token);
479
1.79M
        ++token->_num_submitted_tasks;
480
1.79M
        if (state == ThreadPoolToken::State::IDLE) {
481
386k
            token->transition(ThreadPoolToken::State::RUNNING);
482
386k
        }
483
1.79M
    } else {
484
2.73k
        ++token->_num_unsubmitted_tasks;
485
2.73k
    }
486
1.79M
    _total_queued_tasks++;
487
488
    // Wake up an idle thread for this task. Choosing the thread at the front of
489
    // the list ensures LIFO semantics as idling threads are also added to the front.
490
    //
491
    // If there are no idle threads, the new task remains on the queue and is
492
    // processed by an active thread (or a thread we're about to create) at some
493
    // point in the future.
494
1.79M
    if (!_idle_threads.empty()) {
495
1.39M
        _idle_threads.front().not_empty.notify_one();
496
1.39M
        _idle_threads.pop_front();
497
1.39M
    }
498
1.79M
    l.unlock();
499
500
1.79M
    if (need_a_thread) {
501
20.3k
        Status status = create_thread();
502
20.3k
        if (!status.ok()) {
503
0
            l.lock();
504
0
            _num_threads_pending_start--;
505
0
            if (_num_threads + _num_threads_pending_start == 0) {
506
                // If we have no threads, we can't do any work.
507
0
                return status;
508
0
            }
509
            // If we failed to create a thread, but there are still some other
510
            // worker threads, log a warning message and continue.
511
0
            LOG(WARNING) << "Thread pool " << _name
512
0
                         << " failed to create thread: " << status.to_string();
513
0
        }
514
20.3k
    }
515
516
1.79M
    return Status::OK();
517
1.79M
}
518
519
159
void ThreadPool::wait() {
520
159
    std::unique_lock<std::mutex> l(_lock);
521
159
    check_not_pool_thread_unlocked();
522
234
    _idle_cond.wait(l, [this]() { return _total_queued_tasks == 0 && _active_threads == 0; });
523
159
}
524
525
28.9k
void ThreadPool::dispatch_thread() {
526
28.9k
    std::unique_lock<std::mutex> l(_lock);
527
28.9k
    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
528
28.9k
    InsertOrDie(&_threads, Thread::current_thread());
529
28.9k
    DCHECK_GT(_num_threads_pending_start, 0);
530
28.9k
    _num_threads++;
531
28.9k
    _num_threads_pending_start--;
532
533
28.9k
    if (_cgroup_cpu_ctl != nullptr) {
534
0
        static_cast<void>(_cgroup_cpu_ctl->add_thread_to_cgroup());
535
0
    }
536
537
    // Owned by this worker thread and added/removed from _idle_threads as needed.
538
28.9k
    IdleThread me;
539
540
12.6M
    while (true) {
541
        // Note: Status::Aborted() is used to indicate normal shutdown.
542
12.6M
        if (!_pool_status.ok()) {
543
4.15k
            VLOG_CRITICAL << "DispatchThread exiting: " << _pool_status.to_string();
544
4.15k
            break;
545
4.15k
        }
546
547
12.6M
        if (_num_threads + _num_threads_pending_start > _max_threads) {
548
58
            break;
549
58
        }
550
551
12.6M
        if (_queue.empty()) {
552
            // There's no work to do, let's go idle.
553
            //
554
            // Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
555
10.8M
            _idle_threads.push_front(me);
556
10.8M
            SCOPED_CLEANUP({
557
                // For some wake ups (i.e. shutdown or do_submit) this thread is
558
                // guaranteed to be unlinked after being awakened. In others (i.e.
559
                // spurious wake-up or Wait timeout), it'll still be linked.
560
10.8M
                if (me.is_linked()) {
561
10.8M
                    _idle_threads.erase(_idle_threads.iterator_to(me));
562
10.8M
                }
563
10.8M
            });
564
10.8M
            if (me.not_empty.wait_for(l, _idle_timeout) == std::cv_status::timeout) {
565
                // After much investigation, it appears that pthread condition variables have
566
                // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
567
                // another thread did in fact signal. Apparently after a timeout there is some
568
                // brief period during which another thread may actually grab the internal mutex
569
                // protecting the state, signal, and release again before we get the mutex. So,
570
                // we'll recheck the empty queue case regardless.
571
9.43M
                if (_queue.empty() && _num_threads + _num_threads_pending_start > _min_threads) {
572
20.3k
                    VLOG_NOTICE << "Releasing worker thread from pool " << _name << " after "
573
0
                                << std::chrono::duration_cast<std::chrono::milliseconds>(
574
0
                                           _idle_timeout)
575
0
                                           .count()
576
0
                                << "ms of idle time.";
577
20.3k
                    break;
578
20.3k
                }
579
9.43M
            }
580
10.8M
            continue;
581
10.8M
        }
582
583
1.79M
        MonotonicStopWatch task_execution_time_watch;
584
1.79M
        task_execution_time_watch.start();
585
        // Get the next token and task to execute.
586
1.79M
        ThreadPoolToken* token = _queue.front();
587
1.79M
        _queue.pop_front();
588
1.79M
        DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
589
1.79M
        DCHECK(!token->_entries.empty());
590
1.79M
        Task task = std::move(token->_entries.front());
591
1.79M
        thread_pool_task_wait_worker_time_ns_total->increment(
592
1.79M
                task.submit_time_wather.elapsed_time());
593
1.79M
        thread_pool_task_wait_worker_count_total->increment(1);
594
1.79M
        token->_entries.pop_front();
595
1.79M
        token->_active_threads++;
596
1.79M
        --_total_queued_tasks;
597
1.79M
        ++_active_threads;
598
599
1.79M
        l.unlock();
600
601
        // Execute the task
602
1.79M
        task.runnable->run();
603
        // Destruct the task while we do not hold the lock.
604
        //
605
        // The task's destructor may be expensive if it has a lot of bound
606
        // objects, and we don't want to block submission of the threadpool.
607
        // In the worst case, the destructor might even try to do something
608
        // with this threadpool, and produce a deadlock.
609
1.79M
        task.runnable.reset();
610
1.79M
        l.lock();
611
1.79M
        thread_pool_task_execution_time_ns_total->increment(
612
1.79M
                task_execution_time_watch.elapsed_time());
613
1.79M
        thread_pool_task_execution_count_total->increment(1);
614
        // Possible states:
615
        // 1. The token was shut down while we ran its task. Transition to QUIESCED.
616
        // 2. The token has no more queued tasks. Transition back to IDLE.
617
        // 3. The token has more tasks. Requeue it and transition back to RUNNABLE.
618
1.79M
        ThreadPoolToken::State state = token->state();
619
1.79M
        DCHECK(state == ThreadPoolToken::State::RUNNING ||
620
1.79M
               state == ThreadPoolToken::State::QUIESCING);
621
1.79M
        --token->_active_threads;
622
1.79M
        --token->_num_submitted_tasks;
623
624
        // handle shutdown && idle
625
1.79M
        if (token->_active_threads == 0) {
626
395k
            if (state == ThreadPoolToken::State::QUIESCING) {
627
733
                DCHECK(token->_entries.empty());
628
733
                token->transition(ThreadPoolToken::State::QUIESCED);
629
394k
            } else if (token->_entries.empty()) {
630
385k
                token->transition(ThreadPoolToken::State::IDLE);
631
385k
            }
632
395k
        }
633
634
        // We decrease _num_submitted_tasks holding lock, so the following DCHECK works.
635
1.79M
        DCHECK(token->_num_submitted_tasks < token->_max_concurrency);
636
637
        // If token->state is running and there are unsubmitted tasks in the token, we put
638
        // the token back.
639
1.79M
        if (token->_num_unsubmitted_tasks > 0 && state == ThreadPoolToken::State::RUNNING) {
640
            // SERIAL: if _entries is not empty, then num_unsubmitted_tasks must be greater than 0.
641
            // CONCURRENT: we have to check _num_unsubmitted_tasks because there may be at least 2
642
            // threads are running for the token.
643
4.63k
            _queue.emplace_back(token);
644
4.63k
            ++token->_num_submitted_tasks;
645
4.63k
            --token->_num_unsubmitted_tasks;
646
4.63k
        }
647
648
1.79M
        if (--_active_threads == 0) {
649
373k
            _idle_cond.notify_all();
650
373k
        }
651
1.79M
    }
652
653
    // It's important that we hold the lock between exiting the loop and dropping
654
    // _num_threads. Otherwise it's possible someone else could come along here
655
    // and add a new task just as the last running thread is about to exit.
656
28.9k
    CHECK(l.owns_lock());
657
658
28.9k
    CHECK_EQ(_threads.erase(Thread::current_thread()), 1);
659
28.9k
    _num_threads--;
660
28.9k
    if (_num_threads + _num_threads_pending_start == 0) {
661
1.00k
        _no_threads_cond.notify_all();
662
663
        // Sanity check: if we're the last thread exiting, the queue ought to be
664
        // empty. Otherwise it will never get processed.
665
1.00k
        CHECK(_queue.empty());
666
1.00k
        DCHECK_EQ(0, _total_queued_tasks);
667
1.00k
    }
668
28.9k
}
669
670
28.9k
Status ThreadPool::create_thread() {
671
28.9k
    return Thread::create("thread pool", strings::Substitute("$0 [worker]", _name),
672
28.9k
                          &ThreadPool::dispatch_thread, this, nullptr);
673
28.9k
}
674
675
643k
void ThreadPool::check_not_pool_thread_unlocked() {
676
643k
    Thread* current = Thread::current_thread();
677
643k
    if (ContainsKey(_threads, current)) {
678
0
        LOG(FATAL) << strings::Substitute(
679
0
                "Thread belonging to thread pool '$0' with "
680
0
                "name '$1' called pool function that would result in deadlock",
681
0
                _name, current->name());
682
0
    }
683
643k
}
684
685
7
Status ThreadPool::set_min_threads(int min_threads) {
686
7
    std::lock_guard<std::mutex> l(_lock);
687
7
    if (min_threads > _max_threads) {
688
        // min threads can not be set greater than max threads
689
1
        return Status::InternalError("set thread pool {} min_threads failed", _name);
690
1
    }
691
6
    _min_threads = min_threads;
692
6
    if (min_threads > _num_threads + _num_threads_pending_start) {
693
0
        int addition_threads = min_threads - _num_threads - _num_threads_pending_start;
694
0
        _num_threads_pending_start += addition_threads;
695
0
        for (int i = 0; i < addition_threads; i++) {
696
0
            Status status = create_thread();
697
0
            if (!status.ok()) {
698
0
                _num_threads_pending_start--;
699
0
                LOG(WARNING) << "Thread pool " << _name
700
0
                             << " failed to create thread: " << status.to_string();
701
0
                return status;
702
0
            }
703
0
        }
704
0
    }
705
6
    return Status::OK();
706
6
}
707
708
11
Status ThreadPool::set_max_threads(int max_threads) {
709
11
    std::lock_guard<std::mutex> l(_lock);
710
11
    DBUG_EXECUTE_IF("ThreadPool.set_max_threads.force_set", {
711
11
        _max_threads = max_threads;
712
11
        return Status::OK();
713
11
    })
714
11
    if (_min_threads > max_threads) {
715
        // max threads can not be set less than min threads
716
1
        return Status::InternalError("set thread pool {} max_threads failed", _name);
717
1
    }
718
719
10
    _max_threads = max_threads;
720
10
    if (_max_threads > _num_threads + _num_threads_pending_start) {
721
5
        int addition_threads = _max_threads - _num_threads - _num_threads_pending_start;
722
5
        addition_threads = std::min(addition_threads, _total_queued_tasks);
723
5
        _num_threads_pending_start += addition_threads;
724
8
        for (int i = 0; i < addition_threads; i++) {
725
3
            Status status = create_thread();
726
3
            if (!status.ok()) {
727
0
                _num_threads_pending_start--;
728
0
                LOG(WARNING) << "Thread pool " << _name
729
0
                             << " failed to create thread: " << status.to_string();
730
0
                return status;
731
0
            }
732
3
        }
733
5
    }
734
10
    return Status::OK();
735
10
}
736
737
0
std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
738
0
    return o << ThreadPoolToken::state_to_string(s);
739
0
}
740
741
} // namespace doris