Coverage Report

Created: 2026-05-09 10:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_throttle_state_machine.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <cstddef>
#include <cstdint>
#include <map>
#include <mutex>
#include <string_view>
#include <utility>
#include <vector>
26
27
namespace doris::cloud {
28
29
// ============== Common Types ==============
30
31
// Load-related RPC types that need table-level QPS statistics.
// The enumerators use their implicit values (0, 1, 2, ...). COUNT comes last,
// so its value equals the number of RPC types declared before it; keep it last
// when adding new entries.
// The underlying type is spelled std::size_t (from <cstddef>) rather than the
// unqualified size_t, which is only available through transitive includes.
enum class LoadRelatedRpc : std::size_t {
    PREPARE_ROWSET,
    COMMIT_ROWSET,
    UPDATE_TMP_ROWSET,
    UPDATE_PACKED_FILE_INFO,
    UPDATE_DELETE_BITMAP,
    COUNT
};
40
41
// Get the name string for a LoadRelatedRpc type
42
// rpc: the RPC type whose name is requested.
// Returns a non-owning std::string_view; this header only declares the function.
// NOTE(review): the view presumably refers to storage with static lifetime, and
// the result for LoadRelatedRpc::COUNT is not specified here - confirm both
// against the definition.
std::string_view load_related_rpc_name(LoadRelatedRpc rpc);
43
44
// ============== Data Structures ==============
45
46
// QPS snapshot: the current QPS of a table on a specific RPC type
47
struct RpcQpsSnapshot {
48
    LoadRelatedRpc rpc_type;
49
    int64_t table_id;
50
    double current_qps;
51
};
52
53
// Throttle action: describes what action should be taken
54
struct RpcThrottleAction {
55
    enum class Type { SET_LIMIT, REMOVE_LIMIT };
56
57
    Type type;
58
    LoadRelatedRpc rpc_type;
59
    int64_t table_id;
60
    double qps_limit {0}; // only meaningful for SET_LIMIT
61
};
62
63
// ============== ThrottleStateMachine ==============
64
65
// Tunables consumed by the throttle state machine
struct RpcThrottleParams {
    // Number of top tables to throttle on each upgrade
    int top_k {3};
    // Decay ratio for throttle upgrade
    double ratio {0.5};
    // Floor value for table-level QPS limit
    double floor_qps {1.0};

    // Field-by-field equality. The doubles are compared exactly (no tolerance),
    // so a NaN in either operand makes the objects compare unequal.
    bool operator==(const RpcThrottleParams& other) const {
        if (top_k != other.top_k) {
            return false;
        }
        if (ratio != other.ratio) {
            return false;
        }
        return floor_qps == other.floor_qps;
    }
};
75
76
// Pure state machine for throttle upgrade/downgrade decisions
77
// - No time awareness: caller drives events via on_upgrade/on_downgrade
78
// - No config dependency: all parameters passed via constructor/update_params
79
// - No side effects: only returns action descriptions, doesn't touch throttler
80
// - Deterministically testable: same event sequence -> same output
81
class RpcThrottleStateMachine {
82
public:
83
    explicit RpcThrottleStateMachine(RpcThrottleParams params);
84
85
    // Runtime update parameters, takes effect on next on_upgrade
86
    // Note: existing upgrade history is NOT recalculated
87
    void update_params(RpcThrottleParams params);
88
89
    // Process a throttle upgrade event
90
    // qps_snapshot: current QPS snapshot for each (rpc, table), provided by caller
91
    // Returns: list of actions to execute
92
    std::vector<RpcThrottleAction> on_upgrade(const std::vector<RpcQpsSnapshot>& qps_snapshot);
93
94
    // Process a throttle downgrade event (undo the most recent upgrade)
95
    // Returns: list of actions to execute
96
    std::vector<RpcThrottleAction> on_downgrade();
97
98
    // Query current state
99
    size_t upgrade_level() const; // Current upgrade level
100
    double get_current_limit(LoadRelatedRpc rpc_type, int64_t table_id) const; // 0 = no limit
101
    RpcThrottleParams get_params() const;
102
103
private:
104
    mutable std::mutex _mtx;
105
106
    RpcThrottleParams _params;
107
108
    // Upgrade history for downgrade rollback
109
    // changes: (rpc_type, table_id) -> (old_limit, new_limit)
110
    struct UpgradeRecord {
111
        std::map<std::pair<LoadRelatedRpc, int64_t>, std::pair<double, double>> changes;
112
    };
113
    std::vector<UpgradeRecord> _upgrade_history;
114
115
    // Current active limits for all (rpc, table)
116
    std::map<std::pair<LoadRelatedRpc, int64_t>, double> _current_limits;
117
};
118
119
// ============== ThrottleCoordinator ==============
120
121
// Tunables consumed by the throttle coordinator
struct ThrottleCoordinatorParams {
    // Minimum ticks between upgrades
    int upgrade_cooldown_ticks {10};
    // Ticks after last MS_BUSY to trigger downgrade
    int downgrade_after_ticks {60};

    // Field-by-field equality of the two tick thresholds
    bool operator==(const ThrottleCoordinatorParams& other) const {
        if (upgrade_cooldown_ticks != other.upgrade_cooldown_ticks) {
            return false;
        }
        return downgrade_after_ticks == other.downgrade_after_ticks;
    }
};
133
134
// Pure timing control for upgrade/downgrade triggers
135
// - No time awareness: based on tick count, driven by caller
136
// - No config dependency: all parameters passed via constructor/update_params
137
//
138
// Tick semantics:
139
// - 1 tick = 1 millisecond (fixed unit)
140
// - upgrade_cooldown_ticks and downgrade_after_ticks are in milliseconds
141
// - The tick thread advances time by 1000 ticks (1 second) each iteration
142
class RpcThrottleCoordinator {
143
public:
144
    explicit RpcThrottleCoordinator(ThrottleCoordinatorParams params);
145
146
    // Runtime update parameters, takes effect on subsequent report_ms_busy/tick calls
147
    // Note: existing tick counts are NOT reset
148
    void update_params(ThrottleCoordinatorParams params);
149
150
    // Report a MS_BUSY event
151
    // Returns true if upgrade should be triggered
152
    bool report_ms_busy();
153
154
    // Advance by specified number of ticks (caller decides actual time between ticks)
155
    // Returns true if downgrade should be triggered
156
    bool tick(int ticks = 1);
157
158
    // Tell coordinator whether there are pending upgrades that can be downgraded
159
    // Called by the state machine consumer after upgrade/downgrade
160
    void set_has_pending_upgrades(bool has);
161
162
    // Query state
163
    int ticks_since_last_ms_busy() const;
164
    int ticks_since_last_upgrade() const;
165
    ThrottleCoordinatorParams get_params() const;
166
167
private:
168
    mutable std::mutex _mtx;
169
170
    ThrottleCoordinatorParams _params;
171
    int _ticks_since_last_ms_busy = -1; // -1 means never received
172
    int _ticks_since_last_upgrade = -1; // -1 means never upgraded
173
    bool _has_pending_upgrades = false; // Whether there are upgrade records to downgrade
174
};
175
176
} // namespace doris::cloud