// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.cloud;

import org.apache.doris.analysis.CancelCloudWarmUpStmt;
import org.apache.doris.analysis.WarmUpClusterStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.cloud.CloudWarmUpJob.JobState;
import org.apache.doris.cloud.CloudWarmUpJob.JobType;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TGetTopNHotPartitionsRequest;
import org.apache.doris.thrift.TGetTopNHotPartitionsResponse;
import org.apache.doris.thrift.THotPartition;
import org.apache.doris.thrift.THotTableMessage;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

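/**
 * Master-side daemon for cloud file cache management. On each cycle it pulls the
 * top-N hot partitions from every backend of every compute cluster and batch-inserts
 * them into an internal hotspot table; it also owns the lifecycle (create, schedule,
 * cancel, replay, expire) of cloud cache warm-up jobs.
 */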
public class CacheHotspotManager extends MasterDaemon {
    public static final int MAX_SHOW_ENTRIES = 2000;
    private static final Logger LOG = LogManager.getLogger(CacheHotspotManager.class);
    private static final int CYCLE_COUNT_TO_CHECK_EXPIRE_CLOUD_WARM_UP_JOB = 20;
    private final CloudSystemInfoService nodeMgr;

    // periodically cleared and rebuilt <id, table> mapping, to limit
    // memory consumption and keep lookups efficient
    private Map<Long, Table> idToTable = new HashMap<>();

    private boolean tableCreated = false;

    private List<String> insertValueBatches = new ArrayList<String>();

    private int cycleCount = 0;

    private MasterDaemon jobDaemon;

    private boolean startJobDaemon = false;

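    // cloudWarmUpJobs holds every known warm-up job (until it expires), activeCloudWarmUpJobs
    // the jobs currently executing on the thread pool, and runnableCloudWarmUpJobs the jobs
    // eligible to be scheduled by the job daemon. runnableClusterSet tracks clusters that
    // currently have an unfinished job (one job per cluster at a time).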
    private ConcurrentMap<Long, CloudWarmUpJob> cloudWarmUpJobs = Maps.newConcurrentMap();

    private ConcurrentMap<Long, CloudWarmUpJob> activeCloudWarmUpJobs = Maps.newConcurrentMap();

    private ConcurrentMap<Long, CloudWarmUpJob> runnableCloudWarmUpJobs = Maps.newConcurrentMap();

    private Set<String> runnableClusterSet = ConcurrentHashMap.newKeySet();

    private final ThreadPoolExecutor cloudWarmUpThreadPool = ThreadPoolManager.newDaemonCacheThreadPool(
            Config.max_active_cloud_warm_up_job, "cloud-warm-up-pool", true);

    public CacheHotspotManager(CloudSystemInfoService nodeMgr) {
        super("CacheHotspotManager", Config.fetch_cluster_cache_hotspot_interval_ms);
        this.nodeMgr = nodeMgr;
    }

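    /**
     * One collection cycle: lazily start the job daemon and create the hotspot table,
     * snapshot the id -> table mapping, ask each backend of each cluster for its top-N
     * hot partitions, and flush the collected rows in batches.
     */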
    @Override
    public void runAfterCatalogReady() {
        if (!startJobDaemon) {
            jobDaemon = new JobDaemon();
            jobDaemon.start();
            startJobDaemon = true;
        }
        if (!tableCreated) {
            try {
                CacheHotspotManagerUtils.execCreateCacheTable();
                tableCreated = true;
            } catch (Exception e) {
                LOG.warn("Create cache hot spot table failed", e);
                return;
            }
        }
        traverseAllDatabaseForTable();
        // iterating over a reference to this concurrent map is thread safe
        nodeMgr.getCloudClusterIdToBackend().entrySet().forEach(clusterToBeList -> {
            List<Pair<CompletableFuture<TGetTopNHotPartitionsResponse>, Backend>> futureList
                    = new ArrayList<>();
            clusterToBeList.getValue().forEach(backend -> {
                try {
                    futureList.add(getTopNHotPartitionsAsync(backend));
                } catch (TException | RpcException e) {
                    LOG.warn("failed to send getTopNHotPartitions request to backend {}", backend, e);
                }
            });

            List<Pair<TGetTopNHotPartitionsResponse, Backend>> responseList = fetchOneClusterHotSpot(futureList);
            responseList.forEach((Pair<TGetTopNHotPartitionsResponse, Backend> respPair) -> {
                TGetTopNHotPartitionsResponse resp = respPair.first;
                if (resp.isSetHotTables()) {
                    resp.getHotTables().forEach((THotTableMessage hotTable) -> {
                        if (hotTable.isSetHotPartitions()) {
                            hotTable.hot_partitions.forEach((THotPartition partition) -> {
                                insertIntoTable(clusterToBeList.getKey(), hotTable.table_id,
                                        hotTable.index_id, resp.file_cache_size, partition, respPair.second);
                            });
                        }
                    });
                }
            });
        });
        triggerBatchInsert();
        idToTable.clear();
    }

    public boolean containsCluster(String clusterName) {
        return CacheHotspotManagerUtils.clusterContains(nodeMgr.getCloudClusterIdByName(clusterName));
    }

    // each row: table_id, table_name (db.table), partition_id, index_id
    // (in the order consumed by warmUpNewClusterByCluster)
    public List<List<String>> getClusterTopNHotPartitions(String clusterName) {
        return CacheHotspotManagerUtils.getClusterTopNPartitions(nodeMgr.getCloudClusterIdByName(clusterName));
    }

    /**
     * Traverses all databases to cache the tableId -> Table mapping for this cycle.
     */
    private void traverseAllDatabaseForTable() {
        // Databases are stored in a concurrent hash map in the catalog; getDbs() returns
        // a snapshot list. An entry may be dropped concurrently, but its reference stays valid.
        Env.getCurrentInternalCatalog().getDbs().forEach(database -> {
            try {
                database.readLock();
                // database already dropped
                if (database.getDbState() != Database.DbState.NORMAL) {
                    return;
                }
                // copying entries out of a concurrent map is thread safe
                idToTable.putAll(database.getIdToTableRef());
            } finally {
                database.readUnlock();
            }
        });
    }

    private void triggerBatchInsert() {
        try {
            CacheHotspotManagerUtils.doBatchInsert(insertValueBatches);
        } catch (Exception e) {
            LOG.warn("Failed to insert into the file cache hotspot table", e);
        } finally {
            insertValueBatches.clear();
        }
    }

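    // Builds one hotspot row (cluster, backend, table, index, partition, qpd/qpw,
    // last access time) and appends it to the pending batch; the batch is flushed once
    // it reaches Config.batch_insert_cluster_cache_hotspot_num values.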
    private void insertIntoTable(String clusterId, long tableId, long indexId, long fileCacheSize,
            THotPartition partition, Backend backend) {
        LOG.info("table id {}, index id {}, partition id {}", tableId, indexId, partition.partition_id);
        LocalDateTime now = LocalDateTime.now();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
        DateTimeFormatter dateformatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        String formattedDateTime = now.format(formatter);
        String insertDay = now.format(dateformatter);

        Map<String, String> params = new HashMap<>();
        params.put("cluster_id", clusterId);
        params.put("cluster_name", nodeMgr.getClusterNameByClusterId(clusterId));
        params.put("backend_id", String.valueOf(backend.getId()));
        params.put("creation_time", formattedDateTime);
        params.put("file_cache_size", String.valueOf(fileCacheSize));
        params.put("insert_day", insertDay);
        Table t = idToTable.get(tableId);
        if (t == null) {
            // the table may have been dropped since the id -> table snapshot was built
            return;
        }
        params.put("table_id", String.valueOf(tableId));
        params.put("table_name", String.format("%s.%s", t.getDBName(), t.getName()));
        OlapTable olapTable = (OlapTable) t;
        params.put("index_name", String.valueOf(olapTable.getIndexNameById(indexId)));
        params.put("index_id", String.valueOf(indexId));
        Optional<Partition> op = t.getPartitionNames().stream().map(t::getPartition)
                                .filter(p -> p.getId() == partition.partition_id).findAny();
        if (!op.isPresent()) {
            LOG.warn("partition id {} is invalid", partition.partition_id);
            return;
        }
        params.put("partition_name", op.get().getName());
        params.put("partition_id", String.valueOf(partition.partition_id));
        LOG.info("has qpd {}, has qpw {}", partition.isSetQueryPerDay(), partition.isSetQueryPerWeek());
        if (partition.isSetQueryPerDay()) {
            params.put("qpd", String.valueOf(partition.getQueryPerDay()));
        } else {
            params.put("qpd", "0");
        }
        if (partition.isSetQueryPerWeek()) {
            params.put("qpw", String.valueOf(partition.getQueryPerWeek()));
        } else {
            params.put("qpw", "0");
        }
        // Doris's datetime v2 doesn't support time zone
        LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(partition.last_access_time),
                ZoneId.systemDefault());
        params.put("last_access_time", localDateTime.format(formatter));

        CacheHotspotManagerUtils.transformIntoCacheHotSpotTableValue(params, insertValueBatches);
        if (insertValueBatches.size() == Config.batch_insert_cluster_cache_hotspot_num) {
            triggerBatchInsert();
        }
    }

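    // Asynchronously calls getTopNHotPartitions on the given backend through the thrift
    // client pool; the client is returned to the pool on success and invalidated on failure.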
    private Pair<CompletableFuture<TGetTopNHotPartitionsResponse>, Backend>
            getTopNHotPartitionsAsync(Backend be) throws TException, RpcException, RuntimeException {
        CompletableFuture<TGetTopNHotPartitionsResponse> f = CompletableFuture.supplyAsync(() -> {
            boolean ok = false;
            BackendService.Client client = null;
            TNetworkAddress address = null;
            try {
                address = new TNetworkAddress(be.getHost(), be.getBePort());
                client = ClientPool.backendPool.borrowObject(address);
                TGetTopNHotPartitionsResponse resp = client.getTopNHotPartitions(
                    new TGetTopNHotPartitionsRequest());
                ok = true;
                return resp;
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                if (ok) {
                    ClientPool.backendPool.returnObject(address, client);
                } else {
                    ClientPool.backendPool.invalidateObject(address, client);
                }
            }
        });
        return Pair.of(f, be);
    }

    // the message we need:
    // cluster_id varchar,
    // cluster_name varchar,
    // backend_id bigint,
    // creation_time DATETIMEV2,
    // file_cache_size bigint,
    // table_name varchar,
    // partition_name varchar,
    // last_access_time DATETIMEV2
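    // Waits up to timeoutMs for each backend's async response and returns only the
    // successful ones; failures and timeouts are logged and skipped.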
    private List<Pair<TGetTopNHotPartitionsResponse, Backend>> fetchOneClusterHotSpot(
            List<Pair<CompletableFuture<TGetTopNHotPartitionsResponse>, Backend>> futureList) {
        List<Pair<TGetTopNHotPartitionsResponse, Backend>> responseList = new ArrayList<>();
        long timeoutMs = Math.min(5000, Config.remote_fragment_exec_timeout_ms);
        futureList.forEach(futureBackendPair -> {
            TStatusCode code = TStatusCode.OK;
            String errMsg = null;
            Exception exception = null;
            Future<TGetTopNHotPartitionsResponse> f = futureBackendPair.key();
            try {
                // TODO: make the 5s timeout cap above configurable
                TGetTopNHotPartitionsResponse result = f.get(timeoutMs, TimeUnit.MILLISECONDS);
                responseList.add(Pair.of(result, futureBackendPair.second));
            } catch (ExecutionException e) {
                exception = e;
                code = TStatusCode.THRIFT_RPC_ERROR;
            } catch (InterruptedException e) {
                exception = e;
                code = TStatusCode.INTERNAL_ERROR;
            } catch (TimeoutException e) {
                exception = e;
                errMsg = "timeout when waiting for fetch cache hotspot RPC. Wait(sec): " + timeoutMs / 1000;
                code = TStatusCode.TIMEOUT;
            }
            if (code != TStatusCode.OK) {
                LOG.warn("Fetching cache hotspot information from be {} failed with exception {}, errmsg {}",
                        futureBackendPair.second.getAddress(), exception, errMsg);
            }
        });
        return responseList;
    }

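    // Sums the file cache size reported by every backend of the given cluster via a
    // synchronous getTopNHotPartitions RPC per backend.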
    Long getFileCacheCapacity(String clusterName) throws RuntimeException {
        List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
                                        .getBackendsByClusterName(clusterName);
        Long totalFileCache = 0L;
        for (Backend backend : backends) {
            Long fileCacheSize = 0L;
            boolean ok = false;
            BackendService.Client client = null;
            TNetworkAddress address = null;
            try {
                address = new TNetworkAddress(backend.getHost(), backend.getBePort());
                client = ClientPool.backendPool.borrowObject(address);
                TGetTopNHotPartitionsResponse resp = client.getTopNHotPartitions(
                        new TGetTopNHotPartitionsRequest());
                fileCacheSize = resp.file_cache_size;
                ok = true;
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                if (ok) {
                    ClientPool.backendPool.returnObject(address, client);
                } else {
                    ClientPool.backendPool.invalidateObject(address, client);
                }
            }
            totalFileCache += fileCacheSize;
        }
        return totalFileCache;
    }

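    // Splits each backend's tablet list into batches of at most roughly 10 GB of tablet
    // data, so a single warm-up batch stays bounded.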
    private Map<Long, List<List<Long>>> splitBatch(Map<Long, List<Tablet>> beToWarmUpTablets) {
        final Long maxSizePerBatch = 10737418240L; // 10G
        Map<Long, List<List<Long>>> beToTabletIdBatches = new HashMap<>();
        for (Map.Entry<Long, List<Tablet>> entry : beToWarmUpTablets.entrySet()) {
            List<List<Long>> batches = new ArrayList<>();
            List<Long> batch = new ArrayList<>();
            Long curBatchSize = 0L;
            for (Tablet tablet : entry.getValue()) {
                if (curBatchSize + tablet.getDataSize(true) > maxSizePerBatch) {
                    batches.add(batch);
                    batch = new ArrayList<>();
                    curBatchSize = 0L;
                }
                batch.add(tablet.getId());
                curBatchSize += tablet.getDataSize(true);
            }
            if (!batch.isEmpty()) {
                batches.add(batch);
            }
            beToTabletIdBatches.put(entry.getKey(), batches);
        }
        return beToTabletIdBatches;
    }

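    // Warm up a new cluster from another cluster's hotspot statistics: take the source
    // cluster's hottest partitions until the destination cluster's total file cache
    // capacity is reached, then assign each selected tablet to the destination backend
    // that owns it as primary.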
    private Map<Long, List<Tablet>> warmUpNewClusterByCluster(String dstClusterName, String srcClusterName) {
        Long dstTotalFileCache = getFileCacheCapacity(dstClusterName);
        List<List<String>> result = getClusterTopNHotPartitions(srcClusterName);
        Long warmUpTabletsSize = 0L;
        List<Tablet> tablets = new ArrayList<>();
        for (List<String> line : result) {
            Long tableId = Long.parseLong(line.get(0));
            String[] tmp = line.get(1).split("\\.");
            String dbName = tmp[0];
            Long partitionId = Long.parseLong(line.get(2));
            Long indexId = Long.parseLong(line.get(3));
            Database db = Env.getCurrentInternalCatalog().getDbNullable("default_cluster:" + dbName);
            if (db == null) {
                continue;
            }
            OlapTable table = (OlapTable) db.getTableNullable(tableId);
            if (table == null) {
                continue;
            }
            Partition partition = table.getPartition(partitionId);
            if (partition == null) {
                continue;
            }
            MaterializedIndex index = partition.getIndex(indexId);
            if (index == null) {
                continue;
            }
            for (Tablet tablet : index.getTablets()) {
                warmUpTabletsSize += tablet.getDataSize(true);
                tablets.add(tablet);
                if (warmUpTabletsSize >= dstTotalFileCache) {
                    break;
                }
            }
            if (warmUpTabletsSize >= dstTotalFileCache) {
                break;
            }
        }
        Collections.reverse(tablets);
        List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
                                        .getBackendsByClusterName(dstClusterName);
        Map<Long, List<Tablet>> beToWarmUpTablets = new HashMap<>();
        for (Backend backend : backends) {
            Set<Long> beTabletIds = ((CloudEnv) Env.getCurrentEnv())
                                    .getCloudTabletRebalancer()
                                    .getSnapshotTabletsInPrimaryByBeId(backend.getId());
            List<Tablet> warmUpTablets = new ArrayList<>();
            for (Tablet tablet : tablets) {
                if (beTabletIds.contains(tablet.getId())) {
                    warmUpTablets.add(tablet);
                }
            }
            beToWarmUpTablets.put(backend.getId(), warmUpTablets);
        }
        return beToWarmUpTablets;
    }

    public List<List<String>> getSingleJobInfo(long jobId) throws AnalysisException {
        List<List<String>> infos = new ArrayList<List<String>>();
        CloudWarmUpJob job = cloudWarmUpJobs.get(jobId);
        if (job == null) {
            throw new AnalysisException("cloud warm up with job " + jobId + " does not exist");
        }
        infos.add(job.getJobInfo());
        return infos;
    }

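    // Background daemon that periodically schedules runnable warm-up jobs and, every
    // CYCLE_COUNT_TO_CHECK_EXPIRE_CLOUD_WARM_UP_JOB cycles, purges finished or expired jobs.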
    private class JobDaemon extends MasterDaemon {
        JobDaemon() {
            super("JobDaemon", Config.cloud_warm_up_job_scheduler_interval_millisecond);
            LOG.info("start cloud warm up job daemon");
        }

        @Override
        public void runAfterCatalogReady() {
            if (cycleCount >= CYCLE_COUNT_TO_CHECK_EXPIRE_CLOUD_WARM_UP_JOB) {
                clearFinishedOrCancelCloudWarmUpJob();
                cycleCount = 0;
            }
            ++cycleCount;
            runCloudWarmUpJob();
        }
    }

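    // Drops finished or cancelled jobs from the runnable map and removes expired jobs
    // entirely, logging the DELETED state change to the edit log so followers stay consistent.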
    private void clearFinishedOrCancelCloudWarmUpJob() {
        Iterator<Entry<Long, CloudWarmUpJob>> iterator = runnableCloudWarmUpJobs.entrySet().iterator();
        while (iterator.hasNext()) {
            CloudWarmUpJob cloudWarmUpJob = iterator.next().getValue();
            if (cloudWarmUpJob.isDone()) {
                iterator.remove();
            }
        }
        Iterator<Map.Entry<Long, CloudWarmUpJob>> iterator2 = cloudWarmUpJobs.entrySet().iterator();
        while (iterator2.hasNext()) {
            CloudWarmUpJob cloudWarmUpJob = iterator2.next().getValue();
            if (cloudWarmUpJob.isExpire()) {
                cloudWarmUpJob.setJobState(JobState.DELETED);
                Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(cloudWarmUpJob);
                iterator2.remove();
                LOG.info("remove expired cloud warm up job {}. finish at {}",
                        cloudWarmUpJob.getJobId(), TimeUtils.longToTimeString(cloudWarmUpJob.getFinishedTimeMs()));
            }
        }
    }

    public Map<Long, CloudWarmUpJob> getCloudWarmUpJobs() {
        return this.cloudWarmUpJobs;
    }

    public Set<String> getRunnableClusterSet() {
        return this.runnableClusterSet;
    }

    public List<List<String>> getAllJobInfos(int limit) {
        List<List<String>> infos = Lists.newArrayList();
        Collection<CloudWarmUpJob> allJobs = cloudWarmUpJobs.values();
        allJobs.stream().sorted(Comparator.comparing(CloudWarmUpJob::getCreateTimeMs).reversed())
                        .limit(limit).forEach(t -> {
                            infos.add(t.getJobInfo());
                        });
        return infos;
    }

    public void addCloudWarmUpJob(CloudWarmUpJob job) {
        cloudWarmUpJobs.put(job.getJobId(), job);
        LOG.info("add cloud warm up job {}", job.getJobId());
        runnableCloudWarmUpJobs.put(job.getJobId(), job);
        if (!job.isDone()) {
            runnableClusterSet.add(job.getCloudClusterName());
        }
    }

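    // Resolves a (db, table, partition) triple to its partitions; an empty partition name
    // means all partitions of the table.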
    public List<Partition> getPartitionsFromTriple(Triple<String, String, String> tableTriple) {
        String dbName = tableTriple.getLeft();
        String tableName = tableTriple.getMiddle();
        String partitionName = tableTriple.getRight();
        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbName);
        if (db == null) {
            throw new RuntimeException("database " + dbName + " does not exist");
        }
        OlapTable table = (OlapTable) db.getTableNullable(tableName);
        if (table == null) {
            throw new RuntimeException("table " + dbName + "." + tableName + " does not exist");
        }
        List<Partition> partitions = new ArrayList<>();
        if (!partitionName.isEmpty()) {
            partitions.add(table.getPartition(partitionName));
        } else {
            partitions.addAll(table.getPartitions());
        }
        return partitions;
    }

    public List<Backend> getBackendsFromCluster(String dstClusterName) {
        return ((CloudSystemInfoService) Env.getCurrentSystemInfo())
                .getBackendsByClusterName(dstClusterName);
    }

    public Set<Long> getTabletIdsFromBe(long beId) {
        return ((CloudEnv) Env.getCurrentEnv())
                                        .getCloudTabletRebalancer()
                                        .getSnapshotTabletsInPrimaryByBeId(beId);
    }

    public List<Tablet> getTabletsFromIndexs(List<MaterializedIndex> indexes) {
        List<Tablet> tablets = new ArrayList<>();
        for (MaterializedIndex index : indexes) {
            tablets.addAll(index.getTablets());
        }
        return tablets;
    }

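    // Warm up a cluster with explicitly listed tables/partitions: accumulate partitions
    // until the destination cluster's file cache capacity would be exceeded, collect the
    // tablets of their visible materialized indexes, and assign each tablet to the backend
    // that owns it as primary. Unless isForce is set, exceeding the capacity fails the job.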
    public Map<Long, List<Tablet>> warmUpNewClusterByTable(long jobId, String dstClusterName,
            List<Triple<String, String, String>> tables,
            boolean isForce) throws RuntimeException {
        Map<Long, List<Tablet>> beToWarmUpTablets = new HashMap<>();
        Long totalFileCache = getFileCacheCapacity(dstClusterName);
        Long warmUpTotalFileCache = 0L;
        LOG.info("Start warm up job {}, cluster {}, total cache size: {}",
                jobId, dstClusterName, totalFileCache);
        for (Triple<String, String, String> tableTriple : tables) {
            if (warmUpTotalFileCache > totalFileCache) {
                LOG.info("Warm up size {} exceeds total cache size {}, breaking loop",
                        warmUpTotalFileCache, totalFileCache);
                break;
            }

            List<Partition> partitions = getPartitionsFromTriple(tableTriple);
            LOG.info("Got {} partitions for table {}.{}.{}", partitions.size(),
                    tableTriple.getLeft(), tableTriple.getMiddle(), tableTriple.getRight());
            List<Backend> backends = getBackendsFromCluster(dstClusterName);
            LOG.info("Got {} backends for cluster {}", backends.size(), dstClusterName);
            List<Partition> warmUpPartitions = new ArrayList<>();
            for (Partition partition : partitions) {
                Long partitionSize = partition.getDataSize(true);
                warmUpTotalFileCache += partitionSize;
                warmUpPartitions.add(partition);
                if (warmUpTotalFileCache > totalFileCache) {
                    LOG.info("Warm up size {} exceeds total cache size {}, current partition size {}",
                            warmUpTotalFileCache, totalFileCache, partitionSize);
                    break;
                }
            }
            List<MaterializedIndex> indexes = new ArrayList<>();
            for (Partition partition : warmUpPartitions) {
                indexes.addAll(partition.getMaterializedIndices(IndexExtState.VISIBLE));
            }
            LOG.info("Got {} materialized indexes for table {}.{}.{}", indexes.size(),
                    tableTriple.getLeft(), tableTriple.getMiddle(), tableTriple.getRight());
            List<Tablet> tablets = getTabletsFromIndexs(indexes);
            LOG.info("Got {} tablets for table {}.{}.{}", tablets.size(),
                    tableTriple.getLeft(), tableTriple.getMiddle(), tableTriple.getRight());
            for (Backend backend : backends) {
                Set<Long> beTabletIds = getTabletIdsFromBe(backend.getId());
                List<Tablet> warmUpTablets = new ArrayList<>();
                for (Tablet tablet : tablets) {
                    if (beTabletIds.contains(tablet.getId())) {
                        warmUpTablets.add(tablet);
                    }
                }
                LOG.info("Assigning {} tablets to backend {}", warmUpTablets.size(), backend.getId());
                beToWarmUpTablets.computeIfAbsent(backend.getId(),
                        k -> new ArrayList<>()).addAll(warmUpTablets);
            }
        }
        LOG.info("The job {} warm up size is {}, the cluster cache size is {}",
                    jobId, warmUpTotalFileCache, totalFileCache);
        if (warmUpTotalFileCache > totalFileCache && !isForce) {
            throw new RuntimeException("The cluster " + dstClusterName + " cache size is not enough");
        }
        return beToWarmUpTablets;
    }

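    /**
     * Creates and registers a warm-up job for the statement's destination cluster,
     * typically issued via SQL such as (exact syntax may differ by version):
     *   WARM UP CLUSTER dst_cluster WITH CLUSTER src_cluster;
     *   WARM UP CLUSTER dst_cluster WITH TABLE db.tbl;
     * Only one runnable job per cluster is allowed at a time.
     */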
    public long createJob(WarmUpClusterStmt stmt) throws AnalysisException {
        if (runnableClusterSet.contains(stmt.getDstClusterName())) {
            throw new AnalysisException("cluster: " + stmt.getDstClusterName() + " already has a runnable job");
        }
        Map<Long, List<Tablet>> beToWarmUpTablets = new HashMap<>();
        long jobId = Env.getCurrentEnv().getNextId();
        if (!FeConstants.runningUnitTest) {
            if (stmt.isWarmUpWithTable()) {
                beToWarmUpTablets = warmUpNewClusterByTable(jobId, stmt.getDstClusterName(), stmt.getTables(),
                                                            stmt.isForce());
            } else {
                beToWarmUpTablets = warmUpNewClusterByCluster(stmt.getDstClusterName(), stmt.getSrcClusterName());
            }
        }

        Map<Long, List<List<Long>>> beToTabletIdBatches = splitBatch(beToWarmUpTablets);

        CloudWarmUpJob.JobType jobType = stmt.isWarmUpWithTable() ? JobType.TABLE : JobType.CLUSTER;
        CloudWarmUpJob warmUpJob = new CloudWarmUpJob(jobId, stmt.getDstClusterName(), beToTabletIdBatches, jobType);
        addCloudWarmUpJob(warmUpJob);

        Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(warmUpJob);
        LOG.info("finished creating cloud warm up job: {}", warmUpJob.getJobId());

        return jobId;
    }

    public void cancel(CancelCloudWarmUpStmt stmt) throws DdlException {
        CloudWarmUpJob job = cloudWarmUpJobs.get(stmt.getJobId());
        if (job == null) {
            throw new DdlException("job id: " + stmt.getJobId() + " does not exist.");
        }
        if (!job.cancel("user cancel")) {
            throw new DdlException("job can not be cancelled. State: " + job.getJobState());
        }
    }

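    // Submits runnable, not-yet-active jobs to the warm-up thread pool, bounded by
    // Config.max_active_cloud_warm_up_job; in unit tests the job runs inline.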
    private void runCloudWarmUpJob() {
        runnableCloudWarmUpJobs.values().forEach(cloudWarmUpJob -> {
            if (!cloudWarmUpJob.isDone() && !activeCloudWarmUpJobs.containsKey(cloudWarmUpJob.getJobId())
                    && activeCloudWarmUpJobs.size() < Config.max_active_cloud_warm_up_job) {
                if (FeConstants.runningUnitTest) {
                    cloudWarmUpJob.run();
                } else {
                    cloudWarmUpThreadPool.submit(() -> {
                        if (activeCloudWarmUpJobs.putIfAbsent(cloudWarmUpJob.getJobId(), cloudWarmUpJob) == null) {
                            try {
                                cloudWarmUpJob.run();
                            } finally {
                                activeCloudWarmUpJobs.remove(cloudWarmUpJob.getJobId());
                            }
                        }
                    });
                }
            }
        });
    }

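    // Edit-log replay path: restores job state on followers or after a restart, keeps the
    // runnable-cluster set in sync, and drops jobs that were logged as DELETED.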
    public void replayCloudWarmUpJob(CloudWarmUpJob cloudWarmUpJob) throws Exception {
        // ATTN: no need to re-execute the job here; just overwrite the job with the same job id.
        runnableCloudWarmUpJobs.put(cloudWarmUpJob.getJobId(), cloudWarmUpJob);
        cloudWarmUpJobs.put(cloudWarmUpJob.getJobId(), cloudWarmUpJob);
        LOG.info("replay cloud warm up job {}, state {}", cloudWarmUpJob.getJobId(), cloudWarmUpJob.getJobState());
        if (cloudWarmUpJob.isDone()) {
            runnableClusterSet.remove(cloudWarmUpJob.getCloudClusterName());
        } else {
            runnableClusterSet.add(cloudWarmUpJob.getCloudClusterName());
        }
        if (cloudWarmUpJob.jobState == JobState.DELETED) {
            if (cloudWarmUpJobs.remove(cloudWarmUpJob.getJobId()) != null
                    && runnableCloudWarmUpJobs.remove(cloudWarmUpJob.getJobId()) != null) {
                LOG.info("replay removing expired cloud warm up job {}.", cloudWarmUpJob.getJobId());
            } else {
                // should not happen, but it is harmless; log a warning for observability
                LOG.warn("failed to find cloud warm up job {} when replay removing expired job.",
                            cloudWarmUpJob.getJobId());
            }
        }
    }

}