BeLoadRebalancer.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.clone;

import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair;
import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPairComparator;
import org.apache.doris.clone.BackendLoadStatistic.Classification;
import org.apache.doris.clone.SchedException.Status;
import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/*
 * BeLoadRebalancer strategy:
 * 1. Select alternative tablets from high load backends, and return them to the tablet scheduler.
 * 2. Given a tablet, find a low load backend to migrate it to.
 * 3. Redundant replicas are deleted from high load backends, so getCachedSrcBackendId()
 *    is not overridden here.
 */
public class BeLoadRebalancer extends Rebalancer {
    private static final Logger LOG = LogManager.getLogger(BeLoadRebalancer.class);

    /*
     * All three arguments are handed to the Rebalancer super class unchanged.
     */
    public BeLoadRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex,
            Map<Long, PathSlot> backendsWorkingSlots) {
        super(infoService, invertedIndex, backendsWorkingSlots);
    }

    /*
     * Try to select alternative tablets to balance the specified cluster.
     * 1. Classify the backends into low and high class by load score.
     * 2. Try to select tablets from high load backends.
     *      1. Here we only select alternative tablets, without considering the selected tablets' status
     *         or whether moving them benefits the balance (all of this is checked in the tablet scheduler).
     *      2. Only select tablets from 'high' backends.
     *      3. Only select tablets from 'high' and 'mid' paths (for an urgent balance on a backend whose
     *         max disk class is HIGH: only from paths flagged as global high usage).
     *
     * Here we only select tablets from high load nodes and do not set their src or dest; both are set
     * when the tablet is being scheduled in the tablet scheduler (see completeSchedCtx).
     *
     * NOTICE that we may select any available tablets here, ignoring their state.
     * The state will be checked when being scheduled in the tablet scheduler.
     *
     * @param clusterStat load statistic of the tag (resource group) to balance
     * @param medium storage medium to balance
     * @return the selected balance candidates; empty if nothing needs or is able to be moved
     */
    @Override
    protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
            LoadStatisticForTag clusterStat, TStorageMedium medium) {
        List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
        List<BackendLoadStatistic> lowBEs = Lists.newArrayList();
        List<BackendLoadStatistic> highBEs = Lists.newArrayList();
        boolean isUrgent = clusterStat.getLowHighBEsWithIsUrgent(lowBEs, highBEs, medium);

        if (lowBEs.isEmpty() && highBEs.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("cluster is balance with medium: {}. skip", medium);
            }
            return alternativeTablets;
        }

        // first we should check if low backends is available.
        // if all low backends is not available, we should not start balance
        // (noneMatch is also true for an empty list, so "no low backend at all" stops here too)
        if (lowBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) {
            LOG.info("all low load backends is dead: {} with medium: {}. skip",
                    lowBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
            return alternativeTablets;
        }

        if (lowBEs.stream().noneMatch(BackendLoadStatistic::hasAvailDisk)) {
            LOG.info("all low load backends {} have no available disk with medium: {}. skip",
                    lowBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
            return alternativeTablets;
        }

        // Sum of the free balance slots over all available low backends. It is used below as
        // the upper bound for how many tablets are selected in this round.
        long numOfLowPaths = 0;
        for (BackendLoadStatistic backendLoadStatistic : lowBEs) {
            if (!backendLoadStatistic.isAvailable()) {
                continue;
            }
            PathSlot pathSlot = backendsWorkingSlots.get(backendLoadStatistic.getBeId());
            if (pathSlot != null) {
                numOfLowPaths += pathSlot.getTotalAvailBalanceSlotNum();
            }
        }
        LOG.info("get number of low load paths: {}, with medium: {}, tag: {}, isUrgent {}",
                numOfLowPaths, medium, clusterStat.getTag(), isUrgent);

        List<String> alternativeTabletInfos = Lists.newArrayList();
        // ids of the tablets already put into alternativeTablets, for a constant-time duplicate check
        Set<Long> selectedTabletIds = Sets.newHashSet();
        int clusterAvailableBEnum = infoService.getAllBackendIds(true).size();
        // one set of tablet ids per low backend, in the same order as lowBEs
        List<Set<Long>> lowBETablets = lowBEs.stream()
                .map(beStat -> Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId())))
                .collect(Collectors.toList());

        // whether any tablet passed the path and canBalanceTablet checks; only used for the final log
        boolean hasCandidateTablet = false;

        // choose tablets from high load backends.
        // BackendLoadStatistic is sorted by load score in ascend order,
        // so we need to traverse it from last to first
        OUTER:
        for (int i = highBEs.size() - 1; i >= 0; i--) {
            BackendLoadStatistic beStat = highBEs.get(i);
            PathSlot pathSlot = backendsWorkingSlots.get(beStat.getBeId());
            if (pathSlot == null) {
                continue;
            }

            boolean choseHighDisk = isUrgent && beStat.getMaxDiskClazz(medium) == Classification.HIGH;

            // for each path, we try to select at most BALANCE_SLOT_NUM_FOR_PATH tablets
            // remainingPaths: path hash -> number of tablets that may still be picked from that path
            Map<Long, Integer> remainingPaths = Maps.newHashMap();
            Set<Long> pathHigh = null;
            if (choseHighDisk) {
                pathHigh = beStat.getAvailPaths(medium).stream().filter(RootPathLoadStatistic::isGlobalHighUsage)
                        .map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet());
            } else {
                // classify the paths.
                pathHigh = Sets.newHashSet();
                Set<Long> pathLow = Sets.newHashSet();
                Set<Long> pathMid = Sets.newHashSet();
                beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium);
                // we only select tablets from available mid and high load path
                pathHigh.addAll(pathMid);
            }

            // highest used percent among the eligible paths; decides below whether large tablets
            // are preferred for an urgent balance
            double highDiskMaxUsage = 0;
            for (Long pathHash : pathHigh) {
                int availBalanceNum = pathSlot.getAvailableBalanceNum(pathHash);
                if (availBalanceNum > 0) {
                    remainingPaths.put(pathHash, availBalanceNum);
                }

                RootPathLoadStatistic pathStat = beStat.getPathStatisticByPathHash(pathHash);
                if (pathStat != null) {
                    highDiskMaxUsage = Math.max(highDiskMaxUsage, pathStat.getUsedPercent());
                }
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("high be {}, medium: {}, path high: {}, remainingPaths: {}, chose high disk: {}",
                        beStat.getBeId(), medium, pathHigh, remainingPaths, choseHighDisk);
            }

            if (remainingPaths.isEmpty()) {
                continue;
            }

            // get all tablets on this backend, and shuffle them for random selection
            List<Pair<Long, Long>> tabletIdSizes = invertedIndex.getTabletSizeByBackendIdAndStorageMedium(
                    beStat.getBeId(), medium);
            if (!isUrgent
                    || tabletIdSizes.size() < Config.urgent_balance_pick_large_tablet_num_threshold
                    || highDiskMaxUsage < (double) Config.urgent_balance_pick_large_disk_usage_percentage / 100.0
                    || Config.urgent_balance_shuffle_large_tablet_percentage >= 100
                    || Config.urgent_balance_shuffle_large_tablet_percentage < 0) {
                Collections.shuffle(tabletIdSizes);
            } else {
                // urgent balance on a heavily used disk: sort the tablets and only shuffle the tail
                // of the list, which is the part the reverse traversal below visits first
                Collections.sort(tabletIdSizes, new Pair.PairComparator<Pair<Long, Long>>());
                if (Config.urgent_balance_shuffle_large_tablet_percentage > 0) {
                    int startIndex = (int) (tabletIdSizes.size()
                            * (1 - (double) Config.urgent_balance_shuffle_large_tablet_percentage / 100.0));
                    Collections.shuffle(tabletIdSizes.subList(startIndex, tabletIdSizes.size()));
                }
            }

            // select tablet from shuffled tablets
            for (int j = tabletIdSizes.size() - 1; j >= 0; j--) {
                long tabletId = tabletIdSizes.get(j).key();
                // skip the tablet when there are not more available backends than it has replicas
                if (clusterAvailableBEnum <= invertedIndex.getReplicasByTabletId(tabletId).size()) {
                    continue;
                }

                // already selected in this round
                if (selectedTabletIds.contains(tabletId)) {
                    continue;
                }

                Replica replica = null;
                try {
                    replica = invertedIndex.getReplica(tabletId, beStat.getBeId());
                } catch (IllegalStateException e) {
                    // deliberately best-effort: the tablet is simply not considered
                    continue;
                }
                if (replica == null) {
                    continue;
                }

                // check if replica's is on 'high' or 'mid' path.
                // and only select it if the selected tablets num of this path
                // does not exceed the limit (BALANCE_SLOT_NUM_FOR_PATH).
                long replicaPathHash = replica.getPathHash();
                long replicaDataSize = replica.getDataSize();
                if (remainingPaths.containsKey(replicaPathHash)) {
                    TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
                    if (!canBalanceTablet(tabletMeta)) {
                        continue;
                    }

                    hasCandidateTablet = true;

                    // for urgent disk, pick tablets order by size,
                    // then it may always pick tablets that was on the low backends.
                    // So skip a tablet that is already present on every low backend.
                    if (!lowBETablets.isEmpty()
                            && lowBETablets.stream().allMatch(tablets -> tablets.contains(tabletId))) {
                        continue;
                    }

                    // at least one low backend must report BalanceStatus.OK for this replica size
                    boolean isFit = lowBEs.stream().anyMatch(be -> be.isFit(replicaDataSize,
                            medium, null, false) == BalanceStatus.OK);
                    if (!isFit) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("tablet {} with size {} medium {} not fit in low backends",
                                    tabletId, replicaDataSize, medium);
                        }
                        continue;
                    }

                    TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE,
                            tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(),
                            tabletMeta.getIndexId(), tabletId, null /* replica alloc is not used for balance*/,
                            System.currentTimeMillis());
                    tabletCtx.setTag(clusterStat.getTag());
                    // balance task's priority is LOW, except for an urgent balance which uses NORMAL
                    tabletCtx.setPriority(isUrgent ? Priority.NORMAL : Priority.LOW);

                    alternativeTablets.add(tabletCtx);
                    selectedTabletIds.add(tabletId);
                    alternativeTabletInfos.add("{ tabletId=" + tabletId + ", beId=" + beStat.getBeId()
                            + ", pathHash=" + replicaPathHash
                            + ", replicaLocalSize=" + replicaDataSize + " }");
                    if (--numOfLowPaths <= 0) {
                        // enough
                        break OUTER;
                    }

                    // update remaining paths
                    int remaining = remainingPaths.get(replicaPathHash) - 1;
                    if (remaining <= 0) {
                        remainingPaths.remove(replicaPathHash);
                        if (remainingPaths.isEmpty()) {
                            // every eligible path of this backend has used up its quota, so no
                            // further tablet of this backend can be selected: go to the next backend
                            break;
                        }
                    } else {
                        remainingPaths.put(replicaPathHash, remaining);
                    }
                }
            }
        } // end for high backends

        if (!alternativeTablets.isEmpty()) {
            LOG.info("select alternative tablets, medium: {}, is urgent: {}, num: {}, detail: {}",
                    medium, isUrgent, alternativeTablets.size(), alternativeTabletInfos);
        } else if (isUrgent && !hasCandidateTablet) {
            LOG.info("urgent balance cann't found candidate tablets. medium: {}, tag: {}",
                    medium, clusterStat.getTag());
        }
        return alternativeTablets;
    }


    /*
     * Create a clone task of this selected tablet for balance.
     * 1. Check if this tablet has replica on high load backend. If not, the balance will be cancelled.
     *    If yes, select a replica as source replica.
     * 2. Select a low load backend as destination. The tablet must not already have a replica on this
     *    backend and, unless Config.allow_replica_on_same_host is set, not on the same host either.
     *
     * On success the source replica and the destination (backend id + path hash) are set on tabletCtx
     * and a balance slot has been taken on both sides.
     *
     * @param tabletCtx the balance context to complete
     * @throws SchedException UNRECOVERABLE if the tag is unknown, the cluster is balanced, no usable
     *         source or destination exists; SCHEDULE_FAILED / WAITING_SLOT if destination paths were
     *         found but none of them has a free balance slot
     */
    @Override
    public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException {
        LoadStatisticForTag clusterStat = statisticMap.get(tabletCtx.getTag());
        if (clusterStat == null) {
            throw new SchedException(Status.UNRECOVERABLE,
                    String.format("tag %s does not exist", tabletCtx.getTag()));
        }

        // get classification of backends
        List<BackendLoadStatistic> lowBEs = Lists.newArrayList();
        List<BackendLoadStatistic> highBEs = Lists.newArrayList();
        boolean isUrgent = clusterStat.getLowHighBEsWithIsUrgent(lowBEs, highBEs, tabletCtx.getStorageMedium());
        String isUrgentInfo = isUrgent ? " for urgent" : " for non-urgent";

        if (lowBEs.isEmpty() && highBEs.isEmpty()) {
            throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "cluster is balance");
        }

        // if all low backends is not available, return
        if (lowBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) {
            throw new SchedException(Status.UNRECOVERABLE, "all low load backends is unavailable");
        }

        List<Replica> replicas = tabletCtx.getReplicas();

        // Check if this tablet has replica on high load backend.
        // Also create a set to save hosts of this tablet: the host of every backend that already
        // holds one of its replicas, whatever the load class of that backend. The same-host check
        // for the destination below relies on this set being complete.
        Set<String> hosts = Sets.newHashSet();
        List<BackendLoadStatistic> replicaHighBEs = Lists.newArrayList();
        for (Replica replica : replicas) {
            long replicaBeId = replica.getBackendIdWithoutException();
            for (BackendLoadStatistic highBe : highBEs) {
                if (highBe.getBeId() == replicaBeId) {
                    replicaHighBEs.add(highBe);
                    break;
                }
            }
            Backend be = infoService.getBackend(replicaBeId);
            if (be == null) {
                throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
                        "backend is dropped: " + replicaBeId);
            }
            hosts.add(be.getHost());
        }
        if (replicaHighBEs.isEmpty()) {
            throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
                    "no replica on high load backend" + isUrgentInfo);
        }

        // select a replica as source: the first replica whose backend grants a balance slot
        // on the replica's path
        boolean setSource = false;
        for (Replica replica : replicas) {
            PathSlot slot = backendsWorkingSlots.get(replica.getBackendIdWithoutException());
            if (slot == null) {
                continue;
            }
            long pathHash = slot.takeBalanceSlot(replica.getPathHash());
            if (pathHash != -1) {
                tabletCtx.setSrc(replica);
                setSource = true;
                break;
            }
        }
        if (!setSource) {
            throw new SchedException(Status.UNRECOVERABLE, "unable to take src slot" + isUrgentInfo);
        }

        // Select a low load backend as destination.
        List<BackendLoadStatistic> candidates = Lists.newArrayList();
        for (BackendLoadStatistic beStat : lowBEs) {
            if (beStat.isAvailable() && replicas.stream()
                    .noneMatch(r -> r.getBackendIdWithoutException() == beStat.getBeId())) {
                // check if on same host.
                Backend lowBackend = infoService.getBackend(beStat.getBeId());
                if (lowBackend == null) {
                    continue;
                }
                if (!Config.allow_replica_on_same_host && hosts.contains(lowBackend.getHost())) {
                    continue;
                }

                // no replica on this low load backend
                // 1. check if the tablet fits on this backend.
                BalanceStatus bs = beStat.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), null,
                        false /* not supplement */);
                if (bs != BalanceStatus.OK) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("tablet not fit in BE {}, reason: {}, {}",
                                beStat.getBeId(), bs.getErrMsgs(), isUrgentInfo);
                    }
                    continue;
                }

                // 2. check if this clone task can make the cluster more balance.
                //    Skipped for urgent balance and when the fuzzy test switch is on.
                if (!Config.be_rebalancer_fuzzy_test && !isUrgent) {
                    boolean moreBalanced = replicaHighBEs.stream().anyMatch(highBeStat ->
                            clusterStat.isMoreBalanced(highBeStat.getBeId(), beStat.getBeId(),
                            tabletCtx.getTabletId(), tabletCtx.getTabletSize(),
                            tabletCtx.getStorageMedium()));
                    if (!moreBalanced) {
                        continue;
                    }
                }

                PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
                if (slot == null) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("BE does not have slot: {}", beStat.getBeId());
                    }
                    continue;
                }

                candidates.add(beStat);
            }
        }

        if (candidates.isEmpty()) {
            throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
                    "unable to find low dest backend" + isUrgentInfo);
        }

        // Expand every candidate backend into (backend, path) pairs for its eligible paths.
        List<BePathLoadStatPair> candFitPaths = Lists.newArrayList();
        for (BackendLoadStatistic beStat : candidates) {
            PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
            if (slot == null) {
                continue;
            }

            List<RootPathLoadStatistic> pathLow = null;
            if (isUrgent) {
                // urgent balance: only paths flagged as global low usage
                pathLow = beStat.getAvailPaths(tabletCtx.getStorageMedium()).stream()
                        .filter(RootPathLoadStatistic::isGlobalLowUsage)
                        .collect(Collectors.toList());
            } else {
                // classify the paths.
                // And we only select path from 'low' and 'mid' paths
                pathLow = Lists.newArrayList();
                List<RootPathLoadStatistic> pathMid = Lists.newArrayList();
                List<RootPathLoadStatistic> pathHigh = Lists.newArrayList();
                beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());

                pathLow.addAll(pathMid);
            }
            pathLow.forEach(path -> candFitPaths.add(new BePathLoadStatPair(beStat, path)));
        }

        if (candFitPaths.isEmpty()) {
            throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
                    "unable to find low dest backend to fit in paths" + isUrgentInfo);
        }

        // Try the pairs in the order defined by BePathLoadStatPairComparator and use the first one
        // that grants a balance slot.
        BePathLoadStatPairComparator comparator = new BePathLoadStatPairComparator(candFitPaths);
        Collections.sort(candFitPaths, comparator);
        for (BePathLoadStatPair bePathLoadStat : candFitPaths) {
            BackendLoadStatistic beStat = bePathLoadStat.getBackendLoadStatistic();
            RootPathLoadStatistic pathStat = bePathLoadStat.getPathLoadStatistic();

            PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
            if (slot == null) {
                continue;
            }
            if (slot.takeBalanceSlot(pathStat.getPathHash()) != -1) {
                tabletCtx.setDest(beStat.getBeId(), pathStat.getPathHash());
                return;
            }
        }

        throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
                "unable to take dest slot" + isUrgentInfo);
    }

}