Coverage Report

Created: 2026-05-17 14:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_ms_rpc_rate_limiters.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_ms_rpc_rate_limiters.h"
19
20
#include <glog/logging.h>
21
22
#include <algorithm>
23
24
#include "cloud/config.h"
25
#include "util/cpu_info.h"
26
27
namespace doris::cloud {
28
29
// Display names for each RPC type (used as the bvar recorder name and in log lines).
// The table is produced by expanding META_SERVICE_RPC_TYPES with a macro that keeps only
// the display_name argument of each entry, so element i is the name of the i-th entry.
// It is indexed with static_cast<size_t>(MetaServiceRPC) by meta_service_rpc_display_name;
// this presumably relies on the enum being generated from the same list in the same
// order — TODO confirm in the header.
30
static constexpr std::string_view META_SERVICE_RPC_DISPLAY_NAMES[] = {
31
#define DEFINE_DISPLAY_NAME(enum_name, config_suffix, display_name) display_name,
32
        META_SERVICE_RPC_TYPES(DEFINE_DISPLAY_NAME)
33
#undef DEFINE_DISPLAY_NAME
34
};
35
36
3.10k
// Returns the display name of `rpc` from META_SERVICE_RPC_DISPLAY_NAMES.
// A value whose numeric representation is not below MetaServiceRPC::COUNT (for example
// COUNT itself) is not looked up in the table and yields "unknown" instead.
std::string_view meta_service_rpc_display_name(MetaServiceRPC rpc) {
37
3.10k
    size_t idx = static_cast<size_t>(rpc);
38
3.10k
    if (idx < static_cast<size_t>(MetaServiceRPC::COUNT)) {
39
3.10k
        return META_SERVICE_RPC_DISPLAY_NAMES[idx];
40
3.10k
    }
41
2
    return "unknown";
42
3.10k
}
43
44
// Get the effective QPS for one RPC type from the config values.
45
// Returns the actual QPS (per-core setting already multiplied by CpuInfo::num_cores()).
46
// Returns 0 if rate limiting should be disabled for this RPC, or if `rpc` matches no
// entry of META_SERVICE_RPC_TYPES.
// Resolution order: config::ms_rpc_qps_<config_suffix> for the RPC; if that is negative,
// config::ms_rpc_qps_default; if the resulting value is <= 0 the RPC is disabled.
47
1.49k
static int get_rpc_qps_from_config(MetaServiceRPC rpc) {
48
1.49k
    int num_cores = CpuInfo::num_cores();
49
1.49k
    int qps_per_core = 0;
50
51
    // Get the per-RPC config value; a negative value (conventionally -1) means "use default".
    // The macro below expands to one `case` per entry of META_SERVICE_RPC_TYPES.
52
1.49k
#define GET_RPC_QPS_CONFIG(enum_name, config_suffix, display_name) \
53
1.49k
    case MetaServiceRPC::enum_name:                                \
54
1.49k
        qps_per_core = config::ms_rpc_qps_##config_suffix;         \
55
1.49k
        break;
56
57
1.49k
    switch (rpc) {
58
1.49k
        META_SERVICE_RPC_TYPES(GET_RPC_QPS_CONFIG)
59
0
    default:
60
0
        return 0;
61
1.49k
    }
62
1.49k
#undef GET_RPC_QPS_CONFIG
63
64
    // Any negative per-RPC value falls back to the shared default config.
65
1.49k
    if (qps_per_core < 0) {
66
757
        qps_per_core = config::ms_rpc_qps_default;
67
757
    }
68
69
    // 0 means disabled; a non-positive default after the fallback is treated the same way.
70
1.49k
    if (qps_per_core <= 0) {
71
65
        return 0;
72
65
    }
73
74
    // Clamp to at least 1 so an enabled RPC never gets a non-positive rate.
    // NOTE(review): the product is computed in `int`; very large config values could
    // overflow — confirm the allowed range of the ms_rpc_qps_* configs.
1.43k
    return std::max(1, qps_per_core * num_cores);
75
1.49k
}
76
77
1.56k
// Builds the limiter and its sleep-latency recorder for one RPC type.
//   qps     - passed as both of the first two arguments of TokenBucketRateLimiterHolder
//             (presumably rate and burst capacity — TODO confirm against that class);
//             the third argument (limit) is 0.
//   op_name - name of the bvar::LatencyRecorder, created under the prefix
//             "host_level_ms_rpc_rate_limit_sleep".
// The callback captures `this` to reach latency_recorder, so it relies on this object
// staying at the same address for as long as `limiter` may invoke it.
RpcRateLimiter::RpcRateLimiter(int qps, std::string_view op_name) {
78
1.56k
    latency_recorder = std::make_unique<bvar::LatencyRecorder>("host_level_ms_rpc_rate_limit_sleep",
79
1.56k
                                                               std::string(op_name));
80
1.56k
    limiter = std::make_unique<TokenBucketRateLimiterHolder>(
81
1.56k
            qps, qps, /*limit=*/0, [this](int64_t sleep_ns) {
82
                // Only positive sleep durations are recorded; other values are ignored.
523
                if (sleep_ns > 0) {
83
                    // Convert ns to us for LatencyRecorder
84
280
                    *latency_recorder << (sleep_ns / 1000);
85
280
                }
86
523
            });
87
1.56k
}
88
89
1
// Reconfigures the existing limiter in place by forwarding to its reset() with the same
// argument pattern as the constructor: (qps, qps, 0). The latency recorder is untouched.
void RpcRateLimiter::reset(int qps) {
90
1
    limiter->reset(qps, qps, 0);
91
1
}
92
93
67
// Default constructor: sets up one limiter slot per RPC type from the config values
// (see init_from_config).
HostLevelMSRpcRateLimiters::HostLevelMSRpcRateLimiters() {
94
67
    init_from_config();
95
67
}
96
97
6
// Constructor that ignores the per-RPC config values and gives every RPC type a limiter
// with the same QPS (see init_with_uniform_qps, which raises values below 1 to 1).
HostLevelMSRpcRateLimiters::HostLevelMSRpcRateLimiters(int uniform_qps) {
98
6
    init_with_uniform_qps(uniform_qps);
99
6
}
100
101
67
// Fills every slot of _limiters according to the config values.
// For each RPC type, get_rpc_qps_from_config decides the QPS: a positive value stores a
// new RpcRateLimiter named after the RPC's display name; 0 stores nullptr, which limit()
// treats as "no rate limiting for this RPC". Each decision is logged at INFO level.
void HostLevelMSRpcRateLimiters::init_from_config() {
102
67
    LOG(INFO) << "Initializing MS RPC rate limiters from config";
103
104
    // Initialize rate limiters for all RPC types
105
1.54k
    for (size_t i = 0; i < static_cast<size_t>(MetaServiceRPC::COUNT); ++i) {
106
1.47k
        MetaServiceRPC rpc = static_cast<MetaServiceRPC>(i);
107
1.47k
        int qps = get_rpc_qps_from_config(rpc);
108
1.47k
        if (qps > 0) {
109
1.43k
            _limiters[i].store(
110
1.43k
                    std::make_shared<RpcRateLimiter>(qps, meta_service_rpc_display_name(rpc)));
111
1.43k
            LOG(INFO) << "  " << meta_service_rpc_display_name(rpc) << ": qps=" << qps;
112
1.43k
        } else {
113
            // Disabled RPCs keep an empty slot rather than a limiter object.
44
            _limiters[i].store(nullptr);
114
44
            LOG(INFO) << "  " << meta_service_rpc_display_name(rpc) << ": disabled";
115
44
        }
116
1.47k
    }
117
67
}
118
119
6
// Fills every slot of _limiters with a new RpcRateLimiter using the same QPS.
// `qps` is raised to at least 1 first, so unlike init_from_config this path never leaves
// a slot empty: a value of 0 or below does not disable limiting here.
void HostLevelMSRpcRateLimiters::init_with_uniform_qps(int qps) {
120
6
    qps = std::max(qps, 1);
121
6
    LOG(INFO) << "Initializing MS RPC rate limiters with uniform qps=" << qps;
122
123
    // Initialize rate limiters for all RPC types with the same QPS
124
138
    for (size_t i = 0; i < static_cast<size_t>(MetaServiceRPC::COUNT); ++i) {
125
132
        MetaServiceRPC rpc = static_cast<MetaServiceRPC>(i);
126
132
        _limiters[i].store(
127
132
                std::make_shared<RpcRateLimiter>(qps, meta_service_rpc_display_name(rpc)));
128
132
    }
129
6
}
130
131
723
// Applies the host-level limiter of `rpc` to one request.
// Returns 0 without consulting any limiter when
//   - config::enable_ms_rpc_host_level_rate_limit is false,
//   - `rpc` is out of range (numeric value >= MetaServiceRPC::COUNT), or
//   - the slot for `rpc` holds no limiter (rate limiting disabled for this RPC).
// Otherwise returns the result of TokenBucketRateLimiterHolder::add(1) — presumably the
// time spent sleeping in nanoseconds, matching the sleep_ns callback registered in
// RpcRateLimiter's constructor; TODO confirm against that class.
int64_t HostLevelMSRpcRateLimiters::limit(MetaServiceRPC rpc) {
132
723
    if (!config::enable_ms_rpc_host_level_rate_limit) {
133
200
        return 0;
134
200
    }
135
136
523
    size_t idx = static_cast<size_t>(rpc);
137
523
    if (idx >= static_cast<size_t>(MetaServiceRPC::COUNT)) {
138
0
        return 0;
139
0
    }
140
141
    // Work on a local copy of the slot's pointer so the limiter stays alive for this call
    // even if the slot is replaced or cleared meanwhile.
523
    auto limiter = _limiters[idx].load();
142
523
    if (limiter && limiter->limiter) {
143
523
        return limiter->limiter->add(1);
144
523
    }
145
0
    return 0;
146
523
}
147
148
1
// Sets the QPS of a single RPC type to an explicit value, bypassing the config values.
// Out-of-range `rpc` values are ignored. `qps` is raised to at least 1, so this call
// cannot disable limiting for the RPC. If the slot already holds a limiter it is
// reconfigured in place; otherwise a new limiter is created and stored.
void HostLevelMSRpcRateLimiters::reset(MetaServiceRPC rpc, int qps) {
149
1
    size_t idx = static_cast<size_t>(rpc);
150
1
    if (idx >= static_cast<size_t>(MetaServiceRPC::COUNT)) {
151
0
        return;
152
0
    }
153
154
1
    qps = std::max(qps, 1);
155
1
    LOG(INFO) << "Resetting MS RPC rate limiter for " << meta_service_rpc_display_name(rpc)
156
1
              << " with qps=" << qps;
157
158
    // NOTE(review): the load and the later store are separate steps; if two callers reset
    // an empty slot at the same time each may create its own limiter and the later store
    // wins — confirm whether callers serialize resets.
1
    auto limiter = _limiters[idx].load();
159
1
    if (limiter) {
160
1
        limiter->reset(qps);
161
1
    } else {
162
0
        _limiters[idx].store(
163
0
                std::make_shared<RpcRateLimiter>(qps, meta_service_rpc_display_name(rpc)));
164
0
    }
165
1
}
166
167
1
// Re-reads the config values and applies them to every RPC type.
// For an RPC whose effective QPS is positive, an existing limiter is reconfigured in place
// and a missing one is created; for an RPC whose effective QPS is 0, the slot is cleared
// to nullptr so limit() stops throttling it. Each outcome is logged at INFO level.
void HostLevelMSRpcRateLimiters::reset_all() {
168
1
    LOG(INFO) << "Resetting all MS RPC rate limiters from config";
169
170
23
    for (size_t i = 0; i < static_cast<size_t>(MetaServiceRPC::COUNT); ++i) {
171
22
        MetaServiceRPC rpc = static_cast<MetaServiceRPC>(i);
172
22
        int qps = get_rpc_qps_from_config(rpc);
173
22
        if (qps > 0) {
174
1
            auto limiter = _limiters[i].load();
175
1
            if (limiter) {
176
0
                limiter->reset(qps);
177
1
            } else {
178
1
                _limiters[i].store(
179
1
                        std::make_shared<RpcRateLimiter>(qps, meta_service_rpc_display_name(rpc)));
180
1
            }
181
1
            LOG(INFO) << "  " << meta_service_rpc_display_name(rpc) << ": qps=" << qps;
182
21
        } else {
183
            // Dropping the slot's pointer disables limiting for this RPC.
21
            _limiters[i].store(nullptr);
184
            LOG(INFO) << "  " << meta_service_rpc_display_name(rpc) << ": disabled";
185
21
        }
186
22
    }
187
1
}
188
189
} // namespace doris::cloud