Coverage Report

Created: 2025-04-15 14:04

/root/doris/be/src/runtime/client_cache.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/client_cache.h"
19
20
#include <memory>
21
#include <utility>
22
23
#include "common/logging.h"
24
#include "util/doris_metrics.h"
25
#include "util/network_util.h"
26
27
namespace doris {
28
29
DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_used_clients, MetricUnit::NOUNIT,
30
                                   "Number of clients 'checked-out' from the cache");
31
DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_opened_clients, MetricUnit::NOUNIT,
32
                                   "Total clients in the cache, including those in use");
33
34
0
ClientCacheHelper::~ClientCacheHelper() {
35
0
    for (auto& it : _client_map) {
36
0
        delete it.second;
37
0
    }
38
0
}
39
40
0
void ClientCacheHelper::_get_client_from_cache(const TNetworkAddress& hostport, void** client_key) {
41
0
    *client_key = nullptr;
42
0
    std::lock_guard<std::mutex> lock(_lock);
43
    //VLOG_RPC << "get_client(" << hostport << ")";
44
0
    auto cache_entry = _client_cache.find(hostport);
45
46
0
    if (cache_entry == _client_cache.end()) {
47
0
        cache_entry = _client_cache.insert(std::make_pair(hostport, std::list<void*>())).first;
48
0
        DCHECK(cache_entry != _client_cache.end());
49
0
    }
50
51
0
    std::list<void*>& info_list = cache_entry->second;
52
0
    if (!info_list.empty()) {
53
0
        *client_key = info_list.front();
54
0
        VLOG_RPC << "get_client(): cached client for " << hostport;
55
0
        info_list.pop_front();
56
0
    }
57
0
}
58
59
Status ClientCacheHelper::get_client(const TNetworkAddress& hostport, ClientFactory& factory_method,
60
0
                                     void** client_key, int timeout_ms) {
61
0
    _get_client_from_cache(hostport, client_key);
62
0
    if (*client_key == nullptr) {
63
0
        RETURN_IF_ERROR(_create_client(hostport, factory_method, client_key, timeout_ms));
64
0
    }
65
66
0
    if (_metrics_enabled) {
67
0
        thrift_used_clients->increment(1);
68
0
    }
69
70
0
    return Status::OK();
71
0
}
72
73
Status ClientCacheHelper::reopen_client(ClientFactory& factory_method, void** client_key,
74
0
                                        int timeout_ms) {
75
0
    DCHECK(*client_key != nullptr) << "Trying to reopen nullptr client";
76
0
    ThriftClientImpl* client_to_close = nullptr;
77
0
    {
78
0
        std::lock_guard<std::mutex> lock(_lock);
79
0
        auto client_map_entry = _client_map.find(*client_key);
80
0
        DCHECK(client_map_entry != _client_map.end());
81
0
        client_to_close = client_map_entry->second;
82
0
    }
83
0
    const std::string ipaddress = client_to_close->ipaddress();
84
0
    int port = client_to_close->port();
85
86
0
    client_to_close->close();
87
88
    // TODO: Thrift TBufferedTransport cannot be re-opened after Close() because it does
89
    // not clean up internal buffers it reopens. To work around this issue, create a new
90
    // client instead.
91
0
    {
92
0
        std::lock_guard<std::mutex> lock(_lock);
93
0
        _client_map.erase(*client_key);
94
0
    }
95
0
    delete client_to_close;
96
0
    *client_key = nullptr;
97
98
0
    if (_metrics_enabled) {
99
0
        thrift_opened_clients->increment(-1);
100
0
    }
101
102
0
    RETURN_IF_ERROR(_create_client(make_network_address(ipaddress, port), factory_method,
103
0
                                   client_key, timeout_ms));
104
105
0
    return Status::OK();
106
0
}
107
108
Status ClientCacheHelper::_create_client(const TNetworkAddress& hostport,
109
                                         ClientFactory& factory_method, void** client_key,
110
0
                                         int timeout_ms) {
111
0
    std::unique_ptr<ThriftClientImpl> client_impl(factory_method(hostport, client_key));
112
    //VLOG_CONNECTION << "create_client(): adding new client for "
113
    //                << client_impl->ipaddress() << ":" << client_impl->port();
114
115
0
    client_impl->set_conn_timeout(config::thrift_connect_timeout_seconds * 1000);
116
117
0
    Status status = client_impl->open_with_retry(config::thrift_client_open_num_tries, 100);
118
119
0
    if (!status.ok()) {
120
0
        *client_key = nullptr;
121
0
        return status;
122
0
    }
123
124
0
    DCHECK(*client_key != nullptr);
125
    // In thrift, timeout == 0, means wait infinitely, so that it should not happen.
126
    // See https://github.com/apache/thrift/blob/master/lib/cpp/src/thrift/transport/TSocket.cpp.
127
    // There is some code like this:      int ret = THRIFT_POLL(fds, 2, (recvTimeout_ == 0) ? -1 : recvTimeout_);
128
    // If the developer missed to set the timeout, we should use default timeout, not infinitely.
129
    // See https://linux.die.net/man/2/poll. Specifying a negative value in timeout means an infinite timeout.
130
0
    client_impl->set_send_timeout(timeout_ms == 0 ? config::thrift_rpc_timeout_ms : timeout_ms);
131
0
    client_impl->set_recv_timeout(timeout_ms == 0 ? config::thrift_rpc_timeout_ms : timeout_ms);
132
133
0
    {
134
0
        std::lock_guard<std::mutex> lock(_lock);
135
        // Because the client starts life 'checked out', we don't add it to the cache map
136
0
        DCHECK(_client_map.count(*client_key) == 0);
137
0
        _client_map[*client_key] = client_impl.release();
138
0
    }
139
140
0
    if (_metrics_enabled) {
141
0
        thrift_opened_clients->increment(1);
142
0
    }
143
144
0
    return Status::OK();
145
0
}
146
147
0
void ClientCacheHelper::release_client(void** client_key) {
148
0
    DCHECK(*client_key != nullptr) << "Trying to release nullptr client";
149
0
    ThriftClientImpl* client_to_close = nullptr;
150
0
    {
151
0
        std::lock_guard<std::mutex> lock(_lock);
152
0
        auto client_map_entry = _client_map.find(*client_key);
153
0
        DCHECK(client_map_entry != _client_map.end());
154
0
        client_to_close = client_map_entry->second;
155
156
0
        auto cache_list = _client_cache.find(
157
0
                make_network_address(client_to_close->ipaddress(), client_to_close->port()));
158
0
        DCHECK(cache_list != _client_cache.end());
159
0
        if (_max_cache_size_per_host >= 0 &&
160
0
            cache_list->second.size() >= _max_cache_size_per_host) {
161
            // cache of this host is full, close this client connection and remove if from _client_map
162
0
            _client_map.erase(*client_key);
163
0
        } else {
164
0
            cache_list->second.push_back(*client_key);
165
            // There is no need to close client if we put it to cache list.
166
0
            client_to_close = nullptr;
167
0
        }
168
0
    }
169
170
0
    if (client_to_close != nullptr) {
171
0
        client_to_close->close();
172
0
        delete client_to_close;
173
0
        if (_metrics_enabled) {
174
0
            thrift_opened_clients->increment(-1);
175
0
        }
176
0
    }
177
178
0
    if (_metrics_enabled) {
179
0
        thrift_used_clients->increment(-1);
180
0
    }
181
182
0
    *client_key = nullptr;
183
0
}
184
185
0
std::string ClientCacheHelper::debug_string() {
186
0
    std::stringstream out;
187
0
    out << "ClientCacheHelper(#hosts=" << _client_cache.size() << " [";
188
189
0
    bool isfirst = true;
190
0
    for (const auto& [endpoint, client_keys] : _client_cache) {
191
0
        if (!isfirst) {
192
0
            out << " ";
193
0
            isfirst = false;
194
0
        }
195
0
        out << endpoint << ":" << client_keys.size();
196
0
    }
197
198
0
    out << "])";
199
0
    return out.str();
200
0
}
201
202
0
void ClientCacheHelper::init_metrics(const std::string& name) {
203
    // Not strictly needed if init_metrics is called before any cache
204
    // usage, but ensures that _metrics_enabled is published.
205
0
    std::lock_guard<std::mutex> lock(_lock);
206
207
0
    _thrift_client_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
208
0
            std::string("thrift_client.") + name, {{"name", name}});
209
0
    INT_GAUGE_METRIC_REGISTER(_thrift_client_metric_entity, thrift_used_clients);
210
0
    INT_GAUGE_METRIC_REGISTER(_thrift_client_metric_entity, thrift_opened_clients);
211
212
0
    _metrics_enabled = true;
213
0
}
214
215
} // namespace doris