/root/doris/be/src/runtime/client_cache.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/client_cache.h" |
19 | | |
20 | | #include <memory> |
21 | | #include <utility> |
22 | | |
23 | | #include "common/logging.h" |
24 | | #include "util/doris_metrics.h" |
25 | | #include "util/network_util.h" |
26 | | |
27 | | namespace doris { |
28 | | |
29 | | DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_used_clients, MetricUnit::NOUNIT, |
30 | | "Number of clients 'checked-out' from the cache"); |
31 | | DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_opened_clients, MetricUnit::NOUNIT, |
32 | | "Total clients in the cache, including those in use"); |
33 | | |
34 | 0 | ClientCacheHelper::~ClientCacheHelper() { |
35 | 0 | for (auto& it : _client_map) { |
36 | 0 | delete it.second; |
37 | 0 | } |
38 | 0 | } |
39 | | |
40 | 0 | void ClientCacheHelper::_get_client_from_cache(const TNetworkAddress& hostport, void** client_key) { |
41 | 0 | *client_key = nullptr; |
42 | 0 | std::lock_guard<std::mutex> lock(_lock); |
43 | | //VLOG_RPC << "get_client(" << hostport << ")"; |
44 | 0 | auto cache_entry = _client_cache.find(hostport); |
45 | |
|
46 | 0 | if (cache_entry == _client_cache.end()) { |
47 | 0 | cache_entry = _client_cache.insert(std::make_pair(hostport, std::list<void*>())).first; |
48 | 0 | DCHECK(cache_entry != _client_cache.end()); |
49 | 0 | } |
50 | |
|
51 | 0 | std::list<void*>& info_list = cache_entry->second; |
52 | 0 | if (!info_list.empty()) { |
53 | 0 | *client_key = info_list.front(); |
54 | 0 | VLOG_RPC << "get_client(): cached client for " << hostport; |
55 | 0 | info_list.pop_front(); |
56 | 0 | } |
57 | 0 | } |
58 | | |
59 | | Status ClientCacheHelper::get_client(const TNetworkAddress& hostport, ClientFactory& factory_method, |
60 | 0 | void** client_key, int timeout_ms) { |
61 | 0 | _get_client_from_cache(hostport, client_key); |
62 | 0 | if (*client_key == nullptr) { |
63 | 0 | RETURN_IF_ERROR(_create_client(hostport, factory_method, client_key, timeout_ms)); |
64 | 0 | } |
65 | | |
66 | 0 | if (_metrics_enabled) { |
67 | 0 | thrift_used_clients->increment(1); |
68 | 0 | } |
69 | |
|
70 | 0 | return Status::OK(); |
71 | 0 | } |
72 | | |
73 | | Status ClientCacheHelper::reopen_client(ClientFactory& factory_method, void** client_key, |
74 | 0 | int timeout_ms) { |
75 | 0 | DCHECK(*client_key != nullptr) << "Trying to reopen nullptr client"; |
76 | 0 | ThriftClientImpl* client_to_close = nullptr; |
77 | 0 | { |
78 | 0 | std::lock_guard<std::mutex> lock(_lock); |
79 | 0 | auto client_map_entry = _client_map.find(*client_key); |
80 | 0 | DCHECK(client_map_entry != _client_map.end()); |
81 | 0 | client_to_close = client_map_entry->second; |
82 | 0 | } |
83 | 0 | const std::string ipaddress = client_to_close->ipaddress(); |
84 | 0 | int port = client_to_close->port(); |
85 | |
|
86 | 0 | client_to_close->close(); |
87 | | |
88 | | // TODO: Thrift TBufferedTransport cannot be re-opened after Close() because it does |
89 | | // not clean up internal buffers it reopens. To work around this issue, create a new |
90 | | // client instead. |
91 | 0 | { |
92 | 0 | std::lock_guard<std::mutex> lock(_lock); |
93 | 0 | _client_map.erase(*client_key); |
94 | 0 | } |
95 | 0 | delete client_to_close; |
96 | 0 | *client_key = nullptr; |
97 | |
|
98 | 0 | if (_metrics_enabled) { |
99 | 0 | thrift_opened_clients->increment(-1); |
100 | 0 | } |
101 | |
|
102 | 0 | RETURN_IF_ERROR(_create_client(make_network_address(ipaddress, port), factory_method, |
103 | 0 | client_key, timeout_ms)); |
104 | | |
105 | 0 | return Status::OK(); |
106 | 0 | } |
107 | | |
108 | | Status ClientCacheHelper::_create_client(const TNetworkAddress& hostport, |
109 | | ClientFactory& factory_method, void** client_key, |
110 | 0 | int timeout_ms) { |
111 | 0 | std::unique_ptr<ThriftClientImpl> client_impl(factory_method(hostport, client_key)); |
112 | | //VLOG_CONNECTION << "create_client(): adding new client for " |
113 | | // << client_impl->ipaddress() << ":" << client_impl->port(); |
114 | |
|
115 | 0 | client_impl->set_conn_timeout(config::thrift_connect_timeout_seconds * 1000); |
116 | |
|
117 | 0 | Status status = client_impl->open_with_retry(config::thrift_client_open_num_tries, 100); |
118 | |
|
119 | 0 | if (!status.ok()) { |
120 | 0 | *client_key = nullptr; |
121 | 0 | return status; |
122 | 0 | } |
123 | | |
124 | 0 | DCHECK(*client_key != nullptr); |
125 | 0 | client_impl->set_send_timeout(timeout_ms); |
126 | 0 | client_impl->set_recv_timeout(timeout_ms); |
127 | |
|
128 | 0 | { |
129 | 0 | std::lock_guard<std::mutex> lock(_lock); |
130 | | // Because the client starts life 'checked out', we don't add it to the cache map |
131 | 0 | DCHECK(_client_map.count(*client_key) == 0); |
132 | 0 | _client_map[*client_key] = client_impl.release(); |
133 | 0 | } |
134 | |
|
135 | 0 | if (_metrics_enabled) { |
136 | 0 | thrift_opened_clients->increment(1); |
137 | 0 | } |
138 | |
|
139 | 0 | return Status::OK(); |
140 | 0 | } |
141 | | |
142 | 0 | void ClientCacheHelper::release_client(void** client_key) { |
143 | 0 | DCHECK(*client_key != nullptr) << "Trying to release nullptr client"; |
144 | 0 | ThriftClientImpl* client_to_close = nullptr; |
145 | 0 | { |
146 | 0 | std::lock_guard<std::mutex> lock(_lock); |
147 | 0 | auto client_map_entry = _client_map.find(*client_key); |
148 | 0 | DCHECK(client_map_entry != _client_map.end()); |
149 | 0 | client_to_close = client_map_entry->second; |
150 | |
|
151 | 0 | auto cache_list = _client_cache.find( |
152 | 0 | make_network_address(client_to_close->ipaddress(), client_to_close->port())); |
153 | 0 | DCHECK(cache_list != _client_cache.end()); |
154 | 0 | if (_max_cache_size_per_host >= 0 && |
155 | 0 | cache_list->second.size() >= _max_cache_size_per_host) { |
156 | | // cache of this host is full, close this client connection and remove if from _client_map |
157 | 0 | _client_map.erase(*client_key); |
158 | 0 | } else { |
159 | 0 | cache_list->second.push_back(*client_key); |
160 | | // There is no need to close client if we put it to cache list. |
161 | 0 | client_to_close = nullptr; |
162 | 0 | } |
163 | 0 | } |
164 | |
|
165 | 0 | if (client_to_close != nullptr) { |
166 | 0 | client_to_close->close(); |
167 | 0 | delete client_to_close; |
168 | 0 | if (_metrics_enabled) { |
169 | 0 | thrift_opened_clients->increment(-1); |
170 | 0 | } |
171 | 0 | } |
172 | |
|
173 | 0 | if (_metrics_enabled) { |
174 | 0 | thrift_used_clients->increment(-1); |
175 | 0 | } |
176 | |
|
177 | 0 | *client_key = nullptr; |
178 | 0 | } |
179 | | |
180 | 0 | std::string ClientCacheHelper::debug_string() { |
181 | 0 | std::stringstream out; |
182 | 0 | out << "ClientCacheHelper(#hosts=" << _client_cache.size() << " ["; |
183 | |
|
184 | 0 | bool isfirst = true; |
185 | 0 | for (const auto& [endpoint, client_keys] : _client_cache) { |
186 | 0 | if (!isfirst) { |
187 | 0 | out << " "; |
188 | 0 | isfirst = false; |
189 | 0 | } |
190 | 0 | out << endpoint << ":" << client_keys.size(); |
191 | 0 | } |
192 | |
|
193 | 0 | out << "])"; |
194 | 0 | return out.str(); |
195 | 0 | } |
196 | | |
197 | 0 | void ClientCacheHelper::init_metrics(const std::string& name) { |
198 | | // Not strictly needed if init_metrics is called before any cache |
199 | | // usage, but ensures that _metrics_enabled is published. |
200 | 0 | std::lock_guard<std::mutex> lock(_lock); |
201 | |
|
202 | 0 | _thrift_client_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( |
203 | 0 | std::string("thrift_client.") + name, {{"name", name}}); |
204 | 0 | INT_GAUGE_METRIC_REGISTER(_thrift_client_metric_entity, thrift_used_clients); |
205 | 0 | INT_GAUGE_METRIC_REGISTER(_thrift_client_metric_entity, thrift_opened_clients); |
206 | |
|
207 | 0 | _metrics_enabled = true; |
208 | 0 | } |
209 | | |
210 | | } // namespace doris |