be/src/cloud/cloud_ms_backpressure_handler.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <bvar/bvar.h> |
21 | | |
22 | | #include <array> |
23 | | #include <chrono> |
24 | | #include <map> |
25 | | #include <memory> |
26 | | #include <mutex> |
27 | | #include <shared_mutex> |
28 | | #include <string> |
29 | | #include <string_view> |
30 | | #include <unordered_map> |
31 | | #include <vector> |
32 | | |
33 | | #include "cloud/cloud_throttle_state_machine.h" |
34 | | #include "util/countdown_latch.h" |
35 | | #include "util/thread.h" |
36 | | |
37 | | namespace doris::cloud { |
38 | | |
39 | | // Strict QPS limiter that does not allow bursts; time is measured on Clock (std::chrono::steady_clock). |
40 | | // Unlike a token bucket, it strictly enforces a fixed interval between requests. State is an interval (_interval_ns) and the next allowed time point, with a mutex member (_mtx) alongside; method bodies are not in this header. |
41 | | class StrictQpsLimiter { |
42 | | public: |
43 | | using Clock = std::chrono::steady_clock; |
44 | | |
45 | | explicit StrictQpsLimiter(double qps); |
46 | | |
47 | | // Returns the time point at which the calling request is allowed to execute. |
48 | | // The caller should sleep until this time point before issuing the request. |
49 | | Clock::time_point reserve(); |
50 | | |
51 | | // Dynamically update the QPS limit to new_qps. NOTE(review): handling of new_qps <= 0 is not visible in this header -- confirm in the .cpp. |
52 | | void update_qps(double new_qps); |
53 | | |
54 | | // Get the current QPS limit. NOTE(review): the stored state is _interval_ns, so this is presumably derived from the interval -- confirm. |
55 | | double get_qps() const; |
56 | | |
57 | | private: |
58 | | mutable std::mutex _mtx; |
59 | | int64_t _interval_ns; |
60 | | Clock::time_point _next_allowed_time; |
61 | | }; |
62 | | |
63 | | // QPS counter for a single (table, RPC type) pair, holding a bvar::Adder<int64_t> and a bvar::PerSecond over that Adder type. |
64 | | class TableRpcQpsCounter { |
65 | | public: |
66 | | TableRpcQpsCounter(int64_t table_id, LoadRelatedRpc rpc_type, int window_sec); |
67 | 18 | ~TableRpcQpsCounter() = default; |
68 | | |
69 | | // Record one RPC call for this (table, RPC type) pair. |
70 | | void increment(); |
71 | | |
72 | | // Get the current QPS, averaged over the configured time window (presumably the window_sec constructor argument -- confirm in the .cpp). |
73 | | double get_qps() const; |
74 | | |
75 | 0 | int64_t table_id() const { return _table_id; } |
76 | 0 | LoadRelatedRpc rpc_type() const { return _rpc_type; } |
77 | | |
78 | | private: |
79 | | int64_t _table_id; |
80 | | LoadRelatedRpc _rpc_type; |
81 | | |
82 | | std::unique_ptr<bvar::Adder<int64_t>> _counter; |
83 | | std::unique_ptr<bvar::PerSecond<bvar::Adder<int64_t>>> _qps; |
84 | | }; |
85 | | |
86 | | // Registry of TableRpcQpsCounter objects for all tables: one table_id -> counter map per LoadRelatedRpc value, plus a std::shared_mutex member. |
87 | | class TableRpcQpsRegistry { |
88 | | public: |
89 | | TableRpcQpsRegistry(); |
90 | 18 | ~TableRpcQpsRegistry() = default; |
91 | | |
92 | | // Record one call of rpc_type against the given table. |
93 | | void record(LoadRelatedRpc rpc_type, int64_t table_id); |
94 | | |
95 | | // Get the top-k tables with the highest QPS for the given RPC type. |
96 | | // Returns [(table_id, qps), ...] sorted by qps in descending order. |
97 | | std::vector<std::pair<int64_t, double>> get_top_k_tables(LoadRelatedRpc rpc_type, int k) const; |
98 | | |
99 | | // Get the QPS of a specific table for a specific RPC type. NOTE(review): the result for a table with no counter is not visible here -- confirm. |
100 | | double get_qps(LoadRelatedRpc rpc_type, int64_t table_id) const; |
101 | | |
102 | | // Clean up counters for tables that have been inactive for a long time. NOTE(review): the inactivity criterion lives in the .cpp, not in this header. |
103 | | void cleanup_inactive_tables(); |
104 | | |
105 | | private: |
106 | | // Get or create the counter for (rpc_type, table_id). Returns a raw pointer while the map holds the unique_ptr; NOTE(review): check its validity against cleanup_inactive_tables(). |
107 | | TableRpcQpsCounter* get_or_create_counter(LoadRelatedRpc rpc_type, int64_t table_id); |
108 | | |
109 | | mutable std::shared_mutex _mutex; |
110 | | |
111 | | // rpc_type -> (table_id -> counter); the array has LoadRelatedRpc::COUNT slots. NOTE(review): presumably indexed by the enum's integer value -- confirm. |
112 | | std::array<std::unordered_map<int64_t, std::unique_ptr<TableRpcQpsCounter>>, |
113 | | static_cast<size_t>(LoadRelatedRpc::COUNT)> |
114 | | _counters; |
115 | | }; |
116 | | |
117 | | // Table-level throttler managing a StrictQpsLimiter for each (RPC type, table) pair. |
118 | | class TableRpcThrottler { |
119 | | public: |
120 | | TableRpcThrottler(); |
121 | 14 | ~TableRpcThrottler() = default; |
122 | | |
123 | | // Called before RPC execution; returns the time point at which execution is allowed. |
124 | | // Returns now if no limit is set for (rpc_type, table_id). |
125 | | std::chrono::steady_clock::time_point throttle(LoadRelatedRpc rpc_type, int64_t table_id); |
126 | | |
127 | | // Set or update the QPS limit for a table on the given RPC type. |
128 | | void set_qps_limit(LoadRelatedRpc rpc_type, int64_t table_id, double qps_limit); |
129 | | |
130 | | // Remove the QPS limit for a table on the given RPC type. |
131 | | void remove_qps_limit(LoadRelatedRpc rpc_type, int64_t table_id); |
132 | | |
133 | | // Get the current QPS limit (returns 0 if not set). |
134 | | double get_qps_limit(LoadRelatedRpc rpc_type, int64_t table_id) const; |
135 | | |
136 | | // Check whether a limit exists for the given (rpc_type, table_id). |
137 | | bool has_limit(LoadRelatedRpc rpc_type, int64_t table_id) const; |
138 | | |
139 | | // Get the number of throttled tables for a given RPC type. |
140 | | size_t get_throttled_table_count(LoadRelatedRpc rpc_type) const; |
141 | | |
142 | | // One currently throttled entry, as returned by get_all_throttled_entries(): (rpc_type, table_id, qps_limit). |
143 | | struct ThrottleEntry { |
144 | | LoadRelatedRpc rpc_type; |
145 | | int64_t table_id; |
146 | | double qps_limit; |
147 | | }; |
148 | | std::vector<ThrottleEntry> get_all_throttled_entries() const; |
149 | | |
150 | | private: |
151 | | mutable std::shared_mutex _mutex; |
152 | | // (rpc_type, table_id) -> StrictQpsLimiter, held in an ordered std::map keyed by the pair. |
153 | | std::map<std::pair<LoadRelatedRpc, int64_t>, std::unique_ptr<StrictQpsLimiter>> _limiters; |
154 | | |
155 | | // bvar: current throttled table count per RPC type (one bvar::Status<size_t> slot per LoadRelatedRpc value up to COUNT). |
156 | | std::array<std::unique_ptr<bvar::Status<size_t>>, static_cast<size_t>(LoadRelatedRpc::COUNT)> |
157 | | _throttled_table_counts; |
158 | | }; |
159 | | |
160 | | // MS backpressure handler that coordinates QPS statistics, throttle upgrade and throttle downgrade. |
161 | | // Uses a state machine for decisions, providing better testability. The registry and throttler are passed in as raw pointers; NOTE(review): presumably non-owning and required to outlive the handler -- confirm. |
162 | | class MSBackpressureHandler { |
163 | | public: |
164 | | MSBackpressureHandler(TableRpcQpsRegistry* qps_registry, TableRpcThrottler* throttler); |
165 | | ~MSBackpressureHandler(); |
166 | | |
167 | | // Called when an MS_BUSY response is received. |
168 | | // Returns true if a throttle upgrade was triggered. |
169 | | bool on_ms_busy(); |
170 | | |
171 | | // Called before RPC execution for the throttle wait. |
172 | | // Returns the time point to wait until. NOTE(review): whether the sleep happens here or in the caller is not visible in this header. |
173 | | std::chrono::steady_clock::time_point before_rpc(LoadRelatedRpc rpc_type, int64_t table_id); |
174 | | |
175 | | // Called after RPC execution; records QPS statistics for (rpc_type, table_id). |
176 | | void after_rpc(LoadRelatedRpc rpc_type, int64_t table_id); |
177 | | |
178 | | // Runtime parameter updates; both parameter structs are taken by value. |
179 | | void update_throttle_params(RpcThrottleParams params); |
180 | | void update_coordinator_params(ThrottleCoordinatorParams params); |
181 | | |
182 | | // Get seconds since the last MS_BUSY (for monitoring). |
183 | | int64_t seconds_since_last_ms_busy() const; |
184 | | |
185 | | // Query the current state (upgrade level and tick counters). |
186 | | size_t upgrade_level() const; |
187 | | int ticks_since_last_ms_busy() const; |
188 | | int ticks_since_last_upgrade() const; |
189 | | |
190 | | private: |
191 | | // Body of the background thread that periodically advances time. |
192 | | void _tick_thread_callback(); |
193 | | |
194 | | // Advance time by the specified number of ticks and handle any triggered events (e.g., downgrade). |
195 | | void _advance_time(int ticks); |
196 | | |
197 | | // Apply the given actions to the throttler. |
198 | | void _apply_actions(const std::vector<RpcThrottleAction>& actions); |
199 | | |
200 | | // Build a QPS snapshot from the registry. |
201 | | std::vector<RpcQpsSnapshot> _build_qps_snapshot() const; |
202 | | |
203 | | TableRpcQpsRegistry* _qps_registry; |
204 | | TableRpcThrottler* _throttler; |
205 | | mutable std::mutex _transition_mutex; |
206 | | |
207 | | // State machine components (both held by unique_ptr). |
208 | | std::unique_ptr<RpcThrottleStateMachine> _state_machine; |
209 | | std::unique_ptr<RpcThrottleCoordinator> _coordinator; |
210 | | |
211 | | // Background thread for the periodic tick. NOTE(review): _stop_latch is presumably what signals it to stop -- confirm in the .cpp. |
212 | | std::shared_ptr<Thread> _tick_thread; |
213 | | CountDownLatch _stop_latch; |
214 | | |
215 | | // For bvar compatibility only: records the time of the last MS_BUSY so approximate seconds since then can be reported. NOTE(review): _mutex presumably guards _last_ms_busy_time -- confirm. |
216 | | mutable std::mutex _mutex; |
217 | | std::chrono::steady_clock::time_point _last_ms_busy_time; |
218 | | }; |
219 | | |
220 | | // Global bvar metrics for backpressure handling, declared extern here and defined in another translation unit. Each *_60s variable is a bvar::Window over a uint64_t Adder; presumably a 60-second window over its *_count sibling -- confirm at the definitions. |
221 | | extern bvar::Adder<uint64_t> g_backpressure_upgrade_count; |
222 | | extern bvar::Window<bvar::Adder<uint64_t>> g_backpressure_upgrade_60s; |
223 | | extern bvar::Adder<uint64_t> g_backpressure_downgrade_count; |
224 | | extern bvar::Window<bvar::Adder<uint64_t>> g_backpressure_downgrade_60s; |
225 | | extern bvar::Adder<uint64_t> g_ms_busy_count; |
226 | | extern bvar::Window<bvar::Adder<uint64_t>> g_ms_busy_60s; |
227 | | |
228 | | // Per-RPC-type throttle wait latency recorders: returns a pointer to the bvar::LatencyRecorder for the given RPC type. NOTE(review): ownership and nullability of the returned pointer are not visible here. |
229 | | bvar::LatencyRecorder* get_throttle_wait_recorder(LoadRelatedRpc rpc); |
230 | | |
231 | | } // namespace doris::cloud |