Coverage Report

Created: 2026-03-12 16:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/broker_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/broker_mgr.h"
19
20
#include <gen_cpp/PaloBrokerService_types.h>
21
#include <gen_cpp/TPaloBrokerService.h>
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
#include <thrift/Thrift.h>
25
#include <thrift/transport/TTransportException.h>
26
// IWYU pragma: no_include <bits/chrono.h>
27
#include <chrono> // IWYU pragma: keep
28
#include <sstream>
29
#include <vector>
30
31
#include "common/config.h"
32
#include "common/metrics/doris_metrics.h"
33
#include "common/metrics/metrics.h"
34
#include "common/status.h"
35
#include "runtime/exec_env.h"
36
#include "service/backend_options.h"
37
#include "util/client_cache.h"
38
#include "util/hash_util.hpp"
39
#include "util/thread.h"
40
41
namespace doris {
42
43
// Prototype for the `broker_count` gauge metric (unit: NOUNIT). The hook that
// supplies its value is registered in the BrokerMgr constructor and removed in
// BrokerMgr::stop().
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(broker_count, MetricUnit::NOUNIT);
44
45
7
// Constructs the broker manager.
//
// Stores `exec_env` (used later by ping() to reach the broker client cache),
// initialises the stop latch with a count of 1, launches the background thread
// that runs ping_worker(), and registers the hook feeding the `broker_count`
// metric.
//
// The process is aborted through CHECK if the ping thread cannot be created.
BrokerMgr::BrokerMgr(ExecEnv* exec_env) : _exec_env(exec_env), _stop_background_threads_latch(1) {
    // The thread handle is written to _ping_thread so that stop() can join it.
    CHECK(Thread::create(
                  "BrokerMgr", "ping_worker", [this]() { this->ping_worker(); }, &_ping_thread)
                  .ok());

    // The hook reports how many broker addresses are currently tracked.
    REGISTER_HOOK_METRIC(broker_count, [this]() {
        // NOTE(review): the size is read without holding _mutex; the guard
        // below is commented out in the original. Confirm that this
        // unsynchronized read is intended.
        // std::lock_guard<std::mutex> l(_mutex);
        return _broker_set.size();
    });
}
55
56
3
void BrokerMgr::stop() {
57
3
    DEREGISTER_HOOK_METRIC(broker_count);
58
3
    _stop_background_threads_latch.count_down();
59
3
    if (_ping_thread) {
60
3
        _ping_thread->join();
61
3
    }
62
3
}
63
64
7
void BrokerMgr::init() {
65
7
    std::stringstream ss;
66
7
    ss << BackendOptions::get_localhost() << ":" << config::be_port;
67
7
    _client_id = ss.str();
68
7
}
69
70
0
const std::string& BrokerMgr::get_client_id(const TNetworkAddress& address) {
71
0
    std::lock_guard<std::mutex> l(_mutex);
72
0
    _broker_set.insert(address);
73
0
    return _client_id;
74
0
}
75
76
0
void BrokerMgr::ping(const TNetworkAddress& addr) {
77
0
    TBrokerPingBrokerRequest request;
78
79
0
    request.__set_version(TBrokerVersion::VERSION_ONE);
80
0
    request.__set_clientId(_client_id);
81
82
0
    TBrokerOperationStatus response;
83
0
    try {
84
0
        Status status;
85
0
        BrokerServiceConnection client(_exec_env->broker_client_cache(), addr,
86
0
                                       config::thrift_rpc_timeout_ms, &status);
87
0
        if (!status.ok()) {
88
0
            LOG(WARNING) << "Create broker client failed. broker=" << addr << ", status=" << status;
89
0
            return;
90
0
        }
91
92
0
        try {
93
0
            client->ping(response, request);
94
0
        } catch (apache::thrift::transport::TTransportException& e) {
95
0
            status = client.reopen();
96
0
            if (!status.ok()) {
97
0
                LOG(WARNING) << "Create broker client failed. broker=" << addr
98
0
                             << ", status=" << status << ", reason=" << e.what();
99
0
                return;
100
0
            }
101
0
            client->ping(response, request);
102
0
        }
103
0
    } catch (apache::thrift::TException& e) {
104
0
        LOG(WARNING) << "Broker ping failed, broker:" << addr << " failed:" << e.what();
105
0
    }
106
0
}
107
108
7
void BrokerMgr::ping_worker() {
109
1.24k
    do {
110
1.24k
        std::vector<TNetworkAddress> addresses;
111
1.24k
        {
112
1.24k
            std::lock_guard<std::mutex> l(_mutex);
113
1.24k
            for (auto& addr : _broker_set) {
114
0
                addresses.emplace_back(addr);
115
0
            }
116
1.24k
        }
117
1.24k
        for (auto& addr : addresses) {
118
0
            ping(addr);
119
0
        }
120
1.24k
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(5)));
121
7
}
122
123
} // namespace doris