Coverage Report

Created: 2024-11-18 10:37

/root/doris/be/src/runtime/broker_mgr.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/broker_mgr.h"
19
20
#include <gen_cpp/PaloBrokerService_types.h>
21
#include <gen_cpp/TPaloBrokerService.h>
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
#include <thrift/Thrift.h>
25
#include <thrift/transport/TTransportException.h>
26
// IWYU pragma: no_include <bits/chrono.h>
27
#include <chrono> // IWYU pragma: keep
28
#include <sstream>
29
#include <vector>
30
31
#include "common/config.h"
32
#include "common/status.h"
33
#include "runtime/client_cache.h"
34
#include "runtime/exec_env.h"
35
#include "service/backend_options.h"
36
#include "util/doris_metrics.h"
37
#include "util/hash_util.hpp"
38
#include "util/metrics.h"
39
#include "util/thread.h"
40
41
namespace doris {
42
43
// Gauge metric prototype named `broker_count`, declared with unit NOUNIT.
// The BrokerMgr constructor registers a hook for this metric that reports the
// current size of the broker set.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(broker_count, MetricUnit::NOUNIT);
44
45
0
// Constructs the manager.
//
// Stores the execution environment, initialises the stop latch with a count
// of one, starts the background thread that runs ping_worker(), and registers
// the hook that feeds the broker_count metric.
//
// @param exec_env  execution environment; used later by ping() to reach the
//                  broker client cache.
BrokerMgr::BrokerMgr(ExecEnv* exec_env)
        : _exec_env(exec_env), _stop_background_threads_latch(1) {
    // A failure to create the thread trips the CHECK.
    CHECK(Thread::create(
                  "BrokerMgr", "ping_worker", [this]() { ping_worker(); }, &_ping_thread)
                  .ok());

    // The hook reports how many broker addresses are currently tracked.
    // NOTE(review): the size is read without holding _mutex; the lock below is
    // intentionally left disabled, exactly as in the original code.
    REGISTER_HOOK_METRIC(broker_count, [this]() {
        // std::lock_guard<std::mutex> l(_mutex);
        return _broker_set.size();
    });
}
55
56
0
// Shuts the manager down.
//
// Order matters: the metric hook is removed first, then the stop latch is
// counted down so the loop in ping_worker() can terminate, and finally the
// ping thread (if one was created) is joined.
BrokerMgr::~BrokerMgr() {
    DEREGISTER_HOOK_METRIC(broker_count);
    _stop_background_threads_latch.count_down();

    // Nothing to wait for when no thread handle is held.
    if (!_ping_thread) {
        return;
    }
    _ping_thread->join();
}
63
64
0
void BrokerMgr::init() {
65
0
    std::stringstream ss;
66
0
    ss << BackendOptions::get_localhost() << ":" << config::be_port;
67
0
    _client_id = ss.str();
68
0
}
69
70
0
const std::string& BrokerMgr::get_client_id(const TNetworkAddress& address) {
71
0
    std::lock_guard<std::mutex> l(_mutex);
72
0
    _broker_set.insert(address);
73
0
    return _client_id;
74
0
}
75
76
0
void BrokerMgr::ping(const TNetworkAddress& addr) {
77
0
    TBrokerPingBrokerRequest request;
78
79
0
    request.__set_version(TBrokerVersion::VERSION_ONE);
80
0
    request.__set_clientId(_client_id);
81
82
0
    TBrokerOperationStatus response;
83
0
    try {
84
0
        Status status;
85
0
        BrokerServiceConnection client(_exec_env->broker_client_cache(), addr,
86
0
                                       config::thrift_rpc_timeout_ms, &status);
87
0
        if (!status.ok()) {
88
0
            LOG(WARNING) << "Create broker client failed. broker=" << addr << ", status=" << status;
89
0
            return;
90
0
        }
91
92
0
        try {
93
0
            client->ping(response, request);
94
0
        } catch (apache::thrift::transport::TTransportException& e) {
95
0
            status = client.reopen();
96
0
            if (!status.ok()) {
97
0
                LOG(WARNING) << "Create broker client failed. broker=" << addr
98
0
                             << ", status=" << status << ", reason=" << e.what();
99
0
                return;
100
0
            }
101
0
            client->ping(response, request);
102
0
        }
103
0
    } catch (apache::thrift::TException& e) {
104
0
        LOG(WARNING) << "Broker ping failed, broker:" << addr << " failed:" << e.what();
105
0
    }
106
0
}
107
108
0
void BrokerMgr::ping_worker() {
109
0
    do {
110
0
        std::vector<TNetworkAddress> addresses;
111
0
        {
112
0
            std::lock_guard<std::mutex> l(_mutex);
113
0
            for (auto& addr : _broker_set) {
114
0
                addresses.emplace_back(addr);
115
0
            }
116
0
        }
117
0
        for (auto& addr : addresses) {
118
0
            ping(addr);
119
0
        }
120
0
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(5)));
121
0
}
122
123
} // namespace doris