be/src/storage/adaptive_thread_pool_controller.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/adaptive_thread_pool_controller.h" |
19 | | |
20 | | #include <butil/time.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <thread> |
24 | | |
25 | | #include "cloud/config.h" |
26 | | #include "common/config.h" |
27 | | #include "common/logging.h" |
28 | | #include "common/metrics/system_metrics.h" |
29 | | #include "common/status.h" |
30 | | #include "util/threadpool.h" |
31 | | #include "util/time.h" |
32 | | |
33 | | namespace doris { |
34 | | |
35 | 40 | int AdaptiveThreadPoolController::PoolGroup::get_max_threads() const { |
36 | 40 | int num_cpus = std::thread::hardware_concurrency(); |
37 | 40 | if (num_cpus <= 0) num_cpus = 1; |
38 | 40 | return static_cast<int>(num_cpus * max_threads_per_cpu); |
39 | 40 | } |
40 | | |
41 | 26 | int AdaptiveThreadPoolController::PoolGroup::get_min_threads() const { |
42 | 26 | int num_cpus = std::thread::hardware_concurrency(); |
43 | 26 | if (num_cpus <= 0) num_cpus = 1; |
44 | 26 | return std::max(1, static_cast<int>(num_cpus * min_threads_per_cpu)); |
45 | 26 | } |
46 | | |
// Static callback registered with bthread_timer_add.
// Runs in brpc TimerThread. Must be fast and non-blocking.
//
// `raw` is the group's heap-allocated TimerArg, owned by cancel(). The
// callback chains itself: each firing re-registers exactly one one-shot
// timer, so at most one timer per group is pending at any time.
void AdaptiveThreadPoolController::_on_timer(void* raw) {
    auto* arg = static_cast<TimerArg*>(raw);

    // Hold mu for the entire callback (fire + re-registration).
    // cancel() acquires mu after bthread_timer_del, so this provides
    // cancel-with-wait semantics without a dedicated thread.
    std::lock_guard<std::mutex> lk(arg->mu);

    if (arg->stopped.load(std::memory_order_acquire)) {
        // cancel() set stopped before we took the lock.
        // cancel() owns arg and will delete it after taking mu.
        return;
    }

    // Run the group's adjustment (takes the controller's _mutex internally).
    arg->ctrl->_fire_group(arg->name);

    // Re-check: cancel() sets stopped without holding mu, so it may have
    // raced with _fire_group above; if so, do not re-arm the timer.
    if (arg->stopped.load(std::memory_order_acquire)) {
        return; // cancel() will clean up
    }

    // Re-register the next one-shot timer.
    // NOTE(review): if cancel() sets `stopped` after the check above but
    // before the timer_id.store below, cancel() may read the previous timer
    // id and fail to delete the timer registered here — verify that cancel()
    // re-reads timer_id after it has acquired mu.
    bthread_timer_t tid;
    if (bthread_timer_add(&tid, butil::milliseconds_from_now(arg->interval_ms), _on_timer, arg) ==
        0) {
        arg->timer_id.store(tid, std::memory_order_release);
    } else {
        // On failure the chain breaks: this group stops adapting until it is
        // cancelled and re-added.
        LOG(WARNING) << "Adaptive: failed to re-register timer for group '" << arg->name << "'";
    }
}
78 | | |
79 | | void AdaptiveThreadPoolController::init(SystemMetrics* system_metrics, |
80 | 12 | ThreadPool* s3_file_upload_pool) { |
81 | 12 | _system_metrics = system_metrics; |
82 | 12 | _s3_file_upload_pool = s3_file_upload_pool; |
83 | 12 | } |
84 | | |
85 | 1.00k | void AdaptiveThreadPoolController::stop() { |
86 | 1.00k | std::vector<std::string> names; |
87 | 1.00k | { |
88 | 1.00k | std::lock_guard<std::mutex> lk(_mutex); |
89 | 1.00k | for (const auto& [name, _] : _pool_groups) { |
90 | 13 | names.push_back(name); |
91 | 13 | } |
92 | 1.00k | } |
93 | 1.00k | for (const auto& name : names) { |
94 | 13 | cancel(name); |
95 | 13 | } |
96 | 1.00k | } |
97 | | |
98 | | void AdaptiveThreadPoolController::add(std::string name, std::vector<ThreadPool*> pools, |
99 | | AdjustFunc adjust_func, double max_threads_per_cpu, |
100 | 14 | double min_threads_per_cpu, int64_t interval_ms) { |
101 | 14 | PoolGroup group; |
102 | 14 | group.name = name; |
103 | 14 | group.pools = std::move(pools); |
104 | 14 | group.adjust_func = std::move(adjust_func); |
105 | 14 | group.max_threads_per_cpu = max_threads_per_cpu; |
106 | 14 | group.min_threads_per_cpu = min_threads_per_cpu; |
107 | 14 | group.current_threads = group.get_max_threads(); |
108 | | |
109 | 14 | int log_max = group.get_max_threads(); |
110 | 14 | int log_min = group.get_min_threads(); |
111 | | |
112 | 14 | auto* arg = new TimerArg(); |
113 | 14 | arg->ctrl = this; |
114 | 14 | arg->name = name; |
115 | 14 | arg->interval_ms = interval_ms; |
116 | | |
117 | 14 | bthread_timer_t tid; |
118 | 14 | if (bthread_timer_add(&tid, butil::milliseconds_from_now(interval_ms), _on_timer, arg) == 0) { |
119 | 14 | arg->timer_id.store(tid, std::memory_order_release); |
120 | 14 | } else { |
121 | 0 | LOG(WARNING) << "Adaptive: failed to register timer for pool group '" << name << "'"; |
122 | 0 | } |
123 | 14 | group.timer_arg = arg; |
124 | | |
125 | 14 | { |
126 | 14 | std::lock_guard<std::mutex> lk(_mutex); |
127 | 14 | _pool_groups[name] = std::move(group); |
128 | 14 | } |
129 | | |
130 | 14 | LOG(INFO) << "Adaptive: added pool group '" << name << "'" |
131 | 14 | << ", max_threads=" << log_max << ", min_threads=" << log_min |
132 | 14 | << ", interval_ms=" << interval_ms; |
133 | 14 | } |
134 | | |
135 | 14 | void AdaptiveThreadPoolController::cancel(const std::string& name) { |
136 | 14 | TimerArg* arg = nullptr; |
137 | 14 | { |
138 | 14 | std::lock_guard<std::mutex> lk(_mutex); |
139 | 14 | auto it = _pool_groups.find(name); |
140 | 14 | if (it != _pool_groups.end()) { |
141 | 14 | arg = it->second.timer_arg; |
142 | 14 | _pool_groups.erase(it); |
143 | 14 | } |
144 | 14 | } |
145 | | |
146 | 14 | if (arg == nullptr) { |
147 | 0 | return; |
148 | 0 | } |
149 | | |
150 | | // Signal the callback to stop re-registering. |
151 | 14 | arg->stopped.store(true, std::memory_order_release); |
152 | | |
153 | | // Try to cancel a pending (not yet fired) timer. Read timer_id after |
154 | | // setting stopped so any re-registration in a concurrent callback has |
155 | | // already stored the latest id by now (it holds mu, which we haven't |
156 | | // taken yet). |
157 | 14 | bthread_timer_t tid = arg->timer_id.load(std::memory_order_acquire); |
158 | 14 | bthread_timer_del(tid); // returns non-zero if already fired; that's fine |
159 | | |
160 | | // Wait for any in-flight callback to finish. The callback holds mu while |
161 | | // running _fire_group and re-registering, so acquiring mu here ensures |
162 | | // we don't free arg while the callback is still executing. |
163 | 14 | { std::lock_guard<std::mutex> lk(arg->mu); } |
164 | | |
165 | 14 | delete arg; |
166 | 14 | LOG(INFO) << "Adaptive: cancelled pool group '" << name << "'"; |
167 | 14 | } |
168 | | |
169 | | // Called from _on_timer. No lock held on entry. |
170 | 6 | void AdaptiveThreadPoolController::_fire_group(const std::string& name) { |
171 | 6 | if (!config::enable_adaptive_flush_threads) { |
172 | 0 | return; |
173 | 0 | } |
174 | | // Phase 1: snapshot parameters under the lock. |
175 | 6 | AdjustFunc fn; |
176 | 6 | int current, min_t, max_t; |
177 | 6 | { |
178 | 6 | std::lock_guard<std::mutex> lk(_mutex); |
179 | 6 | auto it = _pool_groups.find(name); |
180 | 6 | if (it == _pool_groups.end()) return; |
181 | 6 | const PoolGroup& g = it->second; |
182 | 6 | fn = g.adjust_func; |
183 | 6 | current = g.current_threads; |
184 | 6 | min_t = g.get_min_threads(); |
185 | 6 | max_t = g.get_max_threads(); |
186 | 6 | } |
187 | | |
188 | | // Phase 2: compute target — no lock held (adjust_func may call is_io_busy etc.). |
189 | 0 | std::string reason; |
190 | 6 | int target = fn(current, min_t, max_t, reason); |
191 | | |
192 | | // Phase 3: apply under lock; recheck in case cancel() raced with us. |
193 | 6 | std::lock_guard<std::mutex> lk(_mutex); |
194 | 6 | auto it = _pool_groups.find(name); |
195 | 6 | if (it == _pool_groups.end()) return; |
196 | 6 | _apply_thread_count(it->second, target, reason); |
197 | 6 | } |
198 | | |
199 | | // Fire all groups once regardless of schedule. For testing. |
200 | 5 | void AdaptiveThreadPoolController::adjust_once() { |
201 | 5 | std::vector<std::string> names; |
202 | 5 | { |
203 | 5 | std::lock_guard<std::mutex> lk(_mutex); |
204 | 6 | for (const auto& [name, _] : _pool_groups) { |
205 | 6 | names.push_back(name); |
206 | 6 | } |
207 | 5 | } |
208 | 6 | for (const auto& name : names) { |
209 | 6 | _fire_group(name); |
210 | 6 | } |
211 | 5 | } |
212 | | |
213 | | void AdaptiveThreadPoolController::_apply_thread_count(PoolGroup& group, int target_threads, |
214 | 6 | const std::string& reason) { |
215 | 6 | int max_threads = group.get_max_threads(); |
216 | 6 | int min_threads = group.get_min_threads(); |
217 | 6 | target_threads = std::max(min_threads, std::min(max_threads, target_threads)); |
218 | 6 | if (target_threads == group.current_threads) return; |
219 | | |
220 | 6 | LOG(INFO) << "Adaptive[" << group.name << "]: adjusting threads from " << group.current_threads |
221 | 2 | << " to " << target_threads << " (min=" << min_threads << ", max=" << max_threads |
222 | 2 | << ")" << (reason.empty() ? "" : " reason=[" + reason + "]"); |
223 | | |
224 | 2 | bool all_success = true; |
225 | 2 | for (auto* pool : group.pools) { |
226 | 2 | if (pool == nullptr) continue; |
227 | | // Always sync min_threads to guard against races with update_memtable_flush_threads(). |
228 | | // Order matters: when increasing, set max first so max >= min is always satisfied; |
229 | | // when decreasing, set min first so the new max is never below min. |
230 | 2 | Status st; |
231 | 2 | if (target_threads >= group.current_threads) { |
232 | 0 | st = pool->set_max_threads(target_threads); |
233 | 0 | if (st.ok()) static_cast<void>(pool->set_min_threads(min_threads)); |
234 | 2 | } else { |
235 | 2 | st = pool->set_min_threads(min_threads); |
236 | 2 | if (st.ok()) st = pool->set_max_threads(target_threads); |
237 | 2 | } |
238 | 2 | if (!st.ok()) { |
239 | 0 | all_success = false; |
240 | 0 | LOG(WARNING) << "Adaptive[" << group.name << "]: failed to set threads: " << st; |
241 | 0 | } |
242 | 2 | } |
243 | 2 | if (all_success) { |
244 | 2 | group.current_threads = target_threads; |
245 | 2 | } |
246 | 2 | } |
247 | | |
248 | 15 | int AdaptiveThreadPoolController::get_current_threads(const std::string& name) const { |
249 | 15 | std::lock_guard<std::mutex> lk(_mutex); |
250 | 15 | auto it = _pool_groups.find(name); |
251 | 15 | return it != _pool_groups.end() ? it->second.current_threads : 0; |
252 | 15 | } |
253 | | |
254 | 2 | bool AdaptiveThreadPoolController::is_io_busy() { |
255 | 2 | if (config::is_cloud_mode()) { |
256 | 0 | if (_s3_file_upload_pool == nullptr) return false; |
257 | 0 | int queue_size = _s3_file_upload_pool->get_queue_size(); |
258 | 0 | return queue_size > kS3QueueBusyThreshold; |
259 | 0 | } |
260 | | |
261 | 2 | if (_system_metrics == nullptr) return false; |
262 | | |
263 | 0 | int64_t current_time_sec = MonotonicSeconds(); |
264 | 0 | int64_t interval_sec = current_time_sec - _last_check_time_sec; |
265 | 0 | if (interval_sec <= 0) { |
266 | 0 | return _last_io_busy; |
267 | 0 | } |
268 | | |
269 | 0 | int64_t max_io_util = _system_metrics->get_max_io_util(_last_disk_io_time, interval_sec); |
270 | 0 | _system_metrics->get_disks_io_time(&_last_disk_io_time); |
271 | 0 | _last_check_time_sec = current_time_sec; |
272 | |
|
273 | 0 | _last_io_busy = max_io_util > kIOBusyThresholdPercent; |
274 | 0 | return _last_io_busy; |
275 | 0 | } |
276 | | |
277 | 2 | bool AdaptiveThreadPoolController::is_cpu_busy() { |
278 | 2 | if (_system_metrics == nullptr) return false; |
279 | | |
280 | 0 | double load_avg = _system_metrics->get_load_average_1_min(); |
281 | 0 | int num_cpus = std::thread::hardware_concurrency(); |
282 | 0 | if (num_cpus <= 0) return false; |
283 | | |
284 | 0 | double cpu_usage_percent = (load_avg / num_cpus) * 100.0; |
285 | 0 | return cpu_usage_percent > kCPUBusyThresholdPercent; |
286 | 0 | } |
287 | | |
288 | | AdaptiveThreadPoolController::AdjustFunc AdaptiveThreadPoolController::make_flush_adjust_func( |
289 | 0 | AdaptiveThreadPoolController* controller, ThreadPool* flush_pool) { |
290 | 0 | return [controller, flush_pool](int current, int min_t, int max_t, std::string& reason) { |
291 | 0 | int target = current; |
292 | 0 | int queue_size = flush_pool->get_queue_size(); |
293 | 0 | if (queue_size > kQueueThreshold) { |
294 | 0 | target = std::min(max_t, target + 1); |
295 | 0 | reason += "queue_size=" + std::to_string(queue_size) + ">" + |
296 | 0 | std::to_string(kQueueThreshold) + " -> target=" + std::to_string(target) + |
297 | 0 | "; "; |
298 | 0 | } |
299 | 0 | if (controller->is_io_busy()) { |
300 | 0 | target = std::max(min_t, target - 2); |
301 | 0 | reason += "io_busy -> target=" + std::to_string(target) + "; "; |
302 | 0 | } |
303 | 0 | if (controller->is_cpu_busy()) { |
304 | 0 | target = std::max(min_t, target - 2); |
305 | 0 | reason += "cpu_busy -> target=" + std::to_string(target) + "; "; |
306 | 0 | } |
307 | 0 | return target; |
308 | 0 | }; |
309 | 0 | } |
310 | | |
311 | | } // namespace doris |