Coverage Report

Created: 2026-03-13 09:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/brpc_client_cache.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <brpc/adaptive_connection_type.h>
21
#include <brpc/adaptive_protocol_type.h>
22
#include <brpc/channel.h>
23
#include <brpc/controller.h>
24
#include <butil/endpoint.h>
25
#include <fmt/format.h>
26
#include <gen_cpp/Types_types.h>
27
#include <gen_cpp/types.pb.h>
28
#include <glog/logging.h>
29
#include <google/protobuf/service.h>
30
#include <parallel_hashmap/phmap.h>
31
#include <stddef.h>
32
33
#include <functional>
34
#include <memory>
35
#include <mutex>
36
#include <ostream>
37
#include <string>
38
#include <utility>
39
#include <vector>
40
41
#include "common/compiler_util.h" // IWYU pragma: keep
42
#include "common/config.h"
43
#include "common/status.h"
44
#include "runtime/exec_env.h"
45
#include "service/backend_options.h"
46
#include "util/dns_cache.h"
47
#include "util/network_util.h"
48
49
namespace doris {
// Forward declarations of the protobuf-generated stub classes used by the
// client-cache aliases at the end of this header.
class PBackendService_Stub;
class PFunctionService_Stub;
} // namespace doris

// Entry that holds both resolved IP and stub, similar to Java's BackendServiceClientExtIp
//
// real_ip: the IP address the stub's channel was created against. BrpcClientCache
//          compares it with a fresh resolution of the host on every lookup to detect
//          DNS changes.
// stub:    the cached service stub (it owns its channel).
//
// NOTE(review): StubEntry and StubMap are declared in the global namespace, not in
// `doris`; moving them would break any external `::StubEntry` users, so it is left as is.
template <typename T>
struct StubEntry {
    std::string real_ip;
    std::shared_ptr<T> stub;
};

// Concurrent map from the caller-supplied "host:port" key to its StubEntry.
// A phmap parallel map: the `8` is log2 of the submap count (2^8 = 256 submaps),
// and each submap is guarded by its own std::mutex.
template <typename T>
using StubMap = phmap::parallel_flat_hash_map<
        std::string, StubEntry<T>, std::hash<std::string>, std::equal_to<std::string>,
        std::allocator<std::pair<const std::string, StubEntry<T>>>, 8, std::mutex>;
65
66
namespace doris {
67
#include "common/compile_check_begin.h"
68
class FailureDetectClosure : public ::google::protobuf::Closure {
69
public:
70
    FailureDetectClosure(std::shared_ptr<AtomicStatus>& channel_st,
71
                         ::google::protobuf::RpcController* controller,
72
                         ::google::protobuf::Closure* done)
73
2.91M
            : _channel_st(channel_st), _controller(controller), _done(done) {}
74
75
2.90M
    void Run() override {
76
2.90M
        Defer defer {[&]() { delete this; }};
77
        // All brpc related API will use brpc::Controller, so that it is safe
78
        // to do static cast here.
79
2.90M
        auto* cntl = static_cast<brpc::Controller*>(_controller);
80
2.90M
        if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) {
81
1
            Status error_st = Status::NetworkError(
82
1
                    "Failed to send brpc, error={}, error_text={}, client: {}, latency = {}",
83
1
                    berror(cntl->ErrorCode()), cntl->ErrorText(), BackendOptions::get_localhost(),
84
1
                    cntl->latency_us());
85
1
            LOG(WARNING) << error_st;
86
1
            _channel_st->update(error_st);
87
1
        }
88
        // Sometimes done == nullptr, for example hand_shake API.
89
2.91M
        if (_done != nullptr) {
90
2.91M
            _done->Run();
91
2.91M
        }
92
        // _done->Run may throw exception, so that move delete this to Defer.
93
        // delete this;
94
2.90M
    }
95
96
private:
97
    std::shared_ptr<AtomicStatus> _channel_st;
98
    ::google::protobuf::RpcController* _controller;
99
    ::google::protobuf::Closure* _done;
100
};
101
102
// This channel will use FailureDetectClosure to wrap the original closure
103
// If some non-recoverable rpc failure happens, it will save the error status in
104
// _channel_st.
105
// And brpc client cache will depend on it to detect if the client is health.
106
class FailureDetectChannel : public ::brpc::Channel {
107
public:
108
20
    FailureDetectChannel() : ::brpc::Channel() {
109
20
        _channel_st = std::make_shared<AtomicStatus>(); // default OK
110
20
    }
111
    void CallMethod(const google::protobuf::MethodDescriptor* method,
112
                    google::protobuf::RpcController* controller,
113
                    const google::protobuf::Message* request, google::protobuf::Message* response,
114
2.90M
                    google::protobuf::Closure* done) override {
115
2.90M
        FailureDetectClosure* failure_detect_closure = nullptr;
116
2.90M
        if (done != nullptr) {
117
            // If done == nullptr, then it means the call is sync call, so that should not
118
            // gen a failure detect closure for it. Or it will core.
119
2.90M
            failure_detect_closure = new FailureDetectClosure(_channel_st, controller, done);
120
2.90M
        }
121
2.90M
        ::brpc::Channel::CallMethod(method, controller, request, response, failure_detect_closure);
122
        // Done == nullptr, it is a sync call, should also deal with the bad channel.
123
2.90M
        if (done == nullptr) {
124
7.29k
            auto* cntl = static_cast<brpc::Controller*>(controller);
125
7.29k
            if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) {
126
1
                Status error_st = Status::NetworkError(
127
1
                        "Failed to send brpc, error={}, error_text={}, client: {}, latency = {}",
128
1
                        berror(cntl->ErrorCode()), cntl->ErrorText(),
129
1
                        BackendOptions::get_localhost(), cntl->latency_us());
130
1
                LOG(WARNING) << error_st;
131
1
                _channel_st->update(error_st);
132
1
            }
133
7.29k
        }
134
2.90M
    }
135
136
2.95M
    std::shared_ptr<AtomicStatus> channel_status() { return _channel_st; }
137
138
private:
139
    std::shared_ptr<AtomicStatus> _channel_st;
140
};
141
142
template <class T>
143
class BrpcClientCache {
144
public:
145
    BrpcClientCache(std::string protocol = "baidu_std", std::string connection_type = "",
146
                    std::string connection_group = "");
147
    virtual ~BrpcClientCache();
148
149
    std::shared_ptr<T> get_client(const butil::EndPoint& endpoint) {
150
        return get_client(butil::endpoint2str(endpoint).c_str());
151
    }
152
153
#ifdef BE_TEST
154
    virtual std::shared_ptr<T> get_client(const TNetworkAddress& taddr) {
155
        std::string host_port = fmt::format("{}:{}", taddr.hostname, taddr.port);
156
        return get_client(host_port);
157
    }
158
#else
159
25.4k
    std::shared_ptr<T> get_client(const TNetworkAddress& taddr) {
160
25.4k
        return get_client(taddr.hostname, taddr.port);
161
25.4k
    }
162
#endif
163
164
139
    std::shared_ptr<T> get_client(const PNetworkAddress& paddr) {
165
139
        return get_client(paddr.hostname(), paddr.port());
166
139
    }
167
168
2.93M
    std::shared_ptr<T> get_client(const std::string& host, int port) {
169
2.93M
        std::string realhost = host;
170
2.93M
        auto dns_cache = ExecEnv::GetInstance()->dns_cache();
171
2.93M
        if (dns_cache == nullptr) {
172
8
            LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
173
2.93M
        } else if (!is_valid_ip(host)) {
174
0
            Status status = dns_cache->get(host, &realhost);
175
0
            if (!status.ok()) {
176
0
                LOG(WARNING) << "failed to get ip from host:" << status.to_string();
177
0
                return nullptr;
178
0
            }
179
0
        }
180
181
        // Use original host:port as key (like Java's TNetworkAddress address)
182
        // This allows us to detect IP changes when DNS resolution changes
183
2.93M
        std::string host_port = fmt::format("{}:{}", host, port);
184
185
2.93M
        std::shared_ptr<T> stub_ptr;
186
2.93M
        bool need_remove = false;
187
188
2.95M
        auto check_entry = [&](const auto& v) {
189
2.95M
            const StubEntry<T>& entry = v.second;
190
            // Check if cached IP matches current resolved IP
191
2.95M
            if (entry.real_ip != realhost) {
192
                // IP changed (DNS resolution changed)
193
0
                LOG(WARNING) << "Cached ip changed for " << host << ", before ip: " << entry.real_ip
194
0
                             << ", current ip: " << realhost;
195
0
                need_remove = true;
196
2.95M
            } else if (!static_cast<FailureDetectChannel*>(entry.stub->channel())
197
2.95M
                                ->channel_status()
198
2.95M
                                ->ok()) {
199
                // Client is not in normal state, need to recreate
200
                // At this point we cannot judge the progress of reconnecting the underlying channel.
201
                // In the worst case, it may take two minutes. But we can't stand the connection refused
202
                // for two minutes, so rebuild the channel directly.
203
2
                need_remove = true;
204
2.95M
            } else {
205
                // Cache hit: IP matches and client is healthy
206
2.95M
                stub_ptr = entry.stub;
207
2.95M
            }
208
2.95M
        };
Unexecuted instantiation: _ZZN5doris15BrpcClientCacheINS_21PFunctionService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiENKUlRKT_E_clISt4pairIS9_9StubEntryIS1_EEEEDaSD_
_ZZN5doris15BrpcClientCacheINS_20PBackendService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiENKUlRKT_E_clISt4pairIS9_9StubEntryIS1_EEEEDaSD_
Line
Count
Source
188
2.95M
        auto check_entry = [&](const auto& v) {
189
2.95M
            const StubEntry<T>& entry = v.second;
190
            // Check if cached IP matches current resolved IP
191
2.95M
            if (entry.real_ip != realhost) {
192
                // IP changed (DNS resolution changed)
193
0
                LOG(WARNING) << "Cached ip changed for " << host << ", before ip: " << entry.real_ip
194
0
                             << ", current ip: " << realhost;
195
0
                need_remove = true;
196
2.95M
            } else if (!static_cast<FailureDetectChannel*>(entry.stub->channel())
197
2.95M
                                ->channel_status()
198
2.95M
                                ->ok()) {
199
                // Client is not in normal state, need to recreate
200
                // At this point we cannot judge the progress of reconnecting the underlying channel.
201
                // In the worst case, it may take two minutes. But we can't stand the connection refused
202
                // for two minutes, so rebuild the channel directly.
203
2
                need_remove = true;
204
2.95M
            } else {
205
                // Cache hit: IP matches and client is healthy
206
2.95M
                stub_ptr = entry.stub;
207
2.95M
            }
208
2.95M
        };
209
210
2.95M
        if (LIKELY(_stub_map.if_contains(host_port, check_entry))) {
211
2.95M
            if (stub_ptr != nullptr) {
212
2.95M
                return stub_ptr;
213
2.95M
            }
214
            // IP changed or client unhealthy, need to remove old entry
215
18.4E
            if (need_remove) {
216
2
                _stub_map.erase(host_port);
217
2
            }
218
18.4E
        }
219
220
        // Create new stub using resolved IP for actual connection
221
18.4E
        std::string real_host_port = get_host_port(realhost, port);
222
18.4E
        auto stub = get_new_client_no_cache(real_host_port);
223
18.4E
        if (stub != nullptr) {
224
19
            StubEntry<T> entry {realhost, stub};
225
19
            _stub_map.try_emplace_l(
226
19
                    host_port, [&stub](const auto& v) { stub = v.second.stub; }, entry);
Unexecuted instantiation: _ZZN5doris15BrpcClientCacheINS_21PFunctionService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiENKUlRKT_E0_clISt4pairIS9_9StubEntryIS1_EEEEDaSD_
Unexecuted instantiation: _ZZN5doris15BrpcClientCacheINS_20PBackendService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiENKUlRKT_E0_clISt4pairIS9_9StubEntryIS1_EEEEDaSD_
227
19
        }
228
18.4E
        return stub;
229
2.93M
    }
Unexecuted instantiation: _ZN5doris15BrpcClientCacheINS_21PFunctionService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEi
_ZN5doris15BrpcClientCacheINS_20PBackendService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEi
Line
Count
Source
168
2.93M
    std::shared_ptr<T> get_client(const std::string& host, int port) {
169
2.93M
        std::string realhost = host;
170
2.93M
        auto dns_cache = ExecEnv::GetInstance()->dns_cache();
171
2.93M
        if (dns_cache == nullptr) {
172
8
            LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
173
2.93M
        } else if (!is_valid_ip(host)) {
174
0
            Status status = dns_cache->get(host, &realhost);
175
0
            if (!status.ok()) {
176
0
                LOG(WARNING) << "failed to get ip from host:" << status.to_string();
177
0
                return nullptr;
178
0
            }
179
0
        }
180
181
        // Use original host:port as key (like Java's TNetworkAddress address)
182
        // This allows us to detect IP changes when DNS resolution changes
183
2.93M
        std::string host_port = fmt::format("{}:{}", host, port);
184
185
2.93M
        std::shared_ptr<T> stub_ptr;
186
2.93M
        bool need_remove = false;
187
188
2.93M
        auto check_entry = [&](const auto& v) {
189
2.93M
            const StubEntry<T>& entry = v.second;
190
            // Check if cached IP matches current resolved IP
191
2.93M
            if (entry.real_ip != realhost) {
192
                // IP changed (DNS resolution changed)
193
2.93M
                LOG(WARNING) << "Cached ip changed for " << host << ", before ip: " << entry.real_ip
194
2.93M
                             << ", current ip: " << realhost;
195
2.93M
                need_remove = true;
196
2.93M
            } else if (!static_cast<FailureDetectChannel*>(entry.stub->channel())
197
2.93M
                                ->channel_status()
198
2.93M
                                ->ok()) {
199
                // Client is not in normal state, need to recreate
200
                // At this point we cannot judge the progress of reconnecting the underlying channel.
201
                // In the worst case, it may take two minutes. But we can't stand the connection refused
202
                // for two minutes, so rebuild the channel directly.
203
2.93M
                need_remove = true;
204
2.93M
            } else {
205
                // Cache hit: IP matches and client is healthy
206
2.93M
                stub_ptr = entry.stub;
207
2.93M
            }
208
2.93M
        };
209
210
2.95M
        if (LIKELY(_stub_map.if_contains(host_port, check_entry))) {
211
2.95M
            if (stub_ptr != nullptr) {
212
2.95M
                return stub_ptr;
213
2.95M
            }
214
            // IP changed or client unhealthy, need to remove old entry
215
18.4E
            if (need_remove) {
216
2
                _stub_map.erase(host_port);
217
2
            }
218
18.4E
        }
219
220
        // Create new stub using resolved IP for actual connection
221
18.4E
        std::string real_host_port = get_host_port(realhost, port);
222
18.4E
        auto stub = get_new_client_no_cache(real_host_port);
223
18.4E
        if (stub != nullptr) {
224
19
            StubEntry<T> entry {realhost, stub};
225
19
            _stub_map.try_emplace_l(
226
19
                    host_port, [&stub](const auto& v) { stub = v.second.stub; }, entry);
227
19
        }
228
18.4E
        return stub;
229
2.93M
    }
230
231
0
    std::shared_ptr<T> get_client(const std::string& host_port) {
232
0
        const auto pos = host_port.rfind(':');
233
0
        std::string host = host_port.substr(0, pos);
234
0
        int port = 0;
235
0
        try {
236
0
            port = stoi(host_port.substr(pos + 1));
237
0
        } catch (const std::exception& err) {
238
0
            LOG(WARNING) << "failed to parse port from " << host_port << ": " << err.what();
239
0
            return nullptr;
240
0
        }
241
0
        return get_client(host, port);
242
0
    }
243
244
    std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port,
245
                                               const std::string& protocol = "",
246
                                               const std::string& connection_type = "",
247
20
                                               const std::string& connection_group = "") {
248
20
        brpc::ChannelOptions options;
249
20
        if (protocol != "") {
250
0
            options.protocol = protocol;
251
20
        } else if (_protocol != "") {
252
20
            options.protocol = _protocol;
253
20
        }
254
20
        if (connection_type != "") {
255
0
            options.connection_type = connection_type;
256
20
        } else if (_connection_type != "") {
257
4
            options.connection_type = _connection_type;
258
4
        }
259
20
        if (connection_group != "") {
260
0
            options.connection_group = connection_group;
261
20
        } else if (_connection_group != "") {
262
4
            options.connection_group = _connection_group;
263
4
        }
264
        // Add random connection id to connection_group to make sure use new socket
265
20
        options.connection_group += std::to_string(_connection_id.fetch_add(1));
266
20
        options.connect_timeout_ms = 2000;
267
20
        options.timeout_ms = 2000;
268
20
        options.max_retry = 10;
269
270
20
        std::unique_ptr<FailureDetectChannel> channel(new FailureDetectChannel());
271
20
        int ret_code = 0;
272
20
        if (host_port.find("://") == std::string::npos) {
273
20
            ret_code = channel->Init(host_port.c_str(), &options);
274
20
        } else {
275
0
            ret_code =
276
0
                    channel->Init(host_port.c_str(), config::rpc_load_balancer.c_str(), &options);
277
0
        }
278
20
        if (ret_code) {
279
1
            LOG(WARNING) << "Failed to initialize brpc Channel to " << host_port;
280
1
            return nullptr;
281
1
        }
282
19
        return std::make_shared<T>(channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL);
283
20
    }
_ZN5doris15BrpcClientCacheINS_20PBackendService_StubEE23get_new_client_no_cacheERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESA_SA_SA_
Line
Count
Source
247
20
                                               const std::string& connection_group = "") {
248
20
        brpc::ChannelOptions options;
249
20
        if (protocol != "") {
250
0
            options.protocol = protocol;
251
20
        } else if (_protocol != "") {
252
20
            options.protocol = _protocol;
253
20
        }
254
20
        if (connection_type != "") {
255
0
            options.connection_type = connection_type;
256
20
        } else if (_connection_type != "") {
257
4
            options.connection_type = _connection_type;
258
4
        }
259
20
        if (connection_group != "") {
260
0
            options.connection_group = connection_group;
261
20
        } else if (_connection_group != "") {
262
4
            options.connection_group = _connection_group;
263
4
        }
264
        // Add random connection id to connection_group to make sure use new socket
265
20
        options.connection_group += std::to_string(_connection_id.fetch_add(1));
266
20
        options.connect_timeout_ms = 2000;
267
20
        options.timeout_ms = 2000;
268
20
        options.max_retry = 10;
269
270
20
        std::unique_ptr<FailureDetectChannel> channel(new FailureDetectChannel());
271
20
        int ret_code = 0;
272
20
        if (host_port.find("://") == std::string::npos) {
273
20
            ret_code = channel->Init(host_port.c_str(), &options);
274
20
        } else {
275
0
            ret_code =
276
0
                    channel->Init(host_port.c_str(), config::rpc_load_balancer.c_str(), &options);
277
0
        }
278
20
        if (ret_code) {
279
1
            LOG(WARNING) << "Failed to initialize brpc Channel to " << host_port;
280
1
            return nullptr;
281
1
        }
282
19
        return std::make_shared<T>(channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL);
283
20
    }
Unexecuted instantiation: _ZN5doris15BrpcClientCacheINS_21PFunctionService_StubEE23get_new_client_no_cacheERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESA_SA_SA_
284
285
1
    size_t size() { return _stub_map.size(); }
286
287
0
    void clear() { _stub_map.clear(); }
288
289
0
    size_t erase(const std::string& host_port) { return _stub_map.erase(host_port); }
290
291
0
    size_t erase(const std::string& host, int port) {
292
0
        std::string host_port = fmt::format("{}:{}", host, port);
293
0
        return erase(host_port);
294
0
    }
295
296
0
    size_t erase(const butil::EndPoint& endpoint) {
297
0
        return _stub_map.erase(butil::endpoint2str(endpoint).c_str());
298
0
    }
299
300
0
    bool exist(const std::string& host_port) {
301
0
        return _stub_map.find(host_port) != _stub_map.end();
302
0
    }
303
304
0
    void get_all(std::vector<std::string>* endpoints) {
305
0
        for (auto it = _stub_map.begin(); it != _stub_map.end(); ++it) {
306
0
            endpoints->emplace_back(it->first.c_str());
307
0
        }
308
0
    }
309
310
    bool available(std::shared_ptr<T> stub, const butil::EndPoint& endpoint) {
311
        return available(stub, butil::endpoint2str(endpoint).c_str());
312
    }
313
314
1
    bool available(std::shared_ptr<T> stub, const std::string& host_port) {
315
1
        if (!stub) {
316
0
            LOG(WARNING) << "stub is null to: " << host_port;
317
0
            return false;
318
0
        }
319
1
        std::string message = "hello doris!";
320
1
        PHandShakeRequest request;
321
1
        request.set_hello(message);
322
1
        PHandShakeResponse response;
323
1
        brpc::Controller cntl;
324
1
        stub->hand_shake(&cntl, &request, &response, nullptr);
325
1
        if (cntl.Failed()) {
326
1
            LOG(WARNING) << "open brpc connection to " << host_port
327
1
                         << " failed: " << cntl.ErrorText();
328
1
            return false;
329
1
        } else if (response.has_status() && response.has_hello() && response.hello() == message &&
330
0
                   response.status().status_code() == 0) {
331
0
            return true;
332
0
        } else {
333
0
            LOG(WARNING) << "open brpc connection to " << host_port
334
0
                         << " failed: " << response.DebugString();
335
0
            return false;
336
0
        }
337
1
    }
338
339
1
    bool available(std::shared_ptr<T> stub, const std::string& host, int port) {
340
1
        std::string host_port = fmt::format("{}:{}", host, port);
341
1
        return available(stub, host_port);
342
1
    }
343
344
private:
345
    StubMap<T> _stub_map;
346
    const std::string _protocol;
347
    const std::string _connection_type;
348
    const std::string _connection_group;
349
    // use to generate unique connection id for each connection
350
    // to prevent the connection problem of brpc: https://github.com/apache/brpc/issues/2146
351
    std::atomic<int64_t> _connection_id {0};
352
};
353
354
// Client cache for PBackendService_Stub stubs.
using InternalServiceClientCache = BrpcClientCache<PBackendService_Stub>;
// Client cache for PFunctionService_Stub stubs.
using FunctionServiceClientCache = BrpcClientCache<PFunctionService_Stub>;
356
#include "common/compile_check_end.h"
357
} // namespace doris