TabletScheduler.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.clone;

import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt;
import org.apache.doris.analysis.AdminRebalanceDiskStmt;
import org.apache.doris.catalog.ColocateGroupSchema;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DiskInfo.DiskState;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Partition.PartitionState;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Tablet.TabletHealth;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair;
import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPairComparator;
import org.apache.doris.clone.SchedException.Status;
import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletSchedCtx.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.CloneTask;
import org.apache.doris.task.DropReplicaTask;
import org.apache.doris.task.StorageMediaMigrationTask;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.transaction.TransactionState;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.EvictingQueue;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * TabletScheduler saves the tablets produced by TabletChecker and tries to schedule them.
 * It also tries to balance the cluster load.
 *
 * We are expecting an efficient way to recover the entire cluster and make it balanced.
 * Case 1:
 *  A Backend is down. All tablets which have a replica on this BE should be repaired as soon as possible.
 *
 * Case 1.1:
 *  As a Backend is down, some tables should be repaired with high priority. So the clone task should be able
 *  to be preempted.
 *
 * Case 2:
 *  A new Backend is added to the cluster. Replicas should be transferred to that host to balance the cluster load.
 */
public class TabletScheduler extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(TabletScheduler.class);

    // the minimum interval of updating cluster statistics and priority of tablet info
    private static final long STAT_UPDATE_INTERVAL_MS = 20 * 1000; // 20s

    /*
     * When a tablet is added to pendingTablets, its id and sched type are recorded in allTabletTypes as well.
     * TabletScheduler takes tablets out of pendingTablets to handle them, but does NOT remove their ids from
     * allTabletTypes at that point.
     * A tablet's id can only be removed after its clone or migration task is done (timeout, cancelled or finished).
     * So while a tablet's id is still in allTabletTypes, a non-forced addTablet() call (e.g. from TabletChecker)
     * returns ALREADY_IN instead of adding the tablet again.
     *
     * pendingTablets + runningTablets = allTabletTypes
     *
     * pendingTablets, allTabletTypes, runningTablets and schedHistory are protected by 'synchronized'
     */
    private MinMaxPriorityQueue<TabletSchedCtx> pendingTablets = MinMaxPriorityQueue.create();
    // tablet id -> sched type (e.g. REPAIR or BALANCE) of every tracked tablet.
    // addTablet() only ever overwrites an existing entry with REPAIR, so REPAIR is never downgraded to BALANCE.
    private Map<Long, TabletSchedCtx.Type> allTabletTypes = Maps.newHashMap();
    // tablet id -> ctx of every tablet whose state is RUNNING
    private Map<Long, TabletSchedCtx> runningTablets = Maps.newHashMap();
    // the latest 1000 scheduled tablet ctxs; the EvictingQueue drops the oldest entry when full
    private Queue<TabletSchedCtx> schedHistory = EvictingQueue.create(1000);

    // backend id -> working slots of that backend's paths. Refreshed by updateWorkingSlots() and shared with
    // the rebalancers created in the constructor. It is a concurrent map; note that it is also accessed from
    // methods that are not 'synchronized'.
    private Map<Long, PathSlot> backendsWorkingSlots = Maps.newConcurrentMap();
    // Tag -> load statistic; rebuilt and replaced as a whole by updateLoadStatistic()
    private Map<Tag, LoadStatisticForTag> statisticMap = Maps.newHashMap();

    // time (ms) of the last load statistic refresh. Disk-balance scheduling is rejected for a path whose
    // last successful disk balance is newer than this (see checkDiskBalanceLastSuccTime()).
    private long lastStatUpdateTime = 0;

    private long lastSlotAdjustTime = 0;

    private Env env;
    private SystemInfoService infoService;
    private TabletInvertedIndex invertedIndex;
    private ColocateTableIndex colocateTableIndex;
    private TabletSchedulerStat stat;
    // PartitionRebalancer or BeLoadRebalancer, chosen by the constructor's rebalancerType argument
    private Rebalancer rebalancer;
    // DiskRebalancer; per the constructor comment, used to get tasks when rebalancer cannot produce new ones
    private Rebalancer diskRebalancer;

    /**
     * Outcome of adding a tablet ctx to pendingTablets via addTablet().
     */
    public enum AddResult {
        /** The tablet was added successfully. */
        ADDED,
        /** The tablet had already been added before; it was skipped. */
        ALREADY_IN,
        /** The number of scheduling tablets exceeds the limit; the tablet was refused. */
        LIMIT_EXCEED,
        /** The tablet was added after evicting the lowest-priority pending tablet. */
        REPLACE_ADDED,
        /** The tablet scheduler has been disabled. */
        DISABLED
    }

    /**
     * Creates the scheduler daemon, which runs every Config.tablet_schedule_interval_ms.
     *
     * @param rebalancerType "partition" (case-insensitive) selects a PartitionRebalancer, any other value a
     *                       BeLoadRebalancer. A DiskRebalancer is always created in addition; it is used to get
     *                       tasks when the main rebalancer cannot produce new ones.
     */
    public TabletScheduler(Env env, SystemInfoService infoService, TabletInvertedIndex invertedIndex,
                           TabletSchedulerStat stat, String rebalancerType) {
        super("tablet scheduler", Config.tablet_schedule_interval_ms);
        this.env = env;
        this.infoService = infoService;
        this.invertedIndex = invertedIndex;
        this.colocateTableIndex = env.getColocateTableIndex();
        this.stat = stat;

        this.rebalancer = rebalancerType.equalsIgnoreCase("partition")
                ? new PartitionRebalancer(infoService, invertedIndex, backendsWorkingSlots)
                : new BeLoadRebalancer(infoService, invertedIndex, backendsWorkingSlots);
        this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex, backendsWorkingSlots);
    }

    /**
     * Drops every pending, running and historical tablet ctx and resets the update timestamps.
     * Only intended for FE unit tests.
     */
    public synchronized void clear() {
        lastStatUpdateTime = 0;
        lastSlotAdjustTime = 0;

        schedHistory.clear();
        runningTablets.clear();
        allTabletTypes.clear();
        pendingTablets.clear();
    }

    /** Returns the statistics counters updated by this scheduler. */
    public TabletSchedulerStat getStat() {
        return this.stat;
    }

    /**
     * Returns the main replica rebalancer (BeLoadRebalancer or PartitionRebalancer).
     * The disk rebalancer is not exposed here.
     */
    public Rebalancer getRebalancer() {
        return this.rebalancer;
    }

    /**
     * Refreshes backendsWorkingSlots at the beginning of each scheduling round so that it mirrors the
     * current backends and their ONLINE disks:
     * existing entries get their paths updated, entries of backends that no longer exist are removed,
     * and a new PathSlot is created for every newly seen backend.
     *
     * @return false if the backend list cannot be fetched, or if some alive backend has not reported its
     *         path info yet (the caller then skips this round); true otherwise
     */
    private boolean updateWorkingSlots() {
        ImmutableMap<Long, Backend> backends;
        try {
            backends = infoService.getAllBackendsByAllCluster();
        } catch (AnalysisException e) {
            LOG.warn("failed to get backends with current cluster", e);
            return false;
        }
        for (Backend backend : backends.values()) {
            if (!backend.hasPathHash() && backend.isAlive()) {
                // When upgrading, a backend may not have reported its path info yet, so return false and wait
                // for the next round. Only alive backends are checked: a backend that is dead during the upgrade
                // will never report its path hash, and would otherwise block the tablet scheduler forever.
                LOG.info("backend {}:{} with id {} doesn't have path info.", backend.getHost(),
                        backend.getBePort(), backend.getId());
                return false;
            }
        }

        // update existing backends and remember the ones that no longer exist
        Set<Long> deletedBeIds = Sets.newHashSet();
        for (Long beId : backendsWorkingSlots.keySet()) {
            Backend be = backends.get(beId);
            if (be != null) {
                backendsWorkingSlots.get(beId).updatePaths(collectOnlineDiskPaths(be));
            } else {
                deletedBeIds.add(beId);
            }
        }

        // delete non-existent backends
        for (Long beId : deletedBeIds) {
            backendsWorkingSlots.remove(beId);
            LOG.info("delete non exist backend: {}", beId);
        }

        // add new backends
        for (Backend be : backends.values()) {
            if (!backendsWorkingSlots.containsKey(be.getId())) {
                Map<Long, TStorageMedium> paths = collectOnlineDiskPaths(be);
                backendsWorkingSlots.put(be.getId(), new PathSlot(paths, be.getId()));
                // report the number of paths the slot was actually built from (ONLINE disks only),
                // not the total number of disks of the backend
                LOG.info("add new backend {} with slots num: {}", be.getId(), paths.size());
            }
        }

        return true;
    }

    /** Returns path hash -> storage medium for every ONLINE disk of the given backend. */
    private static Map<Long, TStorageMedium> collectOnlineDiskPaths(Backend be) {
        Map<Long, TStorageMedium> paths = Maps.newHashMap();
        be.getDisks().values().stream()
                .filter(v -> v.getState() == DiskState.ONLINE)
                .forEach(v -> paths.put(v.getPathHash(), v.getStorageMedium()));
        return paths;
    }

    /**
     * Returns the live backend id -> PathSlot map (not a copy); it is the same map shared with the rebalancers.
     */
    public Map<Long, PathSlot> getBackendsWorkingSlots() {
        return this.backendsWorkingSlots;
    }

    /**
     * Adds a ready-to-be-scheduled tablet ctx to pendingTablets, unless it has been added before.
     *
     * When force is true, the tablet is added even if the tablet scheduler is disabled, its id is already
     * tracked in allTabletTypes, or the pending/running queues have reached Config.max_scheduling_tablets.
     *
     * @param tablet the tablet ctx to enqueue
     * @param force  bypass the disabled, already-added and queue-limit checks
     * @return DISABLED if the scheduler is disabled (non-forced only);
     *         ALREADY_IN if the tablet id is already tracked (non-forced only);
     *         LIMIT_EXCEED if the queues are full and either pendingTablets is empty or its lowest-priority
     *         tablet does not rank strictly below the new one;
     *         REPLACE_ADDED if the lowest-priority pending tablet was evicted (and cancelled) to make room;
     *         ADDED otherwise
     */
    public synchronized AddResult addTablet(TabletSchedCtx tablet, boolean force) {
        if (!force && Config.disable_tablet_scheduler) {
            return AddResult.DISABLED;
        }

        // REPAIR has higher priority than BALANCE.
        // Suppose adding a BALANCE tablet successfully, then adding this tablet's REPAIR ctx will fail.
        // But we set allTabletTypes[tabletId] to REPAIR. Later at the beginning of scheduling this tablet,
        // it will reset its type as allTabletTypes[tabletId], so its type will convert to REPAIR.

        long tabletId = tablet.getTabletId();
        boolean contains = allTabletTypes.containsKey(tabletId);
        if (contains && !force) {
            // only upgrade the recorded type; a BALANCE request never downgrades a recorded REPAIR
            if (tablet.getType() == TabletSchedCtx.Type.REPAIR) {
                allTabletTypes.put(tabletId, TabletSchedCtx.Type.REPAIR);
            }
            return AddResult.ALREADY_IN;
        }

        AddResult addResult = AddResult.ADDED;
        // if this is not a force add,
        // and number of scheduling tablets exceed the limit,
        // refuse to add.
        if (!force && (pendingTablets.size() >= Config.max_scheduling_tablets
                || runningTablets.size() >= Config.max_scheduling_tablets)) {
            // For a sched tablet, if its compare value is bigger, it will be more close to queue's tail position,
            // and its priority is lower.
            TabletSchedCtx lowestPriorityTablet = pendingTablets.peekLast();
            if (lowestPriorityTablet == null || lowestPriorityTablet.compareTo(tablet) <= 0) {
                return AddResult.LIMIT_EXCEED;
            }
            addResult = AddResult.REPLACE_ADDED;
            // remove the victim from the queue first, then cancel it
            pendingTablets.pollLast();
            finalizeTabletCtx(lowestPriorityTablet, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
                    "evict lower priority sched tablet because pending queue is full");
        }

        // On a forced re-add of an already tracked tablet, only a REPAIR type overwrites the recorded type.
        if (!contains || tablet.getType() == TabletSchedCtx.Type.REPAIR) {
            allTabletTypes.put(tabletId, tablet.getType());
        }

        pendingTablets.offer(tablet);
        // log only the first time this tablet id is tracked
        if (!contains) {
            LOG.info("Add tablet to pending queue, {}", tablet);
        }

        return addResult;
    }



    /** Returns whether the tablet is currently tracked by the scheduler, i.e. is pending or running. */
    public synchronized boolean containsTablet(long tabletId) {
        return allTabletTypes.keySet().contains(tabletId);
    }

    /**
     * Asks the disk rebalancer to prioritize the backends named in an ADMIN REBALANCE DISK statement,
     * passing along the statement's timeout.
     */
    public synchronized void rebalanceDisk(AdminRebalanceDiskStmt stmt) {
        rebalanceDisk(stmt.getBackends(), stmt.getTimeoutS());
    }

    /**
     * Asks the disk rebalancer to prioritize the given backends.
     *
     * @param backends backends whose disks should be rebalanced first
     * @param timeoutS timeout handed to the disk rebalancer (seconds, judging by the name)
     */
    public synchronized void rebalanceDisk(List<Backend> backends, long timeoutS) {
        Rebalancer target = diskRebalancer;
        target.addPrioBackends(backends, timeoutS);
    }

    /** Removes the given backends from the disk rebalancer's priority list. */
    public synchronized void cancelRebalanceDisk(List<Backend> backends) {
        Rebalancer target = diskRebalancer;
        target.removePrioBackends(backends);
    }

    /**
     * Removes the backends named in an ADMIN CANCEL REBALANCE DISK statement from the disk rebalancer's
     * priority list.
     */
    public synchronized void cancelRebalanceDisk(AdminCancelRebalanceDiskStmt stmt) {
        cancelRebalanceDisk(stmt.getBackends());
    }

    /**
     * Raises every pending tablet of the given table that belongs to one of the given partitions to
     * VERY_HIGH priority and sets its last visited time to 1.
     *
     * The priorities of queued elements are changed in place, which leaves the heap order of
     * pendingTablets stale, so the queue is rebuilt from its (now updated) elements afterwards.
     */
    public synchronized void changeTabletsPriorityToVeryHigh(long dbId, long tblId, List<Long> partitionIds) {
        for (TabletSchedCtx ctx : pendingTablets) {
            boolean sameTable = ctx.getDbId() == dbId && ctx.getTblId() == tblId;
            if (sameTable && partitionIds.contains(ctx.getPartitionId())) {
                ctx.setPriority(Priority.VERY_HIGH);
                ctx.setLastVisitedTime(1L);
            }
        }
        pendingTablets = MinMaxPriorityQueue.create(pendingTablets);
    }

    /**
     * One scheduling round, run periodically by the master daemon at the interval given to the
     * constructor (Config.tablet_schedule_interval_ms).
     *
     * The round is skipped when the backends' working slots cannot be refreshed (e.g. an alive backend
     * has not reported its path info yet). Otherwise it, in this order:
     * 1. refreshes the cluster load statistics and hands them to the rebalancers (updateLoadStatistics);
     * 2. handles the currently running tablets (handleRunningTablets);
     * 3. selects tablets for balance (selectTabletsForBalance);
     * 4. schedules the tablets in pendingTablets (schedulePendingTablets);
     * and then counts the round in the scheduler statistics.
     *
     * Schedule rules:
     * 1. tablets with higher priority are scheduled first.
     * 2. a high priority should be downgraded if the tablet fails to be scheduled too many times.
     * 3. a priority may be upgraded if the tablet has not been scheduled for a long time.
     * 4. every pending task should have a max schedule count; if scheduling fails too many times, it should be removed.
     * 5. every running task should have a timeout, to avoid running forever.
     * 6. every running task should also have a max failure count;
     *    if the clone task fails too many times, it should be removed.
     */
    @Override
    public void runAfterCatalogReady() {
        boolean slotsReady = updateWorkingSlots();
        if (slotsReady) {
            updateLoadStatistics();
            handleRunningTablets();
            selectTabletsForBalance();
            schedulePendingTablets();

            stat.counterTabletScheduleRound.incrementAndGet();
        }
    }


    /**
     * Rebuilds the per-tag load statistics and pushes them, followed by the ids of tables with unfinished
     * alter jobs, to both the main rebalancer and the disk rebalancer. Records the refresh time in
     * lastStatUpdateTime.
     */
    private void updateLoadStatistics() {
        updateLoadStatistic();
        Rebalancer[] allRebalancers = {rebalancer, diskRebalancer};

        Map<Tag, LoadStatisticForTag> freshStatistics = statisticMap;
        for (Rebalancer r : allRebalancers) {
            r.updateLoadStatistic(freshStatistics);
        }

        Set<Long> unfinishedAlterTableIds = Env.getCurrentEnv().getAlterInstance().getUnfinishedAlterTableIds();
        for (Rebalancer r : allRebalancers) {
            r.updateAlterTableIds(unfinishedAlterTableIds);
        }

        lastStatUpdateTime = System.currentTimeMillis();
    }

    /**
     * Rebuilds the load statistic of every tag known to infoService and replaces statisticMap with it.
     *
     * This is the only place the load statistic is refreshed; it is not updated while clone jobs run.
     * The resulting inaccuracy is tolerated because the total number of running clone jobs in the
     * cluster is already limited by the backends' working slots.
     * Each backend statistic is also given the per-path "coping size" from getPathsCopingSize()
     * (presumably the amount of data currently being copied to each path — confirm).
     */
    private void updateLoadStatistic() {
        Map<Tag, LoadStatisticForTag> rebuilt = Maps.newHashMap();
        for (Tag tag : infoService.getTags()) {
            LoadStatisticForTag tagStatistic = new LoadStatisticForTag(tag, infoService, invertedIndex, rebalancer);
            tagStatistic.init();
            rebuilt.put(tag, tagStatistic);
            if (LOG.isDebugEnabled()) {
                LOG.debug("update load statistic for tag {}:\n{}", tag, tagStatistic.getBrief());
            }
        }

        Map<Long, Long> copingSizeByPath = getPathsCopingSize();
        for (LoadStatisticForTag tagStatistic : rebuilt.values()) {
            for (BackendLoadStatistic beStatistic : tagStatistic.getBackendLoadStatistics()) {
                beStatistic.incrPathsCopingSize(copingSizeByPath);
            }
        }

        this.statisticMap = rebuilt;
    }

    /** Returns the per-tag load statistics built by the most recent refresh (not a copy). */
    public Map<Tag, LoadStatisticForTag> getStatisticMap() {
        return this.statisticMap;
    }

    /**
     * Takes the next batch of tablets from the pending queue (via getNextTabletCtxBatch()) and tries to
     * schedule each of them. After handling, each tablet ctx is
     * 1. in runningTablets with state RUNNING, if it was scheduled successfully;
     * 2. or finalized via finalizeTabletCtx() with state CANCELLED, FINISHED or UNEXPECTED (per the original
     *    design it then ends up in schedHistory and is removed from allTabletTypes);
     * 3. or back in pendingTablets (via addBackToPendingTablets()) if it failed to be scheduled but has not
     *    yet exceeded its failure limit.
     *
     * The agent tasks created while scheduling are queued in AgentTaskQueue and submitted at the end.
     */
    private void schedulePendingTablets() {
        long start = System.currentTimeMillis();
        List<TabletSchedCtx> currentBatch = getNextTabletCtxBatch();
        if (LOG.isDebugEnabled()) {
            LOG.debug("get {} tablets to schedule", currentBatch.size());
        }

        AgentBatchTask batchTask = new AgentBatchTask();
        for (TabletSchedCtx tabletCtx : currentBatch) {
            try {
                scheduleTablet(tabletCtx, batchTask);
            } catch (SchedException e) {
                tabletCtx.setErrMsg(e.getMessage());
                if (e.getStatus() == Status.SCHEDULE_FAILED) {
                    boolean isExceedLimit = tabletCtx.onSchedFailedAndCheckExceedLimit(e.getSubCode());
                    if (isExceedLimit) {
                        finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(),
                                "schedule failed too many times and " + e.getMessage());
                    } else {
                        // we must release the resources it currently holds, so it can be scheduled again
                        tabletCtx.releaseResource(this);
                        // adjust priority to avoid some higher priority always be the first in pendingTablets
                        // NOTE(review): no priority change happens in this branch itself; presumably it is done
                        // in onSchedFailedAndCheckExceedLimit() or addBackToPendingTablets() — confirm.
                        stat.counterTabletScheduledFailed.incrementAndGet();
                        addBackToPendingTablets(tabletCtx);
                    }
                } else if (e.getStatus() == Status.FINISHED) {
                    // schedule redundant tablet or scheduler disabled will throw this exception
                    stat.counterTabletScheduledSucceeded.incrementAndGet();
                    finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, e.getStatus(), e.getMessage());
                } else {
                    Preconditions.checkState(e.getStatus() == Status.UNRECOVERABLE, e.getStatus());
                    // discard
                    stat.counterTabletScheduledDiscard.incrementAndGet();
                    tabletCtx.setSchedFailedCode(e.getSubCode());
                    finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(), e.getMessage());
                }
                continue;
            } catch (Exception e) {
                // any other exception is a bug: discard the tablet with state UNEXPECTED
                LOG.warn("got unexpected exception, discard this schedule. tablet: {}",
                        tabletCtx.getTabletId(), e);
                stat.counterTabletScheduledFailed.incrementAndGet();
                tabletCtx.setSchedFailedCode(SubCode.NONE);
                tabletCtx.setErrMsg(e.getMessage());
                finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.UNEXPECTED, Status.UNRECOVERABLE, e.getMessage());
                continue;
            }

            Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING, tabletCtx.getState());
            stat.counterTabletScheduledSucceeded.incrementAndGet();
            addToRunningTablets(tabletCtx);
        }

        // must send task after adding tablet info to runningTablets.
        // (presumably so that a finish report arriving early can find the ctx in runningTablets — confirm)
        for (AgentTask task : batchTask.getAllTasks()) {
            // only count tasks that were newly queued; the log line below is emitted for every task
            // (which may also be e.g. a migration task, despite the "clone task" wording)
            if (AgentTaskQueue.addTask(task)) {
                stat.counterCloneTask.incrementAndGet();
            }
            LOG.info("add clone task to agent task queue: {}", task);
        }

        // send task immediately
        AgentTaskExecutor.submit(batchTask);

        long cost = System.currentTimeMillis() - start;
        stat.counterTabletScheduleCostMs.addAndGet(cost);
    }

    /** Registers a successfully scheduled tablet ctx as running, keyed by its tablet id. */
    private synchronized void addToRunningTablets(TabletSchedCtx tabletCtx) {
        long tabletId = tabletCtx.getTabletId();
        runningTablets.put(tabletId, tabletCtx);
    }

    /**
     * Removes and returns the running ctx of the given tablet, or null if it is not running.
     * Taking the tablet out of runningTablets before handling it keeps other threads from seeing it.
     * Whoever takes a tablet must afterwards put it into schedHistory or back into runningTablets.
     */
    private synchronized TabletSchedCtx takeRunningTablets(long tabletId) {
        TabletSchedCtx taken = runningTablets.remove(tabletId);
        return taken;
    }

    /**
     * Try to schedule a single tablet.
     *
     * The tablet's db, table, partition and index are re-resolved, and under the table write lock its
     * health is recomputed (colocate health for colocate tables). A BALANCE ctx is converted to REPAIR if a
     * REPAIR type has been recorded for the same tablet in the meantime. Combinations that must not proceed
     * are rejected: balance on a non-NORMAL table or with PRECOMMITTED transactions on it, adding/dropping
     * replicas while the table is being altered, repair of a healthy tablet, and balance of an unhealthy
     * tablet. Finally the ctx is filled with the tablet's current info and dispatched to
     * handleTabletByTypeAndStatus().
     *
     * @param tabletCtx the tablet ctx taken from pendingTablets
     * @param batchTask passed to the handlers; tasks added to it are sent by the caller
     * @throws SchedException with status UNRECOVERABLE for the rejections above; the handlers reached via
     *         handleTabletByTypeAndStatus() may throw other statuses as well
     */
    private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException {
        if (Config.disable_tablet_scheduler) {
            // do not schedule more tablets if the tablet scheduler is disabled.
            throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "tablet scheduler is disabled");
        }
        if (Config.disable_balance && tabletCtx.getType() == Type.BALANCE) {
            throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "balance is disabled");
        }

        long currentTime = System.currentTimeMillis();
        tabletCtx.setLastSchedTime(currentTime);
        tabletCtx.setLastVisitedTime(currentTime);
        stat.counterTabletScheduled.incrementAndGet();

        long tabletId = tabletCtx.getTabletId();
        // allTabletTypes is a plain HashMap written by synchronized methods such as addTablet(), while this
        // method is not synchronized, so read it under the monitor. The read is done BEFORE the table lock
        // is acquired, so this thread never waits for the scheduler monitor while holding a table lock.
        TabletSchedCtx.Type recordedType;
        synchronized (this) {
            recordedType = allTabletTypes.get(tabletId);
        }

        TabletHealth tabletHealth;
        Database db = Env.getCurrentInternalCatalog().getDbOrException(tabletCtx.getDbId(),
                s -> new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
                        "db " + tabletCtx.getDbId() + " does not exist"));
        OlapTable tbl = (OlapTable) db.getTableOrException(tabletCtx.getTblId(),
                s -> new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
                        "tbl " + tabletCtx.getTblId() + " does not exist"));
        tbl.writeLockOrException(new SchedException(Status.UNRECOVERABLE,
                    "table " + tbl.getName() + " does not exist"));
        try {
            boolean isColocateTable = colocateTableIndex.isColocateTable(tbl.getId());

            OlapTableState tableState = tbl.getState();

            Partition partition = tbl.getPartition(tabletCtx.getPartitionId());
            if (partition == null) {
                throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "partition does not exist");
            }

            MaterializedIndex idx = partition.getIndex(tabletCtx.getIndexId());
            if (idx == null) {
                throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "index does not exist");
            }

            ReplicaAllocation replicaAlloc = null;
            Tablet tablet = idx.getTablet(tabletId);
            Preconditions.checkNotNull(tablet);
            if (isColocateTable) {
                GroupId groupId = colocateTableIndex.getGroup(tbl.getId());
                if (groupId == null) {
                    throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
                            "colocate group does not exist");
                }
                ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(groupId);
                if (groupSchema == null) {
                    throw new SchedException(Status.UNRECOVERABLE,
                            "colocate group schema " + groupId + " does not exist");
                }
                replicaAlloc = groupSchema.getReplicaAlloc();

                int tabletOrderIdx = tabletCtx.getTabletOrderIdx();
                if (tabletOrderIdx == -1) {
                    tabletOrderIdx = idx.getTabletOrderIdx(tablet.getId());
                }
                Preconditions.checkState(tabletOrderIdx != -1);

                Set<Long> backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
                tabletHealth = tablet.getColocateHealth(partition.getVisibleVersion(), replicaAlloc, backendsSet);
                tabletHealth.priority = Priority.HIGH;
                tabletCtx.setColocateGroupBackendIds(backendsSet);
            } else {
                replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
                List<Long> aliveBeIds = infoService.getAllBackendIds(true);
                tabletHealth = tablet.getHealth(infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds);
            }

            if (tabletCtx.getType() != recordedType) {
                TabletSchedCtx.Type curType = tabletCtx.getType();
                if (recordedType == null) {
                    // The tablet id is no longer tracked (presumably removed concurrently); without this
                    // check the message below would fail with a NullPointerException.
                    throw new SchedException(Status.UNRECOVERABLE,
                            "tablet " + tabletId + " is not in tablet scheduler any more");
                }
                if (curType == TabletSchedCtx.Type.BALANCE && recordedType == TabletSchedCtx.Type.REPAIR) {
                    tabletCtx.setType(recordedType);
                    tabletCtx.setReplicaAlloc(replicaAlloc);
                    tabletCtx.setTag(null);
                } else {
                    throw new SchedException(Status.UNRECOVERABLE, "can not convert type of tablet "
                            + tabletId + " from " + curType.name() + " to " + recordedType.name());
                }
            }

            if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE) {
                if (tableState != OlapTableState.NORMAL) {
                    // If table is under ALTER process, do not allow to do balance.
                    throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
                            "table's state is not NORMAL");
                }

                try {
                    for (TransactionState transactionState :
                            Env.getCurrentGlobalTransactionMgr().getPreCommittedTxnList(db.getId())) {
                        if (transactionState.getTableIdList().contains(tbl.getId())) {
                            // If the table relates to a transaction in PRECOMMITTED status, do not allow balance.
                            throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
                                    "There exists PRECOMMITTED transaction related to table");
                        }
                    }
                } catch (AnalysisException e) {
                    // CHECKSTYLE IGNORE THIS LINE
                    LOG.warn("Exception:", e);
                }
            }

            if (tabletHealth.status != TabletStatus.VERSION_INCOMPLETE
                    && (partition.getState() != PartitionState.NORMAL || tableState != OlapTableState.NORMAL)
                    && tableState != OlapTableState.WAITING_STABLE) {
                // If table is under ALTER process(before FINISHING), do not allow to add or delete replica.
                // VERSION_INCOMPLETE will repair the replica in place, which is allowed.
                // The WAITING_STABLE state is an exception. This state indicates that the table is
                // executing an alter job, but the alter job is in a PENDING state and is waiting for
                // the table to become stable. In this case, we allow the tablet repair to proceed.
                throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
                        "table is in alter process, but tablet status is " + tabletHealth.status.name());
            }

            tabletCtx.setTabletHealth(tabletHealth);
            tabletCtx.setIsUniqKeyMergeOnWrite(tbl.isUniqKeyMergeOnWrite());
            if (tabletHealth.status == TabletStatus.HEALTHY && tabletCtx.getType() == TabletSchedCtx.Type.REPAIR) {
                throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "tablet is healthy");
            } else if (tabletHealth.status != TabletStatus.HEALTHY
                    && tabletCtx.getType() == TabletSchedCtx.Type.BALANCE) {
                // we select an unhealthy tablet to do balance, which is not right.
                // so here we stop this task.
                throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
                        "tablet is unhealthy when doing balance");
            }

            // for more accurate disk balance, only schedule the tablet when the disk stat info is up to date
            if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE
                    && tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.DISK_BALANCE) {
                checkDiskBalanceLastSuccTime(tabletCtx.getTempSrcBackendId(), tabletCtx.getTempSrcPathHash());
            }
            // we do not concern priority here.
            // once we take the tablet out of priority queue, priority is meaningless.
            tabletCtx.setTablet(tablet);
            tabletCtx.updateTabletSize();
            tabletCtx.setVersionInfo(partition.getVisibleVersion(), partition.getCommittedVersion());
            tabletCtx.setSchemaHash(tbl.getSchemaHashByIndexId(idx.getId()));
            tabletCtx.setStorageMedium(tbl.getPartitionInfo().getDataProperty(partition.getId()).getStorageMedium());

            handleTabletByTypeAndStatus(tabletHealth.status, tabletCtx, batchTask);
        } finally {
            tbl.writeUnlock();
        }
    }

    /**
     * Rejects a disk-balance schedule when the source path finished a disk balance after the load statistic
     * was last refreshed, because the statistic no longer reflects that path.
     *
     * @throws SchedException UNRECOVERABLE if the backend has no path slot, or if the path's last successful
     *         disk balance is newer than lastStatUpdateTime
     */
    private void checkDiskBalanceLastSuccTime(long beId, long pathHash) throws SchedException {
        PathSlot slot = backendsWorkingSlots.get(beId);
        if (slot == null) {
            throw new SchedException(Status.UNRECOVERABLE, "path slot does not exist");
        }
        boolean statOutdated = slot.getDiskBalanceLastSuccTime(pathHash) > lastStatUpdateTime;
        if (statOutdated) {
            throw new SchedException(Status.UNRECOVERABLE, "stat info is outdated");
        }
    }

    /**
     * Sets the path hash of the ctx's replica located on the dest backend to the ctx's dest path hash.
     * Does nothing if no replica lives on the dest backend or if the dest path hash is -1.
     */
    public void updateDestPathHash(TabletSchedCtx tabletCtx) {
        Replica replicaOnDest = null;
        for (Replica candidate : tabletCtx.getReplicas()) {
            if (candidate.getBackendIdWithoutException() == tabletCtx.getDestBackendId()) {
                replicaOnDest = candidate;
                break;
            }
        }

        long destPathHash = tabletCtx.getDestPathHash();
        if (replicaOnDest != null && destPathHash != -1) {
            replicaOnDest.setPathHash(destPathHash);
        }
    }

    /**
     * Records a successful disk-balance on the given path of the given backend.
     * Silently does nothing when {@code backendsWorkingSlots} has no slot for the backend.
     *
     * @param beId     backend id
     * @param pathHash path hash on that backend
     */
    public void updateDiskBalanceLastSuccTime(long beId, long pathHash) {
        PathSlot slot = backendsWorkingSlots.get(beId);
        if (slot != null) {
            slot.updateDiskBalanceLastSuccTime(pathHash);
        }
    }

    /**
     * Dispatches a tablet to the handler matching its schedule type and health status.
     * Non-REPAIR (balance) tasks always go to {@code doBalance}; REPAIR tasks are routed by {@code status},
     * and statuses without a dedicated handler are ignored.
     *
     * @param status    health status of the tablet
     * @param tabletCtx scheduling context
     * @param batchTask batch that clone tasks are appended to
     * @throws SchedException propagated from the handlers, or UNRECOVERABLE (DIAGNOSE_IGNORE) for an
     *                        UNRECOVERABLE status
     */
    private void handleTabletByTypeAndStatus(TabletStatus status, TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
            throws SchedException {
        if (tabletCtx.getType() != Type.REPAIR) {
            // balance
            doBalance(tabletCtx, batchTask);
            return;
        }

        switch (status) {
            case REPLICA_MISSING:
                handleReplicaMissing(tabletCtx, batchTask);
                return;
            case VERSION_INCOMPLETE:
            case NEED_FURTHER_REPAIR:
                // handled like version incomplete; the replica needing further repair is preferred as dest
                handleReplicaVersionIncomplete(tabletCtx, batchTask);
                return;
            case REPLICA_RELOCATING:
                handleReplicaRelocating(tabletCtx, batchTask);
                return;
            case REDUNDANT:
                handleRedundantReplica(tabletCtx, false);
                return;
            case FORCE_REDUNDANT:
                handleRedundantReplica(tabletCtx, true);
                return;
            case REPLICA_MISSING_FOR_TAG:
                handleReplicaMissingForTag(tabletCtx, batchTask);
                return;
            case COLOCATE_MISMATCH:
                handleColocateMismatch(tabletCtx, batchTask);
                return;
            case COLOCATE_REDUNDANT:
                handleColocateRedundant(tabletCtx);
                return;
            case REPLICA_COMPACTION_TOO_SLOW:
                handleReplicaTooSlow(tabletCtx);
                return;
            case UNRECOVERABLE:
                throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "tablet is unrecoverable");
            default:
                return;
        }
    }

    /**
     * Handles a tablet without enough alive replicas by cloning a new replica onto another backend.
     * <ol>
     *   <li>Return early if the context reports that the compaction-too-slow replica has recovered.</li>
     *   <li>Choose a tag that is missing replicas, then an available destination path for it. Per the
     *       selection rules, the backend must be alive, must not already hold a replica (nor share its
     *       host), must carry the proper tag and have a free clone slot, and the path must have enough
     *       capacity; among candidates the lowest load score wins.</li>
     *   <li>Choose a healthy source replica whose backend has a free clone slot.</li>
     *   <li>Append the clone task to {@code batchTask} and account its size on the destination path.</li>
     * </ol>
     *
     * @param tabletCtx scheduling context of the tablet
     * @param batchTask batch that the clone task is appended to
     * @throws SchedException propagated from tag, destination or source selection
     */
    private void handleReplicaMissing(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException {
        stat.counterReplicaMissingErr.incrementAndGet();
        if (tabletCtx.compactionRecovered()) {
            // the compaction-too-slow replica has recovered, nothing to clone
            return;
        }

        Tag missingTag = chooseProperTag(tabletCtx, true);
        RootPathLoadStatistic dest = chooseAvailableDestPath(tabletCtx, missingTag, false /* not for colocate */);
        Preconditions.checkNotNull(dest);
        tabletCtx.setDest(dest.getBeId(), dest.getPathHash());

        // the source replica to clone from
        tabletCtx.chooseSrcReplica(backendsWorkingSlots, -1);

        batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
        incrDestPathCopingSize(tabletCtx);
    }

    /**
     * Chooses the location tag to act on, based on the current distribution of usable replicas.
     * <p>
     * Only replicas that are schedule-available, alive and not too slow, and that sit on an existing mix node,
     * are counted per location tag. The counts are then compared with the tablet's replica allocation.
     * Replicas whose backend cannot be resolved (dropped backend, or backend id lookup failure) are ignored.
     * Before this change they caused a NullPointerException.
     *
     * @param tabletCtx         tablet being scheduled
     * @param forMissingReplica true to find a tag with fewer usable replicas than allocated (for cloning),
     *                          false to find a tag with more usable replicas than allocated (for deleting)
     * @return the first tag, in allocation-map iteration order, matching the criterion
     * @throws SchedException UNRECOVERABLE if no tag matches
     */
    private Tag chooseProperTag(TabletSchedCtx tabletCtx, boolean forMissingReplica) throws SchedException {
        Tablet tablet = tabletCtx.getTablet();
        List<Replica> replicas = tablet.getReplicas();
        Map<Tag, Short> allocMap = tabletCtx.getReplicaAlloc().getAllocMap();
        Map<Tag, Short> currentAllocMap = Maps.newHashMap();
        for (Replica replica : replicas) {
            long beId;
            try {
                beId = replica.getBackendId();
            } catch (UserException e) {
                LOG.warn("replica is not found", e);
                beId = -1;
            }
            Backend be = infoService.getBackend(beId);
            if (be == null) {
                // backend dropped or id unresolved: the replica cannot count towards any tag
                continue;
            }
            if (replica.isScheduleAvailable() && replica.isAlive() && !replica.tooSlow()
                    && be.isMixNode()) {
                Short num = currentAllocMap.getOrDefault(be.getLocationTag(), (short) 0);
                currentAllocMap.put(be.getLocationTag(), (short) (num + 1));
            }
        }

        for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) {
            short curNum = currentAllocMap.getOrDefault(entry.getKey(), (short) 0);
            if (forMissingReplica && curNum < entry.getValue()) {
                return entry.getKey();
            }
            if (!forMissingReplica && curNum > entry.getValue()) {
                return entry.getKey();
            }
        }

        throw new SchedException(Status.UNRECOVERABLE, "no proper tag is chose for tablet " + tablet.getId());
    }

    /**
     * Repairs a replica that is missing some versions by cloning in place from a healthy replica.
     * <ol>
     *   <li>Choose the version-incomplete replica as the destination.</li>
     *   <li>Choose a healthy replica as the source.</li>
     *   <li>Append the clone task to {@code batchTask}.</li>
     * </ol>
     * If no destination can be chosen (UNRECOVERABLE), the task is converted into a REPLICA_MISSING task and
     * handled as such. For example, this happens when every BE holding a replica is being decommissioned.
     *
     * @param tabletCtx scheduling context of the tablet
     * @param batchTask batch that the clone task is appended to
     * @throws SchedException non-UNRECOVERABLE selection failures, or failures of the fallback
     */
    private void handleReplicaVersionIncomplete(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
            throws SchedException {
        stat.counterReplicaVersionMissingErr.incrementAndGet();
        try {
            tabletCtx.chooseDestReplicaForVersionIncomplete(backendsWorkingSlots);
        } catch (SchedException e) {
            if (e.getStatus() != Status.UNRECOVERABLE) {
                throw e;
            }
            // No usable dest replica: release what was taken and retry as a replica-missing task,
            // which looks for a brand-new backend instead.
            if (LOG.isDebugEnabled()) {
                LOG.debug("failed to find version incomplete replica for VERSION_INCOMPLETE task. tablet id: {}, "
                        + "try to find a new backend", tabletCtx.getTabletId());
            }
            tabletCtx.releaseResource(this, true);
            tabletCtx.setTabletStatus(TabletStatus.REPLICA_MISSING);
            handleReplicaMissing(tabletCtx, batchTask);
            if (LOG.isDebugEnabled()) {
                LOG.debug("succeed to find new backend for VERSION_INCOMPLETE task. tablet id: {}",
                        tabletCtx.getTabletId());
            }
            return;
        }

        tabletCtx.chooseSrcReplicaForVersionIncomplete(backendsWorkingSlots);
        batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
    }

    /*
     * The tablet has enough alive, version-complete replicas, but some of them sit on backends being
     * decommissioned. It is handled as VERSION_INCOMPLETE: an existing (possibly lagging) replica on an
     * available BE is preferred as the clone destination, and only if none exists is a new BE chosen.
     *
     * Why: tablet X has replicas on A, B, C and C is decommissioned, so a new replica is cloned to D.
     * Afterwards X has A, B, C(decommission), D, and D may already lag behind because new versions were
     * published while the clone ran. X is then still REPLICA_RELOCATING at the next check. If D were not
     * reused as the destination, yet another BE E would be picked, and the replica count would keep
     * growing forever.
     */
    private void handleReplicaRelocating(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
            throws SchedException {
        stat.counterReplicaUnavailableErr.incrementAndGet();
        // route through the version-incomplete path so an existing lagging replica is preferred as dest
        tabletCtx.setTabletStatus(TabletStatus.VERSION_INCOMPLETE);
        handleReplicaVersionIncomplete(tabletCtx, batchTask);
    }

    /**
     * Drops one redundant replica (more replicas than expected). Only one replica is dropped per call, for safety.
     * <p>
     * Strategies are tried in this order; the first one that deletes a replica wins:
     * <ol>
     *   <li>replica whose backend has been dropped</li>
     *   <li>bad replica</li>
     *   <li>replica that is not schedule-available (backend unavailable or user drop)</li>
     *   <li>replica marked too slow</li>
     *   <li>replica in CLONE or DECOMMISSION state</li>
     *   <li>replica with last failed version &gt; 0</li>
     *   <li>replica that has not caught up to the committed version</li>
     *   <li>one of several replicas sharing a host</li>
     *   <li>replica on a mix node whose tag is not in the replica allocation</li>
     *   <li>replica chosen by the rebalancer (source replica of a rebalance)</li>
     *   <li>replica on a globally high-usage disk</li>
     *   <li>scale-in drop replica</li>
     *   <li>replica on the highest-load backend</li>
     * </ol>
     *
     * @param tabletCtx scheduling context of the tablet
     * @param force     passed through to the deletion, see deleteReplicaInternal
     * @throws SchedException always: FINISHED when a replica was deleted, so that the tablet is removed from
     *                        the pending tablets as finished; UNRECOVERABLE when no strategy could delete one;
     *                        or any exception raised by a strategy
     */
    private void handleRedundantReplica(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
        stat.counterReplicaRedundantErr.incrementAndGet();

        boolean deleted = deleteBackendDropped(tabletCtx, force)
                || deleteBadReplica(tabletCtx, force)
                || deleteBackendUnavailable(tabletCtx, force)
                || deleteTooSlowReplica(tabletCtx, force)
                || deleteCloneOrDecommissionReplica(tabletCtx, force)
                || deleteReplicaWithFailedVersion(tabletCtx, force)
                || deleteReplicaWithLowerVersion(tabletCtx, force)
                || deleteReplicaOnSameHost(tabletCtx, force)
                || deleteReplicaNotInValidTag(tabletCtx, force)
                || deleteReplicaChosenByRebalancer(tabletCtx, force)
                || deleteReplicaOnUrgentHighDisk(tabletCtx, force)
                || deleteFromScaleInDropReplicas(tabletCtx, force)
                || deleteReplicaOnHighLoadBackend(tabletCtx, force);
        if (!deleted) {
            throw new SchedException(Status.UNRECOVERABLE, "unable to delete any redundant replicas");
        }
        // a successful delete is still reported via FINISHED so the tablet leaves pendingTablets as finished
        throw new SchedException(Status.FINISHED, "redundant replica is deleted");
    }

    /**
     * Deletes the first replica whose backend is no longer known to the system info service.
     *
     * @return true if a replica was deleted
     */
    private boolean deleteBackendDropped(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
        for (Replica replica : tabletCtx.getReplicas()) {
            boolean backendExists = infoService.getBackend(replica.getBackendIdWithoutException()) != null;
            if (backendExists) {
                continue;
            }
            deleteReplicaInternal(tabletCtx, replica, "backend dropped", force);
            return true;
        }
        return false;
    }

    /**
     * Deletes the first replica flagged as bad.
     *
     * @return true if a replica was deleted
     */
    private boolean deleteBadReplica(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
        Replica badOne = null;
        for (Replica candidate : tabletCtx.getReplicas()) {
            if (candidate.isBad()) {
                badOne = candidate;
                break;
            }
        }
        if (badOne == null) {
            return false;
        }
        deleteReplicaInternal(tabletCtx, badOne, "replica is bad", force);
        return true;
    }

    /**
     * Deletes the first replica marked as too slow.
     *
     * @return true if a replica was deleted
     */
    private boolean deleteTooSlowReplica(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
        Replica slowOne = null;
        for (Replica candidate : tabletCtx.getReplicas()) {
            if (candidate.tooSlow()) {
                slowOne = candidate;
                break;
            }
        }
        if (slowOne == null) {
            return false;
        }
        deleteReplicaInternal(tabletCtx, slowOne, "replica is too slow", force);
        return true;
    }

    /**
     * Deletes the first replica that is not schedule-available while its backend still exists.
     * Replicas on dropped backends are skipped here; deleteBackendDropped() handles them.
     * <p>
     * The logged reason names the cause. If the backend itself is still schedule-available, the backend cannot
     * be why the replica is unavailable, so the replica was dropped by the user. Otherwise the backend is
     * unavailable. The previous code had these two reasons swapped.
     * (Assumes Replica#isScheduleAvailable combines backend availability with the user-drop flag — confirm.)
     *
     * @return true if a replica was deleted
     */
    private boolean deleteBackendUnavailable(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
        for (Replica replica : tabletCtx.getReplicas()) {
            Backend be = infoService.getBackend(replica.getBackendIdWithoutException());
            if (be == null) {
                // this case should be handled in deleteBackendDropped()
                continue;
            }
            if (!replica.isScheduleAvailable()) {
                String reason = be.isScheduleAvailable() ? "user drop replica" : "backend unavailable";
                deleteReplicaInternal(tabletCtx, replica, reason, force);
                return true;
            }
        }
        return false;
    }

    /**
     * Deletes the first replica whose state is CLONE or DECOMMISSION; the state name becomes the reason.
     *
     * @return true if a replica was deleted
     */
    private boolean deleteCloneOrDecommissionReplica(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
        for (Replica candidate : tabletCtx.getReplicas()) {
            ReplicaState state = candidate.getState();
            boolean transitional = state == ReplicaState.CLONE || state == ReplicaState.DECOMMISSION;
            if (!transitional) {
                continue;
            }
            deleteReplicaInternal(tabletCtx, candidate, state + " state", force);
            return true;
        }
        return false;
    }

    /**
     * Deletes the first replica that has a positive last failed version.
     *
     * @return true if a replica was deleted
     */
    private boolean deleteReplicaWithFailedVersion(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
        Replica incomplete = null;
        for (Replica candidate : tabletCtx.getReplicas()) {
            if (candidate.getLastFailedVersion() > 0) {
                incomplete = candidate;
                break;
            }
        }
        if (incomplete == null) {
            return false;
        }
        deleteReplicaInternal(tabletCtx, incomplete, "version incomplete", force);
        return true;
    }

    /**
     * Deletes the first replica that has not caught up with the tablet's committed version.
     *
     * @return true if a replica was deleted
     */
    private boolean deleteReplicaWithLowerVersion(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
        for (Replica candidate : tabletCtx.getReplicas()) {
            boolean caughtUp = candidate.checkVersionCatchUp(tabletCtx.getCommittedVersion(), false);
            if (caughtUp) {
                continue;
            }
            deleteReplicaInternal(tabletCtx, candidate, "lower version", force);
            return true;
        }
        return false;
    }

    /**
     * Deletes one replica when two or more replicas of the tablet sit on the same host.
     * <p>
     * Replicas are grouped by backend host. Only the first group with more than one replica (in map iteration
     * order) is considered. From it, the replica on the highest-load backend is deleted, using the load statistic
     * of the tag returned by {@code chooseProperTag(tabletCtx, false)}.
     *
     * @return true if a replica was deleted; false if any replica's backend is missing, no host is shared,
     *         the chosen tag has no statistic, or no candidate could be picked
     */
    private boolean deleteReplicaOnSameHost(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
        Map<String, List<Replica>> replicasByHost = Maps.newHashMap();
        for (Replica replica : tabletCtx.getReplicas()) {
            Backend be = infoService.getBackend(replica.getBackendIdWithoutException());
            if (be == null) {
                // this case should be handled in deleteBackendDropped()
                return false;
            }
            replicasByHost.computeIfAbsent(be.getHost(), host -> Lists.newArrayList()).add(replica);
        }

        for (List<Replica> sameHost : replicasByHost.values()) {
            if (sameHost.size() <= 1) {
                continue;
            }
            // prefer dropping the replica on the higher-load backend of this host group
            Tag tag = chooseProperTag(tabletCtx, false);
            LoadStatisticForTag statistic = statisticMap.get(tag);
            return statistic != null && deleteFromHighLoadBackend(tabletCtx, sameHost, force, statistic);
        }
        return false;
    }

    /**
     * Deletes the first replica on a mix node whose location tag is not part of the tablet's replica allocation.
     * A replica whose backend no longer exists is skipped, as in the other strategies, instead of triggering a
     * NullPointerException. deleteBackendDropped() is responsible for such replicas.
     *
     * @return true if a replica was deleted
     */
    private boolean deleteReplicaNotInValidTag(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
        Tablet tablet = tabletCtx.getTablet();
        List<Replica> replicas = tablet.getReplicas();
        Map<Tag, Short> allocMap = tabletCtx.getReplicaAlloc().getAllocMap();
        for (Replica replica : replicas) {
            Backend be = infoService.getBackend(replica.getBackendIdWithoutException());
            if (be == null) {
                // this case should be handled in deleteBackendDropped()
                continue;
            }
            if (be.isMixNode() && !allocMap.containsKey(be.getLocationTag())) {
                deleteReplicaInternal(tabletCtx, replica, "not in valid tag", force);
                return true;
            }
        }
        return false;
    }

    /**
     * Deletes the replica the rebalancer marked for deletion (the source replica of a rebalance), if it
     * still belongs to the tablet, and then clears the rebalancer's mark.
     *
     * @return true if a replica was deleted; false if nothing is marked (-1) or the marked replica is gone
     */
    private boolean deleteReplicaChosenByRebalancer(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
        Long markedId = rebalancer.getToDeleteReplicaId(tabletCtx);
        Replica marked = markedId == -1L ? null : tabletCtx.getTablet().getReplicaById(markedId);
        if (marked == null) {
            return false;
        }

        deleteReplicaInternal(tabletCtx, marked, "src replica of rebalance", force);
        rebalancer.invalidateToDeleteReplicaId(tabletCtx);
        return true;
    }

    /**
     * Among replicas whose disk path is flagged as globally high usage, deletes the one with the highest
     * used percentage. Uses the load statistic of the tag returned by {@code chooseProperTag(tabletCtx, false)}.
     *
     * @return true if a replica was deleted; false if the tag has no statistic or no replica qualifies
     */
    private boolean deleteReplicaOnUrgentHighDisk(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
        LoadStatisticForTag statistic = statisticMap.get(chooseProperTag(tabletCtx, false));
        if (statistic == null) {
            return false;
        }

        Replica fullest = null;
        double fullestUsage = -1;
        for (Replica candidate : tabletCtx.getReplicas()) {
            BackendLoadStatistic beStat = statistic.getBackendLoadStatistic(candidate.getBackendIdWithoutException());
            RootPathLoadStatistic path = beStat == null
                    ? null
                    : beStat.getPathStatisticByPathHash(candidate.getPathHash());
            if (path == null || !path.isGlobalHighUsage()) {
                continue;
            }
            if (path.getUsedPercent() > fullestUsage) {
                fullestUsage = path.getUsedPercent();
                fullest = candidate;
            }
        }

        if (fullest == null) {
            return false;
        }
        deleteReplicaInternal(tabletCtx, fullest, "high usage disk", force);
        return true;
    }

    /**
     * Deletes the replica on the highest-load backend, using the load statistic of the tag returned by
     * {@code chooseProperTag(tabletCtx, false)}.
     *
     * @return true if a replica was deleted; false if the tag has no statistic or no replica could be picked
     */
    private boolean deleteReplicaOnHighLoadBackend(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
        LoadStatisticForTag statistic = statisticMap.get(chooseProperTag(tabletCtx, false));
        return statistic != null
                && deleteFromHighLoadBackend(tabletCtx, tabletCtx.getReplicas(), force, statistic);
    }

    /**
     * Deletes the first replica flagged as a scale-in drop replica.
     *
     * @return true if a replica was deleted
     */
    private boolean deleteFromScaleInDropReplicas(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
        Replica scaleInDrop = null;
        for (Replica candidate : tabletCtx.getReplicas()) {
            if (candidate.isScaleInDrop()) {
                scaleInDrop = candidate;
                break;
            }
        }
        if (scaleInDrop == null) {
            return false;
        }
        deleteReplicaInternal(tabletCtx, scaleInDrop, "scale drop replica", force);
        return true;
    }

    /**
     * Deletes, among {@code replicas}, the one whose backend has the highest strictly-positive load score in
     * {@code statistic}. Replicas whose backend has no statistic are ignored.
     * <p>
     * The debug parameter {@code FE.HIGH_LOAD_BE_ID} (when &gt; 0) forces the replica on that backend to be
     * chosen, provided the backend has a statistic.
     *
     * @return true if a replica was deleted
     */
    private boolean deleteFromHighLoadBackend(TabletSchedCtx tabletCtx, List<Replica> replicas,
            boolean force, LoadStatisticForTag statistic) throws SchedException {
        long forcedBeId = DebugPointUtil.getDebugParamOrDefault("FE.HIGH_LOAD_BE_ID", -1L);
        Replica victim = null;
        double highestScore = 0;
        for (Replica candidate : replicas) {
            long candidateBeId = candidate.getBackendIdWithoutException();
            BackendLoadStatistic beStat = statistic.getBackendLoadStatistic(candidateBeId);
            if (beStat == null) {
                continue;
            }

            // When the backend lacks the tablet's storage medium, fall back to the mix load score;
            // otherwise e.g. an all-SSD cluster with HDD-property tables would score 0.0 everywhere
            // and no replica could ever be chosen.
            double score = beStat.hasMedium(tabletCtx.getStorageMedium())
                    ? beStat.getLoadScore(tabletCtx.getStorageMedium())
                    : beStat.getMixLoadScore();
            if (score > highestScore) {
                highestScore = score;
                victim = candidate;
            }

            if (forcedBeId > 0 && candidateBeId == forcedBeId) {
                victim = candidate;
                break;
            }
        }

        if (victim == null) {
            return false;
        }
        deleteReplicaInternal(tabletCtx, victim, "high load backend", force);
        return true;
    }

    /**
     * Deletes the first replica that is either outside the colocate backend set or bad.
     * <p>
     * This method never returns normally; the boolean return type is kept only for interface compatibility.
     * It always ends by throwing.
     *
     * @param tabletCtx scheduling context; its colocate backend set must be non-null
     * @return never returns normally
     * @throws SchedException FINISHED after one replica was deleted; UNRECOVERABLE if no replica qualifies;
     *                        or any exception raised while deleting
     */
    private boolean handleColocateRedundant(TabletSchedCtx tabletCtx) throws SchedException {
        Preconditions.checkNotNull(tabletCtx.getColocateBackendsSet());
        for (Replica replica : tabletCtx.getReplicas()) {
            if (tabletCtx.getColocateBackendsSet().contains(replica.getBackendIdWithoutException())
                    && !replica.isBad()) {
                continue;
            }

            // If the replica is not in ColocateBackendsSet or is bad, delete it.
            deleteReplicaInternal(tabletCtx, replica, "colocate redundant", false);
            throw new SchedException(Status.FINISHED, "colocate redundant replica is deleted");
        }
        throw new SchedException(Status.UNRECOVERABLE, "unable to delete any colocate redundant replicas");
    }

    /**
     * Marks the replica with the largest visible version count as COMPACTION_TOO_SLOW. No replica is deleted here.
     * <p>
     * The replica with the largest count (the first one on ties) is marked only if all of these hold:
     * it is alive and not already too slow; its version count is too big; and after excluding it, the
     * alive, not-too-slow replicas are still at least {@code replicas.size() / 2 + 1}.
     *
     * @param tabletCtx scheduling context of the tablet
     * @throws SchedException always FINISHED, whether or not a replica was marked
     */
    private void handleReplicaTooSlow(TabletSchedCtx tabletCtx) throws SchedException {
        Replica chosenReplica = null;
        long maxVersionCount = -1;
        int normalReplicaCount = 0;
        for (Replica replica : tabletCtx.getReplicas()) {
            if (replica.isAlive() && !replica.tooSlow()) {
                normalReplicaCount++;
            }
            if (replica.getVisibleVersionCount() > maxVersionCount) {
                maxVersionCount = replica.getVisibleVersionCount();
                chosenReplica = replica;
            }
        }
        // the chosen replica is itself "normal", hence the -1 when checking that a majority remains
        if (chosenReplica != null && chosenReplica.isAlive() && !chosenReplica.tooSlow()
                && chosenReplica.tooBigVersionCount()
                && normalReplicaCount - 1 >= tabletCtx.getReplicas().size() / 2 + 1) {
            chosenReplica.setState(ReplicaState.COMPACTION_TOO_SLOW);
            LOG.info("set replica id :{} tablet id: {}, backend id: {} to COMPACTION_TOO_SLOW",
                    chosenReplica.getId(), tabletCtx.getTablet().getId(), chosenReplica.getBackendIdWithoutException());
            throw new SchedException(Status.FINISHED, "set replica to COMPACTION_TOO_SLOW");
        }
        throw new SchedException(Status.FINISHED, "No replica set to COMPACTION_TOO_SLOW");
    }

    /**
     * Deletes {@code replica} from the tablet, possibly across several scheduling rounds.
     * <p>
     * Deletion is refused (UNRECOVERABLE, DIAGNOSE_IGNORE) unless some other replica either has a higher
     * version, or has the same version and a negative last failed version.
     * <p>
     * Unless {@code force}, {@code Config.enable_force_drop_redundant_replica} or a unit-test run applies, a
     * replica whose state can load (or that is already DECOMMISSION) first goes through a two-phase wait:
     * it is set to DECOMMISSION, pre/post watermark txn ids are recorded on it, and this method throws
     * SCHEDULE_FAILED (WAITING_DECOMMISSION) until all txns before each watermark are finished. The caller is
     * presumably expected to retry; this is not visible here.
     * <p>
     * Once it is safe, the replica is removed from the catalog. A drop task is sent to the backend only when
     * {@code force} is set or a unit test is running. Finally, a delete-replica edit log entry is written.
     *
     * @param tabletCtx scheduling context of the tablet
     * @param replica   replica to delete
     * @param reason    human-readable reason, used only in log messages
     * @param force     skip the decommission/watermark wait and send the drop task immediately
     * @throws SchedException UNRECOVERABLE if no other replica is caught up or a txn-manager call fails;
     *                        SCHEDULE_FAILED with WAITING_DECOMMISSION while earlier txns are still running
     */
    private void deleteReplicaInternal(TabletSchedCtx tabletCtx,
            Replica replica, String reason, boolean force) throws SchedException {

        // Never drop the only up-to-date replica: at least one other replica must be ahead of it,
        // or equal to it with no failed version.
        List<Replica> replicas = tabletCtx.getTablet().getReplicas();
        boolean otherCatchup = replicas.stream().anyMatch(
                r -> r != replica
                && (r.getVersion() > replica.getVersion()
                        || (r.getVersion() == replica.getVersion() && r.getLastFailedVersion() < 0)));
        if (!otherCatchup) {
            LOG.info("can not delete only one replica, tabletId = {} replicaId = {}", tabletCtx.getTabletId(),
                     replica.getId());
            throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
                    "the only one latest replia can not be dropped, tabletId = "
                    + tabletCtx.getTabletId() + ", replicaId = " + replica.getId());
        }

        /*
         * Before deleting a replica, we should make sure that
         * there is no running txn on it and no more txns will be on it.
         * So we do followings:
         * 1. If replica is loadable, set a watermark txn id on it and set it state as DECOMMISSION,
         *      but not deleting it this time.
         *      The DECOMMISSION state will ensure that no more txns will be on this replicas.
         * 2. Wait for any txns before the watermark txn id to be finished.
         *      If all are finished, which means this replica is
         *      safe to be deleted.
         */
        long beId = replica.getBackendIdWithoutException();
        if (!force && !Config.enable_force_drop_redundant_replica
                && !FeConstants.runningUnitTest
                && (replica.getState().canLoad() || replica.getState() == ReplicaState.DECOMMISSION)) {
            if (replica.getState() != ReplicaState.DECOMMISSION) {
                replica.setState(ReplicaState.DECOMMISSION);
                // set priority to normal because it may wait for a long time.
                // Remain it as VERY_HIGH may block other task.
                tabletCtx.setPriority(Priority.NORMAL);
                LOG.info("set replica {} on backend {} of tablet {} state to DECOMMISSION due to reason {}",
                        replica.getId(), beId, tabletCtx.getTabletId(), reason);
            }
            try {
                // Phase 1: pick the pre watermark once (-1 means "not set yet") and persist it on the replica
                // so later rounds reuse it.
                long preWatermarkTxnId = replica.getPreWatermarkTxnId();
                if (preWatermarkTxnId == -1) {
                    preWatermarkTxnId = Env.getCurrentGlobalTransactionMgr()
                            .getTransactionIDGenerator().getNextTransactionId();
                    replica.setPreWatermarkTxnId(preWatermarkTxnId);
                    LOG.info("set decommission replica {} on backend {} of tablet {} pre watermark txn id {}",
                            replica.getId(), beId, tabletCtx.getTabletId(), preWatermarkTxnId);
                }

                // Phase 2: the post watermark is only picked after every txn before the pre watermark finished.
                long postWatermarkTxnId = replica.getPostWatermarkTxnId();
                if (postWatermarkTxnId == -1) {
                    if (!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(preWatermarkTxnId,
                            tabletCtx.getDbId(), tabletCtx.getTblId(), tabletCtx.getPartitionId())) {
                        throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION,
                                "wait txn before pre watermark txn " + preWatermarkTxnId + " to be finished");
                    }
                    postWatermarkTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId();

                    replica.setPostWatermarkTxnId(postWatermarkTxnId);
                    LOG.info("set decommission replica {} on backend {} of tablet {} post watermark txn id {}",
                            replica.getId(), beId, tabletCtx.getTabletId(), postWatermarkTxnId);
                }

                // Only when all txns before the post watermark finished is the replica safe to drop.
                if (!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(postWatermarkTxnId,
                        tabletCtx.getDbId(), tabletCtx.getTblId(), tabletCtx.getPartitionId())) {
                    throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION,
                            "wait txn before post watermark txn  " + postWatermarkTxnId + " to be finished");
                }
            } catch (SchedException e) {
                // scheduling outcomes (e.g. WAITING_DECOMMISSION) pass through unchanged
                throw e;
            } catch (Exception e) {
                // any other failure from the txn manager is reported as unrecoverable (only the message is kept)
                throw new SchedException(Status.UNRECOVERABLE, e.getMessage());
            }
        }

        // delete this replica from catalog.
        // it will also delete replica from tablet inverted index.
        tabletCtx.deleteReplica(replica);

        if (force || FeConstants.runningUnitTest) {
            // send the delete replica task.
            // also, this may not be necessary, but delete it will make things simpler.
            // NOTICE: only delete the replica from meta may not work. sometimes we can depend on tablet report
            // deleting these replicas, but in FORCE_REDUNDANT case, replica may be added to meta again in report
            // process.
            sendDeleteReplicaTask(beId, tabletCtx.getTabletId(), replica.getId(),
                    tabletCtx.getSchemaHash());
        }

        // write edit log
        ReplicaPersistInfo info = ReplicaPersistInfo.createForDelete(tabletCtx.getDbId(),
                tabletCtx.getTblId(),
                tabletCtx.getPartitionId(),
                tabletCtx.getIndexId(),
                tabletCtx.getTabletId(),
                beId);

        Env.getCurrentEnv().getEditLog().logDeleteReplica(info);

        LOG.info("delete replica. tablet id: {}, backend id: {}. reason: {}, force: {}",
                tabletCtx.getTabletId(), beId, reason, force);
    }

    /**
     * Wraps a single {@code DropReplicaTask} for the given replica in its own batch and submits it
     * through {@code AgentTaskExecutor}.
     */
    private void sendDeleteReplicaTask(long backendId, long tabletId, long replicaId, int schemaHash) {
        DropReplicaTask dropTask = new DropReplicaTask(backendId, tabletId, replicaId, schemaHash, false);
        AgentBatchTask singleTaskBatch = new AgentBatchTask();
        singleTaskBatch.addTask(dropTask);
        AgentTaskExecutor.submit(singleTaskBatch);
        LOG.info("send delete replica task for tablet {} in backend {}", tabletId, backendId);
    }

    /**
     * Handles a tablet whose replicas are placed on backends with the wrong tag. It is counted under its
     * own statistic and otherwise treated exactly like a missing replica: handleReplicaMissing() picks a
     * backend with a proper tag and clones a new replica there.
     */
    private void handleReplicaMissingForTag(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
            throws SchedException {
        stat.counterReplicaMissingForTagErr.incrementAndGet();
        // same repair path as REPLICA_MISSING
        handleReplicaMissing(tabletCtx, batchTask);
    }

    /**
     * Repairs a colocate tablet whose replicas are not located exactly on the colocate backend set, e.g.
     * <pre>
     *      backends set:       1,2,3
     *      tablet replicas:    1,2,5
     *      tablet replicas:    1,2
     *      tablet replicas:    1,2,4,5
     * </pre>
     * A destination path is chosen in colocate mode, a source replica is chosen, and the resulting clone task
     * is appended to {@code batchTask}.
     *
     * @throws SchedException propagated from destination or source selection
     */
    private void handleColocateMismatch(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException {
        Preconditions.checkNotNull(tabletCtx.getColocateBackendsSet());
        stat.counterReplicaColocateMismatch.incrementAndGet();

        RootPathLoadStatistic dest = chooseAvailableDestPath(tabletCtx, null, true /* for colocate */);
        Preconditions.checkNotNull(dest);
        tabletCtx.setDest(dest.getBeId(), dest.getPathHash());

        // the source replica to clone from
        tabletCtx.chooseSrcReplica(backendsWorkingSlots, -1);

        batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
        incrDestPathCopingSize(tabletCtx);
    }

    /**
     * Picks candidate tablets for BE balance and disk balance and adds them to the pending tablets.
     * Skipped entirely when balance or the tablet scheduler is disabled.
     * <p>
     * With {@code Config.balance_be_then_disk}, BE balance runs first and its outcome is passed to disk
     * balance; otherwise disk balance runs first (told there is no BE balance) followed by BE balance.
     */
    private void selectTabletsForBalance() {
        if (Config.disable_balance || Config.disable_tablet_scheduler) {
            LOG.info("balance or tablet scheduler is disabled. skip selecting tablets for balance");
            return;
        }

        // TODO: too ugly, remove balance_be_then_disk later.
        if (!Config.balance_be_then_disk) {
            selectTabletsForDiskBalance(false);
            selectTabletsForBeBalance();
            return;
        }
        selectTabletsForDiskBalance(selectTabletsForBeBalance());
    }

    /**
     * Queues BE-level balance candidates produced by {@code rebalancer}, up to the remaining balance quota.
     * Candidates that {@code addTablet} does not accept are reported back via {@code rebalancer.onTabletFailed}.
     *
     * @return true if at least one tablet was queued
     */
    private boolean selectTabletsForBeBalance() {
        final int quota = getBalanceSchedQuotoLeft();
        if (quota <= 0) {
            return false;
        }

        List<TabletSchedCtx> candidates = rebalancer.selectAlternativeTablets();
        // randomize the order in which candidates are tried
        Collections.shuffle(candidates);
        int queued = 0;
        for (int i = 0; i < candidates.size() && queued < quota; i++) {
            TabletSchedCtx candidate = candidates.get(i);
            if (addTablet(candidate, false) != AddResult.ADDED) {
                rebalancer.onTabletFailed(candidate);
                continue;
            }
            queued++;
        }
        return queued > 0;
    }

    /**
     * Queues disk-level balance candidates produced by {@code diskRebalancer}, up to the remaining balance quota.
     *
     * @param hasBeBalance whether BE balance tablets were queued in this round. If true and
     *                     {@code Config.be_rebalancer_idle_seconds > 0}, only candidates with NORMAL priority
     *                     are queued (task from prio backend or cluster is balanced).
     */
    private void selectTabletsForDiskBalance(boolean hasBeBalance) {
        if (Config.disable_disk_balance) {
            LOG.info("disk balance is disabled. skip selecting tablets for disk balance");
            return;
        }

        final int quota = getBalanceSchedQuotoLeft();
        if (quota <= 0) {
            return;
        }

        int queued = 0;
        for (TabletSchedCtx candidate : diskRebalancer.selectAlternativeTablets()) {
            if (queued >= quota) {
                break;
            }
            boolean allowed = !hasBeBalance
                    || Config.be_rebalancer_idle_seconds <= 0
                    || candidate.getPriority() == TabletSchedCtx.Priority.NORMAL;
            if (allowed && addTablet(candidate, false) == AddResult.ADDED) {
                queued++;
            }
        }
    }

    /**
     * Returns how many more balance tablets may be queued now.
     * This is the minimum of the free room in one schedule batch and the room left under
     * {@code Config.max_balancing_tablets}. The result may be zero or negative.
     */
    private int getBalanceSchedQuotoLeft() {
        // Balance tasks are re-selected on every schedule round, so there is no need to prefetch
        // more of them into the pending queue than one batch can take.
        int batchRoom = Config.schedule_batch_size - getPendingNum();
        int balancingRoom = Config.max_balancing_tablets - getBalanceTabletsNumber();
        return Math.min(batchRoom, balancingRoom);
    }

    /**
     * Creates the balance task for a BALANCE tablet and appends it to {@code batchTask}.
     * DISK_BALANCE tasks are built by {@code diskRebalancer}, then the last disk-balance success time is checked
     * for both the source and destination paths. BE_BALANCE tasks are built by {@code rebalancer}.
     *
     * @throws SchedException UNRECOVERABLE if the balance type is neither DISK_BALANCE nor BE_BALANCE (including
     *         when it is not set); other SchedExceptions may come from task creation or the success-time checks
     */
    private void doBalance(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException {
        stat.counterBalanceSchedule.incrementAndGet();
        TabletSchedCtx.BalanceType balanceType = tabletCtx.getBalanceType();
        AgentTask task;
        if (balanceType == TabletSchedCtx.BalanceType.DISK_BALANCE) {
            task = diskRebalancer.createBalanceTask(tabletCtx);
            checkDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash());
            checkDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash());
        } else if (balanceType == TabletSchedCtx.BalanceType.BE_BALANCE) {
            task = rebalancer.createBalanceTask(tabletCtx);
        } else {
            // Concatenate instead of calling toString(): an unset (null) balance type must surface as
            // this SchedException, not as a NullPointerException.
            throw new SchedException(Status.UNRECOVERABLE, "unknown balance type: " + balanceType);
        }
        batchTask.addTask(task);
        incrDestPathCopingSize(tabletCtx);
    }

    /**
     * Chooses a path on a backend that fits the tablet. If {@code forColocate} is false, {@code tag} must be set.
     *
     * <p>As a side effect, the outcome is recorded on the tablet (when the ctx has one). Its "last time no path
     * for new replica" is set to the current time if the choice failed with an UNRECOVERABLE
     * {@link SchedException}. In every other case, including success, it is reset to -1.
     *
     * @throws SchedException propagated unchanged from {@code doChooseAvailableDestPath}
     */
    private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx, Tag tag, boolean forColocate)
            throws SchedException {
        long noPathTimeMs = -1L;
        try {
            return doChooseAvailableDestPath(tabletCtx, tag, forColocate);
        } catch (SchedException e) {
            if (e.getStatus() == Status.UNRECOVERABLE) {
                noPathTimeMs = System.currentTimeMillis();
            }
            throw e;
        } finally {
            Tablet tablet = tabletCtx.getTablet();
            if (tablet != null) {
                tablet.setLastTimeNoPathForNewReplica(noPathTimeMs);
            }
        }
    }

    /**
     * Picks a root path for a new replica of {@code tabletCtx} and takes a working slot on it.
     *
     * <p>Candidate backends depend on the mode:
     * <ul>
     *   <li>non-colocate ({@code tag != null}, {@code forColocate == false}): all backends of that tag;</li>
     *   <li>colocate ({@code tag == null}, {@code forColocate == true}): the backends in the colocate backends set.</li>
     * </ul>
     * A backend is skipped if it is unavailable, if {@code tabletCtx.filterDestBE} rejects it, or if it is
     * outside the colocate set / has a different tag. Paths found by {@code isFit(..., false)} are preferred.
     * Paths found by {@code isFit(..., true)} are used only when the first kind yields nothing.
     * Fitting paths are sorted with {@link BePathLoadStatPairComparator}. The first path on which
     * {@code PathSlot.takeSlot} succeeds is returned, trying paths with the tablet's storage medium first and
     * then any fitting path.
     *
     * @throws SchedException UNRECOVERABLE if the tag is unknown, if no path fits, or if none of the fitting
     *         paths' backends has a PathSlot; SCHEDULE_FAILED / WAITING_SLOT if fitting paths exist but all
     *         their slots are taken
     */
    private RootPathLoadStatistic doChooseAvailableDestPath(TabletSchedCtx tabletCtx, Tag tag, boolean forColocate)
            throws SchedException {
        List<BackendLoadStatistic> beStatistics;
        if (tag != null) {
            Preconditions.checkState(!forColocate);
            LoadStatisticForTag statistic = statisticMap.get(tag);
            if (statistic == null) {
                throw new SchedException(Status.UNRECOVERABLE,
                        String.format("tag %s does not exist. available tags: %s", tag,
                            Joiner.on(",").join(statisticMap.keySet().stream().limit(5).toArray())));
            }
            beStatistics = statistic.getSortedBeLoadStats(null /* sorted ignore medium */);
        } else {
            // for colocate task, get BackendLoadStatistic by colocateBackendIds
            Preconditions.checkState(forColocate);
            Preconditions.checkState(tabletCtx.getColocateBackendsSet() != null);
            Set<Long> colocateBackendIds = tabletCtx.getColocateBackendsSet();

            beStatistics = Lists.newArrayList();
            for (LoadStatisticForTag loadStatisticForTag : statisticMap.values()) {
                for (long beId : colocateBackendIds) {
                    BackendLoadStatistic backendLoadStatistic = loadStatisticForTag.getBackendLoadStatistic(beId);
                    if (backendLoadStatistic != null) {
                        beStatistics.add(backendLoadStatistic);
                    }
                }
            }
        }

        // get all available paths which this tablet can fit in.
        // For a tag, beStatistics is sorted by mix load score in ascend order; for colocate it is not,
        // but the collected fit paths are sorted by BePathLoadStatPairComparator below in both cases.
        List<BePathLoadStatPair> allFitPathsSameMedium = Lists.newArrayList();
        List<BePathLoadStatPair> allFitPathsDiffMedium = Lists.newArrayList();
        for (BackendLoadStatistic bes : beStatistics) {
            if (!bes.isAvailable()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("backend {} is not available, skip. tablet: {}", bes.getBeId(), tabletCtx.getTabletId());
                }
                continue;
            }

            // exclude BE which already has replica of this tablet or another BE at same host has this replica
            if (tabletCtx.filterDestBE(bes.getBeId())) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("backend {} already has replica of this tablet or another BE "
                                    + "at same host has this replica, skip. tablet: {}",
                            bes.getBeId(), tabletCtx.getTabletId());
                }
                continue;
            }

            // If this for colocate table, only choose backend in colocate backend set.
            // Else, check the tag.
            if (forColocate) {
                if (!tabletCtx.getColocateBackendsSet().contains(bes.getBeId())) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("backend {} is not in colocate backend set, skip. tablet: {}",
                                bes.getBeId(), tabletCtx.getTabletId());
                    }
                    continue;
                }
            } else if (!bes.getTag().equals(tag)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("backend {}'s tag {} is not equal to tablet's tag {}, skip. tablet: {}",
                            bes.getBeId(), bes.getTag(), tag, tabletCtx.getTabletId());
                }
                continue;
            }

            List<RootPathLoadStatistic> resultPaths = Lists.newArrayList();
            BalanceStatus st = bes.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(),
                    resultPaths, false);
            if (st.ok()) {
                resultPaths.forEach(path -> allFitPathsSameMedium.add(new BePathLoadStatPair(bes, path)));
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("backend {} unable to find path for tablet: {}. {}", bes.getBeId(), tabletCtx, st);
                }
                resultPaths.clear();
                st = bes.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), resultPaths, true);
                if (st.ok()) {
                    resultPaths.forEach(path -> allFitPathsDiffMedium.add(new BePathLoadStatPair(bes, path)));
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("backend {} unable to find path for supplementing tablet: {}. {}",
                                bes.getBeId(), tabletCtx, st);
                    }
                }
            }
        }

        // we try to find a path with specified media type, if not find, arbitrarily use one.
        List<BePathLoadStatPair> allFitPaths =
                !allFitPathsSameMedium.isEmpty() ? allFitPathsSameMedium : allFitPathsDiffMedium;
        if (allFitPaths.isEmpty()) {
            // For colocate (tag == null) describe the colocate backends: filtering by a null tag
            // would match no backend and the message would always show an empty list.
            Set<Long> colocateIds = tabletCtx.getColocateBackendsSet();
            List<String> backendsInfo = Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values().stream()
                    .filter(be -> tag != null
                            ? be.getLocationTag().equals(tag)
                            : colocateIds != null && colocateIds.contains(be.getId()))
                    .map(Backend::getDetailsForCreateReplica)
                    .collect(Collectors.toList());
            throw new SchedException(Status.UNRECOVERABLE, String.format("unable to find dest path for new replica"
                    + " for replica allocation { %s } with tag %s storage medium %s, backends on this tag is: %s",
                    tabletCtx.getReplicaAlloc(), tag, tabletCtx.getStorageMedium(), backendsInfo));
        }

        // sort by load score in ascend order, then just take the first available path.
        BePathLoadStatPairComparator comparator = new BePathLoadStatPairComparator(allFitPaths);
        Collections.sort(allFitPaths, comparator);

        // first pass: only paths whose storage medium equals the tablet's
        for (BePathLoadStatPair bePathLoadStat : allFitPaths) {
            RootPathLoadStatistic rootPathLoadStatistic = bePathLoadStat.getPathLoadStatistic();
            if (rootPathLoadStatistic.getStorageMedium() != tabletCtx.getStorageMedium()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("backend {}'s path {}'s storage medium {} "
                            + "is not equal to tablet's storage medium {}, skip. tablet: {}",
                            rootPathLoadStatistic.getBeId(), rootPathLoadStatistic.getPathHash(),
                            rootPathLoadStatistic.getStorageMedium(), tabletCtx.getStorageMedium(),
                            tabletCtx.getTabletId());
                }
                continue;
            }

            PathSlot slot = backendsWorkingSlots.get(rootPathLoadStatistic.getBeId());
            if (slot == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("backend {}'s path {}'s slot is null, skip. tablet: {}",
                            rootPathLoadStatistic.getBeId(), rootPathLoadStatistic.getPathHash(),
                            tabletCtx.getTabletId());
                }
                continue;
            }

            long pathHash = slot.takeSlot(rootPathLoadStatistic.getPathHash());
            if (pathHash == -1) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("backend {}'s path {}'s slot is full, skip. tablet: {}",
                            rootPathLoadStatistic.getBeId(), rootPathLoadStatistic.getPathHash(),
                            tabletCtx.getTabletId());
                }
                continue;
            }
            return rootPathLoadStatistic;
        }

        // set when at least one fitting path's backend has a PathSlot, i.e. we are only short of free slots
        boolean hasBePath = false;

        // no root path with specified media type is found, get arbitrary one.
        for (BePathLoadStatPair bePathLoadStat : allFitPaths) {
            RootPathLoadStatistic rootPathLoadStatistic = bePathLoadStat.getPathLoadStatistic();
            PathSlot slot = backendsWorkingSlots.get(rootPathLoadStatistic.getBeId());
            if (slot == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("backend {}'s path {}'s slot is null, skip. tablet: {}",
                            rootPathLoadStatistic.getBeId(), rootPathLoadStatistic.getPathHash(),
                            tabletCtx.getTabletId());
                }
                continue;
            }

            hasBePath = true;
            long pathHash = slot.takeSlot(rootPathLoadStatistic.getPathHash());
            if (pathHash == -1) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("backend {}'s path {}'s slot is full, skip. tablet: {}",
                            rootPathLoadStatistic.getBeId(), rootPathLoadStatistic.getPathHash(),
                            tabletCtx.getTabletId());
                }
                continue;
            }
            return rootPathLoadStatistic;
        }

        if (hasBePath) {
            throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
                    "scheduler waiting for dest backend slot");
        } else {
            throw new SchedException(Status.UNRECOVERABLE,
                    "unable to find dest path which can be fit in");
        }
    }

    /**
     * Re-queues a tablet ctx that must already be in PENDING state.
     * It calls {@code addTablet} with {@code force = true}.
     */
    private void addBackToPendingTablets(TabletSchedCtx tabletCtx) {
        TabletSchedCtx.State current = tabletCtx.getState();
        Preconditions.checkState(current == TabletSchedCtx.State.PENDING);
        addTablet(tabletCtx, true /* force */);
    }

    /**
     * Ends the life of a tablet ctx.
     * Failed BE-balance ctxs are reported to the rebalancer. The ctx is then removed from the scheduler and its
     * resources are released. A FINISHED ctx is immediately re-checked unless the tablet scheduler is disabled.
     *
     * @param state  final state of the ctx
     * @param status when UNRECOVERABLE, replica state is reset while releasing the ctx
     * @param reason logged when the ctx is removed
     */
    private void finalizeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, Status status, String reason) {
        boolean failed = state == TabletSchedCtx.State.CANCELLED || state == TabletSchedCtx.State.UNEXPECTED;
        if (failed && tabletCtx.getType() == TabletSchedCtx.Type.BALANCE
                && tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.BE_BALANCE) {
            rebalancer.onTabletFailed(tabletCtx);
        }

        // Two separate steps so the db lock is not nested inside synchronized (releaseTabletCtx() may hold
        // the db lock): first make the ctx invisible to other processes, then release what it holds.
        removeTabletCtx(tabletCtx, reason);
        boolean resetReplicaState = status == Status.UNRECOVERABLE;
        releaseTabletCtx(tabletCtx, state, resetReplicaState);

        // re-check right away instead of waiting for TabletChecker's 20s
        boolean finished = state == TabletSchedCtx.State.FINISHED;
        if (finished && !Config.disable_tablet_scheduler) {
            tryAddAfterFinished(tabletCtx);
        }
    }

    /**
     * Bumps the ctx's finished counter and, while it is below {@code TabletSchedCtx.FINISHED_COUNTER_THRESHOLD},
     * looks the tablet up again and hands it to {@code tryAddRepairTablet} under the table's read lock.
     * Silently returns if the db, table, partition, index or tablet no longer exists.
     */
    private void tryAddAfterFinished(TabletSchedCtx tabletCtx) {
        int finishedCounter = tabletCtx.getFinishedCounter() + 1;
        tabletCtx.setFinishedCounter(finishedCounter);
        if (finishedCounter >= TabletSchedCtx.FINISHED_COUNTER_THRESHOLD) {
            return;
        }

        Database db = Env.getCurrentInternalCatalog().getDbNullable(tabletCtx.getDbId());
        OlapTable tbl = db == null ? null : (OlapTable) db.getTableNullable(tabletCtx.getTblId());
        if (tbl == null) {
            return;
        }

        tbl.readLock();
        try {
            Partition partition = tbl.getPartition(tabletCtx.getPartitionId());
            MaterializedIndex idx = partition == null ? null : partition.getIndex(tabletCtx.getIndexId());
            Tablet tablet = idx == null ? null : idx.getTablet(tabletCtx.getTabletId());
            if (tablet != null) {
                tryAddRepairTablet(tablet, tabletCtx.getDbId(), tbl, partition, idx, finishedCounter);
            }
        } finally {
            tbl.readUnlock();
        }
    }

    /**
     * Evaluates the health of {@code tablet} and, if it needs repair, queues a REPAIR tablet ctx for it.
     *
     * <p>Colocate tablets are judged against the backends assigned to their bucket in the colocate group and
     * always get HIGH priority. Other tablets are judged against the partition's replica allocation and the
     * alive backends. HEALTHY and UNRECOVERABLE tablets are not queued.
     *
     * @param finishedCounter how many times a sched ctx for this tablet has already finished; 0 means the
     *                        tablet is found unhealthy for the first time, in which case it is queued only if
     *                        {@code tablet.readyToBeRepaired} allows it
     */
    public void tryAddRepairTablet(Tablet tablet, long dbId, OlapTable table, Partition partition,
            MaterializedIndex idx, int finishedCounter) {
        if (Config.disable_tablet_scheduler) {
            return;
        }

        TabletHealth health;
        ReplicaAllocation replicaAlloc;
        Set<Long> colocateBackendIds = null;
        if (colocateTableIndex.isColocateTable(table.getId())) {
            GroupId groupId = colocateTableIndex.getGroup(table.getId());
            ColocateGroupSchema groupSchema = groupId == null ? null : colocateTableIndex.getGroupSchema(groupId);
            if (groupSchema == null) {
                return;
            }

            replicaAlloc = groupSchema.getReplicaAlloc();
            int orderIdx = idx.getTabletOrderIdx(tablet.getId());
            if (orderIdx == -1) {
                LOG.warn("Unknow colocate tablet order idx: group {}, table {}, partition {}, index {}, tablet {}",
                        groupId, table.getId(), partition.getId(), idx.getId(), tablet.getId());
                return;
            }

            colocateBackendIds = colocateTableIndex.getTabletBackendsByGroup(groupId, orderIdx);
            health = tablet.getColocateHealth(partition.getVisibleVersion(), replicaAlloc, colocateBackendIds);
            health.priority = Priority.HIGH;
        } else {
            replicaAlloc = table.getPartitionInfo().getReplicaAllocation(partition.getId());
            List<Long> aliveBackendIds = infoService.getAllBackendIds(true);
            health = tablet.getHealth(infoService, partition.getVisibleVersion(), replicaAlloc, aliveBackendIds);
        }

        boolean nothingToDo = health.status == TabletStatus.HEALTHY || health.status == TabletStatus.UNRECOVERABLE;
        if (nothingToDo) {
            return;
        }

        if (finishedCounter == 0) {
            // first time found this tablet is unhealthy
            if (!tablet.readyToBeRepaired(Env.getCurrentSystemInfo(), health.priority)) {
                return;
            }
        } else if (health.status == TabletStatus.NEED_FURTHER_REPAIR) {
            // replica is just waiting for finishing txns before furtherRepairWatermarkTxnTd,
            // no need to re add it immediately, can wait a little
            Replica replica = tablet.getReplicaById(health.needFurtherRepairReplicaId);
            boolean onlyWaitingForTxns = replica != null
                    && replica.getVersion() >= partition.getVisibleVersion()
                    && replica.getLastFailedVersion() < 0;
            if (onlyWaitingForTxns) {
                return;
            }
        }

        TabletSchedCtx repairCtx = new TabletSchedCtx(TabletSchedCtx.Type.REPAIR, dbId, table.getId(),
                partition.getId(), idx.getId(), tablet.getId(), replicaAlloc, System.currentTimeMillis());
        repairCtx.setTabletHealth(health);
        repairCtx.setFinishedCounter(finishedCounter);
        repairCtx.setColocateGroupBackendIds(colocateBackendIds);
        repairCtx.setIsUniqKeyMergeOnWrite(table.isUniqKeyMergeOnWrite());

        addTablet(repairCtx, false);
    }

    /**
     * Moves the ctx to its final {@code state} and releases the resources it holds on this scheduler.
     * It then resets the replica state if requested and stamps the finish time.
     */
    private void releaseTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, boolean resetReplicaState) {
        tabletCtx.setState(state);
        tabletCtx.releaseResource(this);
        if (resetReplicaState) {
            tabletCtx.resetReplicaState();
        }
        long finishedAtMs = System.currentTimeMillis();
        tabletCtx.setFinishedTime(finishedAtMs);
    }

    /**
     * Drops the ctx from the running set and the tablet-type index, and records it in the schedule history.
     * The removal is logged with {@code reason}. Runs under this scheduler's monitor.
     */
    private synchronized void removeTabletCtx(TabletSchedCtx tabletCtx, String reason) {
        long tabletId = tabletCtx.getTabletId();
        runningTablets.remove(tabletId);
        allTabletTypes.remove(tabletId);
        schedHistory.add(tabletCtx);
        LOG.info("remove the tablet {}. because: {}", tabletCtx, reason);
    }

    /**
     * Polls the next batch of tablet ctxs from the pending queue.
     * The batch is bounded by {@code Config.schedule_batch_size} and by the currently available slots. Every
     * polled ctx whose status is not REDUNDANT, FORCE_REDUNDANT, COLOCATE_REDUNDANT or REPLICA_COMPACTION_TOO_SLOW
     * is charged 2 slots, because a clone takes a src slot and a dst slot.
     */
    private synchronized List<TabletSchedCtx> getNextTabletCtxBatch() {
        List<TabletSchedCtx> batch = Lists.newArrayList();
        // at least 1, so a non-empty pending queue yields at least one ctx
        int remainingSlots = Math.max(1, getCurrentAvailableSlotNum());
        while (remainingSlots > 0 && batch.size() < Config.schedule_batch_size) {
            TabletSchedCtx next = pendingTablets.pollFirst();
            if (next == null) {
                // no more tablets
                break;
            }
            batch.add(next);

            TabletStatus status = next.getTabletStatus();
            boolean takesCloneSlots = status != TabletStatus.REDUNDANT
                    && status != TabletStatus.FORCE_REDUNDANT
                    && status != TabletStatus.COLOCATE_REDUNDANT
                    && status != TabletStatus.REPLICA_COMPACTION_TOO_SLOW;
            if (takesCloneSlots) {
                remainingSlots -= 2;
            }
        }
        return batch;
    }

    /**
     * Sums the available slot numbers over every backend's {@link PathSlot}.
     */
    private int getCurrentAvailableSlotNum() {
        int availableSlots = 0;
        for (PathSlot backendSlots : backendsWorkingSlots.values()) {
            availableSlots += backendSlots.getTotalAvailSlotNum();
        }
        return availableSlots;
    }

    /**
     * Handles the report of a finished storage-medium migration task, which serves disk balance.
     *
     * <p>On success, the last disk-balance success time is updated for both source and destination paths, the
     * destination path hash is updated, and the ctx is finalized as FINISHED. On failure, the ctx is finalized as
     * CANCELLED / UNRECOVERABLE with the first reported error message, or a generic message if none was reported.
     *
     * @return always true
     */
    public boolean finishStorageMediaMigrationTask(StorageMediaMigrationTask migrationTask,
                        TFinishTaskRequest request) {
        long tabletId = migrationTask.getTabletId();
        TabletSchedCtx tabletCtx = takeRunningTablets(tabletId);
        if (tabletCtx == null) {
            // tablet does not exist, the task may be created by ReportHandler.tabletReport(ssd => hdd)
            LOG.warn("tablet info does not exist: {}", tabletId);
            return true;
        }
        if (tabletCtx.getBalanceType() != TabletSchedCtx.BalanceType.DISK_BALANCE) {
            // this should not happen
            LOG.warn("task type is not as excepted. tablet {}", tabletId);
            return true;
        }
        if (request.getTaskStatus().getStatusCode() == TStatusCode.OK) {
            // if we have a success task, then stat must be refreshed before schedule a new task
            updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash());
            updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash());
            updateDestPathHash(tabletCtx);
            finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "finished");
        } else {
            // The error message list may be unset or empty. Indexing it blindly would throw here and
            // skip finalizing the ctx, so fall back to a generic message.
            List<String> errMsgs = request.getTaskStatus().getErrorMsgs();
            String errMsg = (errMsgs == null || errMsgs.isEmpty())
                    ? "storage medium migration task failed with status " + request.getTaskStatus().getStatusCode()
                    : errMsgs.get(0);
            tabletCtx.setErrMsg(errMsg);
            finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE, errMsg);
        }

        return true;
    }

    /**
     * Handles the report of a finished clone task.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>no running ctx for the tablet, or the ctx is a DISK_BALANCE one: logged, returns true;</li>
     *   <li>RUNNING_FAILED below the failed-running limit: the ctx is put back to PENDING, its resources are
     *       released and it is re-queued; returns false;</li>
     *   <li>RUNNING_FAILED over the limit, UNRECOVERABLE, FINISHED, any other SchedException status, or any
     *       other exception: the ctx is finalized; returns true;</li>
     *   <li>success: statistics are gathered and the ctx is finalized as FINISHED; returns true.</li>
     * </ul>
     *
     * @return true if we want to remove the clone task from AgentTaskQueue
     */
    public boolean finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) {
        long tabletId = cloneTask.getTabletId();
        TabletSchedCtx tabletCtx = takeRunningTablets(tabletId);
        if (tabletCtx == null) {
            LOG.warn("tablet info does not exist: {}", tabletId);
            // tablet does not exist, no need to keep task.
            return true;
        }
        if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.DISK_BALANCE) {
            // this should not happen
            LOG.warn("task type is not as excepted. tablet {}", tabletId);
            return true;
        }

        Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING, tabletCtx.getState());
        try {
            tabletCtx.finishCloneTask(cloneTask, request);
        } catch (SchedException e) {
            tabletCtx.setErrMsg(e.getMessage());
            if (e.getStatus() == Status.RUNNING_FAILED) {
                tabletCtx.increaseFailedRunningCounter();
                if (!tabletCtx.isExceedFailedRunningLimit()) {
                    stat.counterCloneTaskFailed.incrementAndGet();
                    tabletCtx.setState(TabletSchedCtx.State.PENDING);
                    tabletCtx.releaseResource(this);
                    tabletCtx.resetFailedSchedCounter();
                    addBackToPendingTablets(tabletCtx);
                    return false;
                } else {
                    // unrecoverable
                    stat.counterTabletScheduledDiscard.incrementAndGet();
                    finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
                            e.getMessage());
                    return true;
                }
            } else if (e.getStatus() == Status.UNRECOVERABLE) {
                // unrecoverable
                stat.counterTabletScheduledDiscard.incrementAndGet();
                finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(), e.getMessage());
                return true;
            } else if (e.getStatus() == Status.FINISHED) {
                // tablet is already healthy, just remove
                finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(), e.getMessage());
                return true;
            } else {
                // Any other status (e.g. SCHEDULE_FAILED) is not expected here. Without this branch it would fall
                // through to the FINISHED check below, throw IllegalStateException and leave the ctx unfinalized.
                LOG.warn("got unexpected sched status {} when finish clone task. tablet: {}",
                        e.getStatus(), tabletCtx.getTabletId());
                stat.counterTabletScheduledDiscard.incrementAndGet();
                finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.UNEXPECTED, Status.UNRECOVERABLE, e.getMessage());
                return true;
            }
        } catch (Exception e) {
            LOG.warn("got unexpected exception when finish clone task. tablet: {}",
                    tabletCtx.getTabletId(), e);
            stat.counterTabletScheduledDiscard.incrementAndGet();
            finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.UNEXPECTED, Status.UNRECOVERABLE, e.getMessage());
            return true;
        }

        Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.FINISHED);
        stat.counterCloneTaskSucceeded.incrementAndGet();
        gatherStatistics(tabletCtx);
        finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "finished");
        return true;
    }

    /**
     * Records the copy size and copy time of a finished clone task into the {@link PathSlot} statistics.
     * Both the source and the destination path are updated, when they are known. This should only be called
     * when the tablet is done with state FINISHED. It also advances {@code lastSlotAdjustTime} at most once per
     * {@code STAT_UPDATE_INTERVAL_MS}.
     */
    private void gatherStatistics(TabletSchedCtx tabletCtx) {
        boolean hasCopyStat = tabletCtx.getCopySize() > 0 && tabletCtx.getCopyTimeMs() > 0;
        if (hasCopyStat) {
            boolean srcKnown = tabletCtx.getSrcBackendId() != -1 && tabletCtx.getSrcPathHash() != -1;
            if (srcKnown) {
                PathSlot srcSlot = backendsWorkingSlots.get(tabletCtx.getSrcBackendId());
                if (srcSlot != null) {
                    srcSlot.updateStatistic(tabletCtx.getSrcPathHash(), tabletCtx.getCopySize(),
                            tabletCtx.getCopyTimeMs());
                }
            }

            boolean destKnown = tabletCtx.getDestBackendId() != -1 && tabletCtx.getDestPathHash() != -1;
            if (destKnown) {
                PathSlot destSlot = backendsWorkingSlots.get(tabletCtx.getDestBackendId());
                if (destSlot != null) {
                    destSlot.updateStatistic(tabletCtx.getDestPathHash(), tabletCtx.getCopySize(),
                            tabletCtx.getCopyTimeMs());
                }
            }
        }

        long nowMs = System.currentTimeMillis();
        if (nowMs - lastSlotAdjustTime >= STAT_UPDATE_INTERVAL_MS) {
            // TODO(cmy): update the slot num base on statistic.
            // need to find a better way to determine the slot number.
            lastSlotAdjustTime = nowMs;
        }
    }

    /**
     * Cancels running tablet ctxs that can no longer make progress.
     *
     * <p>A running ctx is cancelled when the tablet scheduler is disabled, when balance is disabled (BALANCE ctxs
     * only), when it has timed out, or when its dest / src backend is not alive. Healthy-tablet detection is left
     * to {@code finishCloneTask()}. We guarantee that if a tablet is in runningTablets, {@code finishCloneTask()}
     * will finally be called, so running tablets always end. Cancellation happens in two phases: the ctxs are
     * collected under this scheduler's monitor, then finalized outside it, to avoid nesting {@code synchronized}
     * and the database lock.
     */
    public void handleRunningTablets() {
        Set<Long> aliveBeIds = Sets.newHashSet(Env.getCurrentSystemInfo().getAllBackendIds(true));
        List<TabletSchedCtx> toCancel = Lists.newArrayList();

        // 1. collect the ctxs to cancel
        synchronized (this) {
            for (TabletSchedCtx ctx : runningTablets.values()) {
                long srcBeId = ctx.getSrcBackendId();
                long destBeId = ctx.getDestBackendId();
                String reason;
                boolean timedOut = false;
                if (Config.disable_tablet_scheduler) {
                    reason = "tablet scheduler is disabled";
                } else if (Config.disable_balance && ctx.getType() == Type.BALANCE) {
                    reason = "balance is disabled";
                } else if (ctx.isTimeout()) {
                    reason = "timeout";
                    timedOut = true;
                } else if (destBeId > 0 && !aliveBeIds.contains(destBeId)) {
                    reason = "dest be " + destBeId + " is dead";
                } else if (srcBeId > 0 && !aliveBeIds.contains(srcBeId)) {
                    reason = "src be " + srcBeId + " is dead";
                } else {
                    continue;
                }
                ctx.setErrMsg(reason);
                toCancel.add(ctx);
                if (timedOut) {
                    stat.counterCloneTaskTimeout.incrementAndGet();
                }
            }
        }

        // 2. finalize them outside the monitor. Passing UNRECOVERABLE makes finalizeTabletCtx reset the
        // replica state as well.
        for (TabletSchedCtx ctx : toCancel) {
            finalizeTabletCtx(ctx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE, ctx.getErrMsg());
        }
    }

    /**
     * Returns the live pending queue itself, not a copy. Only use for FE unit tests.
     */
    public MinMaxPriorityQueue<TabletSchedCtx> getPendingTabletQueue() {
        return this.pendingTablets;
    }

    /**
     * Returns the brief info rows of at most {@code limit} pending tablet ctxs.
     */
    public List<List<String>> getPendingTabletsInfo(int limit) {
        List<TabletSchedCtx> pending = getPendingTablets(limit);
        return collectTabletCtx(pending);
    }

    /**
     * Returns a new list holding at most {@code limit} pending tablet ctxs.
     */
    public List<TabletSchedCtx> getPendingTablets(int limit) {
        return this.getCopiedTablets(pendingTablets, limit);
    }

    /**
     * Returns the brief info rows of at most {@code limit} running tablet ctxs.
     */
    public List<List<String>> getRunningTabletsInfo(int limit) {
        List<TabletSchedCtx> running = getRunningTablets(limit);
        return collectTabletCtx(running);
    }

    /**
     * Returns a new list holding at most {@code limit} running tablet ctxs.
     */
    public List<TabletSchedCtx> getRunningTablets(int limit) {
        return this.getCopiedTablets(runningTablets.values(), limit);
    }

    /**
     * Returns the brief info rows of at most {@code limit} tablet ctxs from the schedule history.
     */
    public List<List<String>> getHistoryTabletsInfo(int limit) {
        List<TabletSchedCtx> history = getHistoryTablets(limit);
        return collectTabletCtx(history);
    }

    /**
     * Returns a new list holding at most {@code limit} tablet ctxs from the schedule history.
     */
    public List<TabletSchedCtx> getHistoryTablets(int limit) {
        return this.getCopiedTablets(schedHistory, limit);
    }

    /**
     * Maps each tablet ctx to its {@code getBrief()} row, preserving order, into a new mutable list.
     */
    private List<List<String>> collectTabletCtx(List<TabletSchedCtx> tabletCtxs) {
        List<List<String>> briefs = Lists.newArrayListWithCapacity(tabletCtxs.size());
        for (TabletSchedCtx ctx : tabletCtxs) {
            briefs.add(ctx.getBrief());
        }
        return briefs;
    }

    /**
     * Copies at most {@code limit} elements of {@code source}, in iteration order, into a new mutable list.
     * The copy is made while holding this scheduler's monitor. The ctx objects themselves are shared, not cloned.
     */
    private synchronized List<TabletSchedCtx> getCopiedTablets(Collection<TabletSchedCtx> source, int limit) {
        return source.stream()
                .limit(limit)
                .collect(Collectors.toCollection(Lists::newArrayList));
    }

    /** Returns the number of tablet ctxs in the pending queue. */
    public synchronized int getPendingNum() {
        return this.pendingTablets.size();
    }

    /** Returns the number of running tablet ctxs. */
    public synchronized int getRunningNum() {
        return this.runningTablets.size();
    }

    /** Returns the number of tablet ctxs kept in the schedule history. */
    public synchronized int getHistoryNum() {
        return this.schedHistory.size();
    }

    /** Returns the number of entries in {@code allTabletTypes}. */
    public synchronized int getTotalNum() {
        return this.allTabletTypes.size();
    }

    /**
     * Counts BALANCE tablet ctxs in both the pending queue and the running set.
     */
    public synchronized int getBalanceTabletsNumber() {
        long pendingBalance = pendingTablets.stream()
                .filter(t -> t.getType() == Type.BALANCE)
                .count();
        long runningBalance = runningTablets.values().stream()
                .filter(t -> t.getType() == Type.BALANCE)
                .count();
        return (int) (pendingBalance + runningBalance);
    }

    /**
     * Sums, per destination path hash, the estimated coping size of the running tablet ctxs.
     * Ctxs whose dest path hash is 0 or -1 are skipped, as are ctxs with a non-positive estimate.
     */
    private synchronized Map<Long, Long> getPathsCopingSize() {
        Map<Long, Long> sizeByPath = Maps.newHashMap();
        for (TabletSchedCtx running : runningTablets.values()) {
            long destPathHash = running.getDestPathHash();
            if (destPathHash == 0 || destPathHash == -1) {
                continue;
            }
            long estimated = running.getDestEstimatedCopingSize();
            if (estimated > 0) {
                sizeByPath.merge(destPathHash, estimated, Long::sum);
            }
        }
        return sizeByPath;
    }

    /**
     * Adds the tablet's estimated coping size to its destination path. The statistic updated is the
     * destination backend's entry in the first per-tag load statistic that contains that backend.
     * Does nothing when the destination path hash is -1 or 0, or when no statistic holds the backend.
     *
     * @param tablet the scheduling context whose destination is being charged
     */
    private void incrDestPathCopingSize(TabletSchedCtx tablet) {
        long destPathHash = tablet.getDestPathHash();
        if (destPathHash == 0 || destPathHash == -1) {
            return;
        }

        for (LoadStatisticForTag tagStatistic : statisticMap.values()) {
            for (BackendLoadStatistic beStatistic : tagStatistic.getBackendLoadStatistics()) {
                if (beStatistic.getBeId() == tablet.getDestBackendId()) {
                    beStatistic.incrPathCopingSize(destPathHash, tablet.getDestEstimatedCopingSize());
                    // only the first matching backend statistic is charged
                    return;
                }
            }
        }
    }

    /**
     * PathSlot keeps track of the scheduling slots of every disk path on one Backend.
     * Each path has a {@link Slot} that bounds how many regular and balance tasks may use it at once.
     * When a path's available slot number drops to 0, no further task should be assigned to it.
     * Slot bookkeeping is guarded by this PathSlot's monitor.
     */
    public static class PathSlot {
        // path hash -> slot of that path
        private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
        private long beId;
        // only used in takeAnAvailBalanceSlotFrom: (tag, medium) -> last picked path hash, to pick round-robin
        private Table<Tag, TStorageMedium, Long> lastPickPathHashs = HashBasedTable.create();

        /**
         * Creates one slot per given path.
         *
         * @param paths path hash -> storage medium of that path
         * @param beId id of the backend owning these paths (used in error messages)
         */
        public PathSlot(Map<Long, TStorageMedium> paths, long beId) {
            this.beId = beId;
            for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) {
                pathSlots.put(entry.getKey(), new Slot(entry.getValue()));
            }
        }

        /**
         * Aligns the tracked paths with {@code paths}. Slots of paths that disappeared are dropped, and
         * new paths get a fresh slot. Slots of paths that are still present are kept as-is, including
         * their usage counters and their original storage medium.
         *
         * @param paths current path hash -> storage medium of the backend
         */
        public synchronized void updatePaths(Map<Long, TStorageMedium> paths) {
            // delete non exist path
            pathSlots.entrySet().removeIf(entry -> !paths.containsKey(entry.getKey()));

            // add new path
            for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) {
                if (!pathSlots.containsKey(entry.getKey())) {
                    pathSlots.put(entry.getKey(), new Slot(entry.getValue()));
                }
            }
        }

        /**
         * Accumulates copy statistics of the specified path; unknown paths are ignored.
         *
         * @param pathHash path to update
         * @param copySize bytes copied
         * @param copyTimeMs time the copy took, in milliseconds
         */
        public synchronized void updateStatistic(long pathHash, long copySize, long copyTimeMs) {
            Slot slot = pathSlots.get(pathHash);
            if (slot == null) {
                return;
            }
            slot.totalCopySize += copySize;
            slot.totalCopyTimeMs += copyTimeMs;
        }

        /**
         * Returns whether the path exists and still has at least one free regular slot.
         * A path hash of -1 always yields {@code false}.
         */
        public synchronized boolean hasAvailableSlot(long pathHash) {
            if (pathHash == -1) {
                return false;
            }
            Slot slot = pathSlots.get(pathHash);
            return slot != null && slot.getAvailable() > 0;
        }

        /**
         * Returns whether the path exists and still has at least one free balance slot. The balance
         * availability is also capped by the free regular slots (see {@link Slot#getAvailableBalance()}).
         * A path hash of -1 always yields {@code false}.
         */
        public synchronized boolean hasAvailableBalanceSlot(long pathHash) {
            if (pathHash == -1) {
                return false;
            }
            Slot slot = pathSlots.get(pathHash);
            return slot != null && slot.getAvailableBalance() > 0;
        }

        /**
         * Takes one regular slot of the specified path if it has one available.
         *
         * @param pathHash path to take a slot from
         * @return {@code pathHash} on success, or -1 if the path is unknown or has no free slot
         * @throws SchedException if {@code pathHash} is -1 (path hash not set)
         */
        public synchronized long takeSlot(long pathHash) throws SchedException {
            if (pathHash == -1) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("path hash is not set.", new Exception());
                }
                throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
                        "backend " + beId + " path hash is not set");
            }

            Slot slot = pathSlots.get(pathHash);
            if (slot == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("path {} is not exist", pathHash);
                }
                return -1;
            }
            if (slot.used >= slot.getTotal()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("path {} has no available slot", pathHash);
                }
                return -1;
            }
            slot.used++;
            return pathHash;
        }

        /**
         * Returns one regular slot to the specified path. Unknown paths are ignored, and the usage
         * counter never drops below 0.
         */
        public synchronized void freeSlot(long pathHash) {
            Slot slot = pathSlots.get(pathHash);
            if (slot == null) {
                return;
            }
            if (slot.used > 0) {
                slot.used--;
            }
        }

        /** Returns the sum of free regular slots over all paths of this backend. */
        public synchronized int getTotalAvailSlotNum() {
            int total = 0;
            for (Slot slot : pathSlots.values()) {
                total += slot.getAvailable();
            }
            return total;
        }

        /** Returns the sum of free balance slots over all paths of this backend. */
        public synchronized int getTotalAvailBalanceSlotNum() {
            int num = 0;
            for (Slot slot : pathSlots.values()) {
                num += slot.getAvailableBalance();
            }
            return num;
        }

        /**
         * Describes every path's slot state. Each row is: backend id, path hash, available slots,
         * total slots, available balance slots, average copy rate (bytes/s).
         *
         * @param beId backend id written into every row (the argument is used, not this object's field)
         * @return one row per path
         */
        public synchronized List<List<String>> getSlotInfo(long beId) {
            List<List<String>> results = Lists.newArrayList();
            pathSlots.forEach((key, value) -> {
                List<String> result = Lists.newArrayList();
                result.add(String.valueOf(beId));
                result.add(String.valueOf(key));
                result.add(String.valueOf(value.getAvailable()));
                result.add(String.valueOf(value.getTotal()));
                result.add(String.valueOf(value.getAvailableBalance()));
                result.add(String.valueOf(value.getAvgRate()));
                results.add(result);
            });
            return results;
        }

        /** Returns the free balance slots of the path, or 0 if the path is unknown. */
        public synchronized int getAvailableBalanceNum(long pathHash) {
            Slot slot = pathSlots.get(pathHash);
            return slot != null ? slot.getAvailableBalance() : 0;
        }

        /**
         * Takes one balance slot of the specified path if its balance quota is not exhausted.
         * NOTE(review): unlike {@link Slot#getAvailableBalance()}, this only checks the balance quota,
         * not the free regular slots — confirm this is intended.
         *
         * @return {@code pathHash} on success, or -1 if the path is unknown or its balance quota is used up
         */
        public synchronized long takeBalanceSlot(long pathHash) {
            Slot slot = pathSlots.get(pathHash);
            if (slot == null) {
                return -1;
            }
            if (slot.balanceUsed < slot.getBalanceTotal()) {
                slot.balanceUsed++;
                return pathHash;
            }
            return -1;
        }

        /**
         * Takes a balance slot from one of the candidate paths, rotating round-robin per (tag, medium).
         * Candidates are tried in ascending path-hash order. The scan starts right after the path picked
         * last time for the same tag and medium, and wraps around to the beginning.
         *
         * @param pathHashs candidate path hashes; the caller's list is not modified
         * @param tag backend tag, {@code null} means {@link Tag#DEFAULT_BACKEND_TAG}
         * @param medium storage medium the candidates belong to
         * @return the path hash whose balance slot was taken, or -1 if none was available
         */
        public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs, Tag tag, TStorageMedium medium) {
            if (pathHashs.isEmpty()) {
                return -1;
            }

            if (tag == null) {
                tag = Tag.DEFAULT_BACKEND_TAG;
            }

            // Sort a private copy. Sorting the caller's list in place was a hidden side effect, and it
            // threw UnsupportedOperationException for unmodifiable lists such as List.of(...).
            List<Long> sortedPathHashs = Lists.newArrayList(pathHashs);
            Collections.sort(sortedPathHashs);
            int pathNum = sortedPathHashs.size();
            synchronized (this) {
                Long lastPathHash = lastPickPathHashs.get(tag, medium);
                if (lastPathHash == null) {
                    lastPathHash = -1L;
                }
                // indexOf yields -1 when the last pick is absent, so the scan then starts at index 0
                int preferSlotIndex = sortedPathHashs.indexOf(lastPathHash) + 1;
                if (preferSlotIndex >= pathNum) {
                    preferSlotIndex = 0;
                }

                // visit every candidate exactly once: preferSlotIndex .. end, then 0 .. preferSlotIndex - 1
                for (int offset = 0; offset < pathNum; offset++) {
                    long pathHash = sortedPathHashs.get((preferSlotIndex + offset) % pathNum);
                    if (takeBalanceSlot(pathHash) != -1) {
                        lastPickPathHashs.put(tag, medium, pathHash);
                        return pathHash;
                    }
                }
            }
            return -1;
        }

        /**
         * Returns one balance slot to the specified path. Unknown paths are ignored, and the counter
         * never drops below 0.
         */
        public synchronized void freeBalanceSlot(long pathHash) {
            Slot slot = pathSlots.get(pathHash);
            if (slot == null) {
                return;
            }
            if (slot.balanceUsed > 0) {
                slot.balanceUsed--;
            }
        }

        /** Records the current wall-clock time (ms) as the path's last successful disk balance. */
        public synchronized void updateDiskBalanceLastSuccTime(long pathHash) {
            Slot slot = pathSlots.get(pathHash);
            if (slot == null) {
                return;
            }
            slot.diskBalanceLastSuccTime = System.currentTimeMillis();
        }

        /** Returns the path's last successful disk-balance time in ms, or 0 if unknown or never set. */
        public synchronized long getDiskBalanceLastSuccTime(long pathHash) {
            Slot slot = pathSlots.get(pathHash);
            if (slot == null) {
                return 0L;
            }
            return slot.diskBalanceLastSuccTime;
        }
    }

    /**
     * Collects the slot state of every path on every backend in {@code backendsWorkingSlots}.
     * Each row comes from {@link PathSlot#getSlotInfo(long)}: backend id, path hash, available slots,
     * total slots, available balance slots, average copy rate.
     *
     * @return one row per (backend, path)
     */
    public List<List<String>> getSlotsInfo() {
        List<List<String>> result = Lists.newArrayList();
        // Visit each (backend id, PathSlot) pair once. The previous keySet() + get() pattern returned
        // null, and then threw NullPointerException, if a backend was removed between the two calls.
        backendsWorkingSlots.forEach((beId, pathSlot) -> result.addAll(pathSlot.getSlotInfo(beId)));
        return result;
    }

    /**
     * Slot counters of a single disk path. It tracks how many regular and balance tasks currently
     * occupy the path, plus accumulated copy statistics used to derive an average copy rate.
     * Capacities are read from {@code Config} on every call, so config changes apply immediately.
     */
    public static class Slot {
        public int used = 0;
        public int balanceUsed = 0;

        public long totalCopySize = 0;
        public long totalCopyTimeMs = 0;

        // for disk balance: time (ms) of the last successful disk balance on this path, 0 if never
        public long diskBalanceLastSuccTime = 0;

        private TStorageMedium storageMedium;

        public Slot(TStorageMedium storageMedium) {
            this.storageMedium = storageMedium;
        }

        /** Free regular slots, never negative. */
        public int getAvailable() {
            int remaining = getTotal() - used;
            return remaining > 0 ? remaining : 0;
        }

        /** Regular slot capacity; SSD paths and all other media use separate config values. */
        public int getTotal() {
            return storageMedium == TStorageMedium.SSD
                    ? Config.schedule_slot_num_per_ssd_path
                    : Config.schedule_slot_num_per_hdd_path;
        }

        /** Free balance slots, additionally capped by the free regular slots. */
        public int getAvailableBalance() {
            int balanceRemaining = getBalanceTotal() - balanceUsed;
            if (balanceRemaining < 0) {
                balanceRemaining = 0;
            }
            int regularRemaining = getAvailable();
            return balanceRemaining < regularRemaining ? balanceRemaining : regularRemaining;
        }

        /** Balance slot capacity; at least 1 regardless of configuration. */
        public int getBalanceTotal() {
            int configured = Config.balance_slot_num_per_path;
            return configured < 1 ? 1 : configured;
        }

        /** Average copy rate in bytes/second, or 0.0 while less than one whole second has been recorded. */
        public double getAvgRate() {
            long wholeSeconds = totalCopyTimeMs / 1000;
            if (wholeSeconds == 0) {
                return 0.0;
            }
            double elapsedSeconds = (double) totalCopyTimeMs / 1000;
            return totalCopySize / elapsedSeconds;
        }
    }
}