be/src/util/thrift_rpc_helper.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "util/thrift_rpc_helper.h" |
19 | | |
20 | | #include <gen_cpp/Types_types.h> |
21 | | #include <glog/logging.h> |
22 | | #include <thrift/Thrift.h> |
23 | | #include <thrift/protocol/TBinaryProtocol.h> |
24 | | #include <thrift/transport/TTransportException.h> |
25 | | // IWYU pragma: no_include <bits/chrono.h> |
26 | | #include <chrono> // IWYU pragma: keep |
27 | | #include <sstream> |
28 | | #include <thread> |
29 | | |
30 | | #include "common/status.h" |
31 | | #include "runtime/exec_env.h" // IWYU pragma: keep |
32 | | #include "util/client_cache.h" |
33 | | #include "util/debug_points.h" |
34 | | #include "util/network_util.h" |
35 | | |
36 | | namespace apache { |
37 | | namespace thrift { |
38 | | namespace protocol { |
39 | | class TProtocol; |
40 | | } // namespace protocol |
41 | | namespace transport { |
42 | | class TBufferedTransport; |
43 | | class TSocket; |
44 | | class TTransport; |
45 | | } // namespace transport |
46 | | } // namespace thrift |
47 | | } // namespace apache |
48 | | |
49 | | namespace doris { |
50 | | |
51 | | using apache::thrift::protocol::TProtocol; |
52 | | using apache::thrift::protocol::TBinaryProtocol; |
53 | | using apache::thrift::transport::TSocket; |
54 | | using apache::thrift::transport::TTransport; |
55 | | using apache::thrift::transport::TBufferedTransport; |
56 | | |
57 | | ExecEnv* ThriftRpcHelper::_s_exec_env = nullptr; |
58 | | |
59 | 7 | void ThriftRpcHelper::setup(ExecEnv* exec_env) { |
60 | 7 | _s_exec_env = exec_env; |
61 | 7 | } |
62 | | |
63 | | template <typename T> |
64 | | Status ThriftRpcHelper::rpc(const std::string& ip, const int32_t port, |
65 | 25.8k | std::function<void(ClientConnection<T>&)> callback, int timeout_ms) { |
66 | 25.8k | return rpc<T>([ip, port]() { return make_network_address(ip, port); }, callback, timeout_ms);_ZZN5doris15ThriftRpcHelper3rpcINS_21FrontendServiceClientEEENS_6StatusERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSt8functionIFvRNS_16ClientConnectionIT_EEEEiENKUlvE_clEv Line | Count | Source | 66 | 25.8k | return rpc<T>([ip, port]() { return make_network_address(ip, port); }, callback, timeout_ms); |
Unexecuted instantiation: _ZZN5doris15ThriftRpcHelper3rpcINS_20BackendServiceClientEEENS_6StatusERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSt8functionIFvRNS_16ClientConnectionIT_EEEEiENKUlvE_clEv Unexecuted instantiation: _ZZN5doris15ThriftRpcHelper3rpcINS_24TPaloBrokerServiceClientEEENS_6StatusERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSt8functionIFvRNS_16ClientConnectionIT_EEEEiENKUlvE_clEv |
67 | 25.8k | } _ZN5doris15ThriftRpcHelper3rpcINS_21FrontendServiceClientEEENS_6StatusERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSt8functionIFvRNS_16ClientConnectionIT_EEEEi Line | Count | Source | 65 | 25.8k | std::function<void(ClientConnection<T>&)> callback, int timeout_ms) { | 66 | 25.8k | return rpc<T>([ip, port]() { return make_network_address(ip, port); }, callback, timeout_ms); | 67 | 25.8k | } |
Unexecuted instantiation: _ZN5doris15ThriftRpcHelper3rpcINS_20BackendServiceClientEEENS_6StatusERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSt8functionIFvRNS_16ClientConnectionIT_EEEEi Unexecuted instantiation: _ZN5doris15ThriftRpcHelper3rpcINS_24TPaloBrokerServiceClientEEENS_6StatusERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSt8functionIFvRNS_16ClientConnectionIT_EEEEi |
68 | | |
69 | | template <typename T> |
70 | | Status ThriftRpcHelper::rpc(std::function<TNetworkAddress()> address_provider, |
71 | 29.6k | std::function<void(ClientConnection<T>&)> callback, int timeout_ms) { |
72 | 29.6k | TNetworkAddress address = address_provider(); |
73 | 29.6k | if (address.hostname.empty() || address.port == 0) { |
74 | 0 | return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>("FE address is not available"); |
75 | 0 | } |
76 | 29.6k | Status status; |
77 | 29.6k | DBUG_EXECUTE_IF("thriftRpcHelper.rpc.error", { timeout_ms = 30000; }); |
78 | 29.6k | ClientConnection<T> client(_s_exec_env->get_client_cache<T>(), address, timeout_ms, &status); |
79 | 29.6k | if (!status.ok()) { |
80 | | #ifndef ADDRESS_SANITIZER |
81 | | LOG(WARNING) << "Connect frontend failed, address=" << address << ", status=" << status; |
82 | | #endif |
83 | 0 | return status; |
84 | 0 | } |
85 | 29.6k | try { |
86 | 29.6k | try { |
87 | 29.6k | callback(client); |
88 | 29.6k | } catch (apache::thrift::transport::TTransportException& e) { |
89 | 0 | std::cerr << "thrift error, reason=" << e.what(); |
90 | | #ifndef ADDRESS_SANITIZER |
91 | | LOG(WARNING) << "retrying call frontend service after " |
92 | | << config::thrift_client_retry_interval_ms << " ms, address=" << address |
93 | | << ", reason=" << e.what(); |
94 | | #endif |
95 | 0 | std::this_thread::sleep_for( |
96 | 0 | std::chrono::milliseconds(config::thrift_client_retry_interval_ms)); |
97 | 0 | TNetworkAddress retry_address = address_provider(); |
98 | 0 | if (retry_address.hostname.empty() || retry_address.port == 0) { |
99 | 0 | return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>("FE address is not available"); |
100 | 0 | } |
101 | 0 | if (retry_address.hostname != address.hostname || retry_address.port != address.port) { |
102 | | #ifndef ADDRESS_SANITIZER |
103 | | LOG(INFO) << "retrying call frontend service with new address=" << retry_address; |
104 | | #endif |
105 | 0 | Status retry_status; |
106 | 0 | ClientConnection<T> retry_client(_s_exec_env->get_client_cache<T>(), retry_address, |
107 | 0 | timeout_ms, &retry_status); |
108 | 0 | if (!retry_status.ok()) { |
109 | | #ifndef ADDRESS_SANITIZER |
110 | | LOG(WARNING) << "Connect frontend failed, address=" << retry_address |
111 | | << ", status=" << retry_status; |
112 | | #endif |
113 | 0 | return retry_status; |
114 | 0 | } |
115 | 0 | callback(retry_client); |
116 | 0 | } else { |
117 | 0 | status = client.reopen(timeout_ms); |
118 | 0 | if (!status.ok()) { |
119 | | #ifndef ADDRESS_SANITIZER |
120 | | LOG(WARNING) << "client reopen failed. address=" << address |
121 | | << ", status=" << status; |
122 | | #endif |
123 | 0 | return status; |
124 | 0 | } |
125 | 0 | callback(client); |
126 | 0 | } |
127 | 0 | } |
128 | 29.6k | } catch (apache::thrift::TException& e) { |
129 | | #ifndef ADDRESS_SANITIZER |
130 | | LOG(WARNING) << "call frontend service failed, address=" << address |
131 | | << ", reason=" << e.what(); |
132 | | #endif |
133 | 0 | std::this_thread::sleep_for( |
134 | 0 | std::chrono::milliseconds(config::thrift_client_retry_interval_ms * 2)); |
135 | | // just reopen to disable this connection |
136 | 0 | static_cast<void>(client.reopen(timeout_ms)); |
137 | 0 | return Status::RpcError("failed to call frontend service, FE address={}:{}, reason: {}", |
138 | 0 | address.hostname, address.port, e.what()); |
139 | 0 | } |
140 | 29.6k | return Status::OK(); |
141 | 29.6k | } _ZN5doris15ThriftRpcHelper3rpcINS_21FrontendServiceClientEEENS_6StatusESt8functionIFNS_15TNetworkAddressEvEES4_IFvRNS_16ClientConnectionIT_EEEEi Line | Count | Source | 71 | 29.6k | std::function<void(ClientConnection<T>&)> callback, int timeout_ms) { | 72 | 29.6k | TNetworkAddress address = address_provider(); | 73 | 29.6k | if (address.hostname.empty() || address.port == 0) { | 74 | 0 | return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>("FE address is not available"); | 75 | 0 | } | 76 | 29.6k | Status status; | 77 | 29.6k | DBUG_EXECUTE_IF("thriftRpcHelper.rpc.error", { timeout_ms = 30000; }); | 78 | 29.6k | ClientConnection<T> client(_s_exec_env->get_client_cache<T>(), address, timeout_ms, &status); | 79 | 29.6k | if (!status.ok()) { | 80 | | #ifndef ADDRESS_SANITIZER | 81 | | LOG(WARNING) << "Connect frontend failed, address=" << address << ", status=" << status; | 82 | | #endif | 83 | 0 | return status; | 84 | 0 | } | 85 | 29.6k | try { | 86 | 29.6k | try { | 87 | 29.6k | callback(client); | 88 | 29.6k | } catch (apache::thrift::transport::TTransportException& e) { | 89 | 0 | std::cerr << "thrift error, reason=" << e.what(); | 90 | | #ifndef ADDRESS_SANITIZER | 91 | | LOG(WARNING) << "retrying call frontend service after " | 92 | | << config::thrift_client_retry_interval_ms << " ms, address=" << address | 93 | | << ", reason=" << e.what(); | 94 | | #endif | 95 | 0 | std::this_thread::sleep_for( | 96 | 0 | std::chrono::milliseconds(config::thrift_client_retry_interval_ms)); | 97 | 0 | TNetworkAddress retry_address = address_provider(); | 98 | 0 | if (retry_address.hostname.empty() || retry_address.port == 0) { | 99 | 0 | return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>("FE address is not available"); | 100 | 0 | } | 101 | 0 | if (retry_address.hostname != address.hostname || retry_address.port != address.port) { | 102 | | #ifndef ADDRESS_SANITIZER | 103 | | LOG(INFO) << "retrying call frontend service with new address=" << retry_address; | 104 | | #endif | 105 | 0 | Status retry_status; | 106 | 0 | ClientConnection<T> retry_client(_s_exec_env->get_client_cache<T>(), retry_address, | 107 | 0 | timeout_ms, &retry_status); | 108 | 0 | if (!retry_status.ok()) { | 109 | | #ifndef ADDRESS_SANITIZER | 110 | | LOG(WARNING) << "Connect frontend failed, address=" << retry_address | 111 | | << ", status=" << retry_status; | 112 | | #endif | 113 | 0 | return retry_status; | 114 | 0 | } | 115 | 0 | callback(retry_client); | 116 | 0 | } else { | 117 | 0 | status = client.reopen(timeout_ms); | 118 | 0 | if (!status.ok()) { | 119 | | #ifndef ADDRESS_SANITIZER | 120 | | LOG(WARNING) << "client reopen failed. address=" << address | 121 | | << ", status=" << status; | 122 | | #endif | 123 | 0 | return status; | 124 | 0 | } | 125 | 0 | callback(client); | 126 | 0 | } | 127 | 0 | } | 128 | 29.6k | } catch (apache::thrift::TException& e) { | 129 | | #ifndef ADDRESS_SANITIZER | 130 | | LOG(WARNING) << "call frontend service failed, address=" << address | 131 | | << ", reason=" << e.what(); | 132 | | #endif | 133 | 0 | std::this_thread::sleep_for( | 134 | 0 | std::chrono::milliseconds(config::thrift_client_retry_interval_ms * 2)); | 135 | | // just reopen to disable this connection | 136 | 0 | static_cast<void>(client.reopen(timeout_ms)); | 137 | 0 | return Status::RpcError("failed to call frontend service, FE address={}:{}, reason: {}", | 138 | 0 | address.hostname, address.port, e.what()); | 139 | 0 | } | 140 | 29.6k | return Status::OK(); | 141 | 29.6k | } |
Unexecuted instantiation: _ZN5doris15ThriftRpcHelper3rpcINS_20BackendServiceClientEEENS_6StatusESt8functionIFNS_15TNetworkAddressEvEES4_IFvRNS_16ClientConnectionIT_EEEEi Unexecuted instantiation: _ZN5doris15ThriftRpcHelper3rpcINS_24TPaloBrokerServiceClientEEENS_6StatusESt8functionIFNS_15TNetworkAddressEvEES4_IFvRNS_16ClientConnectionIT_EEEEi |
142 | | |
143 | | template Status ThriftRpcHelper::rpc<FrontendServiceClient>( |
144 | | const std::string& ip, const int32_t port, |
145 | | std::function<void(ClientConnection<FrontendServiceClient>&)> callback, int timeout_ms); |
146 | | |
147 | | template Status ThriftRpcHelper::rpc<FrontendServiceClient>( |
148 | | std::function<TNetworkAddress()> address_provider, |
149 | | std::function<void(ClientConnection<FrontendServiceClient>&)> callback, int timeout_ms); |
150 | | |
151 | | template Status ThriftRpcHelper::rpc<BackendServiceClient>( |
152 | | const std::string& ip, const int32_t port, |
153 | | std::function<void(ClientConnection<BackendServiceClient>&)> callback, int timeout_ms); |
154 | | |
155 | | template Status ThriftRpcHelper::rpc<TPaloBrokerServiceClient>( |
156 | | const std::string& ip, const int32_t port, |
157 | | std::function<void(ClientConnection<TPaloBrokerServiceClient>&)> callback, int timeout_ms); |
158 | | |
159 | | } // namespace doris |