Coverage Report

Created: 2026-01-27 18:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/runtime/client_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/client_cache.h"

#include <memory>
#include <utility>
#include <vector>

#include "common/logging.h"
#include "runtime/exec_env.h"
#include "util/dns_cache.h"
#include "util/doris_metrics.h"
#include "util/network_util.h"
28
29
namespace doris {
30
31
// Gauge: clients currently handed out to callers (checked out of the cache).
DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(
        thrift_used_clients, MetricUnit::NOUNIT,
        "Number of clients 'checked-out' from the cache");

// Gauge: every client owned by the cache, whether idle or checked out.
DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(
        thrift_opened_clients, MetricUnit::NOUNIT,
        "Total clients in the cache, including those in use");
35
36
0
ClientCacheHelper::~ClientCacheHelper() {
    // _create_client() registers every client it builds in _client_map, and entries only
    // leave that map when the client is also deleted. Freeing the map's values therefore
    // frees every client still alive, idle or checked out.
    for (auto entry = _client_map.begin(); entry != _client_map.end(); ++entry) {
        delete entry->second;
    }
}
41
42
void ClientCacheHelper::_get_client_from_cache(const TNetworkAddress& hostport,
43
0
                                               const std::string& resolved_ip, void** client_key) {
44
0
    *client_key = nullptr;
45
0
    std::lock_guard<std::mutex> lock(_lock);
46
    //VLOG_RPC << "get_client(" << hostport << ")";
47
0
    auto cache_entry = _client_cache.find(hostport);
48
49
0
    if (cache_entry == _client_cache.end()) {
50
0
        cache_entry =
51
0
                _client_cache.insert(std::make_pair(hostport, std::list<CachedClient>())).first;
52
0
        DCHECK(cache_entry != _client_cache.end());
53
0
    }
54
55
0
    std::list<CachedClient>& info_list = cache_entry->second;
56
    // Find a cached client with matching resolved IP
57
0
    for (auto it = info_list.begin(); it != info_list.end(); ++it) {
58
0
        if (it->resolved_ip == resolved_ip) {
59
0
            *client_key = it->client_key;
60
0
            VLOG_RPC << "get_client(): cached client for " << hostport << " with ip "
61
0
                     << resolved_ip;
62
0
            info_list.erase(it);
63
0
            return;
64
0
        }
65
0
    }
66
67
    // No matching client found. Clear all cached clients with stale IPs for this hostport.
68
    // These clients were created with old resolved IPs and should be closed.
69
0
    for (auto& cached_client : info_list) {
70
0
        auto client_map_entry = _client_map.find(cached_client.client_key);
71
0
        if (client_map_entry != _client_map.end()) {
72
0
            ThriftClientImpl* client_to_close = client_map_entry->second;
73
0
            client_to_close->close();
74
0
            delete client_to_close;
75
0
            _client_map.erase(client_map_entry);
76
0
            _client_hostport_map.erase(cached_client.client_key);
77
0
            if (_metrics_enabled) {
78
0
                thrift_opened_clients->increment(-1);
79
0
            }
80
0
        }
81
0
    }
82
0
    info_list.clear();
83
0
}
84
85
Status ClientCacheHelper::get_client(const TNetworkAddress& hostport, ClientFactory& factory_method,
                                     void** client_key, int timeout_ms) {
    // Clients are matched by the IP the hostname resolves to right now. If the hostname is
    // already an IP address, the DNS cache returns it unchanged.
    std::string current_ip;
    Status lookup_status = ExecEnv::GetInstance()->dns_cache()->get(hostport.hostname, &current_ip);
    const bool have_ip = lookup_status.ok() && !current_ip.empty();
    if (!have_ip) {
        return Status::InternalError("Failed to resolve hostname {} to IP address: {}",
                                     hostport.hostname, lookup_status.to_string());
    }

    // Prefer an idle client already connected to that IP; otherwise open a fresh one.
    _get_client_from_cache(hostport, current_ip, client_key);
    const bool cache_miss = (*client_key == nullptr);
    if (cache_miss) {
        RETURN_IF_ERROR(
                _create_client(hostport, current_ip, factory_method, client_key, timeout_ms));
    }

    // The caller now holds a checked-out client.
    if (_metrics_enabled) {
        thrift_used_clients->increment(1);
    }
    return Status::OK();
}
110
111
// Replaces the checked-out client identified by *client_key with a brand-new connection to
// the same hostport. The hostname is re-resolved, so a changed IP is picked up.
//
// The old client is always closed, unregistered and deleted. On success, *client_key refers
// to the new client, which is still checked out. On failure, *client_key is nullptr and the
// caller holds no client at all; since release_client() rejects a null key, this function
// gives back the checked-out count taken by get_client() itself.
Status ClientCacheHelper::reopen_client(ClientFactory& factory_method, void** client_key,
                                        int timeout_ms) {
    DCHECK(*client_key != nullptr) << "Trying to reopen nullptr client";
    ThriftClientImpl* client_to_close = nullptr;
    TNetworkAddress hostport;
    {
        std::lock_guard<std::mutex> lock(_lock);
        auto client_map_entry = _client_map.find(*client_key);
        DCHECK(client_map_entry != _client_map.end());
        client_to_close = client_map_entry->second;

        // Get the original hostport (with hostname) for this client.
        auto hostport_entry = _client_hostport_map.find(*client_key);
        DCHECK(hostport_entry != _client_hostport_map.end());
        hostport = hostport_entry->second;
    }

    client_to_close->close();

    // TODO: Thrift TBufferedTransport cannot be re-opened after Close() because it does
    // not clean up internal buffers when it reopens. To work around this issue, create a new
    // client instead.
    {
        std::lock_guard<std::mutex> lock(_lock);
        _client_map.erase(*client_key);
        _client_hostport_map.erase(*client_key);
    }
    delete client_to_close;
    *client_key = nullptr;

    if (_metrics_enabled) {
        thrift_opened_clients->increment(-1);
    }

    // Re-resolve the hostname and open the replacement client.
    auto create_replacement = [&]() -> Status {
        std::string resolved_ip;
        Status dns_status =
                ExecEnv::GetInstance()->dns_cache()->get(hostport.hostname, &resolved_ip);
        if (!dns_status.ok() || resolved_ip.empty()) {
            return Status::InternalError("Failed to resolve hostname {} to IP address: {}",
                                         hostport.hostname, dns_status.to_string());
        }
        return _create_client(hostport, resolved_ip, factory_method, client_key, timeout_ms);
    };

    Status status = create_replacement();
    if (!status.ok() && _metrics_enabled) {
        // *client_key is nullptr here, so the caller cannot release_client(). Undo the
        // checked-out count now, or thrift_used_clients would never come back down.
        thrift_used_clients->increment(-1);
    }
    return status;
}
157
158
// Builds a new client with `factory_method` and connects it to `resolved_ip`:`hostport.port`.
// The client starts life checked out: it is registered in _client_map (keyed by *client_key)
// together with the caller's original `hostport`, but it is not placed in _client_cache.
// If opening fails, the client is destroyed, *client_key is reset to nullptr and the open
// error is returned.
Status ClientCacheHelper::_create_client(const TNetworkAddress& hostport,
                                         const std::string& resolved_ip,
                                         ClientFactory& factory_method, void** client_key,
                                         int timeout_ms) {
    // Connect using the IP address, never the hostname.
    TNetworkAddress target;
    target.hostname = resolved_ip;
    target.port = hostport.port;

    std::unique_ptr<ThriftClientImpl> client_impl(factory_method(target, client_key));
    //VLOG_CONNECTION << "create_client(): adding new client for "
    //                << client_impl->ipaddress() << ":" << client_impl->port();
    client_impl->set_conn_timeout(config::thrift_connect_timeout_seconds * 1000);

    Status open_status = client_impl->open_with_retry(config::thrift_client_open_num_tries, 100);
    if (!open_status.ok()) {
        // client_impl is destroyed on return; do not leave the factory's key behind.
        *client_key = nullptr;
        return open_status;
    }
    DCHECK(*client_key != nullptr);

    // For a Thrift socket a timeout of 0 means "wait forever" (see thrift's TSocket.cpp,
    // where recvTimeout_ == 0 becomes poll(..., -1)). A caller that passes 0 almost certainly
    // forgot to set it, so fall back to the configured RPC timeout instead.
    const auto rpc_timeout_ms = (timeout_ms == 0) ? config::thrift_rpc_timeout_ms : timeout_ms;
    client_impl->set_send_timeout(rpc_timeout_ms);
    client_impl->set_recv_timeout(rpc_timeout_ms);

    {
        std::lock_guard<std::mutex> lock(_lock);
        // Checked out from birth: take ownership in _client_map but keep it out of
        // _client_cache. Remember the hostname-form hostport for release/reopen.
        DCHECK(_client_map.count(*client_key) == 0);
        _client_map[*client_key] = client_impl.release();
        _client_hostport_map[*client_key] = hostport;
    }

    if (_metrics_enabled) {
        thrift_opened_clients->increment(1);
    }
    return Status::OK();
}
205
206
0
void ClientCacheHelper::release_client(void** client_key) {
207
0
    DCHECK(*client_key != nullptr) << "Trying to release nullptr client";
208
0
    ThriftClientImpl* client_to_close = nullptr;
209
0
    {
210
0
        std::lock_guard<std::mutex> lock(_lock);
211
0
        auto client_map_entry = _client_map.find(*client_key);
212
0
        DCHECK(client_map_entry != _client_map.end());
213
0
        ThriftClientImpl* client = client_map_entry->second;
214
215
        // Get the original hostport (with hostname) for this client
216
0
        auto hostport_entry = _client_hostport_map.find(*client_key);
217
0
        DCHECK(hostport_entry != _client_hostport_map.end());
218
0
        const TNetworkAddress& hostport = hostport_entry->second;
219
220
0
        auto cache_list = _client_cache.find(hostport);
221
0
        DCHECK(cache_list != _client_cache.end());
222
0
        if (_max_cache_size_per_host >= 0 &&
223
0
            cache_list->second.size() >= _max_cache_size_per_host) {
224
            // cache of this host is full, close this client connection and remove from maps
225
0
            _client_map.erase(*client_key);
226
0
            _client_hostport_map.erase(*client_key);
227
0
            client_to_close = client;
228
0
        } else {
229
            // Store the client with its resolved IP address
230
0
            CachedClient cached_client;
231
0
            cached_client.client_key = *client_key;
232
0
            cached_client.resolved_ip = client->ipaddress();
233
0
            cache_list->second.push_back(cached_client);
234
            // There is no need to close client if we put it to cache list.
235
0
            client_to_close = nullptr;
236
0
        }
237
0
    }
238
239
0
    if (client_to_close != nullptr) {
240
0
        client_to_close->close();
241
0
        delete client_to_close;
242
0
        if (_metrics_enabled) {
243
0
            thrift_opened_clients->increment(-1);
244
0
        }
245
0
    }
246
247
0
    if (_metrics_enabled) {
248
0
        thrift_used_clients->increment(-1);
249
0
    }
250
251
0
    *client_key = nullptr;
252
0
}
253
254
0
std::string ClientCacheHelper::debug_string() {
255
0
    std::stringstream out;
256
0
    out << "ClientCacheHelper(#hosts=" << _client_cache.size() << " [";
257
258
0
    bool isfirst = true;
259
0
    for (const auto& [endpoint, client_keys] : _client_cache) {
260
0
        if (!isfirst) {
261
0
            out << " ";
262
0
            isfirst = false;
263
0
        }
264
0
        out << endpoint << ":" << client_keys.size();
265
0
    }
266
267
0
    out << "])";
268
0
    return out.str();
269
0
}
270
271
0
void ClientCacheHelper::init_metrics(const std::string& name) {
272
    // Not strictly needed if init_metrics is called before any cache
273
    // usage, but ensures that _metrics_enabled is published.
274
0
    std::lock_guard<std::mutex> lock(_lock);
275
276
0
    _thrift_client_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
277
0
            std::string("thrift_client.") + name, {{"name", name}});
278
0
    INT_GAUGE_METRIC_REGISTER(_thrift_client_metric_entity, thrift_used_clients);
279
0
    INT_GAUGE_METRIC_REGISTER(_thrift_client_metric_entity, thrift_opened_clients);
280
281
0
    _metrics_enabled = true;
282
0
}
283
284
} // namespace doris