be/src/storage/adaptive_thread_pool_controller.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <bthread/unstable.h> |
21 | | |
22 | | #include <atomic> |
23 | | #include <cstdint> |
24 | | #include <functional> |
25 | | #include <map> |
26 | | #include <mutex> |
27 | | #include <string> |
28 | | #include <vector> |
29 | | |
30 | | namespace doris { |
31 | | |
32 | | class ThreadPool; |
33 | | class SystemMetrics; |
34 | | class AdaptiveThreadPoolController; |
35 | | |
36 | | // Per-pool-group timer state. Heap-allocated; shared between the controller |
37 | | // and the brpc TimerThread callback. NOTE(review): `name` presumably identifies the pool group and `interval_ms` the re-registration period in milliseconds -- confirm in the .cpp. |
38 | | struct TimerArg { |
39 | | AdaptiveThreadPoolController* ctrl; // back-pointer to the controller; never null |
40 | | std::string name; |
41 | | int64_t interval_ms; |
42 | | |
43 | | // Set by cancel() before calling bthread_timer_del. The callback checks |
44 | | // this flag after acquiring `mu` and skips re-registration when true. |
45 | | std::atomic<bool> stopped {false}; |
46 | | |
47 | | // Tracks the most recently registered timer id. Updated under `mu` by the |
48 | | // callback after each re-registration; read by cancel() to call |
49 | | // bthread_timer_del on the latest pending timer. |
50 | | std::atomic<bthread_timer_t> timer_id {0}; |
51 | | |
52 | | // Held for the entire duration of the callback (fire + re-registration). |
53 | | // cancel() acquires it after bthread_timer_del to wait for any in-flight |
54 | | // invocation to complete before freeing this TimerArg. |
55 | | std::mutex mu; |
56 | | }; |
57 | | |
58 | | // AdaptiveThreadPoolController dynamically adjusts thread pool sizes based on |
59 | | // system load (IO utilisation, CPU load average, flush queue depth). |
60 | | // |
61 | | // Each registered pool group runs as a one-shot bthread_timer_add chain: the |
62 | | // callback fires, adjusts the pool, then re-registers the next one-shot timer. |
63 | | // All groups share the single brpc TimerThread, keeping the overhead minimal. |
64 | | // |
65 | | // Usage: |
66 | | // AdaptiveThreadPoolController ctrl; |
67 | | // ctrl.init(system_metrics, s3_pool); |
68 | | // ctrl.add("flush", {pool1, pool2}, |
69 | | // AdaptiveThreadPoolController::make_flush_adjust_func(&ctrl, pool1), |
70 | | // max_per_cpu, min_per_cpu); |
71 | | // // ... later ... |
72 | | // ctrl.cancel("flush"); // or ctrl.stop(); the destructor also calls stop() |
73 | | class AdaptiveThreadPoolController { |
74 | | public: |
75 | | using AdjustFunc = |
76 | | std::function<int(int current, int min_threads, int max_threads, std::string& reason)>; |
77 | | |
78 | | static constexpr int kDefaultIntervalMs = 10000; |
79 | | |
80 | | static constexpr int kQueueThreshold = 10; |
81 | | static constexpr int kIOBusyThresholdPercent = 90; |
82 | | static constexpr int kCPUBusyThresholdPercent = 90; |
83 | | static constexpr int kS3QueueBusyThreshold = 100; |
84 | | |
85 | 509 | AdaptiveThreadPoolController() = default; |
86 | 509 | ~AdaptiveThreadPoolController() { stop(); } |
87 | | |
88 | | // Initialize with system-level dependencies: the system metrics source and the S3 file-upload pool. |
89 | | void init(SystemMetrics* system_metrics, ThreadPool* s3_file_upload_pool); |
90 | | |
91 | | // Cancel all registered pool groups (also run by the destructor). Must be called before the pools are destroyed. |
92 | | void stop(); |
93 | | |
94 | | // Register a pool group and start a recurring bthread_timer_add chain; `interval_ms` defaults to kDefaultIntervalMs. |
95 | | void add(std::string name, std::vector<ThreadPool*> pools, AdjustFunc adjust_func, |
96 | | double max_threads_per_cpu, double min_threads_per_cpu, |
97 | | int64_t interval_ms = kDefaultIntervalMs); |
98 | | |
99 | | // Cancel the named group's timer chain and remove the pool group. Blocks until any |
100 | | // in-flight callback finishes, then returns. Safe to call before pool teardown. |
101 | | void cancel(const std::string& name); |
102 | | |
103 | | // Fire all registered groups once, ignoring the schedule. For testing. |
104 | | void adjust_once(); |
105 | | |
106 | | // Get current thread count for a named group. For testing/debugging. |
107 | | int get_current_threads(const std::string& name) const; |
108 | | |
109 | | // System-state helpers; safe to call from inside an AdjustFunc. |
110 | | bool is_io_busy(); |
111 | | bool is_cpu_busy(); |
112 | | |
113 | | // Factory: standard flush-pool adjust function. |
114 | | static AdjustFunc make_flush_adjust_func(AdaptiveThreadPoolController* controller, |
115 | | ThreadPool* flush_pool); |
116 | | |
117 | | // Callback registered with bthread_timer_add as a plain `void (*)(void*)` function pointer. Public only for that |
118 | | // C-style callback requirement (originally described as "C linkage"; a static member has none); do not call directly. |
119 | | static void _on_timer(void* arg); |
120 | | |
121 | | private: |
122 | | struct PoolGroup { |
123 | | std::string name; |
124 | | std::vector<ThreadPool*> pools; |
125 | | AdjustFunc adjust_func; |
126 | | double max_threads_per_cpu = 4.0; |
127 | | double min_threads_per_cpu = 0.5; |
128 | | int current_threads = 0; |
129 | | TimerArg* timer_arg = nullptr; // owned by this group; freed by cancel() |
130 | | |
131 | | int get_max_threads() const; |
132 | | int get_min_threads() const; |
133 | | }; |
134 | | |
135 | | // Run one group's adjustment, looked up by name. Called from _on_timer (no lock on entry). |
136 | | void _fire_group(const std::string& name); |
137 | | |
138 | | void _apply_thread_count(PoolGroup& group, int target_threads, const std::string& reason); |
139 | | |
140 | | private: |
141 | | SystemMetrics* _system_metrics = nullptr; |
142 | | ThreadPool* _s3_file_upload_pool = nullptr; |
143 | | |
144 | | mutable std::mutex _mutex; |
145 | | std::map<std::string, PoolGroup> _pool_groups; |
146 | | |
147 | | // Last successfully computed IO-busy result. Returned as-is when the |
148 | | // measurement interval is too short to produce a valid new delta. |
149 | | bool _last_io_busy = false; |
150 | | |
151 | | // For disk IO util calculation (used by is_io_busy). |
152 | | std::map<std::string, int64_t> _last_disk_io_time; |
153 | | int64_t _last_check_time_sec = 0; |
154 | | }; |
155 | | |
156 | | } // namespace doris |