// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.clone;
import org.apache.doris.catalog.ColocateGroupSchema;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Tablet.TabletHealth;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.clone.TabletChecker.CheckerCounter;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletScheduler.AddResult;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* ColocateTableCheckerAndBalancer is responsible for the repair and balance of colocated tables' tablets.
*/
public class ColocateTableCheckerAndBalancer extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(ColocateTableCheckerAndBalancer.class);
private ColocateTableCheckerAndBalancer(long intervalMs) {
super("colocate group clone checker", intervalMs);
}
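// Lazily-initialized singleton: the volatile field plus the double-checked locking in
// getInstance() makes creation thread-safe while keeping the fast path lock-free.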
private static volatile ColocateTableCheckerAndBalancer INSTANCE = null;
public static ColocateTableCheckerAndBalancer getInstance() {
if (INSTANCE == null) {
synchronized (ColocateTableCheckerAndBalancer.class) {
if (INSTANCE == null) {
INSTANCE = new ColocateTableCheckerAndBalancer(Config.tablet_checker_interval_ms);
}
}
}
return INSTANCE;
}
public static class BucketStatistic {
public int tabletOrderIdx;
public int totalReplicaNum;
public long totalReplicaDataSize;
public BucketStatistic(int tabletOrderIdx, int totalReplicaNum, long totalReplicaDataSize) {
this.tabletOrderIdx = tabletOrderIdx;
this.totalReplicaNum = totalReplicaNum;
this.totalReplicaDataSize = totalReplicaDataSize;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof BucketStatistic)) {
return false;
}
BucketStatistic other = (BucketStatistic) obj;
return tabletOrderIdx == other.tabletOrderIdx && totalReplicaNum == other.totalReplicaNum
&& totalReplicaDataSize == other.totalReplicaDataSize;
}
@Override
public String toString() {
return "{ orderIdx: " + tabletOrderIdx + ", total replica num: " + totalReplicaNum
+ ", total data size: " + totalReplicaDataSize + " }";
}
}
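// Tracks, for one backend, the order indexes of the buckets it holds in each colocate group.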
public static class BackendBuckets {
private long beId;
private Map<GroupId, List<Integer>> groupTabletOrderIndices = Maps.newHashMap();
public BackendBuckets(long beId) {
this.beId = beId;
}
// for test
public Map<GroupId, List<Integer>> getGroupTabletOrderIndices() {
return groupTabletOrderIndices;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof BackendBuckets)) {
return false;
}
BackendBuckets other = (BackendBuckets) obj;
return beId == other.beId && groupTabletOrderIndices.equals(other.groupTabletOrderIndices);
}
@Override
public String toString() {
return "{ backendId: " + beId + ", group order index: " + groupTabletOrderIndices + " }";
}
public void addGroupTablet(GroupId groupId, int tabletOrderIdx) {
groupTabletOrderIndices.computeIfAbsent(groupId, k -> Lists.newArrayList()).add(tabletOrderIdx);
}
public void removeGroupTablet(GroupId groupId, int tabletOrderIdx) {
List<Integer> indices = groupTabletOrderIndices.get(groupId);
if (indices == null) {
return;
}
indices.remove(Integer.valueOf(tabletOrderIdx));
if (indices.isEmpty()) {
groupTabletOrderIndices.remove(groupId);
}
}
public boolean containsGroupTablet(GroupId groupId, int tabletOrderIdx) {
List<Integer> indices = groupTabletOrderIndices.get(groupId);
if (indices == null) {
return false;
}
return indices.contains(tabletOrderIdx);
}
public int getTotalReplicaNum(Map<GroupId, List<BucketStatistic>> allGroupBucketsMap) {
int totalReplicaNum = 0;
for (Map.Entry<GroupId, List<Integer>> entry : groupTabletOrderIndices.entrySet()) {
List<BucketStatistic> bucketStatistics = allGroupBucketsMap.get(entry.getKey());
if (bucketStatistics != null) {
for (int tabletOrderIdx : entry.getValue()) {
if (tabletOrderIdx < bucketStatistics.size()) {
totalReplicaNum += bucketStatistics.get(tabletOrderIdx).totalReplicaNum;
}
}
}
}
return totalReplicaNum;
}
public long getTotalReplicaDataSize(Map<GroupId, List<BucketStatistic>> allGroupBucketsMap) {
long totalReplicaDataSize = 0;
for (Map.Entry<GroupId, List<Integer>> entry : groupTabletOrderIndices.entrySet()) {
List<BucketStatistic> bucketStatistics = allGroupBucketsMap.get(entry.getKey());
if (bucketStatistics != null) {
for (int tabletOrderIdx : entry.getValue()) {
if (tabletOrderIdx < bucketStatistics.size()) {
totalReplicaDataSize += bucketStatistics.get(tabletOrderIdx).totalReplicaDataSize;
}
}
}
}
return totalReplicaDataSize;
}
public int getTotalBucketsNum() {
return groupTabletOrderIndices.values().stream().mapToInt(List::size).sum();
}
public int getGroupBucketsNum(GroupId groupId) {
List<Integer> indices = groupTabletOrderIndices.get(groupId);
if (indices == null) {
return 0;
} else {
return indices.size();
}
}
}
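// A global snapshot of the colocate distribution: the buckets each backend holds
// (backendBucketsMap), per-bucket statistics of every group (allGroupBucketsMap), and the
// total bucket num per resource tag (bucket num * replica num per tag, summed over groups).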
public static class GlobalColocateStatistic {
private Map<Long, BackendBuckets> backendBucketsMap = Maps.newHashMap();
private Map<GroupId, List<BucketStatistic>> allGroupBucketsMap = Maps.newHashMap();
private Map<Tag, Integer> allTagBucketNum = Maps.newHashMap();
private static final BackendBuckets DUMMY_BE = new BackendBuckets(0);
public GlobalColocateStatistic() {
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof GlobalColocateStatistic)) {
return false;
}
GlobalColocateStatistic other = (GlobalColocateStatistic) obj;
return backendBucketsMap.equals(other.backendBucketsMap)
&& allGroupBucketsMap.equals(other.allGroupBucketsMap)
&& allTagBucketNum.equals(other.allTagBucketNum);
}
@Override
public String toString() {
return "{ backends: " + backendBucketsMap + ", groups: " + allGroupBucketsMap
+ ", tag bucket num: " + allTagBucketNum + " }";
}
Map<Long, BackendBuckets> getBackendBucketsMap() {
return backendBucketsMap;
}
Map<GroupId, List<BucketStatistic>> getAllGroupBucketsMap() {
return allGroupBucketsMap;
}
Map<Tag, Integer> getAllTagBucketNum() {
return allTagBucketNum;
}
public boolean moveTablet(GroupId groupId, int tabletOrderIdx,
long srcBeId, long destBeId) {
BackendBuckets srcBackendBuckets = backendBucketsMap.get(srcBeId);
if (srcBackendBuckets == null || !srcBackendBuckets.containsGroupTablet(groupId, tabletOrderIdx)) {
return false;
}
BackendBuckets destBackendBuckets = backendBucketsMap.computeIfAbsent(destBeId, BackendBuckets::new);
if (destBackendBuckets.containsGroupTablet(groupId, tabletOrderIdx)) {
return false;
}
srcBackendBuckets.removeGroupTablet(groupId, tabletOrderIdx);
destBackendBuckets.addGroupTablet(groupId, tabletOrderIdx);
if (srcBackendBuckets.getTotalBucketsNum() == 0) {
backendBucketsMap.remove(srcBeId);
}
return true;
}
public int getBackendTotalBucketNum(long backendId) {
return backendBucketsMap.getOrDefault(backendId, DUMMY_BE).getTotalBucketsNum();
}
public long getBackendTotalReplicaDataSize(long backendId) {
return backendBucketsMap.getOrDefault(backendId, DUMMY_BE)
.getTotalReplicaDataSize(allGroupBucketsMap);
}
public long getBucketTotalReplicaDataSize(GroupId groupId, int tabletOrderIdx) {
List<BucketStatistic> bucketStatistics = allGroupBucketsMap.get(groupId);
if (bucketStatistics != null && tabletOrderIdx < bucketStatistics.size()) {
return bucketStatistics.get(tabletOrderIdx).totalReplicaDataSize;
} else {
return 0L;
}
}
public void addGroup(GroupId groupId, ReplicaAllocation replicaAlloc, List<Set<Long>> backendBucketsSeq,
List<Long> totalReplicaDataSizes, int totalReplicaNumPerBucket) {
Preconditions.checkState(backendBucketsSeq.size() == totalReplicaDataSizes.size(),
backendBucketsSeq.size() + " vs. " + totalReplicaDataSizes.size());
List<BucketStatistic> bucketStatistics = Lists.newArrayList();
for (int tabletOrderIdx = 0; tabletOrderIdx < backendBucketsSeq.size(); tabletOrderIdx++) {
BucketStatistic bucket = new BucketStatistic(tabletOrderIdx, totalReplicaNumPerBucket,
totalReplicaDataSizes.get(tabletOrderIdx));
bucketStatistics.add(bucket);
for (long backendId : backendBucketsSeq.get(tabletOrderIdx)) {
BackendBuckets backendBuckets = backendBucketsMap.computeIfAbsent(backendId, BackendBuckets::new);
backendBuckets.addGroupTablet(groupId, tabletOrderIdx);
}
}
int bucketNum = backendBucketsSeq.size();
replicaAlloc.getAllocMap().forEach((tag, count) -> {
allTagBucketNum.put(tag, allTagBucketNum.getOrDefault(tag, 0) + bucketNum * count);
});
allGroupBucketsMap.put(groupId, bucketStatistics);
}
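// A minimal usage sketch (illustrative only; the group id, replica allocation and backend
// ids below are made up, not taken from real metadata):
//
//   GlobalColocateStatistic stat = new GlobalColocateStatistic();
//   // one group with 2 buckets: bucket 0 on BEs {1, 2}, bucket 1 on BEs {2, 3};
//   // per-bucket replica data sizes 100 and 200 bytes, 1 tablet per bucket
//   stat.addGroup(groupId, replicaAlloc,
//           Lists.newArrayList(Sets.newHashSet(1L, 2L), Sets.newHashSet(2L, 3L)),
//           Lists.newArrayList(100L, 200L), 1);
//   stat.getBackendTotalBucketNum(2L);   // 2: BE 2 holds both buckets
//   stat.moveTablet(groupId, 0, 2L, 3L); // true: bucket 0 moves from BE 2 to BE 3
//   stat.getBackendTotalBucketNum(3L);   // now 2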
}
@Override
/*
* Each round, we do 2 steps:
* 1. Relocate and balance groups:
* If a backend is unavailable, find a new backend to replace it.
* After all unavailable backends have been replaced, balance the groups.
*
* 2. Match groups:
* If a replica does not match the backends of its group, mark that group as unstable and pass the
* tablet to the TabletScheduler.
* Otherwise, mark the group as stable.
*/
public void runAfterCatalogReady() {
relocateAndBalanceGroups();
matchGroups();
}
/*
* Relocate and balance groups.
* Here we just distribute the replicas of colocate tables evenly across the cluster, without
* considering the cluster load statistic.
* for example:
* currently there are 4 backends A B C D with following load:
*
* +-+
* | |
* +-+ +-+ +-+ | |
* | | | | | | | |
* +-+ +-+ +-+ +-+
* A B C D
*
* The colocate group balancer will still evenly distribute the replicas across all 4 backends, not
* just the 3 low-load backends.
*
* X
* X
* X X X +-+
* X X X | |
* +-+ +-+ +-+ | |
* | | | | | | | |
* +-+ +-+ +-+ +-+
* A B C D
*
* So after the colocate balance, the cluster may still be 'unbalanced' from a global perspective.
* The LoadBalancer will then balance the replicas of non-colocate tables to make the
* cluster balanced, eventually.
*
* X X X X
* X X X X
* +-+ +-+ +-+ +-+
* | | | | | | | |
* | | | | | | | |
* +-+ +-+ +-+ +-+
* A B C D
*/
private void relocateAndBalanceGroups() {
Set<GroupId> groupIds = Env.getCurrentEnv().getColocateTableIndex().getAllGroupIds();
// balance only inside each group, excluding balance between groups
Set<GroupId> changeGroups = relocateAndBalanceGroup(groupIds, false);
if (!Config.disable_colocate_balance_between_groups
&& !changeGroups.isEmpty()) {
// balance both inside each group and between all groups
relocateAndBalanceGroup(changeGroups, true);
}
}
private Set<GroupId> relocateAndBalanceGroup(Set<GroupId> groupIds, boolean balanceBetweenGroups) {
Set<GroupId> changeGroups = Sets.newHashSet();
if (Config.disable_colocate_balance) {
return changeGroups;
}
Env env = Env.getCurrentEnv();
ColocateTableIndex colocateIndex = env.getColocateTableIndex();
SystemInfoService infoService = Env.getCurrentSystemInfo();
GlobalColocateStatistic globalColocateStatistic = buildGlobalColocateStatistic();
// check each given group
for (GroupId groupId : groupIds) {
Map<Tag, LoadStatisticForTag> statisticMap = env.getTabletScheduler().getStatisticMap();
if (statisticMap == null) {
continue;
}
ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
if (groupSchema == null) {
LOG.info("Not found colocate group {}, maybe delete", groupId);
continue;
}
ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
try {
Env.getCurrentSystemInfo().checkReplicaAllocation(replicaAlloc);
} catch (DdlException e) {
colocateIndex.setErrMsgForGroup(groupId, e.getMessage());
continue;
}
Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) {
Tag tag = entry.getKey();
LoadStatisticForTag statistic = statisticMap.get(tag);
if (statistic == null) {
continue;
}
List<List<Long>> backendsPerBucketSeq = colocateIndex.getBackendsPerBucketSeqByTag(groupId, tag);
if (backendsPerBucketSeq.isEmpty()) {
continue;
}
// get all unavailable backends in the backend bucket sequence of this group
Set<Long> unavailableBeIdsInGroup = getUnavailableBeIdsInGroup(
infoService, colocateIndex, groupId, tag);
// get all available backends for this group
Set<Long> beIdsInOtherTag = colocateIndex.getBackendIdsExceptForTag(groupId, tag);
List<Long> availableBeIds = getAvailableBeIds(tag, beIdsInOtherTag,
infoService);
// try relocate or balance this group for specified tag
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup, availableBeIds, colocateIndex,
infoService, statistic, globalColocateStatistic, balancedBackendsPerBucketSeq,
balanceBetweenGroups)) {
if (!colocateIndex.addBackendsPerBucketSeqByTag(groupId, tag, balancedBackendsPerBucketSeq,
replicaAlloc)) {
LOG.warn("relocate group {} succ, but replica allocation has change, old replica alloc {}",
groupId, replicaAlloc);
continue;
}
changeGroups.add(groupId);
Map<Tag, List<List<Long>>> balancedBackendsPerBucketSeqMap = Maps.newHashMap();
balancedBackendsPerBucketSeqMap.put(tag, balancedBackendsPerBucketSeq);
ColocatePersistInfo info = ColocatePersistInfo
.createForBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeqMap);
env.getEditLog().logColocateBackendsPerBucketSeq(info);
LOG.info("balance group {}. now backends per bucket sequence for tag {} is: {}",
groupId, tag, balancedBackendsPerBucketSeq);
}
}
}
return changeGroups;
}
/*
* Check every tablet of a group. If a replica's location does not match the backends of the group,
* relocate that replica and mark the group as unstable.
* If all replicas match the backends of the group, mark the group as stable.
*/
private void matchGroups() {
long start = System.currentTimeMillis();
CheckerCounter counter = new CheckerCounter();
Env env = Env.getCurrentEnv();
SystemInfoService infoService = Env.getCurrentSystemInfo();
ColocateTableIndex colocateIndex = env.getColocateTableIndex();
TabletScheduler tabletScheduler = env.getTabletScheduler();
// check each group
Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
for (GroupId groupId : groupIds) {
ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
if (groupSchema == null) {
LOG.info("Not found colocate group {}, maybe delete", groupId);
continue;
}
List<Long> tableIds = colocateIndex.getAllTableIds(groupId);
List<Set<Long>> backendBucketsSeq = colocateIndex.getBackendsPerBucketSeqSet(groupId);
if (backendBucketsSeq.isEmpty()) {
continue;
}
ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
String unstableReason = null;
OUT:
for (Long tableId : tableIds) {
long dbId = groupId.dbId;
if (dbId == 0) {
dbId = groupId.getDbIdByTblId(tableId);
}
Database db = env.getInternalCatalog().getDbNullable(dbId);
if (db == null) {
continue;
}
OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
if (olapTable == null || !colocateIndex.isColocateTable(olapTable.getId())) {
continue;
}
boolean isUniqKeyMergeOnWrite = olapTable.isUniqKeyMergeOnWrite();
olapTable.readLock();
try {
for (Partition partition : olapTable.getPartitions()) {
short replicationNum = replicaAlloc.getTotalReplicaNum();
long visibleVersion = partition.getVisibleVersion();
// Here we only get VISIBLE indexes. All other indexes are not queryable.
// So it does not matter if tablets of other indexes are not matched.
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
Preconditions.checkState(backendBucketsSeq.size() == index.getTablets().size(),
backendBucketsSeq.size() + " vs. " + index.getTablets().size());
List<Long> tabletIdsInOrder = index.getTabletIdsInOrder();
for (int idx = 0; idx < tabletIdsInOrder.size(); idx++) {
Long tabletId = tabletIdsInOrder.get(idx);
counter.totalTabletNum++;
Set<Long> bucketsSeq = backendBucketsSeq.get(idx);
Preconditions.checkState(bucketsSeq.size() == replicationNum,
bucketsSeq.size() + " vs. " + replicationNum);
Tablet tablet = index.getTablet(tabletId);
TabletHealth tabletHealth = tablet.getColocateHealth(
visibleVersion, replicaAlloc, bucketsSeq);
if (tabletHealth.status != TabletStatus.HEALTHY) {
counter.unhealthyTabletNum++;
unstableReason = String.format("get unhealthy tablet %d in colocate table."
+ " status: %s", tablet.getId(), tabletHealth.status);
if (LOG.isDebugEnabled()) {
LOG.debug(unstableReason);
}
if (tabletHealth.status == TabletStatus.UNRECOVERABLE) {
continue;
}
if (!tablet.readyToBeRepaired(infoService, Priority.NORMAL)) {
counter.tabletNotReady++;
continue;
}
TabletSchedCtx tabletCtx = new TabletSchedCtx(
TabletSchedCtx.Type.REPAIR,
db.getId(), tableId, partition.getId(), index.getId(), tablet.getId(),
replicaAlloc, System.currentTimeMillis());
// the tablet status will be set again when being scheduled
tabletCtx.setTabletHealth(tabletHealth);
tabletCtx.setTabletOrderIdx(idx);
tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite);
AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */);
if (res == AddResult.DISABLED) {
// the scheduler is disabled, so stop the colocate check for this round
LOG.info("tablet scheduler returned: {}. stop colocate table check", res.name());
break OUT;
} else if (res == AddResult.ADDED) {
counter.addToSchedulerTabletNum++;
} else if (res == AddResult.ALREADY_IN) {
counter.tabletInScheduler++;
} else if (res == AddResult.REPLACE_ADDED || res == AddResult.LIMIT_EXCEED) {
counter.tabletExceedLimit++;
}
}
}
}
}
} finally {
olapTable.readUnlock();
}
} // end for tables
// mark group as stable or unstable
if (Strings.isNullOrEmpty(unstableReason)) {
colocateIndex.markGroupStable(groupId, true, replicaAlloc);
} else {
colocateIndex.markGroupUnstable(groupId, unstableReason, true);
}
} // end for groups
long cost = System.currentTimeMillis() - start;
LOG.info("finished to check tablets. unhealth/total/added/in_sched/not_ready/exceed_limit: {}/{}/{}/{}/{}/{}, "
+ "cost: {} ms",
counter.unhealthyTabletNum, counter.totalTabletNum, counter.addToSchedulerTabletNum,
counter.tabletInScheduler, counter.tabletNotReady, counter.tabletExceedLimit, cost);
}
private GlobalColocateStatistic buildGlobalColocateStatistic() {
Env env = Env.getCurrentEnv();
ColocateTableIndex colocateIndex = env.getColocateTableIndex();
GlobalColocateStatistic globalColocateStatistic = new GlobalColocateStatistic();
Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
for (GroupId groupId : groupIds) {
ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
if (groupSchema == null) {
LOG.info("Not found colocate group {}, maybe delete", groupId);
continue;
}
ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
List<Long> tableIds = colocateIndex.getAllTableIds(groupId);
List<Set<Long>> backendBucketsSeq = colocateIndex.getBackendsPerBucketSeqSet(groupId);
if (backendBucketsSeq.isEmpty()) {
continue;
}
int totalReplicaNumPerBucket = 0;
ArrayList<Long> totalReplicaDataSizes = Lists.newArrayList();
for (int i = 0; i < backendBucketsSeq.size(); i++) {
totalReplicaDataSizes.add(0L);
}
for (Long tableId : tableIds) {
long dbId = groupId.dbId;
if (dbId == 0) {
dbId = groupId.getDbIdByTblId(tableId);
}
Database db = env.getInternalCatalog().getDbNullable(dbId);
if (db == null) {
continue;
}
OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
if (olapTable == null || !colocateIndex.isColocateTable(olapTable.getId())) {
continue;
}
olapTable.readLock();
try {
for (Partition partition : olapTable.getPartitions()) {
short replicationNum = replicaAlloc.getTotalReplicaNum();
// Here we only get VISIBLE indexes. All other indexes are not queryable.
// So it does not matter if tablets of other indexes are not matched.
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
Preconditions.checkState(backendBucketsSeq.size() == index.getTablets().size(),
backendBucketsSeq.size() + " vs. " + index.getTablets().size());
int tabletOrderIdx = 0;
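// each visible index contributes one tablet to every bucket of the group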
totalReplicaNumPerBucket++;
for (Long tabletId : index.getTabletIdsInOrder()) {
Set<Long> bucketsSeq = backendBucketsSeq.get(tabletOrderIdx);
Preconditions.checkState(bucketsSeq.size() == replicationNum,
bucketsSeq.size() + " vs. " + replicationNum);
Tablet tablet = index.getTablet(tabletId);
totalReplicaDataSizes.set(tabletOrderIdx,
totalReplicaDataSizes.get(tabletOrderIdx) + tablet.getDataSize(true));
tabletOrderIdx++;
}
}
}
} catch (Exception e) {
LOG.warn("build group {} colocate statistic error", groupId, e);
continue;
} finally {
olapTable.readUnlock();
}
}
globalColocateStatistic.addGroup(groupId, replicaAlloc, backendBucketsSeq, totalReplicaDataSizes,
totalReplicaNumPerBucket);
}
return globalColocateStatistic;
}
/*
* Each balance is performed for a single workload group in a colocate group.
* For example, if the replica allocation of a colocate group is {TagA: 2, TagB: 1},
* the backend bucket seq may look like:
*
* 0 1 2 3
* TagA A B C A
* TagA B C A B
* TagB D D D D
*
* First, we will handle workload group of TagA, then TagB.
*
* For a single workload group, the balance logic is as follows
* (Suppose there is only one workload group with 3 replicas):
*
* All backends: A,B,C,D,E,F,G,H,I,J
*
* One group's buckets sequence:
*
* Buckets sequence: 0 1 2 3
* Backend set: A A A A
* B D F H
* C E G I
*
* Then each backend has different replica num:
*
* Backends: A B C D E F G H I J
* Replica num: 4 1 1 1 1 1 1 1 1 0
*
* The goal of balance is to evenly distribute replicas on all backends. For this example, we want the
* following result (one possible result):
*
* Backends: A B C D E F G H I J
* Replica num: 2 2 1 1 1 1 1 1 1 1
*
* Algorithm:
* 0. Generate the flat list of backends per bucket sequence:
* A B C A D E A F G A H I
* 1. Sort backends by replica num, breaking ties by load score, descending:
* A B C D E F G H I J
* 2. Check the diff between the first backend (A)'s replica num and the last backend (J)'s replica num.
* If the diff is less than or equal to 1, we consider this group balanced. Jump to step 5.
* 3. Else, replace the first occurrence of backend A in the flat list with backend J.
* J B C A D E A F G A H I
* 4. Recalculate the replica num of each backend and go to step 1.
* 5. We should get the following flat list(one possible result):
* J B C J D E A F G A H I
* Partition this flat list by replication num:
* [J B C] [J D E] [A F G] [A H I]
* And this is our new balanced backends per bucket sequence.
*
* Relocation is similar to balance, but it chooses an unavailable be as the src and moves all buckets
* on the unavailable be to low-load backends.
*
* Return true if the backends per bucket sequence changed and the new sequence is saved in balancedBackendsPerBucketSeq.
* Return false if nothing changed.
*/
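/*
 * A minimal sketch of the flat-list mechanics used below (illustrative only, with made-up
 * backend ids): the per-bucket backend lists are flattened, single entries are swapped, and
 * Lists.partition recovers the per-bucket sequence.
 *
 *   List<Long> flat = Lists.newArrayList(1L, 2L, 3L, 1L, 4L, 5L); // [[1,2,3],[1,4,5]] flattened
 *   flat.set(3, 6L);                                              // move one bucket replica from BE 1 to BE 6
 *   List<List<Long>> buckets = Lists.partition(flat, 3);          // [[1,2,3],[6,4,5]]
 */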
private boolean relocateAndBalance(GroupId groupId, Tag tag, Set<Long> unavailableBeIds, List<Long> availableBeIds,
ColocateTableIndex colocateIndex, SystemInfoService infoService, LoadStatisticForTag statistic,
GlobalColocateStatistic globalColocateStatistic, List<List<Long>> balancedBackendsPerBucketSeq,
boolean balanceBetweenGroups) {
ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
if (groupSchema == null) {
LOG.info("Not found colocate group {}, maybe delete", groupId);
return false;
}
short replicaNum = groupSchema.getReplicaAlloc().getReplicaNumByTag(tag);
List<List<Long>> backendsPerBucketSeq = Lists.newArrayList(
colocateIndex.getBackendsPerBucketSeqByTag(groupId, tag));
// [[A,B,C],[B,C,D]] -> [A,B,C,B,C,D]
List<Long> flatBackendsPerBucketSeq = backendsPerBucketSeq.stream()
.flatMap(List::stream).collect(Collectors.toList());
int tagTotalBucketNum = globalColocateStatistic.getAllTagBucketNum().getOrDefault(tag, 0);
int availableBeNum = availableBeIds.size();
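// per-BE bucket num targets across all groups: the ceiling and floor of tagTotalBucketNum / availableBeNum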
int highTotalBucketNumPerBe = availableBeNum == 0 ? 0 :
(tagTotalBucketNum + availableBeNum - 1) / availableBeNum;
int lowTotalBucketNumPerBe = availableBeNum == 0 ? 0 : tagTotalBucketNum / availableBeNum;
boolean isChanged = false;
int times = 0;
List<RootPathLoadStatistic> resultPaths = Lists.newArrayList();
OUT:
while (true) {
// update backends and hosts at each round
backendsPerBucketSeq = Lists.partition(flatBackendsPerBucketSeq, replicaNum);
List<List<String>> hostsPerBucketSeq = getHostsPerBucketSeq(backendsPerBucketSeq, infoService);
if (hostsPerBucketSeq == null) {
// error happens, change nothing
return false;
}
Preconditions.checkState(backendsPerBucketSeq.size() == hostsPerBucketSeq.size());
times++;
if (times > 10 * backendsPerBucketSeq.size()) {
LOG.warn("iterate too many times for relocate group: {}, times: {}, bucket num: {}",
groupId, times, backendsPerBucketSeq.size());
break;
}
long srcBeId = -1;
List<Integer> seqIndexes = null;
boolean srcBeUnavailable = false;
// first choose the unavailable be as src be
for (Long beId : unavailableBeIds) {
seqIndexes = getBeSeqIndexes(flatBackendsPerBucketSeq, beId);
if (!seqIndexes.isEmpty()) {
srcBeId = beId;
srcBeUnavailable = true;
LOG.info("find unavailable backend {} in colocate group: {}", beId, groupId);
break;
}
}
// sort backends with replica num in desc order
List<Map.Entry<Long, Long>> backendWithReplicaNum =
getSortedBackendReplicaNumPairs(availableBeIds, unavailableBeIds, statistic,
globalColocateStatistic, flatBackendsPerBucketSeq);
// if there is only one available backend and no unavailable bucketId to relocate, end the outer loop
if (backendWithReplicaNum.size() <= 1 && !srcBeUnavailable) {
break;
}
if (seqIndexes == null || seqIndexes.isEmpty()) {
// choose the be with the max bucket num as the src be
Preconditions.checkState(backendsPerBucketSeq.size() > 0);
srcBeId = backendWithReplicaNum.get(0).getKey();
seqIndexes = getBeSeqIndexes(flatBackendsPerBucketSeq, srcBeId);
}
boolean isThisRoundChanged = false;
for (int j = backendWithReplicaNum.size() - 1; j >= 0; j--) {
// we try to use a low-load backend to replace the src backend.
// if the replacement fails (eg: both backends are on the same host), select the next low backend and try (j--)
Map.Entry<Long, Long> lowBackend = backendWithReplicaNum.get(j);
long destBeId = lowBackend.getKey();
if (!srcBeUnavailable) {
long diffThisGroup = seqIndexes.size() - lowBackend.getValue();
if (diffThisGroup < 1) {
// balanced
break OUT;
}
// src's bucket num in this group = dest's bucket num in this group + 1.
// if we move a bucket from src to dest, dest will have one more bucket than src in this group,
// so check the global view first
//
// suppose bucket num = 3, three BE A/B/C, two group group1/group2, then we have:
//
// A [ group1:bucket0, group2:bucket0]
// B [ group1:bucket1, group2:bucket1]
// C [ group1:bucket2, group2:bucket2]
//
// if we add a new BE D, for each group: bucketNum(A)=bucketNum(B)=bucketNum(C)=1, bucketNum(D)=0
// so each group is balanced, but from the global view across all groups it is not.
// we should move one of the buckets to D
if (diffThisGroup == 1) {
if (!balanceBetweenGroups) {
break OUT;
}
int srcTotalBucketNum = globalColocateStatistic.getBackendTotalBucketNum(srcBeId);
int destTotalBucketNum = globalColocateStatistic.getBackendTotalBucketNum(destBeId);
if (srcTotalBucketNum <= highTotalBucketNumPerBe
|| destTotalBucketNum >= lowTotalBucketNumPerBe) {
continue;
}
}
}
Backend destBe = infoService.getBackend(destBeId);
if (destBe == null) {
LOG.info("backend {} does not exist", destBeId);
return false;
}
// skip if src and dest are the same backend
if (srcBeId == destBeId) {
continue;
}
// Unavailable backends have been removed from backendWithReplicaNum,
// but the conditions getUnavailableBeIdsInGroup uses to judge
// unavailability may be too loose. Under the default configuration
// (colocate_group_relocate_delay_second = 1800), a be that has been
// out of contact for 20 minutes can still be selected as the dest be.
if (!destBe.isAlive()) {
LOG.info("{} is not alive, not suitable as a dest be", destBe);
continue;
}
BackendLoadStatistic beStat = statistic.getBackendLoadStatistic(destBeId);
if (beStat == null) {
LOG.warn("not found backend {} statistic", destBeId);
continue;
}
int targetSeqIndex = -1;
long minDataSizeDiff = Long.MAX_VALUE;
boolean destBeContainsAllBuckets = true;
boolean theSameHostContainsAllBuckets = true;
for (int seqIndex : seqIndexes) {
// the bucket index: seqIndex / replicaNum.
// eg: with replicaNum = 3, seq indexes 0, 1 and 2 map to bucket 0, and index 3 maps to bucket 1.
int bucketIndex = seqIndex / replicaNum;
List<Long> backendsSet = backendsPerBucketSeq.get(bucketIndex);
List<String> hostsSet = hostsPerBucketSeq.get(bucketIndex);
// the replicas of a tablet cannot be located on the same backend or the same host
if (backendsSet.contains(destBeId)) {
continue;
}
destBeContainsAllBuckets = false;
if (!Config.allow_replica_on_same_host && hostsSet.contains(destBe.getHost())) {
continue;
}
theSameHostContainsAllBuckets = false;
Preconditions.checkState(backendsSet.contains(srcBeId), srcBeId);
long bucketDataSize =
globalColocateStatistic.getBucketTotalReplicaDataSize(groupId, bucketIndex);
resultPaths.clear();
BalanceStatus st = beStat.isFit(bucketDataSize, null, resultPaths, false);
if (!st.ok()) {
if (LOG.isDebugEnabled()) {
LOG.debug("backend {} is unable to fit in group {}, tablet order idx {}, data size {}",
destBeId, groupId, bucketIndex, bucketDataSize);
}
continue;
}
long newSrcBeTotalReplicaDataSize = globalColocateStatistic.getBackendTotalReplicaDataSize(srcBeId)
- bucketDataSize;
long newDestBeTotalReplicaDataSize =
globalColocateStatistic.getBackendTotalReplicaDataSize(destBeId) + bucketDataSize;
long dataSizeDiff = Math.abs(newSrcBeTotalReplicaDataSize - newDestBeTotalReplicaDataSize);
if (targetSeqIndex < 0 || dataSizeDiff < minDataSizeDiff) {
targetSeqIndex = seqIndex;
minDataSizeDiff = dataSizeDiff;
}
}
if (targetSeqIndex < 0) {
// no fit bucket on this dest; we use the next node as the dst node
String failedReason;
if (destBeContainsAllBuckets) {
failedReason = "dest be contains all the same buckets";
} else if (theSameHostContainsAllBuckets) {
failedReason = "dest be's host contains all the same buckets "
+ "and Config.allow_replica_on_same_host=false";
} else {
failedReason = "dest be has no fit path, maybe disk usage is exceeds "
+ "Config.storage_high_watermark_usage_percent";
}
LOG.info("unable to replace backend {} with dest backend {} in colocate group {}, "
+ "failed reason: {}",
srcBeId, destBeId, groupId, failedReason);
continue;
}
int tabletOrderIdx = targetSeqIndex / replicaNum;
int oldSrcThisGroup = seqIndexes.size();
long oldDestThisGroup = lowBackend.getValue();
int oldSrcBucketNum = globalColocateStatistic.getBackendTotalBucketNum(srcBeId);
int oldDestBucketNum = globalColocateStatistic.getBackendTotalBucketNum(destBeId);
if (LOG.isDebugEnabled()) {
LOG.debug("OneMove: group {}, src {}, this group {}, all group {}, dest {}, this group {}, "
+ "all group {}", groupId, srcBeId, oldSrcThisGroup, oldSrcBucketNum, destBeId,
oldDestThisGroup, oldDestBucketNum);
}
Preconditions.checkState(
globalColocateStatistic.moveTablet(groupId, tabletOrderIdx, srcBeId, destBeId));
Preconditions.checkState(oldSrcBucketNum - 1
== globalColocateStatistic.getBackendTotalBucketNum(srcBeId));
Preconditions.checkState(oldDestBucketNum + 1
== globalColocateStatistic.getBackendTotalBucketNum(destBeId));
flatBackendsPerBucketSeq.set(targetSeqIndex, destBeId);
// just replace one backend at a time; src and dest BE ids should be recalculated because
// flatBackendsPerBucketSeq has changed.
isChanged = true;
isThisRoundChanged = true;
LOG.info("replace backend {} with backend {} in colocate group {}, idx: {}",
srcBeId, destBeId, groupId, targetSeqIndex);
break;
}
if (!isThisRoundChanged) {
// if all backends are checked but this round is not changed,
// we should end the loop
LOG.info("all backends are checked but this round is not changed, "
+ "end outer loop in colocate group {}", groupId);
break;
}
// end outer while loop
}
if (isChanged) {
balancedBackendsPerBucketSeq.addAll(Lists.partition(flatBackendsPerBucketSeq, replicaNum));
}
return isChanged;
}
// map each backend id to its host.
// a non-existent backend (maybe dropped) is mapped to a dummy ip and handled by later logic
private List<List<String>> getHostsPerBucketSeq(List<List<Long>> backendsPerBucketSeq,
SystemInfoService infoService) {
List<List<String>> hostsPerBucketSeq = Lists.newArrayList();
for (List<Long> backendIds : backendsPerBucketSeq) {
List<String> hosts = Lists.newArrayList();
for (Long beId : backendIds) {
Backend be = infoService.getBackend(beId);
if (be == null) {
// For a non-existent BE (maybe dropped), add the dummy ip 0.0.0.0.
// The following logic will handle the non-existent host.
hosts.add(Backend.DUMMY_IP);
} else {
hosts.add(be.getHost());
}
}
hostsPerBucketSeq.add(hosts);
}
return hostsPerBucketSeq;
}
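/**
 * Reconcile the backend bucket sequences of a colocate group with a modified replica allocation:
 * drop tags that are no longer in the allocation, add empty sequences for newly-added tags, then
 * for each tag shrink or grow every bucket's backend list to the new replica num, preferring
 * available backends that currently hold the fewest buckets.
 */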
public static void modifyGroupReplicaAllocation(ReplicaAllocation replicaAlloc,
Map<Tag, List<List<Long>>> backendBucketsSeq, int bucketNum) throws Exception {
Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
List<Tag> deleteTags = Lists.newArrayList();
for (Tag tag : backendBucketsSeq.keySet()) {
if (!allocMap.containsKey(tag)) {
deleteTags.add(tag);
}
Preconditions.checkState(bucketNum == backendBucketsSeq.get(tag).size(),
bucketNum + " vs " + backendBucketsSeq.get(tag).size());
}
deleteTags.forEach(tag -> backendBucketsSeq.remove(tag));
for (Tag tag : replicaAlloc.getAllocMap().keySet()) {
if (!backendBucketsSeq.containsKey(tag)) {
List<List<Long>> tagBackendBucketsSeq = Lists.newArrayList();
for (int i = 0; i < bucketNum; i++) {
tagBackendBucketsSeq.add(Lists.newArrayList());
}
backendBucketsSeq.put(tag, tagBackendBucketsSeq);
}
}
Map<Long, Integer> backendToBucketNum = Maps.newHashMap();
backendBucketsSeq.values().forEach(tagBackendIds ->
tagBackendIds.forEach(backendIds ->
backendIds.forEach(backendId -> backendToBucketNum.put(
backendId, backendToBucketNum.getOrDefault(backendId, 0) + 1))));
for (Tag tag : backendBucketsSeq.keySet()) {
List<List<Long>> tagBackendBucketsSeq = backendBucketsSeq.get(tag);
int oldReplicaNum = tagBackendBucketsSeq.get(0).size();
for (List<Long> backendIdsOneBucket : tagBackendBucketsSeq) {
Preconditions.checkState(backendIdsOneBucket.size() == oldReplicaNum,
backendIdsOneBucket.size() + " vs " + oldReplicaNum);
}
int newReplicaNum = allocMap.get(tag);
if (newReplicaNum == oldReplicaNum) {
continue;
}
List<Backend> backends = Env.getCurrentSystemInfo().getBackendsByTag(tag);
Set<Long> availableBeIds = backends.stream().filter(be -> be.isScheduleAvailable())
.map(be -> be.getId()).collect(Collectors.toSet());
for (Long backendId : availableBeIds) {
if (!backendToBucketNum.containsKey(backendId)) {
backendToBucketNum.put(backendId, 0);
}
}
for (int i = 0; i < tagBackendBucketsSeq.size(); i++) {
modifyGroupBucketReplicas(tag, newReplicaNum, tagBackendBucketsSeq.get(i),
availableBeIds, backendToBucketNum);
}
}
}
private static void modifyGroupBucketReplicas(Tag tag, int newReplicaNum, List<Long> backendIds,
Set<Long> availableBeIds, Map<Long, Integer> backendToBucketNum) throws Exception {
final boolean smallIdFirst = Math.random() < 0.5;
if (backendIds.size() > newReplicaNum) {
backendIds.sort((id1, id2) -> {
boolean alive1 = availableBeIds.contains(id1);
boolean alive2 = availableBeIds.contains(id2);
if (alive1 != alive2) {
return alive1 ? -1 : 1;
}
int bucketNum1 = backendToBucketNum.getOrDefault(id1, 0);
int bucketNum2 = backendToBucketNum.getOrDefault(id2, 0);
if (bucketNum1 != bucketNum2) {
return Integer.compare(bucketNum1, bucketNum2);
}
return smallIdFirst ? Long.compare(id1, id2) : Long.compare(id2, id1);
});
for (int i = backendIds.size() - 1; i >= newReplicaNum; i--) {
long backendId = backendIds.get(i);
backendIds.remove(i);
backendToBucketNum.put(backendId, backendToBucketNum.getOrDefault(backendId, 0) - 1);
}
}
if (backendIds.size() < newReplicaNum) {
Set<Long> candBackendSet = Sets.newHashSet();
candBackendSet.addAll(availableBeIds);
candBackendSet.removeAll(backendIds);
if (backendIds.size() + candBackendSet.size() < newReplicaNum) {
throw new UserException("Can not add backend for tag: " + tag);
}
List<Long> candBackendList = Lists.newArrayList(candBackendSet);
candBackendList.sort((id1, id2) -> {
int bucketNum1 = backendToBucketNum.getOrDefault(id1, 0);
int bucketNum2 = backendToBucketNum.getOrDefault(id2, 0);
if (bucketNum1 != bucketNum2) {
return Integer.compare(bucketNum1, bucketNum2);
}
return smallIdFirst ? Long.compare(id1, id2) : Long.compare(id2, id1);
});
int addNum = newReplicaNum - backendIds.size();
for (int i = 0; i < addNum; i++) {
long backendId = candBackendList.get(i);
backendIds.add(backendId);
backendToBucketNum.put(backendId, backendToBucketNum.getOrDefault(backendId, 0) + 1);
}
}
Preconditions.checkState(newReplicaNum == backendIds.size(),
newReplicaNum + " vs " + backendIds.size());
}
private List<Map.Entry<Long, Long>> getSortedBackendReplicaNumPairs(List<Long> allAvailBackendIds,
Set<Long> unavailBackendIds, LoadStatisticForTag statistic,
GlobalColocateStatistic globalColocateStatistic,
List<Long> flatBackendsPerBucketSeq) {
// backend id -> replica num, and sorted by replica num, descending.
Map<Long, Long> backendToReplicaNum = flatBackendsPerBucketSeq.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
// remove unavailable backend
for (Long backendId : unavailBackendIds) {
backendToReplicaNum.remove(backendId);
}
// add backends which are not in flatBackendsPerBucketSeq, with replication number 0
for (Long backendId : allAvailBackendIds) {
if (!backendToReplicaNum.containsKey(backendId)) {
backendToReplicaNum.put(backendId, 0L);
}
}
return backendToReplicaNum
.entrySet()
.stream()
.sorted((entry1, entry2) -> {
if (!entry1.getValue().equals(entry2.getValue())) {
// use Long.compare to avoid int overflow from casting the difference
return Long.compare(entry2.getValue(), entry1.getValue());
}
// From java 7, sorting needs to satisfy reflexivity, transitivity and symmetry.
// Otherwise it will raise exception "Comparison method violates its general contract".
BackendLoadStatistic beStat1 = statistic.getBackendLoadStatistic(entry1.getKey());
BackendLoadStatistic beStat2 = statistic.getBackendLoadStatistic(entry2.getKey());
if (beStat1 == null || beStat2 == null) {
if (beStat1 == null && beStat2 == null) {
return 0;
} else {
return beStat1 == null ? 1 : -1;
}
}
double loadScore1 = beStat1.getMixLoadScore();
double loadScore2 = beStat2.getMixLoadScore();
int cmp = Double.compare(loadScore2, loadScore1);
if (cmp != 0) {
return cmp;
}
return Long.compare(entry1.getKey(), entry2.getKey());
})
.collect(Collectors.toList());
}
/*
* get the array indexes of elements in flatBackendsPerBucketSeq which equal beId
* eg:
* flatBackendsPerBucketSeq:
* A B C A D E A F G A H I
* and beId is A.
* so seqIndexes is:
* 0 3 6 9
*/
private List<Integer> getBeSeqIndexes(List<Long> flatBackendsPerBucketSeq, long beId) {
return IntStream.range(0, flatBackendsPerBucketSeq.size()).boxed().filter(
idx -> flatBackendsPerBucketSeq.get(idx).equals(beId)).collect(Collectors.toList());
}
private Set<Long> getUnavailableBeIdsInGroup(SystemInfoService infoService, ColocateTableIndex colocateIndex,
GroupId groupId, Tag tag) {
Set<Long> backends = colocateIndex.getBackendsByGroup(groupId, tag);
Set<Long> unavailableBeIds = Sets.newHashSet();
for (Long backendId : backends) {
if (!checkBackendAvailable(backendId, tag, Sets.newHashSet(), infoService,
Config.colocate_group_relocate_delay_second)) {
unavailableBeIds.add(backendId);
}
}
return unavailableBeIds;
}
private List<Long> getAvailableBeIds(Tag tag, Set<Long> excludedBeIds,
SystemInfoService infoService) {
// get all backend ids, and check be availability using checkBackendAvailable.
// a backend stopped for a short period of time is still considered available
List<Long> allBackendIds = infoService.getAllBackendIds(false);
List<Long> availableBeIds = Lists.newArrayList();
for (Long backendId : allBackendIds) {
if (checkBackendAvailable(backendId, tag, excludedBeIds, infoService,
Config.colocate_group_relocate_delay_second)) {
availableBeIds.add(backendId);
}
}
return availableBeIds;
}
/**
* Check whether a backend is available.
* A backend stopped within "delaySecond" is still considered available.
*/
private boolean checkBackendAvailable(Long backendId, Tag tag, Set<Long> excludedBeIds,
SystemInfoService infoService, long delaySecond) {
long currTime = System.currentTimeMillis();
Backend be = infoService.getBackend(backendId);
if (be == null) {
return false;
} else if (!be.isMixNode()) {
return false;
} else if (!be.getLocationTag().equals(tag) || excludedBeIds.contains(be.getId())) {
return false;
} else if (!be.isScheduleAvailable()) {
// 1. BE has been dead for longer than "delaySecond"
// 2. BE is under decommission
if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > delaySecond * 1000L) || be.isDecommissioned()) {
return false;
}
}
return true;
}
}