Coverage Report

Created: 2026-05-11 13:39

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_ms_rpc_rate_limiters.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bvar/bvar.h>
21
22
#include <array>
23
#include <memory>
24
#include <string>
25
#include <string_view>
26
27
#include "common/atomic_shared_ptr.h"
28
#include "cpp/token_bucket_rate_limiter.h"
29
30
namespace doris::cloud {
31
32
// Macro to define all Meta Service RPC types
33
// Usage: META_SERVICE_RPC_TYPES(X) where X(enum_name, config_suffix, display_name)
34
// - enum_name: the enum value name (e.g., GET_TABLET_META)
35
// - config_suffix: suffix for the config name (e.g., get_tablet_meta -> ms_rpc_qps_get_tablet_meta)
36
// - display_name: the human-readable name for bvar metrics (e.g., "get tablet meta")
37
#define META_SERVICE_RPC_TYPES(X)                                         \
38
73
    X(GET_TABLET_META, get_tablet_meta, "get tablet meta")                \
39
73
    X(GET_ROWSET, get_rowset, "get rowset")                               \
40
73
    X(PREPARE_ROWSET, prepare_rowset, "prepare rowset")                   \
41
73
    X(COMMIT_ROWSET, commit_rowset, "commit rowset")                      \
42
73
    X(UPDATE_TMP_ROWSET, update_tmp_rowset, "update tmp rowset")          \
43
73
    X(COMMIT_TXN, commit_txn, "commit txn")                               \
44
73
    X(ABORT_TXN, abort_txn, "abort txn")                                  \
45
73
    X(PRECOMMIT_TXN, precommit_txn, "precommit txn")                      \
46
73
    X(GET_OBJ_STORE_INFO, get_obj_store_info, "get obj store info")       \
47
73
    X(START_TABLET_JOB, start_tablet_job, "start tablet job")             \
48
73
    X(FINISH_TABLET_JOB, finish_tablet_job, "finish tablet job")          \
49
73
    X(GET_DELETE_BITMAP, get_delete_bitmap, "get delete bitmap")          \
50
73
    X(UPDATE_DELETE_BITMAP, update_delete_bitmap, "update delete bitmap") \
51
73
    X(GET_DELETE_BITMAP_UPDATE_LOCK, get_delete_bitmap_update_lock,       \
52
69
      "get delete bitmap update lock")                                    \
53
73
    X(REMOVE_DELETE_BITMAP_UPDATE_LOCK, remove_delete_bitmap_update_lock, \
54
69
      "remove delete bitmap update lock")                                 \
55
73
    X(GET_INSTANCE, get_instance, "get instance")                         \
56
73
    X(PREPARE_RESTORE_JOB, prepare_restore_job, "prepare restore job")    \
57
73
    X(COMMIT_RESTORE_JOB, commit_restore_job, "commit restore job")       \
58
73
    X(FINISH_RESTORE_JOB, finish_restore_job, "finish restore job")       \
59
73
    X(LIST_SNAPSHOTS, list_snapshots, "list snapshots")                   \
60
73
    X(GET_CLUSTER_STATUS, get_cluster_status, "get cluster status")       \
61
73
    X(UPDATE_PACKED_FILE_INFO, update_packed_file_info, "update packed file info")
62
63
// Enum class for Meta Service RPC types
64
enum class MetaServiceRPC : size_t {
65
#define DEFINE_ENUM(enum_name, config_suffix, display_name) enum_name,
66
    META_SERVICE_RPC_TYPES(DEFINE_ENUM)
67
#undef DEFINE_ENUM
68
            COUNT // Total number of RPC types
69
};
70
71
// Get the display name for a MetaServiceRPC enum value
72
std::string_view meta_service_rpc_display_name(MetaServiceRPC rpc);
73
74
// Rate limiter with associated metrics for a single RPC method
75
struct RpcRateLimiter {
76
    std::unique_ptr<TokenBucketRateLimiterHolder> limiter;
77
    std::unique_ptr<bvar::LatencyRecorder> latency_recorder;
78
79
    RpcRateLimiter(int qps, std::string_view op_name);
80
81
    // Reset the rate limiter with new QPS
82
    void reset(int qps);
83
};
84
85
// Host-level rate limiters for MS RPCs to prevent burst traffic
86
// Each RPC method has its own rate limiter and bvar metrics
87
// Uses enum class MetaServiceRPC as key for O(1) lookup
88
class HostLevelMSRpcRateLimiters {
89
public:
90
    // Constructor initializes rate limiters for all RPC types from config
91
    HostLevelMSRpcRateLimiters();
92
93
    // Constructor for testing: initializes all rate limiters with uniform QPS
94
    // This allows unit tests to be independent of config values
95
    explicit HostLevelMSRpcRateLimiters(int uniform_qps);
96
97
73
    ~HostLevelMSRpcRateLimiters() = default;
98
99
    // Rate limit the specified RPC method, returns actual sleep time in nanoseconds
100
    // Thread-safe: each limiter handles its own synchronization
101
    int64_t limit(MetaServiceRPC rpc);
102
103
    // Reset a specific rate limiter with new QPS
104
    void reset(MetaServiceRPC rpc, int qps);
105
106
    // Reset all rate limiters (re-reads QPS from config)
107
    void reset_all();
108
109
private:
110
    void init_from_config();
111
    void init_with_uniform_qps(int qps);
112
113
    // Use atomic_shared_ptr for thread-safe access during concurrent limit() and reset() calls
114
    std::array<doris::atomic_shared_ptr<RpcRateLimiter>, static_cast<size_t>(MetaServiceRPC::COUNT)>
115
            _limiters;
116
};
117
118
} // namespace doris::cloud