be/src/cloud/cloud_throttle_state_machine.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_throttle_state_machine.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <algorithm> |
23 | | |
24 | | namespace doris::cloud { |
25 | | |
26 | | // Display names for LoadRelatedRpc types |
27 | | static constexpr std::string_view LOAD_RELATED_RPC_NAMES[] = { |
28 | | "prepare_rowset", "commit_rowset", "update_tmp_rowset", "update_packed_file_info", |
29 | | "update_delete_bitmap"}; |
30 | | |
31 | 168 | std::string_view load_related_rpc_name(LoadRelatedRpc rpc) { |
32 | 168 | size_t idx = static_cast<size_t>(rpc); |
33 | 168 | if (idx < static_cast<size_t>(LoadRelatedRpc::COUNT)) { |
34 | 167 | return LOAD_RELATED_RPC_NAMES[idx]; |
35 | 167 | } |
36 | 1 | return "unknown"; |
37 | 168 | } |
38 | | |
39 | | // ============== RpcThrottleStateMachine ============== |
40 | | |
41 | 31 | RpcThrottleStateMachine::RpcThrottleStateMachine(RpcThrottleParams params) : _params(params) { |
42 | 31 | LOG(INFO) << "[ms-throttle] state machine initialized: top_k=" << params.top_k |
43 | 31 | << ", ratio=" << params.ratio << ", floor_qps=" << params.floor_qps; |
44 | 31 | } |
45 | | |
46 | 5 | void RpcThrottleStateMachine::update_params(RpcThrottleParams params) { |
47 | 5 | std::lock_guard lock(_mtx); |
48 | 5 | _params = params; |
49 | 5 | LOG(INFO) << "[ms-throttle] state machine params updated: top_k=" << params.top_k |
50 | 5 | << ", ratio=" << params.ratio << ", floor_qps=" << params.floor_qps; |
51 | 5 | } |
52 | | |
53 | | std::vector<RpcThrottleAction> RpcThrottleStateMachine::on_upgrade( |
54 | 42 | const std::vector<RpcQpsSnapshot>& qps_snapshot) { |
55 | 42 | std::lock_guard lock(_mtx); |
56 | | |
57 | 42 | UpgradeRecord record; |
58 | 42 | std::vector<RpcThrottleAction> actions; |
59 | | |
60 | 42 | double ratio = _params.ratio; |
61 | 42 | double floor_qps = _params.floor_qps; |
62 | | |
63 | | // Caller is responsible for providing top-k snapshot per RPC type. |
64 | | // State machine simply applies throttling to every entry in the snapshot. |
65 | 53 | for (const auto& snapshot : qps_snapshot) { |
66 | 53 | auto key = std::make_pair(snapshot.rpc_type, snapshot.table_id); |
67 | | |
68 | 53 | double old_limit = 0.0; |
69 | 53 | auto limit_it = _current_limits.find(key); |
70 | 53 | if (limit_it != _current_limits.end()) { |
71 | 17 | old_limit = limit_it->second; |
72 | 17 | } |
73 | | |
74 | 53 | double new_limit; |
75 | 53 | if (old_limit > 0) { |
76 | | // Already has a limit, reduce it further |
77 | 17 | new_limit = old_limit * ratio; |
78 | 36 | } else { |
79 | | // No limit yet, set based on current QPS |
80 | 36 | new_limit = snapshot.current_qps * ratio; |
81 | 36 | } |
82 | | |
83 | | // Apply floor |
84 | 53 | new_limit = std::max(new_limit, floor_qps); |
85 | | |
86 | | // Only apply if it's actually limiting |
87 | 53 | if (new_limit < snapshot.current_qps || old_limit > 0) { |
88 | 51 | RpcThrottleAction action { |
89 | 51 | .type = RpcThrottleAction::Type::SET_LIMIT, |
90 | 51 | .rpc_type = snapshot.rpc_type, |
91 | 51 | .table_id = snapshot.table_id, |
92 | 51 | .qps_limit = new_limit, |
93 | 51 | }; |
94 | 51 | actions.push_back(action); |
95 | 51 | record.changes[key] = {old_limit, new_limit}; |
96 | 51 | _current_limits[key] = new_limit; |
97 | | |
98 | 51 | LOG(INFO) << "[ms-throttle] upgrade: rpc=" << load_related_rpc_name(snapshot.rpc_type) |
99 | 51 | << ", table_id=" << snapshot.table_id |
100 | 51 | << ", current_qps=" << snapshot.current_qps << ", old_limit=" << old_limit |
101 | 51 | << ", new_limit=" << new_limit; |
102 | 51 | } |
103 | 53 | } |
104 | | |
105 | 42 | if (!record.changes.empty()) { |
106 | 35 | _upgrade_history.push_back(std::move(record)); |
107 | 35 | } |
108 | | |
109 | 42 | LOG(INFO) << "[ms-throttle] on_upgrade done: actions=" << actions.size() |
110 | 42 | << ", upgrade_level=" << _upgrade_history.size() |
111 | 42 | << ", snapshot_size=" << qps_snapshot.size(); |
112 | | |
113 | 42 | return actions; |
114 | 42 | } |
115 | | |
116 | 17 | std::vector<RpcThrottleAction> RpcThrottleStateMachine::on_downgrade() { |
117 | 17 | std::lock_guard lock(_mtx); |
118 | | |
119 | 17 | std::vector<RpcThrottleAction> actions; |
120 | | |
121 | 17 | if (_upgrade_history.empty()) { |
122 | 1 | LOG(INFO) << "[ms-throttle] on_downgrade skipped: no upgrade history"; |
123 | 1 | return actions; |
124 | 1 | } |
125 | | |
126 | | // Undo the most recent upgrade |
127 | 16 | const auto& record = _upgrade_history.back(); |
128 | | |
129 | 24 | for (const auto& [key, limits] : record.changes) { |
130 | 24 | const auto& [rpc_type, table_id] = key; |
131 | 24 | double old_limit = limits.first; |
132 | | |
133 | 24 | if (old_limit > 0) { |
134 | | // Restore the previous limit |
135 | 9 | RpcThrottleAction action { |
136 | 9 | .type = RpcThrottleAction::Type::SET_LIMIT, |
137 | 9 | .rpc_type = rpc_type, |
138 | 9 | .table_id = table_id, |
139 | 9 | .qps_limit = old_limit, |
140 | 9 | }; |
141 | | |
142 | 9 | actions.push_back(action); |
143 | 9 | _current_limits[key] = old_limit; |
144 | | |
145 | 9 | LOG(INFO) << "[ms-throttle] downgrade: rpc=" << load_related_rpc_name(rpc_type) |
146 | 9 | << ", table_id=" << table_id << ", restored_limit=" << old_limit; |
147 | 15 | } else { |
148 | | // No previous limit, remove it entirely |
149 | 15 | RpcThrottleAction action { |
150 | 15 | .type = RpcThrottleAction::Type::REMOVE_LIMIT, |
151 | 15 | .rpc_type = rpc_type, |
152 | 15 | .table_id = table_id, |
153 | 15 | }; |
154 | | |
155 | 15 | actions.push_back(action); |
156 | 15 | _current_limits.erase(key); |
157 | | |
158 | 15 | LOG(INFO) << "[ms-throttle] downgrade: rpc=" << load_related_rpc_name(rpc_type) |
159 | 15 | << ", table_id=" << table_id << ", removed limit"; |
160 | 15 | } |
161 | 24 | } |
162 | | |
163 | 16 | _upgrade_history.pop_back(); |
164 | | |
165 | 16 | LOG(INFO) << "[ms-throttle] on_downgrade done: actions=" << actions.size() |
166 | 16 | << ", upgrade_level=" << _upgrade_history.size(); |
167 | | |
168 | 16 | return actions; |
169 | 17 | } |
170 | | |
171 | 30 | size_t RpcThrottleStateMachine::upgrade_level() const { |
172 | 30 | std::lock_guard lock(_mtx); |
173 | 30 | return _upgrade_history.size(); |
174 | 30 | } |
175 | | |
176 | 25 | double RpcThrottleStateMachine::get_current_limit(LoadRelatedRpc rpc_type, int64_t table_id) const { |
177 | 25 | std::lock_guard lock(_mtx); |
178 | 25 | auto it = _current_limits.find({rpc_type, table_id}); |
179 | 25 | if (it != _current_limits.end()) { |
180 | 16 | return it->second; |
181 | 16 | } |
182 | 9 | return 0.0; |
183 | 25 | } |
184 | | |
185 | 8 | RpcThrottleParams RpcThrottleStateMachine::get_params() const { |
186 | 8 | std::lock_guard lock(_mtx); |
187 | 8 | return _params; |
188 | 8 | } |
189 | | |
190 | | // ============== RpcThrottleCoordinator ============== |
191 | | |
192 | 22 | RpcThrottleCoordinator::RpcThrottleCoordinator(ThrottleCoordinatorParams params) : _params(params) { |
193 | 22 | LOG(INFO) << "[ms-throttle] coordinator initialized: upgrade_cooldown_ticks=" |
194 | 22 | << params.upgrade_cooldown_ticks |
195 | 22 | << ", downgrade_after_ticks=" << params.downgrade_after_ticks; |
196 | 22 | } |
197 | | |
198 | 5 | void RpcThrottleCoordinator::update_params(ThrottleCoordinatorParams params) { |
199 | 5 | std::lock_guard lock(_mtx); |
200 | 5 | _params = params; |
201 | 5 | LOG(INFO) << "[ms-throttle] coordinator params updated: upgrade_cooldown_ticks=" |
202 | 5 | << params.upgrade_cooldown_ticks |
203 | 5 | << ", downgrade_after_ticks=" << params.downgrade_after_ticks; |
204 | 5 | } |
205 | | |
206 | 93 | bool RpcThrottleCoordinator::report_ms_busy() { |
207 | 93 | std::lock_guard lock(_mtx); |
208 | | |
209 | | // Reset tick counter since last MS_BUSY |
210 | 93 | _ticks_since_last_ms_busy = 0; |
211 | | |
212 | | // Check if cooldown has passed |
213 | 93 | if (_ticks_since_last_upgrade == -1 || |
214 | 93 | _ticks_since_last_upgrade >= _params.upgrade_cooldown_ticks) { |
215 | | // Reset upgrade counter |
216 | 26 | auto actual_ticks = _ticks_since_last_upgrade; |
217 | 26 | _ticks_since_last_upgrade = 0; |
218 | 26 | _has_pending_upgrades = true; |
219 | | |
220 | 26 | LOG(INFO) << "[ms-throttle] upgrade triggered: ticks_since_last_upgrade=" << actual_ticks |
221 | 26 | << ", cooldown=" << _params.upgrade_cooldown_ticks; |
222 | 26 | return true; // Should trigger upgrade |
223 | 26 | } |
224 | 67 | return false; // Cooling down |
225 | 93 | } |
226 | | |
227 | 424 | bool RpcThrottleCoordinator::tick(int ticks) { |
228 | 424 | std::lock_guard lock(_mtx); |
229 | | |
230 | | // Increment tick counters |
231 | 424 | if (_ticks_since_last_ms_busy >= 0) { |
232 | 423 | _ticks_since_last_ms_busy += ticks; |
233 | 423 | } |
234 | 424 | if (_ticks_since_last_upgrade >= 0) { |
235 | 423 | _ticks_since_last_upgrade += ticks; |
236 | 423 | } |
237 | | |
238 | | // Check if downgrade should be triggered |
239 | 424 | if (_has_pending_upgrades && _ticks_since_last_ms_busy >= _params.downgrade_after_ticks) { |
240 | | // Reset for next downgrade cycle |
241 | 12 | auto actual_ticks = _ticks_since_last_ms_busy; |
242 | 12 | _ticks_since_last_ms_busy = 0; |
243 | | |
244 | 12 | LOG(INFO) << "[ms-throttle] downgrade triggered: ticks_since_last_ms_busy=" << actual_ticks |
245 | 12 | << ", threshold=" << _params.downgrade_after_ticks; |
246 | 12 | return true; // Should trigger downgrade |
247 | 12 | } |
248 | | |
249 | 412 | return false; |
250 | 424 | } |
251 | | |
252 | 24 | void RpcThrottleCoordinator::set_has_pending_upgrades(bool has) { |
253 | 24 | std::lock_guard lock(_mtx); |
254 | 24 | _has_pending_upgrades = has; |
255 | 24 | } |
256 | | |
257 | 4 | int RpcThrottleCoordinator::ticks_since_last_ms_busy() const { |
258 | 4 | std::lock_guard lock(_mtx); |
259 | 4 | return _ticks_since_last_ms_busy; |
260 | 4 | } |
261 | | |
262 | 5 | int RpcThrottleCoordinator::ticks_since_last_upgrade() const { |
263 | 5 | std::lock_guard lock(_mtx); |
264 | 5 | return _ticks_since_last_upgrade; |
265 | 5 | } |
266 | | |
267 | 2 | ThrottleCoordinatorParams RpcThrottleCoordinator::get_params() const { |
268 | 2 | std::lock_guard lock(_mtx); |
269 | 2 | return _params; |
270 | 2 | } |
271 | | |
272 | | } // namespace doris::cloud |