Coverage Report

Created: 2024-11-21 17:33

/root/doris/be/src/runtime/client_cache.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/BackendService.h>     // IWYU pragma: keep
21
#include <gen_cpp/FrontendService.h>    // IWYU pragma: keep
22
#include <gen_cpp/TPaloBrokerService.h> // IWYU pragma: keep
23
#include <gen_cpp/Types_types.h>
24
#include <glog/logging.h>
25
#include <string.h>
26
#include <unistd.h>
27
28
#include <functional>
29
#include <list>
30
#include <memory>
31
#include <mutex>
32
#include <ostream>
33
#include <string>
34
#include <typeinfo>
35
#include <unordered_map>
36
37
#include "common/config.h"
38
#include "common/exception.h"
39
#include "common/status.h"
40
#include "util/hash_util.hpp"
41
#include "util/metrics.h"
42
#include "util/thrift_client.h"
43
#include "util/thrift_server.h"
44
45
namespace doris {
46
// Helper class which implements the majority of the caching
47
// functionality without using templates (i.e. pointers to the
48
// superclass of all ThriftClients and a void* for the key).
49
//
50
// The user of this class only sees RPC proxy classes, but we have
51
// to track the ThriftClient to manipulate the underlying
52
// transport. To do this, we maintain a map from an opaque 'key'
53
// pointer type to the client implementation. We actually know the
54
// type of the pointer (it's the type parameter to ClientCache), but
55
// we deliberately avoid using it so that this entire class doesn't
56
// get inlined every time it gets used.
57
//
58
// This class is thread-safe.
59
//
60
// TODO: shut down clients in the background if they don't get used for a period of time
61
// TODO: in order to reduce locking overhead when getting/releasing clients,
62
// add call to hand back pointer to list stored in ClientCache and add separate lock
63
// to list (or change to lock-free list)
64
// TODO: reduce locking overhead by adding per-address client caches, each with its
65
// own lock.
66
// TODO: More graceful handling of clients that have failed (maybe better
67
// handled by a smart-wrapper of the interface object).
68
// TODO: limits on total number of clients, and clients per-backend
69
class ClientCacheHelper {
70
public:
71
    ~ClientCacheHelper();
72
    // Callback method which produces a client object when one cannot be
73
    // found in the cache. Supplied by the ClientCache wrapper.
74
    using ClientFactory =
75
            std::function<ThriftClientImpl*(const TNetworkAddress& hostport, void** client_key)>;
76
77
    // Return client for specific host/port in 'client'. If a client
78
    // is not available, the client parameter is set to nullptr.
79
    Status get_client(const TNetworkAddress& hostport, ClientFactory& factory_method,
80
                      void** client_key, int timeout_ms);
81
82
    // Close and delete the underlying transport and remove the client from _client_map.
83
    // Return a new client connecting to the same host/port.
84
    // Return an error status and set client_key to nullptr if a new client cannot
85
    // created.
86
    Status reopen_client(ClientFactory& factory_method, void** client_key, int timeout_ms);
87
88
    // Return a client to the cache, without closing it, and set *client_key to nullptr.
89
    void release_client(void** client_key);
90
91
    std::string debug_string();
92
93
    void init_metrics(const std::string& name);
94
95
private:
96
    template <class T>
97
    friend class ClientCache;
98
    // Private constructor so that only ClientCache can instantiate this class.
99
1
    ClientCacheHelper() : _metrics_enabled(false), _max_cache_size_per_host(-1) {}
100
101
    ClientCacheHelper(int max_cache_size_per_host)
102
0
            : _metrics_enabled(false), _max_cache_size_per_host(max_cache_size_per_host) {}
103
104
    // Protects all member variables
105
    // TODO: have more fine-grained locks or use lock-free data structures,
106
    // this isn't going to scale for a high request rate
107
    std::mutex _lock;
108
109
    // map from (host, port) to list of client keys for that address
110
    using ClientCacheMap = std::unordered_map<TNetworkAddress, std::list<void*>>;
111
    ClientCacheMap _client_cache;
112
113
    // if cache not found, set client_key as nullptr
114
    void _get_client_from_cache(const TNetworkAddress& hostport, void** client_key);
115
116
    // Map from client key back to its associated ThriftClientImpl transport
117
    using ClientMap = std::unordered_map<void*, ThriftClientImpl*>;
118
    ClientMap _client_map;
119
120
    bool _metrics_enabled;
121
122
    // max connections per host in this cache, -1 means unlimited
123
    int _max_cache_size_per_host;
124
125
    std::shared_ptr<MetricEntity> _thrift_client_metric_entity;
126
127
    // Number of clients 'checked-out' from the cache
128
    IntGauge* thrift_used_clients = nullptr;
129
130
    // Total clients in the cache, including those in use
131
    IntGauge* thrift_opened_clients = nullptr;
132
133
    // Create a new client for specific host/port in 'client' and put it in _client_map
134
    Status _create_client(const TNetworkAddress& hostport, ClientFactory& factory_method,
135
                          void** client_key, int timeout_ms);
136
};
137
138
template <class T>
139
class ClientCache;
140
141
// A scoped client connection to help manage clients from a client cache.
142
//
143
// Example:
144
//   {
145
//     DorisInternalServiceConnection client(cache, address, &status);
146
//     try {
147
//       client->TransmitData(...);
148
//     } catch (TTransportException& e) {
149
//       // Retry
150
//       RETURN_IF_ERROR(client.reopen());
151
//       client->TransmitData(...);
152
//     }
153
//   }
154
// ('client' is released back to cache upon destruction.)
155
template <class T>
156
class ClientConnection {
157
public:
158
    ClientConnection(ClientCache<T>* client_cache, const TNetworkAddress& address, Status* status)
159
0
            : ClientConnection(client_cache, address, 0, status, 3) {}
160
161
    ClientConnection(ClientCache<T>* client_cache, const TNetworkAddress& address, int timeout_ms,
162
                     Status* status, int max_retries = 3)
163
0
            : _client_cache(client_cache), _client(nullptr) {
164
0
        int num_retries = 0;
165
0
        do {
166
0
            *status = _client_cache->get_client(address, &_client, timeout_ms);
167
0
            if (status->ok()) {
168
0
                DCHECK(_client != nullptr);
169
0
                break;
170
0
            }
171
0
            DCHECK(_client == nullptr);
172
0
            if (num_retries++ < max_retries) {
173
                // exponential backoff retry with starting delay of 500ms
174
0
                usleep(500000 * (1 << num_retries));
175
0
                LOG(INFO) << "Failed to get client from cache: " << status->to_string()
176
0
                          << ", retrying[" << num_retries << "]...";
177
0
            }
178
0
        } while (num_retries < max_retries);
179
0
    }
Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEEC2EPNS_11ClientCacheIS1_EERKNS_15TNetworkAddressEiPNS_6StatusEi
Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEEC2EPNS_11ClientCacheIS1_EERKNS_15TNetworkAddressEiPNS_6StatusEi
Unexecuted instantiation: _ZN5doris16ClientConnectionINS_20BackendServiceClientEEC2EPNS_11ClientCacheIS1_EERKNS_15TNetworkAddressEiPNS_6StatusEi
180
181
0
    ~ClientConnection() {
182
0
        if (_client != nullptr) {
183
0
            _client_cache->release_client(&_client);
184
0
        }
185
0
    }
Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEED2Ev
Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEED2Ev
Unexecuted instantiation: _ZN5doris16ClientConnectionINS_20BackendServiceClientEED2Ev
186
187
0
    Status reopen(int timeout_ms) { return _client_cache->reopen_client(&_client, timeout_ms); }
Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEE6reopenEi
Unexecuted instantiation: _ZN5doris16ClientConnectionINS_20BackendServiceClientEE6reopenEi
Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEE6reopenEi
188
189
0
    Status reopen() { return _client_cache->reopen_client(&_client, 0); }
Unexecuted instantiation: _ZN5doris16ClientConnectionINS_24TPaloBrokerServiceClientEE6reopenEv
Unexecuted instantiation: _ZN5doris16ClientConnectionINS_21FrontendServiceClientEE6reopenEv
190
191
0
    inline bool is_alive() { return _client != nullptr; }
192
193
0
    T* operator->() const {
194
0
        if (_client == nullptr) {
195
0
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Invalid RPC client!");
196
0
        }
197
0
        return _client;
198
0
    }
Unexecuted instantiation: _ZNK5doris16ClientConnectionINS_21FrontendServiceClientEEptEv
Unexecuted instantiation: _ZNK5doris16ClientConnectionINS_24TPaloBrokerServiceClientEEptEv
Unexecuted instantiation: _ZNK5doris16ClientConnectionINS_20BackendServiceClientEEptEv
199
200
private:
201
    ClientCache<T>* _client_cache = nullptr;
202
    T* _client;
203
};
204
205
// Generic cache of Thrift clients for a given service type.
206
// This class is thread-safe.
207
template <class T>
208
class ClientCache {
209
public:
210
    using Client = ThriftClient<T>;
211
212
1
    ClientCache() {
213
1
        _client_factory =
214
1
                std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this,
215
1
                                             std::placeholders::_1, std::placeholders::_2);
216
1
    }
_ZN5doris11ClientCacheINS_21FrontendServiceClientEEC2Ev
Line
Count
Source
212
1
    ClientCache() {
213
1
        _client_factory =
214
1
                std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this,
215
1
                                             std::placeholders::_1, std::placeholders::_2);
216
1
    }
Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEEC2Ev
217
218
0
    ClientCache(int max_cache_size) : _client_cache_helper(max_cache_size) {
219
0
        _client_factory =
220
0
                std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this,
221
0
                                             std::placeholders::_1, std::placeholders::_2);
222
0
    }
Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEEC2Ei
Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEEC2Ei
Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEEC2Ei
223
224
    // Helper method which returns a debug string
225
    std::string debug_string() { return _client_cache_helper.debug_string(); }
226
227
    // Adds metrics for this cache.
228
    // The metrics have an identification by the 'name' argument
229
    // (which should not end in a period).
230
    // Must be called before the cache is used, otherwise the metrics might be wrong
231
0
    void init_metrics(const std::string& name) { _client_cache_helper.init_metrics(name); }
Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE12init_metricsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE12init_metricsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE12init_metricsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
232
233
private:
234
    friend class ClientConnection<T>;
235
236
    // Most operations in this class are thin wrappers around the
237
    // equivalent in ClientCacheHelper, which is a non-templated cache
238
    // to avoid inlining lots of code wherever this cache is used.
239
    ClientCacheHelper _client_cache_helper;
240
241
    // Function pointer, bound to make_client, which produces clients when the cache is empty
242
    using ClientFactory = ClientCacheHelper::ClientFactory;
243
    ClientFactory _client_factory;
244
245
    // Obtains a pointer to a Thrift interface object (of type T),
246
    // backed by a live transport which is already open. Returns
247
    // Status::OK() unless there was an error opening the transport.
248
0
    Status get_client(const TNetworkAddress& hostport, T** iface, int timeout_ms) {
249
0
        return _client_cache_helper.get_client(hostport, _client_factory,
250
0
                                               reinterpret_cast<void**>(iface), timeout_ms);
251
0
    }
Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE10get_clientERKNS_15TNetworkAddressEPPS1_i
Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE10get_clientERKNS_15TNetworkAddressEPPS1_i
Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE10get_clientERKNS_15TNetworkAddressEPPS1_i
252
253
    // Close and delete the underlying transport. Return a new client connecting to the
254
    // same host/port.
255
    // Return an error status if a new connection cannot be established and *client will be
256
    // nullptr in that case.
257
0
    Status reopen_client(T** client, int timeout_ms) {
258
0
        return _client_cache_helper.reopen_client(_client_factory, reinterpret_cast<void**>(client),
259
0
                                                  timeout_ms);
260
0
    }
Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE13reopen_clientEPPS1_i
Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE13reopen_clientEPPS1_i
Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE13reopen_clientEPPS1_i
261
262
    // Return the client to the cache and set *client to nullptr.
263
0
    void release_client(T** client) {
264
0
        return _client_cache_helper.release_client(reinterpret_cast<void**>(client));
265
0
    }
Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE14release_clientEPPS1_
Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE14release_clientEPPS1_
Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE14release_clientEPPS1_
266
267
    // Factory method to produce a new ThriftClient<T> for the wrapped cache
268
0
    ThriftClientImpl* make_client(const TNetworkAddress& hostport, void** client_key) {
269
0
        static ThriftServer::ServerType server_type = get_thrift_server_type();
270
0
        Client* client = new Client(hostport.hostname, hostport.port, server_type);
271
0
        *client_key = reinterpret_cast<void*>(client->iface());
272
0
        return client;
273
0
    }
Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE11make_clientERKNS_15TNetworkAddressEPPv
Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE11make_clientERKNS_15TNetworkAddressEPPv
Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE11make_clientERKNS_15TNetworkAddressEPPv
274
275
    // since service type is multiple, we should set thrift server type here for be thrift client
276
0
    ThriftServer::ServerType get_thrift_server_type() {
277
0
        auto& thrift_server_type = config::thrift_server_type_of_fe;
278
0
        std::transform(thrift_server_type.begin(), thrift_server_type.end(),
279
0
                       thrift_server_type.begin(), [](auto c) { return std::toupper(c); });
Unexecuted instantiation: _ZZN5doris11ClientCacheINS_21FrontendServiceClientEE22get_thrift_server_typeEvENKUlT_E_clIcEEDaS3_
Unexecuted instantiation: _ZZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE22get_thrift_server_typeEvENKUlT_E_clIcEEDaS3_
Unexecuted instantiation: _ZZN5doris11ClientCacheINS_20BackendServiceClientEE22get_thrift_server_typeEvENKUlT_E_clIcEEDaS3_
280
0
        if (strcmp(typeid(T).name(), "N5doris21FrontendServiceClientE") == 0 &&
281
0
            thrift_server_type == "THREADED_SELECTOR") {
282
0
            return ThriftServer::ServerType::NON_BLOCKING;
283
0
        } else {
284
0
            return ThriftServer::ServerType::THREADED;
285
0
        }
286
0
    }
Unexecuted instantiation: _ZN5doris11ClientCacheINS_21FrontendServiceClientEE22get_thrift_server_typeEv
Unexecuted instantiation: _ZN5doris11ClientCacheINS_24TPaloBrokerServiceClientEE22get_thrift_server_typeEv
Unexecuted instantiation: _ZN5doris11ClientCacheINS_20BackendServiceClientEE22get_thrift_server_typeEv
287
};
288
289
// Doris backend client cache, used by a backend to send requests
290
// to any other backend.
291
using BackendServiceClientCache = ClientCache<BackendServiceClient>;
292
using BackendServiceConnection = ClientConnection<BackendServiceClient>;
293
294
// Client cache and scoped connection specialized for FrontendServiceClient.
using FrontendServiceClientCache = ClientCache<FrontendServiceClient>;
295
using FrontendServiceConnection = ClientConnection<FrontendServiceClient>;
296
297
// Client cache and scoped connection specialized for TPaloBrokerServiceClient.
using BrokerServiceClientCache = ClientCache<TPaloBrokerServiceClient>;
298
using BrokerServiceConnection = ClientConnection<TPaloBrokerServiceClient>;
299
300
} // namespace doris