Coverage Report

Created: 2026-04-10 18:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/brpc_client_cache.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <brpc/adaptive_connection_type.h>
21
#include <brpc/adaptive_protocol_type.h>
22
#include <brpc/channel.h>
23
#include <brpc/controller.h>
24
#include <butil/endpoint.h>
25
#include <fmt/format.h>
26
#include <gen_cpp/Types_types.h>
27
#include <gen_cpp/types.pb.h>
28
#include <glog/logging.h>
29
#include <google/protobuf/service.h>
30
#include <parallel_hashmap/phmap.h>
31
#include <stddef.h>
32
33
#include <functional>
34
#include <memory>
35
#include <mutex>
36
#include <ostream>
37
#include <string>
38
#include <utility>
39
#include <vector>
40
41
#include "common/compiler_util.h" // IWYU pragma: keep
42
#include "common/config.h"
43
#include "common/status.h"
44
#include "runtime/exec_env.h"
45
#include "service/backend_options.h"
46
#include "util/dns_cache.h"
47
#include "util/network_util.h"
48
49
namespace doris {
// Forward declarations of the RPC service stub types that are used as template
// arguments of the client-cache aliases declared at the bottom of this header.
class PFunctionService_Stub;
class PBackendService_Stub;
} // namespace doris
53
54
// Cache entry pairing an RPC stub with the IP address its channel was built against
// (similar to Java's BackendServiceClientExtIp). Remembering the IP lets the client
// cache notice when DNS resolution for the same host starts yielding a different address.
template <typename StubT>
struct StubEntry {
    // IP address that the stub's channel was initialized to connect to.
    std::string real_ip;
    // The cached stub itself.
    std::shared_ptr<StubT> stub;
};
60
61
template <typename T>
62
using StubMap = phmap::parallel_flat_hash_map<
63
        std::string, StubEntry<T>, std::hash<std::string>, std::equal_to<std::string>,
64
        std::allocator<std::pair<const std::string, StubEntry<T>>>, 8, std::mutex>;
65
66
namespace doris {
67
class FailureDetectClosure : public ::google::protobuf::Closure {
68
public:
69
    FailureDetectClosure(std::shared_ptr<AtomicStatus>& channel_st,
70
                         ::google::protobuf::RpcController* controller,
71
                         ::google::protobuf::Closure* done)
72
2.75M
            : _channel_st(channel_st), _controller(controller), _done(done) {}
73
74
2.73M
    void Run() override {
75
2.73M
        Defer defer {[&]() { delete this; }};
76
        // All brpc related API will use brpc::Controller, so that it is safe
77
        // to do static cast here.
78
2.73M
        auto* cntl = static_cast<brpc::Controller*>(_controller);
79
2.73M
        if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) {
80
1
            Status error_st = Status::NetworkError(
81
1
                    "Failed to send brpc, error={}, error_text={}, client: {}, latency = {}",
82
1
                    berror(cntl->ErrorCode()), cntl->ErrorText(), BackendOptions::get_localhost(),
83
1
                    cntl->latency_us());
84
1
            LOG(WARNING) << error_st;
85
1
            _channel_st->update(error_st);
86
1
        }
87
        // Sometimes done == nullptr, for example hand_shake API.
88
2.75M
        if (_done != nullptr) {
89
2.75M
            _done->Run();
90
2.75M
        }
91
        // _done->Run may throw exception, so that move delete this to Defer.
92
        // delete this;
93
2.73M
    }
94
95
private:
96
    std::shared_ptr<AtomicStatus> _channel_st;
97
    ::google::protobuf::RpcController* _controller;
98
    ::google::protobuf::Closure* _done;
99
};
100
101
// This channel will use FailureDetectClosure to wrap the original closure
102
// If some non-recoverable rpc failure happens, it will save the error status in
103
// _channel_st.
104
// And brpc client cache will depend on it to detect if the client is health.
105
class FailureDetectChannel : public ::brpc::Channel {
106
public:
107
14
    FailureDetectChannel() : ::brpc::Channel() {
108
14
        _channel_st = std::make_shared<AtomicStatus>(); // default OK
109
14
    }
110
    void CallMethod(const google::protobuf::MethodDescriptor* method,
111
                    google::protobuf::RpcController* controller,
112
                    const google::protobuf::Message* request, google::protobuf::Message* response,
113
2.74M
                    google::protobuf::Closure* done) override {
114
2.74M
        FailureDetectClosure* failure_detect_closure = nullptr;
115
2.74M
        if (done != nullptr) {
116
            // If done == nullptr, then it means the call is sync call, so that should not
117
            // gen a failure detect closure for it. Or it will core.
118
2.73M
            failure_detect_closure = new FailureDetectClosure(_channel_st, controller, done);
119
2.73M
        }
120
2.74M
        ::brpc::Channel::CallMethod(method, controller, request, response, failure_detect_closure);
121
        // Done == nullptr, it is a sync call, should also deal with the bad channel.
122
2.74M
        if (done == nullptr) {
123
6.64k
            auto* cntl = static_cast<brpc::Controller*>(controller);
124
6.64k
            if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) {
125
1
                Status error_st = Status::NetworkError(
126
1
                        "Failed to send brpc, error={}, error_text={}, client: {}, latency = {}",
127
1
                        berror(cntl->ErrorCode()), cntl->ErrorText(),
128
1
                        BackendOptions::get_localhost(), cntl->latency_us());
129
1
                LOG(WARNING) << error_st;
130
1
                _channel_st->update(error_st);
131
1
            }
132
6.64k
        }
133
2.74M
    }
134
135
2.79M
    std::shared_ptr<AtomicStatus> channel_status() { return _channel_st; }
136
137
private:
138
    std::shared_ptr<AtomicStatus> _channel_st;
139
};
140
141
template <class T>
142
class BrpcClientCache {
143
public:
144
    BrpcClientCache(std::string protocol = "baidu_std", std::string connection_type = "",
145
                    std::string connection_group = "");
146
    virtual ~BrpcClientCache();
147
148
    std::shared_ptr<T> get_client(const butil::EndPoint& endpoint) {
149
        return get_client(butil::endpoint2str(endpoint).c_str());
150
    }
151
152
#ifdef BE_TEST
153
    virtual std::shared_ptr<T> get_client(const TNetworkAddress& taddr) {
154
        std::string host_port = fmt::format("{}:{}", taddr.hostname, taddr.port);
155
        return get_client(host_port);
156
    }
157
#else
158
19.1k
    std::shared_ptr<T> get_client(const TNetworkAddress& taddr) {
159
19.1k
        return get_client(taddr.hostname, taddr.port);
160
19.1k
    }
161
#endif
162
163
77
    std::shared_ptr<T> get_client(const PNetworkAddress& paddr) {
164
77
        return get_client(paddr.hostname(), paddr.port());
165
77
    }
166
167
2.77M
    std::shared_ptr<T> get_client(const std::string& host, int port) {
168
2.77M
        std::string realhost = host;
169
2.77M
        auto dns_cache = ExecEnv::GetInstance()->dns_cache();
170
2.77M
        if (dns_cache == nullptr) {
171
8
            LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
172
2.77M
        } else if (!is_valid_ip(host)) {
173
0
            Status status = dns_cache->get(host, &realhost);
174
0
            if (!status.ok()) {
175
0
                LOG(WARNING) << "failed to get ip from host:" << status.to_string();
176
0
                return nullptr;
177
0
            }
178
0
        }
179
180
        // Use original host:port as key (like Java's TNetworkAddress address)
181
        // This allows us to detect IP changes when DNS resolution changes
182
2.77M
        std::string host_port = fmt::format("{}:{}", host, port);
183
184
2.77M
        std::shared_ptr<T> stub_ptr;
185
2.77M
        bool need_remove = false;
186
187
2.79M
        auto check_entry = [&](const auto& v) {
188
2.79M
            const StubEntry<T>& entry = v.second;
189
            // Check if cached IP matches current resolved IP
190
2.79M
            if (entry.real_ip != realhost) {
191
                // IP changed (DNS resolution changed)
192
0
                LOG(WARNING) << "Cached ip changed for " << host << ", before ip: " << entry.real_ip
193
0
                             << ", current ip: " << realhost;
194
0
                need_remove = true;
195
2.79M
            } else if (!static_cast<FailureDetectChannel*>(entry.stub->channel())
196
2.79M
                                ->channel_status()
197
2.79M
                                ->ok()) {
198
                // Client is not in normal state, need to recreate
199
                // At this point we cannot judge the progress of reconnecting the underlying channel.
200
                // In the worst case, it may take two minutes. But we can't stand the connection refused
201
                // for two minutes, so rebuild the channel directly.
202
2
                need_remove = true;
203
2.79M
            } else {
204
                // Cache hit: IP matches and client is healthy
205
2.79M
                stub_ptr = entry.stub;
206
2.79M
            }
207
2.79M
        };
Unexecuted instantiation: _ZZN5doris15BrpcClientCacheINS_21PFunctionService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiENKUlRKT_E_clISt4pairIS9_9StubEntryIS1_EEEEDaSD_
_ZZN5doris15BrpcClientCacheINS_20PBackendService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiENKUlRKT_E_clISt4pairIS9_9StubEntryIS1_EEEEDaSD_
Line
Count
Source
187
2.79M
        auto check_entry = [&](const auto& v) {
188
2.79M
            const StubEntry<T>& entry = v.second;
189
            // Check if cached IP matches current resolved IP
190
2.79M
            if (entry.real_ip != realhost) {
191
                // IP changed (DNS resolution changed)
192
0
                LOG(WARNING) << "Cached ip changed for " << host << ", before ip: " << entry.real_ip
193
0
                             << ", current ip: " << realhost;
194
0
                need_remove = true;
195
2.79M
            } else if (!static_cast<FailureDetectChannel*>(entry.stub->channel())
196
2.79M
                                ->channel_status()
197
2.79M
                                ->ok()) {
198
                // Client is not in normal state, need to recreate
199
                // At this point we cannot judge the progress of reconnecting the underlying channel.
200
                // In the worst case, it may take two minutes. But we can't stand the connection refused
201
                // for two minutes, so rebuild the channel directly.
202
2
                need_remove = true;
203
2.79M
            } else {
204
                // Cache hit: IP matches and client is healthy
205
2.79M
                stub_ptr = entry.stub;
206
2.79M
            }
207
2.79M
        };
208
209
2.79M
        if (LIKELY(_stub_map.if_contains(host_port, check_entry))) {
210
2.79M
            if (stub_ptr != nullptr) {
211
2.79M
                return stub_ptr;
212
2.79M
            }
213
            // IP changed or client unhealthy, need to remove old entry
214
18.4E
            if (need_remove) {
215
2
                _stub_map.erase(host_port);
216
2
            }
217
18.4E
        }
218
219
        // Create new stub using resolved IP for actual connection
220
18.4E
        std::string real_host_port = get_host_port(realhost, port);
221
18.4E
        auto stub = get_new_client_no_cache(real_host_port);
222
18.4E
        if (stub != nullptr) {
223
13
            StubEntry<T> entry {realhost, stub};
224
13
            _stub_map.try_emplace_l(
225
13
                    host_port, [&stub](const auto& v) { stub = v.second.stub; }, entry);
Unexecuted instantiation: _ZZN5doris15BrpcClientCacheINS_21PFunctionService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiENKUlRKT_E0_clISt4pairIS9_9StubEntryIS1_EEEEDaSD_
Unexecuted instantiation: _ZZN5doris15BrpcClientCacheINS_20PBackendService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEiENKUlRKT_E0_clISt4pairIS9_9StubEntryIS1_EEEEDaSD_
226
13
        }
227
18.4E
        return stub;
228
2.77M
    }
Unexecuted instantiation: _ZN5doris15BrpcClientCacheINS_21PFunctionService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEi
_ZN5doris15BrpcClientCacheINS_20PBackendService_StubEE10get_clientERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEi
Line
Count
Source
167
2.77M
    std::shared_ptr<T> get_client(const std::string& host, int port) {
168
2.77M
        std::string realhost = host;
169
2.77M
        auto dns_cache = ExecEnv::GetInstance()->dns_cache();
170
2.77M
        if (dns_cache == nullptr) {
171
8
            LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
172
2.77M
        } else if (!is_valid_ip(host)) {
173
0
            Status status = dns_cache->get(host, &realhost);
174
0
            if (!status.ok()) {
175
0
                LOG(WARNING) << "failed to get ip from host:" << status.to_string();
176
0
                return nullptr;
177
0
            }
178
0
        }
179
180
        // Use original host:port as key (like Java's TNetworkAddress address)
181
        // This allows us to detect IP changes when DNS resolution changes
182
2.77M
        std::string host_port = fmt::format("{}:{}", host, port);
183
184
2.77M
        std::shared_ptr<T> stub_ptr;
185
2.77M
        bool need_remove = false;
186
187
2.77M
        auto check_entry = [&](const auto& v) {
188
2.77M
            const StubEntry<T>& entry = v.second;
189
            // Check if cached IP matches current resolved IP
190
2.77M
            if (entry.real_ip != realhost) {
191
                // IP changed (DNS resolution changed)
192
2.77M
                LOG(WARNING) << "Cached ip changed for " << host << ", before ip: " << entry.real_ip
193
2.77M
                             << ", current ip: " << realhost;
194
2.77M
                need_remove = true;
195
2.77M
            } else if (!static_cast<FailureDetectChannel*>(entry.stub->channel())
196
2.77M
                                ->channel_status()
197
2.77M
                                ->ok()) {
198
                // Client is not in normal state, need to recreate
199
                // At this point we cannot judge the progress of reconnecting the underlying channel.
200
                // In the worst case, it may take two minutes. But we can't stand the connection refused
201
                // for two minutes, so rebuild the channel directly.
202
2.77M
                need_remove = true;
203
2.77M
            } else {
204
                // Cache hit: IP matches and client is healthy
205
2.77M
                stub_ptr = entry.stub;
206
2.77M
            }
207
2.77M
        };
208
209
2.79M
        if (LIKELY(_stub_map.if_contains(host_port, check_entry))) {
210
2.79M
            if (stub_ptr != nullptr) {
211
2.79M
                return stub_ptr;
212
2.79M
            }
213
            // IP changed or client unhealthy, need to remove old entry
214
18.4E
            if (need_remove) {
215
2
                _stub_map.erase(host_port);
216
2
            }
217
18.4E
        }
218
219
        // Create new stub using resolved IP for actual connection
220
18.4E
        std::string real_host_port = get_host_port(realhost, port);
221
18.4E
        auto stub = get_new_client_no_cache(real_host_port);
222
18.4E
        if (stub != nullptr) {
223
13
            StubEntry<T> entry {realhost, stub};
224
13
            _stub_map.try_emplace_l(
225
13
                    host_port, [&stub](const auto& v) { stub = v.second.stub; }, entry);
226
13
        }
227
18.4E
        return stub;
228
2.77M
    }
229
230
0
    std::shared_ptr<T> get_client(const std::string& host_port) {
231
0
        const auto pos = host_port.rfind(':');
232
0
        std::string host = host_port.substr(0, pos);
233
0
        int port = 0;
234
0
        try {
235
0
            port = stoi(host_port.substr(pos + 1));
236
0
        } catch (const std::exception& err) {
237
0
            LOG(WARNING) << "failed to parse port from " << host_port << ": " << err.what();
238
0
            return nullptr;
239
0
        }
240
0
        return get_client(host, port);
241
0
    }
242
243
    std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port,
244
                                               const std::string& protocol = "",
245
                                               const std::string& connection_type = "",
246
14
                                               const std::string& connection_group = "") {
247
14
        brpc::ChannelOptions options;
248
14
        if (protocol != "") {
249
0
            options.protocol = protocol;
250
14
        } else if (_protocol != "") {
251
14
            options.protocol = _protocol;
252
14
        }
253
14
        if (connection_type != "") {
254
0
            options.connection_type = connection_type;
255
14
        } else if (_connection_type != "") {
256
2
            options.connection_type = _connection_type;
257
2
        }
258
14
        if (connection_group != "") {
259
0
            options.connection_group = connection_group;
260
14
        } else if (_connection_group != "") {
261
2
            options.connection_group = _connection_group;
262
2
        }
263
        // Add random connection id to connection_group to make sure use new socket
264
14
        options.connection_group += std::to_string(_connection_id.fetch_add(1));
265
14
        options.connect_timeout_ms = 2000;
266
14
        options.timeout_ms = 2000;
267
14
        options.max_retry = 10;
268
269
14
        std::unique_ptr<FailureDetectChannel> channel(new FailureDetectChannel());
270
14
        int ret_code = 0;
271
14
        if (host_port.find("://") == std::string::npos) {
272
14
            ret_code = channel->Init(host_port.c_str(), &options);
273
14
        } else {
274
0
            ret_code =
275
0
                    channel->Init(host_port.c_str(), config::rpc_load_balancer.c_str(), &options);
276
0
        }
277
14
        if (ret_code) {
278
1
            LOG(WARNING) << "Failed to initialize brpc Channel to " << host_port;
279
1
            return nullptr;
280
1
        }
281
13
        return std::make_shared<T>(channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL);
282
14
    }
_ZN5doris15BrpcClientCacheINS_20PBackendService_StubEE23get_new_client_no_cacheERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESA_SA_SA_
Line
Count
Source
246
14
                                               const std::string& connection_group = "") {
247
14
        brpc::ChannelOptions options;
248
14
        if (protocol != "") {
249
0
            options.protocol = protocol;
250
14
        } else if (_protocol != "") {
251
14
            options.protocol = _protocol;
252
14
        }
253
14
        if (connection_type != "") {
254
0
            options.connection_type = connection_type;
255
14
        } else if (_connection_type != "") {
256
2
            options.connection_type = _connection_type;
257
2
        }
258
14
        if (connection_group != "") {
259
0
            options.connection_group = connection_group;
260
14
        } else if (_connection_group != "") {
261
2
            options.connection_group = _connection_group;
262
2
        }
263
        // Add random connection id to connection_group to make sure use new socket
264
14
        options.connection_group += std::to_string(_connection_id.fetch_add(1));
265
14
        options.connect_timeout_ms = 2000;
266
14
        options.timeout_ms = 2000;
267
14
        options.max_retry = 10;
268
269
14
        std::unique_ptr<FailureDetectChannel> channel(new FailureDetectChannel());
270
14
        int ret_code = 0;
271
14
        if (host_port.find("://") == std::string::npos) {
272
14
            ret_code = channel->Init(host_port.c_str(), &options);
273
14
        } else {
274
0
            ret_code =
275
0
                    channel->Init(host_port.c_str(), config::rpc_load_balancer.c_str(), &options);
276
0
        }
277
14
        if (ret_code) {
278
1
            LOG(WARNING) << "Failed to initialize brpc Channel to " << host_port;
279
1
            return nullptr;
280
1
        }
281
13
        return std::make_shared<T>(channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL);
282
14
    }
Unexecuted instantiation: _ZN5doris15BrpcClientCacheINS_21PFunctionService_StubEE23get_new_client_no_cacheERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESA_SA_SA_
283
284
1
    size_t size() { return _stub_map.size(); }
285
286
0
    void clear() { _stub_map.clear(); }
287
288
0
    size_t erase(const std::string& host_port) { return _stub_map.erase(host_port); }
289
290
0
    size_t erase(const std::string& host, int port) {
291
0
        std::string host_port = fmt::format("{}:{}", host, port);
292
0
        return erase(host_port);
293
0
    }
294
295
0
    size_t erase(const butil::EndPoint& endpoint) {
296
0
        return _stub_map.erase(butil::endpoint2str(endpoint).c_str());
297
0
    }
298
299
0
    bool exist(const std::string& host_port) {
300
0
        return _stub_map.find(host_port) != _stub_map.end();
301
0
    }
302
303
0
    void get_all(std::vector<std::string>* endpoints) {
304
0
        for (auto it = _stub_map.begin(); it != _stub_map.end(); ++it) {
305
0
            endpoints->emplace_back(it->first.c_str());
306
0
        }
307
0
    }
308
309
    bool available(std::shared_ptr<T> stub, const butil::EndPoint& endpoint) {
310
        return available(stub, butil::endpoint2str(endpoint).c_str());
311
    }
312
313
1
    bool available(std::shared_ptr<T> stub, const std::string& host_port) {
314
1
        if (!stub) {
315
0
            LOG(WARNING) << "stub is null to: " << host_port;
316
0
            return false;
317
0
        }
318
1
        std::string message = "hello doris!";
319
1
        PHandShakeRequest request;
320
1
        request.set_hello(message);
321
1
        PHandShakeResponse response;
322
1
        brpc::Controller cntl;
323
1
        stub->hand_shake(&cntl, &request, &response, nullptr);
324
1
        if (cntl.Failed()) {
325
1
            LOG(WARNING) << "open brpc connection to " << host_port
326
1
                         << " failed: " << cntl.ErrorText();
327
1
            return false;
328
1
        } else if (response.has_status() && response.has_hello() && response.hello() == message &&
329
0
                   response.status().status_code() == 0) {
330
0
            return true;
331
0
        } else {
332
0
            LOG(WARNING) << "open brpc connection to " << host_port
333
0
                         << " failed: " << response.DebugString();
334
0
            return false;
335
0
        }
336
1
    }
337
338
1
    bool available(std::shared_ptr<T> stub, const std::string& host, int port) {
339
1
        std::string host_port = fmt::format("{}:{}", host, port);
340
1
        return available(stub, host_port);
341
1
    }
342
343
private:
344
    StubMap<T> _stub_map;
345
    const std::string _protocol;
346
    const std::string _connection_type;
347
    const std::string _connection_group;
348
    // use to generate unique connection id for each connection
349
    // to prevent the connection problem of brpc: https://github.com/apache/brpc/issues/2146
350
    std::atomic<int64_t> _connection_id {0};
351
};
352
353
// Client cache specialized for PFunctionService_Stub.
using FunctionServiceClientCache = BrpcClientCache<PFunctionService_Stub>;
// Client cache specialized for PBackendService_Stub.
using InternalServiceClientCache = BrpcClientCache<PBackendService_Stub>;
355
} // namespace doris