CloudInstanceStatusChecker.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.cloud.catalog;

import org.apache.doris.analysis.WarmUpClusterStmt;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.CacheHotspotManager;
import org.apache.doris.cloud.CloudWarmUpJob;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.metric.MetricRepo;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;



/**
 * Periodic daemon that fetches the instance description from the meta service (ms) and reconciles
 * FE in-memory state with it: instance status, storage vault mapping and virtual compute groups
 * (including, on the FE master, the file cache warm-up jobs of each virtual group).
 */
public class CloudInstanceStatusChecker extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(CloudInstanceStatusChecker.class);
    private final CloudSystemInfoService cloudSystemInfoService;
    // Virtual compute group name -> time (ms) at which one of its sub compute groups was first found
    // missing from FE memory; used to report sub groups that stay unsynced for too long.
    private final Map<String, Long> lastFailedSyncTimeMap = new ConcurrentHashMap<>();

    public CloudInstanceStatusChecker(CloudSystemInfoService cloudSystemInfoService) {
        super("cloud instance check", Config.cloud_cluster_check_interval_second * 1000L);
        this.cloudSystemInfoService = cloudSystemInfoService;
    }

    /**
     * One check round: fetch the instance from ms, then sync instance status, storage vaults and
     * virtual compute groups. Exceptions are logged rather than propagated.
     */
    @Override
    protected void runAfterCatalogReady() {
        try {
            Cloud.GetInstanceResponse response = cloudSystemInfoService.getCloudInstance();
            if (LOG.isDebugEnabled()) {
                LOG.debug("get from ms response {}", response);
            }
            if (!isResponseValid(response)) {
                return;
            }

            Cloud.InstanceInfoPB instance = response.getInstance();
            cloudSystemInfoService.setInstanceStatus(instance.getStatus());
            syncStorageVault(instance);
            processVirtualClusters(instance.getClustersList());
        } catch (Exception e) {
            LOG.warn("get instance from ms exception", e);
        }
    }

    /**
     * Returns true only if the response is non-null and carries an OK status code; otherwise logs a
     * warning and returns false.
     */
    private boolean isResponseValid(Cloud.GetInstanceResponse response) {
        if (response == null || !response.hasStatus() || !response.getStatus().hasCode()
                || response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
            LOG.warn("failed to get cloud instance due to incomplete response, "
                    + "cloud_unique_id={}, response={}", Config.cloud_unique_id, response);
            return false;
        }
        return true;
    }

    /**
     * Pushes the storage vault name -> id mapping and the default vault from ms to the
     * StorageVaultMgr. Vault names and resource ids are paired by index, so when the two lists have
     * different sizes the pairing cannot be trusted and the refresh is skipped for this round.
     */
    private void syncStorageVault(Cloud.InstanceInfoPB instance) {
        int idCount = instance.getResourceIdsCount();
        int nameCount = instance.getStorageVaultNamesCount();
        if (idCount != nameCount) {
            LOG.warn("storage vault ids and names size mismatch, ids={}, names={}, skip refreshing vault map, "
                    + "cloud_unique_id={}", idCount, nameCount, Config.cloud_unique_id);
            return;
        }
        Map<String, String> vaultMap = new HashMap<>();
        for (int i = 0; i < idCount; i++) {
            vaultMap.put(instance.getStorageVaultNames(i), instance.getResourceIds(i));
        }
        Env.getCurrentEnv().getStorageVaultMgr().refreshVaultMap(vaultMap,
                Pair.of(instance.getDefaultStorageVaultName(), instance.getDefaultStorageVaultId()));
    }

    /**
     * Splits the clusters from ms into compute and virtual ones, creates/updates the virtual compute
     * groups in FE, and removes FE virtual groups that ms no longer reports.
     */
    private void processVirtualClusters(List<Cloud.ClusterPB> clusters) {
        List<Cloud.ClusterPB> virtualClusters = new ArrayList<>();
        List<Cloud.ClusterPB> computeClusters = new ArrayList<>();
        categorizeClusters(clusters, virtualClusters, computeClusters);
        handleVirtualClusters(virtualClusters, computeClusters);
        removeObsoleteVirtualGroups(virtualClusters);
    }

    /**
     * Appends each cluster to {@code computeClusters} or {@code virtualClusters} by its type.
     * Clusters without a type are logged and skipped; other types are ignored.
     */
    private void categorizeClusters(List<Cloud.ClusterPB> clusters,
                                    List<Cloud.ClusterPB> virtualClusters, List<Cloud.ClusterPB> computeClusters) {
        for (Cloud.ClusterPB cluster : clusters) {
            if (!cluster.hasType()) {
                LOG.warn("found a cluster {} which has no type", cluster);
                continue;
            }
            if (Cloud.ClusterPB.Type.COMPUTE == cluster.getType()) {
                computeClusters.add(cluster);
            } else if (Cloud.ClusterPB.Type.VIRTUAL == cluster.getType()) {
                virtualClusters.add(cluster);
            }
        }
    }

    /**
     * Creates or updates every virtual compute group reported by ms. On the FE master it then makes
     * sure the file cache warm-up jobs of each group are in place.
     */
    private void handleVirtualClusters(List<Cloud.ClusterPB> virtualGroups, List<Cloud.ClusterPB> computeClusters) {
        for (Cloud.ClusterPB virtualGroupInMs : virtualGroups) {
            ComputeGroup virtualGroupInFe = cloudSystemInfoService
                    .getComputeGroupById(virtualGroupInMs.getClusterId());
            if (virtualGroupInFe != null) {
                handleExistingVirtualComputeGroup(virtualGroupInMs, virtualGroupInFe);
            } else {
                handleNewVirtualComputeGroup(virtualGroupInMs, computeClusters);
            }
            // only the FE master generates file cache sync tasks
            if (!Env.getCurrentEnv().isMaster()) {
                continue;
            }
            // look up again: the group may have just been created (or skipped) above
            virtualGroupInFe = cloudSystemInfoService.getComputeGroupById(virtualGroupInMs.getClusterId());
            if (virtualGroupInFe == null) {
                LOG.info("virtual compute can not find virtual group {} after handle, may be a empty vcg",
                        virtualGroupInMs);
                continue;
            }
            syncFileCacheTasksForVirtualGroup(virtualGroupInMs, virtualGroupInFe);
        }
    }

    /**
     * Parses a warm-up job id.
     *
     * @return the id, or null if {@code jobId} is null or not a valid long
     */
    private static Long parseJobId(String jobId) {
        try {
            return Long.parseLong(jobId);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /**
     * Cancels the given warm-up jobs. Cancelling writes an edit log, so only the FE master does it;
     * on other FEs this is a no-op. Malformed ids and failed cancellations are logged and skipped so
     * that one bad id does not abort the rest.
     */
    private void cancelCacheJobs(ComputeGroup vcgInFe, List<String> jobIds) {
        if (!Env.getCurrentEnv().isMaster()) {
            return;
        }
        CacheHotspotManager cacheHotspotManager = ((CloudEnv) Env.getCurrentEnv()).getCacheHotspotMgr();
        for (String jobId : jobIds) {
            Long id = parseJobId(jobId);
            if (id == null) {
                LOG.warn("virtual compute err, name {}, skip cancelling invalid jobId {}",
                        vcgInFe.getName(), jobId);
                continue;
            }
            try {
                cacheHotspotManager.cancel(id);
                LOG.info("virtual compute group {}, cancel jobId {}", vcgInFe.getName(), jobId);
            } catch (DdlException e) {
                LOG.warn("virtual compute err, name {}, failed to cancel expired jobId failed {}",
                        vcgInFe.getName(), jobId, e);
            }
        }
    }

    /**
     * Marks {@code virtualGroupInFe} as needing a file cache rebuild when any warm-up job id recorded in
     * ms is unparsable, unknown to the FE, lacks src/dst info, does not match the group's current
     * active (src) / standby (dst) pair, or has been cancelled. Stops at the first such job.
     */
    private void checkNeedRebuildFileCache(ComputeGroup virtualGroupInFe, List<String> jobIdsInMs) {
        CacheHotspotManager cacheHotspotManager = ((CloudEnv) Env.getCurrentEnv()).getCacheHotspotMgr();
        for (String jobId : jobIdsInMs) {
            Long id = parseJobId(jobId);
            if (id == null) {
                LOG.warn("virtual compute err, clusterName {} jobId {} is invalid, need rebuild file cache",
                        virtualGroupInFe.getName(), jobId);
                virtualGroupInFe.setNeedRebuildFileCache(true);
                return;
            }
            CloudWarmUpJob job = cacheHotspotManager.getCloudWarmUpJob(id);
            if (job == null) {
                LOG.warn("virtual compute err, clusterName {} jobId {} not found, need rebuild file cache",
                        virtualGroupInFe.getName(), jobId);
                virtualGroupInFe.setNeedRebuildFileCache(true);
                return;
            }
            if (job.getSrcClusterName() == null || job.getDstClusterName() == null) {
                LOG.info("may be Upgrade after downgrade, warm up job info lost,"
                        + " so just rebuild job, clusterName {}, jobId {}",
                        virtualGroupInFe.getName(), jobId);
                virtualGroupInFe.setNeedRebuildFileCache(true);
                return;
            }
            // the job must warm up from the current active group ...
            String expectedSrc = virtualGroupInFe.getActiveComputeGroup();
            if (!job.getSrcClusterName().equals(expectedSrc)) {
                LOG.debug("file cache job src mismatch: jobId {} jobSrc {} expectedSrc {}, need rebuild",
                        jobId, job.getSrcClusterName(), expectedSrc);
                virtualGroupInFe.setNeedRebuildFileCache(true);
                return;
            }
            // ... into the current standby group
            String expectedDst = virtualGroupInFe.getStandbyComputeGroup();
            if (!job.getDstClusterName().equals(expectedDst)) {
                LOG.debug("file cache job dest mismatch: jobId {} jobDst {} expectedDst {}, need rebuild",
                        jobId, job.getDstClusterName(), expectedDst);
                virtualGroupInFe.setNeedRebuildFileCache(true);
                return;
            }
            if (job.getJobState() == CloudWarmUpJob.JobState.CANCELLED) {
                LOG.warn("virtual compute err, clusterName {} jobId {} has been cancelled, need rebuild",
                        virtualGroupInFe.getName(), jobId);
                virtualGroupInFe.setNeedRebuildFileCache(true);
                return;
            }
        }
    }

    /**
     * Builds the properties of the periodic warm-up job: "sync_mode"="periodic" and "sync_interval_sec"
     * derived from Config.fetch_cluster_cache_hotspot_interval_ms (600s if that yields a value <= 0).
     */
    private static Map<String, String> buildPeriodicSyncProperties() {
        Map<String, String> properties = new HashMap<>();
        properties.put("sync_mode", "periodic");
        long syncIntervalSec = Config.fetch_cluster_cache_hotspot_interval_ms / 1000;
        if (syncIntervalSec <= 0) {
            LOG.warn("invalid Config fetch_cluster_cache_hotspot_interval_ms set it to 600s");
            syncIntervalSec = 600;
        }
        properties.put("sync_interval_sec", String.valueOf(syncIntervalSec));
        return properties;
    }

    /** Builds the properties of the warm-up job triggered by load events ("sync_mode"="event_driven"). */
    private static Map<String, String> buildLoadEventSyncProperties() {
        Map<String, String> properties = new HashMap<>();
        properties.put("sync_mode", "event_driven");
        properties.put("sync_event", "load");
        return properties;
    }

    /**
     * Generates and synchronizes file cache related tasks for a virtual compute group on the FE master.
     * If the group needs a rebuild, the old jobs listed in ms are cancelled, a periodic and a
     * load-event job (src = active group, dst = standby group) are created, and their ids are sent to
     * ms. If creation or the ms update fails, the jobs created in this round are cancelled so they are
     * not orphaned, and the rebuild flag stays set so the next round retries.
     */
    private void syncFileCacheTasksForVirtualGroup(Cloud.ClusterPB virtualGroupInMs, ComputeGroup virtualGroupInFe) {
        if (!virtualGroupInMs.hasClusterPolicy()) {
            LOG.warn("virtual compute err, clusterName {}, no cluster policy {}",
                    virtualGroupInFe.getName(), virtualGroupInMs);
            return;
        }
        CacheHotspotManager cacheHotspotManager = ((CloudEnv) Env.getCurrentEnv()).getCacheHotspotMgr();
        List<String> jobIdsInMs =
                new ArrayList<>(virtualGroupInMs.getClusterPolicy().getCacheWarmupJobidsList());

        checkNeedRebuildFileCache(virtualGroupInFe, jobIdsInMs);
        LOG.debug("virtual compute group {}, get from ms file cache sync task jobIds {}",
                virtualGroupInFe, jobIdsInMs);
        // the flag may have been set by the diff/creation step or by the check above
        if (!virtualGroupInFe.isNeedRebuildFileCache()) {
            return;
        }
        String srcCg = virtualGroupInFe.getActiveComputeGroup();
        String dstCg = virtualGroupInFe.getStandbyComputeGroup();
        cancelCacheJobs(virtualGroupInFe, jobIdsInMs);

        List<String> createdJobIds = new ArrayList<>();
        try {
            long jobIdPeriodic = cacheHotspotManager.createJob(
                    new WarmUpClusterStmt(dstCg, srcCg, true, buildPeriodicSyncProperties()));
            createdJobIds.add(Long.toString(jobIdPeriodic));
            long jobIdEvent = cacheHotspotManager.createJob(
                    new WarmUpClusterStmt(dstCg, srcCg, true, buildLoadEventSyncProperties()));
            createdJobIds.add(Long.toString(jobIdEvent));
            // send jobIds to ms
            List<String> newJobIds = Arrays.asList(Long.toString(jobIdPeriodic), Long.toString(jobIdEvent));
            CloudSystemInfoService.updateFileCacheJobIds(virtualGroupInFe, newJobIds);
            LOG.info("virtual compute group {}, generate new jobIds periodic={}, event={}, and old jobIds {}",
                    virtualGroupInFe, jobIdPeriodic, jobIdEvent, jobIdsInMs);
        } catch (AnalysisException e) {
            LOG.warn("virtual compute err, name: {}, analysis error", virtualGroupInFe.getName(), e);
            // Roll back the jobs created in this round. Otherwise every retry would leave another
            // untracked warm-up job running.
            cancelCacheJobs(virtualGroupInFe, createdJobIds);
            return;
        }
        virtualGroupInFe.setNeedRebuildFileCache(false);
    }

    /**
     * Validates the ms view of an already-known virtual compute group and, if it is valid, applies the
     * differences to the FE group.
     */
    private void handleExistingVirtualComputeGroup(Cloud.ClusterPB clusterInMs, ComputeGroup virtualGroupInFe) {
        if (!isClusterIdConsistent(clusterInMs, virtualGroupInFe)) {
            return;
        }

        if (!isClusterPolicyValid(clusterInMs)) {
            return;
        }

        if (!areSubComputeGroupsValid(clusterInMs, virtualGroupInFe)) {
            return;
        }

        diffAndUpdateComputeGroup(clusterInMs, virtualGroupInFe);
    }

    /** Returns true if the cluster id in ms equals the FE group id; otherwise logs and returns false. */
    private boolean isClusterIdConsistent(Cloud.ClusterPB cluster, ComputeGroup computeGroup) {
        if (!cluster.getClusterId().equals(computeGroup.getId())) {
            LOG.warn("virtual compute err, group id changed, in fe={} but in ms={}, "
                    + "verbose {}, please check it",
                    computeGroup.getId(), cluster.getClusterId(), computeGroup);
            return false;
        }
        return true;
    }

    /**
     * Returns true only if the cluster has a policy of type ActiveStandby with exactly one standby
     * cluster; otherwise logs the reason and returns false.
     */
    private boolean isClusterPolicyValid(Cloud.ClusterPB cluster) {
        if (!cluster.hasClusterPolicy()) {
            LOG.warn("virtual compute err, no cluster policy {}", cluster);
            return false;
        }
        if (!cluster.getClusterPolicy().hasType()
                || cluster.getClusterPolicy().getType() != Cloud.ClusterPolicy.PolicyType.ActiveStandby) {
            LOG.warn("virtual compute err, current just support Virtual compute group policy ActiveStandby");
            return false;
        }
        if (cluster.getClusterPolicy().getStandbyClusterNamesList().size() != 1) {
            LOG.warn("virtual compute err, current just support one Standby compute group policy ActiveStandby,"
                    + " verbose {}", cluster);
            return false;
        }
        return true;
    }

    /**
     * Returns true only if both ms and FE know the group's sub compute groups, and both report exactly
     * two of them.
     */
    private boolean areSubComputeGroupsValid(Cloud.ClusterPB clusterInMs, ComputeGroup virtualGroupInFe) {
        List<String> subComputeGroups = clusterInMs.getClusterNamesList();
        if (subComputeGroups.isEmpty() || virtualGroupInFe.getSubComputeGroups() == null) {
            LOG.warn("virtual compute err, please check it, verbose {}", virtualGroupInFe);
            return false;
        }
        if (subComputeGroups.size() != virtualGroupInFe.getSubComputeGroups().size() || subComputeGroups.size() != 2) {
            LOG.warn("virtual compute err, sub compute group in fe {}, in ms {}",
                    virtualGroupInFe, subComputeGroups);
            return false;
        }
        return true;
    }

    /**
     * Copies name, sub compute groups, active/standby groups, thresholds and warm-up job ids from ms
     * into the FE group. A change of the sub groups or of the active/standby pair flags a file cache
     * rebuild.
     */
    private void diffAndUpdateComputeGroup(Cloud.ClusterPB cluster, ComputeGroup computeGroup) {
        // vcg rename: the cluster id is the same but the cluster name changed
        String clusterNameInMs = cluster.getClusterName();
        String computeGroupNameInFe = computeGroup.getName();
        if (!clusterNameInMs.equals(computeGroupNameInFe)) {
            LOG.info("virtual compute group renamed from {} to {}", computeGroupNameInFe, clusterNameInMs);
            computeGroup.setName(clusterNameInMs);
            cloudSystemInfoService.renameVirtualComputeGroup(computeGroup.getId(), computeGroupNameInFe, computeGroup);
        }

        // Compare order-insensitively on copies. Sorting the FE group's own list in place would mutate
        // state that other threads may be reading.
        List<String> subCgsInFe = new ArrayList<>(computeGroup.getSubComputeGroups());
        List<String> subCgsInMs = new ArrayList<>(cluster.getClusterNamesList());
        Collections.sort(subCgsInFe);
        Collections.sort(subCgsInMs);
        if (!subCgsInFe.equals(subCgsInMs)) {
            LOG.info("virtual compute group change sub cgs from {} to {}", subCgsInFe, subCgsInMs);
            computeGroup.setSubComputeGroups(subCgsInMs);
            computeGroup.setNeedRebuildFileCache(true);
        }

        String activeInMs = cluster.getClusterPolicy().getActiveClusterName();
        if (!activeInMs.equals(computeGroup.getPolicy().activeComputeGroup)) {
            LOG.info("virtual compute group change active group from {} to {}",
                    computeGroup.getPolicy().activeComputeGroup, activeInMs);
            computeGroup.getPolicy().setActiveComputeGroup(activeInMs);
            computeGroup.setNeedRebuildFileCache(true);
        }

        String standbyInMs = cluster.getClusterPolicy().getStandbyClusterNames(0);
        if (!standbyInMs.equals(computeGroup.getPolicy().standbyComputeGroup)) {
            LOG.info("virtual compute group change standby group from {} to {}",
                    computeGroup.getPolicy().standbyComputeGroup, standbyInMs);
            computeGroup.getPolicy().setStandbyComputeGroup(standbyInMs);
            computeGroup.setNeedRebuildFileCache(true);
        }

        if (cluster.getClusterPolicy().getFailoverFailureThreshold()
                != computeGroup.getPolicy().failoverFailureThreshold) {
            LOG.info("virtual compute group change failover failure threshold from {} to {}",
                    computeGroup.getPolicy().failoverFailureThreshold,
                    cluster.getClusterPolicy().getFailoverFailureThreshold());
            computeGroup.getPolicy()
                    .setFailoverFailureThreshold(cluster.getClusterPolicy().getFailoverFailureThreshold());
        }

        if (cluster.getClusterPolicy().getUnhealthyNodeThresholdPercent()
                != computeGroup.getPolicy().unhealthyNodeThresholdPercent) {
            LOG.info("virtual compute group change unhealthy node threshold from {} to {}",
                    computeGroup.getPolicy().unhealthyNodeThresholdPercent,
                    cluster.getClusterPolicy().getUnhealthyNodeThresholdPercent());
            computeGroup.getPolicy()
                    .setUnhealthyNodeThresholdPercent(cluster.getClusterPolicy().getUnhealthyNodeThresholdPercent());
        }

        List<String> jobIdsInMs =
                new ArrayList<>(cluster.getClusterPolicy().getCacheWarmupJobidsList());
        List<String> jobIdsInFe = computeGroup.getPolicy().getCacheWarmupJobIds();
        if (!jobIdsInMs.equals(jobIdsInFe)) {
            LOG.debug("set exist vcg {}, jobIds in FE {} in Ms ms {}",
                    cluster.getClusterName(), jobIdsInFe, jobIdsInMs);
            computeGroup.getPolicy().setCacheWarmupJobIds(jobIdsInMs);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("after diff virtual cg info {}", computeGroup);
        }
    }

    /**
     * Validates a virtual cluster that FE does not know yet. The cluster must have exactly two sub
     * groups, a policy with an active cluster and exactly one standby cluster. If valid, the method
     * builds a VIRTUAL ComputeGroup flagged for a file cache rebuild and registers it with
     * cloudSystemInfoService and the cloud metrics. Sub groups missing from FE memory are only
     * reported; the group is still added.
     */
    private void handleNewVirtualComputeGroup(Cloud.ClusterPB cluster, List<Cloud.ClusterPB> computeClusters) {
        List<String> subComputeGroups = cluster.getClusterNamesList();
        if (subComputeGroups.isEmpty()) {
            LOG.info("found virtual cluster {} which has no sub clusters, skip empty virtual cluster", cluster);
            return;
        }
        if (subComputeGroups.size() != 2) {
            LOG.warn("virtual compute err, sub compute group size not eq 2, in ms {}", subComputeGroups);
            return;
        }
        if (!cluster.hasClusterPolicy()) {
            LOG.warn("virtual compute err, no cluster policy {}", cluster);
            return;
        }
        if (!cluster.getClusterPolicy().hasActiveClusterName()) {
            LOG.warn("virtual compute err, active cluster empty in ms {}", cluster);
            return;
        }
        if (cluster.getClusterPolicy().getStandbyClusterNamesList().size() != 1) {
            LOG.warn("virtual compute err, standby cluster size not eq 1 in ms {}", cluster);
            return;
        }
        checkSubClusters(subComputeGroups, cluster, computeClusters);
        ComputeGroup computeGroup = new ComputeGroup(cluster.getClusterId(),
                cluster.getClusterName(), ComputeGroup.ComputeTypeEnum.VIRTUAL);
        computeGroup.setSubComputeGroups(new ArrayList<>(subComputeGroups));
        ComputeGroup.Policy policy = new ComputeGroup.Policy();
        policy.setActiveComputeGroup(cluster.getClusterPolicy().getActiveClusterName());
        policy.setStandbyComputeGroup(cluster.getClusterPolicy().getStandbyClusterNames(0));
        policy.setFailoverFailureThreshold(cluster.getClusterPolicy().getFailoverFailureThreshold());
        policy.setUnhealthyNodeThresholdPercent(cluster.getClusterPolicy().getUnhealthyNodeThresholdPercent());
        computeGroup.setPolicy(policy);
        computeGroup.setNeedRebuildFileCache(true);
        cloudSystemInfoService.addComputeGroup(cluster.getClusterId(), computeGroup);
        MetricRepo.registerCloudMetrics(cluster.getClusterId(), cluster.getClusterName());
    }

    /**
     * Reports every sub compute group of {@code cluster} that FE memory does not know yet. The failure
     * record for the virtual group is cleared only when all of its sub groups are known.
     */
    private void checkSubClusters(List<String> subClusterNames, Cloud.ClusterPB cluster,
                                  List<Cloud.ClusterPB> computeClustersInPB) {
        boolean allSynced = true;
        for (String subClusterName : subClusterNames) {
            if (cloudSystemInfoService.getCloudClusterIdByName(subClusterName) == null) {
                handleFailedSync(cluster, subClusterName, computeClustersInPB);
                allSynced = false;
            }
        }
        // Clearing the record as soon as one sub group is found would reset the timer of the sub
        // groups that are still missing.
        if (allSynced) {
            lastFailedSyncTimeMap.remove(cluster.getClusterName());
        }
    }

    /**
     * Records the first time a sub compute group of {@code cluster} is missing from FE memory. On later
     * calls it logs whether ms still lists the sub group, and warns once the failure has lasted longer
     * than three check intervals.
     */
    private void handleFailedSync(Cloud.ClusterPB cluster, String subClusterName,
                                  List<Cloud.ClusterPB> computeClustersInPB) {
        long now = System.currentTimeMillis();
        Long firstFailedTime = lastFailedSyncTimeMap.putIfAbsent(cluster.getClusterName(), now);
        if (firstFailedTime == null) {
            // first time this failure is seen: just remember when it started
            return;
        }
        List<String> computeGroupsInPb = computeClustersInPB.stream()
                .map(Cloud.ClusterPB::getClusterName).collect(Collectors.toList());
        if (computeGroupsInPb.contains(subClusterName)) {
            LOG.warn("fe mem cant find {}, it may be wait cluster check to sync", subClusterName);
        } else {
            LOG.warn("fe mem and ms cant find {}, it may be dropped or renamed", subClusterName);
        }
        // The sub cluster may have been dropped or renamed, or the FE may be slow to sync;
        // manual intervention is needed.
        if (now - firstFailedTime > 3L * Config.cloud_cluster_check_interval_second * 1000L) {
            LOG.warn("virtual compute err, cant find cluster info by cluster checker, "
                    + "sub cluster: {}, virtual cluster: {}", subClusterName, cluster.getClusterName());
        }
    }

    /**
     * Removes FE virtual compute groups whose id is no longer reported by ms, drops their sync-failure
     * record, and cancels their warm-up jobs (cancellation only happens on the FE master).
     */
    private void removeObsoleteVirtualGroups(List<Cloud.ClusterPB> virtualClusters) {
        Set<String> virtualClusterIdsInMs = virtualClusters.stream()
                .map(Cloud.ClusterPB::getClusterId)
                .collect(Collectors.toSet());
        for (ComputeGroup computeGroup : cloudSystemInfoService.getComputeGroups(true)) {
            // still present in ms: keep it
            if (virtualClusterIdsInMs.contains(computeGroup.getId())) {
                continue;
            }
            LOG.info("virtual compute group {} will be removed.", computeGroup.getName());
            cloudSystemInfoService.removeComputeGroup(computeGroup.getId(), computeGroup.getName());
            String name = computeGroup.getName();
            if (name != null) {
                lastFailedSyncTimeMap.remove(name);
            }
            // cancel the warm-up jobs that belonged to the removed group
            ComputeGroup.Policy policy = computeGroup.getPolicy();
            if (policy != null && !policy.getCacheWarmupJobIds().isEmpty()) {
                cancelCacheJobs(computeGroup, policy.getCacheWarmupJobIds());
            }
        }
    }
}