/root/doris/be/src/runtime/client_cache.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <gen_cpp/BackendService.h>     // IWYU pragma: keep
#include <gen_cpp/FrontendService.h>    // IWYU pragma: keep
#include <gen_cpp/TPaloBrokerService.h> // IWYU pragma: keep
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <string.h>
#include <unistd.h>

#include <algorithm>
#include <cctype>
#include <chrono>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <thread>
#include <type_traits>
#include <typeinfo>
#include <unordered_map>

#include "common/config.h"
#include "common/status.h"
#include "util/hash_util.hpp"
#include "util/metrics.h"
#include "util/thrift_client.h"
#include "util/thrift_server.h"
43 | | |
44 | | namespace doris { |
45 | | // Helper class which implements the majority of the caching |
46 | | // functionality without using templates (i.e. pointers to the |
47 | | // superclass of all ThriftClients and a void* for the key). |
48 | | // |
49 | | // The user of this class only sees RPC proxy classes, but we have |
50 | | // to track the ThriftClient to manipulate the underlying |
51 | | // transport. To do this, we maintain a map from an opaque 'key' |
52 | | // pointer type to the client implementation. We actually know the |
53 | | // type of the pointer (it's the type parameter to ClientCache), but |
54 | | // we deliberately avoid using it so that this entire class doesn't |
55 | | // get inlined every time it gets used. |
56 | | // |
57 | | // This class is thread-safe. |
58 | | // |
59 | | // TODO: shut down clients in the background if they don't get used for a period of time |
60 | | // TODO: in order to reduce locking overhead when getting/releasing clients, |
61 | | // add call to hand back pointer to list stored in ClientCache and add separate lock |
62 | | // to list (or change to lock-free list) |
63 | | // TODO: reduce locking overhead and by adding per-address client caches, each with its |
64 | | // own lock. |
65 | | // TODO: More graceful handling of clients that have failed (maybe better |
66 | | // handled by a smart-wrapper of the interface object). |
67 | | // TODO: limits on total number of clients, and clients per-backend |
68 | | class ClientCacheHelper { |
69 | | public: |
70 | | ~ClientCacheHelper(); |
71 | | // Callback method which produces a client object when one cannot be |
72 | | // found in the cache. Supplied by the ClientCache wrapper. |
73 | | using ClientFactory = |
74 | | std::function<ThriftClientImpl*(const TNetworkAddress& hostport, void** client_key)>; |
75 | | |
76 | | // Return client for specific host/port in 'client'. If a client |
77 | | // is not available, the client parameter is set to nullptr. |
78 | | Status get_client(const TNetworkAddress& hostport, ClientFactory& factory_method, |
79 | | void** client_key, int timeout_ms); |
80 | | |
81 | | // Close and delete the underlying transport and remove the client from _client_map. |
82 | | // Return a new client connecting to the same host/port. |
83 | | // Return an error status and set client_key to nullptr if a new client cannot |
84 | | // created. |
85 | | Status reopen_client(ClientFactory& factory_method, void** client_key, int timeout_ms); |
86 | | |
87 | | // Return a client to the cache, without closing it, and set *client_key to nullptr. |
88 | | void release_client(void** client_key); |
89 | | |
90 | | std::string debug_string(); |
91 | | |
92 | | void init_metrics(const std::string& name); |
93 | | |
94 | | private: |
95 | | template <class T> |
96 | | friend class ClientCache; |
97 | | // Private constructor so that only ClientCache can instantiate this class. |
98 | 1 | ClientCacheHelper() : _metrics_enabled(false), _max_cache_size_per_host(-1) {} |
99 | | |
100 | | ClientCacheHelper(int max_cache_size_per_host) |
101 | 0 | : _metrics_enabled(false), _max_cache_size_per_host(max_cache_size_per_host) {} |
102 | | |
103 | | // Protects all member variables |
104 | | // TODO: have more fine-grained locks or use lock-free data structures, |
105 | | // this isn't going to scale for a high request rate |
106 | | std::mutex _lock; |
107 | | |
108 | | // map from (host, port) to list of client keys for that address |
109 | | using ClientCacheMap = std::unordered_map<TNetworkAddress, std::list<void*>>; |
110 | | ClientCacheMap _client_cache; |
111 | | |
112 | | // if cache not found, set client_key as nullptr |
113 | | void _get_client_from_cache(const TNetworkAddress& hostport, void** client_key); |
114 | | |
115 | | // Map from client key back to its associated ThriftClientImpl transport |
116 | | using ClientMap = std::unordered_map<void*, ThriftClientImpl*>; |
117 | | ClientMap _client_map; |
118 | | |
119 | | bool _metrics_enabled; |
120 | | |
121 | | // max connections per host in this cache, -1 means unlimited |
122 | | int _max_cache_size_per_host; |
123 | | |
124 | | std::shared_ptr<MetricEntity> _thrift_client_metric_entity; |
125 | | |
126 | | // Number of clients 'checked-out' from the cache |
127 | | IntGauge* thrift_used_clients; |
128 | | |
129 | | // Total clients in the cache, including those in use |
130 | | IntGauge* thrift_opened_clients; |
131 | | |
132 | | // Create a new client for specific host/port in 'client' and put it in _client_map |
133 | | Status _create_client(const TNetworkAddress& hostport, ClientFactory& factory_method, |
134 | | void** client_key, int timeout_ms); |
135 | | }; |
136 | | |
// Forward declaration: ClientConnection refers to ClientCache before its definition.
template <typename T>
class ClientCache;
139 | | |
140 | | // A scoped client connection to help manage clients from a client cache. |
141 | | // |
142 | | // Example: |
143 | | // { |
144 | | // DorisInternalServiceConnection client(cache, address, &status); |
145 | | // try { |
146 | | // client->TransmitData(...); |
147 | | // } catch (TTransportException& e) { |
148 | | // // Retry |
149 | | // RETURN_IF_ERROR(client.Reopen()); |
150 | | // client->TransmitData(...); |
151 | | // } |
152 | | // } |
153 | | // ('client' is released back to cache upon destruction.) |
154 | | template <class T> |
155 | | class ClientConnection { |
156 | | public: |
157 | | ClientConnection(ClientCache<T>* client_cache, const TNetworkAddress& address, Status* status) |
158 | 0 | : ClientConnection(client_cache, address, 0, status, 3) {} |
159 | | |
160 | | ClientConnection(ClientCache<T>* client_cache, const TNetworkAddress& address, int timeout_ms, |
161 | | Status* status, int max_retries = 3) |
162 | 0 | : _client_cache(client_cache), _client(nullptr) { |
163 | 0 | int num_retries = 0; |
164 | 0 | do { |
165 | 0 | *status = _client_cache->get_client(address, &_client, timeout_ms); |
166 | 0 | if (status->ok()) { |
167 | 0 | DCHECK(_client != nullptr); |
168 | 0 | break; |
169 | 0 | } |
170 | 0 | DCHECK(_client == nullptr); |
171 | 0 | if (num_retries++ < max_retries) { |
172 | | // exponential backoff retry with starting delay of 500ms |
173 | 0 | usleep(500000 * (1 << num_retries)); |
174 | 0 | LOG(INFO) << "Failed to get client from cache: " << status->to_string() |
175 | 0 | << ", retrying[" << num_retries << "]..."; |
176 | 0 | } |
177 | 0 | } while (num_retries < max_retries); |
178 | 0 | } Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEEC2EPNS_11ClientCacheIS1_EERKNS_15TNetworkAddressEiPNS_6StatusEi Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEEC2EPNS_11ClientCacheIS1_EERKNS_15TNetworkAddressEiPNS_6StatusEi Unexecuted instantiation: _ZN5doris16ClientConnectionINS_20BackendServiceClientEEC2EPNS_11ClientCacheIS1_EERKNS_15TNetworkAddressEiPNS_6StatusEi |
179 | | |
180 | 0 | ~ClientConnection() { |
181 | 0 | if (_client != nullptr) { |
182 | 0 | _client_cache->release_client(&_client); |
183 | 0 | } |
184 | 0 | } Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEED2Ev Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEED2Ev Unexecuted instantiation: _ZN5doris16ClientConnectionINS_20BackendServiceClientEED2Ev |
185 | | |
186 | 0 | Status reopen(int timeout_ms) { return _client_cache->reopen_client(&_client, timeout_ms); } Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEE6reopenEi Unexecuted instantiation: _ZN5doris16ClientConnectionINS_20BackendServiceClientEE6reopenEi Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEE6reopenEi |
187 | | |
188 | 0 | Status reopen() { return _client_cache->reopen_client(&_client, 0); } Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEE6reopenEv Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEE6reopenEv |
189 | | |
190 | 0 | inline bool is_alive() { return _client != nullptr; } |
191 | | |
192 | 0 | T* operator->() const { return _client; } Unexecuted instantiation: _ZNK5doris16ClientConnectionINS_21FrontendServiceClientEEptEv Unexecuted instantiation: _ZNK5doris16ClientConnectionINS_24TPaloBrokerServiceClientEEptEv Unexecuted instantiation: _ZNK5doris16ClientConnectionINS_20BackendServiceClientEEptEv |
193 | | |
194 | | private: |
195 | | ClientCache<T>* _client_cache; |
196 | | T* _client; |
197 | | }; |
198 | | |
199 | | // Generic cache of Thrift clients for a given service type. |
200 | | // This class is thread-safe. |
201 | | template <class T> |
202 | | class ClientCache { |
203 | | public: |
204 | | using Client = ThriftClient<T>; |
205 | | |
206 | 1 | ClientCache() { |
207 | 1 | _client_factory = |
208 | 1 | std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this, |
209 | 1 | std::placeholders::_1, std::placeholders::_2); |
210 | 1 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEEC2Ev _ZN5doris11ClientCacheINS_21FrontendServiceClientEEC2Ev Line | Count | Source | 206 | 1 | ClientCache() { | 207 | 1 | _client_factory = | 208 | 1 | std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this, | 209 | 1 | std::placeholders::_1, std::placeholders::_2); | 210 | 1 | } |
|
211 | | |
212 | 0 | ClientCache(int max_cache_size) : _client_cache_helper(max_cache_size) { |
213 | 0 | _client_factory = |
214 | 0 | std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this, |
215 | 0 | std::placeholders::_1, std::placeholders::_2); |
216 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEEC2Ei Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEEC2Ei Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEEC2Ei |
217 | | |
218 | | // Helper method which returns a debug string |
219 | | std::string debug_string() { return _client_cache_helper.debug_string(); } |
220 | | |
221 | | // Adds metrics for this cache. |
222 | | // The metrics have an identification by the 'name' argument |
223 | | // (which should not end in a period). |
224 | | // Must be called before the cache is used, otherwise the metrics might be wrong |
225 | 0 | void init_metrics(const std::string& name) { _client_cache_helper.init_metrics(name); } Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE12init_metricsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE12init_metricsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE12init_metricsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE |
226 | | |
227 | | private: |
228 | | friend class ClientConnection<T>; |
229 | | |
230 | | // Most operations in this class are thin wrappers around the |
231 | | // equivalent in ClientCacheHelper, which is a non-templated cache |
232 | | // to avoid inlining lots of code wherever this cache is used. |
233 | | ClientCacheHelper _client_cache_helper; |
234 | | |
235 | | // Function pointer, bound to make_client, which produces clients when the cache is empty |
236 | | using ClientFactory = ClientCacheHelper::ClientFactory; |
237 | | ClientFactory _client_factory; |
238 | | |
239 | | // Obtains a pointer to a Thrift interface object (of type T), |
240 | | // backed by a live transport which is already open. Returns |
241 | | // Status::OK() unless there was an error opening the transport. |
242 | 0 | Status get_client(const TNetworkAddress& hostport, T** iface, int timeout_ms) { |
243 | 0 | return _client_cache_helper.get_client(hostport, _client_factory, |
244 | 0 | reinterpret_cast<void**>(iface), timeout_ms); |
245 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE10get_clientERKNS_15TNetworkAddressEPPS1_i Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE10get_clientERKNS_15TNetworkAddressEPPS1_i Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE10get_clientERKNS_15TNetworkAddressEPPS1_i |
246 | | |
247 | | // Close and delete the underlying transport. Return a new client connecting to the |
248 | | // same host/port. |
249 | | // Return an error status if a new connection cannot be established and *client will be |
250 | | // nullptr in that case. |
251 | 0 | Status reopen_client(T** client, int timeout_ms) { |
252 | 0 | return _client_cache_helper.reopen_client(_client_factory, reinterpret_cast<void**>(client), |
253 | 0 | timeout_ms); |
254 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE13reopen_clientEPPS1_i Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE13reopen_clientEPPS1_i Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE13reopen_clientEPPS1_i |
255 | | |
256 | | // Return the client to the cache and set *client to nullptr. |
257 | 0 | void release_client(T** client) { |
258 | 0 | return _client_cache_helper.release_client(reinterpret_cast<void**>(client)); |
259 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE14release_clientEPPS1_ Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE14release_clientEPPS1_ Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE14release_clientEPPS1_ |
260 | | |
261 | | // Factory method to produce a new ThriftClient<T> for the wrapped cache |
262 | 0 | ThriftClientImpl* make_client(const TNetworkAddress& hostport, void** client_key) { |
263 | 0 | static ThriftServer::ServerType server_type = get_thrift_server_type(); |
264 | 0 | Client* client = new Client(hostport.hostname, hostport.port, server_type); |
265 | 0 | *client_key = reinterpret_cast<void*>(client->iface()); |
266 | 0 | return client; |
267 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE11make_clientERKNS_15TNetworkAddressEPPv Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE11make_clientERKNS_15TNetworkAddressEPPv Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE11make_clientERKNS_15TNetworkAddressEPPv |
268 | | |
269 | | // since service type is multiple, we should set thrift server type here for be thrift client |
270 | 0 | ThriftServer::ServerType get_thrift_server_type() { |
271 | 0 | auto& thrift_server_type = config::thrift_server_type_of_fe; |
272 | 0 | std::transform(thrift_server_type.begin(), thrift_server_type.end(), |
273 | 0 | thrift_server_type.begin(), [](auto c) { return std::toupper(c); }); Unexecuted instantiation: _ZZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE22get_thrift_server_typeEvENKUlT_E_clIcEEDaS3_ Unexecuted instantiation: _ZZN5doris11ClientCacheINS_21FrontendServiceClientEE22get_thrift_server_typeEvENKUlT_E_clIcEEDaS3_ Unexecuted instantiation: _ZZN5doris11ClientCacheINS_20BackendServiceClientEE22get_thrift_server_typeEvENKUlT_E_clIcEEDaS3_ |
274 | 0 | if (strcmp(typeid(T).name(), "N5doris21FrontendServiceClientE") == 0 && |
275 | 0 | thrift_server_type == "THREADED_SELECTOR") { |
276 | 0 | return ThriftServer::ServerType::NON_BLOCKING; |
277 | 0 | } else { |
278 | 0 | return ThriftServer::ServerType::THREADED; |
279 | 0 | } |
280 | 0 | } Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE22get_thrift_server_typeEv Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE22get_thrift_server_typeEv Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE22get_thrift_server_typeEv |
281 | | }; |
282 | | |
// Doris backend client cache, used by a backend to send requests
// to any other backend.
using BackendServiceClientCache = ClientCache<BackendServiceClient>;
using BackendServiceConnection = ClientConnection<BackendServiceClient>;

// Client cache and scoped connection for FrontendService RPCs. These clients pick
// their transport from config::thrift_server_type_of_fe (see
// ClientCache::get_thrift_server_type).
using FrontendServiceClientCache = ClientCache<FrontendServiceClient>;
using FrontendServiceConnection = ClientConnection<FrontendServiceClient>;

// Client cache and scoped connection for TPaloBrokerService (broker) RPCs.
using BrokerServiceClientCache = ClientCache<TPaloBrokerServiceClient>;
using BrokerServiceConnection = ClientConnection<TPaloBrokerServiceClient>;
293 | | |
294 | | } // namespace doris |