PartitionRebalancer.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.clone;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.security.SecureRandom;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
/*
* PartitionRebalancer will decrease the skew of partitions. The skew of the partition is defined as the difference
* between the maximum replica count of the partition over all bes and the minimum replica count over all bes.
* Only consider about the replica count for each partition, never consider the replica size(disk usage).
*
* We use TwoDimensionalGreedyRebalanceAlgo to get partition moves(one PartitionMove is <partition id, from be, to be>).
* It prefers a move that reduce the skew of the cluster when we want to rebalance a max skew partition.
*
* selectAlternativeTabletsForCluster() must set the tablet id, so we need to select tablet for each move in this phase
* (as TabletMove).
*/
public class PartitionRebalancer extends Rebalancer {
private static final Logger LOG = LogManager.getLogger(PartitionRebalancer.class);
private final TwoDimensionalGreedyRebalanceAlgo algo = new TwoDimensionalGreedyRebalanceAlgo();
private final MovesCacheMap movesCacheMap = new MovesCacheMap();
private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0);
private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0);
public PartitionRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex,
Map<Long, PathSlot> backendsWorkingSlots) {
super(infoService, invertedIndex, backendsWorkingSlots);
}
@Override
protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
LoadStatisticForTag clusterStat, TStorageMedium medium) {
MovesCacheMap.MovesCache movesInProgress = movesCacheMap.getCache(clusterStat.getTag(), medium);
Preconditions.checkNotNull(movesInProgress,
"clusterStat is got from statisticMap, movesCacheMap should have the same entry");
// Iterating through Cache.asMap().values() does not reset access time for the entries you retrieve.
List<TabletMove> movesInProgressList = movesInProgress.get().asMap().values()
.stream().map(p -> p.first).collect(Collectors.toList());
List<Long> toDeleteKeys = Lists.newArrayList();
// The problematic movements will be found in buildClusterInfo(), so here is a simply move completion check
// of moves which have valid ToDeleteReplica.
List<TabletMove> movesNeedCheck = movesInProgress.get().asMap().values()
.stream().filter(p -> p.second != -1L).map(p -> p.first).collect(Collectors.toList());
checkMovesCompleted(movesNeedCheck, toDeleteKeys);
ClusterBalanceInfo clusterBalanceInfo = new ClusterBalanceInfo();
// We should assume the in-progress moves have been succeeded to avoid producing the same moves.
// Apply in-progress moves to current cluster stats, use TwoDimensionalGreedyAlgo.ApplyMove for simplicity.
if (!buildClusterInfo(clusterStat, medium, movesInProgressList, clusterBalanceInfo, toDeleteKeys)) {
return Lists.newArrayList();
}
// Just delete the completed or problematic moves
if (!toDeleteKeys.isEmpty()) {
movesInProgress.get().invalidateAll(toDeleteKeys);
movesInProgressList = movesInProgressList.stream()
.filter(m -> !toDeleteKeys.contains(m.tabletId)).collect(Collectors.toList());
}
// The balancing tasks of other cluster or medium might have failed. We use the upper limit value
// `total num of in-progress moves` to avoid useless selections.
if (movesCacheMap.size() > Config.max_balancing_tablets) {
if (LOG.isDebugEnabled()) {
LOG.debug("Total in-progress moves > {}", Config.max_balancing_tablets);
}
return Lists.newArrayList();
}
NavigableSet<Long> skews = clusterBalanceInfo.partitionInfoBySkew.keySet();
if (LOG.isDebugEnabled()) {
LOG.debug("Medium {}: peek max skew {}, assume {} in-progress moves are succeeded {}", medium,
skews.isEmpty() ? 0 : skews.last(), movesInProgressList.size(), movesInProgressList);
}
List<TwoDimensionalGreedyRebalanceAlgo.PartitionMove> moves
= algo.getNextMoves(clusterBalanceInfo, Config.partition_rebalance_max_moves_num_per_selection);
List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
Set<Long> inProgressIds = movesInProgressList.stream().map(m -> m.tabletId).collect(Collectors.toSet());
Random rand = new SecureRandom();
for (TwoDimensionalGreedyRebalanceAlgo.PartitionMove move : moves) {
// Find all tablets of the specified partition that would have a replica at the source be,
// but would not have a replica at the destination be. That is to satisfy the restriction
// of having no more than one replica of the same tablet per be.
List<Long> tabletIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.fromBe, medium);
if (tabletIds.isEmpty()) {
continue;
}
Set<Long> invalidIds = Sets.newHashSet(
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium));
BiPredicate<Long, TabletMeta> canMoveTablet = (Long tabletId, TabletMeta tabletMeta) -> {
return canBalanceTablet(tabletMeta)
&& tabletMeta.getPartitionId() == move.partitionId
&& tabletMeta.getIndexId() == move.indexId
&& !invalidIds.contains(tabletId)
&& !inProgressIds.contains(tabletId);
};
// Random pick one candidate to create tabletSchedCtx
int startIdx = rand.nextInt(tabletIds.size());
long pickedTabletId = -1L;
TabletMeta pickedTabletMeta = null;
for (int i = startIdx; i < tabletIds.size(); i++) {
long tabletId = tabletIds.get(i);
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
if (canMoveTablet.test(tabletId, tabletMeta)) {
pickedTabletId = tabletId;
pickedTabletMeta = tabletMeta;
break;
}
}
if (pickedTabletId == -1L) {
for (int i = 0; i < startIdx; i++) {
long tabletId = tabletIds.get(i);
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
if (canMoveTablet.test(tabletId, tabletMeta)) {
pickedTabletId = tabletId;
pickedTabletMeta = tabletMeta;
break;
}
}
}
if (pickedTabletId == -1L) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cann't picked tablet id for move {}", move);
}
continue;
}
TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE,
pickedTabletMeta.getDbId(), pickedTabletMeta.getTableId(), pickedTabletMeta.getPartitionId(),
pickedTabletMeta.getIndexId(), pickedTabletId, null /* replica alloc is not used for balance*/,
System.currentTimeMillis());
tabletCtx.setTag(clusterStat.getTag());
// Balance task's priority is always LOW
tabletCtx.setPriority(TabletSchedCtx.Priority.LOW);
alternativeTablets.add(tabletCtx);
// Pair<Move, ToDeleteReplicaId>, ToDeleteReplicaId should be -1L before scheduled successfully
movesInProgress.get().put(pickedTabletId,
Pair.of(new TabletMove(pickedTabletId, move.fromBe, move.toBe), -1L));
counterBalanceMoveCreated.incrementAndGet();
// Synchronize with movesInProgress
inProgressIds.add(pickedTabletId);
}
if (moves.isEmpty()) {
// Balanced cluster should not print too much log messages, so we log it with level debug.
if (LOG.isDebugEnabled()) {
LOG.debug("Medium {}: cluster is balanced.", medium);
}
} else {
LOG.info("Medium {}: get {} moves, actually select {} alternative tablets to move. Tablets detail: {}",
medium, moves.size(), alternativeTablets.size(),
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
}
return alternativeTablets;
}
private boolean buildClusterInfo(LoadStatisticForTag clusterStat, TStorageMedium medium,
List<TabletMove> movesInProgress, ClusterBalanceInfo info, List<Long> toDeleteKeys) {
Preconditions.checkState(info.beByTotalReplicaCount.isEmpty() && info.partitionInfoBySkew.isEmpty(), "");
// If we wanna modify the PartitionBalanceInfo in info.beByTotalReplicaCount, deep-copy it
info.beByTotalReplicaCount.putAll(clusterStat.getBeByTotalReplicaMap(medium));
info.partitionInfoBySkew.putAll(clusterStat.getSkewMap(medium));
// Skip the toDeleteKeys
List<TabletMove> filteredMoves = movesInProgress.stream()
.filter(m -> !toDeleteKeys.contains(m.tabletId)).collect(Collectors.toList());
for (TabletMove move : filteredMoves) {
TabletMeta meta = invertedIndex.getTabletMeta(move.tabletId);
if (meta == null) {
// Move's tablet is invalid, need delete it
toDeleteKeys.add(move.tabletId);
continue;
}
TwoDimensionalGreedyRebalanceAlgo.PartitionMove partitionMove
= new TwoDimensionalGreedyRebalanceAlgo.PartitionMove(
meta.getPartitionId(), meta.getIndexId(), move.fromBe, move.toBe);
boolean st = TwoDimensionalGreedyRebalanceAlgo.applyMove(
partitionMove, info.beByTotalReplicaCount, info.partitionInfoBySkew);
if (!st) {
// Can't apply this move, mark it failed, continue to apply the next.
toDeleteKeys.add(move.tabletId);
}
}
return true;
}
private void checkMovesCompleted(List<TabletMove> moves, List<Long> toDeleteKeys) {
boolean moveIsComplete;
for (TabletMove move : moves) {
moveIsComplete = checkMoveCompleted(move);
// If the move was completed, remove it
if (moveIsComplete) {
toDeleteKeys.add(move.tabletId);
if (LOG.isDebugEnabled()) {
LOG.debug("Move {} is completed. The cur dist: {}", move,
invertedIndex.getReplicasByTabletId(move.tabletId).stream()
.map(Replica::getBackendIdWithoutException).collect(Collectors.toList()));
}
counterBalanceMoveSucceeded.incrementAndGet();
}
}
}
// Move completed: fromBe doesn't have a replica and toBe has a replica
private boolean checkMoveCompleted(TabletMove move) {
Long tabletId = move.tabletId;
List<Long> bes = invertedIndex.getReplicasByTabletId(tabletId).stream()
.map(Replica::getBackendIdWithoutException).collect(Collectors.toList());
return !bes.contains(move.fromBe) && bes.contains(move.toBe);
}
@Override
protected void completeSchedCtx(TabletSchedCtx tabletCtx)
throws SchedException {
MovesCacheMap.MovesCache movesInProgress = movesCacheMap.getCache(tabletCtx.getTag(),
tabletCtx.getStorageMedium());
Preconditions.checkNotNull(movesInProgress,
"clusterStat is got from statisticMap, movesInProgressMap should have the same entry");
try {
Pair<TabletMove, Long> pair = movesInProgress.get().getIfPresent(tabletCtx.getTabletId());
Preconditions.checkNotNull(pair, "No cached move for tablet: " + tabletCtx.getTabletId());
TabletMove move = pair.first;
checkMoveValidation(move);
// Check src replica's validation
Replica srcReplica = tabletCtx.getTablet().getReplicaByBackendId(move.fromBe);
Preconditions.checkNotNull(srcReplica);
long beId = srcReplica.getBackendIdWithoutException();
TabletScheduler.PathSlot slot = backendsWorkingSlots.get(beId);
Preconditions.checkNotNull(slot, "unable to get fromBe "
+ beId + " slot");
if (slot.takeBalanceSlot(srcReplica.getPathHash()) != -1) {
tabletCtx.setSrc(srcReplica);
} else {
throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
"no slot for src replica " + srcReplica + ", pathHash " + srcReplica.getPathHash());
}
// Choose a path in destination
LoadStatisticForTag loadStat = statisticMap.get(tabletCtx.getTag());
Preconditions.checkNotNull(loadStat, "tag does not exist: " + tabletCtx.getTag());
BackendLoadStatistic beStat = loadStat.getBackendLoadStatistic(move.toBe);
Preconditions.checkNotNull(beStat);
slot = backendsWorkingSlots.get(move.toBe);
Preconditions.checkNotNull(slot, "unable to get slot of toBe " + move.toBe);
List<RootPathLoadStatistic> paths = beStat.getPathStatistics();
List<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium()
&& path.isFit(tabletCtx.getTabletSize(), false) == BalanceStatus.OK)
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toList());
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath, tabletCtx.getTag(),
tabletCtx.getStorageMedium());
if (pathHash == -1) {
throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
"paths has no available balance slot: " + availPath);
}
tabletCtx.setDest(beStat.getBeId(), pathHash);
// ToDeleteReplica is the source replica
pair.second = srcReplica.getId();
} catch (IllegalStateException | NullPointerException e) {
// Problematic move should be invalidated immediately
movesInProgress.get().invalidate(tabletCtx.getTabletId());
throw new SchedException(SchedException.Status.UNRECOVERABLE, e.getMessage());
}
}
// The validation check cannot be accurate, cuz the production of moves do have ordering.
// If some moves failed, the cluster & partition skew is different to the skew when we getNextMove.
// So we can't do skew check.
// Just do some basic checks, e.g. server available.
private void checkMoveValidation(TabletMove move) throws IllegalStateException {
boolean fromAvailable = infoService.checkBackendScheduleAvailable(move.fromBe);
boolean toAvailable = infoService.checkBackendScheduleAvailable(move.toBe);
Preconditions.checkState(fromAvailable && toAvailable,
move + "'s bes are not all available: from " + fromAvailable + ", to " + toAvailable);
// To be improved
}
@Override
public void onTabletFailed(TabletSchedCtx tabletCtx) {
movesCacheMap.invalidateTablet(tabletCtx);
}
@Override
public Long getToDeleteReplicaId(TabletSchedCtx tabletCtx) {
// We don't invalidate the cached move here, cuz the redundant repair progress is just started.
// The move should be invalidated by TTL or Algo.CheckMoveCompleted()
Pair<TabletMove, Long> pair = movesCacheMap.getTabletMove(tabletCtx);
if (pair != null) {
//Preconditions.checkState(pair.second != -1L);
return pair.second;
} else {
return (long) -1;
}
}
@Override
public void invalidateToDeleteReplicaId(TabletSchedCtx tabletCtx) {
movesCacheMap.invalidateTablet(tabletCtx);
}
@Override
public void updateLoadStatistic(Map<Tag, LoadStatisticForTag> statisticMap) {
super.updateLoadStatistic(statisticMap);
movesCacheMap.updateMapping(statisticMap, Config.partition_rebalance_move_expire_after_access);
// Perform cache maintenance
movesCacheMap.maintain();
if (LOG.isDebugEnabled()) {
LOG.debug("Move succeeded/total :{}/{}, current {}",
counterBalanceMoveSucceeded.get(), counterBalanceMoveCreated.get(), movesCacheMap);
}
}
public Map<Long, Pair<TabletMove, Long>> getMovesInProgress() {
Map<Long, Pair<TabletMove, Long>> moves = Maps.newHashMap();
movesCacheMap.getCacheMap().values().forEach(
m -> m.values().forEach(cache -> moves.putAll(cache.get().asMap())));
return moves;
}
// Represents a concrete move of a tablet from one be to another.
// Formed logically from a PartitionMove by specifying a tablet for the move.
public static class TabletMove {
public Long tabletId;
public Long fromBe;
public Long toBe;
TabletMove(Long id, Long from, Long to) {
this.tabletId = id;
this.fromBe = from;
this.toBe = to;
}
@Override
public String toString() {
return "ReplicaMove{"
+ "tabletId=" + tabletId
+ ", fromBe=" + fromBe
+ ", toBe=" + toBe
+ '}';
}
}
// Balance information for a cluster(one medium), excluding decommissioned/dead bes and replicas on them.
// Natural ordering, so the last key is the max key.
public static class ClusterBalanceInfo {
TreeMultimap<Long, TabletInvertedIndex.PartitionBalanceInfo> partitionInfoBySkew
= TreeMultimap.create(Ordering.natural(), Ordering.arbitrary());
TreeMultimap<Long, Long> beByTotalReplicaCount = TreeMultimap.create();
@Override
public String toString() {
return "[partitionSkew=" + partitionInfoBySkew + ", totalReplicaNum2Be=" + beByTotalReplicaCount + "]";
}
}
}