// MetaServiceRpcLimiterManager.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.cloud.rpc;
import org.apache.doris.cloud.rpc.RpcRateLimiter.BackpressureQpsLimiter;
import org.apache.doris.cloud.rpc.RpcRateLimiter.CostLimiter;
import org.apache.doris.cloud.rpc.RpcRateLimiter.QpsLimiter;
import org.apache.doris.common.Config;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.metric.CloudMetrics;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.qe.ConnectContext;
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class MetaServiceRpcLimiterManager {
private static final Logger LOG = LogManager.getLogger(MetaServiceRpcLimiterManager.class);

// Alias accepted in config strings: an entry naming it (case-insensitively) is expanded to
// GET_TABLE_VERSION_METHOD and GET_PARTITION_VERSION_METHOD when the config is parsed.
private static final String GET_VERSION_METHOD = "getVersion";
public static final String GET_TABLE_VERSION_METHOD = "getTableVersion";
public static final String GET_PARTITION_VERSION_METHOD = "getPartitionVersion";

// Multiplier applied to every per-core QPS / cost value to obtain the total limit.
private final int processorCount;

// Lazily created singleton returned by getInstance().
private static volatile MetaServiceRpcLimiterManager instance;

// Snapshot of the Config values recorded by the last reload; isConfigChanged() compares
// the current Config values against these.
private volatile boolean lastEnabled = false;
private volatile int lastMaxWaitRequestNum = 0;
private volatile int lastDefaultQps = 0;
private volatile String lastQpsConfig = "";
private volatile String lastCostConfig = "";
private volatile boolean lastOverloadThrottleEnabled = false;
private volatile String lastOverloadThrottleMethodAllowlist = "";

// Parsed per-method, per-core limits. reloadRateLimiterConfig() replaces these references
// while holding the instance lock, but getMethodTotalQps() / getMethodTotalCostLimit() read
// them without locking, so the references are volatile to publish each new map safely.
private volatile Map<String, Integer> methodQpsConfig = new ConcurrentHashMap<>();
private volatile Map<String, Integer> methodCostConfig = new ConcurrentHashMap<>();

// Methods eligible for a backpressure limiter; mutated in place, never reassigned.
private Set<String> overloadThrottleMethodAllowlist = ConcurrentHashMap.newKeySet();

// Limiters keyed by RPC method name, created on first use by the get*Limiter() helpers.
private final Map<String, QpsLimiter> qpsLimiters = new ConcurrentHashMap<>();
private final Map<String, CostLimiter> costLimiters = new ConcurrentHashMap<>();
private final Map<String, BackpressureQpsLimiter> backpressureQpsLimiters = new ConcurrentHashMap<>();
/**
 * Returns the process-wide manager, creating it on first use with the number of processors
 * reported by the runtime. Creation is guarded by double-checked locking on the class.
 *
 * @return the shared {@code MetaServiceRpcLimiterManager}
 */
public static MetaServiceRpcLimiterManager getInstance() {
    MetaServiceRpcLimiterManager current = instance;
    if (current != null) {
        return current;
    }
    synchronized (MetaServiceRpcLimiterManager.class) {
        current = instance;
        if (current == null) {
            current = new MetaServiceRpcLimiterManager(Runtime.getRuntime().availableProcessors());
            instance = current;
        }
        return current;
    }
}
/**
 * Creates a manager that scales per-core limits by {@code processorCount}, calls
 * {@link #reloadConfig()} once, and then registers {@link #setOverloadFactor(double)} as the
 * factor-change listener of {@link MetaServiceOverloadThrottle}.
 *
 * @param processorCount multiplier applied to per-core QPS and cost limits
 */
MetaServiceRpcLimiterManager(int processorCount) {
this.processorCount = processorCount;
// Load the configuration before the listener is registered.
reloadConfig();
// NOTE(review): "this" escapes here before construction finishes; the listener must not be
// invoked concurrently with the constructor - confirm against MetaServiceOverloadThrottle.
MetaServiceOverloadThrottle.getInstance().setFactorChangeListener(this::setOverloadFactor);
}
/**
 * Reports whether any rate-limit or overload-throttle setting in {@link Config} differs from
 * the value recorded by the last reload.
 *
 * @return {@code true} if at least one watched setting changed
 */
@VisibleForTesting
boolean isConfigChanged() {
    if (Config.meta_service_rpc_rate_limit_enabled != lastEnabled) {
        return true;
    }
    if (Config.meta_service_rpc_rate_limit_default_qps_per_core != lastDefaultQps) {
        return true;
    }
    if (Config.meta_service_rpc_rate_limit_max_waiting_request_num != lastMaxWaitRequestNum) {
        return true;
    }
    if (!Objects.equals(Config.meta_service_rpc_rate_limit_qps_per_core_config, lastQpsConfig)) {
        return true;
    }
    if (!Objects.equals(Config.meta_service_rpc_cost_limit_per_core_config, lastCostConfig)) {
        return true;
    }
    if (Config.meta_service_rpc_overload_throttle_enabled != lastOverloadThrottleEnabled) {
        return true;
    }
    return !Objects.equals(Config.meta_service_rpc_overload_throttle_method_allowlist,
            lastOverloadThrottleMethodAllowlist);
}
/**
 * Re-applies the rate-limit and overload-throttle configuration if it changed. The change
 * check is done once without the lock and again while holding the instance lock, so only one
 * caller performs the reload for a given change.
 *
 * @return {@code true} if this call performed a reload, {@code false} otherwise
 */
@VisibleForTesting
protected boolean reloadConfig() {
    if (isConfigChanged()) {
        synchronized (this) {
            if (isConfigChanged()) {
                reloadRateLimiterConfig();
                reloadOverloadThrottleConfig();
                return true;
            }
        }
    }
    return false;
}
/**
 * Re-parses the per-method QPS and cost settings, pushes the new values into the existing
 * limiters (or drops all QPS/cost limiters when rate limiting is disabled), and records the
 * applied values so later change checks compare against them.
 */
private void reloadRateLimiterConfig() {
    // Read each Config value once so the applied values and the recorded snapshot agree.
    final boolean rateLimitOn = Config.meta_service_rpc_rate_limit_enabled;
    final int waitQueueCap = Config.meta_service_rpc_rate_limit_max_waiting_request_num;
    final int fallbackQpsPerCore = Config.meta_service_rpc_rate_limit_default_qps_per_core;
    final String rawQpsConfig = Config.meta_service_rpc_rate_limit_qps_per_core_config;
    final String rawCostConfig = Config.meta_service_rpc_cost_limit_per_core_config;

    methodQpsConfig = parseConfig(rawQpsConfig, "QPS");
    methodCostConfig = parseConfig(rawCostConfig, "cost limit");
    updateQpsLimiters(fallbackQpsPerCore, waitQueueCap);

    if (rateLimitOn) {
        updateCostLimiters();
    } else {
        // Disabled: forget the cost limits and discard every QPS/cost limiter. The parsed QPS
        // limits are kept because the backpressure limiters are sized from them.
        methodCostConfig.clear();
        qpsLimiters.clear();
        costLimiters.clear();
    }

    lastEnabled = rateLimitOn;
    lastMaxWaitRequestNum = waitQueueCap;
    lastDefaultQps = fallbackQpsPerCore;
    lastQpsConfig = rawQpsConfig;
    lastCostConfig = rawCostConfig;
    LOG.info("Reload meta service rpc rate limit config. enabled: {}, maxWaitRequestNum: {}, "
            + "defaultQps: {}, qpsConfig: [{}], costConfig: [{}]", rateLimitOn, waitQueueCap,
            fallbackQpsPerCore, rawQpsConfig, rawCostConfig);
}
/**
 * Rebuilds the overload-throttle method allowlist from {@link Config} and drops backpressure
 * limiters for methods that are no longer allowed. When overload throttling is disabled, both
 * the allowlist and all backpressure limiters are cleared.
 */
private void reloadOverloadThrottleConfig() {
    final boolean throttleOn = Config.meta_service_rpc_overload_throttle_enabled;
    final String allowlistConfig = Config.meta_service_rpc_overload_throttle_method_allowlist;

    if (throttleOn) {
        Set<String> wantedMethods = new HashSet<>();
        if (allowlistConfig != null && !allowlistConfig.isEmpty()) {
            for (String token : allowlistConfig.split(",")) {
                String method = token.trim();
                if (method.isEmpty()) {
                    continue;
                }
                if (method.equalsIgnoreCase(GET_VERSION_METHOD)) {
                    // The "getVersion" alias stands for both concrete version RPCs.
                    wantedMethods.add(GET_TABLE_VERSION_METHOD);
                    wantedMethods.add(GET_PARTITION_VERSION_METHOD);
                } else {
                    wantedMethods.add(method);
                }
            }
        }
        // Update the live set in place: drop entries that disappeared, then add the new ones.
        overloadThrottleMethodAllowlist.retainAll(wantedMethods);
        overloadThrottleMethodAllowlist.addAll(wantedMethods);
        backpressureQpsLimiters.keySet()
                .removeIf(name -> !overloadThrottleMethodAllowlist.contains(name));
    } else {
        overloadThrottleMethodAllowlist.clear();
        backpressureQpsLimiters.clear();
    }

    lastOverloadThrottleEnabled = throttleOn;
    lastOverloadThrottleMethodAllowlist = allowlistConfig;
}
/**
 * Pushes the current QPS settings into every existing QPS limiter and removes limiters whose
 * method now resolves to a non-positive total QPS.
 *
 * @param defaultQpsPerCore per-core QPS used for methods without an explicit entry
 * @param maxWaitRequestNum waiting-request cap passed to each limiter
 */
private void updateQpsLimiters(int defaultQpsPerCore, int maxWaitRequestNum) {
    List<String> staleMethods = new ArrayList<>();
    qpsLimiters.forEach((method, limiter) -> {
        int totalQps = getMethodTotalQps(method, defaultQpsPerCore);
        if (totalQps > 0) {
            limiter.update(maxWaitRequestNum, totalQps);
            LOG.info("Updated rate limiter for method: {}, maxWaitRequestNum: {}, qps: {}", method,
                    maxWaitRequestNum, totalQps);
        } else {
            staleMethods.add(method);
        }
    });
    if (staleMethods.isEmpty()) {
        return;
    }
    LOG.info("Remove zero qps rate limiter for methods: {}", staleMethods);
    staleMethods.forEach(qpsLimiters::remove);
}
/**
 * Pushes the current cost limits into every existing cost limiter and removes limiters whose
 * method now resolves to a non-positive total cost limit.
 */
private void updateCostLimiters() {
    List<String> staleMethods = new ArrayList<>();
    costLimiters.forEach((method, limiter) -> {
        int totalCost = getMethodTotalCostLimit(method);
        if (totalCost > 0) {
            limiter.setLimit(totalCost);
            LOG.info("Updated cost limiter for method: {}, cost: {}", method, totalCost);
        } else {
            staleMethods.add(method);
        }
    });
    if (staleMethods.isEmpty()) {
        return;
    }
    LOG.info("Remove cost limiter for methods: {}", staleMethods);
    staleMethods.forEach(costLimiters::remove);
}
/**
 * Parses a {@code method:limit;method:limit;...} string into a per-method limit map.
 *
 * <p>Blank entries are skipped. Entries that do not have exactly two {@code :}-separated
 * parts, have a blank method name, or have a non-integer limit are logged and ignored. An
 * entry naming {@code getVersion} (case-insensitive) is stored under both
 * {@link #GET_TABLE_VERSION_METHOD} and {@link #GET_PARTITION_VERSION_METHOD}.
 *
 * <p>The result is a {@link ConcurrentHashMap}: the returned map is published to fields that
 * are read without locking and may be cleared in place during a reload, which a plain
 * {@code HashMap} does not support safely.
 *
 * @param config raw config string; {@code null} or empty yields an empty map
 * @param configName label used in the warning log for invalid entries
 * @return a new, mutable, thread-safe map from method name to configured limit
 */
private Map<String, Integer> parseConfig(String config, String configName) {
    Map<String, Integer> target = new ConcurrentHashMap<>();
    if (config == null || config.isEmpty()) {
        return target;
    }
    for (String entry : config.split(";")) {
        String trimmedEntry = entry.trim();
        if (trimmedEntry.isEmpty()) {
            continue;
        }
        String[] parts = trimmedEntry.split(":");
        if (parts.length != 2) {
            LOG.warn("Invalid {} config entry: {}", configName, entry);
            continue;
        }
        String methodName = parts[0].trim();
        if (methodName.isEmpty()) {
            // ":10" has no method to attach the limit to; treat it like any malformed entry.
            LOG.warn("Invalid {} config entry: {}", configName, entry);
            continue;
        }
        int limit;
        try {
            limit = Integer.parseInt(parts[1].trim());
        } catch (NumberFormatException e) {
            LOG.warn("Invalid {} config entry: {}", configName, entry);
            continue;
        }
        if (methodName.equalsIgnoreCase(GET_VERSION_METHOD)) {
            target.put(GET_TABLE_VERSION_METHOD, limit);
            target.put(GET_PARTITION_VERSION_METHOD, limit);
        } else {
            target.put(methodName, limit);
        }
    }
    return target;
}
/**
 * Resolves the total QPS for a method: its configured per-core QPS (or the supplied default
 * when the method has no entry) multiplied by the processor count.
 *
 * @param methodName RPC method name to look up
 * @param defaultQpsPerCore per-core QPS used when the method has no explicit entry
 * @return the total QPS, clamped to {@code [0, Integer.MAX_VALUE]}; {@code 0} means no limit
 *         applies
 */
private int getMethodTotalQps(String methodName, int defaultQpsPerCore) {
    int qpsPerCore = methodQpsConfig.getOrDefault(methodName, defaultQpsPerCore);
    if (qpsPerCore <= 0) {
        return 0;
    }
    // Multiply in long: an int product can wrap to a negative value (silently disabling the
    // limiter) or to a small positive one (throttling far below the configured rate).
    long totalQps = (long) qpsPerCore * processorCount;
    return (int) Math.max(0L, Math.min(totalQps, (long) Integer.MAX_VALUE));
}
/**
 * Caps {@code cost} at the method's total cost limit when cost clamping is enabled in
 * {@link Config} and the method has a positive limit.
 *
 * @param methodName RPC method name whose cost limit is consulted
 * @param cost requested cost
 * @return {@code cost} unchanged, or the method's total limit if {@code cost} exceeds it
 */
protected int getClampedCost(String methodName, int cost) {
    if (!Config.meta_service_rpc_cost_clamped_to_limit_enabled) {
        return cost;
    }
    int limit = getMethodTotalCostLimit(methodName);
    if (limit <= 0 || cost <= limit) {
        return cost;
    }
    LOG.info("Clamped cost: {} for method: {} to limit: {}", cost,
            methodName, limit);
    return limit;
}
/**
 * Resolves the total cost limit for a method: its configured per-core cost limit multiplied
 * by the processor count.
 *
 * @param methodName RPC method name to look up
 * @return the total cost limit, clamped to {@code [0, Integer.MAX_VALUE]}; {@code 0} means the
 *         method has no cost limit
 */
private int getMethodTotalCostLimit(String methodName) {
    int costPerCore = methodCostConfig.getOrDefault(methodName, 0);
    if (costPerCore <= 0) {
        return 0;
    }
    // Multiply in long: an int product can wrap to a negative value (silently disabling the
    // limiter) or to a small positive one (a much tighter limit than configured).
    long totalCost = (long) costPerCore * processorCount;
    return (int) Math.max(0L, Math.min(totalCost, (long) Integer.MAX_VALUE));
}
/**
 * Returns the QPS limiter for a method, creating and caching one on first use when the method
 * resolves to a positive total QPS.
 *
 * @param methodName RPC method name
 * @return the cached or newly created limiter, or {@code null} if the method has no QPS limit
 */
private QpsLimiter getQpsLimiter(String methodName) {
    return qpsLimiters.computeIfAbsent(methodName, name -> {
        int totalQps = getMethodTotalQps(name, Config.meta_service_rpc_rate_limit_default_qps_per_core);
        // Returning null leaves the method unmapped, i.e. unlimited.
        return totalQps > 0
                ? new QpsLimiter(name, Config.meta_service_rpc_rate_limit_max_waiting_request_num, totalQps)
                : null;
    });
}
/**
 * Returns the cost limiter for a method, creating and caching one on first use when the
 * method has a positive total cost limit.
 *
 * @param methodName RPC method name
 * @return the cached or newly created limiter, or {@code null} if the method has no cost limit
 */
private CostLimiter getCostLimiter(String methodName) {
    return costLimiters.computeIfAbsent(methodName, name -> {
        int totalCost = getMethodTotalCostLimit(name);
        // Returning null leaves the method unmapped, i.e. without a cost limit.
        return totalCost > 0 ? new CostLimiter(name, totalCost) : null;
    });
}
/**
 * Returns the backpressure limiter for a method, creating and caching one on first use when
 * the method is on the overload-throttle allowlist and resolves to a positive total QPS.
 *
 * @param methodName RPC method name
 * @param factor overload factor handed to a newly created limiter; an already cached limiter
 *        is returned as is
 * @return the cached or newly created limiter, or {@code null} if none applies to the method
 */
private BackpressureQpsLimiter getBackpressureQpsLimiter(String methodName, double factor) {
    return backpressureQpsLimiters.computeIfAbsent(methodName, name -> {
        if (!overloadThrottleMethodAllowlist.contains(name)) {
            return null;
        }
        int totalQps = getMethodTotalQps(name, Config.meta_service_rpc_rate_limit_default_qps_per_core);
        if (totalQps <= 0) {
            return null;
        }
        return new BackpressureQpsLimiter(name, Config.meta_service_rpc_rate_limit_max_waiting_request_num,
                totalQps, factor);
    });
}
/**
 * Passes an RPC through the configured limiters, in order: the backpressure limiter (only
 * when overload throttling is enabled and the current factor is below 1.0), then the QPS
 * limiter, then the cost limiter (the latter two only when rate limiting is enabled).
 * Configuration is reloaded first if it changed. The time spent here is always reported to
 * the current query's summary profile (if any) and to the latency metric.
 *
 * @param methodName RPC method name
 * @param cost cost to reserve from the method's cost limiter; ignored when not positive
 * @return {@code true} only if {@code cost} was acquired from a cost limiter (presumably the
 *         caller should then pair this with {@link #release(String, int)} - confirm against
 *         callers); {@code false} otherwise
 * @throws RpcRateLimitException if any limiter rejects the request; the throttled metric is
 *         incremented before rethrowing
 */
public boolean acquire(String methodName, int cost) throws RpcRateLimitException {
    if (isConfigChanged()) {
        reloadConfig();
    }
    final long beginNs = System.nanoTime();
    try {
        // Stage 1: backpressure limiter, only while the overload factor is below 1.0.
        if (Config.meta_service_rpc_overload_throttle_enabled) {
            double overloadFactor = MetaServiceOverloadThrottle.getInstance().getFactor();
            QpsLimiter backpressure = overloadFactor < 1.0
                    ? getBackpressureQpsLimiter(methodName, overloadFactor)
                    : null;
            if (backpressure != null) {
                backpressure.acquire();
            }
        }
        if (!Config.meta_service_rpc_rate_limit_enabled) {
            return false;
        }
        // Stage 2: per-method QPS limiter.
        QpsLimiter qps = getQpsLimiter(methodName);
        if (qps != null) {
            qps.acquire();
        }
        // Stage 3: per-method cost limiter.
        CostLimiter costBudget = getCostLimiter(methodName);
        if (costBudget == null || cost <= 0) {
            return false;
        }
        costBudget.acquire(cost);
        return true;
    } catch (RpcRateLimitException throttled) {
        if (MetricRepo.isInit && Config.isCloudMode()) {
            CloudMetrics.META_SERVICE_RPC_RATE_LIMIT_THROTTLED.getOrAdd(methodName).increase(1L);
        }
        throw throttled;
    } finally {
        long elapsedNs = System.nanoTime() - beginNs;
        SummaryProfile profile = SummaryProfile.getSummaryProfile(ConnectContext.get());
        if (profile != null) {
            profile.addWaitMsRpcRateLimiterTime(elapsedNs);
        }
        if (MetricRepo.isInit && Config.isCloudMode()) {
            CloudMetrics.META_SERVICE_RPC_RATE_LIMIT_THROTTLED_LATENCY.getOrAdd(methodName)
                    .update(TimeUnit.NANOSECONDS.toMillis(elapsedNs));
        }
    }
}
/**
 * Returns {@code cost} to the method's cost limiter, if one currently exists. Any exception
 * thrown by the limiter is logged and swallowed.
 *
 * @param methodName RPC method name
 * @param cost cost to give back
 */
public void release(String methodName, int cost) {
    CostLimiter costBudget = costLimiters.get(methodName);
    if (costBudget == null) {
        return;
    }
    try {
        costBudget.release(cost);
    } catch (Exception e) {
        // Best effort: a failed release is logged rather than propagated to the caller.
        LOG.warn("Failed to release cost limiter for method: {}, cost: {}", methodName, cost, e);
    }
}
/**
 * Applies a new overload factor. A factor that {@link Double#compare} ranks at or above 1.0
 * discards every backpressure limiter; any other factor is applied to each existing one.
 *
 * @param factor new overload factor
 */
public void setOverloadFactor(double factor) {
    boolean throttlingActive = Double.compare(factor, 1.0) < 0;
    if (throttlingActive) {
        backpressureQpsLimiters.values().forEach(limiter -> limiter.applyFactor(factor));
        LOG.info("Applied overload factor {} to {} backpressure qps limiters", factor, backpressureQpsLimiters.size());
    } else {
        LOG.info("Overload factor is {}, clearing {} backpressure qps limiters", factor,
                backpressureQpsLimiters.size());
        backpressureQpsLimiters.clear();
    }
}
/**
 * Test-only accessor.
 *
 * @return the live overload-throttle allowlist set (not a copy)
 */
Set<String> getOverloadThrottleMethodAllowlist() {
    return this.overloadThrottleMethodAllowlist;
}
/**
 * Test-only accessor.
 *
 * @return the current per-method, per-core QPS map (not a copy)
 */
Map<String, Integer> getMethodQpsConfig() {
    return this.methodQpsConfig;
}
/**
 * Test-only accessor.
 *
 * @return the current per-method, per-core cost-limit map (not a copy)
 */
Map<String, Integer> getMethodCostConfig() {
    return this.methodCostConfig;
}
/**
 * Test-only accessor.
 *
 * @return the live map of QPS limiters keyed by method name (not a copy)
 */
Map<String, QpsLimiter> getQpsLimiters() {
    return this.qpsLimiters;
}
/**
 * Test-only accessor.
 *
 * @return the live map of cost limiters keyed by method name (not a copy)
 */
Map<String, CostLimiter> getCostLimiters() {
    return this.costLimiters;
}
/**
 * Test-only accessor.
 *
 * @return the live map of backpressure limiters keyed by method name (not a copy)
 */
Map<String, BackpressureQpsLimiter> getBackpressureQpsLimiters() {
    return this.backpressureQpsLimiters;
}
}