/root/doris/be/src/runtime/client_cache.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <gen_cpp/BackendService.h>     // IWYU pragma: keep
#include <gen_cpp/FrontendService.h>    // IWYU pragma: keep
#include <gen_cpp/TPaloBrokerService.h> // IWYU pragma: keep
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <string.h>
#include <unistd.h>

#include <algorithm>
#include <cctype>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <type_traits>
#include <typeinfo>
#include <unordered_map>

#include "common/config.h"
#include "common/exception.h"
#include "common/status.h"
#include "util/hash_util.hpp"
#include "util/metrics.h"
#include "util/thrift_client.h"
#include "util/thrift_server.h"
44 | | |
45 | | namespace doris { |
46 | | // Helper class which implements the majority of the caching |
47 | | // functionality without using templates (i.e. pointers to the |
48 | | // superclass of all ThriftClients and a void* for the key). |
49 | | // |
50 | | // The user of this class only sees RPC proxy classes, but we have |
51 | | // to track the ThriftClient to manipulate the underlying |
52 | | // transport. To do this, we maintain a map from an opaque 'key' |
53 | | // pointer type to the client implementation. We actually know the |
54 | | // type of the pointer (it's the type parameter to ClientCache), but |
55 | | // we deliberately avoid using it so that this entire class doesn't |
56 | | // get inlined every time it gets used. |
57 | | // |
58 | | // This class is thread-safe. |
59 | | // |
60 | | // TODO: shut down clients in the background if they don't get used for a period of time |
61 | | // TODO: in order to reduce locking overhead when getting/releasing clients, |
62 | | // add call to hand back pointer to list stored in ClientCache and add separate lock |
63 | | // to list (or change to lock-free list) |
64 | | // TODO: reduce locking overhead and by adding per-address client caches, each with its |
65 | | // own lock. |
66 | | // TODO: More graceful handling of clients that have failed (maybe better |
67 | | // handled by a smart-wrapper of the interface object). |
68 | | // TODO: limits on total number of clients, and clients per-backend |
69 | | class ClientCacheHelper { |
70 | | public: |
71 | | ~ClientCacheHelper(); |
72 | | // Callback method which produces a client object when one cannot be |
73 | | // found in the cache. Supplied by the ClientCache wrapper. |
74 | | using ClientFactory = |
75 | | std::function<ThriftClientImpl*(const TNetworkAddress& hostport, void** client_key)>; |
76 | | |
77 | | // Return client for specific host/port in 'client'. If a client |
78 | | // is not available, the client parameter is set to nullptr. |
79 | | Status get_client(const TNetworkAddress& hostport, ClientFactory& factory_method, |
80 | | void** client_key, int timeout_ms); |
81 | | |
82 | | // Close and delete the underlying transport and remove the client from _client_map. |
83 | | // Return a new client connecting to the same host/port. |
84 | | // Return an error status and set client_key to nullptr if a new client cannot |
85 | | // created. |
86 | | Status reopen_client(ClientFactory& factory_method, void** client_key, int timeout_ms); |
87 | | |
88 | | // Return a client to the cache, without closing it, and set *client_key to nullptr. |
89 | | void release_client(void** client_key); |
90 | | |
91 | | std::string debug_string(); |
92 | | |
93 | | void init_metrics(const std::string& name); |
94 | | |
95 | | private: |
96 | | template <class T> |
97 | | friend class ClientCache; |
98 | | // Private constructor so that only ClientCache can instantiate this class. |
99 | 1 | ClientCacheHelper() : _metrics_enabled(false), _max_cache_size_per_host(-1) {} |
100 | | |
101 | | ClientCacheHelper(int max_cache_size_per_host) |
102 | 0 | : _metrics_enabled(false), _max_cache_size_per_host(max_cache_size_per_host) {} |
103 | | |
104 | | // Protects all member variables |
105 | | // TODO: have more fine-grained locks or use lock-free data structures, |
106 | | // this isn't going to scale for a high request rate |
107 | | std::mutex _lock; |
108 | | |
109 | | // map from (host, port) to list of client keys for that address |
110 | | using ClientCacheMap = std::unordered_map<TNetworkAddress, std::list<void*>>; |
111 | | ClientCacheMap _client_cache; |
112 | | |
113 | | // if cache not found, set client_key as nullptr |
114 | | void _get_client_from_cache(const TNetworkAddress& hostport, void** client_key); |
115 | | |
116 | | // Map from client key back to its associated ThriftClientImpl transport |
117 | | using ClientMap = std::unordered_map<void*, ThriftClientImpl*>; |
118 | | ClientMap _client_map; |
119 | | |
120 | | bool _metrics_enabled; |
121 | | |
122 | | // max connections per host in this cache, -1 means unlimited |
123 | | int _max_cache_size_per_host; |
124 | | |
125 | | std::shared_ptr<MetricEntity> _thrift_client_metric_entity; |
126 | | |
127 | | // Number of clients 'checked-out' from the cache |
128 | | IntGauge* thrift_used_clients = nullptr; |
129 | | |
130 | | // Total clients in the cache, including those in use |
131 | | IntGauge* thrift_opened_clients = nullptr; |
132 | | |
133 | | // Create a new client for specific host/port in 'client' and put it in _client_map |
134 | | Status _create_client(const TNetworkAddress& hostport, ClientFactory& factory_method, |
135 | | void** client_key, int timeout_ms); |
136 | | }; |
137 | | |
138 | | template <class T> |
139 | | class ClientCache; |
140 | | |
141 | | // A scoped client connection to help manage clients from a client cache. |
142 | | // |
143 | | // Example: |
144 | | // { |
145 | | // DorisInternalServiceConnection client(cache, address, &status); |
146 | | // try { |
147 | | // client->TransmitData(...); |
148 | | // } catch (TTransportException& e) { |
149 | | // // Retry |
150 | | // RETURN_IF_ERROR(client.Reopen()); |
151 | | // client->TransmitData(...); |
152 | | // } |
153 | | // } |
154 | | // ('client' is released back to cache upon destruction.) |
155 | | template <class T> |
156 | | class ClientConnection { |
157 | | public: |
158 | | ClientConnection(ClientCache<T>* client_cache, const TNetworkAddress& address, Status* status) |
159 | 0 | : ClientConnection(client_cache, address, 0, status, 3) {} |
160 | | |
161 | | ClientConnection(ClientCache<T>* client_cache, const TNetworkAddress& address, int timeout_ms, |
162 | | Status* status, int max_retries = 3) |
163 | 0 | : _client_cache(client_cache), _client(nullptr) { |
164 | 0 | int num_retries = 0; |
165 | 0 | do { |
166 | 0 | *status = _client_cache->get_client(address, &_client, timeout_ms); |
167 | 0 | if (status->ok()) { |
168 | 0 | DCHECK(_client != nullptr); |
169 | 0 | break; |
170 | 0 | } |
171 | 0 | DCHECK(_client == nullptr); |
172 | 0 | if (num_retries++ < max_retries) { |
173 | | // exponential backoff retry with starting delay of 500ms |
174 | 0 | usleep(500000 * (1 << num_retries)); |
175 | 0 | LOG(INFO) << "Failed to get client from cache: " << status->to_string() |
176 | 0 | << ", retrying[" << num_retries << "]..."; |
177 | 0 | } |
178 | 0 | } while (num_retries < max_retries); |
179 | 0 | } Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEEC2EPNS_11ClientCacheIS1_EERKNS_15TNetworkAddressEiPNS_6StatusEi Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEEC2EPNS_11ClientCacheIS1_EERKNS_15TNetworkAddressEiPNS_6StatusEi Unexecuted instantiation: _ZN5doris16ClientConnectionINS_20BackendServiceClientEEC2EPNS_11ClientCacheIS1_EERKNS_15TNetworkAddressEiPNS_6StatusEi |
180 | | |
181 | 0 | ~ClientConnection() { |
182 | 0 | if (_client != nullptr) { |
183 | 0 | _client_cache->release_client(&_client); |
184 | 0 | } |
185 | 0 | } Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEED2Ev Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEED2Ev Unexecuted instantiation: _ZN5doris16ClientConnectionINS_20BackendServiceClientEED2Ev |
186 | | |
187 | 0 | Status reopen(int timeout_ms) { return _client_cache->reopen_client(&_client, timeout_ms); } Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEE6reopenEi Unexecuted instantiation: _ZN5doris16ClientConnectionINS_20BackendServiceClientEE6reopenEi Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEE6reopenEi |
188 | | |
189 | 0 | Status reopen() { return _client_cache->reopen_client(&_client, 0); } Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEE6reopenEv Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEE6reopenEv |
190 | | |
191 | 0 | inline bool is_alive() { return _client != nullptr; } |
192 | | |
193 | 0 | T* operator->() const { |
194 | 0 | if (_client == nullptr) { |
195 | 0 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Invalid RPC client!"); |
196 | 0 | } |
197 | 0 | return _client; |
198 | 0 | } Unexecuted instantiation: _ZNK5doris16ClientConnectionINS_21FrontendServiceClientEEptEv Unexecuted instantiation: _ZNK5doris16ClientConnectionINS_24TPaloBrokerServiceClientEEptEv Unexecuted instantiation: _ZNK5doris16ClientConnectionINS_20BackendServiceClientEEptEv |
199 | | |
200 | | private: |
201 | | ClientCache<T>* _client_cache = nullptr; |
202 | | T* _client; |
203 | | }; |
204 | | |
205 | | // Generic cache of Thrift clients for a given service type. |
206 | | // This class is thread-safe. |
207 | | template <class T> |
208 | | class ClientCache { |
209 | | public: |
210 | | using Client = ThriftClient<T>; |
211 | | |
212 | 1 | ClientCache() { |
213 | 1 | _client_factory = |
214 | 1 | std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this, |
215 | 1 | std::placeholders::_1, std::placeholders::_2); |
216 | 1 | } _ZN5doris11ClientCacheINS_21FrontendServiceClientEEC2Ev Line | Count | Source | 212 | 1 | ClientCache() { | 213 | 1 | _client_factory = | 214 | 1 | std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this, | 215 | 1 | std::placeholders::_1, std::placeholders::_2); | 216 | 1 | } |
Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEEC2Ev |
217 | | |
218 | 0 | ClientCache(int max_cache_size) : _client_cache_helper(max_cache_size) { |
219 | 0 | _client_factory = |
220 | 0 | std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this, |
221 | 0 | std::placeholders::_1, std::placeholders::_2); |
222 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEEC2Ei Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEEC2Ei Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEEC2Ei |
223 | | |
224 | | // Helper method which returns a debug string |
225 | | std::string debug_string() { return _client_cache_helper.debug_string(); } |
226 | | |
227 | | // Adds metrics for this cache. |
228 | | // The metrics have an identification by the 'name' argument |
229 | | // (which should not end in a period). |
230 | | // Must be called before the cache is used, otherwise the metrics might be wrong |
231 | 0 | void init_metrics(const std::string& name) { _client_cache_helper.init_metrics(name); } Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE12init_metricsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE12init_metricsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE12init_metricsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE |
232 | | |
233 | | private: |
234 | | friend class ClientConnection<T>; |
235 | | |
236 | | // Most operations in this class are thin wrappers around the |
237 | | // equivalent in ClientCacheHelper, which is a non-templated cache |
238 | | // to avoid inlining lots of code wherever this cache is used. |
239 | | ClientCacheHelper _client_cache_helper; |
240 | | |
241 | | // Function pointer, bound to make_client, which produces clients when the cache is empty |
242 | | using ClientFactory = ClientCacheHelper::ClientFactory; |
243 | | ClientFactory _client_factory; |
244 | | |
245 | | // Obtains a pointer to a Thrift interface object (of type T), |
246 | | // backed by a live transport which is already open. Returns |
247 | | // Status::OK() unless there was an error opening the transport. |
248 | 0 | Status get_client(const TNetworkAddress& hostport, T** iface, int timeout_ms) { |
249 | 0 | return _client_cache_helper.get_client(hostport, _client_factory, |
250 | 0 | reinterpret_cast<void**>(iface), timeout_ms); |
251 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE10get_clientERKNS_15TNetworkAddressEPPS1_i Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE10get_clientERKNS_15TNetworkAddressEPPS1_i Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE10get_clientERKNS_15TNetworkAddressEPPS1_i |
252 | | |
253 | | // Close and delete the underlying transport. Return a new client connecting to the |
254 | | // same host/port. |
255 | | // Return an error status if a new connection cannot be established and *client will be |
256 | | // nullptr in that case. |
257 | 0 | Status reopen_client(T** client, int timeout_ms) { |
258 | 0 | return _client_cache_helper.reopen_client(_client_factory, reinterpret_cast<void**>(client), |
259 | 0 | timeout_ms); |
260 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE13reopen_clientEPPS1_i Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE13reopen_clientEPPS1_i Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE13reopen_clientEPPS1_i |
261 | | |
262 | | // Return the client to the cache and set *client to nullptr. |
263 | 0 | void release_client(T** client) { |
264 | 0 | return _client_cache_helper.release_client(reinterpret_cast<void**>(client)); |
265 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE14release_clientEPPS1_ Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE14release_clientEPPS1_ Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE14release_clientEPPS1_ |
266 | | |
267 | | // Factory method to produce a new ThriftClient<T> for the wrapped cache |
268 | 0 | ThriftClientImpl* make_client(const TNetworkAddress& hostport, void** client_key) { |
269 | 0 | static ThriftServer::ServerType server_type = get_thrift_server_type(); |
270 | 0 | Client* client = new Client(hostport.hostname, hostport.port, server_type); |
271 | 0 | *client_key = reinterpret_cast<void*>(client->iface()); |
272 | 0 | return client; |
273 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE11make_clientERKNS_15TNetworkAddressEPPv Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE11make_clientERKNS_15TNetworkAddressEPPv Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE11make_clientERKNS_15TNetworkAddressEPPv |
274 | | |
275 | | // since service type is multiple, we should set thrift server type here for be thrift client |
276 | 0 | ThriftServer::ServerType get_thrift_server_type() { |
277 | 0 | auto& thrift_server_type = config::thrift_server_type_of_fe; |
278 | 0 | std::transform(thrift_server_type.begin(), thrift_server_type.end(), |
279 | 0 | thrift_server_type.begin(), [](auto c) { return std::toupper(c); }); Unexecuted instantiation: _ZZN5doris11ClientCacheINS_21FrontendServiceClientEE22get_thrift_server_typeEvENKUlT_E_clIcEEDaS3_ Unexecuted instantiation: _ZZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE22get_thrift_server_typeEvENKUlT_E_clIcEEDaS3_ Unexecuted instantiation: _ZZN5doris11ClientCacheINS_20BackendServiceClientEE22get_thrift_server_typeEvENKUlT_E_clIcEEDaS3_ |
280 | 0 | if (strcmp(typeid(T).name(), "N5doris21FrontendServiceClientE") == 0 && |
281 | 0 | thrift_server_type == "THREADED_SELECTOR") { |
282 | 0 | return ThriftServer::ServerType::NON_BLOCKING; |
283 | 0 | } else { |
284 | 0 | return ThriftServer::ServerType::THREADED; |
285 | 0 | } |
286 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE22get_thrift_server_typeEv Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE22get_thrift_server_typeEv Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE22get_thrift_server_typeEv |
287 | | }; |
288 | | |
289 | | // Doris backend client cache, used by a backend to send requests |
290 | | // to any other backend. |
291 | | using BackendServiceClientCache = ClientCache<BackendServiceClient>; |
292 | | using BackendServiceConnection = ClientConnection<BackendServiceClient>; |
293 | | |
294 | | using FrontendServiceClientCache = ClientCache<FrontendServiceClient>; |
295 | | using FrontendServiceConnection = ClientConnection<FrontendServiceClient>; |
296 | | |
297 | | using BrokerServiceClientCache = ClientCache<TPaloBrokerServiceClient>; |
298 | | using BrokerServiceConnection = ClientConnection<TPaloBrokerServiceClient>; |
299 | | |
300 | | } // namespace doris |