/root/doris/be/src/runtime/client_cache.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <gen_cpp/BackendService.h>     // IWYU pragma: keep
#include <gen_cpp/FrontendService.h>    // IWYU pragma: keep
#include <gen_cpp/TPaloBrokerService.h> // IWYU pragma: keep
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <string.h>
#include <unistd.h>

#include <algorithm>
#include <cctype>
#include <chrono>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <thread>
#include <type_traits>
#include <typeinfo>
#include <unordered_map>

#include "common/config.h"
#include "common/exception.h"
#include "common/status.h"
#include "util/hash_util.hpp"
#include "util/metrics.h"
#include "util/thrift_client.h"
#include "util/thrift_server.h"
44 | | |
45 | | namespace doris { |
46 | | // Helper class which implements the majority of the caching |
47 | | // functionality without using templates (i.e. pointers to the |
48 | | // superclass of all ThriftClients and a void* for the key). |
49 | | // |
50 | | // The user of this class only sees RPC proxy classes, but we have |
51 | | // to track the ThriftClient to manipulate the underlying |
52 | | // transport. To do this, we maintain a map from an opaque 'key' |
53 | | // pointer type to the client implementation. We actually know the |
54 | | // type of the pointer (it's the type parameter to ClientCache), but |
55 | | // we deliberately avoid using it so that this entire class doesn't |
56 | | // get inlined every time it gets used. |
57 | | // |
58 | | // This class is thread-safe. |
59 | | // |
60 | | // TODO: shut down clients in the background if they don't get used for a period of time |
61 | | // TODO: in order to reduce locking overhead when getting/releasing clients, |
62 | | // add call to hand back pointer to list stored in ClientCache and add separate lock |
63 | | // to list (or change to lock-free list) |
64 | | // TODO: reduce locking overhead and by adding per-address client caches, each with its |
65 | | // own lock. |
66 | | // TODO: More graceful handling of clients that have failed (maybe better |
67 | | // handled by a smart-wrapper of the interface object). |
68 | | // TODO: limits on total number of clients, and clients per-backend |
69 | | class ClientCacheHelper { |
70 | | public: |
71 | | ~ClientCacheHelper(); |
72 | | // Callback method which produces a client object when one cannot be |
73 | | // found in the cache. Supplied by the ClientCache wrapper. |
74 | | using ClientFactory = |
75 | | std::function<ThriftClientImpl*(const TNetworkAddress& hostport, void** client_key)>; |
76 | | |
77 | | // Return client for specific host/port in 'client'. If a client |
78 | | // is not available, the client parameter is set to nullptr. |
79 | | Status get_client(const TNetworkAddress& hostport, ClientFactory& factory_method, |
80 | | void** client_key, int timeout_ms); |
81 | | |
82 | | // Close and delete the underlying transport and remove the client from _client_map. |
83 | | // Return a new client connecting to the same host/port. |
84 | | // Return an error status and set client_key to nullptr if a new client cannot |
85 | | // created. |
86 | | Status reopen_client(ClientFactory& factory_method, void** client_key, int timeout_ms); |
87 | | |
88 | | // Return a client to the cache, without closing it, and set *client_key to nullptr. |
89 | | void release_client(void** client_key); |
90 | | |
91 | | std::string debug_string(); |
92 | | |
93 | | void init_metrics(const std::string& name); |
94 | | |
95 | | private: |
96 | | template <class T> |
97 | | friend class ClientCache; |
98 | | // Private constructor so that only ClientCache can instantiate this class. |
99 | 0 | ClientCacheHelper() : _metrics_enabled(false), _max_cache_size_per_host(-1) {} |
100 | | |
101 | | ClientCacheHelper(int max_cache_size_per_host) |
102 | 0 | : _metrics_enabled(false), _max_cache_size_per_host(max_cache_size_per_host) {} |
103 | | |
104 | | // Protects all member variables |
105 | | // TODO: have more fine-grained locks or use lock-free data structures, |
106 | | // this isn't going to scale for a high request rate |
107 | | std::mutex _lock; |
108 | | |
109 | | // Cached client entry with resolved IP address |
110 | | struct CachedClient { |
111 | | void* client_key; |
112 | | std::string resolved_ip; // The IP address when this client was cached |
113 | | }; |
114 | | |
115 | | // map from (host, port) to list of cached client entries for that address |
116 | | using ClientCacheMap = std::unordered_map<TNetworkAddress, std::list<CachedClient>>; |
117 | | ClientCacheMap _client_cache; |
118 | | |
119 | | // Get a client from cache that matches the resolved IP. |
120 | | // If cache not found or IP doesn't match, set client_key as nullptr. |
121 | | void _get_client_from_cache(const TNetworkAddress& hostport, const std::string& resolved_ip, |
122 | | void** client_key); |
123 | | |
124 | | // Map from client key back to its associated ThriftClientImpl transport |
125 | | using ClientMap = std::unordered_map<void*, ThriftClientImpl*>; |
126 | | ClientMap _client_map; |
127 | | |
128 | | // Map from client key to the original hostport (with hostname, not resolved IP). |
129 | | // This is needed to correctly return clients to the cache by hostname. |
130 | | using ClientHostportMap = std::unordered_map<void*, TNetworkAddress>; |
131 | | ClientHostportMap _client_hostport_map; |
132 | | |
133 | | bool _metrics_enabled; |
134 | | |
135 | | // max connections per host in this cache, -1 means unlimited |
136 | | int _max_cache_size_per_host; |
137 | | |
138 | | std::shared_ptr<MetricEntity> _thrift_client_metric_entity; |
139 | | |
140 | | // Number of clients 'checked-out' from the cache |
141 | | IntGauge* thrift_used_clients = nullptr; |
142 | | |
143 | | // Total clients in the cache, including those in use |
144 | | IntGauge* thrift_opened_clients = nullptr; |
145 | | |
146 | | // Create a new client for specific host/port in 'client' and put it in _client_map. |
147 | | // The resolved_ip is the actual IP address to connect to (resolved from hostname). |
148 | | Status _create_client(const TNetworkAddress& hostport, const std::string& resolved_ip, |
149 | | ClientFactory& factory_method, void** client_key, int timeout_ms); |
150 | | }; |
151 | | |
152 | | template <class T> |
153 | | class ClientCache; |
154 | | |
155 | | // A scoped client connection to help manage clients from a client cache. |
156 | | // |
157 | | // Example: |
158 | | // { |
159 | | // DorisInternalServiceConnection client(cache, address, &status); |
160 | | // try { |
161 | | // client->TransmitData(...); |
162 | | // } catch (TTransportException& e) { |
163 | | // // Retry |
164 | | // RETURN_IF_ERROR(client.Reopen()); |
165 | | // client->TransmitData(...); |
166 | | // } |
167 | | // } |
168 | | // ('client' is released back to cache upon destruction.) |
169 | | template <class T> |
170 | | class ClientConnection { |
171 | | public: |
172 | | ClientConnection(ClientCache<T>* client_cache, const TNetworkAddress& address, Status* status) |
173 | 0 | : ClientConnection(client_cache, address, 0, status, 3) {} |
174 | | |
175 | | ClientConnection(ClientCache<T>* client_cache, const TNetworkAddress& address, int timeout_ms, |
176 | | Status* status, int max_retries = 3) |
177 | 0 | : _client_cache(client_cache), _client(nullptr) { |
178 | 0 | int num_retries = 0; |
179 | 0 | do { |
180 | 0 | *status = _client_cache->get_client(address, &_client, timeout_ms); |
181 | 0 | if (status->ok()) { |
182 | 0 | DCHECK(_client != nullptr); |
183 | 0 | break; |
184 | 0 | } |
185 | 0 | DCHECK(_client == nullptr); |
186 | 0 | if (num_retries++ < max_retries) { |
187 | | // exponential backoff retry with starting delay of 500ms |
188 | 0 | usleep(500000 * (1 << num_retries)); |
189 | 0 | LOG(INFO) << "Failed to get client from cache: " << status->to_string() |
190 | 0 | << ", retrying[" << num_retries << "]..."; |
191 | 0 | } |
192 | 0 | } while (num_retries < max_retries); |
193 | 0 | } Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEEC2EPNS_11ClientCacheIS1_EERKNS_15TNetworkAddressEiPNS_6StatusEi Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEEC2EPNS_11ClientCacheIS1_EERKNS_15TNetworkAddressEiPNS_6StatusEi Unexecuted instantiation: _ZN5doris16ClientConnectionINS_20BackendServiceClientEEC2EPNS_11ClientCacheIS1_EERKNS_15TNetworkAddressEiPNS_6StatusEi |
194 | | |
195 | 0 | ~ClientConnection() { |
196 | 0 | if (_client != nullptr) { |
197 | 0 | _client_cache->release_client(&_client); |
198 | 0 | } |
199 | 0 | } Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEED2Ev Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEED2Ev Unexecuted instantiation: _ZN5doris16ClientConnectionINS_20BackendServiceClientEED2Ev |
200 | | |
201 | 0 | Status reopen(int timeout_ms) { return _client_cache->reopen_client(&_client, timeout_ms); }Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEE6reopenEi Unexecuted instantiation: _ZN5doris16ClientConnectionINS_20BackendServiceClientEE6reopenEi Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEE6reopenEi |
202 | | |
203 | 0 | Status reopen() { return _client_cache->reopen_client(&_client, 0); }Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEE6reopenEv Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEE6reopenEv |
204 | | |
205 | 0 | inline bool is_alive() { return _client != nullptr; } |
206 | | |
207 | 0 | T* operator->() const { |
208 | 0 | if (_client == nullptr) { |
209 | 0 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Invalid RPC client!"); |
210 | 0 | } |
211 | 0 | return _client; |
212 | 0 | } Unexecuted instantiation: _ZNK5doris16ClientConnectionINS_21FrontendServiceClientEEptEv Unexecuted instantiation: _ZNK5doris16ClientConnectionINS_24TPaloBrokerServiceClientEEptEv Unexecuted instantiation: _ZNK5doris16ClientConnectionINS_20BackendServiceClientEEptEv |
213 | | |
214 | | private: |
215 | | ClientCache<T>* _client_cache = nullptr; |
216 | | T* _client; |
217 | | }; |
218 | | |
219 | | // Generic cache of Thrift clients for a given service type. |
220 | | // This class is thread-safe. |
221 | | template <class T> |
222 | | class ClientCache { |
223 | | public: |
224 | | using Client = ThriftClient<T>; |
225 | | |
226 | 0 | ClientCache() { |
227 | 0 | _client_factory = |
228 | 0 | std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this, |
229 | 0 | std::placeholders::_1, std::placeholders::_2); |
230 | 0 | } |
231 | | |
232 | 0 | ClientCache(int max_cache_size) : _client_cache_helper(max_cache_size) { |
233 | 0 | _client_factory = |
234 | 0 | std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this, |
235 | 0 | std::placeholders::_1, std::placeholders::_2); |
236 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEEC2Ei Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEEC2Ei Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEEC2Ei |
237 | | |
238 | | // Helper method which returns a debug string |
239 | | std::string debug_string() { return _client_cache_helper.debug_string(); } |
240 | | |
241 | | // Adds metrics for this cache. |
242 | | // The metrics have an identification by the 'name' argument |
243 | | // (which should not end in a period). |
244 | | // Must be called before the cache is used, otherwise the metrics might be wrong |
245 | 0 | void init_metrics(const std::string& name) { _client_cache_helper.init_metrics(name); }Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE12init_metricsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE12init_metricsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE12init_metricsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE |
246 | | |
247 | | private: |
248 | | friend class ClientConnection<T>; |
249 | | |
250 | | // Most operations in this class are thin wrappers around the |
251 | | // equivalent in ClientCacheHelper, which is a non-templated cache |
252 | | // to avoid inlining lots of code wherever this cache is used. |
253 | | ClientCacheHelper _client_cache_helper; |
254 | | |
255 | | // Function pointer, bound to make_client, which produces clients when the cache is empty |
256 | | using ClientFactory = ClientCacheHelper::ClientFactory; |
257 | | ClientFactory _client_factory; |
258 | | |
259 | | // Obtains a pointer to a Thrift interface object (of type T), |
260 | | // backed by a live transport which is already open. Returns |
261 | | // Status::OK() unless there was an error opening the transport. |
262 | 0 | Status get_client(const TNetworkAddress& hostport, T** iface, int timeout_ms) { |
263 | 0 | return _client_cache_helper.get_client(hostport, _client_factory, |
264 | 0 | reinterpret_cast<void**>(iface), timeout_ms); |
265 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE10get_clientERKNS_15TNetworkAddressEPPS1_i Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE10get_clientERKNS_15TNetworkAddressEPPS1_i Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE10get_clientERKNS_15TNetworkAddressEPPS1_i |
266 | | |
267 | | // Close and delete the underlying transport. Return a new client connecting to the |
268 | | // same host/port. |
269 | | // Return an error status if a new connection cannot be established and *client will be |
270 | | // nullptr in that case. |
271 | 0 | Status reopen_client(T** client, int timeout_ms) { |
272 | 0 | return _client_cache_helper.reopen_client(_client_factory, reinterpret_cast<void**>(client), |
273 | 0 | timeout_ms); |
274 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE13reopen_clientEPPS1_i Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE13reopen_clientEPPS1_i Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE13reopen_clientEPPS1_i |
275 | | |
276 | | // Return the client to the cache and set *client to nullptr. |
277 | 0 | void release_client(T** client) { |
278 | 0 | return _client_cache_helper.release_client(reinterpret_cast<void**>(client)); |
279 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE14release_clientEPPS1_ Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE14release_clientEPPS1_ Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE14release_clientEPPS1_ |
280 | | |
281 | | // Factory method to produce a new ThriftClient<T> for the wrapped cache |
282 | 0 | ThriftClientImpl* make_client(const TNetworkAddress& hostport, void** client_key) { |
283 | 0 | static ThriftServer::ServerType server_type = get_thrift_server_type(); |
284 | 0 | Client* client = new Client(hostport.hostname, hostport.port, server_type); |
285 | 0 | *client_key = reinterpret_cast<void*>(client->iface()); |
286 | 0 | return client; |
287 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE11make_clientERKNS_15TNetworkAddressEPPv Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE11make_clientERKNS_15TNetworkAddressEPPv Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE11make_clientERKNS_15TNetworkAddressEPPv |
288 | | |
289 | | // since service type is multiple, we should set thrift server type here for be thrift client |
290 | 0 | ThriftServer::ServerType get_thrift_server_type() { |
291 | 0 | auto& thrift_server_type = config::thrift_server_type_of_fe; |
292 | 0 | std::transform(thrift_server_type.begin(), thrift_server_type.end(), |
293 | 0 | thrift_server_type.begin(), [](auto c) { return std::toupper(c); });Unexecuted instantiation: _ZZN5doris11ClientCacheINS_21FrontendServiceClientEE22get_thrift_server_typeEvENKUlT_E_clIcEEDaS3_ Unexecuted instantiation: _ZZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE22get_thrift_server_typeEvENKUlT_E_clIcEEDaS3_ Unexecuted instantiation: _ZZN5doris11ClientCacheINS_20BackendServiceClientEE22get_thrift_server_typeEvENKUlT_E_clIcEEDaS3_ |
294 | 0 | if (strcmp(typeid(T).name(), "N5doris21FrontendServiceClientE") == 0 && |
295 | 0 | thrift_server_type == "THREADED_SELECTOR") { |
296 | 0 | return ThriftServer::ServerType::NON_BLOCKING; |
297 | 0 | } else { |
298 | 0 | return ThriftServer::ServerType::THREADED; |
299 | 0 | } |
300 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE22get_thrift_server_typeEv Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE22get_thrift_server_typeEv Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE22get_thrift_server_typeEv |
301 | | }; |
302 | | |
303 | | // Doris backend client cache, used by a backend to send requests |
304 | | // to any other backend. |
305 | | using BackendServiceClientCache = ClientCache<BackendServiceClient>; |
306 | | using BackendServiceConnection = ClientConnection<BackendServiceClient>; |
307 | | |
308 | | using FrontendServiceClientCache = ClientCache<FrontendServiceClient>; |
309 | | using FrontendServiceConnection = ClientConnection<FrontendServiceClient>; |
310 | | |
311 | | using BrokerServiceClientCache = ClientCache<TPaloBrokerServiceClient>; |
312 | | using BrokerServiceConnection = ClientConnection<TPaloBrokerServiceClient>; |
313 | | |
314 | | } // namespace doris |