// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.cloud.catalog;

import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletSlidingWindowAccessStats;
import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TCheckWarmUpCacheAsyncRequest;
import org.apache.doris.thrift.TCheckWarmUpCacheAsyncResponse;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TWarmUpCacheAsyncRequest;
import org.apache.doris.thrift.TWarmUpCacheAsyncResponse;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class CloudTabletRebalancer extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(CloudTabletRebalancer.class);

    private volatile ConcurrentHashMap<Long, Set<Tablet>> beToTabletsGlobal =
            new ConcurrentHashMap<Long, Set<Tablet>>();

    private volatile ConcurrentHashMap<Long, Set<Tablet>> beToColocateTabletsGlobal =
            new ConcurrentHashMap<Long, Set<Tablet>>();

    // used for cloud tablet report
    private volatile ConcurrentHashMap<Long, Set<Tablet>> beToTabletsGlobalInSecondary =
            new ConcurrentHashMap<Long, Set<Tablet>>();

    private volatile ConcurrentHashMap<Long, Set<Tablet>> futureBeToTabletsGlobal;

    private Map<String, List<Long>> clusterToBes;

    private Set<Long> allBes;

    // partitionId -> indexId -> be -> tablet
    private ConcurrentHashMap<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>> partitionToTablets;

    private ConcurrentHashMap<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
            futurePartitionToTablets;

    // tableId -> be -> tablet
    private ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTable;

    private ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> futureBeToTabletsInTable;

    private Map<Long, Long> beToDecommissionedTime = new HashMap<Long, Long>();

    private Random rand = new Random();

    private boolean indexBalanced = true;

    private boolean tableBalanced = true;

    // Scheduling phase for active-tablet priority scheduling.
    // ACTIVE_ONLY: only schedule objects (partition/table) that have activeCnt > 0 (non-internal).
    // INACTIVE_ONLY: schedule objects that are not in ACTIVE_ONLY set, with internal db objects always last.
    // ALL: schedule all objects (keeps internal db last when priority scheduling enabled).
    private enum ActiveSchedulePhase {
        ACTIVE_ONLY,
        INACTIVE_ONLY,
        ALL
    }

    private volatile boolean inited = false;

    private LinkedBlockingQueue<Pair<Long, Long>> tabletsMigrateTasks = new LinkedBlockingQueue<Pair<Long, Long>>();

    private Map<InfightTablet, InfightTask> tabletToInfightTask = new ConcurrentHashMap<>();

    private final ConcurrentHashMap<WarmupBatchKey, WarmupBatch> warmupBatches = new ConcurrentHashMap<>();

    private volatile ScheduledExecutorService warmupBatchScheduler;

    private volatile ScheduledExecutorService warmupCheckScheduler;

    private volatile ExecutorService warmupRpcExecutor;

    private final ConcurrentLinkedQueue<WarmupTabletTask> failedWarmupTasks = new ConcurrentLinkedQueue<>();

    private CloudSystemInfoService cloudSystemInfoService;

    private final Object warmupExecutorInitLock = new Object();

    private BalanceTypeEnum globalBalanceTypeEnum = BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();

    private Set<Long> activeTabletIds = new HashSet<>();

    // cache for scheduling order in one daemon run (rebuilt in statRouteInfo)
    // table/partition active count is computed from activeTabletIds
    private volatile Map<Long, Long> tableIdToActiveCount = new ConcurrentHashMap<>();
    private volatile Map<Long, Long> partitionIdToActiveCount = new ConcurrentHashMap<>();
    private volatile Map<Long, Long> dbIdToActiveCount = new ConcurrentHashMap<>();
    private volatile Map<Long, Long> tableIdToDbId = new ConcurrentHashMap<>();
    private volatile Map<Long, Long> partitionIdToDbId = new ConcurrentHashMap<>();
    // run-level cache: dbId -> isInternalDb (rebuilt in statRouteInfo)
    private volatile Map<Long, Boolean> dbIdToInternal = new ConcurrentHashMap<>();
    private static final Set<String> INTERNAL_DB_NAMES = Sets.newHashSet("__internal_schema", "information_schema");

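    /**
     * Composite key identifying a tablet location by (dbId, tableId, partitionId, indexId);
     * a plain value object with equals/hashCode so it can serve as a map or set key.
     */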
    private static final class LocationKey {
        private final long dbId;
        private final long tableId;
        private final long partitionId;
        private final long indexId;

        private LocationKey(long dbId, long tableId, long partitionId, long indexId) {
            this.dbId = dbId;
            this.tableId = tableId;
            this.partitionId = partitionId;
            this.indexId = indexId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof LocationKey)) {
                return false;
            }
            LocationKey that = (LocationKey) o;
            return dbId == that.dbId
                    && tableId == that.tableId
                    && partitionId == that.partitionId
                    && indexId == that.indexId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(dbId, tableId, partitionId, indexId);
        }
    }

    /**
     * Get the current balance type for a compute group. The compute-group-level setting only
     * takes effect when the group's balance settings differ from the defaults; otherwise fall
     * back to the global balance type.
     */
    private BalanceTypeEnum getCurrentBalanceType(String clusterId) {
        ComputeGroup cg = cloudSystemInfoService.getComputeGroupById(clusterId);
        if (cg == null) {
            LOG.debug("compute group not found, use global balance type, id {}", clusterId);
            return globalBalanceTypeEnum;
        }

        BalanceTypeEnum computeGroupBalanceType = cg.getBalanceType();
        if (isComputeGroupBalanceChanged(clusterId)) {
            return computeGroupBalanceType;
        }
        return globalBalanceTypeEnum;
    }

    /**
     * Get the current warm-up task timeout for a compute group. The compute-group-level timeout
     * only takes effect when the group's balance settings differ from the defaults; otherwise
     * fall back to the global Config.cloud_pre_heating_time_limit_sec.
     */
    private int getCurrentTaskTimeout(String clusterId) {
        ComputeGroup cg = cloudSystemInfoService.getComputeGroupById(clusterId);
        if (cg == null) {
            return Config.cloud_pre_heating_time_limit_sec;
        }

        int computeGroupTimeout = cg.getBalanceWarmUpTaskTimeout();
        if (isComputeGroupBalanceChanged(clusterId)) {
            return computeGroupTimeout;
        }

        return Config.cloud_pre_heating_time_limit_sec;
    }

    private boolean isComputeGroupBalanceChanged(String clusterId) {
        ComputeGroup cg = cloudSystemInfoService.getComputeGroupById(clusterId);
        if (cg == null) {
            return false;
        }

        BalanceTypeEnum computeGroupBalanceType = cg.getBalanceType();
        int computeGroupTimeout = cg.getBalanceWarmUpTaskTimeout();
        return computeGroupBalanceType != ComputeGroup.DEFAULT_COMPUTE_GROUP_BALANCE_ENUM
               || computeGroupTimeout != ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT;
    }

    public CloudTabletRebalancer(CloudSystemInfoService cloudSystemInfoService) {
        super("cloud tablet rebalancer", Config.cloud_tablet_rebalancer_interval_second * 1000);
        this.cloudSystemInfoService = cloudSystemInfoService;
    }

    private void initializeWarmupExecutorsIfNeeded() {
        if (warmupRpcExecutor != null) {
            return; // Already initialized
        }
        synchronized (warmupExecutorInitLock) {
            if (warmupRpcExecutor != null) {
                return; // Double check
            }
            Env env = Env.getCurrentEnv();
            if (env == null || !env.isMaster()) {
                LOG.info("Env not initialized or not master, skip start warmup batch scheduler");
                return;
            }
            warmupRpcExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
                Math.max(1, Config.cloud_warm_up_rpc_async_pool_size), 1000,
                "cloud-warmup-rpc-dispatch", true);
            warmupBatchScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "cloud-warmup-batch-flusher");
                t.setDaemon(true);
                return t;
            });
            long flushInterval = Math.max(1L, Config.cloud_warm_up_batch_flush_interval_ms);
            warmupBatchScheduler.scheduleAtFixedRate(this::flushExpiredWarmupBatches,
                    flushInterval, flushInterval, TimeUnit.MILLISECONDS);

            warmupCheckScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "cloud-warmup-checker");
                t.setDaemon(true);
                return t;
            });
            long warmupCheckInterval = 10L;
            warmupCheckScheduler.scheduleAtFixedRate(() -> {
                try {
                    // send the check RPC to BEs; runs every 10 seconds
                    checkInflightWarmUpCacheAsync();
                } catch (Throwable t) {
                    LOG.warn("unexpected error when checking inflight warm up cache async", t);
                }
            }, warmupCheckInterval, warmupCheckInterval, TimeUnit.SECONDS);
            LOG.info("Warmup executors initialized successfully");
        }
    }

    private interface Operator {
        void op(Database db, Table table, Partition partition, MaterializedIndex index, String cluster);
    }

    public enum BalanceType {
        GLOBAL,
        TABLE,
        PARTITION
    }

    public enum StatType {
        GLOBAL,
        TABLE,
        PARTITION,
        SMOOTH_UPGRADE,
        WARM_UP_CACHE
    }

    @Getter
    private class InfightTablet {
        private final Long tabletId;
        private final String clusterId;

        public InfightTablet(Long tabletId, String clusterId) {
            this.tabletId = tabletId;
            this.clusterId = clusterId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            InfightTablet that = (InfightTablet) o;
            return tabletId.equals(that.tabletId) && clusterId.equals(that.clusterId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(tabletId, clusterId);
        }
    }

    private class InfightTask {
        public Tablet pickedTablet;
        public long srcBe;
        public long destBe;
        public Map<Long, Set<Tablet>> beToTablets;
        public long startTimestamp;
        BalanceType balanceType;
    }

    @Getter
    private static class WarmupBatchKey {
        private final long srcBe;
        private final long destBe;

        WarmupBatchKey(long srcBe, long destBe) {
            this.srcBe = srcBe;
            this.destBe = destBe;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof WarmupBatchKey)) {
                return false;
            }
            WarmupBatchKey that = (WarmupBatchKey) o;
            return srcBe == that.srcBe && destBe == that.destBe;
        }

        @Override
        public int hashCode() {
            return Objects.hash(srcBe, destBe);
        }
    }

    private static class WarmupTabletTask {
        private final Tablet pickedTablet;
        private final long srcBe;
        private final long destBe;
        private final String clusterId;

        WarmupTabletTask(Tablet pickedTablet, long srcBe, long destBe, String clusterId) {
            this.pickedTablet = pickedTablet;
            this.srcBe = srcBe;
            this.destBe = destBe;
            this.clusterId = clusterId;
        }
    }

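    /**
     * Accumulates warm-up tasks for one (srcBe, destBe) pair. A batch is drained either when it
     * reaches the configured batch size (addTask) or when it has been idle longer than the flush
     * interval (drainIfExpired, driven by warmupBatchScheduler). For example, with batch size 3,
     * addTask(t1) and addTask(t2) return empty lists while addTask(t3) drains [t1, t2, t3].
     */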
    private static class WarmupBatch {
        private final WarmupBatchKey key;
        private final List<WarmupTabletTask> tasks = new ArrayList<>();
        private long lastUpdateMs = System.currentTimeMillis();

        WarmupBatch(WarmupBatchKey key) {
            this.key = key;
        }

        synchronized List<WarmupTabletTask> addTask(WarmupTabletTask task, int batchSize) {
            tasks.add(task);
            lastUpdateMs = System.currentTimeMillis();
            if (tasks.size() >= batchSize) {
                return drain();
            }
            return Collections.emptyList();
        }

        synchronized List<WarmupTabletTask> drainIfExpired(long flushIntervalMs) {
            if (tasks.isEmpty()) {
                return Collections.emptyList();
            }
            if (System.currentTimeMillis() - lastUpdateMs >= flushIntervalMs) {
                return drain();
            }
            return Collections.emptyList();
        }

        synchronized boolean isEmpty() {
            return tasks.isEmpty();
        }

        private List<WarmupTabletTask> drain() {
            List<WarmupTabletTask> copy = new ArrayList<>(tasks);
            tasks.clear();
            lastUpdateMs = System.currentTimeMillis();
            return copy;
        }
    }

    private class TransferPairInfo {
        public long srcBe;
        public long destBe;
        public long minTabletsNum;
        public long maxTabletsNum;
    }

    public Set<Long> getSnapshotTabletsInPrimaryByBeId(Long beId) {
        Set<Long> tabletIds = Sets.newHashSet();
        Set<Tablet> tablets = beToTabletsGlobal.get(beId);
        if (tablets != null) {
            // copy to avoid concurrent modification while iterating
            for (Tablet tablet : new HashSet<>(tablets)) {
                tabletIds.add(tablet.getId());
            }
        }

        Set<Tablet> colocateTablets = beToColocateTabletsGlobal.get(beId);
        if (colocateTablets != null) {
            // copy to avoid concurrent modification while iterating
            for (Tablet tablet : new HashSet<>(colocateTablets)) {
                tabletIds.add(tablet.getId());
            }
        }

        return tabletIds;
    }

    public Set<Long> getSnapshotTabletsInSecondaryByBeId(Long beId) {
        Set<Long> tabletIds = Sets.newHashSet();
        Set<Tablet> tablets = beToTabletsGlobalInSecondary.get(beId);
        if (tablets != null) {
            // copy to avoid concurrent modification while iterating
            for (Tablet tablet : new HashSet<>(tablets)) {
                tabletIds.add(tablet.getId());
            }
        }
        return tabletIds;
    }

    public Set<Long> getSnapshotTabletsInPrimaryAndSecondaryByBeId(Long beId) {
        Set<Long> tabletIds = Sets.newHashSet();
        tabletIds.addAll(getSnapshotTabletsInPrimaryByBeId(beId));
        tabletIds.addAll(getSnapshotTabletsInSecondaryByBeId(beId));
        return tabletIds;
    }

    public int getTabletNumByBackendId(long beId) {
        Map<Long, Set<Tablet>> sourceMap = beToTabletsGlobal;
        ConcurrentHashMap<Long, Set<Tablet>> futureMap = futureBeToTabletsGlobal;
        if (futureMap != null && !futureMap.isEmpty()) {
            sourceMap = futureMap;
        }

        Set<Tablet> tablets = sourceMap.get(beId);
        Set<Tablet> colocateTablets = beToColocateTabletsGlobal.get(beId);

        int tabletsSize = (tablets == null) ? 0 : tablets.size();
        int colocateTabletsSize = (colocateTablets == null) ? 0 : colocateTablets.size();

        return tabletsSize + colocateTabletsSize;
    }

    // 1 build cluster-to-backends info
    // 2 complete route info
    // 3 check whether the inflight preheating tasks have completed
    // 4 migrate tablets for smooth upgrade
    // 5 collect BE-to-tablets mapping statistics
    // 6 partition-level balance
    // 7 if tablets are already balanced at the partition level, perform table-level balance
    // 8 if tablets are already balanced at the partition and table levels, perform global balance
    // 9 check whether all tablets of decommissioned nodes have been migrated
    @Override
    protected void runAfterCatalogReady() {
        // Initialize warmup executors when catalog is ready
        initializeWarmupExecutorsIfNeeded();

        if (Config.enable_cloud_multi_replica) {
            LOG.info("Tablet balance is temporarily not supported when multi replica enabled");
            return;
        }

        LOG.info("cloud tablet rebalance begin");
        long start = System.currentTimeMillis();
        activeTabletIds = getActiveTabletIds();
        globalBalanceTypeEnum = BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();

        buildClusterToBackendMap();
        if (!completeRouteInfo()) {
            return;
        }

        statRouteInfo();
        migrateTabletsForSmoothUpgrade();
        statRouteInfo();

        indexBalanced = true;
        tableBalanced = true;

        performBalancing();

        checkDecommissionState(clusterToBes);
        inited = true;
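        // Adaptive scheduling: if this round took longer than the configured interval,
        // shorten the sleep to 1s so the next round starts almost immediately.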
        long sleepSeconds = Config.cloud_tablet_rebalancer_interval_second;
        if (sleepSeconds < 0L) {
            LOG.warn("cloud tablet rebalance interval second is negative, change it to default 1s");
            sleepSeconds = 1L;
        }
        long balanceEnd = System.currentTimeMillis();
        if (DebugPointUtil.isEnable("CloudTabletRebalancer.balanceEnd.tooLong")) {
            LOG.info("debug pointCloudTabletRebalancer.balanceEnd.tooLong");
            // slower the balance end time to trigger next balance immediately
            balanceEnd += (Config.cloud_tablet_rebalancer_interval_second + 10L) * 1000L;
        }
        if (balanceEnd - start > Config.cloud_tablet_rebalancer_interval_second * 1000L) {
            sleepSeconds = 1L;
        }
        setInterval(sleepSeconds * 1000L);
        LOG.info("finished to rebalancer. cost: {} ms, rebalancer sche interval {} s",
                (System.currentTimeMillis() - start), sleepSeconds);
    }

    private void buildClusterToBackendMap() {
        clusterToBes = new HashMap<>();
        allBes = new HashSet<>();
        for (Long beId : cloudSystemInfoService.getAllBackendIds()) {
            Backend be = cloudSystemInfoService.getBackend(beId);
            if (be == null) {
                LOG.info("backend {} not found", beId);
                continue;
            }
            clusterToBes.putIfAbsent(be.getCloudClusterId(), new ArrayList<>());
            clusterToBes.get(be.getCloudClusterId()).add(beId);
            allBes.add(beId);
        }
        LOG.info("cluster to backends {}", clusterToBes);
    }

    private void migrateTabletsForSmoothUpgrade() {
        Pair<Long, Long> pair;
        while (!tabletsMigrateTasks.isEmpty()) {
            try {
                pair = tabletsMigrateTasks.take();
                LOG.debug("begin tablets migration from be {} to be {}", pair.first, pair.second);
                migrateTablets(pair.first, pair.second);
            } catch (InterruptedException e) {
                LOG.warn("migrate tablets failed", e);
                Thread.currentThread().interrupt(); // restore the interrupt flag before propagating
                throw new RuntimeException(e);
            }
        }
    }

    private void performBalancing() {
        // ATTN: In general, the order of `balance` should follow `partition`, `table`, and `global`.
        // This is because performing `global` scheduling first and then `partition` scheduling may
        // lead to ineffective scheduling. Specifically, `global` scheduling might place multiple tablets belonging
        // to the same table or partition onto the same BE, while `partition` scheduling later requires these tablets
        // to be dispersed across different BEs, resulting in unnecessary scheduling.
        if (!Config.enable_cloud_active_tablet_priority_scheduling) {
            // Legacy scheduling: schedule the full set.
            if (Config.enable_cloud_partition_balance) {
                balanceAllPartitionsByPhase(ActiveSchedulePhase.ALL);
            }
            if (Config.enable_cloud_table_balance && indexBalanced) {
                balanceAllTablesByPhase(ActiveSchedulePhase.ALL);
            }
            if (Config.enable_cloud_global_balance && indexBalanced && tableBalanced) {
                globalBalance();
            }
        } else {
            // When enabled, do a real two-phase scheduling:
            // Phase 1: schedule only active partitions/tables first.
            // If all active objects are balanced in this run, enter Phase 2:
            // schedule remaining (all - active) objects.
            boolean activeBalanced = true;

            // Phase 1: active-only
            boolean activeIndexBalanced = true;
            boolean activeTableBalanced = true;
            if (Config.enable_cloud_partition_balance) {
                activeIndexBalanced = balanceAllPartitionsByPhase(ActiveSchedulePhase.ACTIVE_ONLY);
            }
            if (Config.enable_cloud_table_balance && activeIndexBalanced) {
                activeTableBalanced = balanceAllTablesByPhase(ActiveSchedulePhase.ACTIVE_ONLY);
            }

            activeBalanced = (!Config.enable_cloud_partition_balance || activeIndexBalanced)
                    && (!Config.enable_cloud_table_balance || activeTableBalanced);

            if (LOG.isDebugEnabled()) {
                LOG.debug("active scheduling phase done: activeIndexBalanced={}, activeTableBalanced={}, "
                                + "activeBalanced={}, clusterNum={}",
                        activeIndexBalanced, activeTableBalanced, activeBalanced, clusterToBes.size());
            }

            if (!activeBalanced) {
                // Active objects are not balanced yet; skip phase2 to avoid diluting scheduling budget.
                return;
            }

            // Phase 2: inactive (all - active), then global if enabled and ready.
            boolean phase2IndexBalanced = true;
            boolean phase2TableBalanced = true;
            if (Config.enable_cloud_partition_balance) {
                phase2IndexBalanced = balanceAllPartitionsByPhase(ActiveSchedulePhase.INACTIVE_ONLY);
            }
            if (Config.enable_cloud_table_balance && phase2IndexBalanced) {
                phase2TableBalanced = balanceAllTablesByPhase(ActiveSchedulePhase.INACTIVE_ONLY);
            }
            if (Config.enable_cloud_global_balance && phase2IndexBalanced && phase2TableBalanced) {
                globalBalance();
            }
        }
    }

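    // Runs partition-level balance for every cluster in the given phase, then persists the
    // resulting replica moves in one batched edit log write. Returns whether this level is
    // balanced after the pass (trivially true when no moves were needed); an edit-log failure
    // returns false so the caller retries in the next round.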
    private boolean balanceAllPartitionsByPhase(ActiveSchedulePhase phase) {
        // Reuse existing "balanced" flags as a per-phase signal.
        indexBalanced = true;

        if (LOG.isDebugEnabled()) {
            for (Map.Entry<Long, Set<Tablet>> entry : beToTabletsGlobal.entrySet()) {
                LOG.debug("before partition balance({}) be {} tablet num {}",
                        phase, entry.getKey(), entry.getValue().size());
            }
            for (Map.Entry<Long, Set<Tablet>> entry : futureBeToTabletsGlobal.entrySet()) {
                LOG.debug("before partition balance({}) be {} tablet num(current + pre heating inflight) {}",
                        phase, entry.getKey(), entry.getValue().size());
            }
        }

        List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
        for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
            balanceInPartition(entry.getValue(), entry.getKey(), infos, phase);
        }
        if (infos.isEmpty()) {
            return true;
        }
        long oldSize = infos.size();
        infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.PARTITION);
        LOG.info("collect to editlog partitions({}) before size={} after size={} infos", phase, oldSize, infos.size());
        try {
            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
        } catch (Exception e) {
            LOG.warn("failed to update cloud replicas", e);
            return false;
        }

        if (LOG.isDebugEnabled()) {
            for (Map.Entry<Long, Set<Tablet>> entry : beToTabletsGlobal.entrySet()) {
                LOG.debug("after partition balance({}) be {} tablet num {}",
                        phase, entry.getKey(), entry.getValue().size());
            }
            for (Map.Entry<Long, Set<Tablet>> entry : futureBeToTabletsGlobal.entrySet()) {
                LOG.debug("after partition balance({}) be {} tablet num(current + pre heating inflight) {}",
                        phase, entry.getKey(), entry.getValue().size());
            }
        }
        return indexBalanced;
    }

    private boolean balanceAllTablesByPhase(ActiveSchedulePhase phase) {
        tableBalanced = true;

        if (LOG.isDebugEnabled()) {
            for (Map.Entry<Long, Set<Tablet>> entry : beToTabletsGlobal.entrySet()) {
                LOG.debug("before table balance({}) be {} tablet num {}",
                        phase, entry.getKey(), entry.getValue().size());
            }
            for (Map.Entry<Long, Set<Tablet>> entry : futureBeToTabletsGlobal.entrySet()) {
                LOG.debug("before table balance({}) be {} tablet num(current + pre heating inflight) {}",
                        phase, entry.getKey(), entry.getValue().size());
            }
        }

        List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
        for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
            balanceInTable(entry.getValue(), entry.getKey(), infos, phase);
        }
        if (infos.isEmpty()) {
            return true;
        }
        long oldSize = infos.size();
        infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.TABLE);
        LOG.info("collect to editlog table({}) before size={} after size={} infos", phase, oldSize, infos.size());
        try {
            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
        } catch (Exception e) {
            LOG.warn("failed to update cloud replicas", e);
            return false;
        }

        if (LOG.isDebugEnabled()) {
            for (Map.Entry<Long, Set<Tablet>> entry : beToTabletsGlobal.entrySet()) {
                LOG.debug("after table balance({}) be {} tablet num {}",
                        phase, entry.getKey(), entry.getValue().size());
            }
            for (Map.Entry<Long, Set<Tablet>> entry : futureBeToTabletsGlobal.entrySet()) {
                LOG.debug("after table balance({}) be {} tablet num(current + pre heating inflight) {}",
                        phase, entry.getKey(), entry.getValue().size());
            }
        }
        return tableBalanced;
    }

    public void globalBalance() {
        if (LOG.isDebugEnabled()) {
            for (Map.Entry<Long, Set<Tablet>> entry : beToTabletsGlobal.entrySet()) {
                LOG.debug("before global balance be {} tablet num {}", entry.getKey(), entry.getValue().size());
            }
            for (Map.Entry<Long, Set<Tablet>> entry : futureBeToTabletsGlobal.entrySet()) {
                LOG.debug("before global balance be {} tablet num(current + pre heating inflight) {}",
                        entry.getKey(), entry.getValue().size());
            }
        }

        List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
        for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
            balanceImpl(entry.getValue(), entry.getKey(), futureBeToTabletsGlobal, BalanceType.GLOBAL, infos);
        }
        if (infos.isEmpty()) {
            return;
        }
        long oldSize = infos.size();
        infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.GLOBAL);
        LOG.info("collect to editlog global before size={} after size={} infos", oldSize, infos.size());
        try {
            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
        } catch (Exception e) {
            LOG.warn("failed to update cloud replicas", e);
            // edit log failed, try next time
            return;
        }

        if (LOG.isDebugEnabled()) {
            for (Map.Entry<Long, Set<Tablet>> entry : beToTabletsGlobal.entrySet()) {
                LOG.debug("after global balance be {} tablet num {}", entry.getKey(), entry.getValue().size());
            }
            for (Map.Entry<Long, Set<Tablet>> entry : futureBeToTabletsGlobal.entrySet()) {
                LOG.debug("after global balance be {} tablet num(current + pre heating inflight) {}",
                        entry.getKey(), entry.getValue().size());
            }
        }
    }

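    // Runs every 10s on warmupCheckScheduler: drops inflight tasks for clusters that no longer
    // require warm-up, cleans up tasks whose dest BE is missing or long dead, and asks live
    // dest BEs which tablets have finished warming up, persisting completed moves via edit log.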
    public void checkInflightWarmUpCacheAsync() {
        Map<Long, List<InfightTask>> beToInfightTasks = new HashMap<Long, List<InfightTask>>();

        Set<InfightTablet> invalidTasks = new HashSet<>();
        for (Map.Entry<InfightTablet, InfightTask> entry : tabletToInfightTask.entrySet()) {
            String clusterId = entry.getKey().getClusterId();
            BalanceTypeEnum balanceTypeEnum = getCurrentBalanceType(clusterId);
            if (balanceTypeEnum == BalanceTypeEnum.WITHOUT_WARMUP) {
                // warm-up disabled for this cluster, no async cache check needed
                invalidTasks.add(entry.getKey());
                continue;
            }
            beToInfightTasks.putIfAbsent(entry.getValue().destBe, new ArrayList<>());
            beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue());
        }
        invalidTasks.forEach(key -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("remove inflight warmup task tablet {} cluster {} no need warmup",
                        key.getTabletId(), key.getClusterId());
            }
            tabletToInfightTask.remove(key);
        });

        List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
        long needRehashDeadTime = System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L;
        for (Map.Entry<Long, List<InfightTask>> entry : beToInfightTasks.entrySet()) {
            LOG.info("before pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size());
            Backend destBackend = cloudSystemInfoService.getBackend(entry.getKey());
            if (DebugPointUtil.isEnable("CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull")) {
                LOG.info("debug point CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull, be {}", destBackend);
                destBackend = null;
            }
            if (destBackend == null || (!destBackend.isAlive() && destBackend.getLastUpdateMs() < needRehashDeadTime)) {
                // dest backend does not exist or has been dead too long; remove all inflight
                // tasks targeting it (each tablet is removed under every known cluster id)
                List<InfightTablet> toRemove = new LinkedList<>();
                for (InfightTask task : entry.getValue()) {
                    for (InfightTablet key : tabletToInfightTask.keySet()) {
                        toRemove.add(new InfightTablet(task.pickedTablet.getId(), key.clusterId));
                    }
                }
                for (InfightTablet key : toRemove) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("remove tablet {}-{}", key.getClusterId(), key.getTabletId());
                    }
                    tabletToInfightTask.remove(key);
                }
                continue;
            }
            if (!destBackend.isAlive()) {
                // dest backend is dead, but for less than rehash_tablet_after_be_dead_seconds; retry next round
                continue;
            }
            List<Long> tablets = entry.getValue().stream()
                    .map(task -> task.pickedTablet.getId()).collect(Collectors.toList());
            // ask the dest backend which warm-up cache tasks are done
            Map<Long, Boolean> taskDone = sendCheckWarmUpCacheAsyncRpc(tablets, entry.getKey());
            if (taskDone == null) {
                LOG.warn("sendCheckWarmUpCacheAsyncRpc return null be {}, inFight tasks {}",
                        entry.getKey(), entry.getValue());
                continue;
            }
            // destBackend is non-null here; reuse it instead of re-fetching (avoids a potential NPE)
            String clusterId = destBackend.getCloudClusterId();
            for (Map.Entry<Long, Boolean> result : taskDone.entrySet()) {
                InfightTask task = tabletToInfightTask
                        .getOrDefault(new InfightTablet(result.getKey(), clusterId), null);
                handleWarmupCompletion(task, clusterId, result.getValue(), result.getKey(), infos);
            }
        }
        long oldSize = infos.size();
        infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.WARM_UP_CACHE);
        LOG.info("collect to editlog warmup before size={} after size={} infos", oldSize, infos.size());
        try {
            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
        } catch (Exception e) {
            LOG.warn("failed to update cloud replicas", e);
            // edit log failed, try next time
            return;
        }

        // recalculate inflight beToTablets, just for logging
        beToInfightTasks.clear();
        for (Map.Entry<InfightTablet, InfightTask> entry : tabletToInfightTask.entrySet()) {
            beToInfightTasks.putIfAbsent(entry.getValue().destBe, new ArrayList<>());
            beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue());
        }

        for (Map.Entry<Long, List<InfightTask>> entry : beToInfightTasks.entrySet()) {
            LOG.info("after pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size());
        }
    }

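    // A decommissioning BE is reported to the meta service as decommissioned only once all of
    // its tablets have been migrated away, it is no longer active, and it has no pending
    // group-commit WALs; the last remaining BE of a cluster is reported regardless.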
    public void checkDecommissionState(Map<String, List<Long>> clusterToBes) {
        for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
            List<Long> beList = entry.getValue();
            for (long beId : beList) {
                Set<Tablet> tablets = beToTabletsGlobal.get(beId);
                int tabletNum = tablets == null ? 0 : tablets.size();
                Backend backend = cloudSystemInfoService.getBackend(beId);
                if (backend == null) {
                    LOG.info("backend {} not found", beId);
                    continue;
                }
                if (!backend.isDecommissioning()) {
                    continue;
                }
                // also check pending group-commit WALs on this backend
                long walNum = Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend);
                LOG.info("check decommissioning be {} state {} tabletNum {} isActive {} beList {}, wal num {}",
                        backend.getId(), backend.isDecommissioning(), tabletNum, backend.isActive(), beList, walNum);
                if ((tabletNum != 0 || backend.isActive() || walNum != 0) && beList.size() != 1) {
                    continue;
                }
                if (beToDecommissionedTime.containsKey(beId)) {
                    continue;
                }
                LOG.info("prepare to notify meta service be {} decommissioned", backend.getAddress());
                Cloud.AlterClusterRequest.Builder builder =
                        Cloud.AlterClusterRequest.newBuilder()
                                .setRequestIp(FrontendOptions.getLocalHostAddressCached());
                builder.setCloudUniqueId(Config.cloud_unique_id);
                builder.setOp(Cloud.AlterClusterRequest.Operation.NOTIFY_DECOMMISSIONED);

                Cloud.ClusterPB.Builder clusterBuilder =
                        Cloud.ClusterPB.newBuilder();
                clusterBuilder.setClusterName(backend.getCloudClusterName());
                clusterBuilder.setClusterId(backend.getCloudClusterId());
                clusterBuilder.setType(Cloud.ClusterPB.Type.COMPUTE);

                Cloud.NodeInfoPB.Builder nodeBuilder = Cloud.NodeInfoPB.newBuilder();
                nodeBuilder.setIp(backend.getHost());
                nodeBuilder.setHeartbeatPort(backend.getHeartbeatPort());
                nodeBuilder.setCloudUniqueId(backend.getCloudUniqueId());
                nodeBuilder.setStatus(Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED);

                clusterBuilder.addNodes(nodeBuilder);
                builder.setCluster(clusterBuilder);

                Cloud.AlterClusterResponse response;
                try {
                    response = MetaServiceProxy.getInstance().alterCluster(builder.build());
                    if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
                        LOG.warn("notify decommission response: {}", response);
                        continue;
                    }
                    LOG.info("notify decommission response: {} ", response);
                } catch (RpcException e) {
                    LOG.warn("failed to notify decommission", e);
                    continue;
                }
                beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000);
            }
        }
    }

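    // Ensures every replica has a usable primary BE in each cluster. When the current primary
    // has been unavailable longer than rehash_tablet_after_be_dead_seconds, a replacement is
    // picked in order of preference: the secondary BE, the dest BE of an inflight warm-up task,
    // then hashing the replica to a BE. All reassignments are persisted in one edit log write.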
    private boolean completeRouteInfo() {
        List<UpdateCloudReplicaInfo> updateReplicaInfos = new ArrayList<UpdateCloudReplicaInfo>();
        long[] assignedErrNum = {0L};
        long needRehashDeadTime = System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L;
        loopCloudReplica((Database db, Table table, Partition partition, MaterializedIndex index, String cluster) -> {
            boolean assigned = false;
            List<Long> beIds = new ArrayList<Long>();
            List<Long> tabletIds = new ArrayList<Long>();
            boolean isColocated = Env.getCurrentColocateIndex().isColocateTable(table.getId());
            for (Tablet tablet : index.getTablets()) {
                for (Replica r : tablet.getReplicas()) {
                    CloudReplica replica = (CloudReplica) r;
                    // clean secondary map
                    replica.checkAndClearSecondaryClusterToBe(cluster, needRehashDeadTime);
                    InfightTablet taskKey = new InfightTablet(tablet.getId(), cluster);
                    // colocated tables do not need primary backend updates
                    if (isColocated) {
                        replica.clearClusterToBe(cluster);
                        tabletToInfightTask.remove(taskKey);
                        continue;
                    }

                    // primary backend is alive, or has been dead only briefly
                    Backend be = replica.getPrimaryBackend(cluster, false);
                    if (be != null && (be.isQueryAvailable()
                            || (!be.isQueryDisabled()
                            // Compatible with older version upgrades, see https://github.com/apache/doris/pull/42986
                            && (be.getLastUpdateMs() <= 0 || be.getLastUpdateMs() > needRehashDeadTime)))) {
                        beIds.add(be.getId());
                        tabletIds.add(tablet.getId());
                        continue;
                    }

                    // primary backend has been unavailable too long; pick a replacement
                    long beId = -1L;
                    be = replica.getSecondaryBackend(cluster);
                    if (be != null && be.isQueryAvailable()) {
                        beId = be.getId();
                    } else {
                        InfightTask task = tabletToInfightTask.get(taskKey);
                        be = task == null ? null : Env.getCurrentSystemInfo().getBackend(task.destBe);
                        if (be != null && be.isQueryAvailable()) {
                            beId = be.getId();
                        } else {
                            try {
                                beId = replica.hashReplicaToBe(cluster, true);
                            } catch (ComputeGroupException e) {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("failed to hash replica to be {}", cluster, e);
                                }
                                beId = -1;
                            }
                        }
                    }

                    if (beId <= 0) {
                        assignedErrNum[0]++;
                        continue;
                    }

                    LOG.debug("clusterId {} tablet {} change primary backend from {} to {}",
                            cluster, tablet.getId(), be == null ? -1 : be.getId(), beId);
                    tabletToInfightTask.remove(taskKey);

                    ((CloudReplica) replica).updateClusterToPrimaryBe(cluster, beId);
                    beIds.add(beId);
                    tabletIds.add(tablet.getId());
                    assigned = true;
                }
            }

            if (assigned) {
                UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(db.getId(), table.getId(),
                        partition.getId(), index.getId(), cluster, beIds, tabletIds);
                updateReplicaInfos.add(info);
            }
        });

        LOG.info("collect to editlog route {} infos, error num {}", updateReplicaInfos.size(), assignedErrNum[0]);

        if (updateReplicaInfos.isEmpty()) {
            return true;
        }

        try {
            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(updateReplicaInfos);
        } catch (Exception e) {
            LOG.warn("failed to update cloud replicas", e);
            // edit log failed, try next time
            return false;
        }

        return true;
    }

    public void fillBeToTablets(long be, long tableId, long partId, long indexId, Tablet tablet,
                                ConcurrentHashMap<Long, Set<Tablet>> globalBeToTablets,
                                ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTable,
                                ConcurrentHashMap<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
                                    partToTablets) {
        // global
        globalBeToTablets.putIfAbsent(be, ConcurrentHashMap.newKeySet());
        globalBeToTablets.get(be).add(tablet);

        // table
        beToTabletsInTable.putIfAbsent(tableId, new ConcurrentHashMap<Long, Set<Tablet>>());
        ConcurrentHashMap<Long, Set<Tablet>> beToTabletsOfTable = beToTabletsInTable.get(tableId);
        beToTabletsOfTable.putIfAbsent(be, ConcurrentHashMap.newKeySet());
        beToTabletsOfTable.get(be).add(tablet);

        // partition
        partToTablets.putIfAbsent(partId, new ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>());
        ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> indexToTablets = partToTablets.get(partId);
        indexToTablets.putIfAbsent(indexId, new ConcurrentHashMap<Long, Set<Tablet>>());
        ConcurrentHashMap<Long, Set<Tablet>> beToTabletsOfIndex = indexToTablets.get(indexId);
        beToTabletsOfIndex.putIfAbsent(be, ConcurrentHashMap.newKeySet());
        beToTabletsOfIndex.get(be).add(tablet);
    }

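    // Warm-up dispatch pipeline: enqueueWarmupTask groups tasks into per-(srcBe, destBe)
    // batches; a full (or expired, see flushExpiredWarmupBatches) batch is handed to
    // warmupRpcExecutor, which sends one preheating RPC per batch. Failed batches are parked
    // in failedWarmupTasks and reverted later by processFailedWarmupTasks.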
    private void enqueueWarmupTask(WarmupTabletTask task) {
        WarmupBatchKey key = new WarmupBatchKey(task.srcBe, task.destBe);
        WarmupBatch batch = warmupBatches.computeIfAbsent(key, WarmupBatch::new);
        List<WarmupTabletTask> readyTasks = batch.addTask(task, Math.max(1, Config.cloud_warm_up_batch_size));
        if (!readyTasks.isEmpty()) {
            dispatchWarmupBatch(key, readyTasks);
        }
    }

    private void dispatchWarmupBatch(WarmupBatchKey key, List<WarmupTabletTask> tasks) {
        if (tasks.isEmpty()) {
            return;
        }
        initializeWarmupExecutorsIfNeeded();
        if (warmupRpcExecutor != null) {
            warmupRpcExecutor.submit(() -> sendWarmupBatch(key, tasks));
        } else {
            LOG.warn("warmupRpcExecutor is not initialized, skip dispatching warmup batch");
        }
    }

    private void sendWarmupBatch(WarmupBatchKey key, List<WarmupTabletTask> tasks) {
        Backend srcBackend = cloudSystemInfoService.getBackend(key.getSrcBe());
        Backend destBackend = cloudSystemInfoService.getBackend(key.getDestBe());
        if (srcBackend == null || destBackend == null || !destBackend.isAlive()) {
            handleWarmupBatchFailure(tasks, new IllegalStateException(
                    String.format("backend missing or dead, src %s dest %s", srcBackend, destBackend)));
            return;
        }
        List<Long> tabletIds = tasks.stream().map(task -> task.pickedTablet.getId()).collect(Collectors.toList());
        try {
            sendPreHeatingRpc(tabletIds, key.getSrcBe(), key.getDestBe());
        } catch (Exception e) {
            handleWarmupBatchFailure(tasks, e);
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("dispatch preheat batch {} from {} to {}, tablet num {}",
                    tabletIds, key.getSrcBe(), key.getDestBe(), tabletIds.size());
        }
    }

    private void handleWarmupBatchFailure(List<WarmupTabletTask> tasks, Exception e) {
        if (e != null) {
            LOG.warn("preheat batch failed, size {}", tasks.size(), e);
        }
        for (WarmupTabletTask task : tasks) {
            failedWarmupTasks.offer(task);
        }
    }

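    // Reverts a failed warm-up task: moves the tablet back from destBe to srcBe in the "future"
    // mappings and drops its inflight record, so a later balance round can schedule it again.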
    private void revertWarmupState(WarmupTabletTask task) {
        updateBeToTablets(task.pickedTablet, task.destBe, task.srcBe,
                futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
        tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), task.clusterId));
    }

    private void processFailedWarmupTasks() {
        WarmupTabletTask task;
        while ((task = failedWarmupTasks.poll()) != null) {
            revertWarmupState(task);
        }
    }

    private void flushExpiredWarmupBatches() {
        long flushInterval = Math.max(1L, Config.cloud_warm_up_batch_flush_interval_ms);
        for (Map.Entry<WarmupBatchKey, WarmupBatch> entry : warmupBatches.entrySet()) {
            List<WarmupTabletTask> readyTasks = entry.getValue().drainIfExpired(flushInterval);
            if (!readyTasks.isEmpty()) {
                dispatchWarmupBatch(entry.getKey(), readyTasks);
            }
        }
    }

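    // Rebuilds all BE-to-tablet routing statistics from scratch for this run. The "future" maps
    // additionally account for inflight warm-up tasks by attributing each such tablet to its
    // destination BE. Also refreshes the per-run active-count and internal-db caches used by
    // active-tablet priority scheduling.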
    public void statRouteInfo() {
        ConcurrentHashMap<Long, Set<Tablet>> tmpBeToTabletsGlobal = new ConcurrentHashMap<Long, Set<Tablet>>();
        ConcurrentHashMap<Long, Set<Tablet>> tmpFutureBeToTabletsGlobal = new ConcurrentHashMap<Long, Set<Tablet>>();
        ConcurrentHashMap<Long, Set<Tablet>> tmpBeToTabletsGlobalInSecondary
                = new ConcurrentHashMap<Long, Set<Tablet>>();
        ConcurrentHashMap<Long, Set<Tablet>> tmpBeToColocateTabletsGlobal
                = new ConcurrentHashMap<Long, Set<Tablet>>();

        partitionToTablets = new ConcurrentHashMap<Long,
            ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>();
        futurePartitionToTablets =
                new ConcurrentHashMap<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>();

        beToTabletsInTable = new ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>();
        futureBeToTabletsInTable = new ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>();

        // rebuild scheduling caches for this run
        Map<Long, Long> tmpTableActive = new HashMap<>();
        Map<Long, Long> tmpPartitionActive = new HashMap<>();
        Map<Long, Long> tmpDbActive = new HashMap<>();
        Map<Long, Long> tmpTableToDb = new HashMap<>();
        Map<Long, Long> tmpPartitionToDb = new HashMap<>();
        Map<Long, Boolean> tmpDbInternal = new HashMap<>();

        loopCloudReplica((Database db, Table table, Partition partition, MaterializedIndex index, String cluster) -> {
            boolean isColocated = Env.getCurrentColocateIndex().isColocateTable(table.getId());
            tmpTableToDb.put(table.getId(), db.getId());
            tmpPartitionToDb.put(partition.getId(), db.getId());
            tmpDbInternal.computeIfAbsent(db.getId(), k -> {
                String name = db.getFullName();
                return name != null && INTERNAL_DB_NAMES.contains(name);
            });
            for (Tablet tablet : index.getTablets()) {
                // active tablet scoring (used for scheduling order)
                if (activeTabletIds != null && !activeTabletIds.isEmpty() && activeTabletIds.contains(tablet.getId())) {
                    tmpTableActive.merge(table.getId(), 1L, Long::sum);
                    tmpPartitionActive.merge(partition.getId(), 1L, Long::sum);
                    tmpDbActive.merge(db.getId(), 1L, Long::sum);
                }
                for (Replica r : tablet.getReplicas()) {
                    CloudReplica replica = (CloudReplica) r;
                    if (isColocated) {
                        long beId = -1L;
                        try {
                            beId = replica.getColocatedBeId(cluster);
                        } catch (ComputeGroupException e) {
                            continue;
                        }
                        if (allBes.contains(beId)) {
                            Set<Tablet> colocateTablets =
                                    tmpBeToColocateTabletsGlobal.computeIfAbsent(beId, k -> new HashSet<>());
                            colocateTablets.add(tablet);
                        }
                        continue;
                    }

                    Backend be = replica.getPrimaryBackend(cluster, false);
                    long beId = be == null ? -1L : be.getId();
                    if (!allBes.contains(beId)) {
                        continue;
                    }

                    Backend secondaryBe = replica.getSecondaryBackend(cluster);
                    long secondaryBeId = secondaryBe == null ? -1L : secondaryBe.getId();
                    if (allBes.contains(secondaryBeId)) {
                        Set<Tablet> tablets = tmpBeToTabletsGlobalInSecondary
                                .computeIfAbsent(secondaryBeId, k -> new HashSet<>());
                        tablets.add(tablet);
                    }

                    InfightTablet taskKey = new InfightTablet(tablet.getId(), cluster);
                    InfightTask task = tabletToInfightTask.get(taskKey);
                    long futureBeId = task == null ? beId : task.destBe;
                    fillBeToTablets(beId, table.getId(), partition.getId(), index.getId(), tablet,
                            tmpBeToTabletsGlobal, beToTabletsInTable, this.partitionToTablets);

                    fillBeToTablets(futureBeId, table.getId(), partition.getId(), index.getId(), tablet,
                            tmpFutureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
                }
            }
        });

        beToTabletsGlobal = tmpBeToTabletsGlobal;
        futureBeToTabletsGlobal = tmpFutureBeToTabletsGlobal;
        beToTabletsGlobalInSecondary = tmpBeToTabletsGlobalInSecondary;
        beToColocateTabletsGlobal = tmpBeToColocateTabletsGlobal;

        tableIdToActiveCount = new ConcurrentHashMap<>(tmpTableActive);
        partitionIdToActiveCount = new ConcurrentHashMap<>(tmpPartitionActive);
        dbIdToActiveCount = new ConcurrentHashMap<>(tmpDbActive);
        tableIdToDbId = new ConcurrentHashMap<>(tmpTableToDb);
        partitionIdToDbId = new ConcurrentHashMap<>(tmpPartitionToDb);
        dbIdToInternal = new ConcurrentHashMap<>(tmpDbInternal);
    }

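    // Iterates every VISIBLE materialized index of every managed OLAP table under the table
    // read lock, invoking the operator once per (index, cluster) pair.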
    public void loopCloudReplica(Operator operator) {
        List<Long> dbIds = Env.getCurrentInternalCatalog().getDbIds();
        for (Long dbId : dbIds) {
            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
            if (db == null) {
                continue;
            }
            List<Table> tableList = db.getTables();
            for (Table table : tableList) {
                if (!table.isManagedTable()) {
                    continue;
                }
                OlapTable olapTable = (OlapTable) table;
                table.readLock();
                try {
                    for (Partition partition : olapTable.getAllPartitions()) {
                        for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
                            for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
                                String cluster = entry.getKey();
                                operator.op(db, table, partition, index, cluster);
                            }
                        } // end for indices
                    } // end for partitions
                } finally {
                    table.readUnlock();
                }
            }
        }
    }


    private void balanceInPartition(List<Long> bes, String clusterId, List<UpdateCloudReplicaInfo> infos,
                                    ActiveSchedulePhase phase) {
        // balance all partitions (prefer active partitions/tables, put internal db at the tail)
        Iterable<Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>> partitions;
        if (Config.enable_cloud_active_tablet_priority_scheduling) {
            final Comparator<Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>> cmp =
                    partitionEntryComparator();
            // Phase-aware filtering and ordering.
            // - ACTIVE_ONLY: only non-internal partitions with activeCnt > 0
            // - INACTIVE_ONLY: all remaining partitions (non-internal inactive first, internal last)
            // - ALL: active (TopN first if configured) -> inactive -> internal
            List<Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>> nonInternalActive =
                    new ArrayList<>();
            List<Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>> nonInternalInactive =
                    new ArrayList<>();
            List<Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>> internalPartitions =
                    new ArrayList<>();

            for (Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>> e
                    : futurePartitionToTablets.entrySet()) {
                long partId = e.getKey();
                boolean internal = isInternalDbId(partitionIdToDbId.get(partId));
                long activeCnt = partitionIdToActiveCount.getOrDefault(partId, 0L);

                if (internal) {
                    // internal partitions are always handled at the end (not in ACTIVE_ONLY).
                    internalPartitions.add(e);
                    continue;
                }

                if (activeCnt > 0) {
                    nonInternalActive.add(e);
                } else {
                    nonInternalInactive.add(e);
                }
            }

            nonInternalActive.sort(cmp);
            nonInternalInactive.sort(cmp);
            internalPartitions.sort(cmp);

            List<Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>> ordered =
                    new ArrayList<>(futurePartitionToTablets.size());
            if (phase == ActiveSchedulePhase.ACTIVE_ONLY) {
                // In ACTIVE_ONLY phase, schedule all active partitions (already sorted by cmp, most active first)
                ordered.addAll(nonInternalActive);
            } else if (phase == ActiveSchedulePhase.INACTIVE_ONLY) {
                ordered.addAll(nonInternalInactive);
                ordered.addAll(internalPartitions);
            } else { // ALL
                // All active (already sorted by cmp, most active first), then inactive, then internal
                ordered.addAll(nonInternalActive);
                ordered.addAll(nonInternalInactive);
                ordered.addAll(internalPartitions);
            }

            partitions = ordered;
        } else {
            partitions = futurePartitionToTablets.entrySet();
        }

        for (Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>> partitionEntry
                : partitions) {
            Map<Long, ConcurrentHashMap<Long, Set<Tablet>>> indexToTablets = partitionEntry.getValue();
            // balance all indexes of a partition
            List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> indexes =
                    new ArrayList<>(indexToTablets.entrySet());
            // index-level ordering is not critical; keep stable by id
            indexes.sort(Comparator.comparingLong(Map.Entry::getKey));
            for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> entry : indexes) {
                // balance an index
                // Fast path: this index has no tablets in this cluster, skip to avoid useless balanceImpl work.
                if (calculateTotalTablets(bes, entry.getValue()) == 0) {
                    continue;
                }
                balanceImpl(bes, clusterId, entry.getValue(), BalanceType.PARTITION, infos);
            }
        }
    }

    private void balanceInTable(List<Long> bes, String clusterId, List<UpdateCloudReplicaInfo> infos,
                                ActiveSchedulePhase phase) {
        // balance all tables (prefer active tables/DBs, put internal DBs at the tail)
        Iterable<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> tables;
        if (Config.enable_cloud_active_tablet_priority_scheduling) {
            final Comparator<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> cmp = tableEntryComparator();
            List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> nonInternalActive = new ArrayList<>();
            List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> nonInternalInactive = new ArrayList<>();
            List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> internalTables = new ArrayList<>();

            for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> e : futureBeToTabletsInTable.entrySet()) {
                long tableId = e.getKey();
                boolean internal = isInternalDbId(tableIdToDbId.get(tableId));
                long activeCnt = tableIdToActiveCount.getOrDefault(tableId, 0L);
                if (internal) {
                    internalTables.add(e);
                    continue;
                }
                if (activeCnt > 0) {
                    nonInternalActive.add(e);
                } else {
                    nonInternalInactive.add(e);
                }
            }

            nonInternalActive.sort(cmp);
            nonInternalInactive.sort(cmp);
            internalTables.sort(cmp);

            List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> ordered =
                    new ArrayList<>(futureBeToTabletsInTable.size());
            if (phase == ActiveSchedulePhase.ACTIVE_ONLY) {
                ordered.addAll(nonInternalActive);
            } else if (phase == ActiveSchedulePhase.INACTIVE_ONLY) {
                ordered.addAll(nonInternalInactive);
                ordered.addAll(internalTables);
            } else { // ALL
                ordered.addAll(nonInternalActive);
                ordered.addAll(nonInternalInactive);
                ordered.addAll(internalTables);
            }

            tables = ordered;
        } else {
            tables = futureBeToTabletsInTable.entrySet();
        }

        for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> entry : tables) {
            // Fast path: this table has no tablets in this cluster, skip.
            if (calculateTotalTablets(bes, entry.getValue()) == 0) {
                continue;
            }
            balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE, infos);
        }
    }

    // For unit test: override this method to avoid dependency on Env/internal catalog.
    protected boolean isInternalDbId(Long dbId) {
        if (dbId == null || dbId <= 0) {
            return false;
        }
        Boolean cached = dbIdToInternal.get(dbId);
        if (cached != null) {
            return cached;
        }
        // Fallback (should be rare): consult catalog and populate cache.
        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
        boolean internal = false;
        if (db != null) {
            String name = db.getFullName();
            internal = name != null && INTERNAL_DB_NAMES.contains(name);
        }
        dbIdToInternal.put(dbId, internal);
        return internal;
    }

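    // Both comparators below order entries by: internal DB last, then higher DB-level
    // active count first, then higher table/partition-level active count first, and
    // finally larger (newer) id first as a tie-breaker.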
    private Comparator<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> tableEntryComparator() {
        return (a, b) -> {
            Long tableIdA = a.getKey();
            Long tableIdB = b.getKey();
            boolean internalA = isInternalDbId(tableIdToDbId.get(tableIdA));
            boolean internalB = isInternalDbId(tableIdToDbId.get(tableIdB));
            if (internalA != internalB) {
                return internalA ? 1 : -1; // internal goes last
            }
            long dbActiveA = dbIdToActiveCount.getOrDefault(tableIdToDbId.get(tableIdA), 0L);
            long dbActiveB = dbIdToActiveCount.getOrDefault(tableIdToDbId.get(tableIdB), 0L);
            int cmpDb = Long.compare(dbActiveB, dbActiveA);
            if (cmpDb != 0) {
                return cmpDb;
            }
            long activeA = tableIdToActiveCount.getOrDefault(tableIdA, 0L);
            long activeB = tableIdToActiveCount.getOrDefault(tableIdB, 0L);
            int cmp = Long.compare(activeB, activeA); // more active first
            if (cmp != 0) {
                return cmp;
            }
            return Long.compare(tableIdB, tableIdA); // tableId bigger, newer first
        };
    }

    private Comparator<Map.Entry<Long,
            ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>> partitionEntryComparator() {
        return (a, b) -> {
            Long partIdA = a.getKey();
            Long partIdB = b.getKey();
            boolean internalA = isInternalDbId(partitionIdToDbId.get(partIdA));
            boolean internalB = isInternalDbId(partitionIdToDbId.get(partIdB));
            if (internalA != internalB) {
                return internalA ? 1 : -1; // internal goes last
            }
            long dbActiveA = dbIdToActiveCount.getOrDefault(partitionIdToDbId.get(partIdA), 0L);
            long dbActiveB = dbIdToActiveCount.getOrDefault(partitionIdToDbId.get(partIdB), 0L);
            int cmpDb = Long.compare(dbActiveB, dbActiveA);
            if (cmpDb != 0) {
                return cmpDb;
            }
            long activeA = partitionIdToActiveCount.getOrDefault(partIdA, 0L);
            long activeB = partitionIdToActiveCount.getOrDefault(partIdB, 0L);
            int cmp = Long.compare(activeB, activeA); // more active first
            if (cmp != 0) {
                return cmp;
            }
            return Long.compare(partIdB, partIdA); // partId bigger, newer first
        };
    }

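    // Ask destBe to warm up its file cache for the given tablets by pulling from srcBe
    // (via the BE's warmUpCacheAsync RPC). A non-OK status is only logged; RPC transport
    // failures are rethrown after invalidating the pooled client.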
    private void sendPreHeatingRpc(Tablet pickedTablet, long srcBe, long destBe) throws Exception {
        sendPreHeatingRpc(Collections.singletonList(pickedTablet.getId()), srcBe, destBe);
    }

    private void sendPreHeatingRpc(List<Long> tabletIds, long srcBe, long destBe) throws Exception {
        BackendService.Client client = null;
        TNetworkAddress address = null;
        Backend srcBackend = cloudSystemInfoService.getBackend(srcBe);
        Backend destBackend = cloudSystemInfoService.getBackend(destBe);
        boolean ok = true;
        try {
            address = new TNetworkAddress(destBackend.getHost(), destBackend.getBePort());
            client = ClientPool.backendPool.borrowObject(address);
            TWarmUpCacheAsyncRequest req = new TWarmUpCacheAsyncRequest();
            req.setHost(srcBackend.getHost());
            req.setBrpcPort(srcBackend.getBrpcPort());
            req.setTabletIds(new ArrayList<>(tabletIds));
            TWarmUpCacheAsyncResponse result = client.warmUpCacheAsync(req);
            if (result.getStatus().getStatusCode() != TStatusCode.OK) {
                LOG.warn("pre cache failed status {} {}", result.getStatus().getStatusCode(),
                        result.getStatus().getErrorMsgs());
            }
        } catch (Exception e) {
            LOG.warn("send pre heating rpc error. backend[{}]", destBackend.getId(), e);
            ok = false;
            throw e;
        } finally {
            if (ok) {
                ClientPool.backendPool.returnObject(address, client);
            } else {
                ClientPool.backendPool.invalidateObject(address, client);
            }
        }
    }

    private Map<Long, Boolean> sendCheckWarmUpCacheAsyncRpc(List<Long> tabletIds, long be) {
        BackendService.Client client = null;
        TNetworkAddress address = null;
        Backend destBackend = cloudSystemInfoService.getBackend(be);
        boolean ok = true;
        try {
            address = new TNetworkAddress(destBackend.getHost(), destBackend.getBePort());
            client = ClientPool.backendPool.borrowObject(address);
            TCheckWarmUpCacheAsyncRequest req = new TCheckWarmUpCacheAsyncRequest();
            req.setTablets(tabletIds);
            TCheckWarmUpCacheAsyncResponse result = client.checkWarmUpCacheAsync(req);
            if (result.getStatus().getStatusCode() != TStatusCode.OK) {
                LOG.warn("check pre tablets {} cache failed, status {} {}", tabletIds,
                        result.getStatus().getStatusCode(), result.getStatus().getErrorMsgs());
            } else {
                LOG.debug("check pre tablets {} cache succeeded, status {} {}", tabletIds,
                        result.getStatus().getStatusCode(), result.getStatus().getErrorMsgs());
            }
            return result.getTaskDone();
        } catch (Exception e) {
            LOG.warn("send check pre cache rpc error. tablets{} backend[{}]", tabletIds, destBackend.getId(), e);
            ok = false;
        } finally {
            if (ok) {
                ClientPool.backendPool.returnObject(address, client);
            } else {
                ClientPool.backendPool.invalidateObject(address, client);
            }
        }
        return null;
    }

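    // Decide whether a finished (or timed-out) warmup task should flip the tablet's
    // cluster-to-BE mapping. ASYNC_WARMUP flips on completion or timeout; SYNC_WARMUP
    // flips only on completion and then re-sends a best-effort preheating RPC.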
    private void handleWarmupCompletion(InfightTask task, String clusterId, boolean isDone, long tabletId,
                                           List<UpdateCloudReplicaInfo> infos) {
        if (task == null) {
            LOG.warn("cannot find inflight task for tablet {}-{}", clusterId, tabletId);
            return;
        }
        boolean shouldUpdateMapping = false;
        BalanceTypeEnum currentBalanceType = getCurrentBalanceType(clusterId);
        LOG.debug("cluster id {}, balance type {}, tabletId {}, ", clusterId, currentBalanceType, tabletId);

        switch (currentBalanceType) {
            case ASYNC_WARMUP: {
                int currentTaskTimeout = getCurrentTaskTimeout(clusterId);
                boolean timeExceeded = System.currentTimeMillis() / 1000 - task.startTimestamp > currentTaskTimeout;
                LOG.debug("tablet {}-{} warmup cache isDone {} timeExceeded {}",
                        clusterId, tabletId, isDone, timeExceeded);
                if (isDone || timeExceeded) {
                    if (!isDone) {
                        // timeout but not done, not normal, info log
                        LOG.info("{}-{} warmup cache timeout {}, forced to change the mapping",
                                clusterId, tabletId, currentTaskTimeout);
                    } else {
                        // done, normal
                        LOG.debug("{}-{} warmup cache done, change the mapping", clusterId, tabletId);
                    }
                    shouldUpdateMapping = true;
                }
                break;
            }
            case SYNC_WARMUP: {
                if (isDone) {
                    // done, normal
                    LOG.debug("{} sync cache done, change the mapping", tabletId);
                    shouldUpdateMapping = true;
                }
                break;
            }
            default:
                break;
        }

        if (!shouldUpdateMapping) {
            return;
        }

        updateClusterToBeMap(task.pickedTablet, task.destBe, clusterId, infos);

        if (LOG.isDebugEnabled()) {
            LOG.debug("remove tablet {}-{}", clusterId, task.pickedTablet.getId());
        }
        tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), clusterId));

        if (BalanceTypeEnum.SYNC_WARMUP.equals(currentBalanceType)) {
            try {
                // send sync cache rpc again, ignore the result, the best effort to sync some new data
                sendPreHeatingRpc(task.pickedTablet, task.srcBe, task.destBe);
            } catch (Exception e) {
                LOG.warn("Failed to preheat tablet {} from {} to {}, "
                                + "help msg change fe config cloud_warm_up_for_rebalance_type to without_warmup, ",
                        task.pickedTablet.getId(), task.srcBe, task.destBe, e);
            }
        }
    }

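    // Move the picked tablet from srcBe to destBe in all three bookkeeping maps
    // (global, per-table, per-partition/index) so later balancing decisions in the
    // same round see the pending transfer.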
    private void updateBeToTablets(Tablet pickedTablet, long srcBe, long destBe,
                                   ConcurrentHashMap<Long, Set<Tablet>> globalBeToTablets,
                                   ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTable,
                                   ConcurrentHashMap<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long,
                                       Set<Tablet>>>> partToTablets) {
        CloudReplica replica = (CloudReplica) pickedTablet.getReplicas().get(0);
        long tableId = replica.getTableId();
        long partId = replica.getPartitionId();
        long indexId = replica.getIndexId();

        globalBeToTablets.get(srcBe).remove(pickedTablet);
        beToTabletsInTable.get(tableId).get(srcBe).remove(pickedTablet);
        partToTablets.get(partId).get(indexId).get(srcBe).remove(pickedTablet);

        fillBeToTablets(destBe, tableId, partId, indexId, pickedTablet, globalBeToTablets, beToTabletsInTable,
                        partToTablets);
    }

    private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clusterId,
                                      List<UpdateCloudReplicaInfo> infos) {
        CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0);
        Database db = Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId());
        if (db == null) {
            return;
        }
        OlapTable table = (OlapTable) db.getTableNullable(cloudReplica.getTableId());
        if (table == null) {
            return;
        }

        table.readLock();

        try {
            if (db.getTableNullable(cloudReplica.getTableId()) == null) {
                return;
            }

            cloudReplica.updateClusterToPrimaryBe(clusterId, destBe);
            UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(cloudReplica.getDbId(),
                    cloudReplica.getTableId(), cloudReplica.getPartitionId(), cloudReplica.getIndexId(),
                    pickedTablet.getId(), cloudReplica.getId(), clusterId, destBe);
            infos.add(info);
        } finally {
            table.readUnlock();
        }
    }

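    // Pick a (srcBe, destBe) pair: the source is a decommissioning BE that still holds
    // tablets, or otherwise the non-decommissioned BE with the most tablets; the
    // destination is an alive BE with the fewest tablets. Returns false if the move
    // would not improve balance (see isTransferValid).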
    private boolean getTransferPair(List<Long> bes, Map<Long, Set<Tablet>> beToTablets, long avgNum,
                                    TransferPairInfo pairInfo) {
        long srcBe = findSourceBackend(bes, beToTablets);
        long destBe = findDestinationBackend(bes, beToTablets, srcBe);

        if (srcBe == -1 || destBe == -1) {
            return false; // No valid backend found
        }

        Set<Tablet> destTablets = beToTablets.get(destBe);
        Set<Tablet> srcTablets = beToTablets.get(srcBe);
        long minTabletsNum = destTablets == null ? 0 : destTablets.size();
        long maxTabletsNum = srcTablets == null ? 0 : srcTablets.size();

        if (!isTransferValid(srcBe, minTabletsNum, maxTabletsNum, avgNum)) {
            return false; // Transfer conditions not met
        }

        pairInfo.srcBe = srcBe;
        pairInfo.destBe = destBe;
        pairInfo.minTabletsNum = minTabletsNum;
        pairInfo.maxTabletsNum = maxTabletsNum;
        return true;
    }

    private long findSourceBackend(List<Long> bes, Map<Long, Set<Tablet>> beToTablets) {
        long srcBe = -1;
        long maxTabletsNum = 0;

        for (Long be : bes) {
            long tabletNum = beToTablets.getOrDefault(be, Collections.emptySet()).size();
            Backend backend = cloudSystemInfoService.getBackend(be);

            // Check if the backend is decommissioned
            if (backend != null) {
                if ((backend.isDecommissioning() || backend.isDecommissioned()) && tabletNum > 0) {
                    srcBe = be; // Mark as source if decommissioned and has tablets
                    break; // Exit early if we found a decommissioned backend
                }
                if (!backend.isDecommissioning() && !backend.isDecommissioned() && tabletNum > maxTabletsNum) {
                    srcBe = be;
                    maxTabletsNum = tabletNum;
                }
            } else {
                LOG.info("backend {} not found", be);
            }
        }
        return srcBe;
    }

    private long findDestinationBackend(List<Long> bes, Map<Long, Set<Tablet>> beToTablets, long srcBe) {
        long minTabletsNum = Long.MAX_VALUE;
        List<Long> candidateBes = new ArrayList<>();

        for (Long be : bes) {
            long tabletNum = beToTablets.getOrDefault(be, Collections.emptySet()).size();
            Backend backend = cloudSystemInfoService.getBackend(be);
            if (backend != null && backend.isAlive() && !backend.isDecommissioning()
                    && !backend.isDecommissioned() && !backend.isSmoothUpgradeSrc()) {
                if (tabletNum < minTabletsNum) {
                    // Found a BE with fewer tablets, reset candidates
                    minTabletsNum = tabletNum;
                    candidateBes.clear();
                    candidateBes.add(be);
                } else if (tabletNum == minTabletsNum) {
                    // Found a BE with the same minimum tablet count, add to candidates
                    candidateBes.add(be);
                }
            }
        }

        if (candidateBes.isEmpty()) {
            return -1;
        }

        // Shuffle candidates with the same tablet count for better load balancing
        Collections.shuffle(candidateBes, rand);
        return candidateBes.get(0);
    }

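    // A transfer from a healthy source is performed only when the imbalance exceeds both
    // the relative and the absolute threshold; a decommissioning source is always drained.
    // For illustration (assumed values, not defaults): with avgNum = 100,
    // cloud_rebalance_percent_threshold = 0.05 and cloud_rebalance_number_threshold = 10,
    // srcBe at 120 / destBe at 95 is rebalanced (120 >= 105 and 95 <= 120 - 10), while
    // srcBe at 104 / destBe at 97 is left alone.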
    private boolean isTransferValid(long srcBe, long minTabletsNum, long maxTabletsNum, long avgNum) {
        Backend srcBackend = cloudSystemInfoService.getBackend(srcBe);
        boolean srcDecommissioned = srcBackend != null
                && (srcBackend.isDecommissioning() || srcBackend.isDecommissioned());

        if (!srcDecommissioned) {
            if ((maxTabletsNum < avgNum * (1 + Config.cloud_rebalance_percent_threshold)
                    && minTabletsNum > avgNum * (1 - Config.cloud_rebalance_percent_threshold))
                    || minTabletsNum > maxTabletsNum - Config.cloud_rebalance_number_threshold) {
                return false;
            }
        }
        return true;
    }

    private boolean isConflict(long srcBe, long destBe, CloudReplica cloudReplica, BalanceType balanceType,
                               ConcurrentHashMap<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
                                   beToTabletsInParts,
                               ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTables) {
        if (cloudSystemInfoService.getBackend(srcBe).isDecommissioning()
                || cloudSystemInfoService.getBackend(srcBe).isDecommissioned()) {
            return false; // If source BE is decommissioned, no conflict
        }

        if (balanceType == BalanceType.GLOBAL) {
            return checkGlobalBalanceConflict(srcBe, destBe, cloudReplica, beToTabletsInParts, beToTabletsInTables);
        } else if (balanceType == BalanceType.TABLE) {
            return checkTableBalanceConflict(srcBe, destBe, cloudReplica, beToTabletsInParts);
        }

        return false;
    }

    private boolean checkGlobalBalanceConflict(long srcBe, long destBe, CloudReplica cloudReplica,
                                               ConcurrentHashMap<Long,
                                                   ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
                                                   beToTabletsInParts,
                                               ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>
                                                   beToTabletsInTables) {
        long maxBeSize = getTabletSizeInParts(srcBe, cloudReplica, beToTabletsInParts);
        long minBeSize = getTabletSizeInParts(destBe, cloudReplica, beToTabletsInParts);

        if (minBeSize >= maxBeSize) {
            return true; // Conflict detected
        }

        maxBeSize = getTabletSizeInBes(srcBe, cloudReplica, beToTabletsInTables);
        minBeSize = getTabletSizeInBes(destBe, cloudReplica, beToTabletsInTables);

        return minBeSize >= maxBeSize; // Conflict detected
    }

    private boolean checkTableBalanceConflict(long srcBe, long destBe, CloudReplica cloudReplica,
                                              ConcurrentHashMap<Long,
                                                  ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
                                                  beToTabletsInParts) {
        long maxBeSize = getTabletSizeInParts(srcBe, cloudReplica, beToTabletsInParts);
        long minBeSize = getTabletSizeInParts(destBe, cloudReplica, beToTabletsInParts);

        return minBeSize >= maxBeSize; // Conflict detected
    }

    private long getTabletSizeInParts(long beId, CloudReplica cloudReplica,
                                      ConcurrentHashMap<Long,
                                          ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
                                          beToTabletsInParts) {
        ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> indexToTablets
                = beToTabletsInParts.get(cloudReplica.getPartitionId());
        if (indexToTablets == null) {
            return 0;
        }
        ConcurrentHashMap<Long, Set<Tablet>> beToTablets = indexToTablets.get(cloudReplica.getIndexId());
        if (beToTablets == null) {
            return 0;
        }
        Set<Tablet> tablets = beToTablets.get(beId);
        return tablets == null ? 0 : tablets.size();
    }

    private long getTabletSizeInBes(long beId, CloudReplica cloudReplica,
                                    ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTables) {
        ConcurrentHashMap<Long, Set<Tablet>> beToTablets = beToTabletsInTables.get(cloudReplica.getTableId());
        if (beToTablets == null) {
            return 0;
        }
        Set<Tablet> tablets = beToTablets.get(beId);
        return tablets == null ? 0 : tablets.size();
    }


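    // Core balancing loop for one map level (global / table / partition): compute the
    // average tablet count per active BE (integer division), then attempt up to
    // calculateTransferNum moves. Each move picks a cold tablet from the most loaded
    // (or decommissioning) BE and, after a conflict check, either switches the mapping
    // directly (optionally with a best-effort peer-read warmup) or goes through the
    // async warmup queue.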
    private void balanceImpl(List<Long> bes, String clusterId, Map<Long, Set<Tablet>> beToTablets,
            BalanceType balanceType, List<UpdateCloudReplicaInfo> infos) {
        if (bes == null || bes.isEmpty() || beToTablets == null || beToTablets.isEmpty()) {
            return;
        }

        processFailedWarmupTasks();

        long totalTabletsNum = calculateTotalTablets(bes, beToTablets);
        long beNum = countActiveBackends(bes);

        if (beNum == 0) {
            LOG.warn("zero be, but want balance, skip");
            return;
        }

        long avgNum = totalTabletsNum / beNum;
        long transferNum = calculateTransferNum(avgNum, beNum);

        BalanceTypeEnum currentBalanceType = getCurrentBalanceType(clusterId);
        LOG.debug("balance type {}, be num {}, total tablets num {}, avg num {}, transfer num {}",
                currentBalanceType, beNum, totalTabletsNum, avgNum, transferNum);

        final Set<Long> pickedTabletIds = new HashSet<>();

        for (int i = 0; i < transferNum; i++) {
            TransferPairInfo pairInfo = new TransferPairInfo();
            if (!getTransferPair(bes, beToTablets, avgNum, pairInfo)) {
                break; // no beneficial transfer pair left, stop this round
            }

            long srcBe = pairInfo.srcBe;
            long destBe = pairInfo.destBe;

            Tablet pickedTablet = pickTabletPreferCold(srcBe, beToTablets.get(srcBe),
                    this.activeTabletIds, pickedTabletIds);
            if (pickedTablet == null) {
                continue; // No tablet to pick
            }

            pickedTabletIds.add(pickedTablet.getId());

            CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0);
            Backend srcBackend = Env.getCurrentSystemInfo().getBackend(srcBe);

            if ((BalanceTypeEnum.WITHOUT_WARMUP.equals(currentBalanceType)
                    || BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.equals(currentBalanceType))
                    && srcBackend != null && srcBackend.isAlive()) {
                // direct switch, update fe meta directly, not send preheating task
                if (isConflict(srcBe, destBe, cloudReplica, balanceType, partitionToTablets, beToTabletsInTable)) {
                    continue;
                }
                boolean moved = transferTablet(pickedTablet, srcBe, destBe, clusterId, balanceType, infos);
                if (moved) {
                    updateBalanceStatus(balanceType);
                }
                if (BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.equals(currentBalanceType)) {
                    LOG.debug("directly switch {} from {} to {}, cluster {}", pickedTablet.getId(), srcBe, destBe,
                            clusterId);
                    // send sync cache rpc, best effort
                    try {
                        sendPreHeatingRpc(pickedTablet, srcBe, destBe);
                    } catch (Exception e) {
                        // best effort: the mapping is already updated, so ignore the error
                        // instead of aborting the remaining transfers in this round
                        LOG.debug("Failed to preheat tablet {} from {} to {}, "
                                + "direct switch policy, just ignore the error",
                                pickedTablet.getId(), srcBe, destBe, e);
                    }
                }
            } else {
                // cache warm up
                if (isConflict(srcBe, destBe, cloudReplica, balanceType,
                        futurePartitionToTablets, futureBeToTabletsInTable)) {
                    continue;
                }
                boolean moved = preheatAndUpdateTablet(pickedTablet, srcBe, destBe,
                        clusterId, balanceType, beToTablets);
                if (moved) {
                    updateBalanceStatus(balanceType);
                }
            }
        }
    }

    private long calculateTotalTablets(List<Long> bes, Map<Long, Set<Tablet>> beToTablets) {
        return bes.stream()
                .mapToLong(be -> beToTablets.getOrDefault(be, Collections.emptySet()).size())
                .sum();
    }

    private long countActiveBackends(List<Long> bes) {
        return bes.stream()
                .filter(be -> {
                    Backend backend = cloudSystemInfoService.getBackend(be);
                    return backend != null && !backend.isDecommissioning() && !backend.isDecommissioned();
                })
                .count();
    }

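    // Per-round transfer budget: a configurable fraction of the average load, but at
    // least one move per backend. E.g., assuming avgNum = 200 and
    // cloud_balance_tablet_percent_per_run = 0.05 (illustrative values), 4 BEs give
    // max(round(200 * 0.05), 4) = 10 moves.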
    private long calculateTransferNum(long avgNum, long beNum) {
        return Math.max(Math.round(avgNum * Config.cloud_balance_tablet_percent_per_run), beNum);
    }

    private void updateBalanceStatus(BalanceType balanceType) {
        if (balanceType == BalanceType.PARTITION) {
            indexBalanced = false;
        } else if (balanceType == BalanceType.TABLE) {
            tableBalanced = false;
        }
    }

    private Set<Long> getActiveTabletIds() {
        try {
            // get topN active tablets
            List<TabletSlidingWindowAccessStats.AccessStatsResult> active =
                    TabletSlidingWindowAccessStats.getInstance()
                        .getTopNActive(Config.cloud_active_partition_scheduling_topn);
            if (active == null || active.isEmpty()) {
                return Collections.emptySet();
            }
            Set<Long> ids = new HashSet<>(active.size() * 2);
            for (TabletSlidingWindowAccessStats.AccessStatsResult r : active) {
                ids.add(r.id);
            }
            return ids;
        } catch (Throwable t) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Failed to get active tablets from CloudTabletAccessStats, fallback to random pick", t);
            }
            return Collections.emptySet();
        }
    }

    // Prefer a non-active (cold) tablet when rebalancing, to reduce the impact on hot tablets.
    // Fall back to a uniformly random pick over all remaining tablets if no cold tablet is available.
    private Tablet pickTabletPreferCold(long srcBe, Set<Tablet> tablets, Set<Long> activeTabletIds,
                                        Set<Long> pickedTabletIds) {
        if (tablets == null || tablets.isEmpty()) {
            return null;
        }
        // Prefer cold tablets first (when active stats is available)
        boolean hasActiveStats = activeTabletIds != null && !activeTabletIds.isEmpty();
        boolean preferCold = Config.enable_cloud_active_tablet_priority_scheduling && hasActiveStats;

        if (preferCold) {
            Tablet cold = reservoirPick(tablets, pickedTabletIds, activeTabletIds, true);
            if (cold != null) {
                return cold;
            }
        }
        return reservoirPick(tablets, pickedTabletIds, activeTabletIds, false);
    }

    // Reservoir sampling to pick one element uniformly at random from candidates,
    // without allocating intermediate collections.
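    // Each qualifying element replaces the current choice with probability 1/seen when
    // encountered, so after n candidates every one of them ends up chosen with probability 1/n.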
    private Tablet reservoirPick(Set<Tablet> tablets, Set<Long> pickedTabletIds,
                                 Set<Long> activeTabletIds, boolean requireCold) {
        Tablet chosen = null;
        int seen = 0;
        for (Tablet t : tablets) {
            if (pickedTabletIds.contains(t.getId())) {
                continue;
            }
            if (requireCold && activeTabletIds != null && activeTabletIds.contains(t.getId())) {
                continue;
            }
            seen++;
            if (rand.nextInt(seen) == 0) {
                chosen = t;
            }
        }
        return chosen;
    }

    private boolean preheatAndUpdateTablet(Tablet pickedTablet, long srcBe, long destBe, String clusterId,
                                     BalanceType balanceType, Map<Long, Set<Tablet>> beToTablets) {
        Backend srcBackend = cloudSystemInfoService.getBackend(srcBe);
        Backend destBackend = cloudSystemInfoService.getBackend(destBe);
        if (srcBackend == null || destBackend == null) {
            LOG.warn("backend missing when preheating tablet {} from {} to {}, cluster {}",
                    pickedTablet.getId(), srcBe, destBe, clusterId);
            return false;
        }

        InfightTask task = new InfightTask();
        task.pickedTablet = pickedTablet;
        task.srcBe = srcBe;
        task.destBe = destBe;
        task.balanceType = balanceType;
        task.beToTablets = beToTablets;
        task.startTimestamp = System.currentTimeMillis() / 1000;
        InfightTablet key = new InfightTablet(pickedTablet.getId(), clusterId);

        tabletToInfightTask.put(key, task);
        updateBeToTablets(pickedTablet, srcBe, destBe,
                futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
        LOG.debug("pre cache {} from {} to {}, cluster {}", pickedTablet.getId(), srcBe, destBe, clusterId);
        enqueueWarmupTask(new WarmupTabletTask(pickedTablet, srcBe, destBe, clusterId));
        return true;
    }

    private boolean transferTablet(Tablet pickedTablet, long srcBe, long destBe, String clusterId,
                            BalanceType balanceType, List<UpdateCloudReplicaInfo> infos) {
        LOG.debug("transfer {} from {} to {}, cluster {}, type {}",
                pickedTablet.getId(), srcBe, destBe, clusterId, balanceType);
        updateBeToTablets(pickedTablet, srcBe, destBe,
                beToTabletsGlobal, beToTabletsInTable, partitionToTablets);
        updateBeToTablets(pickedTablet, srcBe, destBe,
                futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
        updateClusterToBeMap(pickedTablet, destBe, clusterId, infos);
        return true;
    }

    public void addTabletMigrationTask(Long srcBe, Long dstBe) {
        tabletsMigrateTasks.offer(Pair.of(srcBe, dstBe));
    }

    /*
     * Migrate tablet replicas from srcBe to dstBe.
     * Replica location info will be updated in both master and follower FEs.
     */
    private void migrateTablets(Long srcBe, Long dstBe) {
        // get tablets
        Set<Tablet> tablets = beToTabletsGlobal.get(srcBe);
        if (tablets == null || tablets.isEmpty()) {
            LOG.info("smooth upgrade srcBe={} does not have any tablets, set inactive", srcBe);
            ((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
            return;
        }
        List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
        for (Tablet tablet : tablets) {
            // get replica
            CloudReplica cloudReplica = (CloudReplica) tablet.getReplicas().get(0);
            Backend be = cloudSystemInfoService.getBackend(srcBe);
            if (be == null) {
                LOG.info("src backend {} not found", srcBe);
                continue;
            }
            // populate to followers
            Database db = Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId());
            if (db == null) {
                long beId;
                try {
                    beId = cloudReplica.getBackendId();
                } catch (ComputeGroupException e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("get backend failed cloudReplica {}", cloudReplica, e);
                    }
                    beId = -1;
                }
                LOG.error("get null db from replica, tabletId={}, partitionId={}, beId={}",
                        cloudReplica.getTableId(), cloudReplica.getPartitionId(), beId);
                continue;
            }
            OlapTable table = (OlapTable) db.getTableNullable(cloudReplica.getTableId());
            if (table == null) {
                continue;
            }

            String clusterId = be.getCloudClusterId();
            String clusterName = be.getCloudClusterName();

            table.readLock();
            try {
                if (db.getTableNullable(cloudReplica.getTableId()) == null) {
                    continue;
                }
                // update replica location info
                cloudReplica.updateClusterToPrimaryBe(clusterId, dstBe);
                UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(cloudReplica.getDbId(),
                        cloudReplica.getTableId(), cloudReplica.getPartitionId(), cloudReplica.getIndexId(),
                        tablet.getId(), cloudReplica.getId(), clusterId, dstBe);
                infos.add(info);
            } finally {
                table.readUnlock();
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("cloud be migrate tablet {} from srcBe={} to dstBe={}, clusterId={}, clusterName={}",
                        tablet.getId(), srcBe, dstBe, clusterId, clusterName);
            }
        }
        long oldSize = infos.size();
        infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.SMOOTH_UPGRADE);
        LOG.info("collect to editlog migrate before size={} after size={} infos", oldSize, infos.size());
        try {
            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
        } catch (Exception e) {
            LOG.warn("update cloud replicas failed", e);
            // edit log failed, try next time
            throw new RuntimeException(e);
        }

        try {
            ((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().registerWaterShedTxnId(srcBe);
        } catch (UserException e) {
            LOG.warn("registerWaterShedTxnId get exception", e);
            throw new RuntimeException(e);
        }
    }

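    // Compress the per-tablet infos into one UpdateCloudReplicaInfo per
    // (db, table, partition, index) location, so a single edit log entry carries the
    // whole batch; clusters with no moves this round still get their metric updated with 0.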
    private List<UpdateCloudReplicaInfo> batchUpdateCloudReplicaInfoEditlogs(List<UpdateCloudReplicaInfo> infos,
                                                                             StatType type) {
        long start = System.currentTimeMillis();
        List<UpdateCloudReplicaInfo> rets = new ArrayList<>();
        // group infos by clusterId
        Map<String, List<UpdateCloudReplicaInfo>> clusterIdToInfos = infos.stream()
                .collect(Collectors.groupingBy(UpdateCloudReplicaInfo::getClusterId));
        Set<String> notBalancedClusterIds = new HashSet<>(this.clusterToBes.keySet());
        for (Map.Entry<String, List<UpdateCloudReplicaInfo>> entry : clusterIdToInfos.entrySet()) {
            // same cluster
            String clusterId = entry.getKey();
            notBalancedClusterIds.remove(clusterId);
            List<UpdateCloudReplicaInfo> infoList = entry.getValue();
            String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
                    .getClusterNameByClusterId(clusterId);
            if (!Strings.isNullOrEmpty(clusterName)) {
                MetricRepo.updateClusterCloudBalanceNum(clusterName, clusterId, type, infoList.size());
            }
            Map<LocationKey, List<UpdateCloudReplicaInfo>> sameLocationInfos = infoList.stream()
                    .collect(Collectors.groupingBy(
                            info -> new LocationKey(info.getDbId(), info.getTableId(),
                                info.getPartitionId(), info.getIndexId())));
            sameLocationInfos.forEach((locationKey, locationInfos) -> {
                UpdateCloudReplicaInfo newInfo = new UpdateCloudReplicaInfo();
                long dbId = -1;
                long tableId = -1;
                long partitionId = -1;
                long indexId = -1;
                for (UpdateCloudReplicaInfo info : locationInfos) {
                    Preconditions.checkState(clusterId.equals(info.getClusterId()),
                            "impossible, cluster id not eq outer=" + clusterId + ", inner=" + info.getClusterId());

                    dbId = info.getDbId();
                    tableId = info.getTableId();
                    partitionId = info.getPartitionId();
                    indexId = info.getIndexId();

                    long tabletId = info.getTabletId();
                    long replicaId = info.getReplicaId();
                    long beId = info.getBeId();
                    newInfo.getTabletIds().add(tabletId);
                    newInfo.getReplicaIds().add(replicaId);
                    newInfo.getBeIds().add(beId);
                }
                newInfo.setDbId(dbId);
                newInfo.setTableId(tableId);
                newInfo.setPartitionId(partitionId);
                newInfo.setIndexId(indexId);
                newInfo.setClusterId(clusterId);
                // ATTN: for batch updates, unprotectUpdateCloudReplica requires tabletId to be set to -1
                newInfo.setTabletId(-1);
                rets.add(newInfo);
            });
        }

        for (String clusterId : notBalancedClusterIds) {
            String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
                    .getClusterNameByClusterId(clusterId);
            if (!Strings.isNullOrEmpty(clusterName)) {
                MetricRepo.updateClusterCloudBalanceNum(clusterName, clusterId, type, 0);
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("batchUpdateCloudReplicaInfoEditlogs old size {}, cur size {} cost {} ms",
                    infos.size(), rets.size(), System.currentTimeMillis() - start);
        }
        return rets;
    }

    public boolean isInited() {
        return inited;
    }
}