/root/doris/be/src/util/threadpool.cpp
| Line | Count | Source | 
| 1 |  | // Licensed to the Apache Software Foundation (ASF) under one | 
| 2 |  | // or more contributor license agreements.  See the NOTICE file | 
| 3 |  | // distributed with this work for additional information | 
| 4 |  | // regarding copyright ownership.  The ASF licenses this file | 
| 5 |  | // to you under the Apache License, Version 2.0 (the | 
| 6 |  | // "License"); you may not use this file except in compliance | 
| 7 |  | // with the License.  You may obtain a copy of the License at | 
| 8 |  | // | 
| 9 |  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
| 10 |  | // | 
| 11 |  | // Unless required by applicable law or agreed to in writing, | 
| 12 |  | // software distributed under the License is distributed on an | 
| 13 |  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 14 |  | // KIND, either express or implied.  See the License for the | 
| 15 |  | // specific language governing permissions and limitations | 
| 16 |  | // under the License. | 
| 17 |  | // This file is copied from | 
| 18 |  | // https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/threadpool.cc | 
| 19 |  | // and modified by Doris | 
| 20 |  |  | 
| 21 |  | #include "util/threadpool.h" | 
| 22 |  |  | 
| 23 |  | #include <algorithm> | 
| 24 |  | #include <cstdint> | 
| 25 |  | #include <limits> | 
| 26 |  | #include <ostream> | 
| 27 |  | #include <thread> | 
| 28 |  | #include <utility> | 
| 29 |  |  | 
| 30 |  | #include "absl/strings/substitute.h" | 
| 31 |  | #include "common/exception.h" | 
| 32 |  | #include "common/logging.h" | 
| 33 |  | #include "util/debug_points.h" | 
| 34 |  | #include "util/doris_metrics.h" | 
| 35 |  | #include "util/metrics.h" | 
| 36 |  | #include "util/stopwatch.hpp" | 
| 37 |  | #include "util/thread.h" | 
| 38 |  |  | 
| 39 |  | namespace doris { | 
| 40 |  | // The names of these variables are used as the metric names in Prometheus. | 
| 41 |  | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_active_threads, MetricUnit::NOUNIT); | 
| 42 |  | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT); | 
| 43 |  | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_queue_size, MetricUnit::NOUNIT); | 
| 44 |  | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_threads, MetricUnit::NOUNIT); | 
| 45 |  | DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT); | 
| 46 |  | DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_execution_time_ns_total, | 
| 47 |  |                                      MetricUnit::NANOSECONDS); | 
| 48 |  | DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_execution_count_total, MetricUnit::NOUNIT); | 
| 49 |  | DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_wait_worker_time_ns_total, | 
| 50 |  |                                      MetricUnit::NANOSECONDS); | 
| 51 |  | DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_wait_worker_count_total, MetricUnit::NOUNIT); | 
| 52 |  | using namespace ErrorCode; | 
| 53 |  |  | 
| 54 |  | using std::string; | 
| 55 |  | /* Runnable adapter: stores a std::function<void()> and invokes it from run(). */ | 
| 56 |  | class FunctionRunnable : public Runnable { | 
| 57 |  | public: | 
| 58 | 3.93M |     explicit FunctionRunnable(std::function<void()> func) : _func(std::move(func)) {} | 
| 59 |  |  | 
| 60 | 3.93M |     void run() override { _func(); } | 
| 61 |  |  | 
| 62 |  | private: | 
| 63 |  |     std::function<void()> _func; | 
| 64 |  | }; | 
| 65 |  | /* Builder defaults: min_threads 0, max_threads = hardware concurrency (clamped to >= 1 because hardware_concurrency() may return 0 when it cannot be determined), max_queue_size = INT_MAX, idle timeout 500 ms. */ | 
| 66 |  | ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group) | 
| 67 | 825 |         : _name(std::move(name)), | 
| 68 | 825 |           _workload_group(std::move(workload_group)), | 
| 69 | 825 |           _min_threads(0), | 
| 70 | 825 |           _max_threads(static_cast<int>(std::max(1U, std::thread::hardware_concurrency()))), | 
| 71 | 825 |           _max_queue_size(std::numeric_limits<int>::max()), | 
| 72 | 825 |           _idle_timeout(std::chrono::milliseconds(500)) {} | 
| 73 |  | /* Sets the minimum thread count; CHECK-fails if min_threads is negative. Returns *this for chaining. */ | 
| 74 | 776 | ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) { | 
| 75 | 776 |     CHECK_GE(min_threads, 0); | 
| 76 | 776 |     _min_threads = min_threads; | 
| 77 | 776 |     return *this; | 
| 78 | 776 | } | 
| 79 |  | /* Sets the maximum thread count; CHECK-fails unless max_threads is strictly positive. Returns *this for chaining. */ | 
| 80 | 797 | ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) { | 
| 81 | 797 |     CHECK_GT(max_threads, 0); | 
| 82 | 797 |     _max_threads = max_threads; | 
| 83 | 797 |     return *this; | 
| 84 | 797 | } | 
| 85 |  | /* Sets the maximum queue size and returns *this for chaining. The value is stored as given; unlike the thread-count setters there is no range check here. */ | 
| 86 | 195 | ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) { | 
| 87 | 195 |     _max_queue_size = max_queue_size; | 
| 88 | 195 |     return *this; | 
| 89 | 195 | } | 
| 90 |  | /* Stores a weak reference to the cgroup CPU controller and returns *this for chaining. The by-value parameter is moved into the member to avoid a second weak_ptr copy. */ | 
| 91 |  | ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl( | 
| 92 | 148 |         std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl) { | 
| 93 | 148 |     _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl); | 
| 94 | 148 |     return *this; | 
| 95 | 148 | } | 
| 96 |  | /* Creates an IDLE token bound to `pool` with zeroed counters. A token whose max_concurrency is 1 is forced into SERIAL mode regardless of the requested mode. */ | 
| 97 |  | ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode, | 
| 98 |  |                                  int max_concurrency) | 
| 99 | 447k |         : _mode(mode), | 
| 100 | 447k |           _pool(pool), | 
| 101 | 447k |           _state(State::IDLE), | 
| 102 | 447k |           _active_threads(0), | 
| 103 | 447k |           _max_concurrency(max_concurrency), | 
| 104 | 447k |           _num_submitted_tasks(0), | 
| 105 | 447k |           _num_unsubmitted_tasks(0) { | 
| 106 | 447k |     if (max_concurrency == 1 && mode != ThreadPool::ExecutionMode::SERIAL) { | 
| 107 | 65.2k |         _mode = ThreadPool::ExecutionMode::SERIAL; | 
| 108 | 65.2k |     } | 
| 109 | 447k | } | 
| 110 |  | /* Shuts the token down via shutdown(), then unregisters it from the owning pool with release_token(). */ | 
| 111 | 447k | ThreadPoolToken::~ThreadPoolToken() { | 
| 112 | 447k |     shutdown(); | 
| 113 | 447k |     _pool->release_token(this); | 
| 114 | 447k | } | 
| 115 |  | /* Submits `r` to the owning pool on behalf of this token; returns whatever ThreadPool::do_submit() returns. */ | 
| 116 | 287k | Status ThreadPoolToken::submit(std::shared_ptr<Runnable> r) { | 
| 117 | 287k |     return _pool->do_submit(std::move(r), this); | 
| 118 | 287k | } | 
| 119 |  | /* Convenience overload: wraps `f` in a FunctionRunnable and forwards to submit(). */ | 
| 120 | 287k | Status ThreadPoolToken::submit_func(std::function<void()> f) { | 
| 121 | 287k |     return submit(std::make_shared<FunctionRunnable>(std::move(f))); | 
| 122 | 287k | } | 
| 123 |  | /* Discards this token's queued tasks and blocks until none of its tasks are running; on return the token is in the QUIESCED state. The discarded tasks are destroyed only after the pool lock has been released. */ | 
| 124 | 816k | void ThreadPoolToken::shutdown() { | 
| 125 | 816k |     std::unique_lock<std::mutex> l(_pool->_lock); | 
| 126 | 816k |     _pool->check_not_pool_thread_unlocked(); | 
| 127 |  |  | 
| 128 |  |     // Clear the queue under the lock, but defer the releasing of the tasks | 
| 129 |  |     // until after the explicit unlock at the end of this function, in case there | 
| 130 |  |     // are concurrent threads wanting to access the ThreadPool. The task's | 
| 131 |  |     // destructors may acquire locks, etc, so this also prevents lock inversions. | 
| 132 | 816k |     std::deque<ThreadPool::Task> to_release = std::move(_entries); | 
| 133 | 816k |     _pool->_total_queued_tasks -= to_release.size(); | 
| 134 |  |  | 
| 135 | 816k |     switch (state()) { | 
| 136 | 446k |     case State::IDLE: | 
| 137 |  |         // There were no tasks outstanding; we can quiesce the token immediately. | 
| 138 | 446k |         transition(State::QUIESCED); | 
| 139 | 446k |         break; | 
| 140 | 855 |     case State::RUNNING: | 
| 141 |  |         // There were outstanding tasks. If any are still running, switch to | 
| 142 |  |         // QUIESCING and wait for them to finish (the worker thread executing | 
| 143 |  |         // the token's last task will switch the token to QUIESCED). Otherwise, | 
| 144 |  |         // we can quiesce the token immediately. | 
| 145 |  |  | 
| 146 |  |         // Note: this is an O(n) operation, but it's expected to be infrequent. | 
| 147 |  |         // Plus doing it this way (rather than switching to QUIESCING and waiting | 
| 148 |  |         // for a worker thread to process the queue entry) helps retain state | 
| 149 |  |         // transition symmetry with ThreadPool::shutdown. | 
| 150 | 3.70k |         for (auto it = _pool->_queue.begin(); it != _pool->_queue.end();) { | 
| 151 | 2.84k |             if (*it == this) { | 
| 152 | 259 |                 it = _pool->_queue.erase(it); | 
| 153 | 2.59k |             } else { | 
| 154 | 2.59k |                 it++; | 
| 155 | 2.59k |             } | 
| 156 | 2.84k |         } | 
| 157 |  |  | 
| 158 | 855 |         if (_active_threads == 0) { | 
| 159 | 125 |             transition(State::QUIESCED); | 
| 160 | 125 |             break; | 
| 161 | 125 |         } | 
| 162 | 730 |         transition(State::QUIESCING); | 
| 163 | 730 |         [[fallthrough]]; | 
| 164 | 739 |     case State::QUIESCING: | 
| 165 |  |         // The token is already quiescing. Just wait for a worker thread to | 
| 166 |  |         // switch it to QUIESCED. | 
| 167 | 1.47k |         _not_running_cond.wait(l, [this]() { return state() == State::QUIESCED; }); | 
| 168 | 739 |         break; | 
| 169 | 369k |     default: | 
| 170 | 369k |         break; | 
| 171 | 816k |     } | 
|  |  |  | 
|  |  |     // Release the pool lock before `to_release` goes out of scope. Locals are | 
|  |  |     // destroyed in reverse declaration order, so without this explicit unlock | 
|  |  |     // the task destructors would run while the lock is still held. | 
|  |  |     l.unlock(); | 
| 172 | 816k | } | 
| 173 |  | /* Blocks, holding the pool lock while checking, until is_active() returns false for this token. Calls check_not_pool_thread_unlocked() first (presumably to reject calls from a pool worker thread; its definition is not in this section). */ | 
| 174 | 146k | void ThreadPoolToken::wait() { | 
| 175 | 146k |     std::unique_lock<std::mutex> l(_pool->_lock); | 
| 176 | 146k |     _pool->check_not_pool_thread_unlocked(); | 
| 177 | 163k |     _not_running_cond.wait(l, [this]() { return !is_active(); }); | 
| 178 | 146k | } | 
| 179 |  | /* Moves the token to `new_state`. Non-NDEBUG builds first CHECK that the transition is legal (IDLE -> RUNNING/QUIESCED, RUNNING -> IDLE/QUIESCING/QUIESCED, QUIESCING -> QUIESCED, nothing out of QUIESCED). Entering IDLE or QUIESCED notifies all waiters on _not_running_cond. */ | 
| 180 | 1.63M | void ThreadPoolToken::transition(State new_state) { | 
| 181 | 1.63M | #ifndef NDEBUG | 
| 182 | 1.63M |     CHECK_NE(_state, new_state); | 
| 183 |  |  | 
| 184 | 1.63M |     switch (_state) { | 
| 185 | 1.04M |     case State::IDLE: | 
| 186 | 1.04M |         CHECK(new_state == State::RUNNING || new_state == State::QUIESCED); | 
| 187 | 1.04M |         if (new_state == State::RUNNING) { | 
| 188 | 595k |             CHECK(!_entries.empty()); | 
| 189 | 595k |         } else { | 
| 190 | 446k |             CHECK(_entries.empty()); | 
| 191 | 446k |             CHECK_EQ(_active_threads, 0); | 
| 192 | 446k |         } | 
| 193 | 1.04M |         break; | 
| 194 | 595k |     case State::RUNNING: | 
| 195 | 595k |         CHECK(new_state == State::IDLE || new_state == State::QUIESCING || | 
| 196 | 595k |               new_state == State::QUIESCED); | 
| 197 | 595k |         CHECK(_entries.empty()); | 
| 198 | 595k |         if (new_state == State::QUIESCING) { | 
| 199 | 771 |             CHECK_GT(_active_threads, 0); | 
| 200 | 771 |         } | 
| 201 | 595k |         break; | 
| 202 | 771 |     case State::QUIESCING: | 
| 203 | 771 |         CHECK(new_state == State::QUIESCED); | 
| 204 | 771 |         CHECK_EQ(_active_threads, 0); | 
| 205 | 771 |         break; | 
| 206 | 0 |     case State::QUIESCED: | 
| 207 | 0 |         CHECK(false); // QUIESCED is a terminal state | 
| 208 | 0 |         break; | 
| 209 | 0 |     default: | 
| 210 | 0 |         throw Exception(Status::FatalError("Unknown token state: {}", _state)); | 
| 211 | 1.63M |     } | 
| 212 | 1.63M | #endif | 
| 213 |  |  | 
| 214 |  |     // Take actions based on the state we're entering. The notification is issued before _state is assigned below; the callers visible in this file all hold the pool lock, so waiters re-check only after it is released. | 
| 215 | 1.63M |     switch (new_state) { | 
| 216 | 594k |     case State::IDLE: | 
| 217 | 1.04M |     case State::QUIESCED: | 
| 218 | 1.04M |         _not_running_cond.notify_all(); | 
| 219 | 1.04M |         break; | 
| 220 | 596k |     default: | 
| 221 | 596k |         break; | 
| 222 | 1.63M |     } | 
| 223 |  |  | 
| 224 | 1.63M |     _state = new_state; | 
| 225 | 1.63M | } | 
| 226 |  | /* Returns the name of state `s` ("IDLE", "RUNNING", "QUIESCING" or "QUIESCED"); any other value yields "<cannot reach here>". */ | 
| 227 | 0 | const char* ThreadPoolToken::state_to_string(State s) { | 
| 228 | 0 |     switch (s) { | 
| 229 | 0 |     case State::IDLE: | 
| 230 | 0 |         return "IDLE"; | 
| 231 | 0 |         break; | 
| 232 | 0 |     case State::RUNNING: | 
| 233 | 0 |         return "RUNNING"; | 
| 234 | 0 |         break; | 
| 235 | 0 |     case State::QUIESCING: | 
| 236 | 0 |         return "QUIESCING"; | 
| 237 | 0 |         break; | 
| 238 | 0 |     case State::QUIESCED: | 
| 239 | 0 |         return "QUIESCED"; | 
| 240 | 0 |         break; | 
| 241 | 0 |     } | 
| 242 | 0 |     return "<cannot reach here>"; | 
| 243 | 0 | } | 
| 244 |  | /* Returns true when the token should be pushed onto the pool's dispatch queue: it is IDLE, or it is a CONCURRENT token with fewer than _max_concurrency submitted tasks. */ | 
| 245 | 4.00M | bool ThreadPoolToken::need_dispatch() { | 
| 246 | 4.00M |     return _state == ThreadPoolToken::State::IDLE || | 
| 247 | 4.00M |            (_mode == ThreadPool::ExecutionMode::CONCURRENT && | 
| 248 | 3.40M |             _num_submitted_tasks < _max_concurrency); | 
| 249 | 4.00M | } | 
| 250 |  | /* Copies the configuration out of `builder`, starts in the Uninitialized status with zeroed counters, and creates the CONCURRENT token used for tokenless submission. NOTE(review): new_token() locks _lock and inserts into _tokens, so this presumably relies on those members being declared before _tokenless in the header; confirm. */ | 
| 251 |  | ThreadPool::ThreadPool(const ThreadPoolBuilder& builder) | 
| 252 | 779 |         : _name(builder._name), | 
| 253 | 779 |           _workload_group(builder._workload_group), | 
| 254 | 779 |           _min_threads(builder._min_threads), | 
| 255 | 779 |           _max_threads(builder._max_threads), | 
| 256 | 779 |           _max_queue_size(builder._max_queue_size), | 
| 257 | 779 |           _idle_timeout(builder._idle_timeout), | 
| 258 | 779 |           _pool_status(Status::Uninitialized("The pool was not initialized.")), | 
| 259 | 779 |           _num_threads(0), | 
| 260 | 779 |           _num_threads_pending_start(0), | 
| 261 | 779 |           _active_threads(0), | 
| 262 | 779 |           _total_queued_tasks(0), | 
| 263 | 779 |           _cgroup_cpu_ctl(builder._cgroup_cpu_ctl), | 
| 264 | 779 |           _tokenless(new_token(ExecutionMode::CONCURRENT)), | 
| 265 | 779 |           _id(UniqueId::gen_uid()) {} | 
| 266 |  | /* CHECK-fails if any token other than the tokenless one is still registered, then shuts the pool down. */ | 
| 267 | 505 | ThreadPool::~ThreadPool() { | 
| 268 |  |     // There should only be one live token: the one used in tokenless submission. | 
| 269 | 505 |     CHECK_EQ(1, _tokens.size()) << absl::Substitute( | 
| 270 | 0 |             "Threadpool $0 destroyed with $1 allocated tokens", _name, _tokens.size()); | 
| 271 | 505 |     shutdown(); | 
| 272 | 505 | } | 
| 273 |  | /* Creates up to `thread_num` worker threads, incrementing _num_threads_pending_start for each success. Stops at the first failure, logs a warning and returns that status; otherwise returns OK. The unnamed lock_guard parameter makes the caller pass the lock it holds (init() passes its guard on _lock). */ | 
| 274 | 784 | Status ThreadPool::try_create_thread(int thread_num, std::lock_guard<std::mutex>&) { | 
| 275 | 11.6k |     for (int i = 0; i < thread_num; i++) { | 
| 276 | 10.8k |         Status status = create_thread(); | 
| 277 | 10.8k |         if (status.ok()) { | 
| 278 | 10.8k |             _num_threads_pending_start++; | 
| 279 | 10.8k |         } else { | 
| 280 | 0 |             LOG(WARNING) << "Thread pool " << _name << " failed to create thread: " << status; | 
| 281 | 0 |             return status; | 
| 282 | 0 |         } | 
| 283 | 10.8k |     } | 
| 284 | 784 |     return Status::OK(); | 
| 285 | 784 | } | 
| 286 |  | /* One-time initialization: marks the pool OK, tries to start _min_threads workers, registers the metric entity and its metrics, and installs an "update" hook that refreshes the gauges. Returns NotSupported if the pool is not in the Uninitialized status, otherwise OK. NOTE(review): _pool_status is read and written here before _lock is taken; confirm init() cannot race with other callers. */ | 
| 287 | 779 | Status ThreadPool::init() { | 
| 288 | 779 |     if (!_pool_status.is<UNINITIALIZED>()) { | 
| 289 | 0 |         return Status::NotSupported("The thread pool {} is already initialized", _name); | 
| 290 | 0 |     } | 
| 291 | 779 |     _pool_status = Status::OK(); | 
| 292 |  |  | 
| 293 | 779 |     { | 
| 294 | 779 |         std::lock_guard<std::mutex> l(_lock); | 
| 295 |  |         // A thread-creation failure should not make thread pool init fail, | 
| 296 |  |         // because threads can be created later, e.g. when a task is submitted. | 
| 297 | 779 |         static_cast<void>(try_create_thread(_min_threads, l)); | 
| 298 | 779 |     } | 
| 299 |  |  | 
| 300 |  |     // The _id of the thread pool ensures that thread pools created with the same name | 
| 301 |  |     // still get different _metric_entity instances. | 
| 302 |  |     // Otherwise we would have problems when deregistering the entity and registering the hook. | 
| 303 | 779 |     _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( | 
| 304 | 779 |             fmt::format("thread_pool_{}", _name), {{"thread_pool_name", _name}, | 
| 305 | 779 |                                                    {"workload_group", _workload_group}, | 
| 306 | 779 |                                                    {"id", _id.to_string()}}); | 
| 307 |  |  | 
| 308 | 779 |     INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_active_threads); | 
| 309 | 779 |     INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_threads); | 
| 310 | 779 |     INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size); | 
| 311 | 779 |     INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_queue_size); | 
| 312 | 779 |     INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_execution_time_ns_total); | 
| 313 | 779 |     INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_execution_count_total); | 
| 314 | 779 |     INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_wait_worker_time_ns_total); | 
| 315 | 779 |     INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_wait_worker_count_total); | 
| 316 | 779 |     INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed); | 
| 317 |  |  | 
| 318 | 30.4k |     _metric_entity->register_hook("update", [this]() { | 
| 319 | 30.4k |         { | 
| 320 | 30.4k |             std::lock_guard<std::mutex> l(_lock); | 
| 321 | 30.4k |             if (!_pool_status.ok()) { | 
| 322 | 0 |                 return; | 
| 323 | 0 |             } | 
| 324 | 30.4k |         } | 
| 325 |  |  | 
| 326 | 30.4k |         thread_pool_active_threads->set_value(num_active_threads()); | 
| 327 | 30.4k |         thread_pool_queue_size->set_value(get_queue_size()); | 
| 328 | 30.4k |         thread_pool_max_queue_size->set_value(get_max_queue_size()); | 
| 329 | 30.4k |         thread_pool_max_threads->set_value(max_threads()); | 
| 330 | 30.4k |     }); | 
| 331 | 779 |     return Status::OK(); | 
| 332 | 779 | } | 
| 333 |  | /* Stops the pool: deregisters its metrics, marks it SERVICE_UNAVAILABLE, discards all queued tasks, quiesces every token, wakes idle workers and blocks until every worker thread has exited. The discarded tasks are destroyed only after _lock has been released. */ | 
| 334 | 922 | void ThreadPool::shutdown() { | 
| 335 |  |     // Why access to doris_metrics is safe here? | 
| 336 |  |     // Since DorisMetrics is a singleton, it will be destroyed only after doris_main is exited. | 
| 337 |  |     // The shutdown/destroy of ThreadPool is guaranteed to take place before doris_main exits by | 
| 338 |  |     // ExecEnv::destroy(). | 
| 339 | 922 |     DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity); | 
| 340 | 922 |     std::unique_lock<std::mutex> l(_lock); | 
| 341 | 922 |     check_not_pool_thread_unlocked(); | 
| 342 |  |  | 
| 343 |  |     // Note: this is the same error seen at submission if the pool is at | 
| 344 |  |     // capacity, so clients can't tell them apart. This isn't really a practical | 
| 345 |  |     // concern though because shutting down a pool typically requires clients to | 
| 346 |  |     // be quiesced first, so there's no danger of a client getting confused. | 
| 347 |  |     // Not print stack trace here | 
| 348 | 922 |     _pool_status = Status::Error<SERVICE_UNAVAILABLE, false>( | 
| 349 | 922 |             "The thread pool {} has been shut down.", _name); | 
| 350 |  |  | 
| 351 |  |     // Clear the various queues under the lock, but defer the releasing | 
| 352 |  |     // of the tasks until after the explicit unlock at the end of this function, | 
| 353 |  |     // in case there are concurrent threads wanting to access the ThreadPool. The | 
| 354 |  |     // task's destructors may acquire locks, etc, so this also prevents lock inversions. | 
| 355 | 922 |     _queue.clear(); | 
| 356 |  |  | 
| 357 | 922 |     std::deque<std::deque<Task>> to_release; | 
| 358 | 924 |     for (auto* t : _tokens) { | 
| 359 | 924 |         if (!t->_entries.empty()) { | 
| 360 | 3 |             to_release.emplace_back(std::move(t->_entries)); | 
| 361 | 3 |         } | 
| 362 | 924 |         switch (t->state()) { | 
| 363 | 466 |         case ThreadPoolToken::State::IDLE: | 
| 364 |  |             // The token is idle; we can quiesce it immediately. | 
| 365 | 466 |             t->transition(ThreadPoolToken::State::QUIESCED); | 
| 366 | 466 |             break; | 
| 367 | 41 |         case ThreadPoolToken::State::RUNNING: | 
| 368 |  |             // The token has tasks associated with it. If they're merely queued | 
| 369 |  |             // (i.e. there are no active threads), the tasks will have been removed | 
| 370 |  |             // above and we can quiesce immediately. Otherwise, we need to wait for | 
| 371 |  |             // the threads to finish. | 
| 372 | 41 |             t->transition(t->_active_threads > 0 ? ThreadPoolToken::State::QUIESCING | 
| 373 | 41 |                                                  : ThreadPoolToken::State::QUIESCED); | 
| 374 | 41 |             break; | 
| 375 | 417 |         default: | 
| 376 | 417 |             break; | 
| 377 | 924 |         } | 
| 378 | 924 |     } | 
| 379 |  |  | 
| 380 |  |     // The queues are empty. Wake any sleeping worker threads and wait for all | 
| 381 |  |     // of them to exit. Some worker threads will exit immediately upon waking, | 
| 382 |  |     // while others will exit after they finish executing an outstanding task. | 
| 383 | 922 |     _total_queued_tasks = 0; | 
| 384 | 5.22k |     while (!_idle_threads.empty()) { | 
| 385 | 4.30k |         _idle_threads.front().not_empty.notify_one(); | 
| 386 | 4.30k |         _idle_threads.pop_front(); | 
| 387 | 4.30k |     } | 
| 388 |  |  | 
| 389 | 1.39k |     _no_threads_cond.wait(l, [this]() { return _num_threads + _num_threads_pending_start == 0; }); | 
| 390 |  |  | 
| 391 |  |     // All the threads have exited. Check the state of each token. | 
| 392 | 924 |     for (auto* t : _tokens) { | 
| 393 | 924 |         DCHECK(t->state() == ThreadPoolToken::State::IDLE || | 
| 394 | 924 |                t->state() == ThreadPoolToken::State::QUIESCED); | 
| 395 | 924 |     } | 
|  |  |  | 
|  |  |     // Finally release the lock before `to_release` goes out of scope. Locals are | 
|  |  |     // destroyed in reverse declaration order, so without this explicit unlock the | 
|  |  |     // queued tasks would be destroyed while _lock is still held. | 
|  |  |     l.unlock(); | 
| 396 | 922 | } | 
| 397 |  | /* Allocates a token for this pool under _lock and registers its address in _tokens; throws Exception (InternalError "duplicate token") if that address is already registered. The caller owns the returned token. */ | 
| 398 | 444k | std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode, int max_concurrency) { | 
| 399 | 444k |     std::lock_guard<std::mutex> l(_lock); | 
| 400 | 444k |     std::unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode, max_concurrency)); | 
| 401 | 444k |     if (!_tokens.insert(t.get()).second) { | 
| 402 | 0 |         throw Exception(Status::InternalError("duplicate token")); | 
| 403 | 0 |     } | 
| 404 | 444k |     return t; | 
| 405 | 444k | } | 
| 406 |  | /* Unregisters token `t` under _lock. CHECK-fails if the token is still active or was not registered in _tokens. */ | 
| 407 | 447k | void ThreadPool::release_token(ThreadPoolToken* t) { | 
| 408 | 447k |     std::lock_guard<std::mutex> l(_lock); | 
| 409 | 18.4E |     CHECK(!t->is_active()) << absl::Substitute("Token with state $0 may not be released", | 
| 410 | 18.4E |                                                ThreadPoolToken::state_to_string(t->state())); | 
| 411 | 447k |     CHECK_EQ(1, _tokens.erase(t)); | 
| 412 | 447k | } | 
| 413 |  | /* Tokenless submission: queues `r` through the pool's internal _tokenless token and returns do_submit()'s status. */ | 
| 414 | 3.71M | Status ThreadPool::submit(std::shared_ptr<Runnable> r) { | 
| 415 | 3.71M |     return do_submit(std::move(r), _tokenless.get()); | 
| 416 | 3.71M | } | 
| 417 |  | /* Convenience overload: wraps `f` in a FunctionRunnable and forwards to submit(). */ | 
| 418 | 3.64M | Status ThreadPool::submit_func(std::function<void()> f) { | 
| 419 | 3.64M |     return submit(std::make_shared<FunctionRunnable>(std::move(f))); | 
| 420 | 3.64M | } | 
| 421 |  | /* Queues `r` on `token`, schedules the token on the dispatch queue when need_dispatch() says so, wakes one idle worker, and creates a new worker when the backlog warrants it. Returns the pool status if the pool is not OK, SERVICE_UNAVAILABLE if the token rejects new tasks or capacity is exhausted, the thread-creation error if that fails while the pool has no threads at all, and OK otherwise. */ | 
| 422 | 4.00M | Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token) { | 
| 423 | 4.00M |     DCHECK(token); | 
| 424 |  |  | 
| 425 | 4.00M |     std::unique_lock<std::mutex> l(_lock); | 
| 426 | 4.00M |     if (!_pool_status.ok()) [[unlikely]] { | 
| 427 | 1 |         return _pool_status; | 
| 428 | 1 |     } | 
| 429 |  |  | 
| 430 | 4.00M |     if (!token->may_submit_new_tasks()) [[unlikely]] { | 
| 431 | 2.59k |         return Status::Error<SERVICE_UNAVAILABLE>("Thread pool({}) token was shut down", _name); | 
| 432 | 2.59k |     } | 
| 433 |  |  | 
| 434 |  |     // Size limit check. | 
| 435 | 4.00M |     int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - _active_threads + | 
| 436 | 4.00M |                                  static_cast<int64_t>(_max_queue_size) - _total_queued_tasks; | 
| 437 | 4.00M |     if (capacity_remaining < 1) { | 
| 438 | 4 |         thread_pool_submit_failed->increment(1); | 
| 439 | 4 |         return Status::Error<SERVICE_UNAVAILABLE>( | 
| 440 | 4 |                 "Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name, | 
| 441 | 4 |                 _num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks, | 
| 442 | 4 |                 _max_queue_size); | 
| 443 | 4 |     } | 
| 444 |  |  | 
| 445 |  |     // Should we create another thread? | 
| 446 |  |  | 
| 447 |  |     // We assume that each current inactive thread will grab one item from the | 
| 448 |  |     // queue.  If it seems like we'll need another thread, we create one. | 
| 449 |  |     // | 
| 450 |  |     // Rather than creating the thread here, while holding the lock, we defer | 
| 451 |  |     // it to down below. This is because thread creation can be rather slow | 
| 452 |  |     // (hundreds of milliseconds in some cases) and we'd like to allow the | 
| 453 |  |     // existing threads to continue to process tasks while we do so. | 
| 454 |  |     // | 
| 455 |  |     // In theory, a currently active thread could finish immediately after this | 
| 456 |  |     // calculation but before our new worker starts running. This would mean we | 
| 457 |  |     // created a thread we didn't really need. However, this race is unavoidable | 
| 458 |  |     // and harmless. | 
| 459 |  |     // | 
| 460 |  |     // Of course, we never create more than _max_threads threads no matter what. | 
| 461 | 4.00M |     int threads_from_this_submit = | 
| 462 | 4.00M |             token->is_active() && token->mode() == ExecutionMode::SERIAL ? 0 : 1; | 
| 463 | 4.00M |     int inactive_threads = _num_threads + _num_threads_pending_start - _active_threads; | 
| 464 | 4.00M |     int additional_threads = | 
| 465 | 4.00M |             static_cast<int>(_queue.size()) + threads_from_this_submit - inactive_threads; | 
| 466 | 4.00M |     bool need_a_thread = false; | 
| 467 | 4.00M |     if (additional_threads > 0 && _num_threads + _num_threads_pending_start < _max_threads) { | 
| 468 | 23.1k |         need_a_thread = true; | 
| 469 | 23.1k |         _num_threads_pending_start++; | 
| 470 | 23.1k |     } | 
| 471 |  |  | 
| 472 | 4.00M |     Task task; | 
| 473 | 4.00M |     task.runnable = std::move(r); | 
| 474 | 4.00M |     task.submit_time_wather.start(); | 
| 475 |  |  | 
| 476 |  |     // Add the task to the token's queue. | 
| 477 | 4.00M |     ThreadPoolToken::State state = token->state(); | 
| 478 | 4.00M |     DCHECK(state == ThreadPoolToken::State::IDLE || state == ThreadPoolToken::State::RUNNING); | 
| 479 | 4.00M |     token->_entries.emplace_back(std::move(task)); | 
| 480 |  |     // When we need to execute the task in the token, we submit the token object to the queue. | 
| 481 |  |     // There are currently two places where tokens will be submitted to the queue: | 
| 482 |  |     // 1. When submitting a new task, if the token is still in the IDLE state, | 
| 483 |  |     //    or the concurrency of the token has not reached its upper limit, it will be added to the queue. | 
| 484 |  |     // 2. When the dispatch thread finishes executing a task: | 
| 485 |  |     //    1. If it is a SERIAL token, and there are unsubmitted tasks, submit them to the queue. | 
| 486 |  |     //    2. If it is a CONCURRENT token, and there are still unsubmitted tasks, and the upper limit of concurrency is not reached, | 
| 487 |  |     //       then it is submitted to the queue. | 
| 488 | 4.00M |     if (token->need_dispatch()) { | 
| 489 | 3.99M |         _queue.emplace_back(token); | 
| 490 | 3.99M |         ++token->_num_submitted_tasks; | 
| 491 | 3.99M |         if (state == ThreadPoolToken::State::IDLE) { | 
| 492 | 595k |             token->transition(ThreadPoolToken::State::RUNNING); | 
| 493 | 595k |         } | 
| 494 | 3.99M |     } else { | 
| 495 | 4.66k |         ++token->_num_unsubmitted_tasks; | 
| 496 | 4.66k |     } | 
| 497 | 4.00M |     _total_queued_tasks++; | 
| 498 |  |  | 
| 499 |  |     // Wake up an idle thread for this task. Choosing the thread at the front of | 
| 500 |  |     // the list ensures LIFO semantics as idling threads are also added to the front. | 
| 501 |  |     // | 
| 502 |  |     // If there are no idle threads, the new task remains on the queue and is | 
| 503 |  |     // processed by an active thread (or a thread we're about to create) at some | 
| 504 |  |     // point in the future. | 
| 505 | 4.00M |     if (!_idle_threads.empty()) { | 
| 506 | 2.47M |         _idle_threads.front().not_empty.notify_one(); | 
| 507 | 2.47M |         _idle_threads.pop_front(); | 
| 508 | 2.47M |     } | 
| 509 | 4.00M |     l.unlock(); | 
| 510 |  |  | 
| 511 | 4.00M |     if (need_a_thread) { | 
| 512 | 23.1k |         Status status = create_thread(); | 
| 513 | 23.1k |         if (!status.ok()) { | 
| 514 | 0 |             l.lock(); | 
| 515 | 0 |             _num_threads_pending_start--; | 
| 516 | 0 |             if (_num_threads + _num_threads_pending_start == 0) { | 
| 517 |  |                 // If we have no threads, we can't do any work. NOTE(review): the task was already queued above and stays queued although an error is returned; confirm this is intended. | 
| 518 | 0 |                 return status; | 
| 519 | 0 |             } | 
| 520 |  |             // If we failed to create a thread, but there are still some other | 
| 521 |  |             // worker threads, log a warning message and continue. | 
| 522 | 0 |             LOG(WARNING) << "Thread pool " << _name | 
| 523 | 0 |                          << " failed to create thread: " << status.to_string(); | 
| 524 | 0 |         } | 
| 525 | 23.1k |     } | 
| 526 |  |  | 
| 527 | 4.00M |     return Status::OK(); | 
| 528 | 4.00M | } | 
| 529 |  | /* Blocks until the pool has no queued tasks and no active threads. Calls check_not_pool_thread_unlocked() first (presumably to reject calls from a pool worker thread; its definition is not in this section). */ | 
| 530 | 148 | void ThreadPool::wait() { | 
| 531 | 148 |     std::unique_lock<std::mutex> l(_lock); | 
| 532 | 148 |     check_not_pool_thread_unlocked(); | 
| 533 | 224 |     _idle_cond.wait(l, [this]() { return _total_queued_tasks == 0 && _active_threads == 0; }); | 
| 534 | 148 | } | 
| 535 |  |  | 
| 536 | 34.0k | void ThreadPool::dispatch_thread() { | 
| 537 | 34.0k |     std::unique_lock<std::mutex> l(_lock); | 
| 538 | 34.0k |     if (!_threads.insert(Thread::current_thread()).second) { | 
| 539 | 0 |         throw Exception(Status::InternalError("duplicate token")); | 
| 540 | 0 |     } | 
| 541 | 34.0k |     DCHECK_GT(_num_threads_pending_start, 0); | 
| 542 | 34.0k |     _num_threads++; | 
| 543 | 34.0k |     _num_threads_pending_start--; | 
| 544 |  |  | 
| 545 | 34.0k |     if (std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_sptr = _cgroup_cpu_ctl.lock()) { | 
| 546 | 0 |         static_cast<void>(cg_cpu_ctl_sptr->add_thread_to_cgroup()); | 
| 547 | 0 |     } | 
| 548 |  |  | 
| 549 |  |     // Owned by this worker thread and added/removed from _idle_threads as needed. | 
| 550 | 34.0k |     IdleThread me; | 
| 551 |  |  | 
| 552 | 17.2M |     while (true) { | 
| 553 |  |         // Note: Status::Aborted() is used to indicate normal shutdown. | 
| 554 | 17.2M |         if (!_pool_status.ok()) { | 
| 555 | 5.17k |             VLOG_CRITICAL << "DispatchThread exiting: " << _pool_status.to_string(); | 
| 556 | 5.17k |             break; | 
| 557 | 5.17k |         } | 
| 558 |  |  | 
| 559 | 17.2M |         if (_num_threads + _num_threads_pending_start > _max_threads) { | 
| 560 | 37 |             break; | 
| 561 | 37 |         } | 
| 562 |  |  | 
| 563 | 17.2M |         if (_queue.empty()) { | 
| 564 |  |             // There's no work to do, let's go idle. | 
| 565 |  |             // | 
| 566 |  |             // Note: if FIFO behavior is desired, it's as simple as changing this to push_back(). | 
| 567 | 13.2M |             _idle_threads.push_front(me); | 
| 568 | 13.2M |             Defer defer = [&] { | 
| 569 |  |                 // For some wake ups (i.e. shutdown or do_submit) this thread is | 
| 570 |  |                 // guaranteed to be unlinked after being awakened. In others (i.e. | 
| 571 |  |                 // spurious wake-up or Wait timeout), it'll still be linked. | 
| 572 | 13.2M |                 if (me.is_linked()) { | 
| 573 | 10.7M |                     _idle_threads.erase(_idle_threads.iterator_to(me)); | 
| 574 | 10.7M |                 } | 
| 575 | 13.2M |             }; | 
| 576 | 13.2M |             if (me.not_empty.wait_for(l, _idle_timeout) == std::cv_status::timeout) { | 
| 577 |  |                 // After much investigation, it appears that pthread condition variables have | 
| 578 |  |                 // a weird behavior in which they can return ETIMEDOUT from timed_wait even if | 
| 579 |  |                 // another thread did in fact signal. Apparently after a timeout there is some | 
| 580 |  |                 // brief period during which another thread may actually grab the internal mutex | 
| 581 |  |                 // protecting the state, signal, and release again before we get the mutex. So, | 
| 582 |  |                 // we'll recheck the empty queue case regardless. | 
| 583 | 10.7M |                 if (_queue.empty() && _num_threads + _num_threads_pending_start > _min_threads) { | 
| 584 | 23.1k |                     VLOG_NOTICE << "Releasing worker thread from pool " << _name << " after " | 
| 585 | 0 |                                 << std::chrono::duration_cast<std::chrono::milliseconds>( | 
| 586 | 0 |                                            _idle_timeout) | 
| 587 | 0 |                                            .count() | 
| 588 | 0 |                                 << "ms of idle time."; | 
| 589 | 23.1k |                     break; | 
| 590 | 23.1k |                 } | 
| 591 | 10.7M |             } | 
| 592 | 13.2M |             continue; | 
| 593 | 13.2M |         } | 
| 594 |  |  | 
| 595 | 4.00M |         MonotonicStopWatch task_execution_time_watch; | 
| 596 | 4.00M |         task_execution_time_watch.start(); | 
| 597 |  |         // Get the next token and task to execute. | 
| 598 | 4.00M |         ThreadPoolToken* token = _queue.front(); | 
| 599 | 4.00M |         _queue.pop_front(); | 
| 600 | 4.00M |         DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state()); | 
| 601 | 4.00M |         DCHECK(!token->_entries.empty()); | 
| 602 | 4.00M |         Task task = std::move(token->_entries.front()); | 
| 603 | 4.00M |         thread_pool_task_wait_worker_time_ns_total->increment( | 
| 604 | 4.00M |                 task.submit_time_wather.elapsed_time()); | 
| 605 | 4.00M |         thread_pool_task_wait_worker_count_total->increment(1); | 
| 606 | 4.00M |         token->_entries.pop_front(); | 
| 607 | 4.00M |         token->_active_threads++; | 
| 608 | 4.00M |         --_total_queued_tasks; | 
| 609 | 4.00M |         ++_active_threads; | 
| 610 |  |  | 
| 611 | 4.00M |         l.unlock(); | 
| 612 |  |  | 
| 613 |  |         // Execute the task | 
| 614 | 4.00M |         task.runnable->run(); | 
| 615 |  |         // Destruct the task while we do not hold the lock. | 
| 616 |  |         // | 
| 617 |  |         // The task's destructor may be expensive if it has a lot of bound | 
| 618 |  |         // objects, and we don't want to block submission of the threadpool. | 
| 619 |  |         // In the worst case, the destructor might even try to do something | 
| 620 |  |         // with this threadpool, and produce a deadlock. | 
| 621 | 4.00M |         task.runnable.reset(); | 
| 622 | 4.00M |         l.lock(); | 
| 623 | 4.00M |         thread_pool_task_execution_time_ns_total->increment( | 
| 624 | 4.00M |                 task_execution_time_watch.elapsed_time()); | 
| 625 | 4.00M |         thread_pool_task_execution_count_total->increment(1); | 
| 626 |  |         // Possible states: | 
| 627 |  |         // 1. The token was shut down while we ran its task. Transition to QUIESCED. | 
| 628 |  |         // 2. The token has no more queued tasks. Transition back to IDLE. | 
| 629 |  |         // 3. The token has more tasks. Requeue it and leave it in RUNNING. | 
| 630 | 4.00M |         ThreadPoolToken::State state = token->state(); | 
| 631 | 4.00M |         DCHECK(state == ThreadPoolToken::State::RUNNING || | 
| 632 | 4.00M |                state == ThreadPoolToken::State::QUIESCING); | 
| 633 | 4.00M |         --token->_active_threads; | 
| 634 | 4.00M |         --token->_num_submitted_tasks; | 
| 635 |  |  | 
| 636 |  |         // handle shutdown && idle | 
| 637 | 4.00M |         if (token->_active_threads == 0) { | 
| 638 | 619k |             if (state == ThreadPoolToken::State::QUIESCING) { | 
| 639 | 771 |                 DCHECK(token->_entries.empty()); | 
| 640 | 771 |                 token->transition(ThreadPoolToken::State::QUIESCED); | 
| 641 | 618k |             } else if (token->_entries.empty()) { | 
| 642 | 594k |                 token->transition(ThreadPoolToken::State::IDLE); | 
| 643 | 594k |             } | 
| 644 | 619k |         } | 
| 645 |  |  | 
| 646 |  |         // We decrease _num_submitted_tasks holding lock, so the following DCHECK works. | 
| 647 | 4.00M |         DCHECK(token->_num_submitted_tasks < token->_max_concurrency); | 
| 648 |  |  | 
| 649 |  |         // If token->state is running and there are unsubmitted tasks in the token, we put | 
| 650 |  |         // the token back. | 
| 651 | 4.00M |         if (token->_num_unsubmitted_tasks > 0 && state == ThreadPoolToken::State::RUNNING) { | 
| 652 |  |             // SERIAL: if _entries is not empty, then num_unsubmitted_tasks must be greater than 0. | 
| 653 |  |             // CONCURRENT: we have to check _num_unsubmitted_tasks because two or more | 
| 654 |  |             // threads may be running tasks for the token at the same time. | 
| 655 | 7.16k |             _queue.emplace_back(token); | 
| 656 | 7.16k |             ++token->_num_submitted_tasks; | 
| 657 | 7.16k |             --token->_num_unsubmitted_tasks; | 
| 658 | 7.16k |         } | 
| 659 |  |  | 
| 660 | 4.00M |         if (--_active_threads == 0) { | 
| 661 | 585k |             _idle_cond.notify_all(); | 
| 662 | 585k |         } | 
| 663 | 4.00M |     } | 
| 664 |  |  | 
| 665 |  |     // It's important that we hold the lock between exiting the loop and dropping | 
| 666 |  |     // _num_threads. Otherwise it's possible someone else could come along here | 
| 667 |  |     // and add a new task just as the last running thread is about to exit. | 
| 668 | 34.0k |     CHECK(l.owns_lock()); | 
| 669 |  |  | 
| 670 | 34.0k |     CHECK_EQ(_threads.erase(Thread::current_thread()), 1); | 
| 671 | 34.0k |     _num_threads--; | 
| 672 | 34.0k |     if (_num_threads + _num_threads_pending_start == 0) { | 
| 673 | 1.47k |         _no_threads_cond.notify_all(); | 
| 674 |  |  | 
| 675 |  |         // Sanity check: if we're the last thread exiting, the queue ought to be | 
| 676 |  |         // empty. Otherwise it will never get processed. | 
| 677 | 1.47k |         CHECK(_queue.empty()); | 
| 678 | 1.47k |         DCHECK_EQ(0, _total_queued_tasks); | 
| 679 | 1.47k |     } | 
| 680 | 34.0k | } | 
| 681 |  |  | 
| 682 | 34.0k | Status ThreadPool::create_thread() { | 
| 683 | 34.0k |     return Thread::create("thread pool", absl::Substitute("$0 [worker]", _name), | 
| 684 | 34.0k |                           &ThreadPool::dispatch_thread, this, nullptr); | 
| 685 | 34.0k | } | 
| 686 |  |  | 
| 687 | 963k | void ThreadPool::check_not_pool_thread_unlocked() { | 
| 688 | 963k |     Thread* current = Thread::current_thread(); | 
| 689 | 963k |     if (_threads.contains(current)) { | 
| 690 | 0 |         throw Exception( | 
| 691 | 0 |                 Status::FatalError("Thread belonging to thread pool {} with " | 
| 692 | 0 |                                    "name {} called pool function that would result in deadlock", | 
| 693 | 0 |                                    _name, current->name())); | 
| 694 | 0 |     } | 
| 695 | 963k | } | 
| 696 |  |  | 
| 697 | 7 | Status ThreadPool::set_min_threads(int min_threads) { | 
| 698 | 7 |     std::lock_guard<std::mutex> l(_lock); | 
| 699 | 7 |     if (min_threads > _max_threads) { | 
| 700 |  |         // min threads can not be set greater than max threads | 
| 701 | 1 |         return Status::InternalError("set thread pool {} min_threads failed", _name); | 
| 702 | 1 |     } | 
| 703 | 6 |     _min_threads = min_threads; | 
| 704 | 6 |     if (min_threads > _num_threads + _num_threads_pending_start) { | 
| 705 | 0 |         int addition_threads = min_threads - _num_threads - _num_threads_pending_start; | 
| 706 | 0 |         RETURN_IF_ERROR(try_create_thread(addition_threads, l)); | 
| 707 | 0 |     } | 
| 708 | 6 |     return Status::OK(); | 
| 709 | 6 | } | 
| 710 |  |  | 
| 711 | 11 | Status ThreadPool::set_max_threads(int max_threads) { | 
| 712 | 11 |     std::lock_guard<std::mutex> l(_lock); | 
| 713 | 11 |     DBUG_EXECUTE_IF("ThreadPool.set_max_threads.force_set", { | 
| 714 | 11 |         _max_threads = max_threads; | 
| 715 | 11 |         return Status::OK(); | 
| 716 | 11 |     }) | 
| 717 | 11 |     if (_min_threads > max_threads) { | 
| 718 |  |         // max threads can not be set less than min threads | 
| 719 | 1 |         return Status::InternalError("set thread pool {} max_threads failed", _name); | 
| 720 | 1 |     } | 
| 721 |  |  | 
| 722 | 10 |     _max_threads = max_threads; | 
| 723 | 10 |     if (_max_threads > _num_threads + _num_threads_pending_start) { | 
| 724 | 5 |         int addition_threads = _max_threads - _num_threads - _num_threads_pending_start; | 
| 725 | 5 |         addition_threads = std::min(addition_threads, _total_queued_tasks); | 
| 726 | 5 |         RETURN_IF_ERROR(try_create_thread(addition_threads, l)); | 
| 727 | 5 |     } | 
| 728 | 10 |     return Status::OK(); | 
| 729 | 10 | } | 
| 730 |  |  | 
| 731 | 0 | std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) { | 
| 732 | 0 |     return o << ThreadPoolToken::state_to_string(s); | 
| 733 | 0 | } | 
| 734 |  |  | 
| 735 |  | } // namespace doris |