CloudClusterChecker.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.cloud.catalog;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.proto.Cloud.ClusterPB;
import org.apache.doris.cloud.proto.Cloud.ClusterPB.Type;
import org.apache.doris.cloud.proto.Cloud.ClusterStatus;
import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class CloudClusterChecker extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(CloudClusterChecker.class);
private CloudSystemInfoService cloudSystemInfoService;
private final Object checkLock = new Object();
boolean isUpdateCloudUniqueId = false;
public CloudClusterChecker(CloudSystemInfoService cloudSystemInfoService) {
super("cloud cluster check", Config.cloud_cluster_check_interval_second * 1000L);
this.cloudSystemInfoService = cloudSystemInfoService;
}
/**
* Diff 2 collections of current and the dest.
* @param toAdd output param = (expectedState - currentState)
* @param toDel output param = (currentState - expectedState)
* @param supplierCurrentMapFunc get the current be or fe objects information map from memory, a lambda function
* @param supplierNodeMapFunc get be or fe information map from meta_service return pb, a lambda function
*/
private <T> void diffNodes(List<T> toAdd, List<T> toDel, Supplier<Map<String, T>> supplierCurrentMapFunc,
Supplier<Map<String, T>> supplierNodeMapFunc) {
if (toAdd == null || toDel == null) {
return;
}
// TODO(gavin): Consider VPC
// vpc:ip:port -> Nodes
Map<String, T> currentMap = supplierCurrentMapFunc.get();
Map<String, T> nodeMap = supplierNodeMapFunc.get();
if (LOG.isDebugEnabled()) {
LOG.debug("current Nodes={} expected Nodes={}", currentMap.keySet(), nodeMap.keySet());
}
toDel.addAll(currentMap.keySet().stream().filter(i -> !nodeMap.containsKey(i))
.map(currentMap::get).collect(Collectors.toList()));
toAdd.addAll(nodeMap.keySet().stream().filter(i -> !currentMap.containsKey(i))
.map(nodeMap::get).collect(Collectors.toList()));
}
private void checkToAddCluster(Map<String, ClusterPB> remoteClusterIdToPB, Set<String> localClusterIds) {
List<String> toAddClusterIds = remoteClusterIdToPB.keySet().stream()
.filter(i -> !localClusterIds.contains(i)).collect(Collectors.toList());
toAddClusterIds.forEach(
addId -> {
if (LOG.isDebugEnabled()) {
LOG.debug("begin to add clusterId: {}", addId);
}
// Attach tag to BEs
String clusterName = remoteClusterIdToPB.get(addId).getClusterName();
String clusterId = remoteClusterIdToPB.get(addId).getClusterId();
String publicEndpoint = remoteClusterIdToPB.get(addId).getPublicEndpoint();
String privateEndpoint = remoteClusterIdToPB.get(addId).getPrivateEndpoint();
// For old versions that do no have status field set
ClusterStatus clusterStatus = remoteClusterIdToPB.get(addId).hasClusterStatus()
? remoteClusterIdToPB.get(addId).getClusterStatus() : ClusterStatus.NORMAL;
List<Backend> toAdd = new ArrayList<>();
for (Cloud.NodeInfoPB node : remoteClusterIdToPB.get(addId).getNodesList()) {
String addr = Config.enable_fqdn_mode ? node.getHost() : node.getIp();
if (Strings.isNullOrEmpty(addr)) {
LOG.warn("cant get valid add from ms {}", node);
continue;
}
Backend b = new Backend(Env.getCurrentEnv().getNextId(), addr, node.getHeartbeatPort());
Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
newTagMap.put(Tag.CLOUD_CLUSTER_STATUS, String.valueOf(clusterStatus));
newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, publicEndpoint);
newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, privateEndpoint);
newTagMap.put(Tag.CLOUD_UNIQUE_ID, node.getCloudUniqueId());
b.setTagMap(newTagMap);
toAdd.add(b);
}
cloudSystemInfoService.updateCloudBackends(toAdd, new ArrayList<>());
}
);
}
private void checkToDelCluster(Map<String, ClusterPB> remoteClusterIdToPB, Set<String> localClusterIds,
Map<String, List<Backend>> clusterIdToBackend) {
List<String> toDelClusterIds = localClusterIds.stream()
.filter(i -> !remoteClusterIdToPB.containsKey(i)).collect(Collectors.toList());
// drop be cluster
Map<String, List<Backend>> finalClusterIdToBackend = clusterIdToBackend;
toDelClusterIds.forEach(
delId -> {
if (LOG.isDebugEnabled()) {
LOG.debug("begin to drop clusterId: {}", delId);
}
List<Backend> toDel =
new ArrayList<>(finalClusterIdToBackend.getOrDefault(delId, new ArrayList<>()));
cloudSystemInfoService.updateCloudBackends(new ArrayList<>(), toDel);
// del clusterName
String delClusterName = cloudSystemInfoService.getClusterNameByClusterId(delId);
if (delClusterName.isEmpty()) {
LOG.warn("can't get delClusterName, clusterId: {}, plz check", delId);
return;
}
// del clusterID
cloudSystemInfoService.dropCluster(delId, delClusterName);
}
);
}
private void updateStatus(List<Backend> currentBes, List<Cloud.NodeInfoPB> expectedBes) {
Map<String, Backend> currentMap = new HashMap<>();
for (Backend be : currentBes) {
String endpoint = be.getHost() + ":" + be.getHeartbeatPort();
currentMap.put(endpoint, be);
}
for (Cloud.NodeInfoPB node : expectedBes) {
String addr = Config.enable_fqdn_mode ? node.getHost() : node.getIp();
if (Strings.isNullOrEmpty(addr)) {
LOG.warn("cant get valid add from ms {}", node);
continue;
}
String endpoint = addr + ":" + node.getHeartbeatPort();
Cloud.NodeStatusPB status = node.getStatus();
Backend be = currentMap.get(endpoint);
if (be == null) {
LOG.warn("cant get valid be {} from fe mem, ignore it checker will add this be at next", endpoint);
continue;
}
if (status == Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONING) {
if (!be.isDecommissioned()) {
LOG.info("decommissioned backend: {} status: {}", be, status);
try {
((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().registerWaterShedTxnId(be.getId());
} catch (UserException e) {
LOG.warn("failed to register water shed txn id, decommission be {}", be.getId(), e);
}
be.setDecommissioning(true);
}
}
if (status == Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED) {
// When the synchronization status of the node is "NODE_STATUS_DECOMMISSIONED",
// it indicates that the conditions for decommissioning have
// already been checked in CloudTabletRebalancer.java,
// such as the tablets having been successfully migrated and no remnants of WAL on the backend (BE).
if (!be.isDecommissioned()) {
LOG.warn("impossible status, somewhere has bug, backend: {} status: {}", be, status);
}
be.setDecommissioned(true);
// edit log
Env.getCurrentEnv().getEditLog().logBackendStateChange(be);
}
}
}
private void checkDiffNode(Map<String, ClusterPB> remoteClusterIdToPB,
Map<String, List<Backend>> clusterIdToBackend) {
for (String cid : clusterIdToBackend.keySet()) {
List<Backend> toAdd = new ArrayList<>();
List<Backend> toDel = new ArrayList<>();
ClusterPB cp = remoteClusterIdToPB.get(cid);
if (cp == null) {
LOG.warn("can't get cid {} info, and local cluster info {}, remote cluster info {}",
cid, clusterIdToBackend, remoteClusterIdToPB);
continue;
}
String newClusterName = cp.getClusterName();
List<Backend> currentBes = clusterIdToBackend.getOrDefault(cid, new ArrayList<>());
String currentClusterName = currentBes.stream().map(Backend::getCloudClusterName).findFirst().orElse("");
if (!newClusterName.equals(currentClusterName)) {
// rename cluster's name
LOG.info("cluster_name corresponding to cluster_id has been changed,"
+ " cluster_id : {} , current_cluster_name : {}, new_cluster_name :{}",
cid, currentClusterName, newClusterName);
// change all be's cluster_name
currentBes.forEach(b -> b.setCloudClusterName(newClusterName));
// update clusterNameToId
cloudSystemInfoService.updateClusterNameToId(newClusterName, currentClusterName, cid);
// update tags
currentBes.forEach(b -> Env.getCurrentEnv().getEditLog().logModifyBackend(b));
}
String currentClusterStatus = cloudSystemInfoService.getCloudStatusById(cid);
// For old versions that do no have status field set
ClusterStatus clusterStatus = cp.hasClusterStatus() ? cp.getClusterStatus() : ClusterStatus.NORMAL;
String newClusterStatus = String.valueOf(clusterStatus);
if (LOG.isDebugEnabled()) {
LOG.debug("current cluster status {} {}", currentClusterStatus, newClusterStatus);
}
boolean needChange = false;
// ATTN: found bug, In the same cluster, the cluster status in the tags of BE nodes is inconsistent.
// Using a set to collect the cluster statuses from the BE nodes.
Set<String> clusterStatusInMem = new HashSet<>();
for (Backend backend : currentBes) {
String beClusterStatus = backend.getTagMap().get(Tag.CLOUD_CLUSTER_STATUS);
clusterStatusInMem.add(beClusterStatus == null ? "NOT_SET" : beClusterStatus);
}
if (clusterStatusInMem.size() != 1) {
LOG.warn("cluster {}, multi be nodes cluster status inconsistent, fix it {}", cid, clusterStatusInMem);
needChange = true;
}
if (!currentClusterStatus.equals(newClusterStatus) || needChange) {
// cluster's status changed
LOG.info("cluster_status corresponding to cluster_id has been changed,"
+ " cluster_id : {} , current_cluster_status : {}, new_cluster_status :{}",
cid, currentClusterStatus, newClusterStatus);
// change all be's cluster_status
currentBes.forEach(b -> b.setCloudClusterStatus(newClusterStatus));
// update tags
currentBes.forEach(b -> Env.getCurrentEnv().getEditLog().logModifyBackend(b));
}
List<String> currentBeEndpoints = currentBes.stream().map(backend ->
backend.getHost() + ":" + backend.getHeartbeatPort()).collect(Collectors.toList());
List<Cloud.NodeInfoPB> expectedBes = remoteClusterIdToPB.get(cid).getNodesList();
List<String> remoteBeEndpoints = expectedBes.stream()
.map(pb -> {
String addr = Config.enable_fqdn_mode ? pb.getHost() : pb.getIp();
if (Strings.isNullOrEmpty(addr)) {
LOG.warn("cant get valid add from ms {}", pb);
return "";
}
return addr + ":" + pb.getHeartbeatPort();
}).filter(e -> !Strings.isNullOrEmpty(e))
.collect(Collectors.toList());
LOG.info("get cloud cluster, clusterId={} local nodes={} remote nodes={}", cid,
currentBeEndpoints, remoteBeEndpoints);
updateStatus(currentBes, expectedBes);
diffNodes(toAdd, toDel, () -> {
Map<String, Backend> currentMap = new HashMap<>();
for (Backend be : currentBes) {
String endpoint = be.getHost() + ":" + be.getHeartbeatPort()
+ be.getCloudPublicEndpoint() + be.getCloudPrivateEndpoint();
currentMap.put(endpoint, be);
}
return currentMap;
}, () -> {
Map<String, Backend> nodeMap = new HashMap<>();
for (Cloud.NodeInfoPB node : expectedBes) {
String host = Config.enable_fqdn_mode ? node.getHost() : node.getIp();
if (Strings.isNullOrEmpty(host)) {
LOG.warn("cant get valid add from ms {}", node);
continue;
}
String endpoint = host + ":" + node.getHeartbeatPort()
+ remoteClusterIdToPB.get(cid).getPublicEndpoint()
+ remoteClusterIdToPB.get(cid).getPrivateEndpoint();
Backend b = new Backend(Env.getCurrentEnv().getNextId(), host, node.getHeartbeatPort());
if (node.hasIsSmoothUpgrade()) {
b.setSmoothUpgradeDst(node.getIsSmoothUpgrade());
}
// Attach tag to BEs
Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
newTagMap.put(Tag.CLOUD_CLUSTER_NAME, remoteClusterIdToPB.get(cid).getClusterName());
newTagMap.put(Tag.CLOUD_CLUSTER_ID, remoteClusterIdToPB.get(cid).getClusterId());
newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, remoteClusterIdToPB.get(cid).getPublicEndpoint());
newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT,
remoteClusterIdToPB.get(cid).getPrivateEndpoint());
newTagMap.put(Tag.CLOUD_UNIQUE_ID, node.getCloudUniqueId());
b.setTagMap(newTagMap);
nodeMap.put(endpoint, b);
}
return nodeMap;
});
if (LOG.isDebugEnabled()) {
LOG.debug("cluster_id: {}, diffBackends nodes: {}, current: {}, toAdd: {}, toDel: {}",
cid, expectedBes, currentBes, toAdd, toDel);
}
if (toAdd.isEmpty() && toDel.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("runAfterCatalogReady nothing todo");
}
continue;
}
cloudSystemInfoService.updateCloudBackends(toAdd, toDel);
}
}
@Override
protected void runAfterCatalogReady() {
synchronized (checkLock) {
checkCloudBackends();
updateCloudMetrics();
checkCloudFes();
}
}
private void checkFeNodesMapValid() {
if (LOG.isDebugEnabled()) {
LOG.debug("begin checkFeNodesMapValid");
}
Map<String, List<Backend>> clusterIdToBackend = cloudSystemInfoService.getCloudClusterIdToBackend();
Set<String> clusterIds = new HashSet<>();
Set<String> clusterNames = new HashSet<>();
clusterIdToBackend.forEach((clusterId, bes) -> {
if (bes.isEmpty()) {
LOG.warn("impossible, somewhere err, clusterId {}, clusterIdToBeMap {}", clusterId, clusterIdToBackend);
clusterIdToBackend.remove(clusterId);
}
bes.forEach(be -> {
clusterIds.add(be.getCloudClusterId());
clusterNames.add(be.getCloudClusterName());
});
});
Map<String, String> nameToId = cloudSystemInfoService.getCloudClusterNameToId();
nameToId.forEach((clusterName, clusterId) -> {
if (!clusterIdToBackend.containsKey(clusterId)) {
LOG.warn("impossible, somewhere err, clusterId {}, clusterName {}, clusterNameToIdMap {}",
clusterId, clusterName, nameToId);
nameToId.remove(clusterName);
}
});
if (!clusterNames.containsAll(nameToId.keySet()) || !nameToId.keySet().containsAll(clusterNames)) {
LOG.warn("impossible, somewhere err, clusterNames {}, nameToId {}", clusterNames, nameToId);
}
if (!clusterIds.containsAll(nameToId.values()) || !nameToId.values().containsAll(clusterIds)) {
LOG.warn("impossible, somewhere err, clusterIds {}, nameToId {}", clusterIds, nameToId);
}
if (!clusterIds.containsAll(clusterIdToBackend.keySet())
|| !clusterIdToBackend.keySet().containsAll(clusterIds)) {
LOG.warn("impossible, somewhere err, clusterIds {}, clusterIdToBackend {}",
clusterIds, clusterIdToBackend);
}
}
private void checkCloudFes() {
Cloud.GetClusterResponse response = cloudSystemInfoService.getCloudCluster(
Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, "");
if (!response.hasStatus() || !response.getStatus().hasCode()
|| response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
LOG.warn("failed to get cloud cluster due to incomplete response, "
+ "cloud_unique_id={}, clusterId={}, response={}",
Config.cloud_unique_id, Config.cloud_sql_server_cluster_id, response);
return;
}
// Note: get_cluster interface cluster(option -> repeated), so it has at least one cluster.
if (response.getClusterCount() == 0) {
LOG.warn("meta service error , return cluster zero, plz check it, "
+ "cloud_unique_id={}, clusterId={}, response={}",
Config.cloud_unique_id, Config.cloud_sql_server_cluster_id, response);
return;
}
ClusterPB cpb = response.getCluster(0);
if (LOG.isDebugEnabled()) {
LOG.debug("get cloud cluster, clusterId={} nodes={}",
Config.cloud_sql_server_cluster_id, cpb.getNodesList());
}
List<Frontend> currentFollowers = Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER);
List<Frontend> currentObservers = Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER);
currentFollowers.addAll(currentObservers);
List<Frontend> currentFes = new ArrayList<>(currentFollowers.stream().collect(Collectors.toMap(
fe -> fe.getHost() + ":" + fe.getEditLogPort(),
fe -> fe,
(existing, replacement) -> existing
)).values());
List<Frontend> toAdd = new ArrayList<>();
List<Frontend> toDel = new ArrayList<>();
List<Cloud.NodeInfoPB> expectedFes = cpb.getNodesList();
if (!isUpdateCloudUniqueId) {
// Just run once and number of fes is small, so iterating is ok.
// newly addde fe has cloudUniqueId.
for (Frontend fe : currentFes) {
for (Cloud.NodeInfoPB node : expectedFes) {
if (fe.getHost().equals(Config.enable_fqdn_mode ? node.getHost() : node.getIp())
&& fe.getEditLogPort() == node.getEditLogPort()) {
fe.setCloudUniqueId(node.getCloudUniqueId());
LOG.info("update cloud unique id result {}", fe);
break;
}
}
}
isUpdateCloudUniqueId = true;
}
diffNodes(toAdd, toDel, () -> {
// memory
Map<String, Frontend> currentMap = new HashMap<>();
String selfNode = Env.getCurrentEnv().getSelfNode().getIdent();
for (Frontend fe : currentFes) {
String endpoint = fe.getHost() + "_" + fe.getEditLogPort();
if (selfNode.equals(endpoint)) {
continue;
}
// add type to map key, for diff
endpoint = endpoint + "_" + fe.getRole();
currentMap.put(endpoint, fe);
}
LOG.info("fes in memory {}", currentMap);
return currentMap;
}, () -> {
// meta service
Map<String, Frontend> nodeMap = new HashMap<>();
String selfNode = Env.getCurrentEnv().getSelfNode().getIdent();
for (Cloud.NodeInfoPB node : expectedFes) {
String host = Config.enable_fqdn_mode ? node.getHost() : node.getIp();
if (Strings.isNullOrEmpty(host)) {
LOG.warn("cant get valid add from ms {}", node);
continue;
}
String endpoint = host + "_" + node.getEditLogPort();
if (selfNode.equals(endpoint)) {
continue;
}
Cloud.NodeInfoPB.NodeType type = node.getNodeType();
// ATTN: just allow to add follower or observer
if (Cloud.NodeInfoPB.NodeType.FE_MASTER.equals(type)) {
LOG.warn("impossible !!!, get fe node {} type equal master from ms", node);
}
FrontendNodeType role = type == Cloud.NodeInfoPB.NodeType.FE_OBSERVER
? FrontendNodeType.OBSERVER : FrontendNodeType.FOLLOWER;
Frontend fe = new Frontend(role,
CloudEnv.genFeNodeNameFromMeta(host, node.getEditLogPort(),
node.getCtime() * 1000), host, node.getEditLogPort());
fe.setCloudUniqueId(node.getCloudUniqueId());
// add type to map key, for diff
endpoint = endpoint + "_" + fe.getRole();
nodeMap.put(endpoint, fe);
}
LOG.info("fes in ms {}", nodeMap);
return nodeMap;
});
LOG.info("diffFrontends nodes: {}, current: {}, toAdd: {}, toDel: {}, enable auto start: {}",
expectedFes, currentFes, toAdd, toDel, Config.enable_auto_start_for_cloud_cluster);
if (toAdd.isEmpty() && toDel.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("runAfterCatalogReady getObserverFes nothing todo");
}
return;
}
try {
cloudSystemInfoService.updateFrontends(toAdd, toDel);
} catch (DdlException e) {
LOG.warn("update cloud frontends exception e: {}, msg: {}", e, e.getMessage());
}
}
private void checkCloudBackends() {
Map<String, List<Backend>> clusterIdToBackend = cloudSystemInfoService.getCloudClusterIdToBackend();
//rpc to ms, to get mysql user can use cluster_id
// NOTE: rpc args all empty, use cluster_unique_id to get a instance's all cluster info.
Cloud.GetClusterResponse response = cloudSystemInfoService.getCloudCluster("", "", "");
if (!response.hasStatus() || !response.getStatus().hasCode()
|| (response.getStatus().getCode() != Cloud.MetaServiceCode.OK
&& response.getStatus().getCode() != MetaServiceCode.CLUSTER_NOT_FOUND)) {
LOG.warn("failed to get cloud cluster due to incomplete response, "
+ "cloud_unique_id={}, response={}", Config.cloud_unique_id, response);
return;
}
Set<String> localClusterIds = clusterIdToBackend.keySet();
// clusterId -> clusterPB
Map<String, ClusterPB> remoteClusterIdToPB = response.getClusterList().stream()
.filter(c -> c.getType() != Type.SQL)
.collect(Collectors.toMap(ClusterPB::getClusterId, clusterPB -> clusterPB));
LOG.info("get cluster info clusterIds: {}", remoteClusterIdToPB);
try {
// cluster_ids diff remote <clusterId, nodes> and local <clusterId, nodes>
// remote - local > 0, add bes to local
checkToAddCluster(remoteClusterIdToPB, localClusterIds);
// local - remote > 0, drop bes from local
checkToDelCluster(remoteClusterIdToPB, localClusterIds, clusterIdToBackend);
clusterIdToBackend = cloudSystemInfoService.getCloudClusterIdToBackend();
if (remoteClusterIdToPB.keySet().size() != clusterIdToBackend.keySet().size()) {
LOG.warn("impossible cluster id size not match, check it local {}, remote {}",
clusterIdToBackend, remoteClusterIdToPB);
}
// clusterID local == remote, diff nodes
checkDiffNode(remoteClusterIdToPB, clusterIdToBackend);
// check mem map
checkFeNodesMapValid();
} catch (Exception e) {
LOG.warn("diff cluster has exception, {}", e.getMessage(), e);
}
LOG.info("daemon cluster get cluster info succ, current cloudClusterIdToBackendMap: {} clusterNameToId {}",
cloudSystemInfoService.getCloudClusterIdToBackend(), cloudSystemInfoService.getCloudClusterNameToId());
}
private void updateCloudMetrics() {
// Metric
Map<String, List<Backend>> clusterIdToBackend = cloudSystemInfoService.getCloudClusterIdToBackend();
Map<String, String> clusterNameToId = cloudSystemInfoService.getCloudClusterNameToId();
for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
int aliveNum = 0;
List<Backend> bes = clusterIdToBackend.get(entry.getValue());
if (bes == null || bes.isEmpty()) {
LOG.info("cant get be nodes by cluster {}, bes {}", entry, bes);
continue;
}
for (Backend backend : bes) {
MetricRepo.updateClusterBackendAlive(entry.getKey(), entry.getValue(),
backend.getAddress(), backend.isAlive());
aliveNum = backend.isAlive() ? aliveNum + 1 : aliveNum;
}
MetricRepo.updateClusterBackendAliveTotal(entry.getKey(), entry.getValue(), aliveNum);
}
}
public void checkNow() {
if (Env.getCurrentEnv().isMaster()) {
synchronized (checkLock) {
runAfterCatalogReady();
}
}
}
}