Coverage Report

Created: 2026-05-21 07:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/thrift_client.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/thrift_client.h"
19
20
#include <thrift/transport/TTransport.h>
21
#include <thrift/transport/TTransportException.h>
22
// IWYU pragma: no_include <bits/chrono.h>
23
#include <chrono> // IWYU pragma: keep
24
#include <memory>
25
#include <string>
26
#include <thread>
27
28
#include "absl/strings/substitute.h"
29
#include "util/client_connection_provider.h"
30
31
namespace doris {
32
33
ThriftClientImpl::ThriftClientImpl(const std::string& ipaddress, int port)
34
32
        : _ipaddress(ipaddress),
35
32
          _port(port),
36
32
          _socket(doris::client::create_thrift_client_socket(ipaddress, port)) {}
37
38
32
Status ThriftClientImpl::open() {
39
32
    if (!_socket) {
40
0
        return Status::RpcError("socket not created");
41
0
    }
42
43
32
    try {
44
32
        if (!_transport->isOpen()) {
45
32
            _transport->open();
46
32
        }
47
32
    } catch (const apache::thrift::transport::TTransportException& e) {
48
4
        try {
49
4
            _transport->close();
50
4
        } catch (const apache::thrift::transport::TTransportException& e) {
51
0
            VLOG_CRITICAL << "Error closing socket to: " << ipaddress() << ":" << port()
52
0
                          << ", ignoring (" << e.what() << ")";
53
0
        }
54
        // In certain cases in which the remote host is overloaded, this failure can
55
        // happen quite frequently. Let's print this error message without the stack
56
        // trace as there aren't many callers of this function.
57
4
        const std::string& err_msg = absl::Substitute("Couldn't open transport for $0:$1 ($2)",
58
4
                                                      ipaddress(), port(), e.what());
59
4
        VLOG_CRITICAL << err_msg;
60
4
        return Status::RpcError(err_msg);
61
4
    }
62
28
    return Status::OK();
63
32
}
64
65
32
Status ThriftClientImpl::open_with_retry(int num_tries, int wait_ms) {
66
32
    DCHECK_GE(wait_ms, 0);
67
32
    Status status;
68
32
    int try_count = 0L;
69
70
36
    while (num_tries <= 0 || try_count < num_tries) {
71
32
        ++try_count;
72
32
        status = open();
73
74
32
        if (status.ok()) {
75
28
            return status;
76
28
        }
77
78
32
        LOG(INFO) << "Unable to connect to " << _ipaddress << ":" << _port;
79
80
4
        if (num_tries < 0) {
81
0
            LOG(INFO) << "(Attempt " << try_count << ", will retry indefinitely)";
82
4
        } else {
83
4
            LOG(INFO) << "(Attempt " << try_count << " of " << num_tries << ")";
84
4
        }
85
86
4
        std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
87
4
    }
88
89
4
    return status;
90
32
}
91
92
4
void ThriftClientImpl::close() {
93
4
    try {
94
4
        if (_transport != nullptr && _transport->isOpen()) {
95
0
            _transport->close();
96
0
        }
97
4
    } catch (const apache::thrift::transport::TTransportException& e) {
98
0
        LOG(INFO) << "Error closing connection to: " << ipaddress() << ":" << port()
99
0
                  << ", ignoring (" << e.what() << ")";
100
        // Forcibly close the socket (since the transport may have failed to get that far
101
        // during close())
102
0
        try {
103
0
            if (_socket != nullptr) {
104
0
                _socket->close();
105
0
            }
106
0
        } catch (const apache::thrift::transport::TTransportException& e) {
107
            LOG(INFO) << "Error closing socket to: " << ipaddress() << ":" << port()
108
0
                      << ", ignoring (" << e.what() << ")";
109
0
        }
110
0
    }
111
4
}
112
113
} // namespace doris