/root/doris/be/src/runtime/client_cache.h
| Line | Count | Source (jump to first uncovered line) | 
| 1 |  | // Licensed to the Apache Software Foundation (ASF) under one | 
| 2 |  | // or more contributor license agreements.  See the NOTICE file | 
| 3 |  | // distributed with this work for additional information | 
| 4 |  | // regarding copyright ownership.  The ASF licenses this file | 
| 5 |  | // to you under the Apache License, Version 2.0 (the | 
| 6 |  | // "License"); you may not use this file except in compliance | 
| 7 |  | // with the License.  You may obtain a copy of the License at | 
| 8 |  | // | 
| 9 |  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
| 10 |  | // | 
| 11 |  | // Unless required by applicable law or agreed to in writing, | 
| 12 |  | // software distributed under the License is distributed on an | 
| 13 |  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 14 |  | // KIND, either express or implied.  See the License for the | 
| 15 |  | // specific language governing permissions and limitations | 
| 16 |  | // under the License. | 
| 17 |  |  | 
| 18 |  | #pragma once | 
| 19 |  |  | 
#include <gen_cpp/BackendService.h>     // IWYU pragma: keep
#include <gen_cpp/FrontendService.h>    // IWYU pragma: keep
#include <gen_cpp/TPaloBrokerService.h> // IWYU pragma: keep
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <string.h>
#include <unistd.h>

#include <algorithm>
#include <cctype>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <type_traits>
#include <typeinfo>
#include <unordered_map>

#include "common/config.h"
#include "common/exception.h"
#include "common/status.h"
#include "util/hash_util.hpp"
#include "util/metrics.h"
#include "util/thrift_client.h"
#include "util/thrift_server.h"
| 44 |  |  | 
| 45 |  | namespace doris { | 
| 46 |  | // Helper class which implements the majority of the caching | 
| 47 |  | // functionality without using templates (i.e. pointers to the | 
| 48 |  | // superclass of all ThriftClients and a void* for the key). | 
| 49 |  | // | 
| 50 |  | // The user of this class only sees RPC proxy classes, but we have | 
| 51 |  | // to track the ThriftClient to manipulate the underlying | 
| 52 |  | // transport. To do this, we maintain a map from an opaque 'key' | 
| 53 |  | // pointer type to the client implementation. We actually know the | 
| 54 |  | // type of the pointer (it's the type parameter to ClientCache), but | 
| 55 |  | // we deliberately avoid using it so that this entire class doesn't | 
| 56 |  | // get inlined every time it gets used. | 
| 57 |  | // | 
| 58 |  | // This class is thread-safe. | 
| 59 |  | // | 
| 60 |  | // TODO: shut down clients in the background if they don't get used for a period of time | 
| 61 |  | // TODO: in order to reduce locking overhead when getting/releasing clients, | 
| 62 |  | // add call to hand back pointer to list stored in ClientCache and add separate lock | 
| 63 |  | // to list (or change to lock-free list) | 
| 64 |  | // TODO: reduce locking overhead and by adding per-address client caches, each with its | 
| 65 |  | // own lock. | 
| 66 |  | // TODO: More graceful handling of clients that have failed (maybe better | 
| 67 |  | // handled by a smart-wrapper of the interface object). | 
| 68 |  | // TODO: limits on total number of clients, and clients per-backend | 
| 69 |  | class ClientCacheHelper { | 
| 70 |  | public: | 
| 71 |  |     ~ClientCacheHelper(); | 
| 72 |  |     // Callback method which produces a client object when one cannot be | 
| 73 |  |     // found in the cache. Supplied by the ClientCache wrapper. | 
| 74 |  |     using ClientFactory = | 
| 75 |  |             std::function<ThriftClientImpl*(const TNetworkAddress& hostport, void** client_key)>; | 
| 76 |  |  | 
| 77 |  |     // Return client for specific host/port in 'client'. If a client | 
| 78 |  |     // is not available, the client parameter is set to nullptr. | 
| 79 |  |     Status get_client(const TNetworkAddress& hostport, ClientFactory& factory_method, | 
| 80 |  |                       void** client_key, int timeout_ms); | 
| 81 |  |  | 
| 82 |  |     // Close and delete the underlying transport and remove the client from _client_map. | 
| 83 |  |     // Return a new client connecting to the same host/port. | 
| 84 |  |     // Return an error status and set client_key to nullptr if a new client cannot | 
| 85 |  |     // created. | 
| 86 |  |     Status reopen_client(ClientFactory& factory_method, void** client_key, int timeout_ms); | 
| 87 |  |  | 
| 88 |  |     // Return a client to the cache, without closing it, and set *client_key to nullptr. | 
| 89 |  |     void release_client(void** client_key); | 
| 90 |  |  | 
| 91 |  |     std::string debug_string(); | 
| 92 |  |  | 
| 93 |  |     void init_metrics(const std::string& name); | 
| 94 |  |  | 
| 95 |  | private: | 
| 96 |  |     template <class T> | 
| 97 |  |     friend class ClientCache; | 
| 98 |  |     // Private constructor so that only ClientCache can instantiate this class. | 
| 99 | 0 |     ClientCacheHelper() : _metrics_enabled(false), _max_cache_size_per_host(-1) {} | 
| 100 |  |  | 
| 101 |  |     ClientCacheHelper(int max_cache_size_per_host) | 
| 102 | 0 |             : _metrics_enabled(false), _max_cache_size_per_host(max_cache_size_per_host) {} | 
| 103 |  |  | 
| 104 |  |     // Protects all member variables | 
| 105 |  |     // TODO: have more fine-grained locks or use lock-free data structures, | 
| 106 |  |     // this isn't going to scale for a high request rate | 
| 107 |  |     std::mutex _lock; | 
| 108 |  |  | 
| 109 |  |     // map from (host, port) to list of client keys for that address | 
| 110 |  |     using ClientCacheMap = std::unordered_map<TNetworkAddress, std::list<void*>>; | 
| 111 |  |     ClientCacheMap _client_cache; | 
| 112 |  |  | 
| 113 |  |     // if cache not found, set client_key as nullptr | 
| 114 |  |     void _get_client_from_cache(const TNetworkAddress& hostport, void** client_key); | 
| 115 |  |  | 
| 116 |  |     // Map from client key back to its associated ThriftClientImpl transport | 
| 117 |  |     using ClientMap = std::unordered_map<void*, ThriftClientImpl*>; | 
| 118 |  |     ClientMap _client_map; | 
| 119 |  |  | 
| 120 |  |     bool _metrics_enabled; | 
| 121 |  |  | 
| 122 |  |     // max connections per host in this cache, -1 means unlimited | 
| 123 |  |     int _max_cache_size_per_host; | 
| 124 |  |  | 
| 125 |  |     std::shared_ptr<MetricEntity> _thrift_client_metric_entity; | 
| 126 |  |  | 
| 127 |  |     // Number of clients 'checked-out' from the cache | 
| 128 |  |     IntGauge* thrift_used_clients = nullptr; | 
| 129 |  |  | 
| 130 |  |     // Total clients in the cache, including those in use | 
| 131 |  |     IntGauge* thrift_opened_clients = nullptr; | 
| 132 |  |  | 
| 133 |  |     // Create a new client for specific host/port in 'client' and put it in _client_map | 
| 134 |  |     Status _create_client(const TNetworkAddress& hostport, ClientFactory& factory_method, | 
| 135 |  |                           void** client_key, int timeout_ms); | 
| 136 |  | }; | 
| 137 |  |  | 
| 138 |  | template <class T> | 
| 139 |  | class ClientCache; | 
| 140 |  |  | 
| 141 |  | // A scoped client connection to help manage clients from a client cache. | 
| 142 |  | // | 
| 143 |  | // Example: | 
| 144 |  | //   { | 
| 145 |  | //     DorisInternalServiceConnection client(cache, address, &status); | 
| 146 |  | //     try { | 
| 147 |  | //       client->TransmitData(...); | 
| 148 |  | //     } catch (TTransportException& e) { | 
| 149 |  | //       // Retry | 
| 150 |  | //       RETURN_IF_ERROR(client.Reopen()); | 
| 151 |  | //       client->TransmitData(...); | 
| 152 |  | //     } | 
| 153 |  | //   } | 
| 154 |  | // ('client' is released back to cache upon destruction.) | 
| 155 |  | template <class T> | 
| 156 |  | class ClientConnection { | 
| 157 |  | public: | 
| 158 |  |     ClientConnection(ClientCache<T>* client_cache, const TNetworkAddress& address, Status* status) | 
| 159 | 0 |             : ClientConnection(client_cache, address, 0, status, 3) {} | 
| 160 |  |  | 
| 161 |  |     ClientConnection(ClientCache<T>* client_cache, const TNetworkAddress& address, int timeout_ms, | 
| 162 |  |                      Status* status, int max_retries = 3) | 
| 163 | 0 |             : _client_cache(client_cache), _client(nullptr) { | 
| 164 | 0 |         int num_retries = 0; | 
| 165 | 0 |         do { | 
| 166 | 0 |             *status = _client_cache->get_client(address, &_client, timeout_ms); | 
| 167 | 0 |             if (status->ok()) {  Branch (167:17): [True: 0, False: 0]
  Branch (167:17): [True: 0, False: 0]
  Branch (167:17): [True: 0, False: 0]
 | 
| 168 | 0 |                 DCHECK(_client != nullptr); | 
| 169 | 0 |                 break; | 
| 170 | 0 |             } | 
| 171 | 0 |             DCHECK(_client == nullptr); | 
| 172 | 0 |             if (num_retries++ < max_retries) {  Branch (172:17): [True: 0, False: 0]
  Branch (172:17): [True: 0, False: 0]
  Branch (172:17): [True: 0, False: 0]
 | 
| 173 |  |                 // exponential backoff retry with starting delay of 500ms | 
| 174 | 0 |                 usleep(500000 * (1 << num_retries)); | 
| 175 | 0 |                 LOG(INFO) << "Failed to get client from cache: " << status->to_string() | 
| 176 | 0 |                           << ", retrying[" << num_retries << "]..."; | 
| 177 | 0 |             } | 
| 178 | 0 |         } while (num_retries < max_retries);   Branch (178:18): [True: 0, False: 0]
  Branch (178:18): [True: 0, False: 0]
  Branch (178:18): [True: 0, False: 0]
 | 
| 179 | 0 |     } Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEEC2EPNS_11ClientCacheIS1_EERKNS_15TNetworkAddressEiPNS_6StatusEiUnexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEEC2EPNS_11ClientCacheIS1_EERKNS_15TNetworkAddressEiPNS_6StatusEiUnexecuted instantiation: _ZN5doris16ClientConnectionINS_20BackendServiceClientEEC2EPNS_11ClientCacheIS1_EERKNS_15TNetworkAddressEiPNS_6StatusEi | 
| 180 |  |  | 
| 181 | 0 |     ~ClientConnection() { | 
| 182 | 0 |         if (_client != nullptr) {  Branch (182:13): [True: 0, False: 0]
  Branch (182:13): [True: 0, False: 0]
  Branch (182:13): [True: 0, False: 0]
 | 
| 183 | 0 |             _client_cache->release_client(&_client); | 
| 184 | 0 |         } | 
| 185 | 0 |     } Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEED2EvUnexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEED2EvUnexecuted instantiation: _ZN5doris16ClientConnectionINS_20BackendServiceClientEED2Ev | 
| 186 |  |  | 
| 187 | 0 |     Status reopen(int timeout_ms) { return _client_cache->reopen_client(&_client, timeout_ms); }Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEE6reopenEiUnexecuted instantiation: _ZN5doris16ClientConnectionINS_20BackendServiceClientEE6reopenEiUnexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEE6reopenEi | 
| 188 |  |  | 
| 189 | 0 |     Status reopen() { return _client_cache->reopen_client(&_client, 0); }Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEE6reopenEvUnexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEE6reopenEv | 
| 190 |  |  | 
| 191 | 0 |     inline bool is_alive() { return _client != nullptr; } | 
| 192 |  |  | 
| 193 | 0 |     T* operator->() const { | 
| 194 | 0 |         if (_client == nullptr) {  Branch (194:13): [True: 0, False: 0]
  Branch (194:13): [True: 0, False: 0]
  Branch (194:13): [True: 0, False: 0]
 | 
| 195 | 0 |             throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Invalid RPC client!"); | 
| 196 | 0 |         } | 
| 197 | 0 |         return _client; | 
| 198 | 0 |     } Unexecuted instantiation: _ZNK5doris16ClientConnectionINS_21FrontendServiceClientEEptEvUnexecuted instantiation: _ZNK5doris16ClientConnectionINS_24TPaloBrokerServiceClientEEptEvUnexecuted instantiation: _ZNK5doris16ClientConnectionINS_20BackendServiceClientEEptEv | 
| 199 |  |  | 
| 200 |  | private: | 
| 201 |  |     ClientCache<T>* _client_cache = nullptr; | 
| 202 |  |     T* _client; | 
| 203 |  | }; | 
| 204 |  |  | 
| 205 |  | // Generic cache of Thrift clients for a given service type. | 
| 206 |  | // This class is thread-safe. | 
| 207 |  | template <class T> | 
| 208 |  | class ClientCache { | 
| 209 |  | public: | 
| 210 |  |     using Client = ThriftClient<T>; | 
| 211 |  |  | 
| 212 | 0 |     ClientCache() { | 
| 213 | 0 |         _client_factory = | 
| 214 | 0 |                 std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this, | 
| 215 | 0 |                                              std::placeholders::_1, std::placeholders::_2); | 
| 216 | 0 |     } | 
| 217 |  |  | 
| 218 | 0 |     ClientCache(int max_cache_size) : _client_cache_helper(max_cache_size) { | 
| 219 | 0 |         _client_factory = | 
| 220 | 0 |                 std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this, | 
| 221 | 0 |                                              std::placeholders::_1, std::placeholders::_2); | 
| 222 | 0 |     } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEEC2EiUnexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEEC2EiUnexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEEC2Ei | 
| 223 |  |  | 
| 224 |  |     // Helper method which returns a debug string | 
| 225 |  |     std::string debug_string() { return _client_cache_helper.debug_string(); } | 
| 226 |  |  | 
| 227 |  |     // Adds metrics for this cache. | 
| 228 |  |     // The metrics have an identification by the 'name' argument | 
| 229 |  |     // (which should not end in a period). | 
| 230 |  |     // Must be called before the cache is used, otherwise the metrics might be wrong | 
| 231 | 0 |     void init_metrics(const std::string& name) { _client_cache_helper.init_metrics(name); }Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE12init_metricsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEUnexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE12init_metricsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEUnexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE12init_metricsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE | 
| 232 |  |  | 
| 233 |  | private: | 
| 234 |  |     friend class ClientConnection<T>; | 
| 235 |  |  | 
| 236 |  |     // Most operations in this class are thin wrappers around the | 
| 237 |  |     // equivalent in ClientCacheHelper, which is a non-templated cache | 
| 238 |  |     // to avoid inlining lots of code wherever this cache is used. | 
| 239 |  |     ClientCacheHelper _client_cache_helper; | 
| 240 |  |  | 
| 241 |  |     // Function pointer, bound to make_client, which produces clients when the cache is empty | 
| 242 |  |     using ClientFactory = ClientCacheHelper::ClientFactory; | 
| 243 |  |     ClientFactory _client_factory; | 
| 244 |  |  | 
| 245 |  |     // Obtains a pointer to a Thrift interface object (of type T), | 
| 246 |  |     // backed by a live transport which is already open. Returns | 
| 247 |  |     // Status::OK() unless there was an error opening the transport. | 
| 248 | 0 |     Status get_client(const TNetworkAddress& hostport, T** iface, int timeout_ms) { | 
| 249 | 0 |         return _client_cache_helper.get_client(hostport, _client_factory, | 
| 250 | 0 |                                                reinterpret_cast<void**>(iface), timeout_ms); | 
| 251 | 0 |     } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE10get_clientERKNS_15TNetworkAddressEPPS1_iUnexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE10get_clientERKNS_15TNetworkAddressEPPS1_iUnexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE10get_clientERKNS_15TNetworkAddressEPPS1_i | 
| 252 |  |  | 
| 253 |  |     // Close and delete the underlying transport. Return a new client connecting to the | 
| 254 |  |     // same host/port. | 
| 255 |  |     // Return an error status if a new connection cannot be established and *client will be | 
| 256 |  |     // nullptr in that case. | 
| 257 | 0 |     Status reopen_client(T** client, int timeout_ms) { | 
| 258 | 0 |         return _client_cache_helper.reopen_client(_client_factory, reinterpret_cast<void**>(client), | 
| 259 | 0 |                                                   timeout_ms); | 
| 260 | 0 |     } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE13reopen_clientEPPS1_iUnexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE13reopen_clientEPPS1_iUnexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE13reopen_clientEPPS1_i | 
| 261 |  |  | 
| 262 |  |     // Return the client to the cache and set *client to nullptr. | 
| 263 | 0 |     void release_client(T** client) { | 
| 264 | 0 |         return _client_cache_helper.release_client(reinterpret_cast<void**>(client)); | 
| 265 | 0 |     } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE14release_clientEPPS1_Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE14release_clientEPPS1_Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE14release_clientEPPS1_ | 
| 266 |  |  | 
| 267 |  |     // Factory method to produce a new ThriftClient<T> for the wrapped cache | 
| 268 | 0 |     ThriftClientImpl* make_client(const TNetworkAddress& hostport, void** client_key) { | 
| 269 | 0 |         static ThriftServer::ServerType server_type = get_thrift_server_type(); | 
| 270 | 0 |         Client* client = new Client(hostport.hostname, hostport.port, server_type); | 
| 271 | 0 |         *client_key = reinterpret_cast<void*>(client->iface()); | 
| 272 | 0 |         return client; | 
| 273 | 0 |     } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE11make_clientERKNS_15TNetworkAddressEPPvUnexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE11make_clientERKNS_15TNetworkAddressEPPvUnexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE11make_clientERKNS_15TNetworkAddressEPPv | 
| 274 |  |  | 
| 275 |  |     // since service type is multiple, we should set thrift server type here for be thrift client | 
| 276 | 0 |     ThriftServer::ServerType get_thrift_server_type() { | 
| 277 | 0 |         auto& thrift_server_type = config::thrift_server_type_of_fe; | 
| 278 | 0 |         std::transform(thrift_server_type.begin(), thrift_server_type.end(), | 
| 279 | 0 |                        thrift_server_type.begin(), [](auto c) { return std::toupper(c); });Unexecuted instantiation: _ZZN5doris11ClientCacheINS_21FrontendServiceClientEE22get_thrift_server_typeEvENKUlT_E_clIcEEDaS3_Unexecuted instantiation: _ZZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE22get_thrift_server_typeEvENKUlT_E_clIcEEDaS3_Unexecuted instantiation: _ZZN5doris11ClientCacheINS_20BackendServiceClientEE22get_thrift_server_typeEvENKUlT_E_clIcEEDaS3_ | 
| 280 | 0 |         if (strcmp(typeid(T).name(), "N5doris21FrontendServiceClientE") == 0 &&   Branch (280:13): [True: 0, False: 0]
  Branch (280:13): [True: 0, False: 0]
  Branch (280:13): [True: 0, False: 0]
 | 
| 281 | 0 |             thrift_server_type == "THREADED_SELECTOR") {  Branch (281:13): [True: 0, False: 0]
  Branch (281:13): [True: 0, False: 0]
  Branch (281:13): [True: 0, False: 0]
 | 
| 282 | 0 |             return ThriftServer::ServerType::NON_BLOCKING; | 
| 283 | 0 |         } else { | 
| 284 | 0 |             return ThriftServer::ServerType::THREADED; | 
| 285 | 0 |         } | 
| 286 | 0 |     } Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE22get_thrift_server_typeEvUnexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE22get_thrift_server_typeEvUnexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE22get_thrift_server_typeEv | 
| 287 |  | }; | 
| 288 |  |  | 
| 289 |  | // Doris backend client cache, used by a backend to send requests | 
| 290 |  | // to any other backend. | 
| 291 |  | using BackendServiceClientCache = ClientCache<BackendServiceClient>; | 
| 292 |  | using BackendServiceConnection = ClientConnection<BackendServiceClient>; | 
| 293 |  |  | 
| 294 |  | using FrontendServiceClientCache = ClientCache<FrontendServiceClient>; | 
| 295 |  | using FrontendServiceConnection = ClientConnection<FrontendServiceClient>; | 
| 296 |  |  | 
| 297 |  | using BrokerServiceClientCache = ClientCache<TPaloBrokerServiceClient>; | 
| 298 |  | using BrokerServiceConnection = ClientConnection<TPaloBrokerServiceClient>; | 
| 299 |  |  | 
| 300 |  | } // namespace doris |