Coverage Report

Created: 2026-05-09 10:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_throttle_state_machine.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_throttle_state_machine.h"
19
20
#include <glog/logging.h>
21
22
#include <algorithm>
23
24
namespace doris::cloud {
25
26
// Display names for LoadRelatedRpc types
27
static constexpr std::string_view LOAD_RELATED_RPC_NAMES[] = {
28
        "prepare_rowset", "commit_rowset", "update_tmp_rowset", "update_packed_file_info",
29
        "update_delete_bitmap"};
30
31
168
std::string_view load_related_rpc_name(LoadRelatedRpc rpc) {
32
168
    size_t idx = static_cast<size_t>(rpc);
33
168
    if (idx < static_cast<size_t>(LoadRelatedRpc::COUNT)) {
34
167
        return LOAD_RELATED_RPC_NAMES[idx];
35
167
    }
36
1
    return "unknown";
37
168
}
38
39
// ============== RpcThrottleStateMachine ==============
40
41
31
RpcThrottleStateMachine::RpcThrottleStateMachine(RpcThrottleParams params) : _params(params) {
42
31
    LOG(INFO) << "[ms-throttle] state machine initialized: top_k=" << params.top_k
43
31
              << ", ratio=" << params.ratio << ", floor_qps=" << params.floor_qps;
44
31
}
45
46
5
void RpcThrottleStateMachine::update_params(RpcThrottleParams params) {
47
5
    std::lock_guard lock(_mtx);
48
5
    _params = params;
49
5
    LOG(INFO) << "[ms-throttle] state machine params updated: top_k=" << params.top_k
50
5
              << ", ratio=" << params.ratio << ", floor_qps=" << params.floor_qps;
51
5
}
52
53
std::vector<RpcThrottleAction> RpcThrottleStateMachine::on_upgrade(
54
42
        const std::vector<RpcQpsSnapshot>& qps_snapshot) {
55
42
    std::lock_guard lock(_mtx);
56
57
42
    UpgradeRecord record;
58
42
    std::vector<RpcThrottleAction> actions;
59
60
42
    double ratio = _params.ratio;
61
42
    double floor_qps = _params.floor_qps;
62
63
    // Caller is responsible for providing top-k snapshot per RPC type.
64
    // State machine simply applies throttling to every entry in the snapshot.
65
53
    for (const auto& snapshot : qps_snapshot) {
66
53
        auto key = std::make_pair(snapshot.rpc_type, snapshot.table_id);
67
68
53
        double old_limit = 0.0;
69
53
        auto limit_it = _current_limits.find(key);
70
53
        if (limit_it != _current_limits.end()) {
71
17
            old_limit = limit_it->second;
72
17
        }
73
74
53
        double new_limit;
75
53
        if (old_limit > 0) {
76
            // Already has a limit, reduce it further
77
17
            new_limit = old_limit * ratio;
78
36
        } else {
79
            // No limit yet, set based on current QPS
80
36
            new_limit = snapshot.current_qps * ratio;
81
36
        }
82
83
        // Apply floor
84
53
        new_limit = std::max(new_limit, floor_qps);
85
86
        // Only apply if it's actually limiting
87
53
        if (new_limit < snapshot.current_qps || old_limit > 0) {
88
51
            RpcThrottleAction action {
89
51
                    .type = RpcThrottleAction::Type::SET_LIMIT,
90
51
                    .rpc_type = snapshot.rpc_type,
91
51
                    .table_id = snapshot.table_id,
92
51
                    .qps_limit = new_limit,
93
51
            };
94
51
            actions.push_back(action);
95
51
            record.changes[key] = {old_limit, new_limit};
96
51
            _current_limits[key] = new_limit;
97
98
51
            LOG(INFO) << "[ms-throttle] upgrade: rpc=" << load_related_rpc_name(snapshot.rpc_type)
99
51
                      << ", table_id=" << snapshot.table_id
100
51
                      << ", current_qps=" << snapshot.current_qps << ", old_limit=" << old_limit
101
51
                      << ", new_limit=" << new_limit;
102
51
        }
103
53
    }
104
105
42
    if (!record.changes.empty()) {
106
35
        _upgrade_history.push_back(std::move(record));
107
35
    }
108
109
42
    LOG(INFO) << "[ms-throttle] on_upgrade done: actions=" << actions.size()
110
42
              << ", upgrade_level=" << _upgrade_history.size()
111
42
              << ", snapshot_size=" << qps_snapshot.size();
112
113
42
    return actions;
114
42
}
115
116
17
std::vector<RpcThrottleAction> RpcThrottleStateMachine::on_downgrade() {
117
17
    std::lock_guard lock(_mtx);
118
119
17
    std::vector<RpcThrottleAction> actions;
120
121
17
    if (_upgrade_history.empty()) {
122
1
        LOG(INFO) << "[ms-throttle] on_downgrade skipped: no upgrade history";
123
1
        return actions;
124
1
    }
125
126
    // Undo the most recent upgrade
127
16
    const auto& record = _upgrade_history.back();
128
129
24
    for (const auto& [key, limits] : record.changes) {
130
24
        const auto& [rpc_type, table_id] = key;
131
24
        double old_limit = limits.first;
132
133
24
        if (old_limit > 0) {
134
            // Restore the previous limit
135
9
            RpcThrottleAction action {
136
9
                    .type = RpcThrottleAction::Type::SET_LIMIT,
137
9
                    .rpc_type = rpc_type,
138
9
                    .table_id = table_id,
139
9
                    .qps_limit = old_limit,
140
9
            };
141
142
9
            actions.push_back(action);
143
9
            _current_limits[key] = old_limit;
144
145
9
            LOG(INFO) << "[ms-throttle] downgrade: rpc=" << load_related_rpc_name(rpc_type)
146
9
                      << ", table_id=" << table_id << ", restored_limit=" << old_limit;
147
15
        } else {
148
            // No previous limit, remove it entirely
149
15
            RpcThrottleAction action {
150
15
                    .type = RpcThrottleAction::Type::REMOVE_LIMIT,
151
15
                    .rpc_type = rpc_type,
152
15
                    .table_id = table_id,
153
15
            };
154
155
15
            actions.push_back(action);
156
15
            _current_limits.erase(key);
157
158
15
            LOG(INFO) << "[ms-throttle] downgrade: rpc=" << load_related_rpc_name(rpc_type)
159
15
                      << ", table_id=" << table_id << ", removed limit";
160
15
        }
161
24
    }
162
163
16
    _upgrade_history.pop_back();
164
165
16
    LOG(INFO) << "[ms-throttle] on_downgrade done: actions=" << actions.size()
166
16
              << ", upgrade_level=" << _upgrade_history.size();
167
168
16
    return actions;
169
17
}
170
171
30
size_t RpcThrottleStateMachine::upgrade_level() const {
172
30
    std::lock_guard lock(_mtx);
173
30
    return _upgrade_history.size();
174
30
}
175
176
25
double RpcThrottleStateMachine::get_current_limit(LoadRelatedRpc rpc_type, int64_t table_id) const {
177
25
    std::lock_guard lock(_mtx);
178
25
    auto it = _current_limits.find({rpc_type, table_id});
179
25
    if (it != _current_limits.end()) {
180
16
        return it->second;
181
16
    }
182
9
    return 0.0;
183
25
}
184
185
8
RpcThrottleParams RpcThrottleStateMachine::get_params() const {
186
8
    std::lock_guard lock(_mtx);
187
8
    return _params;
188
8
}
189
190
// ============== RpcThrottleCoordinator ==============
191
192
22
RpcThrottleCoordinator::RpcThrottleCoordinator(ThrottleCoordinatorParams params) : _params(params) {
193
22
    LOG(INFO) << "[ms-throttle] coordinator initialized: upgrade_cooldown_ticks="
194
22
              << params.upgrade_cooldown_ticks
195
22
              << ", downgrade_after_ticks=" << params.downgrade_after_ticks;
196
22
}
197
198
5
void RpcThrottleCoordinator::update_params(ThrottleCoordinatorParams params) {
199
5
    std::lock_guard lock(_mtx);
200
5
    _params = params;
201
5
    LOG(INFO) << "[ms-throttle] coordinator params updated: upgrade_cooldown_ticks="
202
5
              << params.upgrade_cooldown_ticks
203
5
              << ", downgrade_after_ticks=" << params.downgrade_after_ticks;
204
5
}
205
206
93
bool RpcThrottleCoordinator::report_ms_busy() {
207
93
    std::lock_guard lock(_mtx);
208
209
    // Reset tick counter since last MS_BUSY
210
93
    _ticks_since_last_ms_busy = 0;
211
212
    // Check if cooldown has passed
213
93
    if (_ticks_since_last_upgrade == -1 ||
214
93
        _ticks_since_last_upgrade >= _params.upgrade_cooldown_ticks) {
215
        // Reset upgrade counter
216
26
        auto actual_ticks = _ticks_since_last_upgrade;
217
26
        _ticks_since_last_upgrade = 0;
218
26
        _has_pending_upgrades = true;
219
220
26
        LOG(INFO) << "[ms-throttle] upgrade triggered: ticks_since_last_upgrade=" << actual_ticks
221
26
                  << ", cooldown=" << _params.upgrade_cooldown_ticks;
222
26
        return true; // Should trigger upgrade
223
26
    }
224
67
    return false; // Cooling down
225
93
}
226
227
424
bool RpcThrottleCoordinator::tick(int ticks) {
228
424
    std::lock_guard lock(_mtx);
229
230
    // Increment tick counters
231
424
    if (_ticks_since_last_ms_busy >= 0) {
232
423
        _ticks_since_last_ms_busy += ticks;
233
423
    }
234
424
    if (_ticks_since_last_upgrade >= 0) {
235
423
        _ticks_since_last_upgrade += ticks;
236
423
    }
237
238
    // Check if downgrade should be triggered
239
424
    if (_has_pending_upgrades && _ticks_since_last_ms_busy >= _params.downgrade_after_ticks) {
240
        // Reset for next downgrade cycle
241
12
        auto actual_ticks = _ticks_since_last_ms_busy;
242
12
        _ticks_since_last_ms_busy = 0;
243
244
12
        LOG(INFO) << "[ms-throttle] downgrade triggered: ticks_since_last_ms_busy=" << actual_ticks
245
12
                  << ", threshold=" << _params.downgrade_after_ticks;
246
12
        return true; // Should trigger downgrade
247
12
    }
248
249
412
    return false;
250
424
}
251
252
24
void RpcThrottleCoordinator::set_has_pending_upgrades(bool has) {
253
24
    std::lock_guard lock(_mtx);
254
24
    _has_pending_upgrades = has;
255
24
}
256
257
4
int RpcThrottleCoordinator::ticks_since_last_ms_busy() const {
258
4
    std::lock_guard lock(_mtx);
259
4
    return _ticks_since_last_ms_busy;
260
4
}
261
262
5
int RpcThrottleCoordinator::ticks_since_last_upgrade() const {
263
5
    std::lock_guard lock(_mtx);
264
5
    return _ticks_since_last_upgrade;
265
5
}
266
267
2
ThrottleCoordinatorParams RpcThrottleCoordinator::get_params() const {
268
2
    std::lock_guard lock(_mtx);
269
2
    return _params;
270
2
}
271
272
} // namespace doris::cloud