TwoDimensionalGreedyRebalanceAlgo.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.clone;

import org.apache.doris.catalog.TabletInvertedIndex.PartitionBalanceInfo;
import org.apache.doris.clone.PartitionRebalancer.ClusterBalanceInfo;
import org.apache.doris.common.Pair;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.security.SecureRandom;
import java.util.List;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * A two-dimensional greedy rebalancing algorithm. The two dimensions are the cluster and the partition.
 * It generates a list of {@code PartitionMove}s, deciding only which partition to move a replica of and
 * between which backends (fromBe, toBe). Selecting the concrete tablet to move is left to the next step.
 *
 * <p>Among the moves that decrease the skew of a most skewed partition,
 * it prefers those that also reduce the skew of the cluster.
 * A cluster is considered balanced when the skew of every partition is &lt;= 1 and the skew of the cluster is &lt;= 1.
 * The skew of the cluster is defined as the difference between the maximum and the minimum total replica count
 * over all backends; the skew of a partition is defined analogously over its per-backend replica counts.
 *
 * <p>This class is modified from Kudu's TwoDimensionalGreedyAlgo.
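 *
 * <p>A minimal, hypothetical illustration of the cluster-skew definition above (the backend ids and
 * replica counts are made up; the multimap is keyed by count and maps to backend ids, matching the
 * layout this class works with):
 * <pre>{@code
 * TreeMultimap<Long, Long> beByTotalReplicaCount = TreeMultimap.create();
 * beByTotalReplicaCount.put(10L, 10001L); // backend 10001 holds 10 replicas in total
 * beByTotalReplicaCount.put(7L, 10002L);  // backend 10002 holds 7 replicas in total
 * beByTotalReplicaCount.put(4L, 10003L);  // backend 10003 holds 4 replicas in total
 * long clusterSkew = beByTotalReplicaCount.keySet().last()
 *         - beByTotalReplicaCount.keySet().first();        // 10 - 4 = 6, so the cluster is not balanced
 * }</pre>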
 */
public class TwoDimensionalGreedyRebalanceAlgo {
    private static final Logger LOG = LogManager.getLogger(TwoDimensionalGreedyRebalanceAlgo.class);

    private final EqualSkewOption equalSkewOption;
    private static final Random rand = new SecureRandom();

    public static class PartitionMove {
        Long partitionId;
        Long indexId;
        Long fromBe;
        Long toBe;

        PartitionMove(Long p, Long i, Long f, Long t) {
            this.partitionId = p;
            this.indexId = i;
            this.fromBe = f;
            this.toBe = t;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            PartitionMove that = (PartitionMove) o;
            return Objects.equal(partitionId, that.partitionId)
                    && Objects.equal(indexId, that.indexId)
                    && Objects.equal(fromBe, that.fromBe)
                    && Objects.equal(toBe, that.toBe);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(partitionId, indexId, fromBe, toBe);
        }

        @Override
        public String toString() {
            return "ReplicaMove{"
                    + "pid=" + partitionId + "-" + indexId
                    + ", from=" + fromBe
                    + ", to=" + toBe
                    + '}';
        }
    }

    public enum EqualSkewOption {
        // generally only used in unit tests
        PICK_FIRST,
        PICK_RANDOM
    }

    public enum ExtremumType {
        MAX,
        MIN
    }

    public static class IntersectionResult {
        Long replicaCountPartition;      // extreme (min or max) replica count of the partition on a single backend
        Long replicaCountTotal;          // extreme (min or max) total replica count of a single backend
        List<Long> beWithExtremumCount;  // backends holding the partition's extreme replica count
        List<Long> intersection;         // those of them that also hold the cluster-wide extreme total count
    }

    TwoDimensionalGreedyRebalanceAlgo() {
        this(EqualSkewOption.PICK_RANDOM);
    }

    TwoDimensionalGreedyRebalanceAlgo(EqualSkewOption equalSkewOption) {
        this.equalSkewOption = equalSkewOption;
    }

    // maxMovesNum: a value of 0 is a shortcut for 'as many moves as possible'.
    // Note: this method may modify the given ClusterBalanceInfo.
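    // Hedged usage sketch (assumes the caller has already filled a ClusterBalanceInfo for a single
    // medium/tag; the names below are illustrative):
    //   TwoDimensionalGreedyRebalanceAlgo algo = new TwoDimensionalGreedyRebalanceAlgo();
    //   List<PartitionMove> moves = algo.getNextMoves(info, 10);
    // Each returned move only names a partition/index and a (fromBe, toBe) pair; the concrete tablet
    // to move is selected in a later step.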
    public List<PartitionMove> getNextMoves(ClusterBalanceInfo info, int maxMovesNum) {
        Preconditions.checkArgument(maxMovesNum >= 0);
        if (maxMovesNum == 0) {
            maxMovesNum = Integer.MAX_VALUE;
        }

        if (info.partitionInfoBySkew.isEmpty()) {
            // Check the consistency of the 'ClusterBalanceInfo' parameter: if no information is given on
            // the partition skew, the total replica count of every backend should be 0.
            // Keys are in natural ordering, so the last (max) key tells us whether all counts are 0.
            NavigableSet<Long> keySet = info.beByTotalReplicaCount.keySet();
            if (LOG.isDebugEnabled()) {
                LOG.debug(keySet);
            }

            return Lists.newArrayList();
        }

        NavigableSet<Long> keySet = info.beByTotalReplicaCount.keySet();
        if (keySet.isEmpty() || keySet.last() == 0L) {
            // The per-medium replica count we get from getReplicaNumByBeIdAndStorageMedium() is derived
            // from table properties, but the backend may not actually have an SSD/HDD disk. If no such
            // disk exists on the backend, the replica count is set to 0. partitionInfoBySkew does not
            // account for this scenario: a medium missing from a backend still shows up as skew, which
            // would otherwise cause rebalance exceptions.
            return Lists.newArrayList();
        }

        List<PartitionMove> moves = Lists.newArrayList();
        for (int i = 0; i < maxMovesNum; ++i) {
            PartitionMove move = getNextMove(info.beByTotalReplicaCount, info.partitionInfoBySkew);
            if (move == null || !(applyMove(move, info.beByTotalReplicaCount, info.partitionInfoBySkew))) {
                // 1. No replicas to move.
                // 2. Applying the move to info failed; getting further moves from the same info is useless.
                break;
            }
            moves.add(move);
        }

        return moves;
    }

    private PartitionMove getNextMove(TreeMultimap<Long, Long> beByTotalReplicaCount,
                                      TreeMultimap<Long, PartitionBalanceInfo> skewMap) {
        PartitionMove move = null;
        if (skewMap.isEmpty() || beByTotalReplicaCount.isEmpty()) {
            return null;
        }
        long maxPartitionSkew = skewMap.keySet().last();
        // don't make a global balance because beByTotalReplicaCount may contain tablets of other mediums or tags
        if (maxPartitionSkew <= 1L) {
            return null;
        }

        // Among the partitions with maximum skew, attempt to pick a partition where there is
        // a move that improves the partition skew and the cluster skew, if possible. If
        // not, attempt to pick a move that improves the partition skew. If all partitions
        // are balanced, attempt to pick a move that preserves partition balance and
        // improves cluster skew.
        NavigableSet<PartitionBalanceInfo> maxSet = skewMap.get(maxPartitionSkew);
        for (PartitionBalanceInfo pbi : maxSet) {
            Preconditions.checkArgument(!pbi.beByReplicaCount.isEmpty(), "no information on replicas of "
                    + "partition " + pbi.partitionId + "-" + pbi.indexId);

            Long minReplicaCount = pbi.beByReplicaCount.keySet().first();
            Long maxReplicaCount = pbi.beByReplicaCount.keySet().last();
            if (LOG.isDebugEnabled()) {
                LOG.debug("balancing partition {}-{} with replica count skew {}"
                                + " (min_replica_count: {}, max_replica_count: {})",
                        pbi.partitionId, pbi.indexId, maxPartitionSkew,
                        minReplicaCount, maxReplicaCount);
            }

            // Compute the intersection of the bes most loaded for the partition
            // with the bes most loaded overall, and likewise for the least loaded.
            // These are our ideal candidates for moving from and to, respectively.
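            // An illustrative (hypothetical) example: if the partition's most loaded bes are {A, B} and
            // the cluster's most loaded bes are {B, C}, their intersection is {B}; moving a replica away
            // from B improves both the partition skew and the cluster skew.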
            IntersectionResult maxLoaded = getIntersection(ExtremumType.MAX,
                    pbi.beByReplicaCount, beByTotalReplicaCount);
            IntersectionResult minLoaded = getIntersection(ExtremumType.MIN,
                    pbi.beByReplicaCount, beByTotalReplicaCount);
            if (LOG.isDebugEnabled()) {
                LOG.debug("partition-wise: min_count: {}, max_count: {}",
                        minLoaded.replicaCountPartition, maxLoaded.replicaCountPartition);
                LOG.debug("cluster-wise: min_count: {}, max_count: {}",
                        minLoaded.replicaCountTotal, maxLoaded.replicaCountTotal);
                LOG.debug("min_loaded_intersection: {}, max_loaded_intersection: {}",
                        minLoaded.intersection.toString(), maxLoaded.intersection.toString());
            }

            // Do not move replicas of a balanced partition if the least (most) loaded
            // servers overall do not intersect the servers hosting the least (most)
            // replicas of the partition. Moving a replica in that case might keep the
            // cluster skew the same or make it worse while keeping the partition balanced.
            if ((maxLoaded.replicaCountPartition <= minLoaded.replicaCountPartition + 1)
                    && (minLoaded.intersection.isEmpty() || maxLoaded.intersection.isEmpty())) {
                continue;
            }

            Long minLoadedBe;
            Long maxLoadedBe;
            if (equalSkewOption == EqualSkewOption.PICK_FIRST) {
                // the beWithExtremumCount and intersection lists are in natural ordering,
                // so picking the first element is deterministic
                minLoadedBe = minLoaded.intersection.isEmpty()
                        ? minLoaded.beWithExtremumCount.get(0) : minLoaded.intersection.get(0);
                maxLoadedBe = maxLoaded.intersection.isEmpty()
                        ? maxLoaded.beWithExtremumCount.get(0) : maxLoaded.intersection.get(0);
            } else {
                minLoadedBe = minLoaded.intersection.isEmpty() ? getRandomListElement(minLoaded.beWithExtremumCount)
                        : getRandomListElement(minLoaded.intersection);
                maxLoadedBe = maxLoaded.intersection.isEmpty() ? getRandomListElement(maxLoaded.beWithExtremumCount)
                        : getRandomListElement(maxLoaded.intersection);
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("min_loaded_be: {}, max_loaded_be: {}", minLoadedBe, maxLoadedBe);
            }
            if (minLoadedBe.equals(maxLoadedBe)) {
                // Nothing to move.
                continue;
            }
            // Move a replica of the selected partition from a most loaded server to a
            // least loaded server.
            move = new PartitionMove(pbi.partitionId, pbi.indexId, maxLoadedBe, minLoadedBe);
            break;
        }
        return move;
    }

    public static <T> T getRandomListElement(List<T> items) {
        Preconditions.checkArgument(!items.isEmpty());
        return items.get(rand.nextInt(items.size()));
    }

    // For the given extremum (MIN or MAX), compute the backends holding the extreme per-partition
    // replica count (beWithExtremumCount), the backends holding the extreme total replica count,
    // and the intersection of the two sets. A non-empty intersection gives the preferred source
    // (for MAX) or target (for MIN) of a move, since it improves both partition and cluster skew.
    public static IntersectionResult getIntersection(ExtremumType extremumType,
            TreeMultimap<Long, Long> beByReplicaCount, TreeMultimap<Long, Long> beByTotalReplicaCount) {
        Pair<Long, Set<Long>> beSelectedByPartition = getMinMaxLoadedServers(beByReplicaCount, extremumType);
        Pair<Long, Set<Long>> beSelectedByTotal = getMinMaxLoadedServers(beByTotalReplicaCount, extremumType);
        Preconditions.checkNotNull(beSelectedByPartition);
        Preconditions.checkNotNull(beSelectedByTotal);

        IntersectionResult res = new IntersectionResult();
        res.replicaCountPartition = beSelectedByPartition.first;
        res.replicaCountTotal = beSelectedByTotal.first;
        res.beWithExtremumCount = Lists.newArrayList(beSelectedByPartition.second);
        res.intersection = Lists.newArrayList(
                Sets.intersection(beSelectedByPartition.second, beSelectedByTotal.second));
        return res;
    }

    private static Pair<Long, Set<Long>> getMinMaxLoadedServers(
            TreeMultimap<Long, Long> multimap, ExtremumType extremumType) {
        if (multimap.isEmpty()) {
            return null;
        }
        Long count = (extremumType == ExtremumType.MIN) ? multimap.keySet().first() : multimap.keySet().last();
        return Pair.of(count, multimap.get(count));
    }

    /** Update the balance state in 'ClusterBalanceInfo' (the two maps) with the outcome of the move 'move'.
     * To support applying in-progress moves to the current cluster balance info,
     * the maps must be left unmodified if the apply fails.
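     *
     * <p>Illustrative effect on the total-count map (the backend ids and counts are hypothetical):
     * <pre>{@code
     * // before: beByTotalReplicaCount = {4 -> {10003}, 7 -> {10002}, 10 -> {10001}}
     * // applyMove(move{from=10001, to=10003}, ...) shifts one replica between the two backends:
     * // after:  beByTotalReplicaCount = {5 -> {10003}, 7 -> {10002}, 9 -> {10001}}
     * // The moved partition's entry in skewMap is re-keyed by its new skew in the same way.
     * }</pre>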
     */
    public static boolean applyMove(PartitionMove move, TreeMultimap<Long, Long> beByTotalReplicaCount,
                                    TreeMultimap<Long, PartitionBalanceInfo> skewMap) {
        try {
            // Update the total counts
            moveOneReplica(move.fromBe, move.toBe, beByTotalReplicaCount);
        } catch (IllegalStateException e) {
            LOG.info("{} apply failed, {}", move, e.getMessage());
            return false;
        }

        try {
            PartitionBalanceInfo partitionBalanceInfo = null;
            Long skew = -1L;
            for (Long key : skewMap.keySet()) {
                NavigableSet<PartitionBalanceInfo> pbiSet = skewMap.get(key);
                List<PartitionBalanceInfo> pbis = pbiSet.stream()
                        .filter(info -> info.partitionId.equals(move.partitionId) && info.indexId.equals(move.indexId))
                        .collect(Collectors.toList());
                Preconditions.checkState(pbis.size() <= 1, "skew map has dup partition info");
                if (pbis.size() == 1) {
                    partitionBalanceInfo = pbis.get(0);
                    skew = key;
                    break;
                }
            }

            Preconditions.checkState(skew != -1L, "partition is not in skew map");
            PartitionBalanceInfo newInfo = new PartitionBalanceInfo(partitionBalanceInfo);
            moveOneReplica(move.fromBe, move.toBe, newInfo.beByReplicaCount);

            skewMap.remove(skew, partitionBalanceInfo);
            long minCount = newInfo.beByReplicaCount.keySet().first();
            long maxCount = newInfo.beByReplicaCount.keySet().last();
            skewMap.put(maxCount - minCount, newInfo);
        } catch (IllegalStateException e) {
            // If an IllegalStateException was thrown here, the skew map has not been modified,
            // so we only need to roll back the move already applied to beByTotalReplicaCount.
            moveOneReplica(move.toBe, move.fromBe, beByTotalReplicaCount);
            LOG.info("{} apply failed, {}", move, e.getMessage());
            return false;
        } catch (Exception e) {
            // Rolling back the move on beByTotalReplicaCount is meaningless here,
            // since the skew map may already be in an inconsistent state.
            LOG.warn("got unexpected exception when apply {}, the skew may be broken. {}", move, e.toString());
            throw e;
        }
        return true;
    }

    // Applies to 'm' a move of one replica from the be with id 'fromBe' to the be with id 'toBe' by
    // decrementing the count of 'fromBe' and incrementing the count of 'toBe'.
    // If any precondition check fails, the map is not modified.
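    // Illustrative example (hypothetical backend ids and counts):
    //   m = {3 -> {101}, 5 -> {102}}; moveOneReplica(102L, 101L, m)  =>  m = {4 -> {101, 102}}
    //   (be 102 is re-keyed from 5 to 4, and be 101 from 3 to 4)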
    private static void moveOneReplica(Long fromBe, Long toBe,
            TreeMultimap<Long, Long> m) throws IllegalStateException {
        boolean foundSrc = false;
        boolean foundDst = false;
        Long countSrc = 0L;
        Long countDst = 0L;
        for (Long key : m.keySet()) {
            // just check whether fromBe/toBe appear in the value set for this count key
            Set<Long> values = m.get(key);
            if (values.contains(fromBe)) {
                foundSrc = true;
                countSrc = key;
            }
            if (values.contains(toBe)) {
                foundDst = true;
                countDst = key;
            }
        }

        Preconditions.checkState(foundSrc, "fromBe " + fromBe + " is not in the map");
        Preconditions.checkState(foundDst, "toBe " + toBe + " is not in the map");
        Preconditions.checkState(countSrc > 0, "fromBe has no replica in the map, can't move");

        m.remove(countSrc, fromBe);
        m.remove(countDst, toBe);
        m.put(countSrc - 1, fromBe);
        m.put(countDst + 1, toBe);
    }
}