Tablet.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.clone.TabletSchedCtx;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.cloud.catalog.CloudReplica;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;

import com.google.common.base.Joiner;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

/**
 * This class represents the olap tablet related metadata.
 */
public class Tablet extends MetaObject {
    private static final Logger LOG = LogManager.getLogger(Tablet.class);
    // if current version count of replica is mor than
    // QUERYABLE_TIMES_OF_MIN_VERSION_COUNT times the minimum version count,
    // then the replica would not be considered as queryable.
    private static final int QUERYABLE_TIMES_OF_MIN_VERSION_COUNT = 3;

    public enum TabletStatus {
        HEALTHY,
        REPLICA_MISSING, // not enough alive replica num.
        VERSION_INCOMPLETE, // alive replica num is enough, but version is missing.
        REPLICA_RELOCATING, // replica is healthy, but is under relocating (eg. BE is decommission).
        REDUNDANT, // too much replicas.
        REPLICA_MISSING_FOR_TAG, // not enough healthy replicas in backend with specified tag.
        FORCE_REDUNDANT, // some replica is missing or bad, but there is no other backends for repair,
        // at least one replica has to be deleted first to make room for new replica.
        COLOCATE_MISMATCH, // replicas do not all locate in right colocate backends set.
        COLOCATE_REDUNDANT, // replicas match the colocate backends set, but redundant.
        NEED_FURTHER_REPAIR, // one of replicas need a definite repair.
        UNRECOVERABLE,   // none of replicas are healthy
        REPLICA_COMPACTION_TOO_SLOW // one replica's version count is much more than other replicas;
    }

    public static class TabletHealth {
        public TabletStatus status;
        public TabletSchedCtx.Priority priority;

        // num of alive replica with version complete
        public int aliveAndVersionCompleteNum;

        // NEED_FURTHER_REPAIR replica id
        public long needFurtherRepairReplicaId;

        // has alive replica with version incomplete, prior to repair these replica
        public boolean hasAliveAndVersionIncomplete;

        // this tablet recent write failed, then increase its sched priority
        public boolean hasRecentLoadFailed;

        // this tablet want to add new replica, but not found target backend.
        public boolean noPathForNewReplica;

        public TabletHealth() {
            status = null; // don't set for balance task
            priority = TabletSchedCtx.Priority.NORMAL;
            aliveAndVersionCompleteNum = 0;
            needFurtherRepairReplicaId = -1L;
            hasAliveAndVersionIncomplete = false;
            hasRecentLoadFailed = false;
            noPathForNewReplica = false;
        }
    }

    @SerializedName(value = "id")
    protected long id;
    @SerializedName(value = "rs", alternate = {"replicas"})
    protected List<Replica> replicas;
    @SerializedName(value = "cv", alternate = {"checkedVersion"})
    private long checkedVersion;
    @Deprecated
    @SerializedName(value = "cvs", alternate = {"checkedVersionHash"})
    private long checkedVersionHash;
    @SerializedName(value = "ic", alternate = {"isConsistent"})
    private boolean isConsistent;

    // cooldown conf
    @SerializedName(value = "cri", alternate = {"cooldownReplicaId"})
    private long cooldownReplicaId = -1;
    @SerializedName(value = "ctm", alternate = {"cooldownTerm"})
    private long cooldownTerm = -1;
    private MonitoredReentrantReadWriteLock cooldownConfLock = new MonitoredReentrantReadWriteLock();

    // last time that the tablet checker checks this tablet.
    // no need to persist
    private long lastStatusCheckTime = -1;

    // last time for load data fail
    private long lastLoadFailedTime = -1;

    // if tablet want to add a new replica, but cann't found any backend to locate the new replica.
    // then mark this tablet. For later repair, even try and try to repair this tablet, sched will always fail.
    // For example, 1 tablet contains 3 replicas, if 1 backend is dead, then tablet's healthy status
    // is REPLICA_MISSING. But since no other backend can held the new replica, then sched always fail.
    // So don't increase this tablet's sched priority if it has no path for new replica.
    private long lastTimeNoPathForNewReplica = -1;

    public Tablet() {
        this(0L, new ArrayList<>());
    }

    public Tablet(long tabletId) {
        this(tabletId, new ArrayList<>());
    }

    private Tablet(long tabletId, List<Replica> replicas) {
        this.id = tabletId;
        this.replicas = replicas;
        if (this.replicas == null) {
            this.replicas = new ArrayList<>();
        }

        checkedVersion = -1L;

        isConsistent = true;
    }

    public void setIdForRestore(long tabletId) {
        this.id = tabletId;
    }

    public long getId() {
        return this.id;
    }

    public long getCheckedVersion() {
        return this.checkedVersion;
    }

    public void setCheckedVersion(long checkedVersion) {
        this.checkedVersion = checkedVersion;
    }

    public void setIsConsistent(boolean good) {
        this.isConsistent = good;
    }

    public boolean isConsistent() {
        return isConsistent;
    }

    public void setCooldownConf(long cooldownReplicaId, long cooldownTerm) {
        cooldownConfLock.writeLock().lock();
        this.cooldownReplicaId = cooldownReplicaId;
        this.cooldownTerm = cooldownTerm;
        cooldownConfLock.writeLock().unlock();
    }

    public long getCooldownReplicaId() {
        return cooldownReplicaId;
    }

    public Pair<Long, Long> getCooldownConf() {
        cooldownConfLock.readLock().lock();
        try {
            return Pair.of(cooldownReplicaId, cooldownTerm);
        } finally {
            cooldownConfLock.readLock().unlock();
        }
    }

    protected boolean isLatestReplicaAndDeleteOld(Replica newReplica) {
        boolean delete = false;
        boolean hasBackend = false;
        long version = newReplica.getVersion();
        Iterator<Replica> iterator = replicas.iterator();
        while (iterator.hasNext()) {
            Replica replica = iterator.next();
            if (replica.getBackendIdWithoutException() == newReplica.getBackendIdWithoutException()) {
                hasBackend = true;
                if (replica.getVersion() <= version) {
                    iterator.remove();
                    delete = true;
                }
            }
        }

        return delete || !hasBackend;
    }

    public void addReplica(Replica replica, boolean isRestore) {
        if (isLatestReplicaAndDeleteOld(replica)) {
            replicas.add(replica);
            if (!isRestore) {
                Env.getCurrentInvertedIndex().addReplica(id, replica);
            }
        }
    }

    public void addReplica(Replica replica) {
        addReplica(replica, false);
    }

    public List<Replica> getReplicas() {
        return this.replicas;
    }

    public Set<Long> getBackendIds() {
        Set<Long> beIds = Sets.newHashSet();
        for (Replica replica : replicas) {
            beIds.add(replica.getBackendIdWithoutException());
        }
        return beIds;
    }

    public List<Long> getNormalReplicaBackendIds() {
        try {
            return Lists.newArrayList(getNormalReplicaBackendPathMap().keySet());
        } catch (Exception e) {
            LOG.warn("failed to getNormalReplicaBackendIds", e);
            return Lists.newArrayList();
        }
    }

    @FunctionalInterface
    interface BackendIdGetter {
        long get(Replica rep, String be) throws UserException;
    }

    private Multimap<Long, Long> getNormalReplicaBackendPathMapImpl(String beEndpoint, BackendIdGetter idGetter)
            throws UserException {
        Multimap<Long, Long> map = HashMultimap.create();
        SystemInfoService infoService = Env.getCurrentSystemInfo();
        for (Replica replica : replicas) {
            long backendId = idGetter.get(replica, beEndpoint);
            if (!infoService.checkBackendAlive(backendId)) {
                continue;
            }

            if (replica.isBad()) {
                continue;
            }

            ReplicaState state = replica.getState();
            if (state.canLoad()
                    || (state == ReplicaState.DECOMMISSION
                            && replica.getPostWatermarkTxnId() < 0
                            && replica.getLastFailedVersion() < 0)) {
                map.put(backendId, replica.getPathHash());
            }
        }
        return map;
    }

    // return map of (BE id -> path hash) of normal replicas
    // for load plan.
    public Multimap<Long, Long> getNormalReplicaBackendPathMap() throws UserException {
        return getNormalReplicaBackendPathMapImpl(null, (rep, be) -> rep.getBackendId());
    }

    // for cloud mode without ConnectContext. use BE IP to find replica
    protected Multimap<Long, Long> getNormalReplicaBackendPathMapCloud(String beEndpoint) throws UserException {
        return getNormalReplicaBackendPathMapImpl(beEndpoint,
                (rep, be) -> ((CloudReplica) rep).getBackendId(be));
    }

    // When a BE reports a missing version, lastFailedVersion is set. When a write fails on a replica,
    // lastFailedVersion is set.
    // for query
    public List<Replica> getQueryableReplicas(long visibleVersion, Map<Long, Set<Long>> backendAlivePathHashs,
            boolean allowMissingVersion) {
        List<Replica> allQueryableReplica = Lists.newArrayListWithCapacity(replicas.size());
        List<Replica> auxiliaryReplica = Lists.newArrayListWithCapacity(replicas.size());
        List<Replica> deadPathReplica = Lists.newArrayList();
        List<Replica> mayMissingVersionReplica = Lists.newArrayList();
        List<Replica> notCatchupReplica = Lists.newArrayList();

        for (Replica replica : replicas) {
            if (replica.isBad()) {
                continue;
            }
            if (!replica.checkVersionCatchUp(visibleVersion, false)) {
                notCatchupReplica.add(replica);
                continue;
            }
            if (replica.getLastFailedVersion() > 0) {
                mayMissingVersionReplica.add(replica);
                continue;
            }

            Set<Long> thisBeAlivePaths = backendAlivePathHashs.get(replica.getBackendIdWithoutException());
            ReplicaState state = replica.getState();
            // if thisBeAlivePaths contains pathHash = 0, it mean this be hadn't report disks state.
            // should ignore this case.
            if (replica.getPathHash() != -1 && thisBeAlivePaths != null
                    && !thisBeAlivePaths.contains(replica.getPathHash())
                    && !thisBeAlivePaths.contains(0L)) {
                deadPathReplica.add(replica);
            } else if (state.canQuery()) {
                allQueryableReplica.add(replica);
            } else if (state == ReplicaState.DECOMMISSION) {
                auxiliaryReplica.add(replica);
            }
        }

        if (allQueryableReplica.isEmpty()) {
            allQueryableReplica = auxiliaryReplica;
        }
        if (allQueryableReplica.isEmpty()) {
            allQueryableReplica = deadPathReplica;
        }

        if (allQueryableReplica.isEmpty()) {
            // If be misses a version, be would report failure.
            allQueryableReplica = mayMissingVersionReplica;
        }

        if (allQueryableReplica.isEmpty() && allowMissingVersion) {
            allQueryableReplica = notCatchupReplica;
        }

        if (Config.skip_compaction_slower_replica && allQueryableReplica.size() > 1) {
            long minVersionCount = allQueryableReplica.stream().mapToLong(Replica::getVisibleVersionCount)
                    .filter(count -> count != -1).min().orElse(Long.MAX_VALUE);
            long maxVersionCount = Config.min_version_count_indicate_replica_compaction_too_slow;
            if (minVersionCount != Long.MAX_VALUE) {
                maxVersionCount = Math.max(maxVersionCount, minVersionCount * QUERYABLE_TIMES_OF_MIN_VERSION_COUNT);
            }

            final long finalMaxVersionCount = maxVersionCount;
            return allQueryableReplica.stream()
                    .filter(replica -> replica.getVisibleVersionCount() < finalMaxVersionCount)
                    .collect(Collectors.toList());
        }
        return allQueryableReplica;
    }

    public String getDetailsStatusForQuery(long visibleVersion) {
        StringBuilder sb = new StringBuilder("Visible Replicas:");
        sb.append("Visible version: ").append(visibleVersion);
        sb.append(", Replicas: ");
        sb.append(Joiner.on(", ").join(replicas.stream().map(replica -> replica.toStringSimple(true))
                .collect(Collectors.toList())));
        sb.append(".");

        return sb.toString();
    }

    public Replica getReplicaById(long replicaId) {
        for (Replica replica : replicas) {
            if (replica.getId() == replicaId) {
                return replica;
            }
        }
        return null;
    }

    public Replica getReplicaByBackendId(long backendId) {
        for (Replica replica : replicas) {
            if (replica.getBackendIdWithoutException() == backendId) {
                return replica;
            }
        }
        return null;
    }

    public boolean deleteReplica(Replica replica) {
        if (replicas.contains(replica)) {
            replicas.remove(replica);
            Env.getCurrentInvertedIndex().deleteReplica(id, replica.getBackendIdWithoutException());
            return true;
        }
        return false;
    }

    public boolean deleteReplicaByBackendId(long backendId) {
        Iterator<Replica> iterator = replicas.iterator();
        while (iterator.hasNext()) {
            Replica replica = iterator.next();
            if (replica.getBackendIdWithoutException() == backendId) {
                iterator.remove();
                Env.getCurrentInvertedIndex().deleteReplica(id, backendId);
                return true;
            }
        }
        return false;
    }

    @Deprecated
    public Replica deleteReplicaById(long replicaId) {
        Iterator<Replica> iterator = replicas.iterator();
        while (iterator.hasNext()) {
            Replica replica = iterator.next();
            if (replica.getId() == replicaId) {
                LOG.info("delete replica[" + replica.getId() + "]");
                iterator.remove();
                return replica;
            }
        }
        return null;
    }

    // for test,
    // and for some replay cases
    public void clearReplica() {
        this.replicas.clear();
    }

    public void setTabletId(long tabletId) {
        this.id = tabletId;
    }

    public static void sortReplicaByVersionDesc(List<Replica> replicas) {
        // sort replicas by version. higher version in the tops
        replicas.sort(Replica.VERSION_DESC_COMPARATOR);
    }

    @Override
    public String toString() {
        return "tabletId=" + this.id;
    }

    @Deprecated
    @Override
    public void readFields(DataInput in) throws IOException {
        super.readFields(in);

        id = in.readLong();
        int replicaCount = in.readInt();
        for (int i = 0; i < replicaCount; ++i) {
            Replica replica = Replica.read(in);
            if (isLatestReplicaAndDeleteOld(replica)) {
                replicas.add(replica);
            }
        }

        checkedVersion = in.readLong();
        checkedVersionHash = in.readLong();
        isConsistent = in.readBoolean();
    }

    @Deprecated
    public static Tablet read(DataInput in) throws IOException {
        if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_115) {
            String json = Text.readString(in);
            return GsonUtils.GSON.fromJson(json, EnvFactory.getInstance().getTabletClass());
        }

        Tablet tablet = EnvFactory.getInstance().createTablet();
        tablet.readFields(in);
        return tablet;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof Tablet)) {
            return false;
        }

        Tablet tablet = (Tablet) obj;

        if (replicas != tablet.replicas) {
            if (replicas.size() != tablet.replicas.size()) {
                return false;
            }
            int size = replicas.size();
            for (int i = 0; i < size; i++) {
                if (!tablet.replicas.contains(replicas.get(i))) {
                    return false;
                }
            }
        }
        return id == tablet.id;
    }

    public long getDataSize(boolean singleReplica) {
        LongStream s = replicas.stream().filter(r -> r.getState() == ReplicaState.NORMAL)
                .mapToLong(Replica::getDataSize);
        return singleReplica ? Double.valueOf(s.average().orElse(0)).longValue() : s.sum();
    }

    public long getRemoteDataSize() {
        // if CooldownReplicaId is not init
        if (cooldownReplicaId <= 0) {
            return 0;
        }
        for (Replica r : replicas) {
            if (r.getId() == cooldownReplicaId) {
                return r.getRemoteDataSize();
            }
        }
        // return replica with max remoteDataSize
        return replicas.stream().max(Comparator.comparing(Replica::getRemoteDataSize)).get().getRemoteDataSize();
    }

    public long getRowCount(boolean singleReplica) {
        LongStream s = replicas.stream().filter(r -> r.getState() == ReplicaState.NORMAL)
                .mapToLong(Replica::getRowCount);
        return singleReplica ? Double.valueOf(s.average().orElse(0)).longValue() : s.sum();
    }

    // Get the least row count among all valid replicas.
    // The replica with the least row count is the most accurate one. Because it performs most compaction.
    public long getMinReplicaRowCount(long version) {
        long minRowCount = Long.MAX_VALUE;
        long maxReplicaVersion = 0;
        for (Replica r : replicas) {
            if (r.isAlive()
                    && r.checkVersionCatchUp(version, false)
                    && (r.getVersion() > maxReplicaVersion
                        || r.getVersion() == maxReplicaVersion && r.getRowCount() < minRowCount)) {
                minRowCount = r.getRowCount();
                maxReplicaVersion = r.getVersion();
            }
        }
        return minRowCount == Long.MAX_VALUE ? 0 : minRowCount;
    }

    /**
     * A replica is healthy only if
     * 1. the backend is available
     * 2. replica version is caught up, and last failed version is -1
     * <p>
     * A tablet is healthy only if
     * 1. healthy replica num is equal to replicationNum
     * 2. all healthy replicas are in right tag
     */
    public TabletHealth getHealth(SystemInfoService systemInfoService,
            long visibleVersion, ReplicaAllocation replicaAlloc, List<Long> aliveBeIds) {
        Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
        Map<Tag, Short> stableAllocMap = Maps.newHashMap();
        Map<Tag, Short> stableVersionCompleteAllocMap = Maps.newHashMap();

        short replicationNum = replicaAlloc.getTotalReplicaNum();
        int alive = 0;
        int aliveAndVersionComplete = 0;
        int stable = 0;

        Replica needFurtherRepairReplica = null;
        boolean hasAliveAndVersionIncomplete = false;
        Set<String> hosts = Sets.newHashSet();
        ArrayList<Long> versions = new ArrayList<>();
        for (Replica replica : replicas) {
            Backend backend = systemInfoService.getBackend(replica.getBackendIdWithoutException());
            if (!isReplicaAndBackendAlive(replica, backend, hosts)) {
                continue;
            }

            alive++;

            boolean versionCompleted = replica.getLastFailedVersion() < 0 && replica.getVersion() >= visibleVersion;
            if (versionCompleted) {
                aliveAndVersionComplete++;
            }

            if (replica.isScheduleAvailable()) {
                if (replica.needFurtherRepair() && (needFurtherRepairReplica == null || !versionCompleted)) {
                    needFurtherRepairReplica = replica;
                }

                short allocNum = stableAllocMap.getOrDefault(backend.getLocationTag(), (short) 0);
                stableAllocMap.put(backend.getLocationTag(), (short) (allocNum + 1));

                if (versionCompleted) {
                    stable++;
                    versions.add(replica.getVisibleVersionCount());

                    allocNum = stableVersionCompleteAllocMap.getOrDefault(backend.getLocationTag(), (short) 0);
                    stableVersionCompleteAllocMap.put(backend.getLocationTag(), (short) (allocNum + 1));
                } else {
                    hasAliveAndVersionIncomplete = true;
                }
            }
        }

        TabletHealth tabletHealth = new TabletHealth();
        initTabletHealth(tabletHealth);
        tabletHealth.aliveAndVersionCompleteNum = aliveAndVersionComplete;
        tabletHealth.hasAliveAndVersionIncomplete = hasAliveAndVersionIncomplete;
        if (needFurtherRepairReplica != null) {
            tabletHealth.needFurtherRepairReplicaId = needFurtherRepairReplica.getId();
        }

        // 0. We can not choose a good replica as src to repair this tablet.
        if (aliveAndVersionComplete == 0) {
            tabletHealth.status = TabletStatus.UNRECOVERABLE;
            return tabletHealth;
        } else if (aliveAndVersionComplete < replicationNum && hasAliveAndVersionIncomplete) {
            // not enough good replica, and there exists schedule available replicas and  version incomplete,
            // no matter whether they tag is proper right, fix them immediately.
            tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
            tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
            return tabletHealth;
        }

        // 1. alive replicas are not enough
        int aliveBackendsNum = aliveBeIds.size();
        if (alive < replicationNum && replicas.size() >= aliveBackendsNum
                && aliveBackendsNum >= replicationNum && replicationNum > 1) {
            // there is no enough backend for us to create a new replica, so we have to delete an existing replica,
            // so there can be available backend for us to create a new replica.
            // And if there is only one replica, we will not handle it(maybe need human interference)
            // condition explain:
            // 1. alive < replicationNum: replica is missing or bad
            // 2. replicas.size() >= aliveBackendsNum: the existing replicas occupies all available backends
            // 3. aliveBackendsNum >= replicationNum: make sure after deleting,
            //    there will be at least one backend for new replica.
            // 4. replicationNum > 1: if replication num is set to 1, do not delete any replica, for safety reason
            tabletHealth.status = TabletStatus.FORCE_REDUNDANT;
            tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
            return tabletHealth;
        } else if (alive < replicationNum) {
            tabletHealth.status = TabletStatus.REPLICA_MISSING;
            tabletHealth.priority = alive < (replicationNum / 2) + 1 ? TabletSchedCtx.Priority.VERY_HIGH
                    : TabletSchedCtx.Priority.NORMAL;
            return tabletHealth;
        }

        // 2. version complete replicas are not enough
        if (aliveAndVersionComplete < replicationNum) {
            tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
            tabletHealth.priority = alive < (replicationNum / 2) + 1 ? TabletSchedCtx.Priority.HIGH
                    : TabletSchedCtx.Priority.NORMAL;
            return tabletHealth;
        } else if (aliveAndVersionComplete > replicationNum) {
            if (needFurtherRepairReplica != null) {
                tabletHealth.status = TabletStatus.NEED_FURTHER_REPAIR;
                tabletHealth.priority = TabletSchedCtx.Priority.HIGH;
            } else {
                // we set REDUNDANT as VERY_HIGH, because delete redundant replicas can free the space quickly.
                tabletHealth.status = TabletStatus.REDUNDANT;
                tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
            }
            return tabletHealth;
        }

        // 3. replica is under relocating
        if (stable < replicationNum) {
            Set<Long> replicaBeIds = replicas.stream().map(Replica::getBackendIdWithoutException)
                    .collect(Collectors.toSet());
            List<Long> availableBeIds = aliveBeIds.stream().filter(systemInfoService::checkBackendScheduleAvailable)
                    .collect(Collectors.toList());
            if (replicaBeIds.containsAll(availableBeIds)
                    && availableBeIds.size() >= replicationNum
                    && replicationNum > 1) { // No BE can be choose to create a new replica
                tabletHealth.status = TabletStatus.FORCE_REDUNDANT;
                tabletHealth.priority = stable < (replicationNum / 2) + 1
                                ? TabletSchedCtx.Priority.NORMAL : TabletSchedCtx.Priority.LOW;
                return tabletHealth;
            }

            if (stable < replicationNum) {
                tabletHealth.status = TabletStatus.REPLICA_RELOCATING;
                tabletHealth.priority = stable < (replicationNum / 2) + 1 ? TabletSchedCtx.Priority.NORMAL
                        : TabletSchedCtx.Priority.LOW;
                return tabletHealth;
            }
        }

        // 4. got enough healthy replicas, check tag
        for (Map.Entry<Tag, Short> alloc : allocMap.entrySet()) {
            if (stableVersionCompleteAllocMap.getOrDefault(alloc.getKey(), (short) 0) < alloc.getValue()) {
                if (stableAllocMap.getOrDefault(alloc.getKey(), (short) 0) >= alloc.getValue()) {
                    tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
                } else {
                    tabletHealth.status = TabletStatus.REPLICA_MISSING_FOR_TAG;
                }
                tabletHealth.priority = TabletSchedCtx.Priority.NORMAL;
                return tabletHealth;
            }
        }

        if (replicas.size() > replicationNum) {
            if (needFurtherRepairReplica != null) {
                tabletHealth.status = TabletStatus.NEED_FURTHER_REPAIR;
                tabletHealth.priority = TabletSchedCtx.Priority.HIGH;
            } else {
                // we set REDUNDANT as VERY_HIGH, because delete redundant replicas can free the space quickly.
                tabletHealth.status = TabletStatus.REDUNDANT;
                tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
            }
            return tabletHealth;
        }

        // 5. find a replica's version count is much more than others, and drop it
        if (Config.repair_slow_replica && versions.size() == replicas.size() && versions.size() > 1) {
            // sort version
            Collections.sort(versions);
            // get the max version diff
            long delta = versions.get(versions.size() - 1) - versions.get(0);
            double ratio = (double) delta / versions.get(versions.size() - 1);
            if (versions.get(versions.size() - 1) >= Config.min_version_count_indicate_replica_compaction_too_slow
                    && ratio > Config.valid_version_count_delta_ratio_between_replicas) {
                tabletHealth.status = TabletStatus.REPLICA_COMPACTION_TOO_SLOW;
                tabletHealth.priority = Priority.HIGH;
                return tabletHealth;
            }
        }

        // 6. healthy
        tabletHealth.status = TabletStatus.HEALTHY;
        tabletHealth.priority = TabletSchedCtx.Priority.NORMAL;

        return tabletHealth;
    }

    private void initTabletHealth(TabletHealth tabletHealth) {
        long endTime = System.currentTimeMillis() - Config.tablet_recent_load_failed_second * 1000L;
        tabletHealth.hasRecentLoadFailed = lastLoadFailedTime > endTime;
        tabletHealth.noPathForNewReplica = lastTimeNoPathForNewReplica > endTime;
    }

    private boolean isReplicaAndBackendAlive(Replica replica, Backend backend, Set<String> hosts) {
        if (backend == null || !backend.isAlive() || !replica.isAlive()
                || checkHost(hosts, backend) || replica.tooSlow() || !backend.isMixNode()) {
            // this replica is not alive,
            // or if this replica is on same host with another replica, we also treat it as 'dead',
            // so that Tablet Scheduler will create a new replica on different host.
            // ATTN: Replicas on same host is a bug of previous Doris version, so we fix it by this way.
            return false;
        } else {
            return true;
        }
    }

    private boolean checkHost(Set<String> hosts, Backend backend) {
        return !Config.allow_replica_on_same_host && !FeConstants.runningUnitTest && !hosts.add(backend.getHost());
    }

    /**
     * Check colocate table's tablet health
     * 1. Mismatch:
     *      backends set:       1,2,3
     *      tablet replicas:    1,2,5
     *
     *      backends set:       1,2,3
     *      tablet replicas:    1,2
     *
     *      backends set:       1,2,3
     *      tablet replicas:    1,2,4,5
     *
     * 2. Version incomplete:
     *      backend matched, but some replica(in backends set)'s version is incomplete
     *
     * 3. Redundant:
     *      backends set:       1,2,3
     *      tablet replicas:    1,2,3,4
     *
     * No need to check if backend is available. We consider all backends in 'backendsSet' are available,
     * If not, unavailable backends will be relocated by CalocateTableBalancer first.
     */
    public TabletHealth getColocateHealth(long visibleVersion,
            ReplicaAllocation replicaAlloc, Set<Long> backendsSet) {
        SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
        short replicationNum = replicaAlloc.getTotalReplicaNum();
        boolean hasAliveAndVersionIncomplete = false;
        int aliveAndVersionComplete = 0;
        Set<String> hosts = Sets.newHashSet();
        for (Replica replica : replicas) {
            Backend backend = systemInfoService.getBackend(replica.getBackendIdWithoutException());
            if (!isReplicaAndBackendAlive(replica, backend, hosts)) {
                continue;
            }

            boolean versionCompleted = replica.getLastFailedVersion() < 0 && replica.getVersion() >= visibleVersion;
            if (versionCompleted) {
                aliveAndVersionComplete++;
            }

            if (replica.isScheduleAvailable()) {
                if (!versionCompleted) {
                    hasAliveAndVersionIncomplete = true;
                }
            }
        }

        TabletHealth tabletHealth = new TabletHealth();
        initTabletHealth(tabletHealth);
        tabletHealth.aliveAndVersionCompleteNum = aliveAndVersionComplete;
        tabletHealth.hasAliveAndVersionIncomplete = hasAliveAndVersionIncomplete;
        tabletHealth.priority = TabletSchedCtx.Priority.NORMAL;

        // 0. We can not choose a good replica as src to repair this tablet.
        if (aliveAndVersionComplete == 0) {
            tabletHealth.status = TabletStatus.UNRECOVERABLE;
            return tabletHealth;
        } else if (aliveAndVersionComplete < replicationNum && hasAliveAndVersionIncomplete) {
            // not enough good replica, and there exists schedule available replicas and  version incomplete,
            // no matter whether they tag is proper right, fix them immediately.
            tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
            tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
            return tabletHealth;
        }

        // Here we don't need to care about tag. Because the replicas of the colocate table has been confirmed
        // in ColocateTableCheckerAndBalancer.
        Short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
        // 1. check if replicas' backends are mismatch
        //    There is no REPLICA_MISSING status for colocate table's tablet.
        //    Because if the following check doesn't pass, the COLOCATE_MISMATCH will return.
        Set<Long> replicaBackendIds = getBackendIds();
        if (!replicaBackendIds.containsAll(backendsSet)) {
            tabletHealth.status = TabletStatus.COLOCATE_MISMATCH;
            return tabletHealth;
        }

        // 2. check version completeness
        for (Replica replica : replicas) {
            if (!backendsSet.contains(replica.getBackendIdWithoutException())) {
                // We don't care about replicas that are not in backendsSet.
                // eg:  replicaBackendIds=(1,2,3,4); backendsSet=(1,2,3),
                //      then replica 4 should be skipped here and then goto ```COLOCATE_REDUNDANT``` in step 3
                continue;
            }

            if (!replica.isAlive()) {
                if (replica.isBad()) {
                    // If this replica is bad but located on one of backendsSet,
                    // we have drop it first, or we can find any other BE for new replica.
                    tabletHealth.status = TabletStatus.COLOCATE_REDUNDANT;
                } else {
                    // maybe in replica's DECOMMISSION state
                    // Here we return VERSION_INCOMPLETE,
                    // and the tablet scheduler will finally set it's state to NORMAL.
                    tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
                }
                return tabletHealth;
            }

            if (replica.getLastFailedVersion() > 0 || replica.getVersion() < visibleVersion) {
                // this replica is alive but version incomplete
                tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
                return tabletHealth;
            }
        }

        // 3. check redundant
        if (replicas.size() > totalReplicaNum) {
            tabletHealth.status = TabletStatus.COLOCATE_REDUNDANT;
            return tabletHealth;
        }

        tabletHealth.status = TabletStatus.HEALTHY;
        return tabletHealth;
    }

    /**
     * check if this tablet is ready to be repaired, based on priority.
     * VERY_HIGH: repair immediately
     * HIGH:    delay Config.tablet_repair_delay_factor_second * 1;
     * NORMAL:  delay Config.tablet_repair_delay_factor_second * 2;
     * LOW:     delay Config.tablet_repair_delay_factor_second * 3;
     */
    public boolean readyToBeRepaired(SystemInfoService infoService, TabletSchedCtx.Priority priority) {
        if (FeConstants.runningUnitTest) {
            return true;
        }

        if (priority == Priority.VERY_HIGH) {
            return true;
        }

        boolean allBeAliveOrDecommissioned = true;
        for (Replica replica : replicas) {
            Backend backend = infoService.getBackend(replica.getBackendIdWithoutException());
            if (backend == null || (!backend.isAlive() && !backend.isDecommissioned())) {
                allBeAliveOrDecommissioned = false;
                break;
            }
        }

        if (allBeAliveOrDecommissioned) {
            return true;
        }

        long currentTime = System.currentTimeMillis();

        // first check, wait for next round
        if (lastStatusCheckTime == -1) {
            lastStatusCheckTime = currentTime;
            return false;
        }

        boolean ready = false;
        switch (priority) {
            case HIGH:
                ready = currentTime - lastStatusCheckTime > Config.tablet_repair_delay_factor_second * 1000 * 1;
                break;
            case NORMAL:
                ready = currentTime - lastStatusCheckTime > Config.tablet_repair_delay_factor_second * 1000 * 2;
                break;
            case LOW:
                ready = currentTime - lastStatusCheckTime > Config.tablet_repair_delay_factor_second * 1000 * 3;
                break;
            default:
                break;
        }

        return ready;
    }

    public void setLastStatusCheckTime(long lastStatusCheckTime) {
        this.lastStatusCheckTime = lastStatusCheckTime;
    }

    public long getLastLoadFailedTime() {
        return lastLoadFailedTime;
    }

    public void setLastLoadFailedTime(long lastLoadFailedTime) {
        this.lastLoadFailedTime = lastLoadFailedTime;
    }

    public void setLastTimeNoPathForNewReplica(long lastTimeNoPathForNewReplica) {
        this.lastTimeNoPathForNewReplica = lastTimeNoPathForNewReplica;
    }
}