CloudWarmUpJob.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.cloud;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Triple;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService.Client;
import org.apache.doris.thrift.TDownloadType;
import org.apache.doris.thrift.TJobMeta;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TWarmUpEventType;
import org.apache.doris.thrift.TWarmUpTabletsRequest;
import org.apache.doris.thrift.TWarmUpTabletsRequestType;
import org.apache.doris.thrift.TWarmUpTabletsResponse;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class CloudWarmUpJob implements Writable {
    private static final Logger LOG = LogManager.getLogger(CloudWarmUpJob.class);

    /**
     * Lifecycle state of a warm up job.
     */
    public enum JobState {
        PENDING,
        RUNNING,
        FINISHED,
        CANCELLED,
        DELETED;

        /**
         * @return true for FINISHED, CANCELLED and DELETED; false for PENDING and RUNNING
         */
        public boolean isFinalState() {
            switch (this) {
                case FINISHED:
                case CANCELLED:
                case DELETED:
                    return true;
                default:
                    return false;
            }
        }
    }

    /**
     * Kind of warm up job. TABLE and TABLES jobs have their tablets set on creation
     * (see fetchBeToTabletIdBatches), whereas other jobs compute them when the job starts.
     */
    public enum JobType {
        CLUSTER,
        TABLE,
        TABLES;
    }

    /**
     * How a job is scheduled. PERIODIC jobs use syncInterval (seconds) between runs,
     * EVENT_DRIVEN jobs are associated with a SyncEvent. A null sync mode on a job is
     * treated as ONCE for compatibility with jobs created by older FE versions.
     */
    public enum SyncMode {
        ONCE,
        PERIODIC,
        EVENT_DRIVEN;
    }

    /**
     * Event kind attached to EVENT_DRIVEN jobs.
     */
    public enum SyncEvent {
        LOAD,
        QUERY
    }

    // ---- Persisted state (serialized under the names given by @SerializedName) ----

    @SerializedName(value = "jobId")
    protected long jobId;
    @SerializedName(value = "jobState")
    protected JobState jobState;
    // Wall-clock creation time in milliseconds; -1 until set.
    @SerializedName(value = "createTimeMs")
    protected long createTimeMs = -1;
    // Wall-clock time in milliseconds of the most recent start; -1 until the job has started.
    @SerializedName(value = "startTimeMs")
    protected long startTimeMs = -1;

    @SerializedName(value = "errMsg")
    protected String errMsg = "";
    // Wall-clock time in milliseconds when the job last finished or was cancelled; -1 until set.
    @SerializedName(value = "finishedTimeMs")
    protected long finishedTimeMs = -1;

    @SerializedName(value = "srcClusterName")
    protected String srcClusterName = "";

    // the serialized name is kept for compatibility reasons
    @SerializedName(value = "cloudClusterName")
    protected String dstClusterName = "";

    // -1 means no batch has been recorded yet; getJobInfo reports lastBatchId + 1.
    @SerializedName(value = "lastBatchId")
    protected long lastBatchId = -1;

    // BE id -> list of batches, each batch being a list of tablet ids.
    @SerializedName(value = "beToTabletIdBatches")
    protected Map<Long, List<List<Long>>> beToTabletIdBatches = new HashMap<>();

    // BE id -> "host:bePort" string.
    @SerializedName(value = "beToThriftAddress")
    protected Map<Long, String> beToThriftAddress = new HashMap<>();

    @SerializedName(value = "JobType")
    protected JobType jobType;

    // Each triple is rendered by getJobInfo as "left.middle" or "left.middle.right".
    @SerializedName(value = "tables")
    protected List<Triple<String, String, String>> tables = new ArrayList<>();

    @SerializedName(value = "force")
    protected boolean force = false;

    @SerializedName(value = "syncMode")
    protected SyncMode syncMode = SyncMode.ONCE;

    // Interval between runs of a PERIODIC job, in seconds (see shouldWait).
    @SerializedName(value = "syncInterval")
    protected long syncInterval;

    @SerializedName(value = "syncEvent")
    protected SyncEvent syncEvent;

    @SerializedName(value = "tableFilterRules")
    protected List<PersistedTableFilterRule> tableFilterRules = new ArrayList<>();

    // ---- Transient state (rebuilt at runtime, never serialized) ----

    // Computed from tableFilterRules via canonicalize(); not persisted.
    private transient String tableFilterExpr = "";
    private transient OnTablesFilter onTablesFilter;
    // Maps table ID -> "db.table" qualified name for matched tables.
    private transient volatile Map<Long, String> currentTableIdNames = new ConcurrentHashMap<>();

    // Latest event-driven SyncStats collected by FE background metrics refresh. Not persisted.
    private transient volatile JobWarmUpStats syncStats;

    /**
     * Serializable rule for GSON persistence.
     *
     * <p>{@code ruleType} is expected to be "INCLUDE" or "EXCLUDE" (case-insensitive on input;
     * normalizeTableFilterRuleType rejects anything else). {@code pattern} is stored as given.
     */
    public static class PersistedTableFilterRule {
        @SerializedName("ruleType")
        public String ruleType;
        @SerializedName("pattern")
        public String pattern;
    }

    // Orders rules with INCLUDE before any other type, then by pattern (null pattern sorts as "").
    private static final Comparator<PersistedTableFilterRule> TABLE_FILTER_RULE_COMPARATOR =
            Comparator.comparingInt(CloudWarmUpJob::tableFilterRuleTypeOrder)
                    .thenComparing(rule -> StringUtils.defaultString(rule.pattern));

    // BE id -> borrowed thrift client; null until prepareClients() runs and after releaseClients().
    private Map<Long, Client> beToClient;

    // BE id -> parsed network address; created and cleared together with beToClient.
    private Map<Long, TNetworkAddress> beToAddr;

    private int maxRetryTime = 3;

    private int retryTime = 0;

    private boolean retry = false;

    // Reset to false each time a pending job is started (see runPendingJob).
    private boolean setJobDone = false;

    /**
     * Fluent builder for {@link CloudWarmUpJob}.
     *
     * <p>A non-zero job id, both cluster names, a job type and a sync mode are mandatory;
     * {@link #build()} rejects the builder otherwise. Job type defaults to CLUSTER and
     * sync mode defaults to ONCE.
     */
    public static class Builder {
        private long jobId;
        private String srcClusterName;
        private String dstClusterName;
        private JobType jobType = JobType.CLUSTER;
        private SyncMode syncMode = SyncMode.ONCE;
        private SyncEvent syncEvent;
        private long syncInterval;
        private List<PersistedTableFilterRule> tableFilterRules = new ArrayList<>();

        public Builder() {}

        /** Sets the job id; must be non-zero for {@link #build()} to succeed. */
        public Builder setJobId(long jobId) {
            this.jobId = jobId;
            return this;
        }

        /** Sets the source cluster name (required). */
        public Builder setSrcClusterName(String srcClusterName) {
            this.srcClusterName = srcClusterName;
            return this;
        }

        /** Sets the destination cluster name (required). */
        public Builder setDstClusterName(String dstClusterName) {
            this.dstClusterName = dstClusterName;
            return this;
        }

        /** Overrides the default job type (CLUSTER). */
        public Builder setJobType(JobType jobType) {
            this.jobType = jobType;
            return this;
        }

        /** Overrides the default sync mode (ONCE). */
        public Builder setSyncMode(SyncMode syncMode) {
            this.syncMode = syncMode;
            return this;
        }

        /** Sets the sync event recorded on the job. */
        public Builder setSyncEvent(SyncEvent syncEvent) {
            this.syncEvent = syncEvent;
            return this;
        }

        /** Sets the sync interval recorded on the job. */
        public Builder setSyncInterval(long syncInterval) {
            this.syncInterval = syncInterval;
            return this;
        }

        /** Sets the table filter rules; they are normalized when the job is constructed. */
        public Builder setTableFilterRules(List<PersistedTableFilterRule> tableFilterRules) {
            this.tableFilterRules = tableFilterRules;
            return this;
        }

        /**
         * Creates the job in PENDING state.
         *
         * @throws IllegalStateException if any mandatory field is missing
         */
        public CloudWarmUpJob build() {
            boolean hasAllRequiredFields = jobId != 0
                    && srcClusterName != null
                    && dstClusterName != null
                    && jobType != null
                    && syncMode != null;
            if (!hasAllRequiredFields) {
                throw new IllegalStateException("Missing required fields for CloudWarmUpJob");
            }
            return new CloudWarmUpJob(this);
        }
    }

    /**
     * Builds a PENDING job from a validated builder. Table filter rules are normalized
     * (copied, sorted, de-duplicated) and the canonical filter expression is derived from them.
     */
    private CloudWarmUpJob(Builder builder) {
        // Identity and initial state.
        jobId = builder.jobId;
        jobState = JobState.PENDING;

        // Clusters and job kind.
        srcClusterName = builder.srcClusterName;
        dstClusterName = builder.dstClusterName;
        jobType = builder.jobType;

        // Scheduling.
        syncMode = builder.syncMode;
        syncEvent = builder.syncEvent;
        syncInterval = builder.syncInterval;

        // Table filter: the rules must be assigned before the expression is computed from them.
        tableFilterRules = normalizeTableFilterRules(builder.tableFilterRules);
        tableFilterExpr = computeTableFilterExpr();

        createTimeMs = System.currentTimeMillis();
    }

    /**
     * Replaces beToThriftAddress with the current backends of the relevant cluster:
     * the source cluster for event-driven jobs, the destination cluster otherwise.
     * Each backend is recorded as "host:bePort".
     */
    private void fetchBeToThriftAddress() {
        String targetCluster = dstClusterName;
        if (isEventDriven()) {
            targetCluster = srcClusterName;
        }
        CloudSystemInfoService systemInfo = (CloudSystemInfoService) Env.getCurrentSystemInfo();
        List<Backend> clusterBackends = systemInfo.getBackendsByClusterName(targetCluster);
        Map<Long, String> addresses = new HashMap<>();
        this.beToThriftAddress = addresses;
        clusterBackends.forEach(be -> addresses.put(be.getId(), be.getHost() + ":" + be.getBePort()));
    }

    /**
     * For event-driven jobs only: re-reads the BE address map, logs when it changed, and
     * drops the cached clients and addresses so they are re-created on next use.
     * Does nothing for other sync modes.
     */
    void refreshEventDrivenBeToThriftAddress() {
        if (!isEventDriven()) {
            return;
        }
        Map<Long, String> before = this.beToThriftAddress;
        fetchBeToThriftAddress();
        Map<Long, String> after = this.beToThriftAddress;
        boolean changed = before != null && !before.equals(after);
        if (changed) {
            LOG.info("refresh event-driven warm up job {} BE address count from {} to {}",
                    jobId, before.size(), after.size());
            LOG.debug("refresh event-driven warm up job {} BE addresses from {} to {}",
                    jobId, before, after);
        }
        // Cached clients may point at stale addresses; force re-initialization.
        this.beToClient = null;
        this.beToAddr = null;
    }

    /**
     * Creates a PENDING job with pre-computed tablet batches.
     *
     * <p>Outside of unit tests the BE address map is filled from the backends of the
     * destination cluster, each recorded as "host:bePort".
     *
     * @param jobId               job id
     * @param srcClusterName      source cluster name
     * @param dstClusterName      destination cluster name
     * @param beToTabletIdBatches BE id to tablet id batches
     * @param jobType             job type
     */
    public CloudWarmUpJob(long jobId, String srcClusterName, String dstClusterName,
                                Map<Long, List<List<Long>>> beToTabletIdBatches, JobType jobType) {
        this.jobId = jobId;
        this.jobState = JobState.PENDING;
        this.srcClusterName = srcClusterName;
        this.dstClusterName = dstClusterName;
        this.beToTabletIdBatches = beToTabletIdBatches;
        this.createTimeMs = System.currentTimeMillis();
        this.jobType = jobType;
        if (FeConstants.runningUnitTest) {
            return;
        }
        CloudSystemInfoService systemInfo = (CloudSystemInfoService) Env.getCurrentSystemInfo();
        for (Backend be : systemInfo.getBackendsByClusterName(dstClusterName)) {
            String thriftAddress = be.getHost() + ":" + be.getBePort();
            beToThriftAddress.put(be.getId(), thriftAddress);
        }
    }

    /**
     * Creates a PENDING job for a destination cluster with an explicit table list.
     * The source cluster name is passed as null to the delegate constructor.
     *
     * @param jobId               job id
     * @param cloudClusterName    destination cluster name
     * @param beToTabletIdBatches BE id to tablet id batches
     * @param jobType             job type
     * @param tables              table triples recorded on the job
     * @param force               value stored in the job's force flag
     */
    public CloudWarmUpJob(long jobId, String cloudClusterName,
                          Map<Long, List<List<Long>>> beToTabletIdBatches, JobType jobType,
                          List<Triple<String, String, String>> tables, boolean force) {
        this(jobId, null, cloudClusterName, beToTabletIdBatches, jobType);
        this.tables = tables;
        this.force = force;
    }

    /**
     * Computes the per-BE tablet batches for a cluster warm up job and stores them in
     * beToTabletIdBatches.
     *
     * <p>Nothing is computed when running unit tests, for TABLE/TABLES jobs (their tablets are
     * set on creation), for jobs without a sync mode (created by an old FE version, tablets
     * already set), or for event-driven jobs (they do not need tablets).
     */
    public void fetchBeToTabletIdBatches() {
        if (FeConstants.runningUnitTest) {
            return;
        }
        // warm up with table will have to set tablets on creation
        boolean tabletsSetOnCreation = jobType == JobType.TABLE || jobType == JobType.TABLES;
        // A job created by an old FE version has no sync mode (and no source cluster name),
        // but its tablets were already set; keep them for backward compatibility.
        boolean createdByOldFe = syncMode == null;
        if (tabletsSetOnCreation || createdByOldFe || isEventDriven()) {
            return;
        }

        CacheHotspotManager hotspotMgr = ((CloudEnv) Env.getCurrentEnv()).getCacheHotspotMgr();
        Map<Long, List<Tablet>> tabletsPerBe =
                hotspotMgr.warmUpNewClusterByCluster(dstClusterName, srcClusterName);
        long totalTablets = 0;
        for (List<Tablet> tablets : tabletsPerBe.values()) {
            totalTablets += tablets.size();
        }

        beToTabletIdBatches = hotspotMgr.splitBatch(tabletsPerBe);
        long totalBatches = 0;
        for (List<List<Long>> batches : beToTabletIdBatches.values()) {
            totalBatches += batches.size();
        }
        LOG.info("warm up job {} tablet num {}, batch num {}", jobId, totalTablets, totalBatches);
    }

    /**
     * Tells whether a periodic job must keep waiting before its next run.
     *
     * @return true only for a PERIODIC job in PENDING state whose last start is less than
     *         syncInterval seconds ago
     */
    public boolean shouldWait() {
        if (!isPeriodic() || jobState != JobState.PENDING) {
            return false;
        }
        long elapsedMs = System.currentTimeMillis() - startTimeMs;
        long intervalMs = syncInterval * 1000L;
        return elapsedMs < intervalMs;
    }

    /** @return true when the sync mode is ONCE or missing (null). */
    public boolean isOnce() {
        return this.syncMode == SyncMode.ONCE || this.syncMode == null;
    }

    /** @return true when the sync mode is PERIODIC. */
    public boolean isPeriodic() {
        return this.syncMode == SyncMode.PERIODIC;
    }

    /** @return true when the sync mode is EVENT_DRIVEN. */
    public boolean isEventDriven() {
        return this.syncMode == SyncMode.EVENT_DRIVEN;
    }

    // ---- Plain accessors; each returns the backing field as-is (no defensive copies). ----

    public long getJobId() {
        return jobId;
    }

    public JobState getJobState() {
        return jobState;
    }

    /** @return creation time in milliseconds, or -1 if unset. */
    public long getCreateTimeMs() {
        return createTimeMs;
    }

    /** @return last start time in milliseconds, or -1 if the job has not started. */
    public long getStartTimeMs() {
        return startTimeMs;
    }

    public String getErrMsg() {
        return errMsg;
    }

    /** @return last finish time in milliseconds, or -1 if unset. */
    public long getFinishedTimeMs() {
        return finishedTimeMs;
    }

    public long getLastBatchId() {
        return lastBatchId;
    }

    /** @return the live BE id to tablet id batches map (may be null if it was set to null). */
    public Map<Long, List<List<Long>>> getBeToTabletIdBatches() {
        return beToTabletIdBatches;
    }

    /** @return the live BE id to "host:port" map (may be null while a job is being restarted). */
    public Map<Long, String> getBeToThriftAddress() {
        return beToThriftAddress;
    }

    public JobType getJobType() {
        return jobType;
    }

    /** @return the sync mode; may be null for jobs created by older FE versions. */
    public SyncMode getSyncMode() {
        return syncMode;
    }

    public SyncEvent getSyncEvent() {
        return syncEvent;
    }

    /** @return the latest sync stats stored via setSyncStats, or null if none were stored. */
    public JobWarmUpStats getSyncStats() {
        return syncStats;
    }

    public void setSyncStats(JobWarmUpStats syncStats) {
        this.syncStats = syncStats;
    }

    /**
     * Renders the sync mode for display.
     *
     * @return "ONCE" when the mode is missing or ONCE, "PERIODIC (Ns)" with the sync interval,
     *         or "EVENT_DRIVEN (event)" with the sync event
     */
    public String getSyncModeString() {
        if (syncMode == null) {
            // For backward compatibility: older FE versions did not set syncMode for jobs,
            // so default to ONCE when syncMode is missing.
            return String.valueOf(SyncMode.ONCE);
        }
        if (syncMode == SyncMode.PERIODIC) {
            return syncMode + " (" + syncInterval + "s)";
        }
        if (syncMode == SyncMode.EVENT_DRIVEN) {
            return syncMode + " (" + syncEvent + ")";
        }
        return String.valueOf(syncMode);
    }

    /**
     * Convenience overload of {@link #getJobInfo(JobWarmUpStats, boolean)} that requests
     * detailed sync stats.
     */
    public List<String> getJobInfo(JobWarmUpStats stats) {
        return getJobInfo(stats, true);
    }

    /**
     * Builds the display row for this job. Column order: job id, source cluster, destination
     * cluster, state, type, sync mode, create time, start time, lastBatchId + 1, largest
     * per-BE batch count, finish time, error message, tables, table filter expression,
     * matched tables, sync stats.
     *
     * @param stats                 stats to render for event-driven jobs; may be null
     * @param showDetailedSyncStats true for the full stats JSON, false for the summary JSON
     * @return the row as a list of strings; the stats column is "" unless the job is
     *         event-driven and stats is non-null
     */
    public List<String> getJobInfo(JobWarmUpStats stats, boolean showDetailedSyncStats) {
        // Largest number of batches assigned to any single BE.
        long maxBatchSize = 0;
        if (beToTabletIdBatches != null) {
            for (List<List<Long>> batches : beToTabletIdBatches.values()) {
                maxBatchSize = Math.max(maxBatchSize, batches.size());
            }
        }

        // "a.b" or "a.b.c" per table triple, comma separated.
        String tablesColumn = "";
        if (tables != null) {
            List<String> qualifiedNames = new ArrayList<>(tables.size());
            for (Triple<String, String, String> t : tables) {
                String name = t.getLeft() + "." + t.getMiddle();
                if (!StringUtils.isEmpty(t.getRight())) {
                    name = name + "." + t.getRight();
                }
                qualifiedNames.add(name);
            }
            tablesColumn = String.join(", ", qualifiedNames);
        }

        // SyncStats: only for event-driven jobs
        String statsColumn = "";
        if (isEventDriven() && stats != null) {
            statsColumn = showDetailedSyncStats ? stats.toJsonString() : stats.toSummaryJsonString();
        }

        List<String> row = new ArrayList<>();
        row.add(String.valueOf(jobId));
        row.add(srcClusterName);
        row.add(dstClusterName);
        row.add(String.valueOf(jobState));
        row.add(String.valueOf(jobType));
        row.add(getSyncModeString());
        row.add(TimeUtils.longToTimeStringWithms(createTimeMs));
        row.add(TimeUtils.longToTimeStringWithms(startTimeMs));
        row.add(Long.toString(lastBatchId + 1));
        row.add(Long.toString(maxBatchSize));
        row.add(TimeUtils.longToTimeStringWithms(finishedTimeMs));
        row.add(errMsg);
        row.add(tablesColumn);
        row.add(tableFilterExpr == null ? "" : tableFilterExpr);
        row.add(getMatchedTablesString());
        row.add(statsColumn);
        return row;
    }

    /**
     * @return the currently matched table names, sorted and formatted for display,
     *         or "" when there are none
     */
    private String getMatchedTablesString() {
        if (currentTableIdNames == null || currentTableIdNames.isEmpty()) {
            return "";
        }
        List<String> sortedNames = new ArrayList<>(currentTableIdNames.values());
        sortedNames.sort(Comparator.naturalOrder());
        return formatMatchedTablesForDisplay(sortedNames);
    }

    /**
     * Joins table names with ", ", showing at most
     * Config.cloud_warm_up_matched_tables_display_limit entries (negative limits count as 0).
     * When entries are cut off, a "... (truncated, X of Y shown)" marker is appended.
     *
     * @param matchedTables names to display; null or empty yields ""
     * @return the display string
     */
    static String formatMatchedTablesForDisplay(List<String> matchedTables) {
        if (matchedTables == null || matchedTables.isEmpty()) {
            return "";
        }
        int total = matchedTables.size();
        int displayLimit = Math.max(0, Config.cloud_warm_up_matched_tables_display_limit);
        int shownCount = Math.min(total, displayLimit);
        String shown = String.join(", ", matchedTables.subList(0, shownCount));
        if (total <= displayLimit) {
            return shown;
        }
        String truncatedSuffix = "... (truncated, " + shownCount + " of " + total + " shown)";
        if (shown.isEmpty()) {
            return truncatedSuffix;
        }
        return shown + ", " + truncatedSuffix;
    }

    // ---- Plain setters; each stores the argument as-is (no validation, no copying). ----

    public void setJobState(JobState jobState) {
        this.jobState = jobState;
    }

    public void setCreateTimeMs(long timeMs) {
        this.createTimeMs = timeMs;
    }

    public void setErrMsg(String msg) {
        this.errMsg = msg;
    }

    public void setFinishedTimeMs(long timeMs) {
        this.finishedTimeMs = timeMs;
    }

    /** Sets the destination cluster name (the field is serialized as "cloudClusterName"). */
    public void setCloudClusterName(String name) {
        this.dstClusterName = name;
    }

    public void setLastBatchId(long id) {
        this.lastBatchId = id;
    }

    public void setBeToTabletIdBatches(Map<Long, List<List<Long>>> m) {
        this.beToTabletIdBatches = m;
    }

    public void setBeToThriftAddress(Map<Long, String> m) {
        this.beToThriftAddress = m;
    }

    public void setJobType(JobType t) {
        this.jobType = t;
    }

    /** @return true when the job state is final (FINISHED, CANCELLED or DELETED). */
    public boolean isDone() {
        return jobState.isFinalState();
    }

    /**
     * @return true when the job is RUNNING and the whole seconds elapsed since its start
     *         exceed Config.cloud_warm_up_timeout_second
     */
    public boolean isTimeout() {
        if (jobState != JobState.RUNNING) {
            return false;
        }
        long runningSeconds = (System.currentTimeMillis() - startTimeMs) / 1000;
        return runningSeconds > Config.cloud_warm_up_timeout_second;
    }

    /**
     * @return true when the job is done and the whole seconds elapsed since it finished
     *         exceed Config.history_cloud_warm_up_job_keep_max_second
     */
    public boolean isExpire() {
        if (!isDone()) {
            return false;
        }
        long secondsSinceFinish = (System.currentTimeMillis() - finishedTimeMs) / 1000;
        return secondsSinceFinish > Config.history_cloud_warm_up_job_keep_max_second;
    }

    public String getDstClusterName() {
        return dstClusterName;
    }

    public String getSrcClusterName() {
        return srcClusterName;
    }

    /** @return true when at least one table filter rule is present. */
    public boolean hasTableFilter() {
        return tableFilterRules != null && !tableFilterRules.isEmpty();
    }

    /** @return the canonical filter expression; "" when there are no rules. */
    public String getTableFilterExpr() {
        return tableFilterExpr;
    }

    /** @return the live list of persisted filter rules (not a copy). */
    public List<PersistedTableFilterRule> getTableFilterRules() {
        return tableFilterRules;
    }

    /** @return the transient filter; null until rebuildOnTablesFilter() builds it or when no rules exist. */
    public OnTablesFilter getOnTablesFilter() {
        return onTablesFilter;
    }

    /**
     * Returns the ids of the currently matched tables, as the key-set view of the
     * id-to-name map (created on demand when absent).
     */
    public Set<Long> getCurrentTableIds() {
        return getCurrentTableIdNames().keySet();
    }

    /**
     * Sets the current matched table ID-to-name mapping.
     *
     * <p>The argument is copied into a fresh ConcurrentHashMap, so later changes to
     * {@code idNames} are not reflected here. A null argument is treated as "no matched
     * tables" and results in an empty mapping instead of a NullPointerException, which is
     * consistent with the getters that treat a missing map as empty.
     *
     * @param idNames table id to qualified table name; may be null
     */
    public void setCurrentTableIdNames(Map<Long, String> idNames) {
        if (idNames == null) {
            this.currentTableIdNames = new ConcurrentHashMap<>();
            return;
        }
        this.currentTableIdNames = new ConcurrentHashMap<>(idNames);
    }

    /**
     * @return the live table id to name map, creating an empty one first if the field is null
     */
    public Map<Long, String> getCurrentTableIdNames() {
        if (currentTableIdNames != null) {
            return currentTableIdNames;
        }
        currentTableIdNames = new ConcurrentHashMap<>();
        return currentTableIdNames;
    }

    /**
     * Derives the canonical table filter expression from the persisted rules.
     * As a side effect, tableFilterRules is replaced by its normalized form.
     *
     * @return the canonical JSON expression, or "" when there are no rules
     */
    private String computeTableFilterExpr() {
        tableFilterRules = normalizeTableFilterRules(tableFilterRules);
        return tableFilterRules.isEmpty() ? "" : canonicalizeNormalizedRules(tableFilterRules);
    }

    /**
     * Generate canonical JSON from persisted rules for JobKey dedup and SHOW output.
     * Steps: group by type -> sort alphabetically -> deduplicate -> compact JSON.
     *
     * @param rules rules to canonicalize; null or empty yields a JSON object with an empty
     *              "include" array
     * @return the compact JSON string
     */
    public static String canonicalize(List<PersistedTableFilterRule> rules) {
        List<PersistedTableFilterRule> normalized = normalizeTableFilterRules(rules);
        return canonicalizeNormalizedRules(normalized);
    }

    /**
     * Serializes already-normalized rules as compact JSON. The "include" array is always
     * present (possibly empty); the "exclude" array is emitted only when at least one
     * EXCLUDE rule exists. Rules of any other type are ignored.
     */
    private static String canonicalizeNormalizedRules(List<PersistedTableFilterRule> normalizedRules) {
        JsonArray includePatterns = new JsonArray();
        JsonArray excludePatterns = new JsonArray();
        for (PersistedTableFilterRule rule : normalizedRules) {
            if ("INCLUDE".equals(rule.ruleType)) {
                includePatterns.add(rule.pattern);
            } else if ("EXCLUDE".equals(rule.ruleType)) {
                excludePatterns.add(rule.pattern);
            }
        }

        JsonObject json = new JsonObject();
        json.add("include", includePatterns);
        if (excludePatterns.size() > 0) {
            json.add("exclude", excludePatterns);
        }
        return json.toString();
    }

    /**
     * Rebuild the transient OnTablesFilter and tableFilterExpr from persisted tableFilterRules.
     * Called after deserialization (EditLog replay, FE restart).
     *
     * <p>Also makes sure the matched-table map exists. Without rules, the rule list is reset
     * to an empty list, the expression to "" and the filter to null.
     */
    public void rebuildOnTablesFilter() {
        if (currentTableIdNames == null) {
            currentTableIdNames = new ConcurrentHashMap<>();
        }
        boolean noRules = tableFilterRules == null || tableFilterRules.isEmpty();
        if (noRules) {
            this.tableFilterRules = new ArrayList<>();
            this.tableFilterExpr = "";
            this.onTablesFilter = null;
            return;
        }
        // Normalizes tableFilterRules in place before they are converted below.
        this.tableFilterExpr = computeTableFilterExpr();
        List<OnTablesFilter.TableFilterRule> filterRules = new ArrayList<>();
        for (PersistedTableFilterRule persisted : tableFilterRules) {
            OnTablesFilter.TableFilterRule.RuleType type = OnTablesFilter.TableFilterRule.RuleType.EXCLUDE;
            if ("INCLUDE".equals(persisted.ruleType)) {
                type = OnTablesFilter.TableFilterRule.RuleType.INCLUDE;
            }
            filterRules.add(new OnTablesFilter.TableFilterRule(type, persisted.pattern));
        }
        this.onTablesFilter = new OnTablesFilter(filterRules);
    }

    /** Sort rank of a rule type: 0 for "INCLUDE", 1 for anything else. */
    private static int tableFilterRuleTypeOrder(PersistedTableFilterRule rule) {
        if ("INCLUDE".equals(rule.ruleType)) {
            return 0;
        }
        return 1;
    }

    /**
     * Maps a case-insensitive rule type to its canonical upper-case form.
     *
     * @return "INCLUDE" or "EXCLUDE"
     * @throws NullPointerException  if ruleType is null
     * @throws IllegalStateException if ruleType is neither include nor exclude
     */
    private static String normalizeTableFilterRuleType(String ruleType) {
        Preconditions.checkNotNull(ruleType, "table filter rule type cannot be null");
        boolean isInclude = "INCLUDE".equalsIgnoreCase(ruleType);
        boolean isExclude = "EXCLUDE".equalsIgnoreCase(ruleType);
        Preconditions.checkState(isInclude || isExclude,
                "Unexpected table filter rule type: %s", ruleType);
        if (isInclude) {
            return "INCLUDE";
        }
        return "EXCLUDE";
    }

    /**
     * Copies a rule, canonicalizing its type and keeping its pattern unchanged.
     * The input rule is not modified.
     */
    private static PersistedTableFilterRule copyNormalizedTableFilterRule(PersistedTableFilterRule rule) {
        String canonicalType = normalizeTableFilterRuleType(rule.ruleType);
        PersistedTableFilterRule copy = new PersistedTableFilterRule();
        copy.ruleType = canonicalType;
        copy.pattern = rule.pattern;
        return copy;
    }

    /**
     * Produces a normalized copy of the rules: types canonicalized, sorted with INCLUDE
     * rules first and then by pattern, and with exact duplicates removed.
     *
     * @param rules rules to normalize; null or empty yields a new empty list
     * @return a new mutable list; the input list and its elements are left untouched
     */
    private static List<PersistedTableFilterRule> normalizeTableFilterRules(List<PersistedTableFilterRule> rules) {
        if (rules == null || rules.isEmpty()) {
            return new ArrayList<>();
        }
        List<PersistedTableFilterRule> ordered = new ArrayList<>(rules.size());
        for (PersistedTableFilterRule rule : rules) {
            ordered.add(copyNormalizedTableFilterRule(rule));
        }
        ordered.sort(TABLE_FILTER_RULE_COMPARATOR);

        // After sorting, duplicates (same canonical type and same pattern) are adjacent and
        // compare as equal, so keeping a rule only when it differs from the previously kept
        // one removes them.
        List<PersistedTableFilterRule> deduplicated = new ArrayList<>();
        PersistedTableFilterRule lastKept = null;
        for (PersistedTableFilterRule candidate : ordered) {
            if (lastKept != null && TABLE_FILTER_RULE_COMPARATOR.compare(lastKept, candidate) == 0) {
                continue;
            }
            deduplicated.add(candidate);
            lastKept = candidate;
        }
        return deduplicated;
    }

    /**
     * Executes one scheduling step of the job.
     *
     * <p>A timed-out job is cancelled with reason "Timeout". Otherwise the handler for the
     * current state (PENDING or RUNNING) is invoked; other states are ignored. In cloud mode a
     * ConnectContext bound to the destination cluster is installed for the duration of the
     * step and removed afterwards.
     *
     * <p>Exceptions thrown by the state handlers are logged (with stack trace) and not
     * propagated, so a failing step does not break the caller's scheduling loop.
     */
    public synchronized void run() {
        if (isTimeout()) {
            cancel("Timeout", false);
            return;
        }
        if (Config.isCloudMode()) {
            LOG.debug("set context to job");
            ConnectContext ctx = new ConnectContext();
            ctx.setThreadLocalInfo();
            ctx.setCloudCluster(dstClusterName);
        }
        try {
            switch (jobState) {
                case PENDING:
                    runPendingJob();
                    break;
                case RUNNING:
                    runRunningJob();
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            // Hand the throwable to the logger as the trailing argument so the stack trace
            // lands in the FE log instead of being printed to stderr.
            LOG.warn("state {} exception {}", jobState, e.getMessage(), e);
        } finally {
            if (Config.isCloudMode()) {
                LOG.debug("remove context from job");
                ConnectContext.remove();
            }
        }
    }

    /**
     * Makes sure a client exists for every known BE. When no clients are cached yet, one is
     * borrowed per entry of beToThriftAddress.
     *
     * @throws RuntimeException wrapping the original failure if any client cannot be
     *                          initialized; already borrowed clients are released first
     */
    public void initClients() throws Exception {
        prepareClients();
        if (!beToClient.isEmpty()) {
            // Clients from a previous call are still cached.
            return;
        }
        try {
            for (Map.Entry<Long, String> beAddress : beToThriftAddress.entrySet()) {
                initClient(beAddress.getKey(), beAddress.getValue());
            }
        } catch (Exception e) {
            releaseClients();
            throw new RuntimeException(e);
        }
    }

    /**
     * Ensures the BE address map is populated (fetching it when null or empty) and that the
     * client and address caches exist.
     */
    private void prepareClients() {
        boolean addressesMissing = beToThriftAddress == null || beToThriftAddress.isEmpty();
        if (addressesMissing) {
            fetchBeToThriftAddress();
        }
        if (beToClient != null) {
            return;
        }
        beToClient = new HashMap<>();
        beToAddr = new HashMap<>();
    }

    /**
     * Borrows a thrift client for one BE and records it, together with its parsed address,
     * in the client and address caches.
     *
     * <p>The caches are only touched after the client has been borrowed successfully, so a
     * failure leaves no partial entry behind and there is never a null address or null
     * client to hand back to the pool.
     *
     * @param beId          backend id
     * @param thriftAddress address in "host:port" form
     * @throws IllegalArgumentException if the address has no host or no port part
     * @throws NumberFormatException    if the port is not a number
     * @throws Exception                if the client cannot be borrowed from the pool
     */
    private void initClient(long beId, String thriftAddress) throws Exception {
        // Split on the LAST ':' so that a host which itself contains ':' (e.g. a raw IPv6
        // literal) is not cut apart; for plain "host:port" this matches the previous split.
        int separator = thriftAddress.lastIndexOf(':');
        if (separator <= 0 || separator == thriftAddress.length() - 1) {
            throw new IllegalArgumentException(
                    "invalid thrift address '" + thriftAddress + "' for BE " + beId);
        }
        String host = thriftAddress.substring(0, separator);
        int port = Integer.parseInt(thriftAddress.substring(separator + 1));
        TNetworkAddress address = new TNetworkAddress(host, port);

        // If borrowing fails the exception propagates with nothing registered yet.
        Client client = ClientPool.backendPool.borrowObject(address);
        beToAddr.put(beId, address);
        beToClient.put(beId, client);
    }

    /**
     * Best-effort variant of initClients used when clearing a job: a BE whose client cannot
     * be initialized is logged and skipped instead of aborting the whole operation.
     */
    private void initClientsForClearJob() {
        prepareClients();
        if (!beToClient.isEmpty()) {
            return;
        }
        for (Map.Entry<Long, String> beAddress : beToThriftAddress.entrySet()) {
            Long beId = beAddress.getKey();
            String thriftAddress = beAddress.getValue();
            try {
                initClient(beId, thriftAddress);
            } catch (Exception e) {
                LOG.warn("init client for BE {} ({}) failed when clearing warm up job {}: {}",
                        beId, thriftAddress, jobId, e.getMessage());
            }
        }
    }

    /**
     * Returns every cached client to the backend pool and drops both caches.
     * Safe to call when no clients were initialized.
     */
    public void releaseClients() {
        if (beToClient != null) {
            beToClient.forEach((beId, client) ->
                    ClientPool.backendPool.returnObject(beToAddr.get(beId), client));
        }
        beToClient = null;
        beToAddr = null;
    }

    /**
     * Resolves a printable "host:port" endpoint for a BE, for log messages.
     * The parsed address cache is consulted first, then the persisted address map.
     *
     * @return the endpoint, or "unknown" when the BE is in neither map
     */
    private String getBackendEndpoint(long beId) {
        TNetworkAddress cached = beToAddr == null ? null : beToAddr.get(beId);
        if (cached != null) {
            String host = cached.getHostname();
            return (host == null ? "unknown" : host) + ":" + cached.getPort();
        }
        String persisted = beToThriftAddress == null ? null : beToThriftAddress.get(beId);
        return persisted == null ? "unknown" : persisted;
    }

    /**
     * Sends a CLEAR_JOB request for this job to every BE a client could be obtained for.
     *
     * <p>Best effort: a BE whose RPC fails has its client invalidated and is skipped, and any
     * unexpected failure is logged rather than propagated. All remaining clients are returned
     * to the pool when the method exits.
     */
    private final void clearJobOnBEs() {
        try {
            initClientsForClearJob();
            // Iterate with explicit iterator so we can remove invalidated clients during iteration.
            Iterator<Map.Entry<Long, Client>> iter = beToClient.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<Long, Client> entry = iter.next();
                long beId = entry.getKey();
                Client client = entry.getValue();
                TWarmUpTabletsRequest request = new TWarmUpTabletsRequest();
                request.setType(TWarmUpTabletsRequestType.CLEAR_JOB);
                request.setJobId(jobId);
                // Event-driven jobs additionally carry the event type in the request.
                if (this.isEventDriven()) {
                    TWarmUpEventType event = getTWarmUpEventType();
                    if (event == null) {
                        // If event type is unknown, skip this BE but continue others.
                        LOG.warn("Unknown SyncEvent {}, skip CLEAR_JOB for BE {} ({})",
                                syncEvent, beId, getBackendEndpoint(beId));
                        continue;
                    }
                    request.setEvent(event);
                }
                LOG.info("send warm up request to BE {} ({}). job_id={}, request_type=CLEAR_JOB",
                        beId, getBackendEndpoint(beId), jobId);
                try {
                    client.warmUpTablets(request);
                } catch (Exception e) {
                    // If RPC to this BE fails, invalidate this client and remove it from map,
                    // then continue to next BE so that one bad BE won't block others.
                    LOG.warn("send warm up request to BE {} ({}) failed: {}",
                            beId, getBackendEndpoint(beId), e.getMessage());
                    try {
                        TNetworkAddress addr = beToAddr == null ? null : beToAddr.get(beId);
                        if (addr != null) {
                            ClientPool.backendPool.invalidateObject(addr, client);
                        }
                    } catch (Exception ie) {
                        LOG.warn("invalidate client for BE {} failed: {}", beId, ie.getMessage());
                    }
                    // remove from local map so releaseClients won't try to return an invalidated client
                    iter.remove();
                }
            }
        } catch (Exception e) {
            LOG.warn("send warm up request failed. job_id={}, request_type=CLEAR_JOB, exception={}",
                    jobId, e.getMessage());
        } finally {
            // Return whatever clients are still held, on success and on failure alike.
            releaseClients();
        }
    }

    /**
     * Cancels the job unless it is already in a final state.
     *
     * <p>Unless the job is still PENDING, the BEs are asked to clear it. A ONCE job, or any
     * job when {@code force} is set, becomes CANCELLED; otherwise the job goes back to
     * PENDING. The error message and finish time are recorded, the finish-time metric is
     * updated, the change is written to the edit log and the cache hotspot manager is
     * notified that the job stopped.
     *
     * @param errMsg reason stored on the job
     * @param force  true to cancel a non-ONCE job for good instead of returning it to PENDING
     * @return false if the job was already in a final state, true otherwise
     */
    public final synchronized boolean cancel(String errMsg, boolean force) {
        if (jobState.isFinalState()) {
            return false;
        }
        // BE haven't started a PENDING job yet, so the RPC is only needed for other states.
        if (jobState != JobState.PENDING) {
            clearJobOnBEs();
        }
        boolean cancelForGood = isOnce() || force;
        jobState = cancelForGood ? JobState.CANCELLED : JobState.PENDING;
        this.errMsg = errMsg;
        finishedTimeMs = System.currentTimeMillis();
        MetricRepo.updateClusterWarmUpJobLastFinishTime(String.valueOf(jobId), srcClusterName,
                dstClusterName, finishedTimeMs);
        LOG.info("cancel cloud warm up job {}, err {}", jobId, errMsg);
        Env currentEnv = Env.getCurrentEnv();
        currentEnv.getEditLog().logModifyCloudWarmUpJob(this);
        ((CloudEnv) Env.getCurrentEnv()).getCacheHotspotMgr().notifyJobStop(this);
        return true;
    }

    /**
     * Moves a PENDING job to RUNNING.
     *
     * <p>Returns without any change when the job cannot be registered as the running job with the
     * cache hotspot manager. Otherwise the per-run progress fields and cached BE connections are
     * reset, the tablet batches are fetched, the start metrics are updated and the new state is
     * written to the edit log.
     *
     * @throws DdlException declared by the signature for failures while preparing the run
     */
    private void runPendingJob() throws DdlException {
        Preconditions.checkState(jobState == JobState.PENDING, jobState);

        // make sure only one job runs concurrently for one destination cluster
        CloudEnv cloudEnv = (CloudEnv) Env.getCurrentEnv();
        if (!cloudEnv.getCacheHotspotMgr().tryRegisterRunningJob(this)) {
            return;
        }

        // reset clients to ensure we have the latest BE info
        this.beToThriftAddress = null;
        this.beToClient = null;
        this.beToAddr = null;

        // Todo: nothing to prepare yet, only the progress markers of the previous run are reset
        this.setJobDone = false;
        this.lastBatchId = -1;
        this.startTimeMs = System.currentTimeMillis();
        MetricRepo.updateClusterWarmUpJobLatestStartTime(
                String.valueOf(jobId), srcClusterName, dstClusterName, startTimeMs);

        this.fetchBeToTabletIdBatches();

        // count every tablet id over all batches of all BEs
        long requestedTablets = 0;
        for (List<List<Long>> batchesOfBe : beToTabletIdBatches.values()) {
            for (List<Long> batch : batchesOfBe) {
                requestedTablets += batch.size();
            }
        }
        MetricRepo.increaseClusterWarmUpJobRequestedTablets(dstClusterName, requestedTablets);
        MetricRepo.increaseClusterWarmUpJobExecCount(dstClusterName);

        this.jobState = JobState.RUNNING;
        Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(this);
        LOG.info("transfer cloud warm up job {} state to {}", jobId, this.jobState);
    }

    /**
     * Builds the job metas for one batch of one BE.
     *
     * <p>When a meta is produced, the finished-tablets metric of the destination cluster is
     * increased by the number of tablets in that batch.
     *
     * @param beId    backend whose tablet batches are looked up
     * @param batchId index of the batch in that backend's batch list
     * @return a list holding a single S3 download meta for the batch, or an empty list when
     *         {@code batchId} is not below the number of batches of the backend
     */
    private List<TJobMeta> buildJobMetas(long beId, long batchId) {
        List<List<Long>> batchesOfBe = beToTabletIdBatches.get(beId);
        if (batchId >= batchesOfBe.size()) {
            return new ArrayList<>();
        }

        List<Long> batchTabletIds = batchesOfBe.get((int) batchId);
        TJobMeta meta = new TJobMeta();
        meta.setDownloadType(TDownloadType.S3);
        meta.setTabletIds(batchTabletIds);

        List<TJobMeta> metas = new ArrayList<>();
        metas.add(meta);
        MetricRepo.increaseClusterWarmUpJobFinishedTablets(dstClusterName, batchTabletIds.size());
        return metas;
    }

    /**
     * Maps the job's sync event to the matching thrift event type.
     *
     * @return the thrift event for LOAD or QUERY, or null for any other sync event
     */
    private TWarmUpEventType getTWarmUpEventType() {
        TWarmUpEventType thriftEvent;
        switch (syncEvent) {
            case LOAD:
                thriftEvent = TWarmUpEventType.LOAD;
                break;
            case QUERY:
                thriftEvent = TWarmUpEventType.QUERY;
                break;
            default:
                // no thrift counterpart, callers have to handle null
                thriftEvent = null;
                break;
        }
        return thriftEvent;
    }

    /**
     * Sends a SET_JOB request carrying the job's event type to every BE client.
     *
     * <p>Any exception raised while refreshing addresses, creating clients or sending requests is
     * caught here: its message is stored in {@code errMsg} and a warning is logged. A non-OK
     * response is logged as a warning and the remaining BEs are still contacted. The clients are
     * released in all cases.
     *
     * @throws Exception declared by the signature; exceptions raised inside the try block are
     *                   caught and logged rather than propagated
     */
    private void runEventDrivenJob() throws Exception {
        try {
            refreshEventDrivenBeToThriftAddress();
            initClients();
            for (Map.Entry<Long, Client> beAndClient : beToClient.entrySet()) {
                Long beId = beAndClient.getKey();

                // an event without thrift counterpart aborts the whole round
                TWarmUpEventType eventType = getTWarmUpEventType();
                if (eventType == null) {
                    throw new IllegalArgumentException("Unknown SyncEvent " + syncEvent);
                }

                TWarmUpTabletsRequest setJobRequest = new TWarmUpTabletsRequest();
                setJobRequest.setType(TWarmUpTabletsRequestType.SET_JOB);
                setJobRequest.setJobId(jobId);
                setJobRequest.setEvent(eventType);
                // table ids are only attached when the job has a table filter
                if (hasTableFilter()) {
                    setJobRequest.setTableIds(new ArrayList<>(getCurrentTableIds()));
                }

                LOG.debug("send warm up request to BE {} ({}). job_id={}, event={}, "
                                + "request_type=SET_JOB(EVENT), table_ids_count={}",
                        beId, getBackendEndpoint(beId), jobId, syncEvent,
                        hasTableFilter() ? getCurrentTableIdNames().size() : "all");

                TWarmUpTabletsResponse setJobResponse = beAndClient.getValue().warmUpTablets(setJobRequest);
                if (setJobResponse.getStatus().getStatusCode() == TStatusCode.OK) {
                    continue;
                }
                // keep the first error message of the BE, if it sent any
                List<String> errorMsgs = setJobResponse.getStatus().getErrorMsgs();
                if (!errorMsgs.isEmpty()) {
                    errMsg = errorMsgs.get(0);
                }
                LOG.warn("send warm up request failed. job_id={}, event={}, err={}",
                        jobId, syncEvent, errMsg);
            }
        } catch (Exception e) {
            errMsg = e.getMessage();
            LOG.warn("send warm up request job_id={} failed with exception {}",
                    jobId, e);
        } finally {
            releaseClients();
        }
    }

    /**
     * Drives one step of a RUNNING job.
     *
     * <p>In unit-test mode the job is marked FINISHED after a one-second sleep without any RPC.
     * An event-driven job is delegated to {@link #runEventDrivenJob()}. For any other job one of
     * two things happens per invocation:
     * <ul>
     *   <li>first step ({@code lastBatchId == -1} and SET_JOB not sent yet): SET_JOB with batch
     *       {@code lastBatchId + 1} is sent to every BE;</li>
     *   <li>later steps: every BE is polled with GET_CURRENT_JOB_STATE_AND_LEASE. When no BE
     *       reports pending work, the next batch is sent with SET_BATCH; when no BE has a further
     *       batch, the job is cleared on the BEs and becomes PENDING (periodic) or FINISHED.</li>
     * </ul>
     * A non-OK status from any BE leads to {@code cancel("job fail", false)}. An exception
     * increases the retry counter, sets the retry flag and releases the clients; once the counter
     * reaches {@code maxRetryTime} the job is cancelled.
     *
     * @throws Exception declared by the signature; exceptions raised inside the try block are
     *                   handled by the retry logic rather than propagated
     */
    private void runRunningJob() throws Exception {
        Preconditions.checkState(jobState == JobState.RUNNING, jobState);
        // Unit-test shortcut: no BE is contacted, the job simply finishes after the sleep.
        if (FeConstants.runningUnitTest) {
            Thread.sleep(1000);
            this.jobState = JobState.FINISHED;
            this.finishedTimeMs = System.currentTimeMillis();
            ((CloudEnv) Env.getCurrentEnv()).getCacheHotspotMgr().notifyJobStop(this);
            Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(this);
            return;
        }
        if (this.isEventDriven()) {
            runEventDrivenJob();
            return;
        }
        // Set when any BE answers with a non-OK status; triggers cancel() at the end of the try.
        boolean changeToCancelState = false;
        try {
            initClients();
            // If there is first batch, send SET_JOB RPC
            if (lastBatchId == -1 && !setJobDone) {
                // The flag is raised before the RPCs, so this branch is not taken again on a
                // later invocation even if one of the RPCs below throws.
                setJobDone = true;
                for (Map.Entry<Long, Client> entry : beToClient.entrySet()) {
                    TWarmUpTabletsRequest request = new TWarmUpTabletsRequest();
                    request.setType(TWarmUpTabletsRequestType.SET_JOB);
                    request.setJobId(jobId);
                    request.setBatchId(lastBatchId + 1);
                    request.setJobMetas(buildJobMetas(entry.getKey(), request.batch_id));
                    LOG.info("send warm up request to BE {} ({}). job_id={}, batch_id={}"
                            + ", job_size={}, request_type=SET_JOB",
                            entry.getKey(), getBackendEndpoint(entry.getKey()),
                            jobId, request.batch_id, request.job_metas.size());
                    TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request);
                    if (response.getStatus().getStatusCode() != TStatusCode.OK) {
                        if (!response.getStatus().getErrorMsgs().isEmpty()) {
                            errMsg = response.getStatus().getErrorMsgs().get(0);
                        }
                        changeToCancelState = true;
                    }
                }
            } else {
                // Check the batches of all BEs done
                boolean allLastBatchDone = true;
                for (Map.Entry<Long, Client> entry : beToClient.entrySet()) {
                    TWarmUpTabletsRequest request = new TWarmUpTabletsRequest();
                    request.setType(TWarmUpTabletsRequestType.GET_CURRENT_JOB_STATE_AND_LEASE);
                    LOG.info("send warm up request to BE {} ({}). job_id={}"
                            + ", request_type=GET_CURRENT_JOB_STATE_AND_LEASE",
                            entry.getKey(), getBackendEndpoint(entry.getKey()), jobId);
                    TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request);
                    if (response.getStatus().getStatusCode() != TStatusCode.OK) {
                        if (!response.getStatus().getErrorMsgs().isEmpty()) {
                            errMsg = response.getStatus().getErrorMsgs().get(0);
                        }
                        changeToCancelState = true;
                    }
                    // A BE that still reports pending work means the last batch is not finished;
                    // the remaining BEs are not polled in this invocation.
                    if (!changeToCancelState && response.pending_job_size != 0) {
                        allLastBatchDone = false;
                        break;
                    }
                    // /api/debug_point/add/CloudWarmUpJob.FakeLastBatchNotDone
                    if (DebugPointUtil.isEnable("CloudWarmUpJob.FakeLastBatchNotDone")) {
                        allLastBatchDone = false;
                        LOG.info("DebugPoint:CloudWarmUpJob.FakeLastBatchNotDone, jobID={}", jobId);
                        break;
                    }
                }
                if (!changeToCancelState && allLastBatchDone) {
                    if (retry) {
                        // A previous invocation ended in the catch block below (which sets retry):
                        // lastBatchId is not advanced, so the same batch id is sent again.
                        retry = false;
                    } else {
                        // last batch is done, log and do next batch
                        lastBatchId++;
                        Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(this);
                    }
                    boolean allBatchesDone = true;
                    for (Map.Entry<Long, Client> entry : beToClient.entrySet()) {
                        TWarmUpTabletsRequest request = new TWarmUpTabletsRequest();
                        request.setType(TWarmUpTabletsRequestType.SET_BATCH);
                        request.setJobId(jobId);
                        request.setBatchId(lastBatchId + 1);
                        request.setJobMetas(buildJobMetas(entry.getKey(), request.batch_id));
                        // Empty job metas: nothing to send for this batch id, so no RPC is issued
                        // for this BE.
                        if (!request.job_metas.isEmpty()) {
                            // check all batches is done or not
                            allBatchesDone = false;
                            LOG.info("send warm up request to BE {} ({}). job_id={}, batch_id={}"
                                    + ", job_size={}, request_type=SET_BATCH",
                                    entry.getKey(), getBackendEndpoint(entry.getKey()),
                                    jobId, request.batch_id, request.job_metas.size());
                            TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request);
                            if (response.getStatus().getStatusCode() != TStatusCode.OK) {
                                if (!response.getStatus().getErrorMsgs().isEmpty()) {
                                    errMsg = response.getStatus().getErrorMsgs().get(0);
                                }
                                changeToCancelState = true;
                            }
                        }
                    }
                    // No BE had a further batch: the run is complete.
                    if (allBatchesDone) {
                        clearJobOnBEs();
                        this.finishedTimeMs = System.currentTimeMillis();
                        if (this.isPeriodic()) {
                            // wait for next schedule
                            this.jobState = JobState.PENDING;
                        } else {
                            // release job
                            this.jobState = JobState.FINISHED;
                        }
                        ((CloudEnv) Env.getCurrentEnv()).getCacheHotspotMgr().notifyJobStop(this);
                        Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(this);
                    }
                }
            }
            if (changeToCancelState) {
                // at least one BE answered with a non-OK status: cancel without force
                cancel("job fail", false);
            }
        } catch (Exception e) {
            // Count the failure and remember it, so the next successful poll re-sends the current
            // batch instead of advancing.
            retryTime++;
            retry = true;
            if (retryTime < maxRetryTime) {
                LOG.warn("warm up job {} exception: {}", jobId, e.getMessage());
            } else {
                // retry budget (maxRetryTime) is used up: cancel the job
                cancel("retry the warm up job until max retry time " + String.valueOf(maxRetryTime), false);
            }
            // Clients are released only on this failure path.
            releaseClients();
        }
    }

    /**
     * Replay hook of the job; the body is currently empty.
     *
     * @throws Exception declared by the signature; the empty body does not throw
     */
    public void replay() throws Exception {
        // No need to replay anything yet
    }

    /**
     * Serializes this job as a JSON string and writes that string to {@code out}.
     *
     * @param out destination of the serialized job
     * @throws IOException if writing the string fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, GsonUtils.GSON.toJson(this, CloudWarmUpJob.class));
    }

    /**
     * Reads a JSON string from {@code in}, deserializes it into a job and rebuilds the job's
     * table filter before returning it.
     *
     * @param in source holding the JSON form of a job
     * @return the deserialized job
     * @throws IOException if reading the string fails
     */
    public static CloudWarmUpJob read(DataInput in) throws IOException {
        CloudWarmUpJob restored = GsonUtils.GSON.fromJson(Text.readString(in), CloudWarmUpJob.class);
        restored.rebuildOnTablesFilter();
        return restored;
    }
}