Backend.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.system;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.DiskInfo.DiskState;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.SimpleScheduler;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.HeartbeatResponse.HbStatus;
import org.apache.doris.thrift.TDisk;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * This class extends the primary identifier of a Backend with ephemeral state,
 * e.g. usage information, current administrative state, etc.
 */
public class Backend implements Writable {
// Class-wide logger.
private static final Logger LOG = LogManager.getLogger(Backend.class);
// Represents a meaningless (placeholder) IP
public static final String DUMMY_IP = "0.0.0.0";
@SerializedName("id")
private long id;
@SerializedName("host")
private volatile String host;
// Version string taken from the heartbeat response (see handleHbResponse); starts as "".
private String version;
@SerializedName("heartbeatPort")
private int heartbeatPort; // heartbeat service port
@SerializedName("bePort")
private volatile int bePort; // be
@SerializedName("httpPort")
private volatile int httpPort; // web service
@SerializedName("beRpcPort")
private volatile int beRpcPort; // be rpc port
@SerializedName("brpcPort")
private volatile int brpcPort = -1;
@SerializedName("arrowFlightSqlPort")
private volatile int arrowFlightSqlPort = -1;
@SerializedName("lastUpdateMs")
private volatile long lastUpdateMs;
@SerializedName("lastStartTime")
private volatile long lastStartTime;
@SerializedName("isAlive")
private AtomicBoolean isAlive;
@SerializedName("isDecommissioned")
private AtomicBoolean isDecommissioned;
// In-memory "decommission in progress" flag; it carries no @SerializedName.
private AtomicBoolean isDecommissioning = new AtomicBoolean(false);
// rootPath -> DiskInfo
@SerializedName("disksRef")
private volatile ImmutableMap<String, DiskInfo> disksRef;
// Value last stored through setPublishTaskLastTimeAccumulated.
private Long lastPublishTaskAccumulatedNum = 0L;
// Message of the most recent failed heartbeat; reset to "" on a successful one.
private String heartbeatErrMsg = "";
// This is used the first time we init pathHashToDiskInfo in SystemInfoService.
// After it has been initialized, this variable is set to true.
private boolean initPathInfo = false;
// Wall-clock time (ms) of the most recent failed heartbeat, or -1 if none was recorded.
private long lastMissingHeartbeatTime = -1;
// The max tablet compaction score of this backend.
// This field is set by tablet report and is only used for metric monitoring; no need to persist.
private volatile long tabletMaxCompactionScore = 0;
// Additional backendStatus information for BE, displayed in JSON format
@SerializedName("backendStatus")
private BackendStatus backendStatus = new BackendStatus();
// The locationTag is also saved in tagMap; a dedicated field is used here to avoid
// creating it every time we get it.
@SerializedName(value = "locationTag", alternate = {"tag"})
private Tag locationTag = Tag.DEFAULT_BACKEND_TAG;
@SerializedName("nodeRole")
private Tag nodeRoleTag = Tag.DEFAULT_NODE_ROLE_TAG;
// tag type -> tag value.
// A backend can only be assigned to one tag type, and each type can only have one value.
@SerializedName("tagMap")
private Map<String, String> tagMap = Maps.newHashMap();
private boolean isSmoothUpgradeSrc = false; // This BE process is the old process during a smooth upgrade
private boolean isSmoothUpgradeDst = false; // This BE process is the new process during a smooth upgrade
// cpu cores
@SerializedName("cpuCores")
private int cpuCores = 1;
// The physical memory available for use by BE.
@SerializedName("beMemory")
private long beMemory = 0;
// from config::pipeline_executor_size, defaults to cpuCores
@SerializedName("pipelineExecutorSize")
private int pipelineExecutorSize = 1;
// Counter of heartbeat failures.
// Each time a heartbeat fails, this counter is increased by one.
// When it reaches Config.max_backend_heartbeat_failure_tolerance_count, this backend
// is marked as dead.
// Once the backend is alive again, the counter is reset.
// No need to persist, because only the master FE handles heartbeats.
private int heartbeatFailureCounter = 0;
// This field does not need to be serialized. If the FE restarts, the state is reset to false,
// so the FE may send some queries to this BE; that is not considered an important problem.
private AtomicBoolean isShutDown = new AtomicBoolean(false);
// Time (ms) after which a healthy heartbeat is reported as "changed" even if nothing changed
// (see handleHbResponse). The initial value is now plus a random offset below 60 seconds.
private long nextForceEditlogHeartbeatTime = System.currentTimeMillis() + (new SecureRandom()).nextInt(60 * 1000);
/**
 * Creates a backend with an empty host and version, the be/http/beRpc ports and both
 * timestamps set to zero, no disks, and the alive and decommissioned flags cleared.
 * The default location tag is recorded in {@code tagMap}.
 */
public Backend() {
    // identity
    this.host = "";
    this.version = "";

    // ports
    this.bePort = 0;
    this.httpPort = 0;
    this.beRpcPort = 0;

    // timestamps
    this.lastStartTime = 0;
    this.lastUpdateMs = 0;

    // state flags and disks
    this.isDecommissioned = new AtomicBoolean(false);
    this.isAlive = new AtomicBoolean(false);
    this.disksRef = ImmutableMap.of();

    this.tagMap.put(this.locationTag.type, this.locationTag.value);
}
/**
 * Creates a backend identified by {@code id} that is reachable at {@code host} on
 * {@code heartbeatPort}. The be/http/beRpc ports and both timestamps start at -1, the
 * disk map is empty, and the backend is neither alive nor decommissioned. The default
 * location tag is recorded in {@code tagMap}.
 *
 * @param id backend id
 * @param host backend host
 * @param heartbeatPort heartbeat port of the backend
 */
public Backend(long id, String host, int heartbeatPort) {
    // identity
    this.id = id;
    this.host = host;
    this.heartbeatPort = heartbeatPort;
    this.version = "";

    // nothing is known yet about the remaining ports or the timestamps
    this.bePort = -1;
    this.httpPort = -1;
    this.beRpcPort = -1;
    this.lastStartTime = -1L;
    this.lastUpdateMs = -1L;

    // state flags and disks
    this.isDecommissioned = new AtomicBoolean(false);
    this.isAlive = new AtomicBoolean(false);
    this.disksRef = ImmutableMap.of();

    this.tagMap.put(this.locationTag.type, this.locationTag.value);
}
/**
 * Returns the cloud cluster status stored in the tag map, or the string form of
 * {@code Cloud.ClusterStatus.NORMAL} when no status tag is present.
 */
public String getCloudClusterStatus() {
    final String normalStatus = String.valueOf(Cloud.ClusterStatus.NORMAL);
    return this.tagMap.getOrDefault(Tag.CLOUD_CLUSTER_STATUS, normalStatus);
}

/** Stores {@code clusterStatus} as the cloud cluster status tag. */
public void setCloudClusterStatus(final String clusterStatus) {
    this.tagMap.put(Tag.CLOUD_CLUSTER_STATUS, clusterStatus);
}

/** Returns the cloud cluster name tag, or "" when it is absent. */
public String getCloudClusterName() {
    return cloudTagOrEmpty(Tag.CLOUD_CLUSTER_NAME);
}

/** Stores {@code clusterName} as the cloud cluster name tag. */
public void setCloudClusterName(final String clusterName) {
    this.tagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
}

/** Returns the cloud cluster id tag, or "" when it is absent. */
public String getCloudClusterId() {
    return cloudTagOrEmpty(Tag.CLOUD_CLUSTER_ID);
}

/** Returns the cloud unique id tag, or "" when it is absent. */
public String getCloudUniqueId() {
    return cloudTagOrEmpty(Tag.CLOUD_UNIQUE_ID);
}

/** Returns the cloud public endpoint tag, or "" when it is absent. */
public String getCloudPublicEndpoint() {
    return cloudTagOrEmpty(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT);
}

/** Returns the cloud private endpoint tag, or "" when it is absent. */
public String getCloudPrivateEndpoint() {
    return cloudTagOrEmpty(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT);
}

// Looks up one tag and falls back to the empty string when the key is not in the map.
private String cloudTagOrEmpty(String tagKey) {
    return this.tagMap.getOrDefault(tagKey, "");
}
/** Returns the id of this backend. */
public long getId() {
return id;
}
/**
 * Returns the heartbeat address of this backend in the form {@code host:heartbeatPort}.
 */
public String getAddress() {
return host + ":" + heartbeatPort;
}
/** Returns the host of this backend. */
public String getHost() {
return host;
}
/** Returns the version string last taken from a heartbeat response ("" until one is applied). */
public String getVersion() {
return version;
}
/** Returns the be port. */
public int getBePort() {
return bePort;
}
/** Returns the heartbeat port. */
public int getHeartbeatPort() {
return heartbeatPort;
}
/** Returns the http (web service) port. */
public int getHttpPort() {
return httpPort;
}
/** Returns the be rpc port. */
public int getBeRpcPort() {
return beRpcPort;
}
/** Returns the brpc port (-1 until it has been set). */
public int getBrpcPort() {
return brpcPort;
}
/** Returns the Arrow Flight SQL port (-1 until it has been set). */
public int getArrowFlightSqlPort() {
return arrowFlightSqlPort;
}
/** Returns the error message of the last failed heartbeat, or "" after a successful one. */
public String getHeartbeatErrMsg() {
return heartbeatErrMsg;
}
/** Returns {@code backendStatus.lastStreamLoadTime} (-1 until it has been set). */
public long getLastStreamLoadTime() {
return this.backendStatus.lastStreamLoadTime;
}
/** Stores {@code lastStreamLoadTime} in the backend status. */
public void setLastStreamLoadTime(long lastStreamLoadTime) {
this.backendStatus.lastStreamLoadTime = lastStreamLoadTime;
}
/** Returns whether queries are disabled on this backend. */
public boolean isQueryDisabled() {
return backendStatus.isQueryDisabled;
}
/** Sets the query-disabled flag in the backend status. */
public void setQueryDisabled(boolean isQueryDisabled) {
this.backendStatus.isQueryDisabled = isQueryDisabled;
}
/** Returns whether loads are disabled on this backend. */
public boolean isLoadDisabled() {
return backendStatus.isLoadDisabled;
}
/** Sets the load-disabled flag in the backend status. */
public void setLoadDisabled(boolean isLoadDisabled) {
this.backendStatus.isLoadDisabled = isLoadDisabled;
}
/** Sets the active flag in the backend status. */
public void setActive(boolean isActive) {
this.backendStatus.isActive = isActive;
}
/** Returns the active flag from the backend status. */
public boolean isActive() {
return this.backendStatus.isActive;
}
/** Returns the fragment count recorded from the last successful heartbeat. */
public long getCurrentFragmentNum() {
return this.backendStatus.currentFragmentNum;
}
/**
 * Builds a bracketed, single-line description of this backend for replica-creation
 * diagnostics. The text always starts with the backend id and host. It then names the
 * first applicable exclusion reason (not alive, decommissioned, compute node, black
 * list); if none applies it lists the HDD and SSD disk counts split into ok / bad /
 * capExceedLimit, printing only the non-zero counters.
 *
 * @return the description string
 */
public String getDetailsForCreateReplica() {
    // Slot layout of both counter arrays: [0] = ok, [1] = bad (not alive), [2] = over capacity limit.
    final int[] hddCounters = new int[3];
    final int[] ssdCounters = new int[3];
    for (DiskInfo disk : disksRef.values()) {
        final TStorageMedium medium = disk.getStorageMedium();
        final int[] counters;
        if (medium == TStorageMedium.HDD) {
            counters = hddCounters;
        } else if (medium == TStorageMedium.SSD) {
            counters = ssdCounters;
        } else {
            // disks of any other medium are not counted
            continue;
        }
        if (!disk.isAlive()) {
            counters[1]++;
        } else if (disk.exceedLimit(true)) {
            counters[2]++;
        } else {
            counters[0]++;
        }
    }

    final StringBuilder details = new StringBuilder("[");
    details.append("backendId=").append(id).append(", host=").append(host);
    if (!isAlive()) {
        details.append(", isAlive=false, exclude it");
    } else if (isDecommissioned()) {
        details.append(", isDecommissioned=true, exclude it");
    } else if (isComputeNode()) {
        details.append(", isComputeNode=true, exclude it");
    } else if (!Config.disable_backend_black_list && !SimpleScheduler.isAvailable(this)) {
        details.append(", is in black list, exclude it");
    } else {
        details.append(", hdd disks count={");
        appendDiskCounters(details, hddCounters);
        details.append("}, ssd disk count={");
        appendDiskCounters(details, ssdCounters);
        details.append("}");
    }
    return details.append("]").toString();
}

// Appends the non-zero counters as "ok=N,", "bad=N,", "capExceedLimit=N," in that order.
private static void appendDiskCounters(StringBuilder out, int[] counters) {
    if (counters[0] > 0) {
        out.append("ok=").append(counters[0]).append(",");
    }
    if (counters[1] > 0) {
        out.append("bad=").append(counters[1]).append(",");
    }
    if (counters[2] > 0) {
        out.append("capExceedLimit=").append(counters[2]).append(",");
    }
}
/**
 * For test only: stores the given ports, stamps {@code lastUpdateMs} with the current
 * time, marks the backend alive if it was not (also setting {@code lastStartTime}),
 * and clears the heartbeat error message.
 */
public void updateOnce(int bePort, int httpPort, int beRpcPort) {
    // Assigning unconditionally leaves the fields with the same values as compare-then-assign.
    this.bePort = bePort;
    this.httpPort = httpPort;
    this.beRpcPort = beRpcPort;

    final long now = System.currentTimeMillis();
    this.lastUpdateMs = now;

    final boolean wasDead = !this.isAlive.get();
    if (wasDead) {
        this.lastStartTime = now;
        // the message is built before the alive flag flips
        LOG.info("{} is alive,", toString());
        this.isAlive.set(true);
    }

    this.heartbeatErrMsg = "";
}
/**
 * Atomically switches the decommissioned flag to {@code isDecommissioned}.
 *
 * @return true if the flag actually changed (a warning is logged in that case),
 *     false if it already had the requested value
 */
public boolean setDecommissioned(boolean isDecommissioned) {
    final boolean flipped = this.isDecommissioned.compareAndSet(!isDecommissioned, isDecommissioned);
    if (!flipped) {
        return false;
    }
    LOG.warn("{} set decommission: {}", toString(), isDecommissioned);
    return true;
}
/**
 * Atomically switches the decommissioning flag to {@code isDecommissioning}.
 *
 * @return true if the flag actually changed (a warning is logged in that case),
 *     false if it already had the requested value
 */
public boolean setDecommissioning(boolean isDecommissioning) {
    final boolean flipped = this.isDecommissioning.compareAndSet(!isDecommissioning, isDecommissioning);
    if (!flipped) {
        return false;
    }
    LOG.warn("{} set decommissioning: {}", toString(), isDecommissioning);
    return true;
}
/** Replaces the host of this backend. */
public void setHost(String host) {
this.host = host;
}
/** Sets the alive flag directly, without touching any timestamp or counter. */
public void setAlive(boolean isAlive) {
this.isAlive.set(isAlive);
}
/** Sets the be port (the parameter is named {@code agentPort}). */
public void setBePort(int agentPort) {
this.bePort = agentPort;
}
/** Sets the http port. */
public void setHttpPort(int httpPort) {
this.httpPort = httpPort;
}
/** Sets the be rpc port. */
public void setBeRpcPort(int beRpcPort) {
this.beRpcPort = beRpcPort;
}
/** Sets the brpc port. */
public void setBrpcPort(int brpcPort) {
this.brpcPort = brpcPort;
}
/** Sets the Arrow Flight SQL port. */
public void setArrowFlightSqlPort(int arrowFlightSqlPort) {
this.arrowFlightSqlPort = arrowFlightSqlPort;
}
/** Sets the number of cpu cores. */
public void setCpuCores(int cpuCores) {
this.cpuCores = cpuCores;
}
/** Sets the pipeline executor size. */
public void setPipelineExecutorSize(int pipelineExecutorSize) {
this.pipelineExecutorSize = pipelineExecutorSize;
}
/** Returns the last update time in milliseconds. */
public long getLastUpdateMs() {
return this.lastUpdateMs;
}
/** Sets the last update time in milliseconds. */
public void setLastUpdateMs(long currentTime) {
this.lastUpdateMs = currentTime;
}
/** Returns the last start time of the backend. */
public long getLastStartTime() {
return this.lastStartTime;
}
/** Sets the last start time of the backend. */
public void setLastStartTime(long currentTime) {
this.lastStartTime = currentTime;
}
/**
 * Returns the number of cpu cores.
 * NOTE(review): the method name is misspelled ("Cput"); the matching setter is
 * {@code setCpuCores}. The name is left unchanged here because callers are not visible.
 */
public int getCputCores() {
return cpuCores;
}
/** Returns the BE memory value last taken from a heartbeat response. */
public long getBeMemory() {
return beMemory;
}
/** Returns the pipeline executor size. */
public int getPipelineExecutorSize() {
return pipelineExecutorSize;
}
/** Returns the time of the last missing heartbeat, or -1 if none has been recorded. */
public long getLastMissingHeartbeatTime() {
return lastMissingHeartbeatTime;
}
/** Sets the time of the last missing heartbeat. */
public void setLastMissingHeartbeatTime(long lastMissingHeartbeatTime) {
this.lastMissingHeartbeatTime = lastMissingHeartbeatTime;
}
// Backend process epoch, used to tag a backend process.
// Currently it is always equal to the BE start time, even during oplog replay.
public long getProcessEpoch() {
return lastStartTime;
}
/** Returns the current value of the alive flag. */
public boolean isAlive() {
return this.isAlive.get();
}
/** Returns the current value of the decommissioned flag. */
public boolean isDecommissioned() {
return this.isDecommissioned.get();
}
/** Returns the current value of the decommissioning flag. */
public boolean isDecommissioning() {
return this.isDecommissioning.get();
}
/** Returns true when the backend is alive, queries are not disabled, and it is not shut down. */
public boolean isQueryAvailable() {
    if (!isAlive() || isQueryDisabled()) {
        return false;
    }
    return !isShutDown.get();
}

/** Returns true when the backend is alive and not decommissioned. */
public boolean isScheduleAvailable() {
    if (!isAlive()) {
        return false;
    }
    return !isDecommissioned();
}

/** Returns true when the backend is alive and loads are not disabled. */
public boolean isLoadAvailable() {
    if (!isAlive()) {
        return false;
    }
    return !isLoadDisabled();
}
/** Replaces the whole rootPath -> DiskInfo map. */
public void setDisks(ImmutableMap<String, DiskInfo> disks) {
this.disksRef = disks;
}
/** Returns the mutable status object of this backend (not a copy). */
public BackendStatus getBackendStatus() {
return backendStatus;
}
/** Marks whether this BE process is the old process of a smooth upgrade. */
public void setSmoothUpgradeSrc(boolean is) {
this.isSmoothUpgradeSrc = is;
}
/** Returns whether this BE process is the old process of a smooth upgrade. */
public boolean isSmoothUpgradeSrc() {
return this.isSmoothUpgradeSrc;
}
/** Marks whether this BE process is the new process of a smooth upgrade. */
public void setSmoothUpgradeDst(boolean is) {
this.isSmoothUpgradeDst = is;
}
/** Returns whether this BE process is the new process of a smooth upgrade. */
public boolean isSmoothUpgradeDst() {
return this.isSmoothUpgradeDst;
}
/** Returns the current count of consecutive failed heartbeats. */
public int getHeartbeatFailureCounter() {
return heartbeatFailureCounter;
}
/** Returns the current rootPath -> DiskInfo map. */
public ImmutableMap<String, DiskInfo> getDisks() {
return this.disksRef;
}
/** Returns true when every known disk reports a path hash (also true when there are no disks). */
public boolean hasPathHash() {
    for (DiskInfo disk : disksRef.values()) {
        if (!disk.hasPathHash()) {
            return false;
        }
    }
    return true;
}

/** Returns true when at least one known disk matches {@code storageMedium}. */
public boolean hasSpecifiedStorageMedium(TStorageMedium storageMedium) {
    for (DiskInfo disk : disksRef.values()) {
        if (disk.isStorageMediumMatch(storageMedium)) {
            return true;
        }
    }
    return false;
}
/** Returns the summed total capacity, in bytes, of all disks in the ONLINE state. */
public long getTotalCapacityB() {
    return disksRef.values().stream()
            .filter(disk -> disk.getState() == DiskState.ONLINE)
            .mapToLong(DiskInfo::getTotalCapacityB)
            .sum();
}
/**
 * Returns the summed available capacity, in bytes, of all ONLINE disks, plus one.
 * The extra byte means the result is 1 rather than 0 while the disk map is still
 * empty at system init.
 */
public long getAvailableCapacityB() {
    final long onlineAvailable = disksRef.values().stream()
            .filter(disk -> disk.getState() == DiskState.ONLINE)
            .mapToLong(DiskInfo::getAvailableCapacityB)
            .sum();
    return 1L + onlineAvailable;
}
/** Returns the summed data-used capacity, in bytes, of all disks in the ONLINE state. */
public long getDataUsedCapacityB() {
    return disksRef.values().stream()
            .filter(disk -> disk.getState() == DiskState.ONLINE)
            .mapToLong(DiskInfo::getDataUsedCapacityB)
            .sum();
}
/** Returns the summed trash-used capacity, in bytes, of all disks in the ONLINE state. */
public long getTrashUsedCapacityB() {
    return disksRef.values().stream()
            .filter(disk -> disk.getState() == DiskState.ONLINE)
            .mapToLong(DiskInfo::getTrashUsedCapacityB)
            .sum();
}
/** Returns the summed remote-used capacity of all disks in the ONLINE state. */
public long getRemoteUsedCapacityB() {
    return disksRef.values().stream()
            .filter(disk -> disk.getState() == DiskState.ONLINE)
            .mapToLong(DiskInfo::getRemoteUsedCapacity)
            .sum();
}
/**
 * Returns the largest used percentage among the ONLINE disks, or 0.0 when no ONLINE
 * disk reports a value greater than zero.
 */
public double getMaxDiskUsedPct() {
    // A strict ">" fold starting at 0.0 keeps the comparison semantics of a plain max loop.
    return disksRef.values().stream()
            .filter(disk -> disk.getState() == DiskState.ONLINE)
            .mapToDouble(DiskInfo::getUsedPct)
            .reduce(0.0, (highest, pct) -> pct > highest ? pct : highest);
}
/**
 * Returns true when this backend has no disk of {@code storageMedium}, or when none of
 * its ONLINE disks of that medium is still under the capacity limit.
 */
public boolean diskExceedLimitByStorageMedium(TStorageMedium storageMedium) {
    if (getDiskNumByStorageMedium(storageMedium) <= 0) {
        return true;
    }
    // Exceeded unless at least one online disk of this medium still has room.
    return disksRef.values().stream().noneMatch(disk -> disk.getState() == DiskState.ONLINE
            && disk.getStorageMedium() == storageMedium
            && !disk.exceedLimit(true));
}
/**
 * Returns true when this backend has no disks, or when none of its ONLINE disks is
 * still under the capacity limit.
 */
public boolean diskExceedLimit() {
    if (getDiskNum() <= 0) {
        return true;
    }
    // Exceeded unless at least one online disk still has room.
    return disksRef.values().stream()
            .noneMatch(disk -> disk.getState() == DiskState.ONLINE && !disk.exceedLimit(true));
}
/**
 * Reconciles the known disks with the disks reported by the backend.
 *
 * <p>Existing {@code DiskInfo} objects are updated in place with the reported capacities,
 * path hash, storage medium and state; unknown root paths get a new {@code DiskInfo}; known
 * root paths that are not a key of {@code backendDisks} are dropped. The disk map is replaced,
 * the path info is updated, and a backend-state-change edit log is written only when a disk
 * was added or removed or a disk state changed. Pure capacity changes are not treated as a
 * change because capacity info is only used in the master FE.
 *
 * @param backendDisks disks reported by the backend; its keys are checked against root paths
 */
public void updateDisks(Map<String, TDisk> backendDisks) {
    final ImmutableMap<String, DiskInfo> knownDisks = disksRef;

    // The very first time: publish the path info once every known disk has a path hash.
    if (!initPathInfo) {
        final boolean everyDiskHasPathHash = knownDisks.values().stream()
                .allMatch(disk -> disk.getPathHash() != 0);
        if (everyDiskHasPathHash) {
            initPathInfo = true;
            Env.getCurrentSystemInfo().updatePathInfo(new ArrayList<>(knownDisks.values()), Lists.newArrayList());
        }
    }

    final Map<String, DiskInfo> reportedDisks = Maps.newHashMap();
    final List<DiskInfo> added = Lists.newArrayList();
    final List<DiskInfo> removed = Lists.newArrayList();
    // Becomes true only for added/dropped disks or a disk state flip.
    boolean changed = false;

    for (TDisk reported : backendDisks.values()) {
        final String rootPath = reported.getRootPath();
        DiskInfo diskInfo = knownDisks.get(rootPath);
        if (diskInfo == null) {
            diskInfo = new DiskInfo(rootPath);
            added.add(diskInfo);
            changed = true;
            LOG.info("add new disk info. backendId: {}, rootPath: {}", id, rootPath);
        }
        reportedDisks.put(rootPath, diskInfo);

        diskInfo.setTotalCapacityB(reported.getDiskTotalCapacity());
        diskInfo.setDataUsedCapacityB(reported.getDataUsedCapacity());
        diskInfo.setTrashUsedCapacityB(reported.getTrashUsedCapacity());
        diskInfo.setAvailableCapacityB(reported.getDiskAvailableCapacity());
        if (reported.isSetRemoteUsedCapacity()) {
            diskInfo.setRemoteUsedCapacity(reported.getRemoteUsedCapacity());
        }
        if (reported.isSetPathHash()) {
            diskInfo.setPathHash(reported.getPathHash());
        }
        if (reported.isSetStorageMedium()) {
            diskInfo.setStorageMedium(reported.getStorageMedium());
        }

        final DiskState reportedState = reported.isUsed() ? DiskState.ONLINE : DiskState.OFFLINE;
        if (diskInfo.setState(reportedState)) {
            changed = true;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("update disk info. backendId: {}, diskInfo: {}", id, diskInfo.toString());
        }
    }

    // Known disks whose root path is no longer reported are dropped.
    for (DiskInfo known : knownDisks.values()) {
        final String rootPath = known.getRootPath();
        if (backendDisks.containsKey(rootPath)) {
            continue;
        }
        removed.add(known);
        changed = true;
        LOG.warn("remove not exist rootPath. backendId: {}, rootPath: {}", id, rootPath);
    }

    if (!changed) {
        return;
    }
    // publish the new disk map, then propagate and log the change
    disksRef = ImmutableMap.copyOf(reportedDisks);
    Env.getCurrentSystemInfo().updatePathInfo(added, removed);
    Env.getCurrentEnv().getEditLog().logBackendStateChange(this);
}
/**
 * Stores the reported cpu core count and pipeline executor size.
 *
 * @return true if at least one of the two values differs from what was stored before
 */
public boolean updateCpuInfo(int cpuCores, int pipelineExecutorSize) {
    final boolean coresDiffer = this.cpuCores != cpuCores;
    final boolean executorSizeDiffers = this.pipelineExecutorSize != pipelineExecutorSize;
    this.cpuCores = cpuCores;
    this.pipelineExecutorSize = pipelineExecutorSize;
    return coresDiffer || executorSizeDiffers;
}
/**
 * Older versions kept a single "location" tag per Backend; newer versions keep several
 * tags in {@code tagMap}. This makes sure the map exists and that its location entry
 * carries the value of {@code locationTag}.
 */
private void convertToTagMapAndSetLocationTag() {
    if (tagMap == null) {
        // When first upgrading from an old version, the tag map may be null.
        tagMap = Maps.newHashMap();
    }
    final String locationValue = locationTag.value;
    if (locationValue.equals(tagMap.get(Tag.TYPE_LOCATION))) {
        return;
    }
    // ATTN: Tag.TYPE_LOCATION is used on purpose instead of locationTag.type, so that
    // whatever type the previous tag had, it is stored as a location-type tag.
    tagMap.put(Tag.TYPE_LOCATION, locationValue);
}
/**
 * Reads a JSON string from {@code in} via {@code Text.readString}, deserializes it into a
 * Backend with Gson, and then syncs the location tag into the tag map.
 *
 * @throws IOException if reading from {@code in} fails
 */
public static Backend read(DataInput in) throws IOException {
    final Backend backend = GsonUtils.GSON.fromJson(Text.readString(in), Backend.class);
    backend.convertToTagMapAndSetLocationTag();
    return backend;
}
/**
 * Serializes this backend to JSON with Gson and writes the string to {@code out}.
 *
 * @throws IOException if writing to {@code out} fails
 */
@Override
public void write(DataOutput out) throws IOException {
    Text.writeString(out, GsonUtils.GSON.toJson(this));
}
/**
 * Hashes id, host, heartbeat port, be port and the alive flag. Several of these can be
 * changed through setters, so the hash of an instance is not stable over its lifetime.
 */
@Override
public int hashCode() {
    final Object[] keyFields = {id, host, heartbeatPort, bePort, isAlive.get()};
    return Objects.hash(keyFields);
}
/**
 * Two backends are equal when id, host, heartbeat port, be port and the alive flag all
 * match; these are the same fields that {@link #hashCode()} uses.
 *
 * <p>The host is compared with {@code Objects.equals}, so a null host no longer throws a
 * NullPointerException. {@code hashCode()} already tolerates a null host through
 * {@code Objects.hash}.
 *
 * @param obj the object to compare with
 * @return true if {@code obj} is a Backend with the same identifying fields
 */
@Override
public boolean equals(Object obj) {
    if (this == obj) {
        return true;
    }
    if (!(obj instanceof Backend)) {
        return false;
    }
    Backend backend = (Backend) obj;
    return id == backend.id
            && Objects.equals(host, backend.host)
            && heartbeatPort == backend.heartbeatPort
            && bePort == backend.bePort
            && isAlive.get() == backend.isAlive.get();
}
/**
 * Renders id, host, heartbeat port, alive flag, last start time (formatted and raw),
 * decommissioned flag and tags inside square brackets, followed by the backend status.
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder("Backend [id=");
    text.append(id);
    text.append(", host=").append(host);
    text.append(", heartbeatPort=").append(heartbeatPort);
    text.append(", alive=").append(isAlive.get());
    text.append(", lastStartTime=").append(TimeUtils.longToTimeString(lastStartTime));
    text.append(", process epoch=").append(lastStartTime);
    text.append(", isDecommissioned=").append(isDecommissioned);
    text.append(", tags: ").append(tagMap);
    text.append("]");
    text.append(", backendStatus: ").append(backendStatus);
    return text.toString();
}
/**
 * Applies one heartbeat result to this backend.
 *
 * <p>For an {@code HbStatus.OK} response this refreshes the version, the be/http/brpc/
 * arrow-flight ports (port updates are skipped while {@code FeConstants.runningUnitTest}
 * is set), the shutdown flag, the node role (only when the reported role is valid), the BE
 * memory, the last update time, the alive flag and the start time. It then records the
 * fragment statistics and clears the heartbeat error message and the failure counter.
 *
 * <p>For any other status it records the error message and the missing-heartbeat time,
 * and marks the backend dead when this is a replay or when the failure counter reaches
 * {@code Config.max_backend_heartbeat_failure_tolerance_count}.
 *
 * @param hbResponse the heartbeat result to apply
 * @param isReplay when true, a failed heartbeat marks the backend dead right away and the
 *     failure counter is not incremented
 * @return true if the version, a port, the shutdown flag, the node role, the BE memory,
 *     the start time or the alive state changed, or if the periodic forced-editlog
 *     deadline was reached
 */
public boolean handleHbResponse(BackendHbResponse hbResponse, boolean isReplay) {
boolean isChanged = false;
if (hbResponse.getStatus() == HbStatus.OK) {
if (!this.version.equals(hbResponse.getVersion())) {
isChanged = true;
this.version = hbResponse.getVersion();
}
// Port changes are ignored while running unit tests.
if (this.bePort != hbResponse.getBePort() && !FeConstants.runningUnitTest) {
isChanged = true;
this.bePort = hbResponse.getBePort();
}
if (this.httpPort != hbResponse.getHttpPort() && !FeConstants.runningUnitTest) {
isChanged = true;
this.httpPort = hbResponse.getHttpPort();
}
if (this.brpcPort != hbResponse.getBrpcPort() && !FeConstants.runningUnitTest) {
isChanged = true;
this.brpcPort = hbResponse.getBrpcPort();
}
if (this.arrowFlightSqlPort != hbResponse.getArrowFlightSqlPort() && !FeConstants.runningUnitTest) {
isChanged = true;
this.arrowFlightSqlPort = hbResponse.getArrowFlightSqlPort();
}
if (this.isShutDown.get() != hbResponse.isShutDown()) {
isChanged = true;
// logged before the flag is updated
LOG.info("{} shutdown state is changed", this.toString());
this.isShutDown.set(hbResponse.isShutDown());
}
// Only a role that passes Tag.validNodeRoleTag replaces the current role.
if (!this.getNodeRoleTag().value.equals(hbResponse.getNodeRole()) && Tag.validNodeRoleTag(
hbResponse.getNodeRole())) {
isChanged = true;
this.nodeRoleTag = Tag.createNotCheck(Tag.TYPE_ROLE, hbResponse.getNodeRole());
}
if (this.beMemory != hbResponse.getBeMemory()) {
isChanged = true;
this.beMemory = hbResponse.getBeMemory();
}
this.lastUpdateMs = hbResponse.getHbTime();
// Dead -> alive transition: take the reported start time unconditionally.
if (!isAlive.get()) {
isChanged = true;
LOG.info("{} is back to alive, update start time from {} to {}, "
+ "update be epoch from {} to {}.", this.toString(),
TimeUtils.longToTimeString(lastStartTime),
TimeUtils.longToTimeString(hbResponse.getBeStartTime()),
lastStartTime, hbResponse.getBeStartTime());
this.lastStartTime = hbResponse.getBeStartTime();
this.isAlive.set(true);
}
// Already alive: follow a changed start time only when the reported value is positive.
if (this.lastStartTime != hbResponse.getBeStartTime() && hbResponse.getBeStartTime() > 0) {
LOG.info("{} update last start time from {} to {}, "
+ "update be epoch from {} to {}.", this.toString(),
TimeUtils.longToTimeString(lastStartTime),
TimeUtils.longToTimeString(hbResponse.getBeStartTime()),
lastStartTime, hbResponse.getBeStartTime());
this.lastStartTime = hbResponse.getBeStartTime();
isChanged = true;
}
// Fragment statistics are refreshed on every OK heartbeat and do not affect isChanged.
this.backendStatus.currentFragmentNum = hbResponse.getFragmentNum();
this.backendStatus.lastFragmentUpdateTime = hbResponse.getLastFragmentUpdateTime();
heartbeatErrMsg = "";
this.heartbeatFailureCounter = 0;
// even if no change, write an editlog to make lastUpdateMs in image update
if (System.currentTimeMillis() >= this.nextForceEditlogHeartbeatTime) {
isChanged = true;
// next deadline: the configured interval plus a random 0-59 seconds
int delaySecond = Config.editlog_healthy_heartbeat_seconds + (new SecureRandom()).nextInt(60);
this.nextForceEditlogHeartbeatTime = System.currentTimeMillis() + delaySecond * 1000L;
}
} else {
// for a bad BackendHbResponse, its hbTime is last succ hbTime, not this hbTime
if (hbResponse.getHbTime() > 0) {
this.lastUpdateMs = hbResponse.getHbTime();
}
// Only set backend to dead if the heartbeat failure counter exceed threshold.
// And if it is a replay process, must set backend to dead.
// Because of short-circuit evaluation the counter is incremented only when isReplay is false.
if (isReplay || ++this.heartbeatFailureCounter >= Config.max_backend_heartbeat_failure_tolerance_count) {
if (isAlive.compareAndSet(true, false)) {
isChanged = true;
LOG.warn("{} is dead,", this.toString());
}
}
// still set error msg and missing time even if we may not mark this backend as dead,
// for debug easily.
// But notice that if isChanged = false, these msg will not sync to other FE.
heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg();
lastMissingHeartbeatTime = System.currentTimeMillis();
}
return isChanged;
}
/** Stores the max tablet compaction score of this backend. */
public void setTabletMaxCompactionScore(long compactionScore) {
tabletMaxCompactionScore = compactionScore;
}
/** Returns the max tablet compaction score last stored (0 until it has been set). */
public long getTabletMaxCompactionScore() {
return tabletMaxCompactionScore;
}
// Counts the known disks whose storage medium is exactly storageMedium, whatever their state.
private long getDiskNumByStorageMedium(TStorageMedium storageMedium) {
    long matching = 0L;
    for (DiskInfo disk : disksRef.values()) {
        if (disk.getStorageMedium() == storageMedium) {
            matching++;
        }
    }
    return matching;
}

// Number of known disks, whatever their state.
private int getDiskNum() {
    final ImmutableMap<String, DiskInfo> disks = disksRef;
    return disks.size();
}
/**
 * Note: This class must be a POJO in order to display in JSON format.
 * Add additional information in this class to show it in `show backends`.
 * To change only a newly added item, do the following:
 * BackendStatus status = Backend.getBackendStatus();
 * status.newItem = xxx;
 */
public class BackendStatus {
    // this will be output as json, so not using FeConstants.null_string;
    public volatile String lastSuccessReportTabletsTime = "N/A";
    @SerializedName("lastStreamLoadTime")
    // the last time when the stream load status was reported by backend
    public volatile long lastStreamLoadTime = -1;
    @SerializedName("isQueryDisabled")
    public volatile boolean isQueryDisabled = false;
    @SerializedName("isLoadDisabled")
    public volatile boolean isLoadDisabled = false;
    @SerializedName("isActive")
    public volatile boolean isActive = true;
    // cloud mode, cloud control just query master, so not need SerializedName
    public volatile long currentFragmentNum = 0;
    public volatile long lastFragmentUpdateTime = 0;

    /**
     * Renders every status field, including {@code isActive}, which was previously
     * left out of this text.
     */
    @Override
    public String toString() {
        return "[" + "lastSuccessReportTabletsTime='" + lastSuccessReportTabletsTime + '\''
                + ", lastStreamLoadTime=" + lastStreamLoadTime + ", isQueryDisabled=" + isQueryDisabled
                + ", isLoadDisabled=" + isLoadDisabled
                + ", isActive=" + isActive
                + ", currentFragmentNum=" + currentFragmentNum
                + ", lastFragmentUpdateTime=" + lastFragmentUpdateTime + "]";
    }
}
/** Returns the location tag of this backend. */
public Tag getLocationTag() {
return locationTag;
}
/** Returns the node role tag of this backend. */
public Tag getNodeRoleTag() {
return nodeRoleTag;
}
/** Returns true when the node role value equals {@code Tag.VALUE_MIX}. */
public boolean isMixNode() {
return nodeRoleTag.value.equals(Tag.VALUE_MIX);
}
/** Returns true when the node role value equals {@code Tag.VALUE_COMPUTATION}. */
public boolean isComputeNode() {
return nodeRoleTag.value.equals(Tag.VALUE_COMPUTATION);
}
/**
 * Replaces the tag map (the given map is stored as-is, not copied) and refreshes the
 * cached location tag from it. The node role tag is refreshed too, but only when the
 * map has a role entry that passes {@code Tag.validNodeRoleTag}.
 *
 * @param tagMap new tag map; must contain a {@code Tag.TYPE_LOCATION} entry
 * @throws IllegalStateException if the map has no location entry
 */
public void setTagMap(Map<String, String> tagMap) {
    Preconditions.checkState(tagMap.containsKey(Tag.TYPE_LOCATION));
    this.tagMap = tagMap;
    this.locationTag = Tag.createNotCheck(Tag.TYPE_LOCATION, tagMap.get(Tag.TYPE_LOCATION));
    if (!tagMap.containsKey(Tag.TYPE_ROLE)) {
        return;
    }
    final String role = tagMap.get(Tag.TYPE_ROLE);
    if (Tag.validNodeRoleTag(role)) {
        this.nodeRoleTag = Tag.createNotCheck(Tag.TYPE_ROLE, role);
    }
}
/** Returns the tag type -> tag value map itself (not a copy). */
public Map<String, String> getTagMap() {
return tagMap;
}
/** Builds a network address from the host and the brpc port. */
public TNetworkAddress getBrpcAddress() {
return new TNetworkAddress(getHost(), getBrpcPort());
}
/** Builds a network address from the host and the heartbeat port. */
public TNetworkAddress getHeartbeatAddress() {
return new TNetworkAddress(getHost(), getHeartbeatPort());
}
/** Builds a network address from the host and the Arrow Flight SQL port. */
public TNetworkAddress getArrowFlightAddress() {
return new TNetworkAddress(getHost(), getArrowFlightSqlPort());
}
/**
 * Renders the tag map for users. It works on a copy of the map in which some internal
 * cloud tag keys are renamed to their display names, and returns the copy printed by
 * {@code PrintableMap} wrapped in braces.
 */
public String getTagMapString() {
    // {internal key, display key}, applied in this order
    final String[][] displayNames = {
        {"cloud_cluster_public_endpoint", "public_endpoint"},
        {"cloud_cluster_private_endpoint", "private_endpoint"},
        {"cloud_cluster_status", "compute_group_status"},
        {"cloud_cluster_id", "compute_group_id"},
        {"cloud_cluster_name", "compute_group_name"},
    };

    final Map<String, String> displayTagMap = Maps.newHashMap();
    displayTagMap.putAll(tagMap);
    for (String[] rename : displayNames) {
        final String internalKey = rename[0];
        if (displayTagMap.containsKey(internalKey)) {
            displayTagMap.put(rename[1], displayTagMap.remove(internalKey));
        }
    }
    return "{" + new PrintableMap<>(displayTagMap, ":", true, false).toString() + "}";
}
/** Returns the accumulated publish-task number last stored (0 until it has been set). */
public Long getPublishTaskLastTimeAccumulated() {
return this.lastPublishTaskAccumulatedNum;
}
/** Stores the accumulated publish-task number. */
public void setPublishTaskLastTimeAccumulated(Long accumulatedNum) {
this.lastPublishTaskAccumulatedNum = accumulatedNum;
}
/**
 * Returns the compute group of this backend: the cloud cluster id in cloud mode,
 * otherwise the value of the location tag.
 */
public String getComputeGroup() {
    return Config.isCloudMode() ? getCloudClusterId() : getLocationTag().value;
}
}