TwoDimensionalGreedyRebalanceAlgo.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.clone;
import org.apache.doris.catalog.TabletInvertedIndex.PartitionBalanceInfo;
import org.apache.doris.clone.PartitionRebalancer.ClusterBalanceInfo;
import org.apache.doris.common.Pair;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.security.SecureRandom;
import java.util.List;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * A two-dimensional greedy rebalancing algorithm. The two dimensions are the cluster and the partition.
 * It generates a list of {@code PartitionMove}s. Each move only decides which partition (and index) to move,
 * and from which BE to which BE. Selecting the concrete tablet to move is left to the next step.
 *
 * <p>Among the moves that decrease the skew of a most-skewed partition, it prefers the ones that also
 * reduce the skew of the cluster.
 * The skew of a partition is the difference between the maximum and the minimum replica count of that
 * partition over its BEs. The skew of the cluster is the difference between the maximum total replica
 * count over all BEs and the minimum total replica count over all BEs.
 *
 * <p>This implementation stops generating moves once every partition has a skew <= 1. It does not try
 * to reduce the cluster skew on its own, because the total replica counts may include tablets of other
 * storage media or tags.
 *
 * <p>This class is adapted from Kudu's TwoDimensionalGreedyAlgo.
 */
public class TwoDimensionalGreedyRebalanceAlgo {
    private static final Logger LOG = LogManager.getLogger(TwoDimensionalGreedyRebalanceAlgo.class);
    private final EqualSkewOption equalSkewOption;
    private static final Random rand = new SecureRandom();

    /**
     * A move of one replica of partition {@code partitionId}/index {@code indexId}
     * from BE {@code fromBe} to BE {@code toBe}. Equality covers all four fields.
     */
    public static class PartitionMove {
        Long partitionId;
        Long indexId;
        Long fromBe;
        Long toBe;

        PartitionMove(Long p, Long i, Long f, Long t) {
            this.partitionId = p;
            this.indexId = i;
            this.fromBe = f;
            this.toBe = t;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            PartitionMove that = (PartitionMove) o;
            return Objects.equal(partitionId, that.partitionId)
                    && Objects.equal(indexId, that.indexId)
                    && Objects.equal(fromBe, that.fromBe)
                    && Objects.equal(toBe, that.toBe);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(partitionId, indexId, fromBe, toBe);
        }

        @Override
        public String toString() {
            return "ReplicaMove{"
                    + "pid=" + partitionId + "-" + indexId
                    + ", from=" + fromBe
                    + ", to=" + toBe
                    + '}';
        }
    }

    /** How to choose a BE when several BEs share the same extremum count. */
    public enum EqualSkewOption {
        // Take the first BE in natural (ascending id) order; deterministic, generally only used in unit tests.
        PICK_FIRST,
        // Take a uniformly random BE among the candidates.
        PICK_RANDOM
    }

    /** Which extremum (most loaded or least loaded) to look for. */
    public enum ExtremumType {
        MAX,
        MIN
    }

    /**
     * Result of {@link #getIntersection}: the extremum replica count for the partition and for the
     * cluster, the BEs holding the partition extremum, and the subset of those BEs that also hold the
     * cluster-wide extremum.
     */
    public static class IntersectionResult {
        Long replicaCountPartition;
        Long replicaCountTotal;
        List<Long> beWithExtremumCount;
        List<Long> intersection;
    }

    TwoDimensionalGreedyRebalanceAlgo() {
        this(EqualSkewOption.PICK_RANDOM);
    }

    TwoDimensionalGreedyRebalanceAlgo(EqualSkewOption equalSkewOption) {
        this.equalSkewOption = equalSkewOption;
    }

    /**
     * Generates up to {@code maxMovesNum} partition moves. Each generated move is applied to
     * {@code info} before the next one is computed, so {@code info} is modified by this call.
     *
     * @param info the current balance info; updated in place with the effect of every returned move
     * @param maxMovesNum maximum number of moves to generate; 0 is a shortcut for "as many as possible"
     * @return the generated moves, possibly empty, never null
     * @throws IllegalArgumentException if {@code maxMovesNum} is negative
     */
    public List<PartitionMove> getNextMoves(ClusterBalanceInfo info, int maxMovesNum) {
        Preconditions.checkArgument(maxMovesNum >= 0, "maxMovesNum must be >= 0, got %s", maxMovesNum);
        if (maxMovesNum == 0) {
            maxMovesNum = Integer.MAX_VALUE;
        }

        NavigableSet<Long> keySet = info.beByTotalReplicaCount.keySet();
        if (info.partitionInfoBySkew.isEmpty()) {
            // No partition skew information is given, so there is nothing to balance.
            // The total replica counts per BE are only logged for diagnosis; they are not validated.
            if (LOG.isDebugEnabled()) {
                LOG.debug(keySet);
            }
            return Lists.newArrayList();
        }

        // Keys are in natural order, so the last key is the maximum total replica count.
        if (keySet.isEmpty() || keySet.last() == 0L) {
            // The number of replicas on the specified medium (from getReplicaNumByBeIdAndStorageMedium())
            // is defined by table properties, but a backend may actually have no SSD/HDD disk. In that case
            // the replica number is set to 0, while partitionInfoBySkew does not account for it and still
            // reports skew. Trying to rebalance would then fail, so generate no moves.
            return Lists.newArrayList();
        }

        List<PartitionMove> moves = Lists.newArrayList();
        for (int i = 0; i < maxMovesNum; ++i) {
            PartitionMove move = getNextMove(info.beByTotalReplicaCount, info.partitionInfoBySkew);
            if (move == null || !applyMove(move, info.beByTotalReplicaCount, info.partitionInfoBySkew)) {
                // 1. No replicas to move.
                // 2. Applying to info failed; asking again on the same info would be useless.
                break;
            }
            moves.add(move);
        }
        return moves;
    }

    /**
     * Picks the next move for one of the most-skewed partitions, or returns null when no partition
     * has a skew greater than 1 or no useful move is found.
     */
    private PartitionMove getNextMove(TreeMultimap<Long, Long> beByTotalReplicaCount,
            TreeMultimap<Long, PartitionBalanceInfo> skewMap) {
        PartitionMove move = null;
        if (skewMap.isEmpty() || beByTotalReplicaCount.isEmpty()) {
            return null;
        }
        long maxPartitionSkew = skewMap.keySet().last();
        // Don't do a global (cluster-only) balance, because beByTotalReplicaCount may contain tablets
        // of other media or tags.
        if (maxPartitionSkew <= 1L) {
            return null;
        }

        // Among the partitions with maximum skew, attempt to pick a partition where there is
        // a move that improves the partition skew and the cluster skew, if possible. If
        // not, attempt to pick a move that improves the partition skew.
        NavigableSet<PartitionBalanceInfo> maxSet = skewMap.get(maxPartitionSkew);
        for (PartitionBalanceInfo pbi : maxSet) {
            // Lazy message template: the string is only built when the check fails.
            Preconditions.checkArgument(!pbi.beByReplicaCount.isEmpty(),
                    "no information on replicas of partition %s-%s", pbi.partitionId, pbi.indexId);

            Long minReplicaCount = pbi.beByReplicaCount.keySet().first();
            Long maxReplicaCount = pbi.beByReplicaCount.keySet().last();
            if (LOG.isDebugEnabled()) {
                LOG.debug("balancing partition {}-{} with replica count skew {}"
                                + " (min_replica_count: {}, max_replica_count: {})",
                        pbi.partitionId, pbi.indexId, maxPartitionSkew,
                        minReplicaCount, maxReplicaCount);
            }

            // Compute the intersection of the BEs most loaded for the partition
            // with the BEs most loaded overall, and likewise for least loaded.
            // These are our ideal candidates for moving from and to, respectively.
            IntersectionResult maxLoaded = getIntersection(ExtremumType.MAX,
                    pbi.beByReplicaCount, beByTotalReplicaCount);
            IntersectionResult minLoaded = getIntersection(ExtremumType.MIN,
                    pbi.beByReplicaCount, beByTotalReplicaCount);
            if (LOG.isDebugEnabled()) {
                LOG.debug("partition-wise: min_count: {}, max_count: {}",
                        minLoaded.replicaCountPartition, maxLoaded.replicaCountPartition);
                LOG.debug("cluster-wise: min_count: {}, max_count: {}",
                        minLoaded.replicaCountTotal, maxLoaded.replicaCountTotal);
                LOG.debug("min_loaded_intersection: {}, max_loaded_intersection: {}",
                        minLoaded.intersection.toString(), maxLoaded.intersection.toString());
            }

            // Do not move replicas of a balanced partition if the least (most) loaded
            // servers overall do not intersect the servers hosting the least (most)
            // replicas of the partition. Moving a replica in that case might keep the
            // cluster skew the same or make it worse while keeping the partition balanced.
            // NOTE(review): since maxPartitionSkew > 1 here, this presumably only triggers if a skew key
            // is stale relative to beByReplicaCount; kept from the Kudu algorithm.
            if ((maxLoaded.replicaCountPartition <= minLoaded.replicaCountPartition + 1)
                    && (minLoaded.intersection.isEmpty() || maxLoaded.intersection.isEmpty())) {
                continue;
            }

            // Prefer a BE from the intersection; fall back to any BE with the partition extremum.
            Long minLoadedBe;
            Long maxLoadedBe;
            if (equalSkewOption == EqualSkewOption.PICK_FIRST) {
                // beWithExtremumCount lists & intersection lists are in natural ordering
                minLoadedBe = minLoaded.intersection.isEmpty()
                        ? minLoaded.beWithExtremumCount.get(0) : minLoaded.intersection.get(0);
                maxLoadedBe = maxLoaded.intersection.isEmpty()
                        ? maxLoaded.beWithExtremumCount.get(0) : maxLoaded.intersection.get(0);
            } else {
                minLoadedBe = minLoaded.intersection.isEmpty() ? getRandomListElement(minLoaded.beWithExtremumCount)
                        : getRandomListElement(minLoaded.intersection);
                maxLoadedBe = maxLoaded.intersection.isEmpty() ? getRandomListElement(maxLoaded.beWithExtremumCount)
                        : getRandomListElement(maxLoaded.intersection);
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("min_loaded_be: {}, max_loaded_be: {}", minLoadedBe, maxLoadedBe);
            }
            if (minLoadedBe.equals(maxLoadedBe)) {
                // Nothing to move.
                continue;
            }
            // Move a replica of the selected partition from a most loaded server to a
            // least loaded server.
            move = new PartitionMove(pbi.partitionId, pbi.indexId, maxLoadedBe, minLoadedBe);
            break;
        }
        return move;
    }

    /**
     * Returns a uniformly random element of {@code items}.
     *
     * @throws IllegalArgumentException if {@code items} is empty
     */
    public static <T> T getRandomListElement(List<T> items) {
        Preconditions.checkArgument(!items.isEmpty());
        return items.get(rand.nextInt(items.size()));
    }

    /**
     * Finds the BEs with the extremum ({@code extremumType}) replica count for a partition, and
     * intersects them with the BEs that have the extremum total replica count in the cluster.
     * Both lists in the result keep the natural (ascending) order of BE ids.
     *
     * @throws NullPointerException if either multimap is empty
     */
    public static IntersectionResult getIntersection(ExtremumType extremumType,
            TreeMultimap<Long, Long> beByReplicaCount, TreeMultimap<Long, Long> beByTotalReplicaCount) {
        Pair<Long, Set<Long>> beSelectedByPartition = getMinMaxLoadedServers(beByReplicaCount, extremumType);
        Pair<Long, Set<Long>> beSelectedByTotal = getMinMaxLoadedServers(beByTotalReplicaCount, extremumType);
        Preconditions.checkNotNull(beSelectedByPartition);
        Preconditions.checkNotNull(beSelectedByTotal);

        IntersectionResult res = new IntersectionResult();
        res.replicaCountPartition = beSelectedByPartition.first;
        res.replicaCountTotal = beSelectedByTotal.first;
        res.beWithExtremumCount = Lists.newArrayList(beSelectedByPartition.second);
        res.intersection = Lists.newArrayList(
                Sets.intersection(beSelectedByPartition.second, beSelectedByTotal.second));
        return res;
    }

    /**
     * Returns the minimum (or maximum) key of {@code multimap} with the values stored under it,
     * or null if the multimap is empty.
     */
    private static Pair<Long, Set<Long>> getMinMaxLoadedServers(
            TreeMultimap<Long, Long> multimap, ExtremumType extremumType) {
        if (multimap.isEmpty()) {
            return null;
        }
        Long count = (extremumType == ExtremumType.MIN) ? multimap.keySet().first() : multimap.keySet().last();
        return Pair.of(count, multimap.get(count));
    }

    /**
     * Updates the balance state (the two maps of a {@code ClusterBalanceInfo}) with the outcome of
     * {@code move}. This also supports applying in-progress moves to the current cluster balance info.
     * If applying fails with an expected inconsistency, neither map is modified.
     *
     * @return true if the move was applied, false if it is inconsistent with the given maps
     */
    public static boolean applyMove(PartitionMove move, TreeMultimap<Long, Long> beByTotalReplicaCount,
            TreeMultimap<Long, PartitionBalanceInfo> skewMap) {
        try {
            // Update the total counts
            moveOneReplica(move.fromBe, move.toBe, beByTotalReplicaCount);
        } catch (IllegalStateException e) {
            LOG.info("{} apply failed, {}", move, e.getMessage());
            return false;
        }

        try {
            // Locate the balance info of the moved partition/index and its current skew key.
            PartitionBalanceInfo partitionBalanceInfo = null;
            Long skew = -1L;
            for (Long key : skewMap.keySet()) {
                NavigableSet<PartitionBalanceInfo> pbiSet = skewMap.get(key);
                List<PartitionBalanceInfo> pbis = pbiSet.stream()
                        .filter(info -> info.partitionId.equals(move.partitionId) && info.indexId.equals(move.indexId))
                        .collect(Collectors.toList());
                Preconditions.checkState(pbis.size() <= 1, "skew map has dup partition info");
                if (pbis.size() == 1) {
                    partitionBalanceInfo = pbis.get(0);
                    skew = key;
                    break;
                }
            }

            Preconditions.checkState(skew != -1L, "partition is not in skew map");
            // Apply the move on a copy first, so a failing check leaves skewMap untouched.
            PartitionBalanceInfo newInfo = new PartitionBalanceInfo(partitionBalanceInfo);
            moveOneReplica(move.fromBe, move.toBe, newInfo.beByReplicaCount);

            skewMap.remove(skew, partitionBalanceInfo);
            long minCount = newInfo.beByReplicaCount.keySet().first();
            long maxCount = newInfo.beByReplicaCount.keySet().last();
            skewMap.put(maxCount - minCount, newInfo);
        } catch (IllegalStateException e) {
            // An IllegalStateException is raised before the skew map is modified,
            // so roll back the move applied to beByTotalReplicaCount.
            moveOneReplica(move.toBe, move.fromBe, beByTotalReplicaCount);
            LOG.info("{} apply failed, {}", move, e.getMessage());
            return false;
        } catch (Exception e) {
            // Rolling back beByTotalReplicaCount is meaningless here: the skew map may be partially updated.
            // The exception is passed last so the logger records its stack trace.
            LOG.warn("got unexpected exception when apply {}, the skew may be broken. {}", move, e.toString(), e);
            throw e;
        }
        return true;
    }

    /**
     * Applies to {@code m} a move of one replica from BE {@code fromBe} to BE {@code toBe}.
     * It decrements the count of {@code fromBe} and increments the count of {@code toBe}.
     * If any check fails, the map is not modified.
     *
     * @throws IllegalStateException if {@code fromBe} equals {@code toBe}, if either BE is not in the
     *         map, or if {@code fromBe} has no replica to move
     */
    private static void moveOneReplica(Long fromBe, Long toBe,
            TreeMultimap<Long, Long> m) throws IllegalStateException {
        // A same-BE move is meaningless, and the re-insertion below would leave the BE
        // registered under two different counts, corrupting the multimap.
        Preconditions.checkState(!Objects.equal(fromBe, toBe),
                "fromBe and toBe are the same be %s, can't move", fromBe);

        Long countSrc = null;
        Long countDst = null;
        for (Long key : m.keySet()) {
            Set<Long> values = m.get(key);
            if (countSrc == null && values.contains(fromBe)) {
                countSrc = key;
            }
            if (countDst == null && values.contains(toBe)) {
                countDst = key;
            }
            if (countSrc != null && countDst != null) {
                // Each BE appears under exactly one count; no need to scan further.
                break;
            }
        }

        Preconditions.checkState(countSrc != null, "fromBe %s is not in the map", fromBe);
        Preconditions.checkState(countDst != null, "toBe %s is not in the map", toBe);
        Preconditions.checkState(countSrc > 0, "fromBe has no replica in the map, can't move");

        m.remove(countSrc, fromBe);
        m.remove(countDst, toBe);
        m.put(countSrc - 1, fromBe);
        m.put(countDst + 1, toBe);
    }
}