be/src/util/brpc_client_cache.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <brpc/adaptive_connection_type.h> |
21 | | #include <brpc/adaptive_protocol_type.h> |
22 | | #include <brpc/channel.h> |
23 | | #include <brpc/controller.h> |
24 | | #include <butil/endpoint.h> |
25 | | #include <fmt/format.h> |
26 | | #include <gen_cpp/Types_types.h> |
27 | | #include <gen_cpp/types.pb.h> |
28 | | #include <glog/logging.h> |
29 | | #include <google/protobuf/service.h> |
30 | | #include <parallel_hashmap/phmap.h> |
31 | | #include <stddef.h> |
32 | | |
33 | | #include <functional> |
34 | | #include <memory> |
35 | | #include <mutex> |
36 | | #include <ostream> |
37 | | #include <string> |
38 | | #include <utility> |
39 | | #include <vector> |
40 | | |
41 | | #include "common/compiler_util.h" // IWYU pragma: keep |
42 | | #include "common/config.h" |
43 | | #include "common/status.h" |
44 | | #include "runtime/exec_env.h" |
45 | | #include "service/backend_options.h" |
46 | | #include "util/dns_cache.h" |
47 | | #include "util/network_util.h" |
48 | | |
// Forward declarations of the stub classes used as template arguments by the
// BrpcClientCache aliases declared at the end of this header.
namespace doris {
class PBackendService_Stub;
class PFunctionService_Stub;
} // namespace doris
53 | | |
// Value type stored in the stub cache: the stub itself together with the IP
// address it was created for (similar to Java's BackendServiceClientExtIp).
template <class T>
struct StubEntry {
    // IP the hostname resolved to when `stub` was created.
    std::string real_ip;
    // The cached service stub.
    std::shared_ptr<T> stub;
};
60 | | |
61 | | template <typename T> |
62 | | using StubMap = phmap::parallel_flat_hash_map< |
63 | | std::string, StubEntry<T>, std::hash<std::string>, std::equal_to<std::string>, |
64 | | std::allocator<std::pair<const std::string, StubEntry<T>>>, 8, std::mutex>; |
65 | | |
66 | | namespace doris { |
67 | | #include "common/compile_check_begin.h" |
68 | | class FailureDetectClosure : public ::google::protobuf::Closure { |
69 | | public: |
70 | | FailureDetectClosure(std::shared_ptr<AtomicStatus>& channel_st, |
71 | | ::google::protobuf::RpcController* controller, |
72 | | ::google::protobuf::Closure* done) |
73 | 2.91M | : _channel_st(channel_st), _controller(controller), _done(done) {} |
74 | | |
75 | 2.90M | void Run() override { |
76 | 2.90M | Defer defer {[&]() { delete this; }}; |
77 | | // All brpc related API will use brpc::Controller, so that it is safe |
78 | | // to do static cast here. |
79 | 2.90M | auto* cntl = static_cast<brpc::Controller*>(_controller); |
80 | 2.90M | if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) { |
81 | 1 | Status error_st = Status::NetworkError( |
82 | 1 | "Failed to send brpc, error={}, error_text={}, client: {}, latency = {}", |
83 | 1 | berror(cntl->ErrorCode()), cntl->ErrorText(), BackendOptions::get_localhost(), |
84 | 1 | cntl->latency_us()); |
85 | 1 | LOG(WARNING) << error_st; |
86 | 1 | _channel_st->update(error_st); |
87 | 1 | } |
88 | | // Sometimes done == nullptr, for example hand_shake API. |
89 | 2.91M | if (_done != nullptr) { |
90 | 2.91M | _done->Run(); |
91 | 2.91M | } |
92 | | // _done->Run may throw exception, so that move delete this to Defer. |
93 | | // delete this; |
94 | 2.90M | } |
95 | | |
96 | | private: |
97 | | std::shared_ptr<AtomicStatus> _channel_st; |
98 | | ::google::protobuf::RpcController* _controller; |
99 | | ::google::protobuf::Closure* _done; |
100 | | }; |
101 | | |
102 | | // This channel will use FailureDetectClosure to wrap the original closure |
103 | | // If some non-recoverable rpc failure happens, it will save the error status in |
104 | | // _channel_st. |
105 | | // And brpc client cache will depend on it to detect if the client is health. |
106 | | class FailureDetectChannel : public ::brpc::Channel { |
107 | | public: |
108 | 20 | FailureDetectChannel() : ::brpc::Channel() { |
109 | 20 | _channel_st = std::make_shared<AtomicStatus>(); // default OK |
110 | 20 | } |
111 | | void CallMethod(const google::protobuf::MethodDescriptor* method, |
112 | | google::protobuf::RpcController* controller, |
113 | | const google::protobuf::Message* request, google::protobuf::Message* response, |
114 | 2.90M | google::protobuf::Closure* done) override { |
115 | 2.90M | FailureDetectClosure* failure_detect_closure = nullptr; |
116 | 2.90M | if (done != nullptr) { |
117 | | // If done == nullptr, then it means the call is sync call, so that should not |
118 | | // gen a failure detect closure for it. Or it will core. |
119 | 2.90M | failure_detect_closure = new FailureDetectClosure(_channel_st, controller, done); |
120 | 2.90M | } |
121 | 2.90M | ::brpc::Channel::CallMethod(method, controller, request, response, failure_detect_closure); |
122 | | // Done == nullptr, it is a sync call, should also deal with the bad channel. |
123 | 2.90M | if (done == nullptr) { |
124 | 7.29k | auto* cntl = static_cast<brpc::Controller*>(controller); |
125 | 7.29k | if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) { |
126 | 1 | Status error_st = Status::NetworkError( |
127 | 1 | "Failed to send brpc, error={}, error_text={}, client: {}, latency = {}", |
128 | 1 | berror(cntl->ErrorCode()), cntl->ErrorText(), |
129 | 1 | BackendOptions::get_localhost(), cntl->latency_us()); |
130 | 1 | LOG(WARNING) << error_st; |
131 | 1 | _channel_st->update(error_st); |
132 | 1 | } |
133 | 7.29k | } |
134 | 2.90M | } |
135 | | |
136 | 2.95M | std::shared_ptr<AtomicStatus> channel_status() { return _channel_st; } |
137 | | |
138 | | private: |
139 | | std::shared_ptr<AtomicStatus> _channel_st; |
140 | | }; |
141 | | |
142 | | template <class T> |
143 | | class BrpcClientCache { |
144 | | public: |
145 | | BrpcClientCache(std::string protocol = "baidu_std", std::string connection_type = "", |
146 | | std::string connection_group = ""); |
147 | | virtual ~BrpcClientCache(); |
148 | | |
149 | | std::shared_ptr<T> get_client(const butil::EndPoint& endpoint) { |
150 | | return get_client(butil::endpoint2str(endpoint).c_str()); |
151 | | } |
152 | | |
153 | | #ifdef BE_TEST |
154 | | virtual std::shared_ptr<T> get_client(const TNetworkAddress& taddr) { |
155 | | std::string host_port = fmt::format("{}:{}", taddr.hostname, taddr.port); |
156 | | return get_client(host_port); |
157 | | } |
158 | | #else |
159 | 25.4k | std::shared_ptr<T> get_client(const TNetworkAddress& taddr) { |
160 | 25.4k | return get_client(taddr.hostname, taddr.port); |
161 | 25.4k | } |
162 | | #endif |
163 | | |
164 | 139 | std::shared_ptr<T> get_client(const PNetworkAddress& paddr) { |
165 | 139 | return get_client(paddr.hostname(), paddr.port()); |
166 | 139 | } |
167 | | |
168 | 2.93M | std::shared_ptr<T> get_client(const std::string& host, int port) { |
169 | 2.93M | std::string realhost = host; |
170 | 2.93M | auto dns_cache = ExecEnv::GetInstance()->dns_cache(); |
171 | 2.93M | if (dns_cache == nullptr) { |
172 | 8 | LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve"; |
173 | 2.93M | } else if (!is_valid_ip(host)) { |
174 | 0 | Status status = dns_cache->get(host, &realhost); |
175 | 0 | if (!status.ok()) { |
176 | 0 | LOG(WARNING) << "failed to get ip from host:" << status.to_string(); |
177 | 0 | return nullptr; |
178 | 0 | } |
179 | 0 | } |
180 | | |
181 | | // Use original host:port as key (like Java's TNetworkAddress address) |
182 | | // This allows us to detect IP changes when DNS resolution changes |
183 | 2.93M | std::string host_port = fmt::format("{}:{}", host, port); |
184 | | |
185 | 2.93M | std::shared_ptr<T> stub_ptr; |
186 | 2.93M | bool need_remove = false; |
187 | | |
188 | 2.95M | auto check_entry = [&](const auto& v) { |
189 | 2.95M | const StubEntry<T>& entry = v.second; |
190 | | // Check if cached IP matches current resolved IP |
191 | 2.95M | if (entry.real_ip != realhost) { |
192 | | // IP changed (DNS resolution changed) |
193 | 0 | LOG(WARNING) << "Cached ip changed for " << host << ", before ip: " << entry.real_ip |
194 | 0 | << ", current ip: " << realhost; |
195 | 0 | need_remove = true; |
196 | 2.95M | } else if (!static_cast<FailureDetectChannel*>(entry.stub->channel()) |
197 | 2.95M | ->channel_status() |
198 | 2.95M | ->ok()) { |
199 | | // Client is not in normal state, need to recreate |
200 | | // At this point we cannot judge the progress of reconnecting the underlying channel. |
201 | | // In the worst case, it may take two minutes. But we can't stand the connection refused |
202 | | // for two minutes, so rebuild the channel directly. |
203 | 2 | need_remove = true; |
204 | 2.95M | } else { |
205 | | // Cache hit: IP matches and client is healthy |
206 | 2.95M | stub_ptr = entry.stub; |
207 | 2.95M | } |
208 | 2.95M | }; Unexecuted instantiation: _ZZN5doris15BrpcClientCacheINS_21PFunctionService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiENKUlRKT_E_clISt4pairIS9_9StubEntryIS1_EEEEDaSD_ _ZZN5doris15BrpcClientCacheINS_20PBackendService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiENKUlRKT_E_clISt4pairIS9_9StubEntryIS1_EEEEDaSD_ Line | Count | Source | 188 | 2.95M | auto check_entry = [&](const auto& v) { | 189 | 2.95M | const StubEntry<T>& entry = v.second; | 190 | | // Check if cached IP matches current resolved IP | 191 | 2.95M | if (entry.real_ip != realhost) { | 192 | | // IP changed (DNS resolution changed) | 193 | 0 | LOG(WARNING) << "Cached ip changed for " << host << ", before ip: " << entry.real_ip | 194 | 0 | << ", current ip: " << realhost; | 195 | 0 | need_remove = true; | 196 | 2.95M | } else if (!static_cast<FailureDetectChannel*>(entry.stub->channel()) | 197 | 2.95M | ->channel_status() | 198 | 2.95M | ->ok()) { | 199 | | // Client is not in normal state, need to recreate | 200 | | // At this point we cannot judge the progress of reconnecting the underlying channel. | 201 | | // In the worst case, it may take two minutes. But we can't stand the connection refused | 202 | | // for two minutes, so rebuild the channel directly. | 203 | 2 | need_remove = true; | 204 | 2.95M | } else { | 205 | | // Cache hit: IP matches and client is healthy | 206 | 2.95M | stub_ptr = entry.stub; | 207 | 2.95M | } | 208 | 2.95M | }; |
|
209 | | |
210 | 2.95M | if (LIKELY(_stub_map.if_contains(host_port, check_entry))) { |
211 | 2.95M | if (stub_ptr != nullptr) { |
212 | 2.95M | return stub_ptr; |
213 | 2.95M | } |
214 | | // IP changed or client unhealthy, need to remove old entry |
215 | 18.4E | if (need_remove) { |
216 | 2 | _stub_map.erase(host_port); |
217 | 2 | } |
218 | 18.4E | } |
219 | | |
220 | | // Create new stub using resolved IP for actual connection |
221 | 18.4E | std::string real_host_port = get_host_port(realhost, port); |
222 | 18.4E | auto stub = get_new_client_no_cache(real_host_port); |
223 | 18.4E | if (stub != nullptr) { |
224 | 19 | StubEntry<T> entry {realhost, stub}; |
225 | 19 | _stub_map.try_emplace_l( |
226 | 19 | host_port, [&stub](const auto& v) { stub = v.second.stub; }, entry);Unexecuted instantiation: _ZZN5doris15BrpcClientCacheINS_21PFunctionService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiENKUlRKT_E0_clISt4pairIS9_9StubEntryIS1_EEEEDaSD_ Unexecuted instantiation: _ZZN5doris15BrpcClientCacheINS_20PBackendService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiENKUlRKT_E0_clISt4pairIS9_9StubEntryIS1_EEEEDaSD_ |
227 | 19 | } |
228 | 18.4E | return stub; |
229 | 2.93M | } Unexecuted instantiation: _ZN5doris15BrpcClientCacheINS_21PFunctionService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEi _ZN5doris15BrpcClientCacheINS_20PBackendService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEi Line | Count | Source | 168 | 2.93M | std::shared_ptr<T> get_client(const std::string& host, int port) { | 169 | 2.93M | std::string realhost = host; | 170 | 2.93M | auto dns_cache = ExecEnv::GetInstance()->dns_cache(); | 171 | 2.93M | if (dns_cache == nullptr) { | 172 | 8 | LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve"; | 173 | 2.93M | } else if (!is_valid_ip(host)) { | 174 | 0 | Status status = dns_cache->get(host, &realhost); | 175 | 0 | if (!status.ok()) { | 176 | 0 | LOG(WARNING) << "failed to get ip from host:" << status.to_string(); | 177 | 0 | return nullptr; | 178 | 0 | } | 179 | 0 | } | 180 | | | 181 | | // Use original host:port as key (like Java's TNetworkAddress address) | 182 | | // This allows us to detect IP changes when DNS resolution changes | 183 | 2.93M | std::string host_port = fmt::format("{}:{}", host, port); | 184 | | | 185 | 2.93M | std::shared_ptr<T> stub_ptr; | 186 | 2.93M | bool need_remove = false; | 187 | | | 188 | 2.93M | auto check_entry = [&](const auto& v) { | 189 | 2.93M | const StubEntry<T>& entry = v.second; | 190 | | // Check if cached IP matches current resolved IP | 191 | 2.93M | if (entry.real_ip != realhost) { | 192 | | // IP changed (DNS resolution changed) | 193 | 2.93M | LOG(WARNING) << "Cached ip changed for " << host << ", before ip: " << entry.real_ip | 194 | 2.93M | << ", current ip: " << realhost; | 195 | 2.93M | need_remove = true; | 196 | 2.93M | } else if (!static_cast<FailureDetectChannel*>(entry.stub->channel()) | 197 | 2.93M | ->channel_status() | 198 | 2.93M | ->ok()) { | 199 | | // Client is not in normal state, need to recreate | 200 | | // At this point we cannot judge the progress of 
reconnecting the underlying channel. | 201 | | // In the worst case, it may take two minutes. But we can't stand the connection refused | 202 | | // for two minutes, so rebuild the channel directly. | 203 | 2.93M | need_remove = true; | 204 | 2.93M | } else { | 205 | | // Cache hit: IP matches and client is healthy | 206 | 2.93M | stub_ptr = entry.stub; | 207 | 2.93M | } | 208 | 2.93M | }; | 209 | | | 210 | 2.95M | if (LIKELY(_stub_map.if_contains(host_port, check_entry))) { | 211 | 2.95M | if (stub_ptr != nullptr) { | 212 | 2.95M | return stub_ptr; | 213 | 2.95M | } | 214 | | // IP changed or client unhealthy, need to remove old entry | 215 | 18.4E | if (need_remove) { | 216 | 2 | _stub_map.erase(host_port); | 217 | 2 | } | 218 | 18.4E | } | 219 | | | 220 | | // Create new stub using resolved IP for actual connection | 221 | 18.4E | std::string real_host_port = get_host_port(realhost, port); | 222 | 18.4E | auto stub = get_new_client_no_cache(real_host_port); | 223 | 18.4E | if (stub != nullptr) { | 224 | 19 | StubEntry<T> entry {realhost, stub}; | 225 | 19 | _stub_map.try_emplace_l( | 226 | 19 | host_port, [&stub](const auto& v) { stub = v.second.stub; }, entry); | 227 | 19 | } | 228 | 18.4E | return stub; | 229 | 2.93M | } |
|
230 | | |
231 | 0 | std::shared_ptr<T> get_client(const std::string& host_port) { |
232 | 0 | const auto pos = host_port.rfind(':'); |
233 | 0 | std::string host = host_port.substr(0, pos); |
234 | 0 | int port = 0; |
235 | 0 | try { |
236 | 0 | port = stoi(host_port.substr(pos + 1)); |
237 | 0 | } catch (const std::exception& err) { |
238 | 0 | LOG(WARNING) << "failed to parse port from " << host_port << ": " << err.what(); |
239 | 0 | return nullptr; |
240 | 0 | } |
241 | 0 | return get_client(host, port); |
242 | 0 | } |
243 | | |
244 | | std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port, |
245 | | const std::string& protocol = "", |
246 | | const std::string& connection_type = "", |
247 | 20 | const std::string& connection_group = "") { |
248 | 20 | brpc::ChannelOptions options; |
249 | 20 | if (protocol != "") { |
250 | 0 | options.protocol = protocol; |
251 | 20 | } else if (_protocol != "") { |
252 | 20 | options.protocol = _protocol; |
253 | 20 | } |
254 | 20 | if (connection_type != "") { |
255 | 0 | options.connection_type = connection_type; |
256 | 20 | } else if (_connection_type != "") { |
257 | 4 | options.connection_type = _connection_type; |
258 | 4 | } |
259 | 20 | if (connection_group != "") { |
260 | 0 | options.connection_group = connection_group; |
261 | 20 | } else if (_connection_group != "") { |
262 | 4 | options.connection_group = _connection_group; |
263 | 4 | } |
264 | | // Add random connection id to connection_group to make sure use new socket |
265 | 20 | options.connection_group += std::to_string(_connection_id.fetch_add(1)); |
266 | 20 | options.connect_timeout_ms = 2000; |
267 | 20 | options.timeout_ms = 2000; |
268 | 20 | options.max_retry = 10; |
269 | | |
270 | 20 | std::unique_ptr<FailureDetectChannel> channel(new FailureDetectChannel()); |
271 | 20 | int ret_code = 0; |
272 | 20 | if (host_port.find("://") == std::string::npos) { |
273 | 20 | ret_code = channel->Init(host_port.c_str(), &options); |
274 | 20 | } else { |
275 | 0 | ret_code = |
276 | 0 | channel->Init(host_port.c_str(), config::rpc_load_balancer.c_str(), &options); |
277 | 0 | } |
278 | 20 | if (ret_code) { |
279 | 1 | LOG(WARNING) << "Failed to initialize brpc Channel to " << host_port; |
280 | 1 | return nullptr; |
281 | 1 | } |
282 | 19 | return std::make_shared<T>(channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL); |
283 | 20 | } _ZN5doris15BrpcClientCacheINS_20PBackendService_StubEE23get_new_client_no_cacheERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESA_SA_SA_ Line | Count | Source | 247 | 20 | const std::string& connection_group = "") { | 248 | 20 | brpc::ChannelOptions options; | 249 | 20 | if (protocol != "") { | 250 | 0 | options.protocol = protocol; | 251 | 20 | } else if (_protocol != "") { | 252 | 20 | options.protocol = _protocol; | 253 | 20 | } | 254 | 20 | if (connection_type != "") { | 255 | 0 | options.connection_type = connection_type; | 256 | 20 | } else if (_connection_type != "") { | 257 | 4 | options.connection_type = _connection_type; | 258 | 4 | } | 259 | 20 | if (connection_group != "") { | 260 | 0 | options.connection_group = connection_group; | 261 | 20 | } else if (_connection_group != "") { | 262 | 4 | options.connection_group = _connection_group; | 263 | 4 | } | 264 | | // Add random connection id to connection_group to make sure use new socket | 265 | 20 | options.connection_group += std::to_string(_connection_id.fetch_add(1)); | 266 | 20 | options.connect_timeout_ms = 2000; | 267 | 20 | options.timeout_ms = 2000; | 268 | 20 | options.max_retry = 10; | 269 | | | 270 | 20 | std::unique_ptr<FailureDetectChannel> channel(new FailureDetectChannel()); | 271 | 20 | int ret_code = 0; | 272 | 20 | if (host_port.find("://") == std::string::npos) { | 273 | 20 | ret_code = channel->Init(host_port.c_str(), &options); | 274 | 20 | } else { | 275 | 0 | ret_code = | 276 | 0 | channel->Init(host_port.c_str(), config::rpc_load_balancer.c_str(), &options); | 277 | 0 | } | 278 | 20 | if (ret_code) { | 279 | 1 | LOG(WARNING) << "Failed to initialize brpc Channel to " << host_port; | 280 | 1 | return nullptr; | 281 | 1 | } | 282 | 19 | return std::make_shared<T>(channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL); | 283 | 20 | } |
Unexecuted instantiation: _ZN5doris15BrpcClientCacheINS_21PFunctionService_StubEE23get_new_client_no_cacheERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESA_SA_SA_ |
284 | | |
285 | 1 | size_t size() { return _stub_map.size(); } |
286 | | |
287 | 0 | void clear() { _stub_map.clear(); } |
288 | | |
289 | 0 | size_t erase(const std::string& host_port) { return _stub_map.erase(host_port); } |
290 | | |
291 | 0 | size_t erase(const std::string& host, int port) { |
292 | 0 | std::string host_port = fmt::format("{}:{}", host, port); |
293 | 0 | return erase(host_port); |
294 | 0 | } |
295 | | |
296 | 0 | size_t erase(const butil::EndPoint& endpoint) { |
297 | 0 | return _stub_map.erase(butil::endpoint2str(endpoint).c_str()); |
298 | 0 | } |
299 | | |
300 | 0 | bool exist(const std::string& host_port) { |
301 | 0 | return _stub_map.find(host_port) != _stub_map.end(); |
302 | 0 | } |
303 | | |
304 | 0 | void get_all(std::vector<std::string>* endpoints) { |
305 | 0 | for (auto it = _stub_map.begin(); it != _stub_map.end(); ++it) { |
306 | 0 | endpoints->emplace_back(it->first.c_str()); |
307 | 0 | } |
308 | 0 | } |
309 | | |
310 | | bool available(std::shared_ptr<T> stub, const butil::EndPoint& endpoint) { |
311 | | return available(stub, butil::endpoint2str(endpoint).c_str()); |
312 | | } |
313 | | |
314 | 1 | bool available(std::shared_ptr<T> stub, const std::string& host_port) { |
315 | 1 | if (!stub) { |
316 | 0 | LOG(WARNING) << "stub is null to: " << host_port; |
317 | 0 | return false; |
318 | 0 | } |
319 | 1 | std::string message = "hello doris!"; |
320 | 1 | PHandShakeRequest request; |
321 | 1 | request.set_hello(message); |
322 | 1 | PHandShakeResponse response; |
323 | 1 | brpc::Controller cntl; |
324 | 1 | stub->hand_shake(&cntl, &request, &response, nullptr); |
325 | 1 | if (cntl.Failed()) { |
326 | 1 | LOG(WARNING) << "open brpc connection to " << host_port |
327 | 1 | << " failed: " << cntl.ErrorText(); |
328 | 1 | return false; |
329 | 1 | } else if (response.has_status() && response.has_hello() && response.hello() == message && |
330 | 0 | response.status().status_code() == 0) { |
331 | 0 | return true; |
332 | 0 | } else { |
333 | 0 | LOG(WARNING) << "open brpc connection to " << host_port |
334 | 0 | << " failed: " << response.DebugString(); |
335 | 0 | return false; |
336 | 0 | } |
337 | 1 | } |
338 | | |
339 | 1 | bool available(std::shared_ptr<T> stub, const std::string& host, int port) { |
340 | 1 | std::string host_port = fmt::format("{}:{}", host, port); |
341 | 1 | return available(stub, host_port); |
342 | 1 | } |
343 | | |
344 | | private: |
345 | | StubMap<T> _stub_map; |
346 | | const std::string _protocol; |
347 | | const std::string _connection_type; |
348 | | const std::string _connection_group; |
349 | | // use to generate unique connection id for each connection |
350 | | // to prevent the connection problem of brpc: https://github.com/apache/brpc/issues/2146 |
351 | | std::atomic<int64_t> _connection_id {0}; |
352 | | }; |
353 | | |
354 | | using InternalServiceClientCache = BrpcClientCache<PBackendService_Stub>; |
355 | | using FunctionServiceClientCache = BrpcClientCache<PFunctionService_Stub>; |
356 | | #include "common/compile_check_end.h" |
357 | | } // namespace doris |