be/src/cloud/cloud_ms_backpressure_handler.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_ms_backpressure_handler.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <glog/logging.h> |
22 | | |
23 | | #include <algorithm> |
24 | | #include <cmath> |
25 | | #include <queue> |
26 | | #include <thread> |
27 | | |
28 | | #include "cloud/config.h" |
29 | | #include "common/status.h" |
30 | | #include "util/thread.h" |
31 | | |
32 | | namespace doris::cloud { |
33 | | |
34 | | // Global bvar metrics |
35 | | bvar::Adder<uint64_t> g_backpressure_upgrade_count("ms_rpc_backpressure_upgrade_count"); |
36 | | bvar::Window<bvar::Adder<uint64_t>> g_backpressure_upgrade_60s("ms_rpc_backpressure_upgrade_60s", |
37 | | &g_backpressure_upgrade_count, 60); |
38 | | bvar::Adder<uint64_t> g_backpressure_downgrade_count("ms_rpc_backpressure_downgrade_count"); |
39 | | bvar::Window<bvar::Adder<uint64_t>> g_backpressure_downgrade_60s( |
40 | | "ms_rpc_backpressure_downgrade_60s", &g_backpressure_downgrade_count, 60); |
41 | | bvar::LatencyRecorder g_throttle_wait_prepare_rowset( |
42 | | "ms_rpc_backpressure_throttle_wait_prepare_rowset"); |
43 | | bvar::LatencyRecorder g_throttle_wait_commit_rowset( |
44 | | "ms_rpc_backpressure_throttle_wait_commit_rowset"); |
45 | | bvar::LatencyRecorder g_throttle_wait_update_tmp_rowset( |
46 | | "ms_rpc_backpressure_throttle_wait_update_tmp_rowset"); |
47 | | bvar::LatencyRecorder g_throttle_wait_update_packed_file_info( |
48 | | "ms_rpc_backpressure_throttle_wait_update_packed_file_info"); |
49 | | bvar::LatencyRecorder g_throttle_wait_update_delete_bitmap( |
50 | | "ms_rpc_backpressure_throttle_wait_update_delete_bitmap"); |
51 | | bvar::Adder<uint64_t> g_ms_busy_count("ms_rpc_backpressure_ms_busy_count"); |
52 | | bvar::Window<bvar::Adder<uint64_t>> g_ms_busy_60s("ms_rpc_backpressure_ms_busy_60s", |
53 | | &g_ms_busy_count, 60); |
54 | | |
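 | | // Note: this array is indexed by LoadRelatedRpc enum value in
 | | // get_throttle_wait_recorder() below, so its order must mirror the enum
 | | // and it must cover every value before LoadRelatedRpc::COUNT.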
55 | | static bvar::LatencyRecorder* s_throttle_wait_recorders[] = { |
56 | | &g_throttle_wait_prepare_rowset, &g_throttle_wait_commit_rowset, |
57 | | &g_throttle_wait_update_tmp_rowset, &g_throttle_wait_update_packed_file_info, |
58 | | &g_throttle_wait_update_delete_bitmap, |
59 | | }; |
60 | | |
61 | 0 | bvar::LatencyRecorder* get_throttle_wait_recorder(LoadRelatedRpc rpc) { |
62 | 0 | size_t idx = static_cast<size_t>(rpc); |
63 | 0 | if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT)) { |
64 | 0 | return nullptr; |
65 | 0 | } |
66 | 0 | return s_throttle_wait_recorders[idx]; |
67 | 0 | } |
68 | | |
69 | | // ============== StrictQpsLimiter ============== |
70 | | |
71 | 13 | StrictQpsLimiter::StrictQpsLimiter(double qps) { |
72 | 13 | if (qps <= 0) { |
73 | 1 | qps = 1.0; |
74 | 1 | } |
75 | 13 | _interval_ns = static_cast<int64_t>(1e9 / qps); |
76 | 13 | _next_allowed_time = Clock::now(); |
77 | 13 | } |
78 | | |
79 | 997 | StrictQpsLimiter::Clock::time_point StrictQpsLimiter::reserve() { |
80 | 997 | std::lock_guard lock(_mtx); |
81 | 997 | auto now = Clock::now(); |
82 | 997 | if (_next_allowed_time <= now) { |
83 | 4 | _next_allowed_time = now + std::chrono::nanoseconds(_interval_ns); |
84 | 4 | return now; |
85 | 4 | } |
86 | 993 | auto result = _next_allowed_time; |
87 | 993 | _next_allowed_time += std::chrono::nanoseconds(_interval_ns); |
88 | 993 | return result; |
89 | 997 | } |
90 | | |
91 | 1 | void StrictQpsLimiter::update_qps(double new_qps) { |
92 | 1 | if (new_qps <= 0) { |
93 | 0 | new_qps = 1.0; |
94 | 0 | } |
95 | 1 | std::lock_guard lock(_mtx); |
96 | 1 | _interval_ns = static_cast<int64_t>(1e9 / new_qps); |
97 | 1 | } |
98 | | |
99 | 6 | double StrictQpsLimiter::get_qps() const { |
100 | 6 | std::lock_guard lock(_mtx); |
101 | 6 | if (_interval_ns <= 0) { |
102 | 0 | return 0; |
103 | 0 | } |
104 | 6 | return 1e9 / _interval_ns; |
105 | 6 | } |
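 | |
 | | // A minimal usage sketch of the reserve-then-sleep pattern (illustrative,
 | | // not part of this file): reserve() hands out time slots spaced 1/qps
 | | // apart, and the caller is expected to sleep until the returned slot
 | | // before doing the rate-limited work.
 | | //
 | | //   StrictQpsLimiter limiter(5.0);          // ~5 permits per second
 | | //   auto slot = limiter.reserve();          // earliest time this call may proceed
 | | //   std::this_thread::sleep_until(slot);    // returns immediately if the slot is already past
 | | //   // ... perform the rate-limited work ...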
106 | | |
107 | | // ============== TableRpcQpsCounter ============== |
108 | | |
109 | | TableRpcQpsCounter::TableRpcQpsCounter(int64_t table_id, LoadRelatedRpc rpc_type, int window_sec) |
110 | 18 | : _table_id(table_id), _rpc_type(rpc_type) { |
111 | 18 | _counter = std::make_unique<bvar::Adder<int64_t>>(); |
112 | 18 | _counter->hide(); |
113 | 18 | _qps = std::make_unique<bvar::PerSecond<bvar::Adder<int64_t>>>(_counter.get(), window_sec); |
114 | 18 | _qps->hide(); |
115 | 18 | } |
116 | | |
117 | 766 | void TableRpcQpsCounter::increment() { |
118 | 766 | (*_counter) << 1; |
119 | 766 | } |
120 | | |
121 | 15 | double TableRpcQpsCounter::get_qps() const { |
122 | 15 | return _qps->get_value(); |
123 | 15 | } |
124 | | |
125 | | // ============== TableRpcQpsRegistry ============== |
126 | | |
127 | 19 | TableRpcQpsRegistry::TableRpcQpsRegistry() = default; |
128 | | |
129 | 766 | void TableRpcQpsRegistry::record(LoadRelatedRpc rpc_type, int64_t table_id) { |
130 | 766 | auto* counter = get_or_create_counter(rpc_type, table_id); |
131 | 766 | if (counter) { |
132 | 766 | counter->increment(); |
133 | 766 | } |
134 | 766 | } |
135 | | |
136 | | TableRpcQpsCounter* TableRpcQpsRegistry::get_or_create_counter(LoadRelatedRpc rpc_type, |
137 | 766 | int64_t table_id) { |
138 | 766 | size_t idx = static_cast<size_t>(rpc_type); |
139 | 766 | if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT)) { |
140 | 0 | return nullptr; |
141 | 0 | } |
142 | | |
143 | 766 | { |
144 | 766 | std::shared_lock lock(_mutex); |
145 | 766 | auto it = _counters[idx].find(table_id); |
146 | 766 | if (it != _counters[idx].end()) { |
147 | 748 | return it->second.get(); |
148 | 748 | } |
149 | 766 | } |
150 | | |
151 | 18 | std::unique_lock lock(_mutex); |
152 | | // Double check after acquiring exclusive lock |
153 | 18 | auto it = _counters[idx].find(table_id); |
154 | 18 | if (it != _counters[idx].end()) { |
155 | 0 | return it->second.get(); |
156 | 0 | } |
157 | | |
158 | 18 | auto counter = std::make_unique<TableRpcQpsCounter>(table_id, rpc_type, |
159 | 18 | config::ms_rpc_table_qps_window_sec); |
160 | 18 | auto* ptr = counter.get(); |
161 | 18 | _counters[idx][table_id] = std::move(counter); |
162 | 18 | return ptr; |
163 | 18 | } |
164 | | |
165 | | std::vector<std::pair<int64_t, double>> TableRpcQpsRegistry::get_top_k_tables( |
166 | 41 | LoadRelatedRpc rpc_type, int k) const { |
167 | 41 | size_t idx = static_cast<size_t>(rpc_type); |
168 | 41 | if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT) || k <= 0) { |
169 | 4 | return {}; |
170 | 4 | } |
171 | | |
172 | | // Use a min-heap of size k to find top-k without allocating a vector for all tables. |
173 | | // The heap top is the smallest among the k largest elements seen so far. |
174 | 37 | using Entry = std::pair<int64_t, double>; // (table_id, qps) |
175 | 37 | auto min_cmp = [](const Entry& a, const Entry& b) { return a.second > b.second; }; |
176 | 37 | std::priority_queue<Entry, std::vector<Entry>, decltype(min_cmp)> min_heap(min_cmp); |
177 | | |
178 | 37 | { |
179 | 37 | std::shared_lock lock(_mutex); |
180 | 37 | for (const auto& [table_id, counter] : _counters[idx]) { |
181 | 14 | double qps = counter->get_qps(); |
182 | 14 | if (qps > 0) { |
183 | 13 | if (static_cast<int>(min_heap.size()) < k) { |
184 | 12 | min_heap.push({table_id, qps}); |
185 | 12 | } else if (qps > min_heap.top().second) { |
186 | 1 | min_heap.pop(); |
187 | 1 | min_heap.push({table_id, qps}); |
188 | 1 | } |
189 | 13 | } |
190 | 14 | } |
191 | 37 | } |
192 | | |
193 | | // Extract results from heap (comes out in ascending order, reverse to descending) |
194 | 37 | std::vector<Entry> result; |
195 | 37 | result.reserve(min_heap.size()); |
196 | 49 | while (!min_heap.empty()) { |
197 | 12 | result.push_back(min_heap.top()); |
198 | 12 | min_heap.pop(); |
199 | 12 | } |
200 | 37 | std::reverse(result.begin(), result.end()); |
201 | | |
202 | 37 | return result; |
203 | 41 | } |
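 | |
 | | // Illustrative call (not part of this file): tables with qps <= 0 are
 | | // skipped and the result is ordered by QPS, hottest table first.
 | | //
 | | //   auto top = registry.get_top_k_tables(rpc_type, /*k=*/3);
 | | //   for (const auto& [table_id, qps] : top) { /* descending by qps */ }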
204 | | |
205 | 1 | double TableRpcQpsRegistry::get_qps(LoadRelatedRpc rpc_type, int64_t table_id) const { |
206 | 1 | size_t idx = static_cast<size_t>(rpc_type); |
207 | 1 | if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT)) { |
208 | 0 | return 0; |
209 | 0 | } |
210 | | |
211 | 1 | std::shared_lock lock(_mutex); |
212 | 1 | auto it = _counters[idx].find(table_id); |
213 | 1 | if (it != _counters[idx].end()) { |
214 | 1 | return it->second->get_qps(); |
215 | 1 | } |
216 | 0 | return 0; |
217 | 1 | } |
218 | | |
219 | 0 | void TableRpcQpsRegistry::cleanup_inactive_tables() { |
220 | 0 | std::unique_lock lock(_mutex); |
221 | |
222 | 0 | for (size_t idx = 0; idx < static_cast<size_t>(LoadRelatedRpc::COUNT); ++idx) { |
223 | 0 | auto& counter_map = _counters[idx]; |
224 | 0 | for (auto it = counter_map.begin(); it != counter_map.end();) { |
225 | | // Remove counters whose windowed QPS has dropped to (near) zero
226 | 0 | if (it->second->get_qps() < 0.01) { |
227 | 0 | it = counter_map.erase(it); |
228 | 0 | } else { |
229 | 0 | ++it; |
230 | 0 | } |
231 | 0 | } |
232 | 0 | } |
233 | 0 | } |
234 | | |
235 | | // ============== TableRpcThrottler ============== |
236 | | |
237 | 15 | TableRpcThrottler::TableRpcThrottler() { |
238 | | // Initialize bvar for throttled table counts |
239 | 90 | for (size_t i = 0; i < static_cast<size_t>(LoadRelatedRpc::COUNT); ++i) { |
240 | 75 | std::string bvar_name = fmt::format("ms_rpc_backpressure_throttled_tables_{}", |
241 | 75 | load_related_rpc_name(static_cast<LoadRelatedRpc>(i))); |
242 | 75 | _throttled_table_counts[i] = std::make_unique<bvar::Status<size_t>>(bvar_name, 0); |
243 | 75 | } |
244 | 15 | } |
245 | | |
246 | | std::chrono::steady_clock::time_point TableRpcThrottler::throttle(LoadRelatedRpc rpc_type, |
247 | 6 | int64_t table_id) { |
248 | 6 | std::shared_lock lock(_mutex); |
249 | 6 | auto it = _limiters.find({rpc_type, table_id}); |
250 | 6 | if (it == _limiters.end()) { |
251 | 2 | return std::chrono::steady_clock::now(); |
252 | 2 | } |
253 | 4 | return it->second->reserve(); |
254 | 6 | } |
255 | | |
256 | 9 | void TableRpcThrottler::set_qps_limit(LoadRelatedRpc rpc_type, int64_t table_id, double qps_limit) { |
257 | 9 | if (qps_limit <= 0) { |
258 | 0 | return; |
259 | 0 | } |
260 | | |
261 | 9 | std::unique_lock lock(_mutex); |
262 | 9 | auto key = std::make_pair(rpc_type, table_id); |
263 | 9 | auto it = _limiters.find(key); |
264 | 9 | if (it != _limiters.end()) { |
265 | 0 | it->second->update_qps(qps_limit); |
266 | 9 | } else { |
267 | 9 | _limiters[key] = std::make_unique<StrictQpsLimiter>(qps_limit); |
268 | | // Update bvar count |
269 | 9 | size_t idx = static_cast<size_t>(rpc_type); |
270 | 9 | if (idx < static_cast<size_t>(LoadRelatedRpc::COUNT)) { |
271 | 9 | size_t count = 0; |
272 | 11 | for (const auto& [k, _] : _limiters) { |
273 | 11 | if (k.first == rpc_type) { |
274 | 10 | ++count; |
275 | 10 | } |
276 | 11 | } |
277 | 9 | _throttled_table_counts[idx]->set_value(count); |
278 | 9 | } |
279 | 9 | } |
280 | | |
281 | 9 | LOG(INFO) << "[ms-throttle] set table QPS limit: rpc=" << load_related_rpc_name(rpc_type) |
282 | 9 | << ", table_id=" << table_id << ", qps_limit=" << qps_limit; |
283 | 9 | } |
284 | | |
285 | 3 | void TableRpcThrottler::remove_qps_limit(LoadRelatedRpc rpc_type, int64_t table_id) { |
286 | 3 | std::unique_lock lock(_mutex); |
287 | 3 | auto key = std::make_pair(rpc_type, table_id); |
288 | 3 | auto it = _limiters.find(key); |
289 | 3 | if (it != _limiters.end()) { |
290 | 3 | _limiters.erase(it); |
291 | | // Update bvar count |
292 | 3 | size_t idx = static_cast<size_t>(rpc_type); |
293 | 3 | if (idx < static_cast<size_t>(LoadRelatedRpc::COUNT)) { |
294 | 3 | size_t count = 0; |
295 | 3 | for (const auto& [k, _] : _limiters) { |
296 | 1 | if (k.first == rpc_type) { |
297 | 1 | ++count; |
298 | 1 | } |
299 | 1 | } |
300 | 3 | _throttled_table_counts[idx]->set_value(count); |
301 | 3 | } |
302 | | |
303 | 3 | LOG(INFO) << "[ms-throttle] removed table QPS limit: rpc=" |
304 | 3 | << load_related_rpc_name(rpc_type) << ", table_id=" << table_id; |
305 | 3 | } |
306 | 3 | } |
307 | | |
308 | 4 | double TableRpcThrottler::get_qps_limit(LoadRelatedRpc rpc_type, int64_t table_id) const { |
309 | 4 | std::shared_lock lock(_mutex); |
310 | 4 | auto it = _limiters.find({rpc_type, table_id}); |
311 | 4 | if (it != _limiters.end()) { |
312 | 3 | return it->second->get_qps(); |
313 | 3 | } |
314 | 1 | return 0; |
315 | 4 | } |
316 | | |
317 | 4 | bool TableRpcThrottler::has_limit(LoadRelatedRpc rpc_type, int64_t table_id) const { |
318 | 4 | std::shared_lock lock(_mutex); |
319 | 4 | return _limiters.find({rpc_type, table_id}) != _limiters.end(); |
320 | 4 | } |
321 | | |
322 | 3 | size_t TableRpcThrottler::get_throttled_table_count(LoadRelatedRpc rpc_type) const { |
323 | 3 | size_t idx = static_cast<size_t>(rpc_type); |
324 | 3 | if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT)) { |
325 | 0 | return 0; |
326 | 0 | } |
327 | 3 | return _throttled_table_counts[idx]->get_value(); |
328 | 3 | } |
329 | | |
330 | 1 | std::vector<TableRpcThrottler::ThrottleEntry> TableRpcThrottler::get_all_throttled_entries() const { |
331 | 1 | std::shared_lock lock(_mutex); |
332 | 1 | std::vector<ThrottleEntry> entries; |
333 | 1 | entries.reserve(_limiters.size()); |
334 | 1 | for (const auto& [key, limiter] : _limiters) { |
335 | 0 | entries.push_back({key.first, key.second, limiter->get_qps()}); |
336 | 0 | } |
337 | 1 | return entries; |
338 | 1 | } |
339 | | |
340 | | // ============== MSBackpressureHandler ============== |
341 | | |
342 | | MSBackpressureHandler::MSBackpressureHandler(TableRpcQpsRegistry* qps_registry, |
343 | | TableRpcThrottler* throttler) |
344 | 9 | : _qps_registry(qps_registry), |
345 | 9 | _throttler(throttler), |
346 | 9 | _stop_latch(1), |
347 | 9 | _last_ms_busy_time(std::chrono::steady_clock::time_point::min()) { |
348 | | // Initialize state machine with config values |
349 | 9 | RpcThrottleParams throttle_params { |
350 | 9 | .top_k = config::ms_backpressure_upgrade_top_k, |
351 | 9 | .ratio = config::ms_backpressure_throttle_ratio, |
352 | 9 | .floor_qps = config::ms_rpc_table_qps_limit_floor, |
353 | 9 | }; |
354 | 9 | _state_machine = std::make_unique<RpcThrottleStateMachine>(throttle_params); |
355 | | |
356 | | // Initialize coordinator with config values |
357 | | // Coordinator uses ticks where 1 tick = 1 millisecond (fixed unit) |
358 | | // This allows tick_interval_ms to change at runtime without affecting correctness |
359 | 9 | ThrottleCoordinatorParams coordinator_params { |
360 | 9 | .upgrade_cooldown_ticks = config::ms_backpressure_upgrade_interval_ms, |
361 | 9 | .downgrade_after_ticks = config::ms_backpressure_downgrade_interval_ms, |
362 | 9 | }; |
363 | 9 | _coordinator = std::make_unique<RpcThrottleCoordinator>(coordinator_params); |
364 | | |
365 | 9 | auto st = Thread::create( |
366 | 9 | "MSBackpressureHandler", "tick_thread", [this]() { this->_tick_thread_callback(); }, |
367 | 9 | &_tick_thread); |
368 | 9 | if (!st.ok()) { |
369 | 0 | LOG(WARNING) << "[ms-throttle] failed to create tick thread: " << st; |
370 | 9 | } else { |
371 | 9 | LOG(INFO) << "[ms-throttle] handler started: upgrade_cooldown=" |
372 | 9 | << config::ms_backpressure_upgrade_interval_ms |
373 | 9 | << "ms, downgrade_interval=" << config::ms_backpressure_downgrade_interval_ms |
374 | 9 | << "ms"; |
375 | 9 | } |
376 | 9 | } |
377 | | |
378 | 8 | MSBackpressureHandler::~MSBackpressureHandler() { |
379 | 8 | _stop_latch.count_down(); |
380 | 8 | if (_tick_thread) { |
381 | 8 | _tick_thread->join(); |
382 | 8 | } |
383 | 8 | } |
384 | | |
385 | 9 | void MSBackpressureHandler::_tick_thread_callback() { |
386 | | // Fixed tick interval: 1 second. Since 1 tick = 1 ms, advance by 1000 ticks each iteration. |
387 | 9 | constexpr int kTickIntervalMs = 1000; |
388 | 4.31k | while (!_stop_latch.wait_for(std::chrono::milliseconds(kTickIntervalMs))) { |
389 | 4.30k | _advance_time(kTickIntervalMs); |
390 | 4.30k | } |
391 | 9 | } |
392 | | |
393 | 4.30k | void MSBackpressureHandler::_advance_time(int ticks) { |
394 | 4.30k | if (!config::enable_ms_backpressure_handling) { |
395 | 4.30k | return; |
396 | 4.30k | } |
397 | | |
398 | 4 | std::lock_guard lock(_transition_mutex); |
399 | | |
400 | | // Advance coordinator time; if downgrade is triggered, handle it |
401 | 4 | if (_coordinator->tick(ticks)) { |
402 | 1 | LOG(INFO) << "[ms-throttle] triggering downgrade, upgrade_level=" |
403 | 1 | << _state_machine->upgrade_level(); |
404 | | |
405 | 1 | auto actions = _state_machine->on_downgrade(); |
406 | 1 | _apply_actions(actions); |
407 | 1 | _coordinator->set_has_pending_upgrades(_state_machine->upgrade_level() > 0); |
408 | | |
409 | 1 | g_backpressure_downgrade_count << 1; |
410 | 1 | } |
411 | 4 | } |
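 | |
 | | // Downgrade path (summary): when _coordinator->tick() signals that the
 | | // downgrade interval has elapsed, the state machine's on_downgrade()
 | | // actions are applied to the throttler; the exact relax/remove policy
 | | // lives in RpcThrottleStateMachine, not in this file.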
412 | | |
413 | 9 | bool MSBackpressureHandler::on_ms_busy() { |
414 | 9 | g_ms_busy_count << 1; |
415 | | |
416 | 9 | if (!config::enable_ms_backpressure_handling) { |
417 | 1 | return false; |
418 | 1 | } |
419 | | |
420 | 8 | { |
421 | 8 | std::lock_guard lock(_mutex); |
422 | 8 | _last_ms_busy_time = std::chrono::steady_clock::now(); |
423 | 8 | } |
424 | | |
425 | 8 | std::lock_guard lock(_transition_mutex); |
426 | | |
427 | | // Check with coordinator if upgrade should be triggered |
428 | 8 | if (!_coordinator->report_ms_busy()) { |
429 | 2 | return false; |
430 | 2 | } |
431 | | |
432 | 8 | LOG(INFO) << "[ms-throttle] received MS_BUSY, triggering upgrade"; |
433 | | |
434 | 6 | auto snapshot = _build_qps_snapshot(); |
435 | 6 | auto actions = _state_machine->on_upgrade(snapshot); |
436 | 6 | _apply_actions(actions); |
437 | 6 | _coordinator->set_has_pending_upgrades(_state_machine->upgrade_level() > 0); |
438 | | |
439 | 6 | g_backpressure_upgrade_count << 1; |
440 | 6 | return true; |
441 | 8 | } |
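 | |
 | | // Upgrade path (summary): every MS_BUSY bumps g_ms_busy_count, but an
 | | // upgrade only happens when the coordinator's cooldown allows it; the
 | | // state machine is then fed a snapshot of the current top-k per-table QPS
 | | // (see _build_qps_snapshot) and returns SET_LIMIT / REMOVE_LIMIT actions
 | | // that _apply_actions() forwards to the throttler.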
442 | | |
443 | | std::chrono::steady_clock::time_point MSBackpressureHandler::before_rpc(LoadRelatedRpc rpc_type, |
444 | 439k | int64_t table_id) { |
445 | 440k | if (!config::enable_ms_backpressure_handling) { |
446 | 440k | return std::chrono::steady_clock::now(); |
447 | 440k | } |
448 | | |
449 | 18.4E | return _throttler->throttle(rpc_type, table_id); |
450 | 439k | } |
451 | | |
452 | 445k | void MSBackpressureHandler::after_rpc(LoadRelatedRpc rpc_type, int64_t table_id) { |
453 | 445k | if (!config::enable_ms_backpressure_handling) { |
454 | 445k | return; |
455 | 445k | } |
456 | | |
457 | 18.4E | _qps_registry->record(rpc_type, table_id); |
458 | 18.4E | } |
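 | |
 | | // One plausible call-site pattern (assumed; the actual call sites are
 | | // outside this file): sleep until the time point returned by before_rpc(),
 | | // feed the observed wait into the per-RPC throttle-wait recorder, then
 | | // report the call via after_rpc() so per-table QPS stays up to date.
 | | //
 | | //   auto start = std::chrono::steady_clock::now();
 | | //   std::this_thread::sleep_until(handler->before_rpc(rpc_type, table_id));
 | | //   if (auto* rec = get_throttle_wait_recorder(rpc_type)) {
 | | //       auto waited = std::chrono::steady_clock::now() - start;
 | | //       *rec << std::chrono::duration_cast<std::chrono::microseconds>(waited).count();
 | | //   }
 | | //   // ... issue the meta-service RPC ...
 | | //   handler->after_rpc(rpc_type, table_id);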
459 | | |
460 | 0 | void MSBackpressureHandler::update_throttle_params(RpcThrottleParams params) { |
461 | 0 | _state_machine->update_params(params); |
462 | 0 | } |
463 | | |
464 | 0 | void MSBackpressureHandler::update_coordinator_params(ThrottleCoordinatorParams params) { |
465 | 0 | _coordinator->update_params(params); |
466 | 0 | } |
467 | | |
468 | 3 | int64_t MSBackpressureHandler::seconds_since_last_ms_busy() const { |
469 | 3 | std::lock_guard lock(_mutex); |
470 | 3 | if (_last_ms_busy_time == std::chrono::steady_clock::time_point::min()) { |
471 | 1 | return -1; // Never received MS_BUSY |
472 | 1 | } |
473 | 2 | return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - |
474 | 2 | _last_ms_busy_time) |
475 | 2 | .count(); |
476 | 3 | } |
477 | | |
478 | 2 | size_t MSBackpressureHandler::upgrade_level() const { |
479 | 2 | return _state_machine->upgrade_level(); |
480 | 2 | } |
481 | | |
482 | 0 | int MSBackpressureHandler::ticks_since_last_ms_busy() const { |
483 | 0 | return _coordinator->ticks_since_last_ms_busy(); |
484 | 0 | } |
485 | | |
486 | 0 | int MSBackpressureHandler::ticks_since_last_upgrade() const { |
487 | 0 | return _coordinator->ticks_since_last_upgrade(); |
488 | 0 | } |
489 | | |
490 | 7 | void MSBackpressureHandler::_apply_actions(const std::vector<RpcThrottleAction>& actions) { |
491 | 7 | for (const auto& action : actions) { |
492 | 2 | switch (action.type) { |
493 | 1 | case RpcThrottleAction::Type::SET_LIMIT: |
494 | 1 | _throttler->set_qps_limit(action.rpc_type, action.table_id, action.qps_limit); |
495 | 1 | break; |
496 | 1 | case RpcThrottleAction::Type::REMOVE_LIMIT: |
497 | 1 | _throttler->remove_qps_limit(action.rpc_type, action.table_id); |
498 | 1 | break; |
499 | 2 | } |
500 | 2 | } |
501 | 7 | } |
502 | | |
503 | 6 | std::vector<RpcQpsSnapshot> MSBackpressureHandler::_build_qps_snapshot() const { |
504 | 6 | std::vector<RpcQpsSnapshot> snapshot; |
505 | | |
506 | | // For each RPC type, get top-k tables |
507 | 6 | int top_k = _state_machine->get_params().top_k; |
508 | | |
509 | 36 | for (size_t i = 0; i < static_cast<size_t>(LoadRelatedRpc::COUNT); ++i) { |
510 | 30 | LoadRelatedRpc rpc_type = static_cast<LoadRelatedRpc>(i); |
511 | 30 | auto top_tables = _qps_registry->get_top_k_tables(rpc_type, top_k); |
512 | | |
513 | 30 | for (const auto& [table_id, qps] : top_tables) { |
514 | 1 | snapshot.push_back({rpc_type, table_id, qps}); |
515 | 1 | } |
516 | 30 | } |
517 | | |
518 | 6 | return snapshot; |
519 | 6 | } |
520 | | |
521 | | } // namespace doris::cloud |