/root/doris/be/src/runtime/client_cache.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/client_cache.h" |
19 | | |
20 | | #include <memory> |
21 | | #include <utility> |
22 | | |
23 | | #include "common/logging.h" |
24 | | #include "runtime/exec_env.h" |
25 | | #include "util/dns_cache.h" |
26 | | #include "util/doris_metrics.h" |
27 | | #include "util/network_util.h" |
28 | | |
29 | | namespace doris { |
30 | | |
// Gauge prototype for the number of clients currently handed out to callers.
// Within this file it is incremented at the end of a successful get_client() and
// decremented in release_client(), in both cases only when _metrics_enabled is set.
DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_used_clients, MetricUnit::NOUNIT,
                                   "Number of clients 'checked-out' from the cache");
// Gauge prototype for the number of live client objects tracked by the helper.
// Within this file it is incremented in _create_client() and decremented wherever a
// client is closed and deleted (stale-IP eviction, reopen_client(), and release_client()
// when the per-host cache is full), again only when _metrics_enabled is set.
DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_opened_clients, MetricUnit::NOUNIT,
                                   "Total clients in the cache, including those in use");
35 | | |
36 | 0 | ClientCacheHelper::~ClientCacheHelper() { |
37 | 0 | for (auto& it : _client_map) { |
38 | 0 | delete it.second; |
39 | 0 | } |
40 | 0 | } |
41 | | |
42 | | void ClientCacheHelper::_get_client_from_cache(const TNetworkAddress& hostport, |
43 | 0 | const std::string& resolved_ip, void** client_key) { |
44 | 0 | *client_key = nullptr; |
45 | 0 | std::lock_guard<std::mutex> lock(_lock); |
46 | | //VLOG_RPC << "get_client(" << hostport << ")"; |
47 | 0 | auto cache_entry = _client_cache.find(hostport); |
48 | |
|
49 | 0 | if (cache_entry == _client_cache.end()) { |
50 | 0 | cache_entry = |
51 | 0 | _client_cache.insert(std::make_pair(hostport, std::list<CachedClient>())).first; |
52 | 0 | DCHECK(cache_entry != _client_cache.end()); |
53 | 0 | } |
54 | |
|
55 | 0 | std::list<CachedClient>& info_list = cache_entry->second; |
56 | | // Find a cached client with matching resolved IP |
57 | 0 | for (auto it = info_list.begin(); it != info_list.end(); ++it) { |
58 | 0 | if (it->resolved_ip == resolved_ip) { |
59 | 0 | *client_key = it->client_key; |
60 | 0 | VLOG_RPC << "get_client(): cached client for " << hostport << " with ip " |
61 | 0 | << resolved_ip; |
62 | 0 | info_list.erase(it); |
63 | 0 | return; |
64 | 0 | } |
65 | 0 | } |
66 | | |
67 | | // No matching client found. Clear all cached clients with stale IPs for this hostport. |
68 | | // These clients were created with old resolved IPs and should be closed. |
69 | 0 | for (auto& cached_client : info_list) { |
70 | 0 | auto client_map_entry = _client_map.find(cached_client.client_key); |
71 | 0 | if (client_map_entry != _client_map.end()) { |
72 | 0 | ThriftClientImpl* client_to_close = client_map_entry->second; |
73 | 0 | client_to_close->close(); |
74 | 0 | delete client_to_close; |
75 | 0 | _client_map.erase(client_map_entry); |
76 | 0 | _client_hostport_map.erase(cached_client.client_key); |
77 | 0 | if (_metrics_enabled) { |
78 | 0 | thrift_opened_clients->increment(-1); |
79 | 0 | } |
80 | 0 | } |
81 | 0 | } |
82 | 0 | info_list.clear(); |
83 | 0 | } |
84 | | |
85 | | Status ClientCacheHelper::get_client(const TNetworkAddress& hostport, ClientFactory& factory_method, |
86 | 0 | void** client_key, int timeout_ms) { |
87 | | // Resolve hostname to IP address via DNS cache. |
88 | | // If the hostname is already an IP address, DNS cache will return it directly. |
89 | 0 | std::string resolved_ip; |
90 | 0 | Status dns_status = ExecEnv::GetInstance()->dns_cache()->get(hostport.hostname, &resolved_ip); |
91 | 0 | if (!dns_status.ok() || resolved_ip.empty()) { |
92 | 0 | return Status::InternalError("Failed to resolve hostname {} to IP address: {}", |
93 | 0 | hostport.hostname, dns_status.to_string()); |
94 | 0 | } |
95 | | |
96 | | // Try to get a cached client with matching resolved IP |
97 | 0 | _get_client_from_cache(hostport, resolved_ip, client_key); |
98 | 0 | if (*client_key == nullptr) { |
99 | | // No cached client with matching IP, create a new one using the resolved IP |
100 | 0 | RETURN_IF_ERROR( |
101 | 0 | _create_client(hostport, resolved_ip, factory_method, client_key, timeout_ms)); |
102 | 0 | } |
103 | | |
104 | 0 | if (_metrics_enabled) { |
105 | 0 | thrift_used_clients->increment(1); |
106 | 0 | } |
107 | |
|
108 | 0 | return Status::OK(); |
109 | 0 | } |
110 | | |
111 | | Status ClientCacheHelper::reopen_client(ClientFactory& factory_method, void** client_key, |
112 | 0 | int timeout_ms) { |
113 | 0 | DCHECK(*client_key != nullptr) << "Trying to reopen nullptr client"; |
114 | 0 | ThriftClientImpl* client_to_close = nullptr; |
115 | 0 | TNetworkAddress hostport; |
116 | 0 | { |
117 | 0 | std::lock_guard<std::mutex> lock(_lock); |
118 | 0 | auto client_map_entry = _client_map.find(*client_key); |
119 | 0 | DCHECK(client_map_entry != _client_map.end()); |
120 | 0 | client_to_close = client_map_entry->second; |
121 | | |
122 | | // Get the original hostport (with hostname) for this client |
123 | 0 | auto hostport_entry = _client_hostport_map.find(*client_key); |
124 | 0 | DCHECK(hostport_entry != _client_hostport_map.end()); |
125 | 0 | hostport = hostport_entry->second; |
126 | 0 | } |
127 | |
|
128 | 0 | client_to_close->close(); |
129 | | |
130 | | // TODO: Thrift TBufferedTransport cannot be re-opened after Close() because it does |
131 | | // not clean up internal buffers it reopens. To work around this issue, create a new |
132 | | // client instead. |
133 | 0 | { |
134 | 0 | std::lock_guard<std::mutex> lock(_lock); |
135 | 0 | _client_map.erase(*client_key); |
136 | 0 | _client_hostport_map.erase(*client_key); |
137 | 0 | } |
138 | 0 | delete client_to_close; |
139 | 0 | *client_key = nullptr; |
140 | |
|
141 | 0 | if (_metrics_enabled) { |
142 | 0 | thrift_opened_clients->increment(-1); |
143 | 0 | } |
144 | | |
145 | | // Re-resolve hostname to IP address |
146 | 0 | std::string resolved_ip; |
147 | 0 | Status dns_status = ExecEnv::GetInstance()->dns_cache()->get(hostport.hostname, &resolved_ip); |
148 | 0 | if (!dns_status.ok() || resolved_ip.empty()) { |
149 | 0 | return Status::InternalError("Failed to resolve hostname {} to IP address: {}", |
150 | 0 | hostport.hostname, dns_status.to_string()); |
151 | 0 | } |
152 | | |
153 | 0 | RETURN_IF_ERROR(_create_client(hostport, resolved_ip, factory_method, client_key, timeout_ms)); |
154 | | |
155 | 0 | return Status::OK(); |
156 | 0 | } |
157 | | |
158 | | Status ClientCacheHelper::_create_client(const TNetworkAddress& hostport, |
159 | | const std::string& resolved_ip, |
160 | | ClientFactory& factory_method, void** client_key, |
161 | 0 | int timeout_ms) { |
162 | | // Create a new TNetworkAddress with the resolved IP instead of hostname. |
163 | | // This ensures that all client connections are made using IP addresses. |
164 | 0 | TNetworkAddress addr_with_ip; |
165 | 0 | addr_with_ip.hostname = resolved_ip; |
166 | 0 | addr_with_ip.port = hostport.port; |
167 | |
|
168 | 0 | std::unique_ptr<ThriftClientImpl> client_impl(factory_method(addr_with_ip, client_key)); |
169 | | //VLOG_CONNECTION << "create_client(): adding new client for " |
170 | | // << client_impl->ipaddress() << ":" << client_impl->port(); |
171 | |
|
172 | 0 | client_impl->set_conn_timeout(config::thrift_connect_timeout_seconds * 1000); |
173 | |
|
174 | 0 | Status status = client_impl->open_with_retry(config::thrift_client_open_num_tries, 100); |
175 | |
|
176 | 0 | if (!status.ok()) { |
177 | 0 | *client_key = nullptr; |
178 | 0 | return status; |
179 | 0 | } |
180 | | |
181 | 0 | DCHECK(*client_key != nullptr); |
182 | | // In thrift, timeout == 0, means wait infinitely, so that it should not happen. |
183 | | // See https://github.com/apache/thrift/blob/master/lib/cpp/src/thrift/transport/TSocket.cpp. |
184 | | // There is some code like this: int ret = THRIFT_POLL(fds, 2, (recvTimeout_ == 0) ? -1 : recvTimeout_); |
185 | | // If the developer missed to set the timeout, we should use default timeout, not infinitely. |
186 | | // See https://linux.die.net/man/2/poll. Specifying a negative value in timeout means an infinite timeout. |
187 | 0 | client_impl->set_send_timeout(timeout_ms == 0 ? config::thrift_rpc_timeout_ms : timeout_ms); |
188 | 0 | client_impl->set_recv_timeout(timeout_ms == 0 ? config::thrift_rpc_timeout_ms : timeout_ms); |
189 | |
|
190 | 0 | { |
191 | 0 | std::lock_guard<std::mutex> lock(_lock); |
192 | | // Because the client starts life 'checked out', we don't add it to the cache map |
193 | 0 | DCHECK(_client_map.count(*client_key) == 0); |
194 | 0 | _client_map[*client_key] = client_impl.release(); |
195 | | // Store the original hostport (with hostname) for this client |
196 | 0 | _client_hostport_map[*client_key] = hostport; |
197 | 0 | } |
198 | |
|
199 | 0 | if (_metrics_enabled) { |
200 | 0 | thrift_opened_clients->increment(1); |
201 | 0 | } |
202 | |
|
203 | 0 | return Status::OK(); |
204 | 0 | } |
205 | | |
206 | 0 | void ClientCacheHelper::release_client(void** client_key) { |
207 | 0 | DCHECK(*client_key != nullptr) << "Trying to release nullptr client"; |
208 | 0 | ThriftClientImpl* client_to_close = nullptr; |
209 | 0 | { |
210 | 0 | std::lock_guard<std::mutex> lock(_lock); |
211 | 0 | auto client_map_entry = _client_map.find(*client_key); |
212 | 0 | DCHECK(client_map_entry != _client_map.end()); |
213 | 0 | ThriftClientImpl* client = client_map_entry->second; |
214 | | |
215 | | // Get the original hostport (with hostname) for this client |
216 | 0 | auto hostport_entry = _client_hostport_map.find(*client_key); |
217 | 0 | DCHECK(hostport_entry != _client_hostport_map.end()); |
218 | 0 | const TNetworkAddress& hostport = hostport_entry->second; |
219 | |
|
220 | 0 | auto cache_list = _client_cache.find(hostport); |
221 | 0 | DCHECK(cache_list != _client_cache.end()); |
222 | 0 | if (_max_cache_size_per_host >= 0 && |
223 | 0 | cache_list->second.size() >= _max_cache_size_per_host) { |
224 | | // cache of this host is full, close this client connection and remove from maps |
225 | 0 | _client_map.erase(*client_key); |
226 | 0 | _client_hostport_map.erase(*client_key); |
227 | 0 | client_to_close = client; |
228 | 0 | } else { |
229 | | // Store the client with its resolved IP address |
230 | 0 | CachedClient cached_client; |
231 | 0 | cached_client.client_key = *client_key; |
232 | 0 | cached_client.resolved_ip = client->ipaddress(); |
233 | 0 | cache_list->second.push_back(cached_client); |
234 | | // There is no need to close client if we put it to cache list. |
235 | 0 | client_to_close = nullptr; |
236 | 0 | } |
237 | 0 | } |
238 | |
|
239 | 0 | if (client_to_close != nullptr) { |
240 | 0 | client_to_close->close(); |
241 | 0 | delete client_to_close; |
242 | 0 | if (_metrics_enabled) { |
243 | 0 | thrift_opened_clients->increment(-1); |
244 | 0 | } |
245 | 0 | } |
246 | |
|
247 | 0 | if (_metrics_enabled) { |
248 | 0 | thrift_used_clients->increment(-1); |
249 | 0 | } |
250 | |
|
251 | 0 | *client_key = nullptr; |
252 | 0 | } |
253 | | |
254 | 0 | std::string ClientCacheHelper::debug_string() { |
255 | 0 | std::stringstream out; |
256 | 0 | out << "ClientCacheHelper(#hosts=" << _client_cache.size() << " ["; |
257 | |
|
258 | 0 | bool isfirst = true; |
259 | 0 | for (const auto& [endpoint, client_keys] : _client_cache) { |
260 | 0 | if (!isfirst) { |
261 | 0 | out << " "; |
262 | 0 | isfirst = false; |
263 | 0 | } |
264 | 0 | out << endpoint << ":" << client_keys.size(); |
265 | 0 | } |
266 | |
|
267 | 0 | out << "])"; |
268 | 0 | return out.str(); |
269 | 0 | } |
270 | | |
271 | 0 | void ClientCacheHelper::init_metrics(const std::string& name) { |
272 | | // Not strictly needed if init_metrics is called before any cache |
273 | | // usage, but ensures that _metrics_enabled is published. |
274 | 0 | std::lock_guard<std::mutex> lock(_lock); |
275 | |
|
276 | 0 | _thrift_client_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( |
277 | 0 | std::string("thrift_client.") + name, {{"name", name}}); |
278 | 0 | INT_GAUGE_METRIC_REGISTER(_thrift_client_metric_entity, thrift_used_clients); |
279 | 0 | INT_GAUGE_METRIC_REGISTER(_thrift_client_metric_entity, thrift_opened_clients); |
280 | |
|
281 | 0 | _metrics_enabled = true; |
282 | 0 | } |
283 | | |
284 | | } // namespace doris |