Coverage Report

Created: 2024-11-22 11:49

/root/doris/be/src/util/threadpool.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/threadpool.cc
19
// and modified by Doris
20
21
#include "util/threadpool.h"
22
23
#include <algorithm>
24
#include <cstdint>
25
#include <limits>
26
#include <ostream>
27
#include <thread>
28
#include <utility>
29
30
#include "common/logging.h"
31
#include "gutil/map-util.h"
32
#include "gutil/port.h"
33
#include "gutil/strings/substitute.h"
34
#include "util/debug/sanitizer_scopes.h"
35
#include "util/scoped_cleanup.h"
36
#include "util/thread.h"
37
38
namespace doris {
39
using namespace ErrorCode;
40
41
using std::string;
42
using strings::Substitute;
43
44
class FunctionRunnable : public Runnable {
45
public:
46
7.45k
    explicit FunctionRunnable(std::function<void()> func) : _func(std::move(func)) {}
47
48
4.13k
    void run() override { _func(); }
49
50
private:
51
    std::function<void()> _func;
52
};
53
54
ThreadPoolBuilder::ThreadPoolBuilder(string name)
55
        : _name(std::move(name)),
56
          _min_threads(0),
57
          _max_threads(std::thread::hardware_concurrency()),
58
          _max_queue_size(std::numeric_limits<int>::max()),
59
242
          _idle_timeout(std::chrono::milliseconds(500)) {}
60
61
206
ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
62
206
    CHECK_GE(min_threads, 0);
63
206
    _min_threads = min_threads;
64
206
    return *this;
65
206
}
66
67
214
ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
68
214
    CHECK_GT(max_threads, 0);
69
214
    _max_threads = max_threads;
70
214
    return *this;
71
214
}
72
73
17
ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
74
17
    _max_queue_size = max_queue_size;
75
17
    return *this;
76
17
}
77
78
ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(
79
0
        std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl) {
80
0
    _cgroup_cpu_ctl = cgroup_cpu_ctl;
81
0
    return *this;
82
0
}
83
84
ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode,
85
                                 int max_concurrency)
86
        : _mode(mode),
87
          _pool(pool),
88
          _state(State::IDLE),
89
          _active_threads(0),
90
          _max_concurrency(max_concurrency),
91
          _num_submitted_tasks(0),
92
2.13k
          _num_unsubmitted_tasks(0) {
93
2.13k
    if (max_concurrency == 1 && mode != ThreadPool::ExecutionMode::SERIAL) {
94
1
        _mode = ThreadPool::ExecutionMode::SERIAL;
95
1
    }
96
2.13k
}
97
98
2.13k
ThreadPoolToken::~ThreadPoolToken() {
99
2.13k
    shutdown();
100
2.13k
    _pool->release_token(this);
101
2.13k
}
102
103
6.34k
Status ThreadPoolToken::submit(std::shared_ptr<Runnable> r) {
104
6.34k
    return _pool->do_submit(std::move(r), this);
105
6.34k
}
106
107
6.77k
Status ThreadPoolToken::submit_func(std::function<void()> f) {
108
6.77k
    return submit(std::make_shared<FunctionRunnable>(std::move(f)));
109
6.77k
}
110
111
3.31k
void ThreadPoolToken::shutdown() {
112
3.31k
    std::unique_lock<std::mutex> l(_pool->_lock);
113
3.31k
    _pool->check_not_pool_thread_unlocked();
114
115
    // Clear the queue under the lock, but defer the releasing of the tasks
116
    // outside the lock, in case there are concurrent threads wanting to access
117
    // the ThreadPool. The task's destructors may acquire locks, etc, so this
118
    // also prevents lock inversions.
119
3.31k
    std::deque<ThreadPool::Task> to_release = std::move(_entries);
120
3.31k
    _pool->_total_queued_tasks -= to_release.size();
121
122
3.31k
    switch (state()) {
123
1.03k
    case State::IDLE:
124
        // There were no tasks outstanding; we can quiesce the token immediately.
125
1.03k
        transition(State::QUIESCED);
126
1.03k
        break;
127
864
    case State::RUNNING:
128
        // There were outstanding tasks. If any are still running, switch to
129
        // QUIESCING and wait for them to finish (the worker thread executing
130
        // the token's last task will switch the token to QUIESCED). Otherwise,
131
        // we can quiesce the token immediately.
132
133
        // Note: this is an O(n) operation, but it's expected to be infrequent.
134
        // Plus doing it this way (rather than switching to QUIESCING and waiting
135
        // for a worker thread to process the queue entry) helps retain state
136
        // transition symmetry with ThreadPool::shutdown.
137
7.32k
        for (auto it = _pool->_queue.begin(); it != _pool->_queue.end();) {
138
6.46k
            if (*it == this) {
139
582
                it = _pool->_queue.erase(it);
140
5.88k
            } else {
141
5.88k
                it++;
142
5.88k
            }
143
6.46k
        }
144
145
864
        if (_active_threads == 0) {
146
304
            transition(State::QUIESCED);
147
304
            break;
148
304
        }
149
560
        transition(State::QUIESCING);
150
560
        [[fallthrough]];
151
575
    case State::QUIESCING:
152
        // The token is already quiescing. Just wait for a worker thread to
153
        // switch it to QUIESCED.
154
1.15k
        _not_running_cond.wait(l, [this]() { return state() == State::QUIESCED; });
155
575
        break;
156
1.40k
    default:
157
1.40k
        break;
158
3.31k
    }
159
3.31k
}
160
161
893
void ThreadPoolToken::wait() {
162
893
    std::unique_lock<std::mutex> l(_pool->_lock);
163
893
    _pool->check_not_pool_thread_unlocked();
164
1.09k
    _not_running_cond.wait(l, [this]() { return !is_active(); });
165
893
}
166
167
6.91k
void ThreadPoolToken::transition(State new_state) {
168
6.91k
#ifndef NDEBUG
169
6.91k
    CHECK_NE(_state, new_state);
170
171
6.91k
    switch (_state) {
172
3.80k
    case State::IDLE:
173
3.80k
        CHECK(new_state == State::RUNNING || new_state == State::QUIESCED);
174
3.80k
        if (new_state == State::RUNNING) {
175
2.54k
            CHECK(!_entries.empty());
176
2.54k
        } else {
177
1.26k
            CHECK(_entries.empty());
178
1.26k
            CHECK_EQ(_active_threads, 0);
179
1.26k
        }
180
3.80k
        break;
181
2.54k
    case State::RUNNING:
182
2.54k
        CHECK(new_state == State::IDLE || new_state == State::QUIESCING ||
183
2.54k
              new_state == State::QUIESCED);
184
2.54k
        CHECK(_entries.empty());
185
2.54k
        if (new_state == State::QUIESCING) {
186
566
            CHECK_GT(_active_threads, 0);
187
566
        }
188
2.54k
        break;
189
566
    case State::QUIESCING:
190
566
        CHECK(new_state == State::QUIESCED);
191
566
        CHECK_EQ(_active_threads, 0);
192
566
        break;
193
0
    case State::QUIESCED:
194
0
        CHECK(false); // QUIESCED is a terminal state
195
0
        break;
196
0
    default:
197
0
        LOG(FATAL) << "Unknown token state: " << _state;
198
6.91k
    }
199
6.91k
#endif
200
201
    // Take actions based on the state we're entering.
202
6.91k
    switch (new_state) {
203
1.67k
    case State::IDLE:
204
3.80k
    case State::QUIESCED:
205
3.80k
        _not_running_cond.notify_all();
206
3.80k
        break;
207
3.10k
    default:
208
3.10k
        break;
209
6.91k
    }
210
211
6.91k
    _state = new_state;
212
6.91k
}
213
214
0
const char* ThreadPoolToken::state_to_string(State s) {
215
0
    switch (s) {
216
0
    case State::IDLE:
217
0
        return "IDLE";
218
0
        break;
219
0
    case State::RUNNING:
220
0
        return "RUNNING";
221
0
        break;
222
0
    case State::QUIESCING:
223
0
        return "QUIESCING";
224
0
        break;
225
0
    case State::QUIESCED:
226
0
        return "QUIESCED";
227
0
        break;
228
0
    }
229
0
    return "<cannot reach here>";
230
0
}
231
232
5.49k
bool ThreadPoolToken::need_dispatch() {
233
5.49k
    return _state == ThreadPoolToken::State::IDLE ||
234
5.49k
           (_mode == ThreadPool::ExecutionMode::CONCURRENT &&
235
2.95k
            _num_submitted_tasks < _max_concurrency);
236
5.49k
}
237
238
ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
239
        : _name(builder._name),
240
          _min_threads(builder._min_threads),
241
          _max_threads(builder._max_threads),
242
          _max_queue_size(builder._max_queue_size),
243
          _idle_timeout(builder._idle_timeout),
244
          _pool_status(Status::Uninitialized("The pool was not initialized.")),
245
          _num_threads(0),
246
          _num_threads_pending_start(0),
247
          _active_threads(0),
248
          _total_queued_tasks(0),
249
          _cgroup_cpu_ctl(builder._cgroup_cpu_ctl),
250
237
          _tokenless(new_token(ExecutionMode::CONCURRENT)) {}
251
252
236
ThreadPool::~ThreadPool() {
253
    // There should only be one live token: the one used in tokenless submission.
254
236
    CHECK_EQ(1, _tokens.size()) << strings::Substitute(
255
0
            "Threadpool $0 destroyed with $1 allocated tokens", _name, _tokens.size());
256
236
    shutdown();
257
236
}
258
259
237
Status ThreadPool::init() {
260
237
    if (!_pool_status.is<UNINITIALIZED>()) {
261
0
        return Status::NotSupported("The thread pool {} is already initialized", _name);
262
0
    }
263
237
    _pool_status = Status::OK();
264
237
    _num_threads_pending_start = _min_threads;
265
1.01k
    for (int i = 0; i < _min_threads; i++) {
266
780
        Status status = create_thread();
267
780
        if (!status.ok()) {
268
0
            shutdown();
269
0
            return status;
270
0
        }
271
780
    }
272
237
    return Status::OK();
273
237
}
274
275
385
void ThreadPool::shutdown() {
276
385
    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
277
385
    std::unique_lock<std::mutex> l(_lock);
278
385
    check_not_pool_thread_unlocked();
279
280
    // Note: this is the same error seen at submission if the pool is at
281
    // capacity, so clients can't tell them apart. This isn't really a practical
282
    // concern though because shutting down a pool typically requires clients to
283
    // be quiesced first, so there's no danger of a client getting confused.
284
    // Not print stack trace here
285
385
    _pool_status = Status::Error<SERVICE_UNAVAILABLE, false>(
286
385
            "The thread pool {} has been shut down.", _name);
287
288
    // Clear the various queues under the lock, but defer the releasing
289
    // of the tasks outside the lock, in case there are concurrent threads
290
    // wanting to access the ThreadPool. The task's destructors may acquire
291
    // locks, etc, so this also prevents lock inversions.
292
385
    _queue.clear();
293
294
385
    std::deque<std::deque<Task>> to_release;
295
387
    for (auto* t : _tokens) {
296
387
        if (!t->_entries.empty()) {
297
2
            to_release.emplace_back(std::move(t->_entries));
298
2
        }
299
387
        switch (t->state()) {
300
232
        case ThreadPoolToken::State::IDLE:
301
            // The token is idle; we can quiesce it immediately.
302
232
            t->transition(ThreadPoolToken::State::QUIESCED);
303
232
            break;
304
6
        case ThreadPoolToken::State::RUNNING:
305
            // The token has tasks associated with it. If they're merely queued
306
            // (i.e. there are no active threads), the tasks will have been removed
307
            // above and we can quiesce immediately. Otherwise, we need to wait for
308
            // the threads to finish.
309
6
            t->transition(t->_active_threads > 0 ? ThreadPoolToken::State::QUIESCING
310
6
                                                 : ThreadPoolToken::State::QUIESCED);
311
6
            break;
312
149
        default:
313
149
            break;
314
387
        }
315
387
    }
316
317
    // The queues are empty. Wake any sleeping worker threads and wait for all
318
    // of them to exit. Some worker threads will exit immediately upon waking,
319
    // while others will exit after they finish executing an outstanding task.
320
385
    _total_queued_tasks = 0;
321
1.24k
    while (!_idle_threads.empty()) {
322
862
        _idle_threads.front().not_empty.notify_one();
323
862
        _idle_threads.pop_front();
324
862
    }
325
326
599
    _no_threads_cond.wait(l, [this]() { return _num_threads + _num_threads_pending_start == 0; });
327
328
    // All the threads have exited. Check the state of each token.
329
387
    for (auto* t : _tokens) {
330
387
        DCHECK(t->state() == ThreadPoolToken::State::IDLE ||
331
387
               t->state() == ThreadPoolToken::State::QUIESCED);
332
387
    }
333
385
}
334
335
2.13k
std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode, int max_concurrency) {
336
2.13k
    std::lock_guard<std::mutex> l(_lock);
337
2.13k
    std::unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode, max_concurrency));
338
2.13k
    InsertOrDie(&_tokens, t.get());
339
2.13k
    return t;
340
2.13k
}
341
342
2.13k
void ThreadPool::release_token(ThreadPoolToken* t) {
343
2.13k
    std::lock_guard<std::mutex> l(_lock);
344
2.13k
    CHECK(!t->is_active()) << strings::Substitute("Token with state $0 may not be released",
345
0
                                                  ThreadPoolToken::state_to_string(t->state()));
346
2.13k
    CHECK_EQ(1, _tokens.erase(t));
347
2.13k
}
348
349
1.20k
Status ThreadPool::submit(std::shared_ptr<Runnable> r) {
350
1.20k
    return do_submit(std::move(r), _tokenless.get());
351
1.20k
}
352
353
1.04k
Status ThreadPool::submit_func(std::function<void()> f) {
354
1.04k
    return submit(std::make_shared<FunctionRunnable>(std::move(f)));
355
1.04k
}
356
357
7.94k
Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token) {
358
7.94k
    DCHECK(token);
359
7.94k
    std::chrono::time_point<std::chrono::system_clock> submit_time =
360
7.94k
            std::chrono::system_clock::now();
361
362
7.94k
    std::unique_lock<std::mutex> l(_lock);
363
7.94k
    if (PREDICT_FALSE(!_pool_status.ok())) {
364
1
        return _pool_status;
365
1
    }
366
367
7.94k
    if (PREDICT_FALSE(!token->may_submit_new_tasks())) {
368
2.54k
        return Status::Error<SERVICE_UNAVAILABLE>("Thread pool({}) token was shut down", _name);
369
2.54k
    }
370
371
    // Size limit check.
372
5.39k
    int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - _active_threads +
373
5.39k
                                 static_cast<int64_t>(_max_queue_size) - _total_queued_tasks;
374
5.39k
    if (capacity_remaining < 1) {
375
4
        return Status::Error<SERVICE_UNAVAILABLE>(
376
4
                "Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name,
377
4
                _num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks,
378
4
                _max_queue_size);
379
4
    }
380
381
    // Should we create another thread?
382
383
    // We assume that each current inactive thread will grab one item from the
384
    // queue.  If it seems like we'll need another thread, we create one.
385
    //
386
    // Rather than creating the thread here, while holding the lock, we defer
387
    // it to down below. This is because thread creation can be rather slow
388
    // (hundreds of milliseconds in some cases) and we'd like to allow the
389
    // existing threads to continue to process tasks while we do so.
390
    //
391
    // In theory, a currently active thread could finish immediately after this
392
    // calculation but before our new worker starts running. This would mean we
393
    // created a thread we didn't really need. However, this race is unavoidable
394
    // and harmless.
395
    //
396
    // Of course, we never create more than _max_threads threads no matter what.
397
5.39k
    int threads_from_this_submit =
398
5.39k
            token->is_active() && token->mode() == ExecutionMode::SERIAL ? 0 : 1;
399
5.39k
    int inactive_threads = _num_threads + _num_threads_pending_start - _active_threads;
400
5.39k
    int additional_threads =
401
5.39k
            static_cast<int>(_queue.size()) + threads_from_this_submit - inactive_threads;
402
5.39k
    bool need_a_thread = false;
403
5.39k
    if (additional_threads > 0 && _num_threads + _num_threads_pending_start < _max_threads) {
404
622
        need_a_thread = true;
405
622
        _num_threads_pending_start++;
406
622
    }
407
408
5.39k
    Task task;
409
5.39k
    task.runnable = std::move(r);
410
5.39k
    task.submit_time = submit_time;
411
412
    // Add the task to the token's queue.
413
5.39k
    ThreadPoolToken::State state = token->state();
414
5.39k
    DCHECK(state == ThreadPoolToken::State::IDLE || state == ThreadPoolToken::State::RUNNING);
415
5.39k
    token->_entries.emplace_back(std::move(task));
416
    // When we need to execute the task in the token, we submit the token object to the queue.
417
    // There are currently two places where tokens will be submitted to the queue:
418
    // 1. When submitting a new task, if the token is still in the IDLE state,
419
    //    or the concurrency of the token has not reached the online level, it will be added to the queue.
420
    // 2. When the dispatch thread finishes executing a task:
421
    //    1. If it is a SERIAL token, and there are unsubmitted tasks, submit them to the queue.
422
    //    2. If it is a CONCURRENT token, and there are still unsubmitted tasks, and the upper limit of concurrency is not reached,
423
    //       then submitted to the queue.
424
5.39k
    if (token->need_dispatch()) {
425
4.20k
        _queue.emplace_back(token);
426
4.20k
        ++token->_num_submitted_tasks;
427
4.20k
        if (state == ThreadPoolToken::State::IDLE) {
428
2.54k
            token->transition(ThreadPoolToken::State::RUNNING);
429
2.54k
        }
430
4.20k
    } else {
431
1.18k
        ++token->_num_unsubmitted_tasks;
432
1.18k
    }
433
5.39k
    _total_queued_tasks++;
434
435
    // Wake up an idle thread for this task. Choosing the thread at the front of
436
    // the list ensures LIFO semantics as idling threads are also added to the front.
437
    //
438
    // If there are no idle threads, the new task remains on the queue and is
439
    // processed by an active thread (or a thread we're about to create) at some
440
    // point in the future.
441
5.39k
    if (!_idle_threads.empty()) {
442
1.36k
        _idle_threads.front().not_empty.notify_one();
443
1.36k
        _idle_threads.pop_front();
444
1.36k
    }
445
5.39k
    l.unlock();
446
447
5.39k
    if (need_a_thread) {
448
622
        Status status = create_thread();
449
622
        if (!status.ok()) {
450
0
            l.lock();
451
0
            _num_threads_pending_start--;
452
0
            if (_num_threads + _num_threads_pending_start == 0) {
453
                // If we have no threads, we can't do any work.
454
0
                return status;
455
0
            }
456
            // If we failed to create a thread, but there are still some other
457
            // worker threads, log a warning message and continue.
458
0
            LOG(WARNING) << "Thread pool " << _name
459
0
                         << " failed to create thread: " << status.to_string();
460
0
        }
461
622
    }
462
463
5.39k
    return Status::OK();
464
5.39k
}
465
466
61
void ThreadPool::wait() {
467
61
    std::unique_lock<std::mutex> l(_lock);
468
61
    check_not_pool_thread_unlocked();
469
130
    _idle_cond.wait(l, [this]() { return _total_queued_tasks == 0 && _active_threads == 0; });
470
61
}
471
472
1.40k
void ThreadPool::dispatch_thread() {
473
1.40k
    std::unique_lock<std::mutex> l(_lock);
474
1.40k
    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
475
1.40k
    InsertOrDie(&_threads, Thread::current_thread());
476
1.40k
    DCHECK_GT(_num_threads_pending_start, 0);
477
1.40k
    _num_threads++;
478
1.40k
    _num_threads_pending_start--;
479
480
1.40k
    if (std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_sptr = _cgroup_cpu_ctl.lock()) {
481
0
        static_cast<void>(cg_cpu_ctl_sptr->add_thread_to_cgroup());
482
0
    }
483
484
    // Owned by this worker thread and added/removed from _idle_threads as needed.
485
1.40k
    IdleThread me;
486
487
43.7k
    while (true) {
488
        // Note: Status::Aborted() is used to indicate normal shutdown.
489
43.7k
        if (!_pool_status.ok()) {
490
887
            VLOG_CRITICAL << "DispatchThread exiting: " << _pool_status.to_string();
491
887
            break;
492
887
        }
493
494
42.8k
        if (_num_threads + _num_threads_pending_start > _max_threads) {
495
2
            break;
496
2
        }
497
498
42.8k
        if (_queue.empty()) {
499
            // There's no work to do, let's go idle.
500
            //
501
            // Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
502
38.5k
            _idle_threads.push_front(me);
503
38.5k
            SCOPED_CLEANUP({
504
                // For some wake ups (i.e. shutdown or do_submit) this thread is
505
                // guaranteed to be unlinked after being awakened. In others (i.e.
506
                // spurious wake-up or Wait timeout), it'll still be linked.
507
38.5k
                if (me.is_linked()) {
508
38.5k
                    _idle_threads.erase(_idle_threads.iterator_to(me));
509
38.5k
                }
510
38.5k
            });
511
38.5k
            if (me.not_empty.wait_for(l, _idle_timeout) == std::cv_status::timeout) {
512
                // After much investigation, it appears that pthread condition variables have
513
                // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
514
                // another thread did in fact signal. Apparently after a timeout there is some
515
                // brief period during which another thread may actually grab the internal mutex
516
                // protecting the state, signal, and release again before we get the mutex. So,
517
                // we'll recheck the empty queue case regardless.
518
36.5k
                if (_queue.empty() && _num_threads + _num_threads_pending_start > _min_threads) {
519
504
                    VLOG_NOTICE << "Releasing worker thread from pool " << _name << " after "
520
0
                                << std::chrono::duration_cast<std::chrono::milliseconds>(
521
0
                                           _idle_timeout)
522
0
                                           .count()
523
0
                                << "ms of idle time.";
524
504
                    break;
525
504
                }
526
36.3k
            }
527
38.0k
            continue;
528
38.5k
        }
529
530
        // Get the next token and task to execute.
531
4.28k
        ThreadPoolToken* token = _queue.front();
532
4.28k
        _queue.pop_front();
533
4.28k
        DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
534
4.28k
        DCHECK(!token->_entries.empty());
535
4.28k
        Task task = std::move(token->_entries.front());
536
4.28k
        token->_entries.pop_front();
537
4.28k
        token->_active_threads++;
538
4.28k
        --_total_queued_tasks;
539
4.28k
        ++_active_threads;
540
541
4.28k
        l.unlock();
542
543
        // Execute the task
544
4.28k
        task.runnable->run();
545
546
        // Destruct the task while we do not hold the lock.
547
        //
548
        // The task's destructor may be expensive if it has a lot of bound
549
        // objects, and we don't want to block submission of the threadpool.
550
        // In the worst case, the destructor might even try to do something
551
        // with this threadpool, and produce a deadlock.
552
4.28k
        task.runnable.reset();
553
4.28k
        l.lock();
554
555
        // Possible states:
556
        // 1. The token was shut down while we ran its task. Transition to QUIESCED.
557
        // 2. The token has no more queued tasks. Transition back to IDLE.
558
        // 3. The token has more tasks. Requeue it and transition back to RUNNABLE.
559
4.28k
        ThreadPoolToken::State state = token->state();
560
4.28k
        DCHECK(state == ThreadPoolToken::State::RUNNING ||
561
4.28k
               state == ThreadPoolToken::State::QUIESCING);
562
4.28k
        --token->_active_threads;
563
4.28k
        --token->_num_submitted_tasks;
564
565
        // handle shutdown && idle
566
4.28k
        if (token->_active_threads == 0) {
567
3.08k
            if (state == ThreadPoolToken::State::QUIESCING) {
568
566
                DCHECK(token->_entries.empty());
569
566
                token->transition(ThreadPoolToken::State::QUIESCED);
570
2.51k
            } else if (token->_entries.empty()) {
571
1.67k
                token->transition(ThreadPoolToken::State::IDLE);
572
1.67k
            }
573
3.08k
        }
574
575
        // We decrease _num_submitted_tasks holding lock, so the following DCHECK works.
576
4.28k
        DCHECK(token->_num_submitted_tasks < token->_max_concurrency);
577
578
        // If token->state is running and there are unsubmitted tasks in the token, we put
579
        // the token back.
580
4.28k
        if (token->_num_unsubmitted_tasks > 0 && state == ThreadPoolToken::State::RUNNING) {
581
            // SERIAL: if _entries is not empty, then num_unsubmitted_tasks must be greater than 0.
582
            // CONCURRENT: we have to check _num_unsubmitted_tasks because there may be at least 2
583
            // threads are running for the token.
584
697
            _queue.emplace_back(token);
585
697
            ++token->_num_submitted_tasks;
586
697
            --token->_num_unsubmitted_tasks;
587
697
        }
588
589
4.28k
        if (--_active_threads == 0) {
590
732
            _idle_cond.notify_all();
591
732
        }
592
4.28k
    }
593
594
    // It's important that we hold the lock between exiting the loop and dropping
595
    // _num_threads. Otherwise it's possible someone else could come along here
596
    // and add a new task just as the last running thread is about to exit.
597
1.40k
    CHECK(l.owns_lock());
598
599
1.40k
    CHECK_EQ(_threads.erase(Thread::current_thread()), 1);
600
1.40k
    _num_threads--;
601
1.40k
    if (_num_threads + _num_threads_pending_start == 0) {
602
710
        _no_threads_cond.notify_all();
603
604
        // Sanity check: if we're the last thread exiting, the queue ought to be
605
        // empty. Otherwise it will never get processed.
606
710
        CHECK(_queue.empty());
607
710
        DCHECK_EQ(0, _total_queued_tasks);
608
710
    }
609
1.40k
}
610
611
1.40k
Status ThreadPool::create_thread() {
612
1.40k
    return Thread::create("thread pool", strings::Substitute("$0 [worker]", _name),
613
1.40k
                          &ThreadPool::dispatch_thread, this, nullptr);
614
1.40k
}
615
616
4.65k
void ThreadPool::check_not_pool_thread_unlocked() {
617
4.65k
    Thread* current = Thread::current_thread();
618
4.65k
    if (ContainsKey(_threads, current)) {
619
0
        LOG(FATAL) << strings::Substitute(
620
0
                "Thread belonging to thread pool '$0' with "
621
0
                "name '$1' called pool function that would result in deadlock",
622
0
                _name, current->name());
623
0
    }
624
4.65k
}
625
626
4
Status ThreadPool::set_min_threads(int min_threads) {
627
4
    std::lock_guard<std::mutex> l(_lock);
628
4
    if (min_threads > _max_threads) {
629
        // min threads can not be set greater than max threads
630
1
        return Status::InternalError("set thread pool {} min_threads failed", _name);
631
1
    }
632
3
    _min_threads = min_threads;
633
3
    if (min_threads > _num_threads + _num_threads_pending_start) {
634
0
        int addition_threads = min_threads - _num_threads - _num_threads_pending_start;
635
0
        _num_threads_pending_start += addition_threads;
636
0
        for (int i = 0; i < addition_threads; i++) {
637
0
            Status status = create_thread();
638
0
            if (!status.ok()) {
639
0
                _num_threads_pending_start--;
640
0
                LOG(WARNING) << "Thread pool " << _name
641
0
                             << " failed to create thread: " << status.to_string();
642
0
                return status;
643
0
            }
644
0
        }
645
0
    }
646
3
    return Status::OK();
647
3
}
648
649
7
Status ThreadPool::set_max_threads(int max_threads) {
650
7
    std::lock_guard<std::mutex> l(_lock);
651
7
    if (_min_threads > max_threads) {
652
        // max threads can not be set less than min threads
653
1
        return Status::InternalError("set thread pool {} max_threads failed", _name);
654
1
    }
655
656
6
    _max_threads = max_threads;
657
6
    if (_max_threads > _num_threads + _num_threads_pending_start) {
658
4
        int addition_threads = _max_threads - _num_threads - _num_threads_pending_start;
659
4
        addition_threads = std::min(addition_threads, _total_queued_tasks);
660
4
        _num_threads_pending_start += addition_threads;
661
7
        for (int i = 0; i < addition_threads; i++) {
662
3
            Status status = create_thread();
663
3
            if (!status.ok()) {
664
0
                _num_threads_pending_start--;
665
0
                LOG(WARNING) << "Thread pool " << _name
666
0
                             << " failed to create thread: " << status.to_string();
667
0
                return status;
668
0
            }
669
3
        }
670
4
    }
671
6
    return Status::OK();
672
6
}
673
674
0
std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
675
0
    return o << ThreadPoolToken::state_to_string(s);
676
0
}
677
678
} // namespace doris