Coverage Report

Created: 2026-05-11 13:39

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_ms_backpressure_handler.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bvar/bvar.h>
21
22
#include <array>
23
#include <chrono>
24
#include <map>
25
#include <memory>
26
#include <mutex>
27
#include <shared_mutex>
28
#include <string>
29
#include <string_view>
30
#include <unordered_map>
31
#include <vector>
32
33
#include "cloud/cloud_throttle_state_machine.h"
34
#include "util/countdown_latch.h"
35
#include "util/thread.h"
36
37
namespace doris::cloud {
38
39
// Strict QPS limiter that doesn't allow bursts.
40
// Unlike a token bucket, it strictly enforces a fixed interval between consecutive requests.
41
class StrictQpsLimiter {
42
public:
43
    using Clock = std::chrono::steady_clock;  // monotonic clock, unaffected by wall-clock changes
44
45
    explicit StrictQpsLimiter(double qps);  // handling of qps <= 0: see .cpp (TODO confirm)
46
47
    // Returns the time point when this request is allowed to execute (presumably claims the next slot)
48
    // Caller should sleep until this time point; the waiting is the caller's job.
49
    Clock::time_point reserve();
50
51
    // Dynamically update the QPS limit (presumably recomputes _interval_ns).
52
    void update_qps(double new_qps);
53
54
    // Get the current QPS limit.
55
    double get_qps() const;
56
57
private:
58
    mutable std::mutex _mtx;  // presumably guards the fields below; mutable for const get_qps()
59
    int64_t _interval_ns;  // presumably the required spacing, ~1e9 / qps ns (TODO confirm)
60
    Clock::time_point _next_allowed_time;  // presumably earliest time the next reserve() may grant
61
};
62
63
// QPS counter for a single (table, RPC type) pair, backed by a bvar Adder and PerSecond.
64
class TableRpcQpsCounter {
65
public:
66
    TableRpcQpsCounter(int64_t table_id, LoadRelatedRpc rpc_type, int window_sec);
67
18
    ~TableRpcQpsCounter() = default;  // bvars are released by their unique_ptr owners
68
69
    // Record one RPC call (presumably adds 1 to _counter).
70
    void increment();
71
72
    // Get current QPS, averaged over the window_sec window given to the constructor.
73
    double get_qps() const;
74
75
0
    int64_t table_id() const { return _table_id; }
76
0
    LoadRelatedRpc rpc_type() const { return _rpc_type; }
77
78
private:
79
    int64_t _table_id;
80
    LoadRelatedRpc _rpc_type;
81
82
    std::unique_ptr<bvar::Adder<int64_t>> _counter;  // presumably bumped by increment()
83
    std::unique_ptr<bvar::PerSecond<bvar::Adder<int64_t>>> _qps;  // per-second rate of _counter
84
};
85
86
// Registry of per-(RPC type, table) QPS counters for all tables.
87
class TableRpcQpsRegistry {
88
public:
89
    TableRpcQpsRegistry();
90
18
    ~TableRpcQpsRegistry() = default;
91
92
    // Record one RPC call for the given table (presumably creates its counter on first use).
93
    void record(LoadRelatedRpc rpc_type, int64_t table_id);
94
95
    // Get the top-k tables with the highest QPS for the given RPC type.
96
    // Returns: [(table_id, qps), ...] sorted by qps in descending order, at most k entries.
97
    std::vector<std::pair<int64_t, double>> get_top_k_tables(LoadRelatedRpc rpc_type, int k) const;
98
99
    // Get QPS for a specific table on a specific RPC type (untracked table: presumably 0).
100
    double get_qps(LoadRelatedRpc rpc_type, int64_t table_id) const;
101
102
    // Clean up counters for tables inactive for a long time (threshold defined in the .cpp).
103
    void cleanup_inactive_tables();
104
105
private:
106
    // Get or create the counter for (rpc_type, table_id); the pointee is owned by _counters.
107
    TableRpcQpsCounter* get_or_create_counter(LoadRelatedRpc rpc_type, int64_t table_id);
108
109
    mutable std::shared_mutex _mutex;  // presumably guards _counters (shared lock for reads)
110
111
    // Indexed by static_cast<size_t>(rpc_type): rpc_type -> (table_id -> counter)
112
    std::array<std::unordered_map<int64_t, std::unique_ptr<TableRpcQpsCounter>>,
113
               static_cast<size_t>(LoadRelatedRpc::COUNT)>
114
            _counters;
115
};
116
117
// Table-level throttler owning one StrictQpsLimiter per throttled (RPC type, table) pair.
118
class TableRpcThrottler {
119
public:
120
    TableRpcThrottler();
121
14
    ~TableRpcThrottler() = default;
122
123
    // Called before RPC execution; returns the time point when execution is allowed.
124
    // Returns now if no limit is set for (rpc_type, table_id).
125
    std::chrono::steady_clock::time_point throttle(LoadRelatedRpc rpc_type, int64_t table_id);
126
127
    // Set or update the QPS limit for (rpc_type, table_id).
128
    void set_qps_limit(LoadRelatedRpc rpc_type, int64_t table_id, double qps_limit);
129
130
    // Remove the QPS limit for (rpc_type, table_id).
131
    void remove_qps_limit(LoadRelatedRpc rpc_type, int64_t table_id);
132
133
    // Get the current QPS limit (returns 0 if not set).
134
    double get_qps_limit(LoadRelatedRpc rpc_type, int64_t table_id) const;
135
136
    // Check whether a limit exists for the given (rpc_type, table_id).
137
    bool has_limit(LoadRelatedRpc rpc_type, int64_t table_id) const;
138
139
    // Get the number of throttled tables for a given RPC type.
140
    size_t get_throttled_table_count(LoadRelatedRpc rpc_type) const;
141
142
    // A single throttled entry: (rpc_type, table_id, qps_limit).
143
    struct ThrottleEntry {
144
        LoadRelatedRpc rpc_type;
145
        int64_t table_id;
146
        double qps_limit;
147
    };
148
    std::vector<ThrottleEntry> get_all_throttled_entries() const;  // copy of all current entries
149
150
private:
151
    mutable std::shared_mutex _mutex;  // presumably guards _limiters
152
    // (rpc_type, table_id) -> StrictQpsLimiter; presumably a key exists only while throttled
153
    std::map<std::pair<LoadRelatedRpc, int64_t>, std::unique_ptr<StrictQpsLimiter>> _limiters;
154
155
    // bvar: current throttled table count per RPC type, indexed by static_cast<size_t>(rpc_type)
156
    std::array<std::unique_ptr<bvar::Status<size_t>>, static_cast<size_t>(LoadRelatedRpc::COUNT)>
157
            _throttled_table_counts;
158
};
159
160
// MS backpressure handler that coordinates QPS statistics and throttle upgrades/downgrades.
161
// Decisions are delegated to RpcThrottleStateMachine/RpcThrottleCoordinator for testability.
162
class MSBackpressureHandler {
163
public:
164
    MSBackpressureHandler(TableRpcQpsRegistry* qps_registry, TableRpcThrottler* throttler);
165
    ~MSBackpressureHandler();  // presumably stops and joins _tick_thread (TODO confirm)
166
167
    // Called when an MS_BUSY response is received.
168
    // Returns true if a throttle upgrade was triggered.
169
    bool on_ms_busy();
170
171
    // Called before RPC execution, performs the throttle wait step.
172
    // Returns the time point to wait until (NOTE(review): confirm who sleeps - this or caller)
173
    std::chrono::steady_clock::time_point before_rpc(LoadRelatedRpc rpc_type, int64_t table_id);
174
175
    // Called after RPC execution; records QPS statistics (presumably into _qps_registry).
176
    void after_rpc(LoadRelatedRpc rpc_type, int64_t table_id);
177
178
    // Update parameters at runtime.
179
    void update_throttle_params(RpcThrottleParams params);
180
    void update_coordinator_params(ThrottleCoordinatorParams params);
181
182
    // Get seconds since the last MS_BUSY (for monitoring).
183
    int64_t seconds_since_last_ms_busy() const;
184
185
    // Query current throttle state (presumably read from the state machine components).
186
    size_t upgrade_level() const;
187
    int ticks_since_last_ms_busy() const;
188
    int ticks_since_last_upgrade() const;
189
190
private:
191
    // Body of the background thread that periodically advances time.
192
    void _tick_thread_callback();
193
194
    // Advance time by the given number of ticks; handle triggered events (e.g., downgrade).
195
    void _advance_time(int ticks);
196
197
    // Apply the given throttle actions to _throttler.
198
    void _apply_actions(const std::vector<RpcThrottleAction>& actions);
199
200
    // Build a QPS snapshot from _qps_registry.
201
    std::vector<RpcQpsSnapshot> _build_qps_snapshot() const;
202
203
    TableRpcQpsRegistry* _qps_registry;  // presumably non-owning; must outlive this handler
204
    TableRpcThrottler* _throttler;  // presumably non-owning; must outlive this handler
205
    mutable std::mutex _transition_mutex;  // presumably serializes state-machine transitions
206
207
    // State machine components
208
    std::unique_ptr<RpcThrottleStateMachine> _state_machine;
209
    std::unique_ptr<RpcThrottleCoordinator> _coordinator;
210
211
    // Background thread for the periodic tick (presumably stopped via _stop_latch)
212
    std::shared_ptr<Thread> _tick_thread;
213
    CountDownLatch _stop_latch;
214
215
    // For bvar compatibility only - tracks approximate seconds since the last MS_BUSY
216
    mutable std::mutex _mutex;  // presumably guards _last_ms_busy_time
217
    std::chrono::steady_clock::time_point _last_ms_busy_time;
218
};
219
220
// Global bvar metrics for backpressure handling (defined in another translation unit).
221
extern bvar::Adder<uint64_t> g_backpressure_upgrade_count;
222
extern bvar::Window<bvar::Adder<uint64_t>> g_backpressure_upgrade_60s;  // presumably 60s window
223
extern bvar::Adder<uint64_t> g_backpressure_downgrade_count;
224
extern bvar::Window<bvar::Adder<uint64_t>> g_backpressure_downgrade_60s;  // presumably 60s window
225
extern bvar::Adder<uint64_t> g_ms_busy_count;
226
extern bvar::Window<bvar::Adder<uint64_t>> g_ms_busy_60s;  // presumably 60s window
227
228
// Returns the throttle-wait latency recorder for the given RPC type (lifetime: see .cpp).
229
bvar::LatencyRecorder* get_throttle_wait_recorder(LoadRelatedRpc rpc);
230
231
} // namespace doris::cloud