Coverage Report

Created: 2026-03-14 13:33

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/agent/heartbeat_server.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "agent/heartbeat_server.h"
19
20
#include <gen_cpp/HeartbeatService.h>
21
#include <gen_cpp/HeartbeatService_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
25
#include <memory>
26
#include <ostream>
27
#include <string>
28
#include <vector>
29
30
#include "cloud/cloud_cluster_info.h"
31
#include "cloud/cloud_tablet_mgr.h"
32
#include "cloud/config.h"
33
#include "common/config.h"
34
#include "common/status.h"
35
#include "runtime/cluster_info.h"
36
#include "runtime/exec_env.h"
37
#include "runtime/fragment_mgr.h"
38
#include "runtime/heartbeat_flags.h"
39
#include "service/backend_options.h"
40
#include "storage/storage_engine.h"
41
#include "util/debug_util.h"
42
#include "util/mem_info.h"
43
#include "util/network_util.h"
44
#include "util/thrift_server.h"
45
#include "util/time.h"
46
47
namespace apache {
48
namespace thrift {
49
class TProcessor;
50
} // namespace thrift
51
} // namespace apache
52
53
namespace doris {
54
55
HeartbeatServer::HeartbeatServer(ClusterInfo* cluster_info)
56
7
        : _engine(ExecEnv::GetInstance()->storage_engine()),
57
7
          _cluster_info(cluster_info),
58
7
          _fe_epoch(0) {
59
7
    _be_epoch = GetCurrentTimeMicros() / 1000;
60
7
}
61
62
7
void HeartbeatServer::init_cluster_id() {
63
7
    _cluster_info->cluster_id = _engine.effective_cluster_id();
64
7
}
65
66
void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
67
581
                                const TMasterInfo& master_info) {
68
    //print heartbeat in every minute
69
581
    LOG_EVERY_N(INFO, 12) << "get heartbeat from FE."
70
51
                          << "host:" << master_info.network_address.hostname
71
51
                          << ", rpc port:" << master_info.network_address.port
72
51
                          << ", cluster id:" << master_info.cluster_id
73
51
                          << ", frontend_info:" << PrintFrontendInfos(master_info.frontend_infos)
74
51
                          << ", counter:" << google::COUNTER << ", BE start time: " << _be_epoch;
75
76
581
    MonotonicStopWatch watch;
77
581
    watch.start();
78
    // do heartbeat
79
581
    Status st = _heartbeat(master_info);
80
581
    st.to_thrift(&heartbeat_result.status);
81
82
581
    if (st.ok()) {
83
581
        heartbeat_result.backend_info.__set_be_port(config::be_port);
84
581
        heartbeat_result.backend_info.__set_http_port(config::webserver_port);
85
581
        heartbeat_result.backend_info.__set_be_rpc_port(-1);
86
581
        heartbeat_result.backend_info.__set_brpc_port(config::brpc_port);
87
581
        heartbeat_result.backend_info.__set_arrow_flight_sql_port(config::arrow_flight_sql_port);
88
581
        heartbeat_result.backend_info.__set_version(get_short_version());
89
581
        heartbeat_result.backend_info.__set_be_start_time(_be_epoch);
90
581
        heartbeat_result.backend_info.__set_be_node_role(config::be_node_role);
91
        // If be is gracefully stop, then k_doris_exist is set to true
92
581
        heartbeat_result.backend_info.__set_is_shutdown(doris::k_doris_exit);
93
581
        heartbeat_result.backend_info.__set_fragment_executing_count(
94
581
                get_fragment_executing_count());
95
581
        heartbeat_result.backend_info.__set_fragment_last_active_time(
96
581
                get_fragment_last_active_time());
97
581
        heartbeat_result.backend_info.__set_be_mem(MemInfo::physical_mem());
98
581
    }
99
581
    watch.stop();
100
581
    if (watch.elapsed_time() > 1000L * 1000L * 1000L) {
101
0
        LOG(WARNING) << "heartbeat consume too much time. time=" << watch.elapsed_time()
102
0
                     << ", host:" << master_info.network_address.hostname
103
0
                     << ", port:" << master_info.network_address.port
104
0
                     << ", cluster id:" << master_info.cluster_id
105
0
                     << ", frontend_info:" << PrintFrontendInfos(master_info.frontend_infos)
106
0
                     << ", counter:" << google::COUNTER << ", BE start time: " << _be_epoch;
107
0
    }
108
581
}
109
110
581
Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
111
581
    std::lock_guard<std::mutex> lk(_hb_mtx);
112
113
    // Check cluster id
114
581
    if (_cluster_info->cluster_id == -1) {
115
1
        LOG(INFO) << "get first heartbeat. update cluster id";
116
        // write and update cluster id
117
1
        RETURN_IF_ERROR(_engine.set_cluster_id(master_info.cluster_id));
118
119
1
        _cluster_info->cluster_id = master_info.cluster_id;
120
1
        LOG(INFO) << "record cluster id. host: " << master_info.network_address.hostname
121
1
                  << ". port: " << master_info.network_address.port
122
1
                  << ". cluster id: " << master_info.cluster_id
123
1
                  << ". frontend_infos: " << PrintFrontendInfos(master_info.frontend_infos);
124
580
    } else {
125
580
        if (_cluster_info->cluster_id != master_info.cluster_id) {
126
0
            return Status::InternalError(
127
0
                    "invalid cluster id. ignore. Record cluster id ={}, record frontend info {}. "
128
0
                    "Invalid cluster_id={}, invalid frontend info {}",
129
0
                    _cluster_info->cluster_id,
130
0
                    PrintFrontendInfos(ExecEnv::GetInstance()->get_frontends()),
131
0
                    master_info.cluster_id, PrintFrontendInfos(master_info.frontend_infos));
132
0
        }
133
580
    }
134
135
581
    if (master_info.__isset.backend_ip) {
136
        // master_info.backend_ip may be an IP or domain name, and it should be renamed 'backend_host', as it requires compatibility with historical versions, the name is still 'backend_ ip'
137
581
        if (master_info.backend_ip != BackendOptions::get_localhost()) {
138
3
            LOG(INFO) << master_info.backend_ip << " not equal to to backend localhost "
139
3
                      << BackendOptions::get_localhost();
140
            // step1: check master_info.backend_ip is IP or FQDN
141
3
            if (!is_valid_ip(master_info.backend_ip)) {
142
                //step2: resolve FQDN to IP
143
0
                std::string ip;
144
0
                Status status =
145
0
                        hostname_to_ip(master_info.backend_ip, ip, BackendOptions::is_bind_ipv6());
146
0
                if (!status.ok()) {
147
0
                    std::stringstream ss;
148
0
                    ss << "can not get ip from fqdn: " << status.to_string();
149
0
                    LOG(WARNING) << ss.str();
150
0
                    return status;
151
0
                }
152
0
                LOG(INFO) << "master_info.backend_ip: " << master_info.backend_ip
153
0
                          << ", hostname_to_ip: " << ip;
154
                //step3: get all ips of the interfaces on this machine
155
0
                std::vector<InetAddress> hosts;
156
0
                status = get_hosts(&hosts);
157
0
                if (!status.ok() || hosts.empty()) {
158
0
                    return Status::InternalError(
159
0
                            "the status was not ok when get_hosts, error is {}",
160
0
                            status.to_string());
161
0
                }
162
163
                //step4: check if the IP of FQDN belongs to the current machine and update BackendOptions._s_localhost
164
0
                bool set_new_localhost = false;
165
0
                for (auto& addr : hosts) {
166
0
                    if (addr.get_host_address() == ip) {
167
0
                        BackendOptions::set_localhost(master_info.backend_ip);
168
0
                        set_new_localhost = true;
169
0
                        break;
170
0
                    }
171
0
                }
172
173
0
                if (!set_new_localhost) {
174
0
                    return Status::InternalError(
175
0
                            "the host recorded in master is {}, but we cannot found the local ip "
176
0
                            "that mapped to that host. backend={}",
177
0
                            master_info.backend_ip, BackendOptions::get_localhost());
178
0
                }
179
3
            } else {
180
                // if is ip,not check anything,use it
181
3
                BackendOptions::set_localhost(master_info.backend_ip);
182
3
            }
183
184
3
            LOG(WARNING) << "update localhost done, the new localhost is "
185
3
                         << BackendOptions::get_localhost();
186
3
        }
187
581
    }
188
189
581
    bool need_report = false;
190
581
    if (_cluster_info->master_fe_addr.hostname != master_info.network_address.hostname ||
191
581
        _cluster_info->master_fe_addr.port != master_info.network_address.port) {
192
7
        if (master_info.epoch > _fe_epoch) {
193
7
            _cluster_info->master_fe_addr.hostname = master_info.network_address.hostname;
194
7
            _cluster_info->master_fe_addr.port = master_info.network_address.port;
195
7
            _fe_epoch = master_info.epoch;
196
7
            need_report = true;
197
7
            LOG(INFO) << "master change. new master host: "
198
7
                      << _cluster_info->master_fe_addr.hostname
199
7
                      << ". port: " << _cluster_info->master_fe_addr.port
200
7
                      << ". epoch: " << _fe_epoch;
201
7
        } else {
202
0
            return Status::InternalError(
203
0
                    "epoch is not greater than local. ignore heartbeat. host: {}, port: {}, local "
204
0
                    "epoch: {}, received epoch: {}",
205
0
                    _cluster_info->master_fe_addr.hostname, _cluster_info->master_fe_addr.port,
206
0
                    _fe_epoch, master_info.epoch);
207
0
        }
208
574
    } else {
209
        // when Master FE restarted, host and port remains the same, but epoch will be increased.
210
574
        if (master_info.epoch > _fe_epoch) {
211
0
            _fe_epoch = master_info.epoch;
212
0
            need_report = true;
213
0
            LOG(INFO) << "master restarted. epoch: " << _fe_epoch;
214
0
        }
215
574
    }
216
217
581
    if (master_info.__isset.token) {
218
581
        if (_cluster_info->token == "") {
219
7
            _cluster_info->token = master_info.token;
220
7
            LOG(INFO) << "get token. token: " << _cluster_info->token;
221
574
        } else if (_cluster_info->token != master_info.token) {
222
0
            return Status::InternalError("invalid token. local: {}, master: {}",
223
0
                                         _cluster_info->token, master_info.token);
224
0
        }
225
581
    }
226
227
581
    if (master_info.__isset.http_port) {
228
581
        _cluster_info->master_fe_http_port = master_info.http_port;
229
581
    }
230
231
581
    if (master_info.__isset.heartbeat_flags) {
232
581
        HeartbeatFlags* heartbeat_flags = ExecEnv::GetInstance()->heartbeat_flags();
233
581
        heartbeat_flags->update(master_info.heartbeat_flags);
234
581
    }
235
236
581
    if (master_info.__isset.backend_id) {
237
581
        _cluster_info->backend_id = master_info.backend_id;
238
581
        BackendOptions::set_backend_id(master_info.backend_id);
239
581
    }
240
581
    if (master_info.__isset.frontend_infos) {
241
581
        ExecEnv::GetInstance()->update_frontends(master_info.frontend_infos);
242
581
    } else {
243
0
        LOG_EVERY_N(WARNING, 2) << fmt::format(
244
0
                "Heartbeat from {}:{} does not have frontend_infos, this may because we are "
245
0
                "upgrading cluster",
246
0
                master_info.network_address.hostname, master_info.network_address.port);
247
0
    }
248
249
581
    if (master_info.__isset.meta_service_endpoint != config::is_cloud_mode()) {
250
0
        LOG(WARNING) << "Detected mismatch in cloud mode configuration between FE and BE. "
251
0
                     << "FE cloud mode: "
252
0
                     << (master_info.__isset.meta_service_endpoint ? "true" : "false")
253
0
                     << ", BE cloud mode: " << (config::is_cloud_mode() ? "true" : "false")
254
0
                     << ". If fe is earlier than version 3.0.2, the message can be ignored.";
255
0
    }
256
257
581
    if (master_info.__isset.meta_service_endpoint) {
258
416
        if (config::meta_service_endpoint.empty() && !master_info.meta_service_endpoint.empty()) {
259
0
            auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint,
260
0
                                         true);
261
0
            LOG(INFO) << "set config meta_service_endpoint " << master_info.meta_service_endpoint
262
0
                      << " " << st;
263
0
        }
264
265
416
        if (master_info.meta_service_endpoint != config::meta_service_endpoint) {
266
0
            LOG(WARNING) << "Detected mismatch in meta_service_endpoint configuration between FE "
267
0
                            "and BE. "
268
0
                         << "FE meta_service_endpoint: " << master_info.meta_service_endpoint
269
0
                         << ", BE meta_service_endpoint: " << config::meta_service_endpoint;
270
0
            std::vector<std::string> old_endpoints =
271
0
                    doris::split(config::meta_service_endpoint, ",");
272
0
            std::vector<std::string> new_endpoints =
273
0
                    doris::split(master_info.meta_service_endpoint, ",");
274
0
            auto has_intersection = false;
275
0
            for (auto endpoint : new_endpoints) {
276
0
                if (std::find(old_endpoints.begin(), old_endpoints.end(), endpoint) !=
277
0
                    old_endpoints.end()) {
278
0
                    has_intersection = true;
279
0
                }
280
0
            }
281
0
            if (has_intersection) {
282
0
                auto st = config::set_config("meta_service_endpoint",
283
0
                                             master_info.meta_service_endpoint, true);
284
0
                LOG(INFO) << "change config meta_service_endpoint to "
285
0
                          << master_info.meta_service_endpoint << " " << st;
286
0
            }
287
0
            if (!has_intersection && config::enable_meta_service_endpoint_consistency_check) {
288
0
                return Status::InvalidArgument<false>(
289
0
                        "fe and be do not work in same mode or meta_service_endpoint mismatch,"
290
0
                        "fe meta_service_endpoint: {}, be meta_service_endpoint: {}",
291
0
                        master_info.meta_service_endpoint, config::meta_service_endpoint);
292
0
            }
293
0
        }
294
416
    }
295
296
581
    if (master_info.__isset.cloud_unique_id &&
297
581
        config::cloud_unique_id != master_info.cloud_unique_id &&
298
581
        config::enable_use_cloud_unique_id_from_fe) {
299
0
        auto st = config::set_config("cloud_unique_id", master_info.cloud_unique_id, true);
300
0
        LOG(INFO) << "set config cloud_unique_id " << master_info.cloud_unique_id << " " << st;
301
0
    }
302
303
581
    if (master_info.__isset.cloud_cluster_info &&
304
581
        master_info.cloud_cluster_info.__isset.isStandby) {
305
416
        auto* cloud_cluster_info = static_cast<CloudClusterInfo*>(_cluster_info);
306
416
        cloud_cluster_info->set_is_in_standby(master_info.cloud_cluster_info.isStandby);
307
416
    }
308
309
581
    if (master_info.__isset.tablet_report_inactive_duration_ms) {
310
416
        doris::g_tablet_report_inactive_duration_ms =
311
416
                master_info.tablet_report_inactive_duration_ms;
312
416
    }
313
314
581
    if (master_info.__isset.auth_token) {
315
581
        if (_cluster_info->curr_auth_token == "") {
316
7
            _cluster_info->curr_auth_token = master_info.auth_token;
317
7
            LOG(INFO) << "set new auth token: " << master_info.auth_token;
318
574
        } else if (_cluster_info->curr_auth_token != master_info.auth_token) {
319
0
            LOG(INFO) << "last auth token: " << _cluster_info->last_auth_token
320
0
                      << "set new auth token: " << master_info.auth_token;
321
0
            _cluster_info->last_auth_token = _cluster_info->curr_auth_token;
322
0
            _cluster_info->curr_auth_token = master_info.auth_token;
323
0
        }
324
581
    }
325
326
581
    if (need_report) {
327
7
        LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
328
7
        _engine.notify_listeners();
329
7
    }
330
331
581
    return Status::OK();
332
581
}
333
334
Status create_heartbeat_server(ExecEnv* exec_env, uint32_t server_port,
335
                               std::unique_ptr<ThriftServer>* thrift_server,
336
7
                               uint32_t worker_thread_num, ClusterInfo* cluster_info) {
337
7
    HeartbeatServer* heartbeat_server = new HeartbeatServer(cluster_info);
338
7
    if (heartbeat_server == nullptr) {
339
0
        return Status::InternalError("Get heartbeat server failed");
340
0
    }
341
342
7
    heartbeat_server->init_cluster_id();
343
344
7
    std::shared_ptr<HeartbeatServer> handler(heartbeat_server);
345
7
    std::shared_ptr<HeartbeatServiceProcessor::TProcessor> server_processor(
346
7
            new HeartbeatServiceProcessor(handler));
347
7
    *thrift_server = std::make_unique<ThriftServer>("heartbeat", server_processor, server_port,
348
7
                                                    worker_thread_num);
349
7
    return Status::OK();
350
7
}
351
} // namespace doris