Coverage Report

Created: 2026-03-31 13:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/broker_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/broker_mgr.h"
19
20
#include <gen_cpp/PaloBrokerService_types.h>
21
#include <gen_cpp/TPaloBrokerService.h>
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
#include <thrift/Thrift.h>
25
#include <thrift/transport/TTransportException.h>
26
// IWYU pragma: no_include <bits/chrono.h>
27
#include <chrono> // IWYU pragma: keep
28
#include <sstream>
29
#include <vector>
30
31
#include "common/config.h"
32
#include "common/metrics/doris_metrics.h"
33
#include "common/metrics/metrics.h"
34
#include "common/status.h"
35
#include "runtime/exec_env.h"
36
#include "service/backend_options.h"
37
#include "util/client_cache.h"
38
#include "util/hash_util.hpp"
39
#include "util/thread.h"
40
41
namespace doris {
42
43
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(broker_count, MetricUnit::NOUNIT);
44
45
5
BrokerMgr::BrokerMgr(ExecEnv* exec_env) : _exec_env(exec_env), _stop_background_threads_latch(1) {
46
5
    CHECK(Thread::create(
47
5
                  "BrokerMgr", "ping_worker", [this]() { this->ping_worker(); }, &_ping_thread)
48
5
                  .ok());
49
50
5
    REGISTER_HOOK_METRIC(broker_count, [this]() {
51
        // std::lock_guard<std::mutex> l(_mutex);
52
5
        return _broker_set.size();
53
5
    });
54
5
}
55
56
2
void BrokerMgr::stop() {
57
2
    DEREGISTER_HOOK_METRIC(broker_count);
58
2
    _stop_background_threads_latch.count_down();
59
2
    if (_ping_thread) {
60
2
        _ping_thread->join();
61
2
    }
62
2
}
63
64
5
void BrokerMgr::init() {
65
5
    std::stringstream ss;
66
5
    ss << BackendOptions::get_localhost() << ":" << config::be_port;
67
5
    _client_id = ss.str();
68
5
}
69
70
2
const std::string& BrokerMgr::get_client_id(const TNetworkAddress& address) {
71
2
    std::lock_guard<std::mutex> l(_mutex);
72
2
    _broker_set.insert(address);
73
2
    return _client_id;
74
2
}
75
76
200
void BrokerMgr::ping(const TNetworkAddress& addr) {
77
200
    TBrokerPingBrokerRequest request;
78
79
200
    request.__set_version(TBrokerVersion::VERSION_ONE);
80
200
    request.__set_clientId(_client_id);
81
82
200
    TBrokerOperationStatus response;
83
200
    try {
84
200
        Status status;
85
200
        BrokerServiceConnection client(_exec_env->broker_client_cache(), addr,
86
200
                                       config::thrift_rpc_timeout_ms, &status);
87
200
        if (!status.ok()) {
88
0
            LOG(WARNING) << "Create broker client failed. broker=" << addr << ", status=" << status;
89
0
            return;
90
0
        }
91
92
200
        try {
93
200
            client->ping(response, request);
94
200
        } catch (apache::thrift::transport::TTransportException& e) {
95
0
            status = client.reopen();
96
0
            if (!status.ok()) {
97
0
                LOG(WARNING) << "Create broker client failed. broker=" << addr
98
0
                             << ", status=" << status << ", reason=" << e.what();
99
0
                return;
100
0
            }
101
0
            client->ping(response, request);
102
0
        }
103
200
    } catch (apache::thrift::TException& e) {
104
0
        LOG(WARNING) << "Broker ping failed, broker:" << addr << " failed:" << e.what();
105
0
    }
106
200
}
107
108
5
void BrokerMgr::ping_worker() {
109
2.65k
    do {
110
2.65k
        std::vector<TNetworkAddress> addresses;
111
2.65k
        {
112
2.65k
            std::lock_guard<std::mutex> l(_mutex);
113
2.65k
            for (auto& addr : _broker_set) {
114
200
                addresses.emplace_back(addr);
115
200
            }
116
2.65k
        }
117
2.65k
        for (auto& addr : addresses) {
118
200
            ping(addr);
119
200
        }
120
2.65k
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(5)));
121
5
}
122
123
} // namespace doris