CacheHotspotManager.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.cloud;

import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.cloud.CloudWarmUpJob.JobState;
import org.apache.doris.cloud.CloudWarmUpJob.JobType;
import org.apache.doris.cloud.CloudWarmUpJob.SyncMode;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.Triple;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.httpv2.rest.manager.HttpUtils;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.nereids.trees.plans.commands.CancelWarmUpJobCommand;
import org.apache.doris.nereids.trees.plans.commands.WarmUpClusterCommand;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TGetTopNHotPartitionsRequest;
import org.apache.doris.thrift.TGetTopNHotPartitionsResponse;
import org.apache.doris.thrift.THotPartition;
import org.apache.doris.thrift.THotTableMessage;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

public class CacheHotspotManager extends MasterDaemon {
    // NOTE(review): presumably the maximum number of rows returned by SHOW-style output;
    // the consumer is outside this section — confirm.
    public static final int MAX_SHOW_ENTRIES = 2000;
    private static final Logger LOG = LogManager.getLogger(CacheHotspotManager.class);
    // NOTE(review): name suggests expired warm-up jobs are checked once every 20 cycles;
    // the code using it is not visible in this section — confirm.
    private static final int CYCLE_COUNT_TO_CHECK_EXPIRE_CLOUD_WARM_UP_JOB = 20;
    // Used to enumerate clusters and their backends and to translate between cluster names and ids.
    private final CloudSystemInfoService nodeMgr;

    // Table id -> table lookup used while recording hot partitions.
    // Re-built from all databases at the start of each collection cycle and cleared at the end,
    // for efficiency and to bound memory consumption.
    private Map<Long, Table> idToTable = new HashMap<>();

    // Set to true once the cache hotspot table has been created successfully.
    private boolean tableCreated = false;

    // Buffered values waiting to be written by the next batch insert; cleared after every flush attempt.
    private List<String> insertValueBatches = new ArrayList<String>();

    // NOTE(review): presumably counts daemon cycles for the expiry check above; not used in this section.
    private int cycleCount = 0;

    // Helper daemon started lazily on the first run of this manager.
    private MasterDaemon jobDaemon;

    // True once jobDaemon has been started, so it is started only once.
    private boolean startJobDaemon = false;

    // Helper daemon started lazily on the first run of this manager.
    private MasterDaemon tableFilterRefreshDaemon;

    // True once tableFilterRefreshDaemon has been started, so it is started only once.
    private boolean startTableFilterRefreshDaemon = false;

    // Helper daemon started lazily on the first run of this manager, in cloud mode only.
    private MasterDaemon warmUpSyncStatsRefreshDaemon;

    // True once warmUpSyncStatsRefreshDaemon has been started, so it is started only once.
    private boolean startWarmUpSyncStatsRefreshDaemon = false;

    // Thread pool for concurrent BE HTTP requests during on-demand stats collection
    private final ExecutorService warmupStatsExecutor = Executors.newFixedThreadPool(16,
            new ThreadFactoryBuilder().setNameFormat("warmup-stats-collector-%d").setDaemon(true).build());

    // All known warm-up jobs keyed by job id (this is the collection iterated when refreshing sync stats).
    private ConcurrentMap<Long, CloudWarmUpJob> cloudWarmUpJobs = Maps.newConcurrentMap();

    // NOTE(review): presumably the jobs that currently occupy an execution slot; not used in this section.
    private ConcurrentMap<Long, CloudWarmUpJob> activeCloudWarmUpJobs = Maps.newConcurrentMap();

    // Jobs considered when checking for conflicting load-event warm-up jobs and when cancelling them.
    private ConcurrentMap<Long, CloudWarmUpJob> runnableCloudWarmUpJobs = Maps.newConcurrentMap();

    // Daemon thread pool named "cloud-warm-up-pool", sized by Config.max_active_cloud_warm_up_job.
    private final ThreadPoolExecutor cloudWarmUpThreadPool = ThreadPoolManager.newDaemonCacheThreadPool(
            Config.max_active_cloud_warm_up_job, "cloud-warm-up-pool", true);

    /**
     * Identity of a long-running warm-up job, used for duplicate detection.
     * <p>
     * Two keys are equal when source name, destination name, sync mode and table filter
     * expression all match. A {@code null} table filter is normalized to the empty string.
     */
    private static class JobKey {
        private final String srcName;
        private final String dstName;
        private final CloudWarmUpJob.SyncMode syncMode;
        private final String tableFilterExpr;

        /** Creates a key without a table filter. */
        public JobKey(String srcName, String dstName, CloudWarmUpJob.SyncMode syncMode) {
            this(srcName, dstName, syncMode, "");
        }

        /** Creates a key; a {@code null} {@code tableFilterExpr} is stored as {@code ""}. */
        public JobKey(String srcName, String dstName, CloudWarmUpJob.SyncMode syncMode, String tableFilterExpr) {
            this.srcName = srcName;
            this.dstName = dstName;
            this.syncMode = syncMode;
            if (tableFilterExpr == null) {
                this.tableFilterExpr = "";
            } else {
                this.tableFilterExpr = tableFilterExpr;
            }
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof JobKey)) {
                return false;
            }
            JobKey other = (JobKey) o;
            if (!Objects.equals(srcName, other.srcName)) {
                return false;
            }
            if (!Objects.equals(dstName, other.dstName)) {
                return false;
            }
            if (syncMode != other.syncMode) {
                return false;
            }
            return Objects.equals(tableFilterExpr, other.tableFilterExpr);
        }

        @Override
        public int hashCode() {
            return Objects.hash(srcName, dstName, syncMode, tableFilterExpr);
        }

        /** Human-readable form used in error messages; the table filter is shown only when non-empty. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("WarmUpJob src='");
            text.append(srcName).append("', dst='").append(dstName);
            text.append("', syncMode=").append(syncMode);
            if (!tableFilterExpr.isEmpty()) {
                text.append(", tableFilter=").append(tableFilterExpr);
            }
            return text.toString();
        }
    }

    // Tracks long-running jobs (event-driven and periodic).
    // Ensures only one active job exists per <source, destination, sync_mode, table_filter> tuple.
    private Set<JobKey> repeatJobDetectionSet = ConcurrentHashMap.newKeySet();

    /**
     * Records an unfinished event-driven or periodic job in {@code repeatJobDetectionSet}.
     * <p>
     * Finished jobs are ignored. Unless replaying, the job is first checked against conflicting
     * load-event warm-up jobs. Duplicate keys are tolerated during replay.
     *
     * @param job the job to register
     * @param replay {@code true} when called while replaying, in which case no checks reject the job
     * @throws AnalysisException if (not replaying) a conflicting load-event job exists or an
     *         equivalent long-running job is already registered
     */
    private void registerJobForRepeatDetection(CloudWarmUpJob job, boolean replay) throws AnalysisException {
        if (job.isDone()) {
            return;
        }
        if (!replay) {
            checkLoadEventWarmUpConflict(job);
        }
        boolean longRunning = job.isEventDriven() || job.isPeriodic();
        if (!longRunning) {
            return;
        }
        JobKey key = new JobKey(job.getSrcClusterName(), job.getDstClusterName(),
                job.getSyncMode(), job.getTableFilterExpr());
        // The key is always inserted; only a non-replay duplicate is an error.
        if (this.repeatJobDetectionSet.add(key) || replay) {
            return;
        }
        throw new AnalysisException(key + " already has a runnable job");
    }

    /**
     * Rejects a new load-event warm-up job when a runnable load-event job of the other level
     * (table-level vs. cluster-level) already exists for the same source/destination pair.
     * <p>
     * Only cross-type conflicts are checked here. Same-type duplicate jobs are still rejected
     * later by {@code repeatJobDetectionSet}.
     *
     * @param newJob the job about to be created; non load-event jobs are accepted immediately
     * @throws AnalysisException if a conflicting job is found
     */
    private void checkLoadEventWarmUpConflict(CloudWarmUpJob newJob) throws AnalysisException {
        if (!isLoadEventWarmUpJob(newJob)) {
            return;
        }

        for (CloudWarmUpJob existingJob : runnableCloudWarmUpJobs.values()) {
            // Consider only other, unfinished load-event jobs on the same cluster pair.
            boolean comparable = existingJob.getJobId() != newJob.getJobId()
                    && !existingJob.isDone()
                    && isLoadEventWarmUpJob(existingJob)
                    && isSameWarmUpPair(newJob, existingJob);
            if (comparable
                    && isTableLevelLoadEventWarmUpJob(newJob) != isTableLevelLoadEventWarmUpJob(existingJob)) {
                throw buildLoadEventWarmUpConflictException(newJob, existingJob);
            }
        }
    }

    /**
     * Cancels every unfinished table-level load-event warm-up job whose source and destination
     * both belong to the given virtual compute group.
     * <p>
     * Membership is the union of {@code subComputeGroups} (may be {@code null}), the active
     * compute group and the standby compute group.
     *
     * @param virtualComputeGroupName name of the virtual compute group, used in messages
     * @param activeComputeGroup active compute group of the virtual compute group
     * @param standbyComputeGroup standby compute group of the virtual compute group
     * @param subComputeGroups additional member compute groups, or {@code null}
     * @param reason prefix of the cancel reason recorded on each cancelled job
     * @throws AnalysisException if cancelling a job fails; the original {@link DdlException} is the cause
     */
    public void cancelTableLevelLoadEventWarmUpJobsForVirtualComputeGroup(
            String virtualComputeGroupName, String activeComputeGroup, String standbyComputeGroup,
            List<String> subComputeGroups, String reason) throws AnalysisException {
        Set<String> computeGroupsInVcg = subComputeGroups == null
                ? new HashSet<String>() : new HashSet<String>(subComputeGroups);
        computeGroupsInVcg.add(activeComputeGroup);
        computeGroupsInVcg.add(standbyComputeGroup);
        String cancelReason = reason + " for virtual compute group '" + virtualComputeGroupName + "'";

        for (CloudWarmUpJob existingJob : runnableCloudWarmUpJobs.values()) {
            boolean shouldCancel = !existingJob.isDone()
                    && isTableLevelLoadEventWarmUpJob(existingJob)
                    && computeGroupsInVcg.contains(existingJob.getSrcClusterName())
                    && computeGroupsInVcg.contains(existingJob.getDstClusterName());
            if (!shouldCancel) {
                continue;
            }
            try {
                cancel(existingJob.getJobId(), cancelReason);
                LOG.info("cancel table-level load-event warm up job {} before virtual compute group '{}' creates "
                                + "cluster-level load-event warm up job. active compute group {}, "
                                + "standby compute group {}, source compute group {}, destination compute group {}{}, "
                                + "reason: {}",
                        existingJob.getJobId(), virtualComputeGroupName, activeComputeGroup, standbyComputeGroup,
                        existingJob.getSrcClusterName(), existingJob.getDstClusterName(),
                        formatExistingTableFilter(existingJob), cancelReason);
            } catch (DdlException e) {
                String message = "Failed to cancel table-level load-event warm up job "
                        + existingJob.getJobId() + " before virtual compute group '" + virtualComputeGroupName
                        + "' creates cluster-level load-event warm up job from active compute group '"
                        + activeComputeGroup + "' to standby compute group '" + standbyComputeGroup
                        + "'. Source compute group '" + existingJob.getSrcClusterName()
                        + "', destination compute group '" + existingJob.getDstClusterName() + "'"
                        + formatExistingTableFilter(existingJob) + ". Cancel table-level load-event warm up job "
                        + existingJob.getJobId() + " before retrying.";
                throw new AnalysisException(message, e);
            }
        }
    }

    /** Returns {@code true} for a non-null, event-driven job whose sync event is {@code LOAD}. */
    private static boolean isLoadEventWarmUpJob(CloudWarmUpJob job) {
        if (job == null || !job.isEventDriven()) {
            return false;
        }
        return job.getSyncEvent() == CloudWarmUpJob.SyncEvent.LOAD;
    }

    /** Returns {@code true} for a load-event warm-up job whose job type is {@code CLUSTER}. */
    private static boolean isClusterLevelLoadEventWarmUpJob(CloudWarmUpJob job) {
        if (!isLoadEventWarmUpJob(job)) {
            return false;
        }
        return job.getJobType() == JobType.CLUSTER;
    }

    /** Returns {@code true} for a load-event warm-up job whose job type is {@code TABLES}. */
    private static boolean isTableLevelLoadEventWarmUpJob(CloudWarmUpJob job) {
        if (!isLoadEventWarmUpJob(job)) {
            return false;
        }
        return job.getJobType() == JobType.TABLES;
    }

    /** Returns {@code true} when both jobs have equal source and equal destination cluster names. */
    private static boolean isSameWarmUpPair(CloudWarmUpJob left, CloudWarmUpJob right) {
        if (!Objects.equals(left.getSrcClusterName(), right.getSrcClusterName())) {
            return false;
        }
        return Objects.equals(left.getDstClusterName(), right.getDstClusterName());
    }

    /**
     * Builds the error raised when a new load-event warm-up job conflicts with an existing one
     * of the other level on the same source/destination pair.
     *
     * @param newJob the job being created
     * @param existingJob the already existing, conflicting job
     * @return the exception to throw; the message names both levels, the cluster pair, the existing
     *         job id and (when present) its table filter
     */
    private static AnalysisException buildLoadEventWarmUpConflictException(
            CloudWarmUpJob newJob, CloudWarmUpJob existingJob) {
        String newJobLevel;
        if (isTableLevelLoadEventWarmUpJob(newJob)) {
            newJobLevel = "table-level";
        } else {
            newJobLevel = "cluster-level";
        }
        String existingJobLevel;
        if (isClusterLevelLoadEventWarmUpJob(existingJob)) {
            existingJobLevel = "cluster-level";
        } else {
            existingJobLevel = "table-level";
        }

        StringBuilder message = new StringBuilder("Cannot create ");
        message.append(newJobLevel).append(" load-event warm up job from source ");
        message.append("compute group '").append(newJob.getSrcClusterName());
        message.append("' to destination compute group '").append(newJob.getDstClusterName());
        message.append("': conflicting ").append(existingJobLevel);
        message.append(" load-event warm up job ").append(existingJob.getJobId());
        message.append(" already exists for the same source and destination");
        message.append(formatExistingTableFilter(existingJob));
        message.append(". Cancel existing load-event warm up job ").append(existingJob.getJobId());
        message.append(" before creating this job.");
        return new AnalysisException(message.toString());
    }

    /**
     * Formats the job's table filter as a message suffix.
     *
     * @return {@code " with table filter [<expr>]"} when the job has a table filter, otherwise {@code ""}
     */
    private static String formatExistingTableFilter(CloudWarmUpJob job) {
        return job.hasTableFilter() ? " with table filter [" + job.getTableFilterExpr() + "]" : "";
    }

    // Tracks the one-time and periodic warm-up jobs scheduled by CacheHotspotManager
    // (event-driven jobs are never registered here).
    // Ensures that at most one such job runs concurrently per destination cluster.
    private Map<String, Long> clusterToRunningJobId = new ConcurrentHashMap<>();

    /**
     * Tries to claim the destination cluster of {@code job} as "running this job".
     * <p>
     * Event-driven jobs are never registered and are always allowed. For any other job the
     * claim succeeds when no job is registered for the destination cluster yet, or when the
     * registered job is this very job. Otherwise the job has to wait and a log line is written.
     *
     * @param job the CloudWarmUpJob to register
     * @return {@code true} if the job may run (registered, already registered, or event-driven);
     *         {@code false} if another job currently holds the destination cluster
     */
    public boolean tryRegisterRunningJob(CloudWarmUpJob job) {
        if (job.isEventDriven()) {
            // Nothing to register for event-driven jobs.
            return true;
        }

        long jobId = job.getJobId();
        String clusterName = job.getDstClusterName();

        // putIfAbsent makes the claim atomic: it returns the current owner, or null if we just claimed it.
        Long ownerJobId = clusterToRunningJobId.putIfAbsent(clusterName, jobId);
        if (ownerJobId == null || ownerJobId.longValue() == jobId) {
            return true;
        }
        LOG.info("Job {} skipped: waiting for job {} to finish on destination cluster {}",
                jobId, ownerJobId, clusterName);
        return false;
    }

    /**
     * Releases the destination cluster held by {@code job} so that another job may run on it.
     * <p>
     * Event-driven jobs are never registered, so for them this is a no-op that returns
     * {@code true}. For other jobs the entry is removed only when the registered job id equals
     * this job's id, so a different job's registration is never removed by accident.
     *
     * @param job the CloudWarmUpJob to deregister
     * @return {@code true} if the job is event-driven or its registration was removed;
     *         {@code false} if the cluster was not registered to this job
     */
    private boolean deregisterRunningJob(CloudWarmUpJob job) {
        if (!job.isEventDriven()) {
            long jobId = job.getJobId();
            String clusterName = job.getDstClusterName();
            // Conditional remove: succeeds only if the cluster is currently mapped to this job id.
            return clusterToRunningJobId.remove(clusterName, jobId);
        }
        // Event-driven jobs have no registration to remove.
        return true;
    }

    /**
     * Bookkeeping hook called when a job stops running.
     * <p>
     * One-time and periodic jobs release their destination cluster. If the job is done and is
     * event-driven or periodic, its key is also removed from {@code repeatJobDetectionSet} so an
     * equivalent job can be created again.
     *
     * @param job the job that stopped
     */
    public void notifyJobStop(CloudWarmUpJob job) {
        if (job.isOnce() || job.isPeriodic()) {
            this.deregisterRunningJob(job);
        }
        if (job.isDone() && (job.isEventDriven() || job.isPeriodic())) {
            JobKey finishedKey = new JobKey(
                    job.getSrcClusterName(), job.getDstClusterName(),
                    job.getSyncMode(), job.getTableFilterExpr());
            this.repeatJobDetectionSet.remove(finishedKey);
        }
    }

    /**
     * Creates the daemon named "CacheHotspotManager" with a run interval of
     * {@code Config.fetch_cluster_cache_hotspot_interval_ms}.
     *
     * @param nodeMgr cloud system info service used to look up clusters and backends
     */
    public CacheHotspotManager(CloudSystemInfoService nodeMgr) {
        super("CacheHotspotManager", Config.fetch_cluster_cache_hotspot_interval_ms);
        this.nodeMgr = nodeMgr;
    }

    /**
     * One collection cycle of the hotspot manager.
     * <p>
     * Starts the helper daemons on first use, makes sure the cache hotspot table exists (retrying
     * after 60s on failure), then asks every backend of every cluster for its top-N hot partitions
     * and buffers one row per hot partition for batch insertion.
     */
    @Override
    public void runAfterCatalogReady() {
        // Each helper daemon is started at most once.
        if (!startJobDaemon) {
            jobDaemon = new JobDaemon();
            jobDaemon.start();
            startJobDaemon = true;
        }
        if (!startTableFilterRefreshDaemon) {
            tableFilterRefreshDaemon = new TableFilterRefreshDaemon();
            tableFilterRefreshDaemon.start();
            startTableFilterRefreshDaemon = true;
        }
        if (Config.isCloudMode() && !startWarmUpSyncStatsRefreshDaemon) {
            warmUpSyncStatsRefreshDaemon = new WarmUpSyncStatsRefreshDaemon();
            warmUpSyncStatsRefreshDaemon.start();
            startWarmUpSyncStatsRefreshDaemon = true;
        }

        if (!tableCreated) {
            try {
                CacheHotspotManagerUtils.execCreateCacheTable();
                tableCreated = true;
                this.intervalMs = Config.fetch_cluster_cache_hotspot_interval_ms;
            } catch (Exception e) {
                // sleep 60s wait for syncing storage vault info from ms and retry
                this.intervalMs = 60000;
                LOG.warn("Create cache hot spot table failed, sleep 60s and retry", e);
                return;
            }
        }

        traverseAllDatabaseForTable();
        // it's thread safe to iterate through this concurrent map's ref
        nodeMgr.getCloudClusterIdToBackend(false).entrySet().forEach(clusterToBeList -> {
            // Fire the RPCs for all backends of this cluster first, then wait for the answers.
            List<Pair<CompletableFuture<TGetTopNHotPartitionsResponse>, Backend>> pendingRequests
                    = new ArrayList<>();
            clusterToBeList.getValue().forEach(backend -> {
                try {
                    pendingRequests.add(getTopNHotPartitionsAsync(backend));
                } catch (TException | RpcException e) {
                    LOG.warn("send getTopNHotPartitionsAsync to be {} failed due to {}", backend, e);
                }
            });

            for (Pair<TGetTopNHotPartitionsResponse, Backend> respPair : fetchOneClusterHotSpot(pendingRequests)) {
                TGetTopNHotPartitionsResponse resp = respPair.first;
                if (!resp.isSetHotTables()) {
                    continue;
                }
                for (THotTableMessage hotTable : resp.getHotTables()) {
                    if (!hotTable.isSetHotPartitions()) {
                        continue;
                    }
                    for (THotPartition partition : hotTable.hot_partitions) {
                        insertIntoTable(clusterToBeList.getKey(), hotTable.table_id,
                                hotTable.index_id, resp.file_cache_size, partition, respPair.second);
                    }
                }
            }
        });
        // Flush whatever is left in the buffer, then drop the per-cycle table lookup.
        triggerBatchInsert();
        idToTable.clear();
    }

    /**
     * Resolves {@code clusterName} to its cluster id and asks
     * {@code CacheHotspotManagerUtils.clusterContains} about that id.
     *
     * @param clusterName name of the cluster to look up
     * @return the result of {@code clusterContains} for the resolved cluster id
     */
    public boolean containsCluster(String clusterName) {
        return CacheHotspotManagerUtils.clusterContains(nodeMgr.getCloudClusterIdByName(clusterName));
    }

    /**
     * Returns the top-N hot partition rows recorded for the given cluster, as produced by
     * {@code CacheHotspotManagerUtils.getClusterTopNPartitions} for the resolved cluster id.
     *
     * @param clusterName name of the cluster to query
     * @return one string list per row
     */
    // table_id table_name, index_id, partition_id
    // NOTE(review): getHotTablets in this class reads column 2 as the partition id and column 3 as
    // the index id, which does not match the order listed above — confirm the actual column order.
    public List<List<String>> getClusterTopNHotPartitions(String clusterName) {
        LOG.debug("getClusterTopNHotPartitions called with clusterName={}", clusterName);
        return CacheHotspotManagerUtils.getClusterTopNPartitions(nodeMgr.getCloudClusterIdByName(clusterName));
    }

    /**
     * Traverses all databases of the internal catalog and caches every tableId -> table mapping
     * in {@code idToTable}. Databases that are not in {@code NORMAL} state (already dropped) are
     * skipped.
     */
    private void traverseAllDatabaseForTable() {
        // dbs are stored in one concurrent hash map in catalog
        // return one list of snapshot, the item might be deleted
        // but java guarantee it will not be erased from memory
        for (Database database : Env.getCurrentInternalCatalog().getDbs()) {
            // Acquire the lock before entering try, so that finally only ever releases
            // a lock that was actually taken.
            database.readLock();
            try {
                // database already dropped
                if (database.getDbState() != Database.DbState.NORMAL) {
                    continue;
                }
                // it's thread safe to merge one concurrent map
                idToTable.putAll(database.getIdToTableRef());
            } finally {
                database.readUnlock();
            }
        }
    }

    /**
     * Flushes the buffered values in {@code insertValueBatches} through
     * {@code CacheHotspotManagerUtils.doBatchInsert}.
     * <p>
     * A failed insert is logged and not rethrown. The buffer is cleared in every case, so values
     * from a failed flush are dropped rather than retried.
     */
    private void triggerBatchInsert() {
        try {
            CacheHotspotManagerUtils.doBatchInsert(insertValueBatches);
        } catch (Exception e) {
            LOG.warn("Failed to insert into file cache hotspot table due to ", e);
        } finally {
            // Always reset the buffer, even after a failure.
            insertValueBatches.clear();
        }
    }

    /**
     * Refreshes the sync stats attached to warm-up jobs and the matching metric definitions.
     * <p>
     * On a non-master FE the metric definitions are synced against an empty job list and nothing
     * else happens. On the master, freshly aggregated stats are attached to every event-driven job
     * that is not done; all other jobs get their stats reset to {@code null}.
     */
    private void refreshWarmUpSyncStats() {
        if (!Env.getCurrentEnv().isMaster()) {
            MetricRepo.syncCloudWarmUpSyncJobMetricDefinitions(Collections.emptyList());
            return;
        }

        Map<Long, JobWarmUpStats> statsMap = collectAndAggregate();
        for (CloudWarmUpJob job : cloudWarmUpJobs.values()) {
            JobWarmUpStats stats = null;
            if (job.isEventDriven() && !job.isDone()) {
                stats = statsMap.get(job.getJobId());
            }
            job.setSyncStats(stats);
        }
        MetricRepo.syncCloudWarmUpSyncJobMetricDefinitions(cloudWarmUpJobs.values());
    }

    /**
     * Buffers one cache hotspot row for a hot partition reported by a backend and flushes the
     * buffer once it reaches {@code Config.batch_insert_cluster_cache_hotspot_num} entries.
     * <p>
     * The row is silently skipped when the table id is unknown or does not refer to an
     * {@link OlapTable}; it is skipped with a warning when the partition id cannot be resolved.
     *
     * @param clusterId id of the cluster the backend belongs to
     * @param tableId id of the hot table
     * @param indexId id of the hot index
     * @param fileCacheSize file cache size reported by the backend
     * @param partition hot partition message reported by the backend
     * @param backend the reporting backend
     */
    private void insertIntoTable(String clusterId, long tableId, long indexId, long fileCacheSize,
            THotPartition partition, Backend backend) {
        LOG.info("table id {}, index id {}, partition id {}", tableId, indexId, partition.partition_id);

        // Resolve the table first: unknown ids (null) and non-OLAP tables cannot be recorded,
        // and checking up front avoids both a NullPointerException and a ClassCastException.
        Table t = idToTable.get(tableId);
        if (!(t instanceof OlapTable)) {
            return;
        }
        OlapTable olapTable = (OlapTable) t;

        // A partition may be dropped between listing the names and looking it up, hence the null filter.
        Optional<Partition> op = t.getPartitionNames().stream().map(t::getPartition)
                                .filter(Objects::nonNull)
                                .filter(p -> p.getId() == partition.partition_id).findAny();
        if (!op.isPresent()) {
            LOG.warn("partition id {} is invalid", partition.partition_id);
            return;
        }

        LocalDateTime now = LocalDateTime.now();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
        DateTimeFormatter dateformatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");

        Map<String, String> params = new HashMap<>();
        params.put("cluster_id", clusterId);
        params.put("cluster_name", nodeMgr.getClusterNameByClusterId(clusterId));
        params.put("backend_id", String.valueOf(backend.getId()));
        params.put("creation_time", now.format(formatter));
        params.put("file_cache_size", String.valueOf(fileCacheSize));
        params.put("insert_day", now.format(dateformatter));
        params.put("table_id", String.valueOf(tableId));
        params.put("table_name", String.format("%s.%s", t.getDBName(), t.getName()));
        params.put("index_name", String.valueOf(olapTable.getIndexNameById(indexId)));
        params.put("index_id", String.valueOf(indexId));
        params.put("partition_name", op.get().getName());
        params.put("partition_id", String.valueOf(partition.partition_id));
        LOG.info("has qpd {}, has qpw {}", partition.isSetQueryPerDay(), partition.isSetQueryPerWeek());
        // Unset counters are recorded as 0.
        params.put("qpd", partition.isSetQueryPerDay() ? String.valueOf(partition.getQueryPerDay()) : "0");
        params.put("qpw", partition.isSetQueryPerWeek() ? String.valueOf(partition.getQueryPerWeek()) : "0");
        // Doris's datetime v2 doesn't support time zone
        LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(partition.last_access_time),
                ZoneId.systemDefault());
        params.put("last_access_time", localDateTime.format(formatter));

        CacheHotspotManagerUtils.transformIntoCacheHotSpotTableValue(params, insertValueBatches);
        // ">=" rather than "==": the flush must still fire if the buffer ever steps past the threshold
        // (e.g. the threshold is lowered while values are buffered).
        if (insertValueBatches.size() >= Config.batch_insert_cluster_cache_hotspot_num) {
            triggerBatchInsert();
        }
    }

    /**
     * Asynchronously asks one backend for its top-N hot partitions.
     * <p>
     * The RPC runs on the default async executor of {@link CompletableFuture#supplyAsync}. The
     * borrowed client is returned to the pool on success and invalidated on any failure; a failure
     * completes the future exceptionally with a {@link RuntimeException} wrapping the cause.
     *
     * @param be the backend to query
     * @return the pending response paired with the backend it was requested from
     */
    private Pair<CompletableFuture<TGetTopNHotPartitionsResponse>, Backend>
            getTopNHotPartitionsAsync(Backend be) throws TException, RpcException, RuntimeException {
        CompletableFuture<TGetTopNHotPartitionsResponse> pendingResponse = CompletableFuture.supplyAsync(() -> {
            TNetworkAddress beAddress = null;
            BackendService.Client rpcClient = null;
            boolean succeeded = false;
            try {
                beAddress = new TNetworkAddress(be.getHost(), be.getBePort());
                rpcClient = ClientPool.backendPool.borrowObject(beAddress);
                TGetTopNHotPartitionsRequest request = new TGetTopNHotPartitionsRequest();
                TGetTopNHotPartitionsResponse response = rpcClient.getTopNHotPartitions(request);
                succeeded = true;
                return response;
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                // A client that failed mid-call must not be reused.
                if (!succeeded) {
                    ClientPool.backendPool.invalidateObject(beAddress, rpcClient);
                } else {
                    ClientPool.backendPool.returnObject(beAddress, rpcClient);
                }
            }
        });
        return Pair.of(pendingResponse, be);
    }

    // the message we need:
    // cluster_id varchar,
    // cluster_name varchar,
    // backend_id bigint,
    // creation_time DATETIMEV2,
    // file_cache_size bigint,
    // table_name varchar,
    // partition_name varchar,
    // last_access_time DATETIMEV2
    /**
     * Waits for the hot-partition responses of one cluster's backends.
     * <p>
     * Each future is awaited for at most {@code min(5000, Config.remote_fragment_exec_timeout_ms)}
     * milliseconds. Backends whose request failed, timed out or was interrupted are logged and
     * left out of the result. If the waiting thread is interrupted, its interrupt status is restored
     * so the owner of the thread can still observe the interruption.
     *
     * @param futureList pending responses paired with the backend they were requested from
     * @return the successfully received responses, each paired with its backend
     */
    private List<Pair<TGetTopNHotPartitionsResponse, Backend>> fetchOneClusterHotSpot(
            List<Pair<CompletableFuture<TGetTopNHotPartitionsResponse>, Backend>> futureList) {
        List<Pair<TGetTopNHotPartitionsResponse, Backend>> responseList = new ArrayList<>();
        // temporary value, would change to config
        long timeoutMs = Math.min(5000, Config.remote_fragment_exec_timeout_ms);
        for (Pair<CompletableFuture<TGetTopNHotPartitionsResponse>, Backend> futureBackendPair : futureList) {
            Exception exception = null;
            String errMsg = null;
            Future<TGetTopNHotPartitionsResponse> f = futureBackendPair.key();
            try {
                TGetTopNHotPartitionsResponse result = f.get(timeoutMs, TimeUnit.MILLISECONDS);
                responseList.add(Pair.of(result, futureBackendPair.second));
            } catch (ExecutionException e) {
                exception = e;
            } catch (InterruptedException e) {
                // Do not swallow the interruption: re-assert the flag for the caller.
                Thread.currentThread().interrupt();
                exception = e;
            } catch (TimeoutException e) {
                exception = e;
                errMsg = "timeout when waiting for fetch cache hotspot RPC. Wait(sec): " + timeoutMs / 1000;
            }
            if (exception != null) {
                LOG.warn("Fetch be {}'s cache hotspot information throw {}, errmsg {}",
                        futureBackendPair.second.getAddress(), exception, errMsg);
            }
        }
        return responseList;
    }

    /**
     * Sums the file cache size reported by every backend of the given cluster.
     * <p>
     * Each backend is queried synchronously with a top-N hot partitions request, of which only
     * the {@code file_cache_size} field is used.
     *
     * @param clusterName name of the cluster whose backends are queried
     * @return the total file cache size over all backends of the cluster
     * @throws RuntimeException wrapping the cause if any backend cannot be queried
     */
    Long getFileCacheCapacity(String clusterName) throws RuntimeException {
        CloudSystemInfoService systemInfo = (CloudSystemInfoService) Env.getCurrentSystemInfo();
        long capacitySum = 0L;
        for (Backend backend : systemInfo.getBackendsByClusterName(clusterName)) {
            long reportedSize = 0L;
            TNetworkAddress beAddress = null;
            BackendService.Client rpcClient = null;
            boolean succeeded = false;
            try {
                beAddress = new TNetworkAddress(backend.getHost(), backend.getBePort());
                rpcClient = ClientPool.backendPool.borrowObject(beAddress);
                TGetTopNHotPartitionsResponse resp = rpcClient.getTopNHotPartitions(
                        new TGetTopNHotPartitionsRequest());
                reportedSize = resp.file_cache_size;
                succeeded = true;
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                // A client that failed mid-call must not be reused.
                if (!succeeded) {
                    ClientPool.backendPool.invalidateObject(beAddress, rpcClient);
                } else {
                    ClientPool.backendPool.returnObject(beAddress, rpcClient);
                }
            }
            capacitySum += reportedSize;
        }
        return capacitySum;
    }

    /**
     * Splits the tablets assigned to each backend into batches of tablet ids whose accumulated
     * data size stays within {@code Config.cloud_warm_up_job_max_bytes_per_batch}.
     * <p>
     * Tablets keep their original order. A single tablet larger than the limit gets a batch of
     * its own; no empty batch is ever produced.
     *
     * @param beToWarmUpTablets tablets to warm up, keyed by backend id
     * @return for every backend id of the input, the list of tablet-id batches (empty if the
     *         backend has no tablets)
     */
    public Map<Long, List<List<Long>>> splitBatch(Map<Long, List<Tablet>> beToWarmUpTablets) {
        final long maxSizePerBatch = Config.cloud_warm_up_job_max_bytes_per_batch;
        Map<Long, List<List<Long>>> beToTabletIdBatches = new HashMap<>();
        for (Map.Entry<Long, List<Tablet>> entry : beToWarmUpTablets.entrySet()) {
            List<List<Long>> batches = new ArrayList<>();
            List<Long> batch = new ArrayList<>();
            long curBatchSize = 0L;
            for (Tablet tablet : entry.getValue()) {
                // Read the size once so the limit check and the accumulation use the same value.
                long tabletSize = tablet.getDataSize(true, false);
                // Only close a batch that already holds something; otherwise an oversized
                // tablet at the start of a batch would emit an empty batch.
                if (!batch.isEmpty() && curBatchSize + tabletSize > maxSizePerBatch) {
                    batches.add(batch);
                    batch = new ArrayList<>();
                    curBatchSize = 0L;
                }
                batch.add(tablet.getId());
                curBatchSize += tabletSize;
            }
            if (!batch.isEmpty()) {
                batches.add(batch);
            }
            beToTabletIdBatches.put(entry.getKey(), batches);
        }
        return beToTabletIdBatches;
    }

    /**
     * Collects the tablets of the source cluster's hot partitions until their accumulated data
     * size reaches the destination cluster's total file cache capacity.
     * <p>
     * Rows whose database, table, partition or index no longer exists, or whose table is not an
     * {@link OlapTable}, are skipped. The tablet that crosses the capacity limit is still included.
     * The collected list is reversed before it is returned.
     *
     * @param srcClusterName cluster whose recorded hot partitions are used
     * @param dstClusterName cluster whose file cache capacity bounds the selection
     * @return the selected tablets, in reverse order of collection
     */
    private List<Tablet> getHotTablets(String srcClusterName, String dstClusterName) {
        long dstTotalFileCache = getFileCacheCapacity(dstClusterName);
        List<List<String>> result = getClusterTopNHotPartitions(srcClusterName);
        long warmUpTabletsSize = 0L;
        List<Tablet> tablets = new ArrayList<>();
        for (List<String> line : result) {
            // Row layout as read here: [0] table id, [1] "<db>.<table>", [2] partition id, [3] index id.
            long tableId = Long.parseLong(line.get(0));
            String dbName = line.get(1).split("\\.")[0];
            long partitionId = Long.parseLong(line.get(2));
            long indexId = Long.parseLong(line.get(3));
            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbName);
            if (db == null) {
                continue;
            }
            // instanceof also covers a missing (null) table; a non-OLAP table is skipped
            // instead of failing the whole job with a ClassCastException.
            Object candidate = db.getTableNullable(tableId);
            if (!(candidate instanceof OlapTable)) {
                continue;
            }
            OlapTable table = (OlapTable) candidate;
            Partition partition = table.getPartition(partitionId);
            if (partition == null) {
                continue;
            }
            MaterializedIndex index = partition.getIndex(indexId);
            if (index == null) {
                continue;
            }
            for (Tablet tablet : index.getTablets()) {
                warmUpTabletsSize += tablet.getDataSize(true, false);
                tablets.add(tablet);
                if (warmUpTabletsSize >= dstTotalFileCache) {
                    break;
                }
            }
            // Capacity reached inside the tablet loop: stop scanning further rows as well.
            if (warmUpTabletsSize >= dstTotalFileCache) {
                break;
            }
        }
        Collections.reverse(tablets);
        return tablets;
    }

    /**
     * Collects the tablets of every visible materialized index of every partition of every
     * OLAP table in the internal catalog. Neither cluster name is used by this method.
     */
    private List<Tablet> getAllTablets(String srcClusterName, String dstClusterName) {
        List<Tablet> collected = new ArrayList<>();
        for (Database db : Env.getCurrentInternalCatalog().getDbs()) {
            for (Table candidate : db.getTables()) {
                if (candidate instanceof OlapTable) {
                    OlapTable olapTable = (OlapTable) candidate;
                    for (Partition partition : olapTable.getPartitions()) {
                        // Maybe IndexExtState.ALL
                        List<MaterializedIndex> visibleIndexes =
                                partition.getMaterializedIndices(IndexExtState.VISIBLE);
                        for (MaterializedIndex index : visibleIndexes) {
                            collected.addAll(index.getTablets());
                        }
                    }
                }
            }
        }
        return collected;
    }

    /**
     * Builds the per-backend warm up plan for a cluster-level job.
     *
     * <p>The candidate tablets are either all tablets (when
     * {@code Config.cloud_warm_up_force_all_partitions} is set) or the hot tablets of the
     * source cluster. Each backend of the destination cluster receives the candidates whose
     * id is contained in that backend's primary tablet snapshot.
     *
     * @param dstClusterName cluster whose backends will be warmed up
     * @param srcClusterName cluster used as the source of hot tablets
     * @return backend id mapped to the tablets that backend should warm up
     */
    public Map<Long, List<Tablet>> warmUpNewClusterByCluster(String dstClusterName, String srcClusterName) {
        List<Tablet> candidates = Config.cloud_warm_up_force_all_partitions
                ? getAllTablets(srcClusterName, dstClusterName)
                : getHotTablets(srcClusterName, dstClusterName);

        CloudSystemInfoService systemInfo = (CloudSystemInfoService) Env.getCurrentSystemInfo();
        List<Backend> dstBackends = systemInfo.getBackendsByClusterName(dstClusterName);

        Map<Long, List<Tablet>> plan = new HashMap<>();
        for (Backend be : dstBackends) {
            Set<Long> primaryTabletIds = ((CloudEnv) Env.getCurrentEnv())
                    .getCloudTabletRebalancer()
                    .getSnapshotTabletsInPrimaryByBeId(be.getId());
            List<Tablet> assigned = candidates.stream()
                    .filter(tablet -> primaryTabletIds.contains(tablet.getId()))
                    .collect(Collectors.toCollection(ArrayList::new));
            plan.put(be.getId(), assigned);
        }
        return plan;
    }

    /**
     * Returns the detailed info row of one warm up job, wrapped in a single-element list.
     *
     * @param jobId id of the job to describe
     * @return a list holding exactly one info row
     * @throws AnalysisException if no job with that id is registered
     */
    public List<List<String>> getSingleJobInfo(long jobId) throws AnalysisException {
        CloudWarmUpJob target = cloudWarmUpJobs.get(jobId);
        if (target == null) {
            throw new AnalysisException("cloud warm up with job " + jobId + " does not exist");
        }
        // Stats are only collected once the job is known to exist.
        Map<Long, JobWarmUpStats> statsByJob = collectAndAggregate();
        List<List<String>> rows = new ArrayList<>();
        rows.add(target.getJobInfo(statsByJob.get(jobId), true));
        return rows;
    }

    /**
     * Daemon that schedules cloud warm up jobs on every run. Once the cycle counter reaches
     * {@code CYCLE_COUNT_TO_CHECK_EXPIRE_CLOUD_WARM_UP_JOB}, finished and expired jobs are
     * cleared first and the counter starts over.
     */
    private class JobDaemon extends MasterDaemon {
        JobDaemon() {
            super("JobDaemon", Config.cloud_warm_up_job_scheduler_interval_millisecond);
            LOG.info("start cloud warm up job daemon");
        }

        @Override
        public void runAfterCatalogReady() {
            boolean cleanupDue = cycleCount >= CYCLE_COUNT_TO_CHECK_EXPIRE_CLOUD_WARM_UP_JOB;
            if (cleanupDue) {
                clearFinishedOrCancelCloudWarmUpJob();
                cycleCount = 0;
            }
            cycleCount++;
            runCloudWarmUpJob();
        }
    }

    /**
     * Daemon that re-resolves the ON TABLES filters of warm up jobs on every run. Its
     * interval follows {@code Config.cloud_warm_up_table_filter_refresh_interval_ms}.
     */
    private class TableFilterRefreshDaemon extends MasterDaemon {
        TableFilterRefreshDaemon() {
            super("TableFilterRefreshDaemon", Config.cloud_warm_up_table_filter_refresh_interval_ms);
            LOG.info("start table filter refresh daemon, interval={}ms",
                    Config.cloud_warm_up_table_filter_refresh_interval_ms);
        }

        @Override
        public void runAfterCatalogReady() {
            syncIntervalWithConfig();
            refreshAllTableFilters();
        }

        /** Applies the configured interval when it differs from the one currently in use. */
        private void syncIntervalWithConfig() {
            if (getInterval() == Config.cloud_warm_up_table_filter_refresh_interval_ms) {
                return;
            }
            setInterval(Config.cloud_warm_up_table_filter_refresh_interval_ms);
            LOG.info("update table filter refresh daemon interval to {}ms", getInterval());
        }
    }

    /**
     * Daemon that refreshes warm up sync stats on every run. Its interval follows
     * {@code Config.cloud_warm_up_sync_stats_refresh_interval_ms}.
     */
    private class WarmUpSyncStatsRefreshDaemon extends MasterDaemon {
        WarmUpSyncStatsRefreshDaemon() {
            super("WarmUpSyncStatsRefreshDaemon", Config.cloud_warm_up_sync_stats_refresh_interval_ms);
            LOG.info("start warm up sync stats refresh daemon, interval={}ms",
                    Config.cloud_warm_up_sync_stats_refresh_interval_ms);
        }

        @Override
        public void runAfterCatalogReady() {
            syncIntervalWithConfig();
            refreshWarmUpSyncStats();
        }

        /** Applies the configured interval when it differs from the one currently in use. */
        private void syncIntervalWithConfig() {
            if (getInterval() == Config.cloud_warm_up_sync_stats_refresh_interval_ms) {
                return;
            }
            setInterval(Config.cloud_warm_up_sync_stats_refresh_interval_ms);
            LOG.info("update warm up sync stats refresh daemon interval to {}ms", getInterval());
        }
    }


    /**
     * Collect warmup stats from all BEs on demand and aggregate per-job.
     * Called when SHOW WARM UP JOB is executed.
     *
     * <p>Only alive backends of clusters referenced by event-driven runnable jobs are
     * queried. A failure of a single backend request is logged and skipped. If the calling
     * thread is interrupted while waiting for responses, the interrupt flag is restored,
     * collection stops, and the stats gathered so far are aggregated.
     *
     * @return per-job aggregated warmup stats; empty map if no event-driven jobs exist
     */
    private Map<Long, JobWarmUpStats> collectAndAggregate() {
        Map<Long, JobWarmUpStats> result = new HashMap<>();

        // 1. Collect all clusters involved in event-driven jobs
        Set<String> allClusters = new HashSet<>();
        for (CloudWarmUpJob job : runnableCloudWarmUpJobs.values()) {
            if (job.isEventDriven()) {
                allClusters.add(job.getSrcClusterName());
                allClusters.add(job.getDstClusterName());
            }
        }
        if (allClusters.isEmpty()) {
            return result;
        }

        // 2. Enumerate all (cluster, BE) pairs
        List<Pair<String, Backend>> allTargets = new ArrayList<>();
        for (String cluster : allClusters) {
            for (Backend be : getBackendsFromCluster(cluster)) {
                if (be.isAlive()) {
                    allTargets.add(Pair.of(cluster, be));
                }
            }
        }
        if (allTargets.isEmpty()) {
            return result;
        }

        // 3. Concurrent HTTP requests to all BEs
        ExecutorCompletionService<Pair<String, String>> completionService =
                new ExecutorCompletionService<>(warmupStatsExecutor);

        // Acquire auth token once for all BE requests (needed when enable_all_http_auth is on)
        Map<String, String> authHeaders = new HashMap<>();
        try {
            String token = Env.getCurrentEnv().getTokenManager().acquireToken();
            authHeaders.put("Auth-Token", token);
        } catch (Exception e) {
            LOG.warn("Failed to acquire auth token for warmup stats collection, "
                    + "requests may fail if enable_all_http_auth is enabled: {}", e.getMessage());
        }

        for (Pair<String, Backend> target : allTargets) {
            String cluster = target.first;
            Backend be = target.second;
            completionService.submit(() -> {
                String url = "http://"
                        + NetUtils.getHostPortInAccessibleFormat(be.getHost(), be.getHttpPort())
                        + "/api/warmup_event_driven_stats";
                String json = HttpUtils.doGet(url, authHeaders, 5000);
                return Pair.of(cluster, json);
            });
        }

        // 4. Collect results and merge by cluster → jobId
        Map<String, Map<Long, TableWarmUpWindowedStats>> clusterStats = new HashMap<>();
        for (int i = 0; i < allTargets.size(); i++) {
            try {
                Future<Pair<String, String>> future = completionService.take();
                Pair<String, String> resultPair = future.get(10, TimeUnit.SECONDS);
                String cluster = resultPair.first;
                String json = resultPair.second;
                Map<Long, TableWarmUpWindowedStats> jobMap =
                        clusterStats.computeIfAbsent(cluster, k -> new HashMap<>());
                mergeStatsFromJson(jobMap, json);
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag for the caller and stop
                // waiting, otherwise the next take() would keep blocking this thread.
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted while collecting warmup stats, processed {} of {} responses",
                        i, allTargets.size());
                break;
            } catch (Exception e) {
                LOG.warn("Failed to collect warmup stats: {}", e.getMessage());
            }
        }

        // 5. Aggregate per-job
        for (CloudWarmUpJob job : runnableCloudWarmUpJobs.values()) {
            if (!job.isEventDriven()) {
                continue;
            }
            JobWarmUpStats stats = aggregateStatsForJob(job, clusterStats);
            result.put(job.getJobId(), stats);
        }
        return result;
    }

    /**
     * Parses one BE JSON response and folds its per-job entries into {@code jobMap}.
     * JSON structure: data[].{job_id, requested, finish, fail, ...}
     *
     * <p>A response without a {@code data} array is ignored. Any parsing error is logged
     * and ends processing of this response; entries merged before the error are kept.
     */
    private void mergeStatsFromJson(
            Map<Long, TableWarmUpWindowedStats> jobMap, String json) {
        try {
            JsonArray entries = JsonParser.parseString(json).getAsJsonObject().getAsJsonArray("data");
            if (entries == null) {
                return;
            }
            for (JsonElement entry : entries) {
                JsonObject jobObj = entry.getAsJsonObject();
                long jobId = jobObj.get("job_id").getAsLong();
                TableWarmUpWindowedStats incoming = TableWarmUpWindowedStats.fromJson(jobObj);
                TableWarmUpWindowedStats existing = jobMap.get(jobId);
                if (existing != null) {
                    // Another BE already reported this job: accumulate into its entry.
                    existing.merge(incoming);
                } else if (incoming != null) {
                    jobMap.put(jobId, incoming);
                }
            }
        } catch (Exception e) {
            LOG.warn("Failed to parse warmup stats JSON: {}", e.getMessage());
        }
    }

    /**
     * Builds the stats of a single job from the per-cluster stats: the requested side comes
     * from the job's source cluster and the finished side from its destination cluster.
     * A side with no stats for this job is left untouched; the gap is computed last.
     */
    @VisibleForTesting
    JobWarmUpStats aggregateStatsForJob(
            CloudWarmUpJob job,
            Map<String, Map<Long, TableWarmUpWindowedStats>> clusterStats) {
        JobWarmUpStats aggregated = new JobWarmUpStats();
        long jobId = job.getJobId();
        Map<Long, TableWarmUpWindowedStats> srcJobStats =
                clusterStats.getOrDefault(job.getSrcClusterName(), Collections.emptyMap());
        Map<Long, TableWarmUpWindowedStats> dstJobStats =
                clusterStats.getOrDefault(job.getDstClusterName(), Collections.emptyMap());

        TableWarmUpWindowedStats requestedSide = srcJobStats.get(jobId);
        TableWarmUpWindowedStats finishedSide = dstJobStats.get(jobId);

        if (requestedSide != null) {
            aggregated.mergeRequested(requestedSide);
        }
        if (finishedSide != null) {
            // Target-side progress timestamp is a watermark, not an additive counter. The merge
            // keeps the minimum positive watermark across BEs so FE reports the slowest target
            // progress for trigger-gap calculation.
            aggregated.mergeFinished(finishedSide);
        }
        aggregated.computeGap();
        return aggregated;
    }


    /**
     * Drops done jobs from the runnable map, then removes expired jobs from the job map.
     * Each expired job is marked DELETED and written to the edit log before it is removed.
     */
    private void clearFinishedOrCancelCloudWarmUpJob() {
        for (Iterator<CloudWarmUpJob> runnableIt = runnableCloudWarmUpJobs.values().iterator();
                runnableIt.hasNext();) {
            if (runnableIt.next().isDone()) {
                runnableIt.remove();
            }
        }

        for (Iterator<CloudWarmUpJob> allIt = cloudWarmUpJobs.values().iterator(); allIt.hasNext();) {
            CloudWarmUpJob candidate = allIt.next();
            if (!candidate.isExpire()) {
                continue;
            }
            candidate.setJobState(JobState.DELETED);
            Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(candidate);
            allIt.remove();
            LOG.info("remove expired cloud warm up job {}. finish at {}",
                    candidate.getJobId(), TimeUtils.longToTimeString(candidate.getFinishedTimeMs()));
        }
    }

    /** Returns the job map itself (not a copy), keyed by job id. */
    public Map<Long, CloudWarmUpJob> getCloudWarmUpJobs() {
        return cloudWarmUpJobs;
    }

    /** Looks up a job by id in the job map and returns whatever the map holds for it. */
    public CloudWarmUpJob getCloudWarmUpJob(long jobId) {
        CloudWarmUpJob found = cloudWarmUpJobs.get(jobId);
        return found;
    }

    /**
     * Returns the info rows of the most recently created jobs, newest first.
     *
     * @param limit maximum number of jobs to include
     * @return one info row per job, ordered by creation time descending
     */
    public List<List<String>> getAllJobInfos(int limit) {
        Map<Long, JobWarmUpStats> statsByJob = collectAndAggregate();
        Comparator<CloudWarmUpJob> newestFirst =
                Comparator.comparing(CloudWarmUpJob::getCreateTimeMs).reversed();
        return cloudWarmUpJobs.values().stream()
                .sorted(newestFirst)
                .limit(limit)
                .map(job -> job.getJobInfo(statsByJob.get(job.getJobId()), false))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Registers a job with this manager: restores its table filter state, registers it for
     * repeat detection, then stores it in both the job map and the runnable map.
     *
     * @throws AnalysisException propagated from repeat detection; nothing is stored then
     */
    public void addCloudWarmUpJob(CloudWarmUpJob job) throws AnalysisException {
        restoreTableFilterState(job);
        registerJobForRepeatDetection(job, false);

        long jobId = job.getJobId();
        cloudWarmUpJobs.put(jobId, job);
        LOG.info("add cloud warm up job {}", jobId);
        runnableCloudWarmUpJobs.put(jobId, job);
    }

    /**
     * For a job with a table filter, rebuilds the filter, resolves the tables it currently
     * matches, stores them on the job and logs them. Jobs without a filter are left alone.
     */
    private void restoreTableFilterState(CloudWarmUpJob job) {
        if (job.hasTableFilter()) {
            job.rebuildOnTablesFilter();
            Map<Long, String> matched = resolveTableIds(job.getOnTablesFilter());
            job.setCurrentTableIdNames(matched);
            logMatchedTables("restored table filter for job " + job.getJobId(), matched);
        }
    }

    /**
     * Resolves a (database, table, partition) triple to the partitions it designates.
     *
     * <p>When the partition name is empty (or null) every partition of the table is
     * returned; otherwise only the named partition is returned.
     *
     * @param tableTriple left = database name, middle = table name, right = partition name
     * @return the designated partitions; never contains {@code null}
     * @throws RuntimeException if the database, table or named partition does not exist,
     *         or if the table is not an OLAP table. Previously these cases surfaced as a
     *         bare NullPointerException/ClassCastException, or as a null list element.
     */
    public List<Partition> getPartitionsFromTriple(Triple<String, String, String> tableTriple) {
        String dbName = tableTriple.getLeft();
        String tableName = tableTriple.getMiddle();
        String partitionName = tableTriple.getRight();

        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbName);
        if (db == null) {
            throw new RuntimeException("database " + dbName + " does not exist");
        }
        Table table = db.getTableNullable(tableName);
        if (table == null) {
            throw new RuntimeException("table " + dbName + "." + tableName + " does not exist");
        }
        if (!(table instanceof OlapTable)) {
            throw new RuntimeException("table " + dbName + "." + tableName + " is not an OLAP table");
        }
        OlapTable olapTable = (OlapTable) table;

        List<Partition> partitions = new ArrayList<>();
        if (partitionName == null || partitionName.isEmpty()) {
            partitions.addAll(olapTable.getPartitions());
        } else {
            Partition partition = olapTable.getPartition(partitionName);
            if (partition == null) {
                // Fail here with a clear message instead of handing a null element to callers.
                throw new RuntimeException("partition " + partitionName + " does not exist in table "
                        + dbName + "." + tableName);
            }
            partitions.add(partition);
        }
        return partitions;
    }

    /** Returns the backends that the cloud system info service reports for the given cluster name. */
    public List<Backend> getBackendsFromCluster(String dstClusterName) {
        CloudSystemInfoService cloudSystemInfo = (CloudSystemInfoService) Env.getCurrentSystemInfo();
        return cloudSystemInfo.getBackendsByClusterName(dstClusterName);
    }

    /** Returns the tablet rebalancer's snapshot of primary tablet ids for the given backend. */
    public Set<Long> getTabletIdsFromBe(long beId) {
        CloudEnv cloudEnv = (CloudEnv) Env.getCurrentEnv();
        return cloudEnv.getCloudTabletRebalancer().getSnapshotTabletsInPrimaryByBeId(beId);
    }

    /** Flattens the tablets of the given indexes into one list, keeping index order. */
    public List<Tablet> getTabletsFromIndexs(List<MaterializedIndex> indexes) {
        List<Tablet> flattened = new ArrayList<>();
        for (MaterializedIndex index : indexes) {
            for (Tablet tablet : index.getTablets()) {
                flattened.add(tablet);
            }
        }
        return flattened;
    }

    /**
     * Builds the per-backend warm up plan for a table-level job.
     *
     * <p>Partitions of the given tables are accumulated in order until the running data size
     * exceeds the file cache capacity of the destination cluster; the partition that crosses
     * the limit is still included. Tablets of the chosen partitions' visible indexes are
     * then assigned to each backend whose primary tablet snapshot contains them.
     *
     * @param jobId id of the job, used for logging only
     * @param dstClusterName cluster to warm up
     * @param tables (database, table, partition) triples to warm up
     * @param isForce when false, exceeding the cache capacity is an error
     * @return backend id mapped to the tablets that backend should warm up
     * @throws RuntimeException if the planned size exceeds the capacity and {@code isForce} is false
     */
    public Map<Long, List<Tablet>> warmUpNewClusterByTable(long jobId, String dstClusterName,
            List<Triple<String, String, String>> tables,
            boolean isForce) throws RuntimeException {
        Map<Long, List<Tablet>> plan = new HashMap<>();
        Long cacheCapacity = getFileCacheCapacity(dstClusterName);
        long plannedSize = 0L;
        LOG.info("Start warm up job {}, cluster {}, total cache size: {}",
                jobId, dstClusterName, cacheCapacity);

        List<Backend> dstBackends = getBackendsFromCluster(dstClusterName);
        LOG.info("Got {} backends for cluster {}", dstBackends.size(), dstClusterName);

        // Snapshot the primary tablet ids of every backend once, before walking the tables.
        Map<Long, Set<Long>> primaryIdsByBackend = new HashMap<>();
        for (Backend be : dstBackends) {
            primaryIdsByBackend.put(be.getId(), getTabletIdsFromBe(be.getId()));
        }

        for (Triple<String, String, String> triple : tables) {
            if (plannedSize > cacheCapacity) {
                LOG.info("Warm up size {} exceeds total cache size {}, breaking loop",
                        plannedSize, cacheCapacity);
                break;
            }

            String dbPart = triple.getLeft();
            String tablePart = triple.getMiddle();
            String partitionPart = triple.getRight();

            List<Partition> candidates = getPartitionsFromTriple(triple);
            LOG.info("Got {} partitions for table {}.{}.{}", candidates.size(),
                    dbPart, tablePart, partitionPart);

            List<Partition> chosen = new ArrayList<>();
            for (Partition candidate : candidates) {
                long partitionSize = candidate.getDataSize(true);
                plannedSize += partitionSize;
                chosen.add(candidate);
                if (plannedSize > cacheCapacity) {
                    LOG.info("Warm up size {} exceeds total cache size {}, current partition size {}",
                            plannedSize, cacheCapacity, partitionSize);
                    break;
                }
            }

            List<MaterializedIndex> visibleIndexes = new ArrayList<>();
            for (Partition partition : chosen) {
                visibleIndexes.addAll(partition.getMaterializedIndices(IndexExtState.VISIBLE));
            }
            LOG.info("Got {} materialized indexes for table {}.{}.{}", visibleIndexes.size(),
                    dbPart, tablePart, partitionPart);

            List<Tablet> tableTablets = getTabletsFromIndexs(visibleIndexes);
            LOG.info("Got {} tablets for table {}.{}.{}", tableTablets.size(),
                    dbPart, tablePart, partitionPart);

            for (Backend be : dstBackends) {
                Set<Long> primaryIds = primaryIdsByBackend.get(be.getId());
                List<Tablet> assigned = new ArrayList<>();
                for (Tablet tablet : tableTablets) {
                    if (primaryIds.contains(tablet.getId())) {
                        assigned.add(tablet);
                    }
                }
                LOG.info("Assigning {} tablets to backend {}", assigned.size(), be.getId());
                plan.computeIfAbsent(be.getId(), ignored -> new ArrayList<>()).addAll(assigned);
            }
        }

        LOG.info("The job {} warm up size is {}, the cluster cache size is {}",
                    jobId, plannedSize, cacheCapacity);
        if (plannedSize > cacheCapacity && !isForce) {
            throw new RuntimeException("The cluster " + dstClusterName + " cache size is not enough");
        }
        return plan;
    }

    /**
     * Creates a warm up job from a WARM UP command, registers it and writes it to the edit log.
     *
     * <p>For a table-based command the per-backend tablet batches are computed up front
     * (skipped when running unit tests). Otherwise a cluster-to-cluster job is built whose
     * sync mode comes from the {@code sync_mode} property: {@code periodic} requires
     * {@code sync_interval_sec}, {@code event_driven} requires {@code sync_event} and may
     * carry ON TABLES rules, and anything else yields a one-shot job.
     *
     * @param stmt the parsed warm up command
     * @return the id allocated for the new job
     * @throws AnalysisException if a required property is missing or malformed, if an
     *         ON TABLES filter matches no table, or if registering the job fails
     */
    public long createJob(WarmUpClusterCommand stmt) throws AnalysisException {
        long jobId = Env.getCurrentEnv().getNextId();
        CloudWarmUpJob warmUpJob;
        if (stmt.isWarmUpWithTable()) {
            Map<Long, List<Tablet>> beToWarmUpTablets = new HashMap<>();
            if (!FeConstants.runningUnitTest) {
                beToWarmUpTablets = warmUpNewClusterByTable(jobId, stmt.getDstCluster(), stmt.getTables(),
                                                            stmt.isForce());
            }
            Map<Long, List<List<Long>>> beToTabletIdBatches = splitBatch(beToWarmUpTablets);
            warmUpJob = new CloudWarmUpJob(jobId, stmt.getDstCluster(),
                    beToTabletIdBatches, JobType.TABLE, stmt.getTables(), stmt.isForce());
        } else {
            CloudWarmUpJob.Builder builder = new CloudWarmUpJob.Builder()
                    .setJobId(jobId)
                    .setSrcClusterName(stmt.getSrcCluster())
                    .setDstClusterName(stmt.getDstCluster())
                    .setJobType(JobType.CLUSTER);

            Map<String, String> properties = stmt.getProperties();
            String syncMode = properties.get("sync_mode");
            if ("periodic".equals(syncMode)) {
                String syncIntervalSecStr = properties.get("sync_interval_sec");
                if (syncIntervalSecStr == null) {
                    throw new AnalysisException("No sync_interval_sec is provided");
                }
                long syncIntervalSec;
                try {
                    syncIntervalSec = Long.parseLong(syncIntervalSecStr);
                } catch (NumberFormatException e) {
                    // Keep the cause, consistent with the sync_event branch below.
                    throw new AnalysisException("Illegal sync_interval_sec: " + syncIntervalSecStr, e);
                }
                builder.setSyncMode(SyncMode.PERIODIC)
                        .setSyncInterval(syncIntervalSec);
            } else if ("event_driven".equals(syncMode)) {
                String syncEventStr = properties.get("sync_event");
                if (syncEventStr == null) {
                    throw new AnalysisException("No sync_event is provided");
                }
                CloudWarmUpJob.SyncEvent syncEvent;
                try {
                    // Locale.ROOT keeps the enum lookup independent of the JVM default locale
                    // (e.g. the Turkish dotted/dotless 'i' case mapping).
                    syncEvent = CloudWarmUpJob.SyncEvent.valueOf(
                            syncEventStr.toUpperCase(java.util.Locale.ROOT));
                } catch (IllegalArgumentException e) {
                    throw new AnalysisException("Illegal sync_event: " + syncEventStr, e);
                }
                builder.setSyncMode(SyncMode.EVENT_DRIVEN)
                        .setSyncEvent(syncEvent);

                // Handle ON TABLES rules
                List<OnTablesFilter.TableFilterRule> onTablesRules = stmt.getOnTablesRules();
                if (onTablesRules != null && !onTablesRules.isEmpty()) {
                    builder.setJobType(JobType.TABLES);
                    List<CloudWarmUpJob.PersistedTableFilterRule> persistedRules = new ArrayList<>();
                    for (OnTablesFilter.TableFilterRule rule : onTablesRules) {
                        CloudWarmUpJob.PersistedTableFilterRule pr = new CloudWarmUpJob.PersistedTableFilterRule();
                        pr.ruleType = rule.getRuleType().name();
                        pr.pattern = rule.getRawPattern();
                        persistedRules.add(pr);
                    }
                    builder.setTableFilterRules(persistedRules);
                }
            } else {
                builder.setSyncMode(SyncMode.ONCE);
            }
            warmUpJob = builder.build();

            // For event-driven jobs with ON TABLES, rebuild filter and resolve initial table IDs
            if (warmUpJob.hasTableFilter()) {
                warmUpJob.rebuildOnTablesFilter();
                Map<Long, String> initialTableIdNames = resolveTableIds(warmUpJob.getOnTablesFilter());
                logMatchedTables("created table filter for job " + jobId, initialTableIdNames);
                if (initialTableIdNames.isEmpty()) {
                    throw new AnalysisException("No tables matched the ON TABLES filter");
                }
                warmUpJob.setCurrentTableIdNames(initialTableIdNames);
            }
        }

        addCloudWarmUpJob(warmUpJob);

        Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(warmUpJob);
        LOG.info("finished to create cloud warm up job: {}", warmUpJob.getJobId());

        return jobId;
    }

    /**
     * Cancels the job named by a CANCEL WARM UP JOB command.
     *
     * @throws DdlException if the job does not exist or cannot be cancelled
     */
    public void cancel(CancelWarmUpJobCommand stmt) throws DdlException {
        long targetJobId = stmt.getJobId();
        cancel(targetJobId);
    }

    /**
     * Cancels a job with the fixed message "user cancel".
     *
     * @throws DdlException if the job does not exist or cannot be cancelled
     */
    public void cancel(long jobId) throws DdlException {
        final String userCancelMsg = "user cancel";
        cancel(jobId, userCancelMsg);
    }

    /**
     * Cancels a job, passing {@code msg} on to the job's own cancel call.
     *
     * @param jobId id of the job to cancel
     * @param msg message handed to the job
     * @throws DdlException if the job does not exist or refuses to be cancelled
     */
    public void cancel(long jobId, String msg) throws DdlException {
        CloudWarmUpJob target = cloudWarmUpJobs.get(jobId);
        if (target == null) {
            throw new DdlException("job id: " + jobId + " does not exist.");
        }
        boolean cancelled = target.cancel(msg, true);
        if (cancelled) {
            return;
        }
        throw new DdlException("job can not be cancelled. State: " + target.getJobState());
    }

    /**
     * Cancels every runnable, not-yet-done job that has a table filter and uses the given
     * cluster as its source or destination. A failed cancel is logged and does not stop
     * the remaining jobs from being processed.
     *
     * @param clusterName name of the changed cluster
     * @param reason message passed to each cancel call
     */
    public void cancelTableFilterJobsForClusterChange(String clusterName, String reason) {
        for (CloudWarmUpJob job : runnableCloudWarmUpJobs.values()) {
            boolean activeFilterJob = !job.isDone() && job.hasTableFilter();
            if (!activeFilterJob) {
                continue;
            }
            boolean touchesCluster = Objects.equals(clusterName, job.getSrcClusterName())
                    || Objects.equals(clusterName, job.getDstClusterName());
            if (!touchesCluster) {
                continue;
            }
            try {
                cancel(job.getJobId(), reason);
                LOG.info("cancel table-level cloud warm up job {} because compute group {} changed: {}",
                        job.getJobId(), clusterName, reason);
            } catch (DdlException e) {
                LOG.warn("failed to cancel table-level cloud warm up job {} after compute group {} changed",
                        job.getJobId(), clusterName, e);
            }
        }
    }

    /**
     * Walks the runnable jobs and starts each one that is not waiting, not done, not already
     * active, and fits under {@code Config.max_active_cloud_warm_up_job}. In unit tests the
     * job runs inline; otherwise it is submitted to the warm up thread pool.
     */
    private void runCloudWarmUpJob() {
        runnableCloudWarmUpJobs.values().forEach(job -> {
            if (job.shouldWait()) {
                return;
            }
            if (job.isDone()) {
                return;
            }
            if (activeCloudWarmUpJobs.containsKey(job.getJobId())) {
                return;
            }
            if (activeCloudWarmUpJobs.size() >= Config.max_active_cloud_warm_up_job) {
                return;
            }

            if (FeConstants.runningUnitTest) {
                job.run();
                return;
            }
            cloudWarmUpThreadPool.submit(() -> {
                // The job only runs if this task is the one that registers it as active.
                if (activeCloudWarmUpJobs.putIfAbsent(job.getJobId(), job) != null) {
                    return;
                }
                try {
                    job.run();
                } finally {
                    activeCloudWarmUpJobs.remove(job.getJobId());
                }
            });
        });
    }

    /**
     * Applies a replayed warm up job: the job replaces any entry with the same id in both
     * maps and its table filter state is restored. A done job triggers a stop notification,
     * any other job is registered for repeat detection. A job in DELETED state is finally
     * removed from both maps again.
     */
    public void replayCloudWarmUpJob(CloudWarmUpJob cloudWarmUpJob) throws Exception {
        final long jobId = cloudWarmUpJob.getJobId();

        // ATTN: not need to replay, just override the job with the same job id.
        runnableCloudWarmUpJobs.put(jobId, cloudWarmUpJob);
        cloudWarmUpJobs.put(jobId, cloudWarmUpJob);
        LOG.info("replay cloud warm up job {}, state {}", jobId, cloudWarmUpJob.getJobState());

        restoreTableFilterState(cloudWarmUpJob);

        if (!cloudWarmUpJob.isDone()) {
            registerJobForRepeatDetection(cloudWarmUpJob, true);
        } else {
            notifyJobStop(cloudWarmUpJob);
        }

        if (cloudWarmUpJob.jobState != JobState.DELETED) {
            return;
        }
        boolean removedFromBoth = cloudWarmUpJobs.remove(jobId) != null
                && runnableCloudWarmUpJobs.remove(jobId) != null;
        if (removedFromBoth) {
            LOG.info("replay removing expired cloud warm up job {}.", jobId);
        } else {
            // should not happen, but it does no matter, just add a warn log here to observe
            LOG.warn("failed to find cloud warm up job {} when replay removing expired job.",
                        jobId);
        }
    }

    /**
     * Evaluates an ON TABLES filter against every database and table of the internal catalog
     * and returns the managed tables it accepts as table ID → "db.table" name.
     * A {@code null} filter yields an empty map.
     */
    public Map<Long, String> resolveTableIds(OnTablesFilter filter) {
        Map<Long, String> matched = new HashMap<>();
        if (filter == null) {
            return matched;
        }
        for (DatabaseIf<? extends TableIf> database : Env.getCurrentInternalCatalog().getAllDbs()) {
            String dbName = database.getFullName();
            // Strip "default_cluster:" prefix if present
            int colonPos = dbName.indexOf(':');
            if (colonPos >= 0) {
                dbName = dbName.substring(colonPos + 1);
            }
            for (String tableName : database.getTableNamesOrEmptyWithLock()) {
                TableIf table = database.getTableNullable(tableName);
                if (table == null) {
                    continue;
                }
                if (!table.isManagedTable()) {
                    continue;
                }
                if (!filter.shouldWarmUp(dbName, tableName)) {
                    continue;
                }
                matched.put(table.getId(), dbName + "." + tableName);
            }
        }
        return matched;
    }

    /**
     * Logs the tables matched by a filter: each entry is rendered as "id:name", sorted by
     * table id, and formatted for display together with the match count.
     */
    private void logMatchedTables(String action, Map<Long, String> tableIdNames) {
        List<String> idNamePairs = tableIdNames.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .map(entry -> entry.getKey() + ":" + entry.getValue())
                .collect(Collectors.toList());
        String display = CloudWarmUpJob.formatMatchedTablesForDisplay(idNamePairs);
        LOG.info("{}: matched_table_count={}, matched_tables=[{}]",
                action, tableIdNames.size(), display);
    }

    /**
     * Re-resolves the ON TABLES filter of every running event-driven job that has one and
     * stores the new table set on the job when it differs from the current one, so that
     * newly created or dropped tables matching the glob patterns are picked up.
     * A failure for one job is logged and does not affect the other jobs.
     */
    public void refreshAllTableFilters() {
        for (CloudWarmUpJob job : runnableCloudWarmUpJobs.values()) {
            boolean refreshable = !job.isDone() && job.isEventDriven() && job.hasTableFilter();
            if (!refreshable) {
                continue;
            }
            try {
                Map<Long, String> resolved = resolveTableIds(job.getOnTablesFilter());
                logMatchedTables("refreshed table filter for job " + job.getJobId(), resolved);
                // Captured before the update so the log can report the previous count.
                Set<Long> previousIds = job.getCurrentTableIds();
                if (resolved.equals(job.getCurrentTableIdNames())) {
                    continue;
                }
                job.setCurrentTableIdNames(resolved);
                int previousCount = previousIds == null ? 0 : previousIds.size();
                LOG.info("refreshed table filter for job {}: {} -> {} tables",
                        job.getJobId(),
                        previousCount,
                        resolved.size());
            } catch (Exception e) {
                LOG.warn("failed to refresh table filter for job {}", job.getJobId(), e);
            }
        }
    }

}