Coverage Report

Created: 2026-03-15 18:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/thrift_rpc_helper.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/thrift_rpc_helper.h"
19
20
#include <gen_cpp/Types_types.h>
21
#include <glog/logging.h>
22
#include <thrift/Thrift.h>
23
#include <thrift/protocol/TBinaryProtocol.h>
24
#include <thrift/transport/TTransportException.h>
25
// IWYU pragma: no_include <bits/chrono.h>
26
#include <chrono> // IWYU pragma: keep
27
#include <sstream>
28
#include <thread>
29
30
#include "common/status.h"
31
#include "runtime/exec_env.h" // IWYU pragma: keep
32
#include "util/client_cache.h"
33
#include "util/debug_points.h"
34
#include "util/network_util.h"
35
36
namespace apache {
37
namespace thrift {
38
namespace protocol {
39
class TProtocol;
40
} // namespace protocol
41
namespace transport {
42
class TBufferedTransport;
43
class TSocket;
44
class TTransport;
45
} // namespace transport
46
} // namespace thrift
47
} // namespace apache
48
49
namespace doris {
50
51
using apache::thrift::protocol::TProtocol;
52
using apache::thrift::protocol::TBinaryProtocol;
53
using apache::thrift::transport::TSocket;
54
using apache::thrift::transport::TTransport;
55
using apache::thrift::transport::TBufferedTransport;
56
57
ExecEnv* ThriftRpcHelper::_s_exec_env = nullptr;
58
59
6
/// Stores `exec_env` in the static `_s_exec_env` pointer.
///
/// The rpc() overloads in this file dereference that pointer to reach the
/// per-service client cache, so this has to run before the first rpc() call.
/// The pointer is stored as-is: no null check and no synchronization is
/// performed here.
void ThriftRpcHelper::setup(ExecEnv* exec_env) {
    ThriftRpcHelper::_s_exec_env = exec_env;
}
62
63
template <typename T>
64
Status ThriftRpcHelper::rpc(const std::string& ip, const int32_t port,
65
6
                            std::function<void(ClientConnection<T>&)> callback, int timeout_ms) {
66
6
    return rpc<T>([ip, port]() { return make_network_address(ip, port); }, callback, timeout_ms);
_ZZN5doris15ThriftRpcHelper3rpcINS_21FrontendServiceClientEEENS_6StatusERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSt8functionIFvRNS_16ClientConnectionIT_EEEEiENKUlvE_clEv
Line
Count
Source
66
6
    return rpc<T>([ip, port]() { return make_network_address(ip, port); }, callback, timeout_ms);
Unexecuted instantiation: _ZZN5doris15ThriftRpcHelper3rpcINS_20BackendServiceClientEEENS_6StatusERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSt8functionIFvRNS_16ClientConnectionIT_EEEEiENKUlvE_clEv
Unexecuted instantiation: _ZZN5doris15ThriftRpcHelper3rpcINS_24TPaloBrokerServiceClientEEENS_6StatusERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSt8functionIFvRNS_16ClientConnectionIT_EEEEiENKUlvE_clEv
67
6
}
_ZN5doris15ThriftRpcHelper3rpcINS_21FrontendServiceClientEEENS_6StatusERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSt8functionIFvRNS_16ClientConnectionIT_EEEEi
Line
Count
Source
65
6
                            std::function<void(ClientConnection<T>&)> callback, int timeout_ms) {
66
6
    return rpc<T>([ip, port]() { return make_network_address(ip, port); }, callback, timeout_ms);
67
6
}
Unexecuted instantiation: _ZN5doris15ThriftRpcHelper3rpcINS_20BackendServiceClientEEENS_6StatusERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSt8functionIFvRNS_16ClientConnectionIT_EEEEi
Unexecuted instantiation: _ZN5doris15ThriftRpcHelper3rpcINS_24TPaloBrokerServiceClientEEENS_6StatusERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiSt8functionIFvRNS_16ClientConnectionIT_EEEEi
68
69
template <typename T>
70
Status ThriftRpcHelper::rpc(std::function<TNetworkAddress()> address_provider,
71
18
                            std::function<void(ClientConnection<T>&)> callback, int timeout_ms) {
72
18
    TNetworkAddress address = address_provider();
73
18
    if (address.hostname.empty() || address.port == 0) {
74
0
        return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>("FE address is not available");
75
0
    }
76
18
    Status status;
77
18
    DBUG_EXECUTE_IF("thriftRpcHelper.rpc.error", { timeout_ms = 30000; });
78
18
    ClientConnection<T> client(_s_exec_env->get_client_cache<T>(), address, timeout_ms, &status);
79
18
    if (!status.ok()) {
80
#ifndef ADDRESS_SANITIZER
81
        LOG(WARNING) << "Connect frontend failed, address=" << address << ", status=" << status;
82
#endif
83
0
        return status;
84
0
    }
85
18
    try {
86
18
        try {
87
18
            callback(client);
88
18
        } catch (apache::thrift::transport::TTransportException& e) {
89
0
            std::cerr << "thrift error, reason=" << e.what();
90
#ifndef ADDRESS_SANITIZER
91
            LOG(WARNING) << "retrying call frontend service after "
92
                         << config::thrift_client_retry_interval_ms << " ms, address=" << address
93
                         << ", reason=" << e.what();
94
#endif
95
0
            std::this_thread::sleep_for(
96
0
                    std::chrono::milliseconds(config::thrift_client_retry_interval_ms));
97
0
            TNetworkAddress retry_address = address_provider();
98
0
            if (retry_address.hostname.empty() || retry_address.port == 0) {
99
0
                return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>("FE address is not available");
100
0
            }
101
0
            if (retry_address.hostname != address.hostname || retry_address.port != address.port) {
102
#ifndef ADDRESS_SANITIZER
103
                LOG(INFO) << "retrying call frontend service with new address=" << retry_address;
104
#endif
105
0
                Status retry_status;
106
0
                ClientConnection<T> retry_client(_s_exec_env->get_client_cache<T>(), retry_address,
107
0
                                                 timeout_ms, &retry_status);
108
0
                if (!retry_status.ok()) {
109
#ifndef ADDRESS_SANITIZER
110
                    LOG(WARNING) << "Connect frontend failed, address=" << retry_address
111
                                 << ", status=" << retry_status;
112
#endif
113
0
                    return retry_status;
114
0
                }
115
0
                callback(retry_client);
116
0
            } else {
117
0
                status = client.reopen(timeout_ms);
118
0
                if (!status.ok()) {
119
#ifndef ADDRESS_SANITIZER
120
                    LOG(WARNING) << "client reopen failed. address=" << address
121
                                 << ", status=" << status;
122
#endif
123
0
                    return status;
124
0
                }
125
0
                callback(client);
126
0
            }
127
0
        }
128
18
    } catch (apache::thrift::TException& e) {
129
#ifndef ADDRESS_SANITIZER
130
        LOG(WARNING) << "call frontend service failed, address=" << address
131
                     << ", reason=" << e.what();
132
#endif
133
0
        std::this_thread::sleep_for(
134
0
                std::chrono::milliseconds(config::thrift_client_retry_interval_ms * 2));
135
        // just reopen to disable this connection
136
0
        static_cast<void>(client.reopen(timeout_ms));
137
0
        return Status::RpcError("failed to call frontend service, FE address={}:{}, reason: {}",
138
0
                                address.hostname, address.port, e.what());
139
0
    }
140
18
    return Status::OK();
141
18
}
_ZN5doris15ThriftRpcHelper3rpcINS_21FrontendServiceClientEEENS_6StatusESt8functionIFNS_15TNetworkAddressEvEES4_IFvRNS_16ClientConnectionIT_EEEEi
Line
Count
Source
71
18
                            std::function<void(ClientConnection<T>&)> callback, int timeout_ms) {
72
18
    TNetworkAddress address = address_provider();
73
18
    if (address.hostname.empty() || address.port == 0) {
74
0
        return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>("FE address is not available");
75
0
    }
76
18
    Status status;
77
18
    DBUG_EXECUTE_IF("thriftRpcHelper.rpc.error", { timeout_ms = 30000; });
78
18
    ClientConnection<T> client(_s_exec_env->get_client_cache<T>(), address, timeout_ms, &status);
79
18
    if (!status.ok()) {
80
#ifndef ADDRESS_SANITIZER
81
        LOG(WARNING) << "Connect frontend failed, address=" << address << ", status=" << status;
82
#endif
83
0
        return status;
84
0
    }
85
18
    try {
86
18
        try {
87
18
            callback(client);
88
18
        } catch (apache::thrift::transport::TTransportException& e) {
89
0
            std::cerr << "thrift error, reason=" << e.what();
90
#ifndef ADDRESS_SANITIZER
91
            LOG(WARNING) << "retrying call frontend service after "
92
                         << config::thrift_client_retry_interval_ms << " ms, address=" << address
93
                         << ", reason=" << e.what();
94
#endif
95
0
            std::this_thread::sleep_for(
96
0
                    std::chrono::milliseconds(config::thrift_client_retry_interval_ms));
97
0
            TNetworkAddress retry_address = address_provider();
98
0
            if (retry_address.hostname.empty() || retry_address.port == 0) {
99
0
                return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>("FE address is not available");
100
0
            }
101
0
            if (retry_address.hostname != address.hostname || retry_address.port != address.port) {
102
#ifndef ADDRESS_SANITIZER
103
                LOG(INFO) << "retrying call frontend service with new address=" << retry_address;
104
#endif
105
0
                Status retry_status;
106
0
                ClientConnection<T> retry_client(_s_exec_env->get_client_cache<T>(), retry_address,
107
0
                                                 timeout_ms, &retry_status);
108
0
                if (!retry_status.ok()) {
109
#ifndef ADDRESS_SANITIZER
110
                    LOG(WARNING) << "Connect frontend failed, address=" << retry_address
111
                                 << ", status=" << retry_status;
112
#endif
113
0
                    return retry_status;
114
0
                }
115
0
                callback(retry_client);
116
0
            } else {
117
0
                status = client.reopen(timeout_ms);
118
0
                if (!status.ok()) {
119
#ifndef ADDRESS_SANITIZER
120
                    LOG(WARNING) << "client reopen failed. address=" << address
121
                                 << ", status=" << status;
122
#endif
123
0
                    return status;
124
0
                }
125
0
                callback(client);
126
0
            }
127
0
        }
128
18
    } catch (apache::thrift::TException& e) {
129
#ifndef ADDRESS_SANITIZER
130
        LOG(WARNING) << "call frontend service failed, address=" << address
131
                     << ", reason=" << e.what();
132
#endif
133
0
        std::this_thread::sleep_for(
134
0
                std::chrono::milliseconds(config::thrift_client_retry_interval_ms * 2));
135
        // just reopen to disable this connection
136
0
        static_cast<void>(client.reopen(timeout_ms));
137
0
        return Status::RpcError("failed to call frontend service, FE address={}:{}, reason: {}",
138
0
                                address.hostname, address.port, e.what());
139
0
    }
140
18
    return Status::OK();
141
18
}
Unexecuted instantiation: _ZN5doris15ThriftRpcHelper3rpcINS_20BackendServiceClientEEENS_6StatusESt8functionIFNS_15TNetworkAddressEvEES4_IFvRNS_16ClientConnectionIT_EEEEi
Unexecuted instantiation: _ZN5doris15ThriftRpcHelper3rpcINS_24TPaloBrokerServiceClientEEENS_6StatusESt8functionIFNS_15TNetworkAddressEvEES4_IFvRNS_16ClientConnectionIT_EEEEi
142
143
template Status ThriftRpcHelper::rpc<FrontendServiceClient>(
144
        const std::string& ip, const int32_t port,
145
        std::function<void(ClientConnection<FrontendServiceClient>&)> callback, int timeout_ms);
146
147
template Status ThriftRpcHelper::rpc<FrontendServiceClient>(
148
        std::function<TNetworkAddress()> address_provider,
149
        std::function<void(ClientConnection<FrontendServiceClient>&)> callback, int timeout_ms);
150
151
template Status ThriftRpcHelper::rpc<BackendServiceClient>(
152
        const std::string& ip, const int32_t port,
153
        std::function<void(ClientConnection<BackendServiceClient>&)> callback, int timeout_ms);
154
155
template Status ThriftRpcHelper::rpc<TPaloBrokerServiceClient>(
156
        const std::string& ip, const int32_t port,
157
        std::function<void(ClientConnection<TPaloBrokerServiceClient>&)> callback, int timeout_ms);
158
159
} // namespace doris