Coverage Report

Created: 2026-06-03 15:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/thrift_client.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <thrift/protocol/TBinaryProtocol.h>
21
#include <thrift/transport/TBufferTransports.h>
22
#include <thrift/transport/TSSLSocket.h>
23
#include <thrift/transport/TSocket.h>
24
25
#include <memory>
26
#include <sstream>
27
#include <string>
28
29
#include "common/logging.h"
30
#include "common/status.h"
31
#include "util/thrift_server.h"
32
33
namespace apache {
34
namespace thrift {
35
namespace transport {
36
class TTransport;
37
} // namespace transport
38
} // namespace thrift
39
} // namespace apache
40
41
namespace doris {
42
43
// Marks `member` as set on `thrift` (thrift.__isset.member = true) and then
// move-assigns `value` into thrift.member.
// NOTE(review): this expands to two bare statements (no braces / do-while wrapper),
// so only the first one is governed by an unbraced if/else/loop; brace such call sites.
#define THRIFT_MOVE_VALUES(thrift, member, value) \
44
7.95k
    thrift.__isset.member = true;                 \
45
7.95k
    thrift.member = std::move(value);
46
47
// Non-template base class for the templatized ThriftClient<T>. Stores the peer
// address/port and the Thrift socket, transport and protocol objects.
48
class ThriftClientImpl {
49
public:
50
4
    // Virtual destructor; calls close().
    virtual ~ThriftClientImpl() { close(); }
51
95.4k
    // Returns a reference to the stored _ipaddress (not const-qualified, unlike port()).
    const std::string& ipaddress() { return _ipaddress; }
52
4
    // Returns the stored _port.
    int port() const { return _port; }
53
54
    // Open the connection to the remote server. May be called
55
    // repeatedly, is idempotent unless there is a failure to connect.
56
    Status open();
57
58
    // Retry open() num_retries times, waiting wait_ms milliseconds between retries.
59
    Status open_with_retry(int num_retries, int wait_ms);
60
61
    // Close the connection with the remote server. May be called
62
    // repeatedly.
63
    void close();
64
65
    // Set the connect timeout: forwards ms to _socket->setConnTimeout().
    // If _socket is null, logs a warning and returns without setting anything.
66
29
    void set_conn_timeout(int ms) {
67
29
        if (!_socket) {
68
0
            LOG(WARNING) << "socket is nullptr";
69
0
            return;
70
0
        }
71
29
        _socket->setConnTimeout(ms);
72
29
    }
73
74
    // Set the receive timeout: forwards ms to _socket->setRecvTimeout().
    // If _socket is null, logs a warning and returns without setting anything.
75
25
    void set_recv_timeout(int ms) {
76
25
        if (!_socket) {
77
0
            LOG(WARNING) << "socket is nullptr";
78
0
            return;
79
0
        }
80
25
        _socket->setRecvTimeout(ms);
81
25
    }
82
83
    // Set the send timeout: forwards ms to _socket->setSendTimeout().
    // If _socket is null, logs a warning and returns without setting anything.
84
25
    void set_send_timeout(int ms) {
85
25
        if (!_socket) {
86
0
            LOG(WARNING) << "socket is nullptr";
87
0
            return;
88
0
        }
89
25
        _socket->setSendTimeout(ms);
90
25
    }
91
92
protected:
93
    // Protected: instances are created through subclasses such as ThriftClient<T>.
    // Only declared here; the definition is not part of this listing.
    ThriftClientImpl(const std::string& ipaddress, int port);
94
95
    std::string _ipaddress;
96
    int _port;
97
98
    // All held as shared pointers, because Thrift requires them to be.
99
    std::shared_ptr<apache::thrift::transport::TSocket> _socket;
100
    std::shared_ptr<apache::thrift::transport::TTransport> _transport;
101
    std::shared_ptr<apache::thrift::protocol::TBinaryProtocol> _protocol;
102
};
103
104
// Utility client to a Thrift server. The parameter type is the
105
// Thrift interface type that the server implements. InterfaceType must be
// constructible from a std::shared_ptr<TBinaryProtocol>, since both constructors
// create it with `new InterfaceType(_protocol)`.
106
template <class InterfaceType>
107
class ThriftClient : public ThriftClientImpl {
108
public:
109
    // Creates a client that wraps the socket in a TBufferedTransport.
    ThriftClient(const std::string& ipaddress, int port);
110
111
    // Creates a client whose transport is chosen from server_type:
    // TFramedTransport for NON_BLOCKING, TBufferedTransport for THREADED / THREAD_POOL.
    ThriftClient(const std::string& ipaddress, int port, ThriftServer::ServerType server_type);
112
113
    // Returns the object used to actually make RPCs against the remote server.
    // The returned raw pointer is non-owning; the object is held by _iface.
114
29
    InterfaceType* iface() { return _iface.get(); }
_ZN5doris12ThriftClientINS_21FrontendServiceClientEE5ifaceEv
Line
Count
Source
114
29
    InterfaceType* iface() { return _iface.get(); }
Unexecuted instantiation: _ZN5doris12ThriftClientINS_20BackendServiceClientEE5ifaceEv
Unexecuted instantiation: _ZN5doris12ThriftClientINS_24TPaloBrokerServiceClientEE5ifaceEv
115
116
private:
117
    // Generated-client object built on top of _protocol by the constructors.
    std::shared_ptr<InterfaceType> _iface;
118
};
119
120
// Two-argument constructor: after the base class is constructed, layers
// TBufferedTransport -> TBinaryProtocol -> InterfaceType on top of _socket.
// The transport's maximum message size is set from config::thrift_max_message_size.
// NOTE(review): _socket is used immediately, so the base constructor presumably
// creates it; its definition is not visible here - confirm.
template <class InterfaceType>
121
ThriftClient<InterfaceType>::ThriftClient(const std::string& ipaddress, int port)
122
        : ThriftClientImpl(ipaddress, port) {
123
    _transport.reset(new apache::thrift::transport::TBufferedTransport(_socket));
124
    _transport->getConfiguration()->setMaxMessageSize(config::thrift_max_message_size);
125
    _protocol.reset(new apache::thrift::protocol::TBinaryProtocol(_transport));
126
    _iface.reset(new InterfaceType(_protocol));
127
}
128
129
// Three-argument constructor: picks the transport that wraps _socket from server_type,
// sets its maximum message size from config::thrift_max_message_size, then builds
// the TBinaryProtocol and the InterfaceType object on top of it.
//   NON_BLOCKING            -> TFramedTransport
//   THREADED / THREAD_POOL  -> TBufferedTransport
//   anything else           -> logs an error and hits DCHECK(false)
template <class InterfaceType>
130
ThriftClient<InterfaceType>::ThriftClient(const std::string& ipaddress, int port,
131
                                          ThriftServer::ServerType server_type)
132
29
        : ThriftClientImpl(ipaddress, port) {
133
29
    switch (server_type) {
134
0
    case ThriftServer::NON_BLOCKING:
135
0
        _transport.reset(new apache::thrift::transport::TFramedTransport(_socket));
136
0
        _transport->getConfiguration()->setMaxMessageSize(config::thrift_max_message_size);
137
0
        break;
138
29
    // THREADED intentionally shares the THREAD_POOL branch below.
    case ThriftServer::THREADED:
139
29
    case ThriftServer::THREAD_POOL:
140
29
        _transport.reset(new apache::thrift::transport::TBufferedTransport(_socket));
141
29
        _transport->getConfiguration()->setMaxMessageSize(config::thrift_max_message_size);
142
29
        break;
143
0
    // NOTE(review): this branch never assigns _transport. If DCHECK is compiled out
    // (presumably release builds - confirm), execution continues and the protocol
    // below is built from whatever _transport the base constructor left behind.
    default:
144
0
        std::stringstream error_msg;
145
0
        error_msg << "Unsupported server type: " << server_type;
146
0
        LOG(ERROR) << error_msg.str();
147
0
        DCHECK(false);
148
0
        break;
149
29
    }
150
151
29
    _protocol.reset(new apache::thrift::protocol::TBinaryProtocol(_transport));
152
29
    _iface.reset(new InterfaceType(_protocol));
153
29
}
_ZN5doris12ThriftClientINS_21FrontendServiceClientEEC2ERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiNS_12ThriftServer10ServerTypeE
Line
Count
Source
132
29
        : ThriftClientImpl(ipaddress, port) {
133
29
    switch (server_type) {
134
0
    case ThriftServer::NON_BLOCKING:
135
0
        _transport.reset(new apache::thrift::transport::TFramedTransport(_socket));
136
0
        _transport->getConfiguration()->setMaxMessageSize(config::thrift_max_message_size);
137
0
        break;
138
29
    case ThriftServer::THREADED:
139
29
    case ThriftServer::THREAD_POOL:
140
29
        _transport.reset(new apache::thrift::transport::TBufferedTransport(_socket));
141
29
        _transport->getConfiguration()->setMaxMessageSize(config::thrift_max_message_size);
142
29
        break;
143
0
    default:
144
0
        std::stringstream error_msg;
145
0
        error_msg << "Unsupported server type: " << server_type;
146
0
        LOG(ERROR) << error_msg.str();
147
0
        DCHECK(false);
148
0
        break;
149
29
    }
150
151
29
    _protocol.reset(new apache::thrift::protocol::TBinaryProtocol(_transport));
152
29
    _iface.reset(new InterfaceType(_protocol));
153
29
}
Unexecuted instantiation: _ZN5doris12ThriftClientINS_20BackendServiceClientEEC2ERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiNS_12ThriftServer10ServerTypeE
Unexecuted instantiation: _ZN5doris12ThriftClientINS_24TPaloBrokerServiceClientEEC2ERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiNS_12ThriftServer10ServerTypeE
154
155
} // namespace doris