Coverage Report

Created: 2024-11-20 16:51

/root/doris/be/src/runtime/client_cache.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/client_cache.h"
19
20
#include <memory>
21
#include <utility>
22
23
#include "common/logging.h"
24
#include "util/doris_metrics.h"
25
#include "util/network_util.h"
26
27
namespace doris {
28
29
DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_used_clients, MetricUnit::NOUNIT,
30
                                   "Number of clients 'checked-out' from the cache");
31
DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_opened_clients, MetricUnit::NOUNIT,
32
                                   "Total clients in the cache, including those in use");
33
34
0
ClientCacheHelper::~ClientCacheHelper() {
35
0
    for (auto& it : _client_map) {
36
0
        delete it.second;
37
0
    }
38
0
}
39
40
0
void ClientCacheHelper::_get_client_from_cache(const TNetworkAddress& hostport, void** client_key) {
41
0
    *client_key = nullptr;
42
0
    std::lock_guard<std::mutex> lock(_lock);
43
    //VLOG_RPC << "get_client(" << hostport << ")";
44
0
    auto cache_entry = _client_cache.find(hostport);
45
46
0
    if (cache_entry == _client_cache.end()) {
47
0
        cache_entry = _client_cache.insert(std::make_pair(hostport, std::list<void*>())).first;
48
0
        DCHECK(cache_entry != _client_cache.end());
49
0
    }
50
51
0
    std::list<void*>& info_list = cache_entry->second;
52
0
    if (!info_list.empty()) {
53
0
        *client_key = info_list.front();
54
0
        VLOG_RPC << "get_client(): cached client for " << hostport;
55
0
        info_list.pop_front();
56
0
    }
57
0
}
58
59
Status ClientCacheHelper::get_client(const TNetworkAddress& hostport, ClientFactory& factory_method,
60
0
                                     void** client_key, int timeout_ms) {
61
0
    _get_client_from_cache(hostport, client_key);
62
0
    if (*client_key == nullptr) {
63
0
        RETURN_IF_ERROR(_create_client(hostport, factory_method, client_key, timeout_ms));
64
0
    }
65
66
0
    if (_metrics_enabled) {
67
0
        thrift_used_clients->increment(1);
68
0
    }
69
70
0
    return Status::OK();
71
0
}
72
73
Status ClientCacheHelper::reopen_client(ClientFactory& factory_method, void** client_key,
74
0
                                        int timeout_ms) {
75
0
    DCHECK(*client_key != nullptr) << "Trying to reopen nullptr client";
76
0
    ThriftClientImpl* client_to_close = nullptr;
77
0
    {
78
0
        std::lock_guard<std::mutex> lock(_lock);
79
0
        auto client_map_entry = _client_map.find(*client_key);
80
0
        DCHECK(client_map_entry != _client_map.end());
81
0
        client_to_close = client_map_entry->second;
82
0
    }
83
0
    const std::string ipaddress = client_to_close->ipaddress();
84
0
    int port = client_to_close->port();
85
86
0
    client_to_close->close();
87
88
    // TODO: Thrift TBufferedTransport cannot be re-opened after Close() because it does
89
    // not clean up internal buffers it reopens. To work around this issue, create a new
90
    // client instead.
91
0
    {
92
0
        std::lock_guard<std::mutex> lock(_lock);
93
0
        _client_map.erase(*client_key);
94
0
    }
95
0
    delete client_to_close;
96
0
    *client_key = nullptr;
97
98
0
    if (_metrics_enabled) {
99
0
        thrift_opened_clients->increment(-1);
100
0
    }
101
102
0
    RETURN_IF_ERROR(_create_client(make_network_address(ipaddress, port), factory_method,
103
0
                                   client_key, timeout_ms));
104
105
0
    return Status::OK();
106
0
}
107
108
Status ClientCacheHelper::_create_client(const TNetworkAddress& hostport,
109
                                         ClientFactory& factory_method, void** client_key,
110
0
                                         int timeout_ms) {
111
0
    std::unique_ptr<ThriftClientImpl> client_impl(factory_method(hostport, client_key));
112
    //VLOG_CONNECTION << "create_client(): adding new client for "
113
    //                << client_impl->ipaddress() << ":" << client_impl->port();
114
115
0
    client_impl->set_conn_timeout(config::thrift_connect_timeout_seconds * 1000);
116
117
0
    Status status = client_impl->open_with_retry(config::thrift_client_open_num_tries, 100);
118
119
0
    if (!status.ok()) {
120
0
        *client_key = nullptr;
121
0
        return status;
122
0
    }
123
124
0
    DCHECK(*client_key != nullptr);
125
0
    client_impl->set_send_timeout(timeout_ms);
126
0
    client_impl->set_recv_timeout(timeout_ms);
127
128
0
    {
129
0
        std::lock_guard<std::mutex> lock(_lock);
130
        // Because the client starts life 'checked out', we don't add it to the cache map
131
0
        DCHECK(_client_map.count(*client_key) == 0);
132
0
        _client_map[*client_key] = client_impl.release();
133
0
    }
134
135
0
    if (_metrics_enabled) {
136
0
        thrift_opened_clients->increment(1);
137
0
    }
138
139
0
    return Status::OK();
140
0
}
141
142
0
void ClientCacheHelper::release_client(void** client_key) {
143
0
    DCHECK(*client_key != nullptr) << "Trying to release nullptr client";
144
0
    ThriftClientImpl* client_to_close = nullptr;
145
0
    {
146
0
        std::lock_guard<std::mutex> lock(_lock);
147
0
        auto client_map_entry = _client_map.find(*client_key);
148
0
        DCHECK(client_map_entry != _client_map.end());
149
0
        client_to_close = client_map_entry->second;
150
151
0
        auto cache_list = _client_cache.find(
152
0
                make_network_address(client_to_close->ipaddress(), client_to_close->port()));
153
0
        DCHECK(cache_list != _client_cache.end());
154
0
        if (_max_cache_size_per_host >= 0 &&
155
0
            cache_list->second.size() >= _max_cache_size_per_host) {
156
            // cache of this host is full, close this client connection and remove if from _client_map
157
0
            _client_map.erase(*client_key);
158
0
        } else {
159
0
            cache_list->second.push_back(*client_key);
160
            // There is no need to close client if we put it to cache list.
161
0
            client_to_close = nullptr;
162
0
        }
163
0
    }
164
165
0
    if (client_to_close != nullptr) {
166
0
        client_to_close->close();
167
0
        delete client_to_close;
168
0
        if (_metrics_enabled) {
169
0
            thrift_opened_clients->increment(-1);
170
0
        }
171
0
    }
172
173
0
    if (_metrics_enabled) {
174
0
        thrift_used_clients->increment(-1);
175
0
    }
176
177
0
    *client_key = nullptr;
178
0
}
179
180
0
std::string ClientCacheHelper::debug_string() {
181
0
    std::stringstream out;
182
0
    out << "ClientCacheHelper(#hosts=" << _client_cache.size() << " [";
183
184
0
    bool isfirst = true;
185
0
    for (const auto& [endpoint, client_keys] : _client_cache) {
186
0
        if (!isfirst) {
187
0
            out << " ";
188
0
            isfirst = false;
189
0
        }
190
0
        out << endpoint << ":" << client_keys.size();
191
0
    }
192
193
0
    out << "])";
194
0
    return out.str();
195
0
}
196
197
0
void ClientCacheHelper::init_metrics(const std::string& name) {
198
    // Not strictly needed if init_metrics is called before any cache
199
    // usage, but ensures that _metrics_enabled is published.
200
0
    std::lock_guard<std::mutex> lock(_lock);
201
202
0
    _thrift_client_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
203
0
            std::string("thrift_client.") + name, {{"name", name}});
204
0
    INT_GAUGE_METRIC_REGISTER(_thrift_client_metric_entity, thrift_used_clients);
205
0
    INT_GAUGE_METRIC_REGISTER(_thrift_client_metric_entity, thrift_opened_clients);
206
207
0
    _metrics_enabled = true;
208
0
}
209
210
} // namespace doris