DiskRebalancer.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.clone;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.clone.SchedException.Status;
import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletSchedCtx.BalanceType;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
/*
* This DiskBalancer is different from other Balancers which takes care of cluster-wide data balancing.
* This DiskBalancer chooses a backend and moves tablet from one disk to another.
* DiskRebalancer strategy:
* 1. only works while the cluster is balanced(which means the cluster has no high and mid load backends)
* 1.1 if user has given prio backends, then select tablets from prio backends no matter cluster is balanced or not.
* 2. selecting alternative tablets from mid load backends, and return them to tablet scheduler.
* 3. given a tablet which has src path(disk), find a path(disk) to migration.
*/
public class DiskRebalancer extends Rebalancer {
private static final Logger LOG = LogManager.getLogger(DiskRebalancer.class);
public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex,
Map<Long, PathSlot> backendsWorkingSlots) {
super(infoService, invertedIndex, backendsWorkingSlots);
canBalanceColocateTable = true;
}
public List<BackendLoadStatistic> filterByPrioBackends(List<BackendLoadStatistic> bes) {
List<BackendLoadStatistic> stats = Lists.newArrayList();
for (BackendLoadStatistic backend : bes) {
long backendId = backend.getBeId();
Long timeoutS = prioBackends.getOrDefault(backendId, 0L);
if (timeoutS != 0) {
if (timeoutS > System.currentTimeMillis()) {
// remove backends from prio if timeout
prioBackends.remove(backendId);
continue;
}
stats.add(backend);
}
}
return stats;
}
// true means BE has low and high paths for balance after reclassification
private boolean checkAndReclassifyPaths(Set<Long> pathLow, Set<Long> pathMid, Set<Long> pathHigh) {
if (pathLow.isEmpty() && pathHigh.isEmpty()) {
// balanced
return false;
}
if (pathLow.isEmpty()) {
// mid => low
pathLow.addAll(pathMid);
} else if (pathHigh.isEmpty()) {
// mid => high
pathHigh.addAll(pathMid);
}
if (pathLow.isEmpty() || pathHigh.isEmpty()) {
// check again
return false;
}
return true;
}
/*
* Try to select alternative tablets to balance the disks.
* 1. Classify the backend into low, mid and high class by load score.
* 2. Try to select tablets from mid load backends.
* 1. Here we only select alternative tablets, without considering selected tablets' status,
* and whether it is benefit for balance (All these will be checked in tablet scheduler)
* 2. Only select tablets from 'mid' backends.
* 3. Only select tablets from 'high' paths.
* 3. Try to select tablets from prio backends.
*
* Here we only select tablets from mid load node, do not set its dest, all this will be set
* when this tablet is being scheduled in tablet scheduler.
*
* NOTICE that we may select any available tablets here, ignore their state.
* The state will be checked when being scheduled in tablet scheduler.
*/
@Override
protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
LoadStatisticForTag clusterStat, TStorageMedium medium) {
List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
// get classification of backends
List<BackendLoadStatistic> lowBEs = Lists.newArrayList();
List<BackendLoadStatistic> midBEs = Lists.newArrayList();
List<BackendLoadStatistic> highBEs = Lists.newArrayList();
clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium);
Rebalancer rebalancer = FeConstants.runningUnitTest ? null
: Env.getCurrentEnv().getTabletScheduler().getRebalancer();
if (rebalancer != null && rebalancer.unPickOverLongTime(clusterStat.getTag(), medium)) {
midBEs.addAll(lowBEs);
midBEs.addAll(highBEs);
lowBEs.clear();
highBEs.clear();
}
if (!(lowBEs.isEmpty() && highBEs.isEmpty())) {
// the cluster is not balanced
if (prioBackends.isEmpty()) {
LOG.info("cluster is not balanced with medium: {}. skip", medium);
return alternativeTablets;
} else {
// prioBEs are not empty, we only schedule prioBEs' disk balance task
midBEs.addAll(lowBEs);
midBEs.addAll(highBEs);
midBEs = filterByPrioBackends(midBEs);
}
}
// first we should check if mid backends is available.
// if all mid backends is not available, we should not start balance
if (midBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) {
if (LOG.isDebugEnabled()) {
LOG.debug("all mid load backends is dead: {} with medium: {}. skip",
midBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
}
return alternativeTablets;
}
if (midBEs.stream().noneMatch(BackendLoadStatistic::hasAvailDisk)) {
LOG.info("all mid load backends {} have no available disk with medium: {}. skip",
midBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
return alternativeTablets;
}
Set<Long> alternativeTabletIds = Sets.newHashSet();
Set<Long> unbalancedBEs = Sets.newHashSet();
// choose tablets from backends randomly.
Collections.shuffle(midBEs);
for (int i = midBEs.size() - 1; i >= 0; i--) {
BackendLoadStatistic beStat = midBEs.get(i);
PathSlot pathSlot = backendsWorkingSlots.get(beStat.getBeId());
if (pathSlot == null) {
continue;
}
// classify the paths.
Set<Long> pathLow = Sets.newHashSet();
Set<Long> pathMid = Sets.newHashSet();
Set<Long> pathHigh = Sets.newHashSet();
// we only select tablets from available high load path
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium);
// check if BE has low and high paths for balance after reclassification
if (!checkAndReclassifyPaths(pathLow, pathMid, pathHigh)) {
continue;
}
// get all tablets on this backend, and shuffle them for random selection
List<Long> tabletIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(beStat.getBeId(), medium);
Collections.shuffle(tabletIds);
// for each path, we try to select at most BALANCE_SLOT_NUM_FOR_PATH tablets
Map<Long, Integer> remainingPaths = Maps.newHashMap();
for (Long pathHash : pathHigh) {
int availBalanceNum = pathSlot.getAvailableBalanceNum(pathHash);
if (availBalanceNum > 0) {
remainingPaths.put(pathHash, availBalanceNum);
}
}
if (remainingPaths.isEmpty()) {
continue;
}
// select tablet from shuffled tablets
for (Long tabletId : tabletIds) {
if (alternativeTabletIds.contains(tabletId)) {
continue;
}
if (!Config.enable_disk_balance_for_single_replica
&& invertedIndex.getReplicasByTabletId(tabletId).size() <= 1) {
continue;
}
Replica replica = null;
try {
replica = invertedIndex.getReplica(tabletId, beStat.getBeId());
} catch (IllegalStateException e) {
continue;
}
if (replica == null) {
continue;
}
// ignore empty replicas as they do not make disk more balance. (disk usage)
if (replica.getDataSize() == 0) {
continue;
}
// backend not support migrate cooldown tablets
TUniqueId cooldownMetaId = replica.getCooldownMetaId();
if (cooldownMetaId != null && (cooldownMetaId.getLo() != 0 || cooldownMetaId.getHi() != 0)) {
continue;
}
// check if replica's is on 'high' path.
// and only select it if the selected tablets num of this path
// does not exceed the limit (BALANCE_SLOT_NUM_FOR_PATH).
long replicaPathHash = replica.getPathHash();
if (remainingPaths.containsKey(replicaPathHash)) {
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
if (!canBalanceTablet(tabletMeta)) {
continue;
}
TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE,
tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(),
tabletMeta.getIndexId(), tabletId, null /* replica alloc is not used for balance*/,
System.currentTimeMillis());
// we set temp src here to simplify completeSchedCtx method, and avoid take slot here
tabletCtx.setTempSrc(replica);
tabletCtx.setTag(clusterStat.getTag());
if (prioBackends.containsKey(beStat.getBeId())) {
// priority of balance task of prio BE is NORMAL
tabletCtx.setPriority(Priority.NORMAL);
} else {
// balance task's default priority is LOW
tabletCtx.setPriority(Priority.LOW);
}
// we must set balanceType to DISK_BALANCE for create migration task
tabletCtx.setBalanceType(BalanceType.DISK_BALANCE);
alternativeTablets.add(tabletCtx);
alternativeTabletIds.add(tabletId);
unbalancedBEs.add(beStat.getBeId());
// update remaining paths
int remaining = remainingPaths.get(replicaPathHash) - 1;
if (remaining <= 0) {
remainingPaths.remove(replicaPathHash);
} else {
remainingPaths.put(replicaPathHash, remaining);
}
}
}
} // end for mid backends
// remove balanced BEs from prio backends
prioBackends.keySet().removeIf(id -> !unbalancedBEs.contains(id));
if (!alternativeTablets.isEmpty()) {
LOG.info("select alternative tablets, medium: {}, num: {}, detail: {}",
medium, alternativeTablets.size(),
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
}
return alternativeTablets;
}
/*
* Create a StorageMediaMigrationTask of this selected tablet for balance.
* 1. Check if the cluster is balanced. if not, the balance will be cancelled.
* 2. Check if the src replica still on high load path. If not, the balance will be cancelled.
* 3. Select a low load path from this backend as destination.
*/
@Override
public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException {
LoadStatisticForTag clusterStat = statisticMap.get(tabletCtx.getTag());
if (clusterStat == null) {
throw new SchedException(Status.UNRECOVERABLE,
String.format("tag %s does not exist", tabletCtx.getTag()));
}
if (tabletCtx.getTempSrcBackendId() == -1 || tabletCtx.getTempSrcPathHash() == -1) {
throw new SchedException(Status.UNRECOVERABLE,
"src does not appear to be set correctly, something goes wrong");
}
Replica replica = null;
try {
replica = invertedIndex.getReplica(tabletCtx.getTabletId(), tabletCtx.getTempSrcBackendId());
} catch (IllegalStateException e) {
replica = null;
}
// check src replica still there
if (replica == null || replica.getPathHash() != tabletCtx.getTempSrcPathHash()) {
throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "src replica may be rebalanced");
}
// ignore empty replicas as they do not make disk more balance
if (replica.getDataSize() == 0) {
throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "size of src replica is zero");
}
long beId = replica.getBackendIdWithoutException();
// check src slot
PathSlot slot = backendsWorkingSlots.get(beId);
if (slot == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("BE does not have slot: {}", beId);
}
throw new SchedException(Status.UNRECOVERABLE, "unable to take src slot");
}
long pathHash = slot.takeBalanceSlot(replica.getPathHash());
if (pathHash == -1) {
throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "unable to take src slot");
}
// after take src slot, we can set src replica now
tabletCtx.setSrc(replica);
BackendLoadStatistic beStat = clusterStat.getBackendLoadStatistic(beId);
if (!beStat.isAvailable()) {
throw new SchedException(Status.UNRECOVERABLE, "the backend is not available");
}
// classify the paths.
// If src path is 'high', then we can select path from 'low' and 'mid'
// If src path is 'mid', then we can only select path from 'low'
// If src path is 'low', then we have nothing to do
Set<Long> pathLow = Sets.newHashSet();
Set<Long> pathMid = Sets.newHashSet();
Set<Long> pathHigh = Sets.newHashSet();
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
if (pathHigh.contains(replica.getPathHash())) {
pathLow.addAll(pathMid);
} else if (!pathMid.contains(replica.getPathHash())) {
throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "src path is low load");
}
// check if this migration task can make the be's disks more balance.
List<RootPathLoadStatistic> availPaths = Lists.newArrayList();
BalanceStatus bs;
if ((bs = beStat.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), availPaths,
false /* not supplement */)) != BalanceStatus.OK) {
if (LOG.isDebugEnabled()) {
LOG.debug("tablet not fit in BE {}, reason: {}", beStat.getBeId(), bs.getErrMsgs());
}
throw new SchedException(Status.UNRECOVERABLE, "tablet not fit in BE");
}
// Select a low load path as destination.
boolean setDest = false;
for (RootPathLoadStatistic stat : availPaths) {
// check if avail path is src path
if (stat.getPathHash() == replica.getPathHash()) {
continue;
}
// check if avail path is low path
if (!pathLow.contains(stat.getPathHash())) {
if (LOG.isDebugEnabled()) {
LOG.debug("the path :{} is not low load", stat.getPathHash());
}
continue;
}
if (!beStat.isMoreBalanced(tabletCtx.getSrcPathHash(), stat.getPathHash(),
tabletCtx.getTabletId(), tabletCtx.getTabletSize(), tabletCtx.getStorageMedium())) {
if (LOG.isDebugEnabled()) {
LOG.debug("the path :{} can not make more balance", stat.getPathHash());
}
continue;
}
long destPathHash = slot.takeBalanceSlot(stat.getPathHash());
if (destPathHash == -1) {
continue;
}
tabletCtx.setDest(beStat.getBeId(), destPathHash, stat.getPath());
setDest = true;
break;
}
if (!setDest) {
throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "unable to find low load path");
}
}
}