HeartbeatMgr.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.system;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.persist.HbPackage;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FeDiskInfo;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.HeartbeatResponse.HbStatus;
import org.apache.doris.system.SystemInfoService.HostInfo;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.HeartbeatService;
import org.apache.doris.thrift.TBackendInfo;
import org.apache.doris.thrift.TBrokerOperationStatus;
import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TBrokerPingBrokerRequest;
import org.apache.doris.thrift.TBrokerVersion;
import org.apache.doris.thrift.TFrontendInfo;
import org.apache.doris.thrift.TFrontendPingFrontendRequest;
import org.apache.doris.thrift.TFrontendPingFrontendResult;
import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
import org.apache.doris.thrift.THeartbeatResult;
import org.apache.doris.thrift.TMasterInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Heartbeat manager that runs as a daemon at a fixed interval.
 * On each round it sends heartbeats to all Frontends, Backends and Brokers, applies the
 * responses locally, and writes the responses that changed some node's state to the edit log.
 */
public class HeartbeatMgr extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(HeartbeatMgr.class);

    private final ExecutorService executor;
    private SystemInfoService nodeMgr;
    private HeartbeatFlags heartbeatFlags;

    // Master info template populated by setMaster(); each backend heartbeat sends a copy of it.
    private static volatile AtomicReference<TMasterInfo> masterInfo = new AtomicReference<>();

    public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) {
        super("heartbeat mgr", Config.heartbeat_interval_second * 1000);
        this.nodeMgr = nodeMgr;
        this.executor = ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num,
                Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool", needRegisterMetric);
        this.heartbeatFlags = new HeartbeatFlags();
    }

    /**
     * Builds the master info that is sent to backends and that provides the broker client id.
     *
     * @param clusterId id of the cluster this master belongs to
     * @param token     cluster token attached to the master info
     * @param epoch     epoch of the current master
     */
    public void setMaster(int clusterId, String token, long epoch) {
        TMasterInfo tMasterInfo = new TMasterInfo(
                new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port), clusterId, epoch);
        tMasterInfo.setToken(token);
        tMasterInfo.setHttpPort(Config.http_port);
        long flags = heartbeatFlags.getHeartbeatFlags();
        tMasterInfo.setHeartbeatFlags(flags);
        if (Config.isCloudMode()) {
            // Set the endpoint for the metadata service in cloud mode
            tMasterInfo.setMetaServiceEndpoint(Config.meta_service_endpoint);
        }
        masterInfo.set(tMasterInfo);
    }

    /**
     * At each round:
     * 1. send heartbeat to all nodes
     * 2. collect the heartbeat response from all nodes, and handle them
     */
    @Override
    protected void runAfterCatalogReady() {
        // Get feInfos of previous iteration.
        List<TFrontendInfo> feInfos = Env.getCurrentEnv().getFrontendInfos();
        List<Future<HeartbeatResponse>> hbResponses = Lists.newArrayList();

        // send backend heartbeat
        List<Backend> bes;
        try {
            bes = nodeMgr.getAllBackendsByAllCluster().values().asList();
        } catch (UserException e) {
            LOG.warn("can not get backends", e);
            return;
        }
        for (Backend backend : bes) {
            BackendHeartbeatHandler handler = new BackendHeartbeatHandler(backend, feInfos);
            hbResponses.add(executor.submit(handler));
        }

        // send frontend heartbeat; cluster id and token are the same for every frontend
        int clusterId = Env.getCurrentEnv().getClusterId();
        String token = Env.getCurrentEnv().getToken();
        List<Frontend> frontends = Env.getCurrentEnv().getFrontends(null);
        for (Frontend frontend : frontends) {
            FrontendHeartbeatHandler handler = new FrontendHeartbeatHandler(frontend, clusterId, token);
            hbResponses.add(executor.submit(handler));
        }

        // send broker heartbeat;
        Map<String, List<FsBroker>> brokerMap = Maps.newHashMap(
                Env.getCurrentEnv().getBrokerMgr().getBrokerListMap());
        for (Map.Entry<String, List<FsBroker>> entry : brokerMap.entrySet()) {
            for (FsBroker brokerAddress : entry.getValue()) {
                BrokerHeartbeatHandler handler = new BrokerHeartbeatHandler(entry.getKey(), brokerAddress,
                        masterInfo.get().getNetworkAddress().getHostname());
                hbResponses.add(executor.submit(handler));
            }
        }

        // collect all heartbeat responses and handle them.
        // and also we find which node's info is changed, if is changed, we need collect them and write
        // an edit log to synchronize the info to other Frontends
        HbPackage hbPackage = new HbPackage();
        for (Future<HeartbeatResponse> future : hbResponses) {
            boolean isChanged = false;
            try {
                // the heartbeat rpc's timeout is 5 seconds, so we will not be blocked here very long.
                HeartbeatResponse response = future.get();
                if (response.getStatus() != HbStatus.OK) {
                    LOG.warn("get bad heartbeat response: {}", response);
                }
                isChanged = handleHbResponse(response, false);

                if (isChanged) {
                    hbPackage.addHbResponse(response);
                }
            } catch (InterruptedException | ExecutionException e) {
                LOG.warn("got exception when doing heartbeat", e);
            }
        } // end for all results

        Env.getCurrentEnv().getEditLog().logHeartbeat(hbPackage);
    }

    /**
     * Applies one heartbeat response to the node it describes.
     *
     * <p>When not replaying, a backend whose start time changed may abort transactions it
     * coordinated (if {@code Config.enable_abort_txn_by_checking_coordinator_be} is set). A backend
     * that has missed heartbeats for at least {@code Config.abort_txn_after_lost_heartbeat_time_second}
     * also aborts transactions it coordinated. A failed backend or broker heartbeat also clears the
     * cached client connections to that node.
     *
     * @param response the heartbeat response to apply
     * @param isReplay true when applying a response read back from the edit log
     * @return true if the target node's state changed and the response should be persisted;
     *         false if it did not change or the node is no longer known
     */
    private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) {
        switch (response.getType()) {
            case FRONTEND: {
                FrontendHbResponse hbResponse = (FrontendHbResponse) response;
                Frontend fe = Env.getCurrentEnv().getFeByName(hbResponse.getName());
                if (fe != null) {
                    return fe.handleHbResponse(hbResponse, isReplay);
                }
                break;
            }
            case BACKEND: {
                BackendHbResponse hbResponse = (BackendHbResponse) response;
                Backend be = nodeMgr.getBackend(hbResponse.getBeId());
                if (be != null) {
                    long oldStartTime = be.getLastStartTime();
                    boolean isChanged = be.handleHbResponse(hbResponse, isReplay);
                    if (hbResponse.getStatus() == HbStatus.OK) {
                        long newStartTime = be.getLastStartTime();
                        // A changed start time means the backend restarted since the last heartbeat.
                        if (!isReplay && Config.enable_abort_txn_by_checking_coordinator_be
                                && oldStartTime != newStartTime) {
                            Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart(
                                    be.getId(), be.getHost(), newStartTime);
                        }
                    } else {
                        // invalid all connections cached in ClientPool
                        ClientPool.backendPool.clearPool(new TNetworkAddress(be.getHost(), be.getBePort()));
                        if (!isReplay && System.currentTimeMillis() - be.getLastUpdateMs()
                                >= Config.abort_txn_after_lost_heartbeat_time_second * 1000L) {
                            Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown(
                                    be.getId(), be.getHost(), 100);
                        }
                    }
                    return isChanged;
                }
                break;
            }
            case BROKER: {
                BrokerHbResponse hbResponse = (BrokerHbResponse) response;
                FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getBroker(
                        hbResponse.getName(), hbResponse.getHost(), hbResponse.getPort());
                if (broker != null) {
                    boolean isChanged = broker.handleHbResponse(hbResponse);
                    if (hbResponse.getStatus() != HbStatus.OK) {
                        // invalid all connections cached in ClientPool
                        ClientPool.brokerPool.clearPool(new TNetworkAddress(broker.host, broker.port));
                    }
                    return isChanged;
                }
                break;
            }
            default:
                break;
        }
        return false;
    }

    /**
     * Sends one heartbeat to a single backend and converts the result into a {@link BackendHbResponse}.
     */
    private class BackendHeartbeatHandler implements Callable<HeartbeatResponse> {
        private Backend backend;
        private List<TFrontendInfo> feInfos;

        public BackendHeartbeatHandler(Backend backend, List<TFrontendInfo> feInfos) {
            this.backend = backend;
            this.feInfos = feInfos;
        }

        @Override
        public HeartbeatResponse call() {
            HeartbeatResponse response = pingOnce();
            // We ping twice here to avoid immediately failure due to connection reset.
            if (response.getStatus() != HbStatus.OK) {
                response = pingOnce();
            }
            return response;
        }

        /**
         * Performs a single heartbeat RPC. Exceptions are logged and turned into a failed response;
         * the pooled client is returned on RPC success and invalidated otherwise.
         */
        private HeartbeatResponse pingOnce() {
            long backendId = backend.getId();
            HeartbeatService.Client client = null;

            TNetworkAddress beAddr = new TNetworkAddress(backend.getHost(), backend.getHeartbeatPort());
            boolean ok = false;
            try {
                // Work on a copy so per-backend fields do not leak into the shared master info.
                TMasterInfo copiedMasterInfo = new TMasterInfo(masterInfo.get());
                copiedMasterInfo.setBackendIp(backend.getHost());
                long flags = heartbeatFlags.getHeartbeatFlags();
                copiedMasterInfo.setHeartbeatFlags(flags);
                copiedMasterInfo.setBackendId(backendId);
                copiedMasterInfo.setFrontendInfos(feInfos);
                copiedMasterInfo.setAuthToken(Env.getCurrentEnv().getTokenManager().acquireToken());
                if (Config.isCloudMode()) {
                    String cloudUniqueId = backend.getTagMap().get(Tag.CLOUD_UNIQUE_ID);
                    copiedMasterInfo.setCloudUniqueId(cloudUniqueId);
                    // NOTE(review): a "_seconds" config is passed to a "...Ms" field unconverted.
                    // Confirm the unit the backend expects before changing it.
                    copiedMasterInfo.setTabletReportInactiveDurationMs(Config.rehash_tablet_after_be_dead_seconds);
                }
                THeartbeatResult result;
                if (!FeConstants.runningUnitTest) {
                    client = ClientPool.backendHeartbeatPool.borrowObject(beAddr);
                    result = client.heartbeat(copiedMasterInfo);
                } else {
                    // Mocked result
                    TBackendInfo backendInfo = new TBackendInfo();
                    backendInfo.setBePort(1);
                    backendInfo.setHttpPort(2);
                    backendInfo.setBeRpcPort(3);
                    backendInfo.setBrpcPort(4);
                    backendInfo.setArrowFlightSqlPort(8);
                    backendInfo.setVersion("test-1234");
                    result = new THeartbeatResult();
                    result.setStatus(new TStatus(TStatusCode.OK));
                    result.setBackendInfo(backendInfo);
                }

                // Debug point: force the listed backend ids (comma separated) to look dead.
                // Tokens are trimmed and empty ones skipped so "1, 2" or "1,,2" parse cleanly.
                String debugDeadBeIds = DebugPointUtil.getDebugParamOrDefault(
                        "HeartbeatMgr.BackendHeartbeatHandler", "deadBeIds", "");
                if (!Strings.isNullOrEmpty(debugDeadBeIds)
                        && Arrays.stream(debugDeadBeIds.split(","))
                                .map(String::trim)
                                .filter(id -> !id.isEmpty())
                                .anyMatch(id -> Long.parseLong(id) == backendId)) {
                    result.getStatus().setStatusCode(TStatusCode.INTERNAL_ERROR);
                    result.getStatus().addToErrorMsgs("debug point HeartbeatMgr.deadBeIds set dead be");
                }

                ok = true;
                if (result.getStatus().getStatusCode() == TStatusCode.OK) {
                    TBackendInfo tBackendInfo = result.getBackendInfo();
                    int bePort = tBackendInfo.getBePort();
                    int httpPort = tBackendInfo.getHttpPort();
                    int brpcPort = -1;
                    if (tBackendInfo.isSetBrpcPort()) {
                        brpcPort = tBackendInfo.getBrpcPort();
                    }
                    int arrowFlightSqlPort = -1;
                    if (tBackendInfo.isSetArrowFlightSqlPort()) {
                        arrowFlightSqlPort = tBackendInfo.getArrowFlightSqlPort();
                    }
                    String version = "";
                    if (tBackendInfo.isSetVersion()) {
                        version = tBackendInfo.getVersion();
                    }
                    long beStartTime = tBackendInfo.getBeStartTime();
                    String nodeRole = Tag.VALUE_MIX;
                    if (tBackendInfo.isSetBeNodeRole()) {
                        nodeRole = tBackendInfo.getBeNodeRole();
                    }
                    long fragmentNum = tBackendInfo.getFragmentExecutingCount();
                    long lastFragmentUpdateTime = tBackendInfo.getFragmentLastActiveTime();
                    boolean isShutDown = false;
                    if (tBackendInfo.isSetIsShutdown()) {
                        isShutDown = tBackendInfo.isIsShutdown();
                    }
                    long beMemory = tBackendInfo.isSetBeMem() ? tBackendInfo.getBeMem() : 0;
                    return new BackendHbResponse(backendId, bePort, httpPort, brpcPort,
                            System.currentTimeMillis(), beStartTime, version, nodeRole,
                            fragmentNum, lastFragmentUpdateTime, isShutDown, arrowFlightSqlPort, beMemory);
                } else {
                    return new BackendHbResponse(backendId, backend.getHost(), backend.getLastUpdateMs(),
                            result.getStatus().getErrorMsgs().isEmpty()
                                    ? "Unknown error" : result.getStatus().getErrorMsgs().get(0));
                }
            } catch (Exception e) {
                LOG.warn("backend heartbeat got exception", e);
                return new BackendHbResponse(backendId, backend.getHost(), backend.getLastUpdateMs(),
                        Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage());
            } finally {
                if (client != null) {
                    if (ok) {
                        ClientPool.backendHeartbeatPool.returnObject(beAddr, client);
                    } else {
                        ClientPool.backendHeartbeatPool.invalidateObject(beAddr, client);
                    }
                }
            }
        }
    }

    /**
     * Sends one heartbeat to a single frontend. A heartbeat to the local frontend is answered
     * in-process without an RPC.
     */
    public static class FrontendHeartbeatHandler implements Callable<HeartbeatResponse> {
        private Frontend fe;
        private int clusterId;
        private String token;

        public FrontendHeartbeatHandler(Frontend fe, int clusterId, String token) {
            this.fe = fe;
            this.clusterId = clusterId;
            this.token = token;
        }

        @Override
        public HeartbeatResponse call() {
            HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
            if (fe.getHost().equals(selfNode.getHost())) {
                // heartbeat to self
                if (Env.getCurrentEnv().isReady()) {
                    return new FrontendHbResponse(fe.getNodeName(), Config.query_port, Config.rpc_port,
                            Config.arrow_flight_sql_port, Env.getCurrentEnv().getMaxJournalId(),
                            System.currentTimeMillis(),
                            Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH,
                            ExecuteEnv.getInstance().getStartupTime(), ExecuteEnv.getInstance().getDiskInfos(),
                            ExecuteEnv.getInstance().getProcessUUID());
                } else {
                    return new FrontendHbResponse(fe.getNodeName(), "not ready");
                }
            }
            return getHeartbeatResponse();
        }

        /**
         * Pings a remote frontend over RPC. Exceptions become a failed response; the pooled client
         * is returned on RPC success and invalidated otherwise.
         */
        private HeartbeatResponse getHeartbeatResponse() {
            FrontendService.Client client = null;
            TNetworkAddress addr = new TNetworkAddress(fe.getHost(), Config.rpc_port);
            boolean ok = false;
            try {
                client = ClientPool.frontendHeartbeatPool.borrowObject(addr);
                TFrontendPingFrontendRequest request = new TFrontendPingFrontendRequest(clusterId, token);
                request.setDeployMode(Env.getCurrentEnv().getDeployMode());
                TFrontendPingFrontendResult result = client.ping(request);
                ok = true;
                if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) {
                    return new FrontendHbResponse(fe.getNodeName(), result.getQueryPort(),
                            result.getRpcPort(), result.getArrowFlightSqlPort(), result.getReplayedJournalId(),
                            System.currentTimeMillis(), result.getVersion(), result.getLastStartupTime(),
                            FeDiskInfo.fromThrifts(result.getDiskInfos()), result.getProcessUUID());
                } else {
                    return new FrontendHbResponse(fe.getNodeName(), result.getMsg());
                }
            } catch (Exception e) {
                return new FrontendHbResponse(fe.getNodeName(),
                        Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage());
            } finally {
                // client stays null when borrowObject itself failed; nothing to hand back then.
                if (client != null) {
                    if (ok) {
                        ClientPool.frontendHeartbeatPool.returnObject(addr, client);
                    } else {
                        ClientPool.frontendHeartbeatPool.invalidateObject(addr, client);
                    }
                }
            }
        }
    }

    /**
     * Sends one ping to a single broker. Exceptions become a failed response; the pooled client
     * is returned on RPC success and invalidated otherwise.
     */
    public static class BrokerHeartbeatHandler implements Callable<HeartbeatResponse> {
        private String brokerName;
        private FsBroker broker;
        private String clientId;

        public BrokerHeartbeatHandler(String brokerName, FsBroker broker, String clientId) {
            this.brokerName = brokerName;
            this.broker = broker;
            this.clientId = clientId;
        }

        @Override
        public HeartbeatResponse call() {
            TPaloBrokerService.Client client = null;
            TNetworkAddress addr = new TNetworkAddress(broker.host, broker.port);
            boolean ok = false;
            try {
                client = ClientPool.brokerPool.borrowObject(addr);
                TBrokerPingBrokerRequest request = new TBrokerPingBrokerRequest(TBrokerVersion.VERSION_ONE,
                        clientId);
                TBrokerOperationStatus status = client.ping(request);
                ok = true;

                if (status.getStatusCode() != TBrokerOperationStatusCode.OK) {
                    return new BrokerHbResponse(brokerName, broker.host, broker.port, status.getMessage());
                } else {
                    return new BrokerHbResponse(brokerName, broker.host, broker.port, System.currentTimeMillis());
                }

            } catch (Exception e) {
                return new BrokerHbResponse(brokerName, broker.host, broker.port,
                        Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage());
            } finally {
                // client stays null when borrowObject itself failed; nothing to hand back then.
                if (client != null) {
                    if (ok) {
                        ClientPool.brokerPool.returnObject(addr, client);
                    } else {
                        ClientPool.brokerPool.invalidateObject(addr, client);
                    }
                }
            }
        }
    }

    /**
     * Re-applies heartbeat responses read from the edit log, without triggering transaction aborts.
     *
     * @param hbPackage the logged batch of changed heartbeat responses
     */
    public void replayHearbeat(HbPackage hbPackage) {
        for (HeartbeatResponse hbResult : hbPackage.getHbResults()) {
            handleHbResponse(hbResult, true);
        }
    }
}