be/src/cloud/cloud_ms_rpc_rate_limiters.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <bvar/bvar.h> |
21 | | |
22 | | #include <array> |
23 | | #include <memory> |
24 | | #include <string> |
25 | | #include <string_view> |
26 | | |
27 | | #include "common/atomic_shared_ptr.h" |
28 | | #include "cpp/token_bucket_rate_limiter.h" |
29 | | |
30 | | namespace doris::cloud { |
31 | | |
32 | | // Macro to define all Meta Service RPC types |
33 | | // Usage: META_SERVICE_RPC_TYPES(X) where X(enum_name, config_suffix, display_name) |
34 | | // - enum_name: the enum value name (e.g., GET_TABLET_META) |
35 | | // - config_suffix: suffix for the config name (e.g., get_tablet_meta -> ms_rpc_qps_get_tablet_meta) |
36 | | // - display_name: the human-readable name for bvar metrics (e.g., "get tablet meta") |
37 | | #define META_SERVICE_RPC_TYPES(X) \ |
38 | 73 | X(GET_TABLET_META, get_tablet_meta, "get tablet meta") \ |
39 | 73 | X(GET_ROWSET, get_rowset, "get rowset") \ |
40 | 73 | X(PREPARE_ROWSET, prepare_rowset, "prepare rowset") \ |
41 | 73 | X(COMMIT_ROWSET, commit_rowset, "commit rowset") \ |
42 | 73 | X(UPDATE_TMP_ROWSET, update_tmp_rowset, "update tmp rowset") \ |
43 | 73 | X(COMMIT_TXN, commit_txn, "commit txn") \ |
44 | 73 | X(ABORT_TXN, abort_txn, "abort txn") \ |
45 | 73 | X(PRECOMMIT_TXN, precommit_txn, "precommit txn") \ |
46 | 73 | X(GET_OBJ_STORE_INFO, get_obj_store_info, "get obj store info") \ |
47 | 73 | X(START_TABLET_JOB, start_tablet_job, "start tablet job") \ |
48 | 73 | X(FINISH_TABLET_JOB, finish_tablet_job, "finish tablet job") \ |
49 | 73 | X(GET_DELETE_BITMAP, get_delete_bitmap, "get delete bitmap") \ |
50 | 73 | X(UPDATE_DELETE_BITMAP, update_delete_bitmap, "update delete bitmap") \ |
51 | 73 | X(GET_DELETE_BITMAP_UPDATE_LOCK, get_delete_bitmap_update_lock, \ |
52 | 69 | "get delete bitmap update lock") \ |
53 | 73 | X(REMOVE_DELETE_BITMAP_UPDATE_LOCK, remove_delete_bitmap_update_lock, \ |
54 | 69 | "remove delete bitmap update lock") \ |
55 | 73 | X(GET_INSTANCE, get_instance, "get instance") \ |
56 | 73 | X(PREPARE_RESTORE_JOB, prepare_restore_job, "prepare restore job") \ |
57 | 73 | X(COMMIT_RESTORE_JOB, commit_restore_job, "commit restore job") \ |
58 | 73 | X(FINISH_RESTORE_JOB, finish_restore_job, "finish restore job") \ |
59 | 73 | X(LIST_SNAPSHOTS, list_snapshots, "list snapshots") \ |
60 | 73 | X(GET_CLUSTER_STATUS, get_cluster_status, "get cluster status") \ |
61 | 73 | X(UPDATE_PACKED_FILE_INFO, update_packed_file_info, "update packed file info") |
62 | | |
63 | | // Enum class for Meta Service RPC types |
64 | | enum class MetaServiceRPC : size_t { |
65 | | #define DEFINE_ENUM(enum_name, config_suffix, display_name) enum_name, |
66 | | META_SERVICE_RPC_TYPES(DEFINE_ENUM) |
67 | | #undef DEFINE_ENUM |
68 | | COUNT // Total number of RPC types |
69 | | }; |
70 | | |
71 | | // Get the display name for a MetaServiceRPC enum value |
72 | | std::string_view meta_service_rpc_display_name(MetaServiceRPC rpc); |
73 | | |
74 | | // Rate limiter with associated metrics for a single RPC method |
75 | | struct RpcRateLimiter { |
76 | | std::unique_ptr<TokenBucketRateLimiterHolder> limiter; |
77 | | std::unique_ptr<bvar::LatencyRecorder> latency_recorder; |
78 | | |
79 | | RpcRateLimiter(int qps, std::string_view op_name); |
80 | | |
81 | | // Reset the rate limiter with new QPS |
82 | | void reset(int qps); |
83 | | }; |
84 | | |
85 | | // Host-level rate limiters for MS RPCs to prevent burst traffic |
86 | | // Each RPC method has its own rate limiter and bvar metrics |
87 | | // Uses enum class MetaServiceRPC as key for O(1) lookup |
88 | | class HostLevelMSRpcRateLimiters { |
89 | | public: |
90 | | // Constructor initializes rate limiters for all RPC types from config |
91 | | HostLevelMSRpcRateLimiters(); |
92 | | |
93 | | // Constructor for testing: initializes all rate limiters with uniform QPS |
94 | | // This allows unit tests to be independent of config values |
95 | | explicit HostLevelMSRpcRateLimiters(int uniform_qps); |
96 | | |
97 | 73 | ~HostLevelMSRpcRateLimiters() = default; |
98 | | |
99 | | // Rate limit the specified RPC method, returns actual sleep time in nanoseconds |
100 | | // Thread-safe: each limiter handles its own synchronization |
101 | | int64_t limit(MetaServiceRPC rpc); |
102 | | |
103 | | // Reset a specific rate limiter with new QPS |
104 | | void reset(MetaServiceRPC rpc, int qps); |
105 | | |
106 | | // Reset all rate limiters (re-reads QPS from config) |
107 | | void reset_all(); |
108 | | |
109 | | private: |
110 | | void init_from_config(); |
111 | | void init_with_uniform_qps(int qps); |
112 | | |
113 | | // Use atomic_shared_ptr for thread-safe access during concurrent limit() and reset() calls |
114 | | std::array<doris::atomic_shared_ptr<RpcRateLimiter>, static_cast<size_t>(MetaServiceRPC::COUNT)> |
115 | | _limiters; |
116 | | }; |
117 | | |
118 | | } // namespace doris::cloud |