/root/doris/be/src/util/threadpool.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/threadpool.cc |
19 | | // and modified by Doris |
20 | | |
21 | | #include "util/threadpool.h" |
22 | | |
23 | | #include <algorithm> |
24 | | #include <cstdint> |
25 | | #include <limits> |
26 | | #include <ostream> |
27 | | #include <thread> |
28 | | #include <utility> |
29 | | |
30 | | #include "absl/strings/substitute.h" |
31 | | #include "common/exception.h" |
32 | | #include "common/logging.h" |
33 | | #include "gutil/port.h" |
34 | | #include "util/debug/sanitizer_scopes.h" |
35 | | #include "util/debug_points.h" |
36 | | #include "util/doris_metrics.h" |
37 | | #include "util/metrics.h" |
38 | | #include "util/scoped_cleanup.h" |
39 | | #include "util/stopwatch.hpp" |
40 | | #include "util/thread.h" |
41 | | |
42 | | namespace doris { |
43 | | // The names of these variables will be used as metric names in Prometheus.
44 | | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_active_threads, MetricUnit::NOUNIT); |
45 | | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT); |
46 | | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_queue_size, MetricUnit::NOUNIT); |
47 | | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_threads, MetricUnit::NOUNIT); |
48 | | DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT); |
49 | | DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_execution_time_ns_total, |
50 | | MetricUnit::NANOSECONDS); |
51 | | DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_execution_count_total, MetricUnit::NOUNIT); |
52 | | DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_wait_worker_time_ns_total, |
53 | | MetricUnit::NANOSECONDS); |
54 | | DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_wait_worker_count_total, MetricUnit::NOUNIT); |
55 | | using namespace ErrorCode; |
56 | | |
57 | | using std::string; |
58 | | |
59 | | class FunctionRunnable : public Runnable { |
60 | | public: |
61 | 10.5k | explicit FunctionRunnable(std::function<void()> func) : _func(std::move(func)) {} |
62 | | |
63 | 5.17k | void run() override { _func(); } |
64 | | |
65 | | private: |
66 | | std::function<void()> _func; |
67 | | }; |
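| |
| | // A minimal sketch of the two equivalent submission styles, assuming a live
| | // ThreadPool* pool; FunctionRunnable is what makes the lambda form possible
| | // (MyTask is a hypothetical example class):
| | //
| | //   class MyTask : public Runnable {
| | //   public:
| | //       void run() override { /* do work */ }
| | //   };
| | //   static_cast<void>(pool->submit(std::make_shared<MyTask>()));  // explicit Runnable
| | //   static_cast<void>(pool->submit_func([]() { /* do work */ })); // via FunctionRunnable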
68 | | |
69 | | ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group) |
70 | | : _name(std::move(name)), |
71 | | _workload_group(std::move(workload_group)), |
72 | | _min_threads(0), |
73 | | _max_threads(std::thread::hardware_concurrency()), |
74 | | _max_queue_size(std::numeric_limits<int>::max()), |
75 | 327 | _idle_timeout(std::chrono::milliseconds(500)) {} |
76 | | |
77 | 291 | ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) { |
78 | 291 | CHECK_GE(min_threads, 0); |
79 | 291 | _min_threads = min_threads; |
80 | 291 | return *this; |
81 | 291 | } |
82 | | |
83 | 299 | ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) { |
84 | 299 | CHECK_GT(max_threads, 0); |
85 | 299 | _max_threads = max_threads; |
86 | 299 | return *this; |
87 | 299 | } |
88 | | |
89 | 96 | ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) { |
90 | 96 | _max_queue_size = max_queue_size; |
91 | 96 | return *this; |
92 | 96 | } |
93 | | |
94 | | ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl( |
95 | 1 | std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl) { |
96 | 1 | _cgroup_cpu_ctl = cgroup_cpu_ctl; |
97 | 1 | return *this; |
98 | 1 | } |
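| |
| | // A minimal usage sketch of the builder, assuming the build() template declared
| | // in util/threadpool.h (the pool name and sizes here are illustrative):
| | //
| | //   std::unique_ptr<ThreadPool> pool;
| | //   Status st = ThreadPoolBuilder("example_pool")
| | //                       .set_min_threads(2)
| | //                       .set_max_threads(8)
| | //                       .set_max_queue_size(1024)
| | //                       .build(&pool);
| | //   if (st.ok()) {
| | //       st = pool->submit_func([]() { /* do work */ });
| | //   }
| | //   pool->shutdown();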
99 | | |
100 | | ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode, |
101 | | int max_concurrency) |
102 | | : _mode(mode), |
103 | | _pool(pool), |
104 | | _state(State::IDLE), |
105 | | _active_threads(0), |
106 | | _max_concurrency(max_concurrency), |
107 | | _num_submitted_tasks(0), |
108 | 2.49k | _num_unsubmitted_tasks(0) { |
109 | 2.49k | if (max_concurrency == 1 && mode != ThreadPool::ExecutionMode::SERIAL) { |
110 | 1 | _mode = ThreadPool::ExecutionMode::SERIAL; |
111 | 1 | } |
112 | 2.49k | } |
113 | | |
114 | 2.48k | ThreadPoolToken::~ThreadPoolToken() { |
115 | 2.48k | shutdown(); |
116 | 2.48k | _pool->release_token(this); |
117 | 2.48k | } |
118 | | |
119 | 9.06k | Status ThreadPoolToken::submit(std::shared_ptr<Runnable> r) { |
120 | 9.06k | return _pool->do_submit(std::move(r), this); |
121 | 9.06k | } |
122 | | |
123 | 9.10k | Status ThreadPoolToken::submit_func(std::function<void()> f) { |
124 | 9.10k | return submit(std::make_shared<FunctionRunnable>(std::move(f))); |
125 | 9.10k | } |
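| |
| | // A sketch of token-based submission, assuming a live ThreadPool* pool
| | // (new_token and ExecutionMode are declared in util/threadpool.h):
| | //
| | //   std::unique_ptr<ThreadPoolToken> token =
| | //           pool->new_token(ThreadPool::ExecutionMode::SERIAL);
| | //   static_cast<void>(token->submit_func([]() { /* step 1 */ }));
| | //   static_cast<void>(token->submit_func([]() { /* step 2, after step 1 */ }));
| | //   token->wait();      // block until both tasks have finished
| | //   token->shutdown();  // drop queued tasks and quiesce the token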
126 | | |
127 | 3.73k | void ThreadPoolToken::shutdown() { |
128 | 3.73k | std::unique_lock<std::mutex> l(_pool->_lock); |
129 | 3.73k | _pool->check_not_pool_thread_unlocked(); |
130 | | |
131 | | // Clear the queue under the lock, but defer the releasing of the tasks |
132 | | // outside the lock, in case there are concurrent threads wanting to access |
133 | | // the ThreadPool. The task's destructors may acquire locks, etc, so this |
134 | | // also prevents lock inversions. |
135 | 3.73k | std::deque<ThreadPool::Task> to_release = std::move(_entries); |
136 | 3.73k | _pool->_total_queued_tasks -= to_release.size(); |
137 | | |
138 | 3.73k | switch (state()) { |
139 | 908 | case State::IDLE: |
140 | | // There were no tasks outstanding; we can quiesce the token immediately. |
141 | 908 | transition(State::QUIESCED); |
142 | 908 | break; |
143 | 1.20k | case State::RUNNING: |
144 | | // There were outstanding tasks. If any are still running, switch to |
145 | | // QUIESCING and wait for them to finish (the worker thread executing |
146 | | // the token's last task will switch the token to QUIESCED). Otherwise, |
147 | | // we can quiesce the token immediately. |
148 | | |
149 | | // Note: this is an O(n) operation, but it's expected to be infrequent. |
150 | | // Plus doing it this way (rather than switching to QUIESCING and waiting |
151 | | // for a worker thread to process the queue entry) helps retain state |
152 | | // transition symmetry with ThreadPool::shutdown. |
153 | 12.4k | for (auto it = _pool->_queue.begin(); it != _pool->_queue.end();) { |
154 | 11.2k | if (*it == this) { |
155 | 1.11k | it = _pool->_queue.erase(it); |
156 | 10.1k | } else { |
157 | 10.1k | it++; |
158 | 10.1k | } |
159 | 11.2k | } |
160 | | |
161 | 1.20k | if (_active_threads == 0) { |
162 | 510 | transition(State::QUIESCED); |
163 | 510 | break; |
164 | 510 | } |
165 | 697 | transition(State::QUIESCING); |
166 | 697 | [[fallthrough]]; |
167 | 717 | case State::QUIESCING: |
168 | | // The token is already quiescing. Just wait for a worker thread to |
169 | | // switch it to QUIESCED. |
170 | 1.43k | _not_running_cond.wait(l, [this]() { return state() == State::QUIESCED; }); |
171 | 717 | break; |
172 | 1.60k | default: |
173 | 1.60k | break; |
174 | 3.73k | } |
175 | 3.73k | } |
176 | | |
177 | 768 | void ThreadPoolToken::wait() { |
178 | 768 | std::unique_lock<std::mutex> l(_pool->_lock); |
179 | 768 | _pool->check_not_pool_thread_unlocked(); |
180 | 1.01k | _not_running_cond.wait(l, [this]() { return !is_active(); }); |
181 | 768 | } |
182 | | |
183 | 8.17k | void ThreadPoolToken::transition(State new_state) { |
184 | 8.17k | #ifndef NDEBUG |
185 | 8.17k | CHECK_NE(_state, new_state); |
186 | | |
187 | 8.17k | switch (_state) { |
188 | 4.36k | case State::IDLE: |
189 | 4.36k | CHECK(new_state == State::RUNNING || new_state == State::QUIESCED); |
190 | 4.36k | if (new_state == State::RUNNING) { |
191 | 3.09k | CHECK(!_entries.empty()); |
192 | 3.09k | } else { |
193 | 1.26k | CHECK(_entries.empty()); |
194 | 1.26k | CHECK_EQ(_active_threads, 0); |
195 | 1.26k | } |
196 | 4.36k | break; |
197 | 3.09k | case State::RUNNING: |
198 | 3.09k | CHECK(new_state == State::IDLE || new_state == State::QUIESCING || |
199 | 3.09k | new_state == State::QUIESCED); |
200 | 3.09k | CHECK(_entries.empty()); |
201 | 3.09k | if (new_state == State::QUIESCING) { |
202 | 706 | CHECK_GT(_active_threads, 0); |
203 | 706 | } |
204 | 3.09k | break; |
205 | 706 | case State::QUIESCING: |
206 | 706 | CHECK(new_state == State::QUIESCED); |
207 | 706 | CHECK_EQ(_active_threads, 0); |
208 | 706 | break; |
209 | 0 | case State::QUIESCED: |
210 | 0 | CHECK(false); // QUIESCED is a terminal state |
211 | 0 | break; |
212 | 0 | default: |
213 | 0 | throw Exception(Status::FatalError("Unknown token state: {}", _state)); |
214 | 8.17k | } |
215 | 8.17k | #endif |
216 | | |
217 | | // Take actions based on the state we're entering. |
218 | 8.17k | switch (new_state) { |
219 | 1.87k | case State::IDLE: |
220 | 4.36k | case State::QUIESCED: |
221 | 4.36k | _not_running_cond.notify_all(); |
222 | 4.36k | break; |
223 | 3.80k | default: |
224 | 3.80k | break; |
225 | 8.17k | } |
226 | | |
227 | 8.17k | _state = new_state; |
228 | 8.17k | } |
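| |
| | // The CHECKs above encode the token's legal state transitions:
| |
| | //   IDLE      -> RUNNING    first task queued
| | //   IDLE      -> QUIESCED   shutdown with nothing queued or running
| | //   RUNNING   -> IDLE       queue drained and no active threads
| | //   RUNNING   -> QUIESCING  shutdown while tasks are still executing
| | //   RUNNING   -> QUIESCED   shutdown with no active threads
| | //   QUIESCING -> QUIESCED   last active task finished
| |
| | // QUIESCED is terminal: a quiesced token can only be destroyed.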
229 | | |
230 | 0 | const char* ThreadPoolToken::state_to_string(State s) { |
231 | 0 | switch (s) { |
232 | 0 | case State::IDLE: |
233 | 0 | return "IDLE"; |
234 | 0 | break; |
235 | 0 | case State::RUNNING: |
236 | 0 | return "RUNNING"; |
237 | 0 | break; |
238 | 0 | case State::QUIESCING: |
239 | 0 | return "QUIESCING"; |
240 | 0 | break; |
241 | 0 | case State::QUIESCED: |
242 | 0 | return "QUIESCED"; |
243 | 0 | break; |
244 | 0 | } |
245 | 0 | return "<cannot reach here>"; |
246 | 0 | } |
247 | | |
248 | 7.64k | bool ThreadPoolToken::need_dispatch() { |
249 | 7.64k | return _state == ThreadPoolToken::State::IDLE || |
250 | 7.64k | (_mode == ThreadPool::ExecutionMode::CONCURRENT && |
251 | 4.54k | _num_submitted_tasks < _max_concurrency); |
252 | 7.64k | } |
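| |
| | // Concrete readings of need_dispatch() (numbers are illustrative):
| |
| | //   IDLE token, any mode                   -> dispatch: the first task must wake the token
| | //   CONCURRENT token, 2 submitted, cap 4   -> dispatch: still below _max_concurrency
| | //   CONCURRENT token, 4 submitted, cap 4   -> no dispatch: task queues inside the token
| | //   RUNNING SERIAL token                   -> no dispatch: the worker requeues the token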
253 | | |
254 | | ThreadPool::ThreadPool(const ThreadPoolBuilder& builder) |
255 | | : _name(builder._name), |
256 | | _workload_group(builder._workload_group), |
257 | | _min_threads(builder._min_threads), |
258 | | _max_threads(builder._max_threads), |
259 | | _max_queue_size(builder._max_queue_size), |
260 | | _idle_timeout(builder._idle_timeout), |
261 | | _pool_status(Status::Uninitialized("The pool was not initialized.")), |
262 | | _num_threads(0), |
263 | | _num_threads_pending_start(0), |
264 | | _active_threads(0), |
265 | | _total_queued_tasks(0), |
266 | | _cgroup_cpu_ctl(builder._cgroup_cpu_ctl), |
267 | | _tokenless(new_token(ExecutionMode::CONCURRENT)), |
268 | 322 | _id(UniqueId::gen_uid()) {} |
269 | | |
270 | 310 | ThreadPool::~ThreadPool() { |
271 | | // There should only be one live token: the one used in tokenless submission. |
272 | 310 | CHECK_EQ(1, _tokens.size()) << absl::Substitute( |
273 | 0 | "Threadpool $0 destroyed with $1 allocated tokens", _name, _tokens.size()); |
274 | 310 | shutdown(); |
275 | 310 | } |
276 | | |
277 | 327 | Status ThreadPool::try_create_thread(int thread_num, std::lock_guard<std::mutex>&) { |
278 | 1.35k | for (int i = 0; i < thread_num; i++) { |
279 | 1.03k | Status status = create_thread(); |
280 | 1.03k | if (status.ok()) { |
281 | 1.03k | _num_threads_pending_start++; |
282 | 1.03k | } else { |
283 | 0 | LOG(WARNING) << "Thread pool " << _name << " failed to create thread: " << status; |
284 | 0 | return status; |
285 | 0 | } |
286 | 1.03k | } |
287 | 327 | return Status::OK(); |
288 | 327 | } |
289 | | |
290 | 322 | Status ThreadPool::init() { |
291 | 322 | if (!_pool_status.is<UNINITIALIZED>()) { |
292 | 0 | return Status::NotSupported("The thread pool {} is already initialized", _name); |
293 | 0 | } |
294 | 322 | _pool_status = Status::OK(); |
295 | | |
296 | 322 | { |
297 | 322 | std::lock_guard<std::mutex> l(_lock); |
298 | | // A thread creation failure should not cause threadpool init to fail;
299 | | // threads can be created later, e.g. when a task is submitted.
300 | 322 | static_cast<void>(try_create_thread(_min_threads, l)); |
301 | 322 | } |
302 | | |
303 | | // The _id of the thread pool ensures that pools created with the same name
304 | | // still get distinct _metric_entity instances. Without it, we would have
305 | | // problems when deregistering the entity and registering the hook.
306 | 322 | _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( |
307 | 322 | fmt::format("thread_pool_{}", _name), {{"thread_pool_name", _name}, |
308 | 322 | {"workload_group", _workload_group}, |
309 | 322 | {"id", _id.to_string()}}); |
310 | | |
311 | 322 | INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_active_threads); |
312 | 322 | INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_threads); |
313 | 322 | INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size); |
314 | 322 | INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_queue_size); |
315 | 322 | INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_execution_time_ns_total); |
316 | 322 | INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_execution_count_total); |
317 | 322 | INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_wait_worker_time_ns_total); |
318 | 322 | INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_wait_worker_count_total); |
319 | 322 | INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed); |
320 | | |
321 | 322 | _metric_entity->register_hook("update", [this]() { |
322 | 0 | { |
323 | 0 | std::lock_guard<std::mutex> l(_lock); |
324 | 0 | if (!_pool_status.ok()) { |
325 | 0 | return; |
326 | 0 | } |
327 | 0 | } |
328 | | |
329 | 0 | thread_pool_active_threads->set_value(num_active_threads()); |
330 | 0 | thread_pool_queue_size->set_value(get_queue_size()); |
331 | 0 | thread_pool_max_queue_size->set_value(get_max_queue_size()); |
332 | 0 | thread_pool_max_threads->set_value(max_threads()); |
333 | 0 | }); |
334 | 322 | return Status::OK(); |
335 | 322 | } |
336 | | |
337 | 533 | void ThreadPool::shutdown() { |
338 | | // Why is it safe to access DorisMetrics here?
339 | | // Since DorisMetrics is a singleton, it will be destroyed only after doris_main is exited. |
340 | | // The shutdown/destroy of ThreadPool is guaranteed to take place before doris_main exits by |
341 | | // ExecEnv::destroy(). |
342 | 533 | DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity); |
343 | 533 | debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; |
344 | 533 | std::unique_lock<std::mutex> l(_lock); |
345 | 533 | check_not_pool_thread_unlocked(); |
346 | | |
347 | | // Note: this is the same error seen at submission if the pool is at |
348 | | // capacity, so clients can't tell them apart. This isn't really a practical |
349 | | // concern though because shutting down a pool typically requires clients to |
350 | | // be quiesced first, so there's no danger of a client getting confused. |
351 | | // Don't print a stack trace here.
352 | 533 | _pool_status = Status::Error<SERVICE_UNAVAILABLE, false>( |
353 | 533 | "The thread pool {} has been shut down.", _name); |
354 | | |
355 | | // Clear the various queues under the lock, but defer the releasing |
356 | | // of the tasks outside the lock, in case there are concurrent threads |
357 | | // wanting to access the ThreadPool. The task's destructors may acquire |
358 | | // locks, etc, so this also prevents lock inversions. |
359 | 533 | _queue.clear(); |
360 | | |
361 | 533 | std::deque<std::deque<Task>> to_release; |
362 | 618 | for (auto* t : _tokens) { |
363 | 618 | if (!t->_entries.empty()) { |
364 | 6 | to_release.emplace_back(std::move(t->_entries)); |
365 | 6 | } |
366 | 618 | switch (t->state()) { |
367 | 360 | case ThreadPoolToken::State::IDLE: |
368 | | // The token is idle; we can quiesce it immediately. |
369 | 360 | t->transition(ThreadPoolToken::State::QUIESCED); |
370 | 360 | break; |
371 | 12 | case ThreadPoolToken::State::RUNNING: |
372 | | // The token has tasks associated with it. If they're merely queued |
373 | | // (i.e. there are no active threads), the tasks will have been removed |
374 | | // above and we can quiesce immediately. Otherwise, we need to wait for |
375 | | // the threads to finish. |
376 | 12 | t->transition(t->_active_threads > 0 ? ThreadPoolToken::State::QUIESCING |
377 | 12 | : ThreadPoolToken::State::QUIESCED); |
378 | 12 | break; |
379 | 246 | default: |
380 | 246 | break; |
381 | 618 | } |
382 | 618 | } |
383 | | |
384 | | // The queues are empty. Wake any sleeping worker threads and wait for all |
385 | | // of them to exit. Some worker threads will exit immediately upon waking, |
386 | | // while others will exit after they finish executing an outstanding task. |
387 | 533 | _total_queued_tasks = 0; |
388 | 1.62k | while (!_idle_threads.empty()) { |
389 | 1.09k | _idle_threads.front().not_empty.notify_one(); |
390 | 1.09k | _idle_threads.pop_front(); |
391 | 1.09k | } |
392 | | |
393 | 821 | _no_threads_cond.wait(l, [this]() { return _num_threads + _num_threads_pending_start == 0; }); |
394 | | |
395 | | // All the threads have exited. Check the state of each token. |
396 | 618 | for (auto* t : _tokens) { |
397 | 618 | DCHECK(t->state() == ThreadPoolToken::State::IDLE || |
398 | 618 | t->state() == ThreadPoolToken::State::QUIESCED); |
399 | 618 | } |
400 | 533 | } |
401 | | |
402 | 2.49k | std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode, int max_concurrency) { |
403 | 2.49k | std::lock_guard<std::mutex> l(_lock); |
404 | 2.49k | std::unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode, max_concurrency)); |
405 | 2.49k | if (!_tokens.insert(t.get()).second) { |
406 | 0 | throw Exception(Status::InternalError("duplicate token")); |
407 | 0 | } |
408 | 2.49k | return t; |
409 | 2.49k | } |
410 | | |
411 | 2.48k | void ThreadPool::release_token(ThreadPoolToken* t) { |
412 | 2.48k | std::lock_guard<std::mutex> l(_lock); |
413 | 2.48k | CHECK(!t->is_active()) << absl::Substitute("Token with state $0 may not be released", |
414 | 0 | ThreadPoolToken::state_to_string(t->state())); |
415 | 2.48k | CHECK_EQ(1, _tokens.erase(t)); |
416 | 2.48k | } |
417 | | |
418 | 1.65k | Status ThreadPool::submit(std::shared_ptr<Runnable> r) { |
419 | 1.65k | return do_submit(std::move(r), _tokenless.get()); |
420 | 1.65k | } |
421 | | |
422 | 1.41k | Status ThreadPool::submit_func(std::function<void()> f) { |
423 | 1.41k | return submit(std::make_shared<FunctionRunnable>(std::move(f))); |
424 | 1.41k | } |
425 | | |
426 | 10.7k | Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token) { |
427 | 10.7k | DCHECK(token); |
428 | | |
429 | 10.7k | std::unique_lock<std::mutex> l(_lock); |
430 | 10.7k | if (PREDICT_FALSE(!_pool_status.ok())) { |
431 | 1 | return _pool_status; |
432 | 1 | } |
433 | | |
434 | 10.7k | if (PREDICT_FALSE(!token->may_submit_new_tasks())) { |
435 | 3.16k | return Status::Error<SERVICE_UNAVAILABLE>("Thread pool({}) token was shut down", _name); |
436 | 3.16k | } |
437 | | |
438 | | // Size limit check. |
439 | 7.60k | int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - _active_threads + |
440 | 7.60k | static_cast<int64_t>(_max_queue_size) - _total_queued_tasks; |
441 | 7.60k | if (capacity_remaining < 1) { |
442 | 4 | thread_pool_submit_failed->increment(1); |
443 | 4 | return Status::Error<SERVICE_UNAVAILABLE>( |
444 | 4 | "Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name, |
445 | 4 | _num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks, |
446 | 4 | _max_queue_size); |
447 | 4 | } |
448 | | |
449 | | // Should we create another thread? |
450 | | |
451 | | // We assume that each current inactive thread will grab one item from the |
452 | | // queue. If it seems like we'll need another thread, we create one. |
453 | | // |
454 | | // Rather than creating the thread here, while holding the lock, we defer |
455 | | // it to down below. This is because thread creation can be rather slow |
456 | | // (hundreds of milliseconds in some cases) and we'd like to allow the |
457 | | // existing threads to continue to process tasks while we do so. |
458 | | // |
459 | | // In theory, a currently active thread could finish immediately after this |
460 | | // calculation but before our new worker starts running. This would mean we |
461 | | // created a thread we didn't really need. However, this race is unavoidable |
462 | | // and harmless. |
463 | | // |
464 | | // Of course, we never create more than _max_threads threads no matter what. |
465 | 7.60k | int threads_from_this_submit = |
466 | 7.60k | token->is_active() && token->mode() == ExecutionMode::SERIAL ? 0 : 1; |
467 | 7.60k | int inactive_threads = _num_threads + _num_threads_pending_start - _active_threads; |
468 | 7.60k | int additional_threads = |
469 | 7.60k | static_cast<int>(_queue.size()) + threads_from_this_submit - inactive_threads; |
470 | 7.60k | bool need_a_thread = false; |
471 | 7.60k | if (additional_threads > 0 && _num_threads + _num_threads_pending_start < _max_threads) { |
472 | 623 | need_a_thread = true; |
473 | 623 | _num_threads_pending_start++; |
474 | 623 | } |
475 | | |
476 | 7.60k | Task task; |
477 | 7.60k | task.runnable = std::move(r); |
478 | 7.60k | task.submit_time_wather.start(); |
479 | | |
480 | | // Add the task to the token's queue. |
481 | 7.60k | ThreadPoolToken::State state = token->state(); |
482 | 7.60k | DCHECK(state == ThreadPoolToken::State::IDLE || state == ThreadPoolToken::State::RUNNING); |
483 | 7.60k | token->_entries.emplace_back(std::move(task)); |
484 | | // To execute a task belonging to a token, we submit the token object itself to the queue.
485 | | // There are currently two places where a token is submitted to the queue:
486 | | // 1. When a new task is submitted, if the token is still in the IDLE state,
487 | | // or the token's concurrency has not yet reached its upper limit, it is added to the queue.
488 | | // 2. When a dispatch thread finishes executing one of the token's tasks:
489 | | // 1. If it is a SERIAL token and there are unsubmitted tasks, the token is resubmitted to the queue.
490 | | // 2. If it is a CONCURRENT token, there are still unsubmitted tasks, and the upper limit
491 | | // of concurrency has not been reached, the token is resubmitted to the queue.
492 | 7.60k | if (token->need_dispatch()) { |
493 | 5.65k | _queue.emplace_back(token); |
494 | 5.65k | ++token->_num_submitted_tasks; |
495 | 5.65k | if (state == ThreadPoolToken::State::IDLE) { |
496 | 3.09k | token->transition(ThreadPoolToken::State::RUNNING); |
497 | 3.09k | } |
498 | 5.65k | } else { |
499 | 1.95k | ++token->_num_unsubmitted_tasks; |
500 | 1.95k | } |
501 | 7.60k | _total_queued_tasks++; |
502 | | |
503 | | // Wake up an idle thread for this task. Choosing the thread at the front of |
504 | | // the list ensures LIFO semantics as idling threads are also added to the front. |
505 | | // |
506 | | // If there are no idle threads, the new task remains on the queue and is |
507 | | // processed by an active thread (or a thread we're about to create) at some |
508 | | // point in the future. |
509 | 7.60k | if (!_idle_threads.empty()) { |
510 | 1.76k | _idle_threads.front().not_empty.notify_one(); |
511 | 1.76k | _idle_threads.pop_front(); |
512 | 1.76k | } |
513 | 7.60k | l.unlock(); |
514 | | |
515 | 7.60k | if (need_a_thread) { |
516 | 623 | Status status = create_thread(); |
517 | 623 | if (!status.ok()) { |
518 | 0 | l.lock(); |
519 | 0 | _num_threads_pending_start--; |
520 | 0 | if (_num_threads + _num_threads_pending_start == 0) { |
521 | | // If we have no threads, we can't do any work. |
522 | 0 | return status; |
523 | 0 | } |
524 | | // If we failed to create a thread, but there are still some other |
525 | | // worker threads, log a warning message and continue. |
526 | 0 | LOG(WARNING) << "Thread pool " << _name |
527 | 0 | << " failed to create thread: " << status.to_string(); |
528 | 0 | } |
529 | 623 | } |
530 | | |
531 | 7.60k | return Status::OK(); |
532 | 7.60k | } |
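| |
| | // A worked example of the size-limit check in do_submit (numbers are illustrative):
| |
| | //   _max_threads = 8,      _active_threads = 8        -> 0 free worker slots
| | //   _max_queue_size = 100, _total_queued_tasks = 100  -> 0 free queue slots
| | //   capacity_remaining = (8 - 8) + (100 - 100) = 0, which is < 1, so the submit
| | //   fails with SERVICE_UNAVAILABLE and thread_pool_submit_failed is incremented.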
533 | | |
534 | 121 | void ThreadPool::wait() { |
535 | 121 | std::unique_lock<std::mutex> l(_lock); |
536 | 121 | check_not_pool_thread_unlocked(); |
537 | 243 | _idle_cond.wait(l, [this]() { return _total_queued_tasks == 0 && _active_threads == 0; }); |
538 | 121 | } |
539 | | |
540 | 1.65k | void ThreadPool::dispatch_thread() { |
541 | 1.65k | std::unique_lock<std::mutex> l(_lock); |
542 | 1.65k | debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; |
543 | 1.65k | if (!_threads.insert(Thread::current_thread()).second) { |
544 | 0 | throw Exception(Status::InternalError("duplicate thread"));
545 | 0 | } |
546 | 1.65k | DCHECK_GT(_num_threads_pending_start, 0); |
547 | 1.65k | _num_threads++; |
548 | 1.65k | _num_threads_pending_start--; |
549 | | |
550 | 1.65k | if (std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_sptr = _cgroup_cpu_ctl.lock()) { |
551 | 0 | static_cast<void>(cg_cpu_ctl_sptr->add_thread_to_cgroup()); |
552 | 0 | } |
553 | | |
554 | | // Owned by this worker thread and added/removed from _idle_threads as needed. |
555 | 1.65k | IdleThread me; |
556 | | |
557 | 272k | while (true) { |
558 | | // Note: a non-OK _pool_status (set by shutdown()) indicates normal shutdown.
559 | 272k | if (!_pool_status.ok()) { |
560 | 1.12k | VLOG_CRITICAL << "DispatchThread exiting: " << _pool_status.to_string(); |
561 | 1.12k | break; |
562 | 1.12k | } |
563 | | |
564 | 271k | if (_num_threads + _num_threads_pending_start > _max_threads) { |
565 | 2 | break; |
566 | 2 | } |
567 | | |
568 | 271k | if (_queue.empty()) { |
569 | | // There's no work to do, let's go idle. |
570 | | // |
571 | | // Note: if FIFO behavior is desired, it's as simple as changing this to push_back(). |
572 | 265k | _idle_threads.push_front(me); |
573 | 265k | SCOPED_CLEANUP({ |
574 | | // For some wake ups (i.e. shutdown or do_submit) this thread is |
575 | | // guaranteed to be unlinked after being awakened. In others (i.e. |
576 | | // spurious wake-up or Wait timeout), it'll still be linked. |
577 | 265k | if (me.is_linked()) { |
578 | 265k | _idle_threads.erase(_idle_threads.iterator_to(me)); |
579 | 265k | } |
580 | 265k | }); |
581 | 265k | if (me.not_empty.wait_for(l, _idle_timeout) == std::cv_status::timeout) { |
582 | | // After much investigation, it appears that pthread condition variables have |
583 | | // a weird behavior in which they can return ETIMEDOUT from timed_wait even if |
584 | | // another thread did in fact signal. Apparently after a timeout there is some |
585 | | // brief period during which another thread may actually grab the internal mutex |
586 | | // protecting the state, signal, and release again before we get the mutex. So, |
587 | | // we'll recheck the empty queue case regardless. |
588 | 265k | if (_queue.empty() && _num_threads + _num_threads_pending_start > _min_threads) { |
589 | 505 | VLOG_NOTICE << "Releasing worker thread from pool " << _name << " after " |
590 | 0 | << std::chrono::duration_cast<std::chrono::milliseconds>( |
591 | 0 | _idle_timeout) |
592 | 0 | .count() |
593 | 0 | << "ms of idle time."; |
594 | 505 | break; |
595 | 505 | } |
596 | 265k | } |
597 | 265k | continue; |
598 | 265k | } |
599 | | |
600 | 5.25k | MonotonicStopWatch task_execution_time_watch; |
601 | 5.25k | task_execution_time_watch.start(); |
602 | | // Get the next token and task to execute. |
603 | 5.25k | ThreadPoolToken* token = _queue.front(); |
604 | 5.25k | _queue.pop_front(); |
605 | 5.25k | DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state()); |
606 | 5.25k | DCHECK(!token->_entries.empty()); |
607 | 5.25k | Task task = std::move(token->_entries.front()); |
608 | 5.25k | thread_pool_task_wait_worker_time_ns_total->increment( |
609 | 5.25k | task.submit_time_wather.elapsed_time()); |
610 | 5.25k | thread_pool_task_wait_worker_count_total->increment(1); |
611 | 5.25k | token->_entries.pop_front(); |
612 | 5.25k | token->_active_threads++; |
613 | 5.25k | --_total_queued_tasks; |
614 | 5.25k | ++_active_threads; |
615 | | |
616 | 5.25k | l.unlock(); |
617 | | |
618 | | // Execute the task |
619 | 5.25k | task.runnable->run(); |
620 | | // Destruct the task while we do not hold the lock. |
621 | | // |
622 | | // The task's destructor may be expensive if it has a lot of bound |
623 | | // objects, and we don't want to block submission of the threadpool. |
624 | | // In the worst case, the destructor might even try to do something |
625 | | // with this threadpool, and produce a deadlock. |
626 | 5.25k | task.runnable.reset(); |
627 | 5.25k | l.lock(); |
628 | 5.25k | thread_pool_task_execution_time_ns_total->increment( |
629 | 5.25k | task_execution_time_watch.elapsed_time()); |
630 | 5.25k | thread_pool_task_execution_count_total->increment(1); |
631 | | // Possible states: |
632 | | // 1. The token was shut down while we ran its task. Transition to QUIESCED. |
633 | | // 2. The token has no more queued tasks. Transition back to IDLE. |
634 | | // 3. The token has more tasks. Requeue it; the token stays RUNNING.
635 | 5.25k | ThreadPoolToken::State state = token->state(); |
636 | 5.25k | DCHECK(state == ThreadPoolToken::State::RUNNING || |
637 | 5.25k | state == ThreadPoolToken::State::QUIESCING); |
638 | 5.25k | --token->_active_threads; |
639 | 5.25k | --token->_num_submitted_tasks; |
640 | | |
641 | | // Handle the shutdown and idle transitions.
642 | 5.25k | if (token->_active_threads == 0) { |
643 | 3.73k | if (state == ThreadPoolToken::State::QUIESCING) { |
644 | 706 | DCHECK(token->_entries.empty()); |
645 | 706 | token->transition(ThreadPoolToken::State::QUIESCED); |
646 | 3.03k | } else if (token->_entries.empty()) { |
647 | 1.87k | token->transition(ThreadPoolToken::State::IDLE); |
648 | 1.87k | } |
649 | 3.73k | } |
650 | | |
651 | | // We decrement _num_submitted_tasks while holding the lock, so the following DCHECK holds.
652 | 5.25k | DCHECK(token->_num_submitted_tasks < token->_max_concurrency); |
653 | | |
654 | | // If token->state is running and there are unsubmitted tasks in the token, we put |
655 | | // the token back. |
656 | 5.25k | if (token->_num_unsubmitted_tasks > 0 && state == ThreadPoolToken::State::RUNNING) { |
657 | | // SERIAL: if _entries is not empty, then num_unsubmitted_tasks must be greater than 0. |
658 | | // CONCURRENT: we have to check _num_unsubmitted_tasks because there may be at least 2 |
659 | | // threads are running for the token. |
660 | 893 | _queue.emplace_back(token); |
661 | 893 | ++token->_num_submitted_tasks; |
662 | 893 | --token->_num_unsubmitted_tasks; |
663 | 893 | } |
664 | | |
665 | 5.25k | if (--_active_threads == 0) { |
666 | 921 | _idle_cond.notify_all(); |
667 | 921 | } |
668 | 5.25k | } |
669 | | |
670 | | // It's important that we hold the lock between exiting the loop and dropping |
671 | | // _num_threads. Otherwise it's possible someone else could come along here |
672 | | // and add a new task just as the last running thread is about to exit. |
673 | 1.65k | CHECK(l.owns_lock()); |
674 | | |
675 | 1.65k | CHECK_EQ(_threads.erase(Thread::current_thread()), 1); |
676 | 1.65k | _num_threads--; |
677 | 1.65k | if (_num_threads + _num_threads_pending_start == 0) { |
678 | 786 | _no_threads_cond.notify_all(); |
679 | | |
680 | | // Sanity check: if we're the last thread exiting, the queue ought to be |
681 | | // empty. Otherwise it will never get processed. |
682 | 786 | CHECK(_queue.empty()); |
683 | 786 | DCHECK_EQ(0, _total_queued_tasks); |
684 | 786 | } |
685 | 1.65k | } |
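| |
| | // Worker lifecycle in brief: each worker loops pulling a token from _queue,
| | // runs one of that token's tasks outside the lock, then re-evaluates the
| | // token's state under the lock. Idle workers park on _idle_threads (LIFO)
| | // and exit after _idle_timeout if the pool is above _min_threads, or
| | // immediately once _pool_status becomes non-OK during shutdown.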
686 | | |
687 | 1.65k | Status ThreadPool::create_thread() { |
688 | 1.65k | return Thread::create("thread pool", absl::Substitute("$0 [worker]", _name), |
689 | 1.65k | &ThreadPool::dispatch_thread, this, nullptr); |
690 | 1.65k | } |
691 | | |
692 | 5.16k | void ThreadPool::check_not_pool_thread_unlocked() { |
693 | 5.16k | Thread* current = Thread::current_thread(); |
694 | 5.16k | if (_threads.contains(current)) { |
695 | 0 | throw Exception( |
696 | 0 | Status::FatalError("Thread belonging to thread pool {} with " |
697 | 0 | "name {} called pool function that would result in deadlock", |
698 | 0 | _name, current->name())); |
699 | 0 | } |
700 | 5.16k | } |
701 | | |
702 | 4 | Status ThreadPool::set_min_threads(int min_threads) { |
703 | 4 | std::lock_guard<std::mutex> l(_lock); |
704 | 4 | if (min_threads > _max_threads) { |
705 | | // min_threads cannot be set greater than max_threads
706 | 1 | return Status::InternalError("set thread pool {} min_threads failed", _name); |
707 | 1 | } |
708 | 3 | _min_threads = min_threads; |
709 | 3 | if (min_threads > _num_threads + _num_threads_pending_start) { |
710 | 0 | int addition_threads = min_threads - _num_threads - _num_threads_pending_start; |
711 | 0 | RETURN_IF_ERROR(try_create_thread(addition_threads, l)); |
712 | 0 | } |
713 | 3 | return Status::OK(); |
714 | 3 | } |
715 | | |
716 | 8 | Status ThreadPool::set_max_threads(int max_threads) { |
717 | 8 | std::lock_guard<std::mutex> l(_lock); |
718 | 8 | DBUG_EXECUTE_IF("ThreadPool.set_max_threads.force_set", { |
719 | 8 | _max_threads = max_threads; |
720 | 8 | return Status::OK(); |
721 | 8 | }) |
722 | 8 | if (_min_threads > max_threads) { |
723 | | // max_threads cannot be set less than min_threads
724 | 1 | return Status::InternalError("set thread pool {} max_threads failed", _name); |
725 | 1 | } |
726 | | |
727 | 7 | _max_threads = max_threads; |
728 | 7 | if (_max_threads > _num_threads + _num_threads_pending_start) { |
729 | 5 | int addition_threads = _max_threads - _num_threads - _num_threads_pending_start; |
730 | 5 | addition_threads = std::min(addition_threads, _total_queued_tasks); |
731 | 5 | RETURN_IF_ERROR(try_create_thread(addition_threads, l)); |
732 | 5 | } |
733 | 7 | return Status::OK(); |
734 | 7 | } |
735 | | |
736 | 0 | std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) { |
737 | 0 | return o << ThreadPoolToken::state_to_string(s); |
738 | 0 | } |
739 | | |
740 | | } // namespace doris |