be/src/agent/heartbeat_server.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "agent/heartbeat_server.h" |
19 | | |
20 | | #include <gen_cpp/HeartbeatService.h> |
21 | | #include <gen_cpp/HeartbeatService_types.h> |
22 | | #include <gen_cpp/Types_types.h> |
23 | | #include <glog/logging.h> |
24 | | |
25 | | #include <memory> |
26 | | #include <ostream> |
27 | | #include <string> |
28 | | #include <vector> |
29 | | |
30 | | #include "cloud/cloud_cluster_info.h" |
31 | | #include "cloud/cloud_tablet_mgr.h" |
32 | | #include "cloud/config.h" |
33 | | #include "common/config.h" |
34 | | #include "common/status.h" |
35 | | #include "runtime/cluster_info.h" |
36 | | #include "runtime/exec_env.h" |
37 | | #include "runtime/fragment_mgr.h" |
38 | | #include "runtime/heartbeat_flags.h" |
39 | | #include "service/backend_options.h" |
40 | | #include "storage/storage_engine.h" |
41 | | #include "util/debug_util.h" |
42 | | #include "util/mem_info.h" |
43 | | #include "util/network_util.h" |
44 | | #include "util/thrift_server.h" |
45 | | #include "util/time.h" |
46 | | |
47 | | namespace apache { |
48 | | namespace thrift { |
49 | | class TProcessor; |
50 | | } // namespace thrift |
51 | | } // namespace apache |
52 | | |
53 | | namespace doris { |
54 | | |
55 | | HeartbeatServer::HeartbeatServer(ClusterInfo* cluster_info) |
56 | 7 | : _engine(ExecEnv::GetInstance()->storage_engine()), |
57 | 7 | _cluster_info(cluster_info), |
58 | 7 | _fe_epoch(0) { |
59 | 7 | _be_epoch = GetCurrentTimeMicros() / 1000; |
60 | 7 | } |
61 | | |
62 | 7 | void HeartbeatServer::init_cluster_id() { |
63 | 7 | _cluster_info->cluster_id = _engine.effective_cluster_id(); |
64 | 7 | } |
65 | | |
66 | | void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result, |
67 | 581 | const TMasterInfo& master_info) { |
68 | | //print heartbeat in every minute |
69 | 581 | LOG_EVERY_N(INFO, 12) << "get heartbeat from FE." |
70 | 51 | << "host:" << master_info.network_address.hostname |
71 | 51 | << ", rpc port:" << master_info.network_address.port |
72 | 51 | << ", cluster id:" << master_info.cluster_id |
73 | 51 | << ", frontend_info:" << PrintFrontendInfos(master_info.frontend_infos) |
74 | 51 | << ", counter:" << google::COUNTER << ", BE start time: " << _be_epoch; |
75 | | |
76 | 581 | MonotonicStopWatch watch; |
77 | 581 | watch.start(); |
78 | | // do heartbeat |
79 | 581 | Status st = _heartbeat(master_info); |
80 | 581 | st.to_thrift(&heartbeat_result.status); |
81 | | |
82 | 581 | if (st.ok()) { |
83 | 581 | heartbeat_result.backend_info.__set_be_port(config::be_port); |
84 | 581 | heartbeat_result.backend_info.__set_http_port(config::webserver_port); |
85 | 581 | heartbeat_result.backend_info.__set_be_rpc_port(-1); |
86 | 581 | heartbeat_result.backend_info.__set_brpc_port(config::brpc_port); |
87 | 581 | heartbeat_result.backend_info.__set_arrow_flight_sql_port(config::arrow_flight_sql_port); |
88 | 581 | heartbeat_result.backend_info.__set_version(get_short_version()); |
89 | 581 | heartbeat_result.backend_info.__set_be_start_time(_be_epoch); |
90 | 581 | heartbeat_result.backend_info.__set_be_node_role(config::be_node_role); |
91 | | // If be is gracefully stop, then k_doris_exist is set to true |
92 | 581 | heartbeat_result.backend_info.__set_is_shutdown(doris::k_doris_exit); |
93 | 581 | heartbeat_result.backend_info.__set_fragment_executing_count( |
94 | 581 | get_fragment_executing_count()); |
95 | 581 | heartbeat_result.backend_info.__set_fragment_last_active_time( |
96 | 581 | get_fragment_last_active_time()); |
97 | 581 | heartbeat_result.backend_info.__set_be_mem(MemInfo::physical_mem()); |
98 | 581 | } |
99 | 581 | watch.stop(); |
100 | 581 | if (watch.elapsed_time() > 1000L * 1000L * 1000L) { |
101 | 0 | LOG(WARNING) << "heartbeat consume too much time. time=" << watch.elapsed_time() |
102 | 0 | << ", host:" << master_info.network_address.hostname |
103 | 0 | << ", port:" << master_info.network_address.port |
104 | 0 | << ", cluster id:" << master_info.cluster_id |
105 | 0 | << ", frontend_info:" << PrintFrontendInfos(master_info.frontend_infos) |
106 | 0 | << ", counter:" << google::COUNTER << ", BE start time: " << _be_epoch; |
107 | 0 | } |
108 | 581 | } |
109 | | |
110 | 581 | Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { |
111 | 581 | std::lock_guard<std::mutex> lk(_hb_mtx); |
112 | | |
113 | | // Check cluster id |
114 | 581 | if (_cluster_info->cluster_id == -1) { |
115 | 1 | LOG(INFO) << "get first heartbeat. update cluster id"; |
116 | | // write and update cluster id |
117 | 1 | RETURN_IF_ERROR(_engine.set_cluster_id(master_info.cluster_id)); |
118 | | |
119 | 1 | _cluster_info->cluster_id = master_info.cluster_id; |
120 | 1 | LOG(INFO) << "record cluster id. host: " << master_info.network_address.hostname |
121 | 1 | << ". port: " << master_info.network_address.port |
122 | 1 | << ". cluster id: " << master_info.cluster_id |
123 | 1 | << ". frontend_infos: " << PrintFrontendInfos(master_info.frontend_infos); |
124 | 580 | } else { |
125 | 580 | if (_cluster_info->cluster_id != master_info.cluster_id) { |
126 | 0 | return Status::InternalError( |
127 | 0 | "invalid cluster id. ignore. Record cluster id ={}, record frontend info {}. " |
128 | 0 | "Invalid cluster_id={}, invalid frontend info {}", |
129 | 0 | _cluster_info->cluster_id, |
130 | 0 | PrintFrontendInfos(ExecEnv::GetInstance()->get_frontends()), |
131 | 0 | master_info.cluster_id, PrintFrontendInfos(master_info.frontend_infos)); |
132 | 0 | } |
133 | 580 | } |
134 | | |
135 | 581 | if (master_info.__isset.backend_ip) { |
136 | | // master_info.backend_ip may be an IP or domain name, and it should be renamed 'backend_host', as it requires compatibility with historical versions, the name is still 'backend_ ip' |
137 | 581 | if (master_info.backend_ip != BackendOptions::get_localhost()) { |
138 | 3 | LOG(INFO) << master_info.backend_ip << " not equal to to backend localhost " |
139 | 3 | << BackendOptions::get_localhost(); |
140 | | // step1: check master_info.backend_ip is IP or FQDN |
141 | 3 | if (!is_valid_ip(master_info.backend_ip)) { |
142 | | //step2: resolve FQDN to IP |
143 | 0 | std::string ip; |
144 | 0 | Status status = |
145 | 0 | hostname_to_ip(master_info.backend_ip, ip, BackendOptions::is_bind_ipv6()); |
146 | 0 | if (!status.ok()) { |
147 | 0 | std::stringstream ss; |
148 | 0 | ss << "can not get ip from fqdn: " << status.to_string(); |
149 | 0 | LOG(WARNING) << ss.str(); |
150 | 0 | return status; |
151 | 0 | } |
152 | 0 | LOG(INFO) << "master_info.backend_ip: " << master_info.backend_ip |
153 | 0 | << ", hostname_to_ip: " << ip; |
154 | | //step3: get all ips of the interfaces on this machine |
155 | 0 | std::vector<InetAddress> hosts; |
156 | 0 | status = get_hosts(&hosts); |
157 | 0 | if (!status.ok() || hosts.empty()) { |
158 | 0 | return Status::InternalError( |
159 | 0 | "the status was not ok when get_hosts, error is {}", |
160 | 0 | status.to_string()); |
161 | 0 | } |
162 | | |
163 | | //step4: check if the IP of FQDN belongs to the current machine and update BackendOptions._s_localhost |
164 | 0 | bool set_new_localhost = false; |
165 | 0 | for (auto& addr : hosts) { |
166 | 0 | if (addr.get_host_address() == ip) { |
167 | 0 | BackendOptions::set_localhost(master_info.backend_ip); |
168 | 0 | set_new_localhost = true; |
169 | 0 | break; |
170 | 0 | } |
171 | 0 | } |
172 | |
|
173 | 0 | if (!set_new_localhost) { |
174 | 0 | return Status::InternalError( |
175 | 0 | "the host recorded in master is {}, but we cannot found the local ip " |
176 | 0 | "that mapped to that host. backend={}", |
177 | 0 | master_info.backend_ip, BackendOptions::get_localhost()); |
178 | 0 | } |
179 | 3 | } else { |
180 | | // if is ip,not check anything,use it |
181 | 3 | BackendOptions::set_localhost(master_info.backend_ip); |
182 | 3 | } |
183 | | |
184 | 3 | LOG(WARNING) << "update localhost done, the new localhost is " |
185 | 3 | << BackendOptions::get_localhost(); |
186 | 3 | } |
187 | 581 | } |
188 | | |
189 | 581 | bool need_report = false; |
190 | 581 | if (_cluster_info->master_fe_addr.hostname != master_info.network_address.hostname || |
191 | 581 | _cluster_info->master_fe_addr.port != master_info.network_address.port) { |
192 | 7 | if (master_info.epoch > _fe_epoch) { |
193 | 7 | _cluster_info->master_fe_addr.hostname = master_info.network_address.hostname; |
194 | 7 | _cluster_info->master_fe_addr.port = master_info.network_address.port; |
195 | 7 | _fe_epoch = master_info.epoch; |
196 | 7 | need_report = true; |
197 | 7 | LOG(INFO) << "master change. new master host: " |
198 | 7 | << _cluster_info->master_fe_addr.hostname |
199 | 7 | << ". port: " << _cluster_info->master_fe_addr.port |
200 | 7 | << ". epoch: " << _fe_epoch; |
201 | 7 | } else { |
202 | 0 | return Status::InternalError( |
203 | 0 | "epoch is not greater than local. ignore heartbeat. host: {}, port: {}, local " |
204 | 0 | "epoch: {}, received epoch: {}", |
205 | 0 | _cluster_info->master_fe_addr.hostname, _cluster_info->master_fe_addr.port, |
206 | 0 | _fe_epoch, master_info.epoch); |
207 | 0 | } |
208 | 574 | } else { |
209 | | // when Master FE restarted, host and port remains the same, but epoch will be increased. |
210 | 574 | if (master_info.epoch > _fe_epoch) { |
211 | 0 | _fe_epoch = master_info.epoch; |
212 | 0 | need_report = true; |
213 | 0 | LOG(INFO) << "master restarted. epoch: " << _fe_epoch; |
214 | 0 | } |
215 | 574 | } |
216 | | |
217 | 581 | if (master_info.__isset.token) { |
218 | 581 | if (_cluster_info->token == "") { |
219 | 7 | _cluster_info->token = master_info.token; |
220 | 7 | LOG(INFO) << "get token. token: " << _cluster_info->token; |
221 | 574 | } else if (_cluster_info->token != master_info.token) { |
222 | 0 | return Status::InternalError("invalid token. local: {}, master: {}", |
223 | 0 | _cluster_info->token, master_info.token); |
224 | 0 | } |
225 | 581 | } |
226 | | |
227 | 581 | if (master_info.__isset.http_port) { |
228 | 581 | _cluster_info->master_fe_http_port = master_info.http_port; |
229 | 581 | } |
230 | | |
231 | 581 | if (master_info.__isset.heartbeat_flags) { |
232 | 581 | HeartbeatFlags* heartbeat_flags = ExecEnv::GetInstance()->heartbeat_flags(); |
233 | 581 | heartbeat_flags->update(master_info.heartbeat_flags); |
234 | 581 | } |
235 | | |
236 | 581 | if (master_info.__isset.backend_id) { |
237 | 581 | _cluster_info->backend_id = master_info.backend_id; |
238 | 581 | BackendOptions::set_backend_id(master_info.backend_id); |
239 | 581 | } |
240 | 581 | if (master_info.__isset.frontend_infos) { |
241 | 581 | ExecEnv::GetInstance()->update_frontends(master_info.frontend_infos); |
242 | 581 | } else { |
243 | 0 | LOG_EVERY_N(WARNING, 2) << fmt::format( |
244 | 0 | "Heartbeat from {}:{} does not have frontend_infos, this may because we are " |
245 | 0 | "upgrading cluster", |
246 | 0 | master_info.network_address.hostname, master_info.network_address.port); |
247 | 0 | } |
248 | | |
249 | 581 | if (master_info.__isset.meta_service_endpoint != config::is_cloud_mode()) { |
250 | 0 | LOG(WARNING) << "Detected mismatch in cloud mode configuration between FE and BE. " |
251 | 0 | << "FE cloud mode: " |
252 | 0 | << (master_info.__isset.meta_service_endpoint ? "true" : "false") |
253 | 0 | << ", BE cloud mode: " << (config::is_cloud_mode() ? "true" : "false") |
254 | 0 | << ". If fe is earlier than version 3.0.2, the message can be ignored."; |
255 | 0 | } |
256 | | |
257 | 581 | if (master_info.__isset.meta_service_endpoint) { |
258 | 416 | if (config::meta_service_endpoint.empty() && !master_info.meta_service_endpoint.empty()) { |
259 | 0 | auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint, |
260 | 0 | true); |
261 | 0 | LOG(INFO) << "set config meta_service_endpoint " << master_info.meta_service_endpoint |
262 | 0 | << " " << st; |
263 | 0 | } |
264 | | |
265 | 416 | if (master_info.meta_service_endpoint != config::meta_service_endpoint) { |
266 | 0 | LOG(WARNING) << "Detected mismatch in meta_service_endpoint configuration between FE " |
267 | 0 | "and BE. " |
268 | 0 | << "FE meta_service_endpoint: " << master_info.meta_service_endpoint |
269 | 0 | << ", BE meta_service_endpoint: " << config::meta_service_endpoint; |
270 | 0 | std::vector<std::string> old_endpoints = |
271 | 0 | doris::split(config::meta_service_endpoint, ","); |
272 | 0 | std::vector<std::string> new_endpoints = |
273 | 0 | doris::split(master_info.meta_service_endpoint, ","); |
274 | 0 | auto has_intersection = false; |
275 | 0 | for (auto endpoint : new_endpoints) { |
276 | 0 | if (std::find(old_endpoints.begin(), old_endpoints.end(), endpoint) != |
277 | 0 | old_endpoints.end()) { |
278 | 0 | has_intersection = true; |
279 | 0 | } |
280 | 0 | } |
281 | 0 | if (has_intersection) { |
282 | 0 | auto st = config::set_config("meta_service_endpoint", |
283 | 0 | master_info.meta_service_endpoint, true); |
284 | 0 | LOG(INFO) << "change config meta_service_endpoint to " |
285 | 0 | << master_info.meta_service_endpoint << " " << st; |
286 | 0 | } |
287 | 0 | if (!has_intersection && config::enable_meta_service_endpoint_consistency_check) { |
288 | 0 | return Status::InvalidArgument<false>( |
289 | 0 | "fe and be do not work in same mode or meta_service_endpoint mismatch," |
290 | 0 | "fe meta_service_endpoint: {}, be meta_service_endpoint: {}", |
291 | 0 | master_info.meta_service_endpoint, config::meta_service_endpoint); |
292 | 0 | } |
293 | 0 | } |
294 | 416 | } |
295 | | |
296 | 581 | if (master_info.__isset.cloud_unique_id && |
297 | 581 | config::cloud_unique_id != master_info.cloud_unique_id && |
298 | 581 | config::enable_use_cloud_unique_id_from_fe) { |
299 | 0 | auto st = config::set_config("cloud_unique_id", master_info.cloud_unique_id, true); |
300 | 0 | LOG(INFO) << "set config cloud_unique_id " << master_info.cloud_unique_id << " " << st; |
301 | 0 | } |
302 | | |
303 | 581 | if (master_info.__isset.cloud_cluster_info && |
304 | 581 | master_info.cloud_cluster_info.__isset.isStandby) { |
305 | 416 | auto* cloud_cluster_info = static_cast<CloudClusterInfo*>(_cluster_info); |
306 | 416 | cloud_cluster_info->set_is_in_standby(master_info.cloud_cluster_info.isStandby); |
307 | 416 | } |
308 | | |
309 | 581 | if (master_info.__isset.tablet_report_inactive_duration_ms) { |
310 | 416 | doris::g_tablet_report_inactive_duration_ms = |
311 | 416 | master_info.tablet_report_inactive_duration_ms; |
312 | 416 | } |
313 | | |
314 | 581 | if (master_info.__isset.auth_token) { |
315 | 581 | if (_cluster_info->curr_auth_token == "") { |
316 | 7 | _cluster_info->curr_auth_token = master_info.auth_token; |
317 | 7 | LOG(INFO) << "set new auth token: " << master_info.auth_token; |
318 | 574 | } else if (_cluster_info->curr_auth_token != master_info.auth_token) { |
319 | 0 | LOG(INFO) << "last auth token: " << _cluster_info->last_auth_token |
320 | 0 | << "set new auth token: " << master_info.auth_token; |
321 | 0 | _cluster_info->last_auth_token = _cluster_info->curr_auth_token; |
322 | 0 | _cluster_info->curr_auth_token = master_info.auth_token; |
323 | 0 | } |
324 | 581 | } |
325 | | |
326 | 581 | if (need_report) { |
327 | 7 | LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately"; |
328 | 7 | _engine.notify_listeners(); |
329 | 7 | } |
330 | | |
331 | 581 | return Status::OK(); |
332 | 581 | } |
333 | | |
334 | | Status create_heartbeat_server(ExecEnv* exec_env, uint32_t server_port, |
335 | | std::unique_ptr<ThriftServer>* thrift_server, |
336 | 7 | uint32_t worker_thread_num, ClusterInfo* cluster_info) { |
337 | 7 | HeartbeatServer* heartbeat_server = new HeartbeatServer(cluster_info); |
338 | 7 | if (heartbeat_server == nullptr) { |
339 | 0 | return Status::InternalError("Get heartbeat server failed"); |
340 | 0 | } |
341 | | |
342 | 7 | heartbeat_server->init_cluster_id(); |
343 | | |
344 | 7 | std::shared_ptr<HeartbeatServer> handler(heartbeat_server); |
345 | 7 | std::shared_ptr<HeartbeatServiceProcessor::TProcessor> server_processor( |
346 | 7 | new HeartbeatServiceProcessor(handler)); |
347 | 7 | *thrift_server = std::make_unique<ThriftServer>("heartbeat", server_processor, server_port, |
348 | 7 | worker_thread_num); |
349 | 7 | return Status::OK(); |
350 | 7 | } |
351 | | } // namespace doris |