CloudTabletRebalancer.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.cloud.catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TCheckWarmUpCacheAsyncRequest;
import org.apache.doris.thrift.TCheckWarmUpCacheAsyncResponse;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TWarmUpCacheAsyncRequest;
import org.apache.doris.thrift.TWarmUpCacheAsyncResponse;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
public class CloudTabletRebalancer extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(CloudTabletRebalancer.class);
private volatile ConcurrentHashMap<Long, List<Tablet>> beToTabletsGlobal =
new ConcurrentHashMap<Long, List<Tablet>>();
private volatile ConcurrentHashMap<Long, List<Tablet>> beToColocateTabletsGlobal =
new ConcurrentHashMap<Long, List<Tablet>>();
// used for cloud tablet report
private volatile ConcurrentHashMap<Long, List<Tablet>> beToTabletsGlobalInSecondary =
new ConcurrentHashMap<Long, List<Tablet>>();
private Map<Long, List<Tablet>> futureBeToTabletsGlobal;
private Map<String, List<Long>> clusterToBes;
private Set<Long> allBes;
// partitionId -> indexId -> be -> tablet
private Map<Long, Map<Long, Map<Long, List<Tablet>>>> partitionToTablets;
private Map<Long, Map<Long, Map<Long, List<Tablet>>>> futurePartitionToTablets;
// tableId -> be -> tablet
private Map<Long, Map<Long, List<Tablet>>> beToTabletsInTable;
private Map<Long, Map<Long, List<Tablet>>> futureBeToTabletsInTable;
private Map<Long, Long> beToDecommissionedTime = new HashMap<Long, Long>();
private Random rand = new Random();
private boolean indexBalanced = true;
private boolean tableBalanced = true;
private LinkedBlockingQueue<Pair<Long, Long>> tabletsMigrateTasks = new LinkedBlockingQueue<Pair<Long, Long>>();
private Map<InfightTablet, InfightTask> tabletToInfightTask = new HashMap<>();
private CloudSystemInfoService cloudSystemInfoService;
public CloudTabletRebalancer(CloudSystemInfoService cloudSystemInfoService) {
super("cloud tablet rebalancer", Config.cloud_tablet_rebalancer_interval_second * 1000);
this.cloudSystemInfoService = cloudSystemInfoService;
}
private interface Operator {
void op(Database db, Table table, Partition partition, MaterializedIndex index, String cluster);
}
public enum BalanceType {
GLOBAL,
TABLE,
PARTITION
}
@Getter
private class InfightTablet {
private final Long tabletId;
private final String clusterId;
public InfightTablet(Long tabletId, String clusterId) {
this.tabletId = tabletId;
this.clusterId = clusterId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
InfightTablet that = (InfightTablet) o;
return tabletId.equals(that.tabletId) && clusterId.equals(that.clusterId);
}
@Override
public int hashCode() {
return Objects.hash(tabletId, clusterId);
}
}
private class InfightTask {
public Tablet pickedTablet;
public long srcBe;
public long destBe;
public boolean isGlobal;
public Map<Long, List<Tablet>> beToTablets;
public long startTimestamp;
BalanceType balanceType;
}
private class TransferPairInfo {
public long srcBe;
public long destBe;
public long minTabletsNum;
public long maxTabletsNum;
public boolean srcDecommissioned;
}
public Set<Long> getSnapshotTabletsInPrimaryByBeId(Long beId) {
Set<Long> tabletIds = Sets.newHashSet();
List<Tablet> tablets = beToTabletsGlobal.get(beId);
if (tablets != null) {
for (Tablet tablet : tablets) {
tabletIds.add(tablet.getId());
}
}
tablets = beToColocateTabletsGlobal.get(beId);
if (tablets != null) {
for (Tablet tablet : tablets) {
tabletIds.add(tablet.getId());
}
}
return tabletIds;
}
public Set<Long> getSnapshotTabletsInSecondaryByBeId(Long beId) {
Set<Long> tabletIds = Sets.newHashSet();
List<Tablet> tablets = beToTabletsGlobalInSecondary.get(beId);
if (tablets != null) {
for (Tablet tablet : tablets) {
tabletIds.add(tablet.getId());
}
}
return tabletIds;
}
public Set<Long> getSnapshotTabletsInPrimaryAndSecondaryByBeId(Long beId) {
Set<Long> tabletIds = Sets.newHashSet();
tabletIds.addAll(getSnapshotTabletsInPrimaryByBeId(beId));
tabletIds.addAll(getSnapshotTabletsInSecondaryByBeId(beId));
return tabletIds;
}
public int getTabletNumByBackendId(long beId) {
List<Tablet> tablets = beToTabletsGlobal.get(beId);
List<Tablet> colocateTablets = beToColocateTabletsGlobal.get(beId);
return (tablets == null ? 0 : tablets.size())
+ (colocateTablets == null ? 0 : colocateTablets.size());
}
// 1 build cluster to backends info
// 2 complete route info
// 3 check whether the inflight preheating task has been completed
// 4 migrate tablet for smooth upgrade
// 5 statistics be to tablets mapping information
// 6 partition-level balance
// 7 if tablets in partition-level already balanced, perform table balance
// 8 if tablets in partition-level and table-level already balanced, perform global balance
// 9 check whether all tablets of decomission node have been migrated
@Override
protected void runAfterCatalogReady() {
if (Config.enable_cloud_multi_replica) {
LOG.info("Tablet balance is temporarily not supported when multi replica enabled");
return;
}
LOG.info("cloud tablet rebalance begin");
clusterToBes = new HashMap<String, List<Long>>();
allBes = new HashSet<Long>();
long start = System.currentTimeMillis();
// 1 build cluster to backend info
for (Long beId : cloudSystemInfoService.getAllBackendIds()) {
Backend be = cloudSystemInfoService.getBackend(beId);
if (be == null) {
LOG.info("backend {} not found", beId);
continue;
}
clusterToBes.putIfAbsent(be.getCloudClusterId(), new ArrayList<Long>());
clusterToBes.get(be.getCloudClusterId()).add(beId);
allBes.add(beId);
}
LOG.info("cluster to backends {}", clusterToBes);
// 2 complete route info
if (!completeRouteInfo()) {
return;
}
// 3 check whether the inflight preheating task has been completed
checkInflghtWarmUpCacheAsync();
// 4 migrate tablet for smooth upgrade
Pair<Long, Long> pair;
statRouteInfo();
while (!tabletsMigrateTasks.isEmpty()) {
try {
pair = tabletsMigrateTasks.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("begin tablets migration from be {} to be {}", pair.first, pair.second);
}
migrateTablets(pair.first, pair.second);
}
// 5 statistics be to tablets mapping information
statRouteInfo();
indexBalanced = true;
tableBalanced = true;
// 6 partition-level balance
if (Config.enable_cloud_partition_balance) {
balanceAllPartitions();
}
// 7 if tablets in partition-level already balanced, perform table balance
if (Config.enable_cloud_table_balance && indexBalanced) {
balanceAllTables();
}
// 8 if tablets in partition-level and table-level already balanced, perform global balance
if (Config.enable_cloud_global_balance && indexBalanced && tableBalanced) {
globalBalance();
}
// 9 check whether all tablets of decomission have been migrated
checkDecommissionState(clusterToBes);
LOG.info("finished to rebalancer. cost: {} ms", (System.currentTimeMillis() - start));
}
public void balanceAllPartitions() {
for (Map.Entry<Long, List<Tablet>> entry : beToTabletsGlobal.entrySet()) {
LOG.info("before partition balance be {} tablet num {}", entry.getKey(), entry.getValue().size());
}
for (Map.Entry<Long, List<Tablet>> entry : futureBeToTabletsGlobal.entrySet()) {
LOG.info("before partition balance be {} tablet num(current + pre heating inflight) {}",
entry.getKey(), entry.getValue().size());
}
List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
// balance in partitions/index
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
balanceInPartition(entry.getValue(), entry.getKey(), infos);
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
LOG.info("collect to editlog partitions before size={} after size={} infos", oldSize, infos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
// edit log failed, try next time
return;
}
for (Map.Entry<Long, List<Tablet>> entry : beToTabletsGlobal.entrySet()) {
LOG.info("after partition balance be {} tablet num {}", entry.getKey(), entry.getValue().size());
}
for (Map.Entry<Long, List<Tablet>> entry : futureBeToTabletsGlobal.entrySet()) {
LOG.info("after partition balance be {} tablet num(current + pre heating inflight) {}",
entry.getKey(), entry.getValue().size());
}
}
public void balanceAllTables() {
for (Map.Entry<Long, List<Tablet>> entry : beToTabletsGlobal.entrySet()) {
LOG.info("before table balance be {} tablet num {}", entry.getKey(), entry.getValue().size());
}
for (Map.Entry<Long, List<Tablet>> entry : futureBeToTabletsGlobal.entrySet()) {
LOG.info("before table balance be {} tablet num(current + pre heating inflight) {}",
entry.getKey(), entry.getValue().size());
}
List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
// balance in partitions/index
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
balanceInTable(entry.getValue(), entry.getKey(), infos);
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
LOG.info("collect to editlog table before size={} after size={} infos", oldSize, infos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
// edit log failed, try next time
return;
}
for (Map.Entry<Long, List<Tablet>> entry : beToTabletsGlobal.entrySet()) {
LOG.info("after table balance be {} tablet num {}", entry.getKey(), entry.getValue().size());
}
for (Map.Entry<Long, List<Tablet>> entry : futureBeToTabletsGlobal.entrySet()) {
LOG.info("after table balance be {} tablet num(current + pre heating inflight) {}",
entry.getKey(), entry.getValue().size());
}
}
public void globalBalance() {
for (Map.Entry<Long, List<Tablet>> entry : beToTabletsGlobal.entrySet()) {
LOG.info("before global balance be {} tablet num {}", entry.getKey(), entry.getValue().size());
}
for (Map.Entry<Long, List<Tablet>> entry : futureBeToTabletsGlobal.entrySet()) {
LOG.info("before global balance be {} tablet num(current + pre heating inflight) {}",
entry.getKey(), entry.getValue().size());
}
List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
balanceImpl(entry.getValue(), entry.getKey(), futureBeToTabletsGlobal, BalanceType.GLOBAL, infos);
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
LOG.info("collect to editlog global before size={} after size={} infos", oldSize, infos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
// edit log failed, try next time
return;
}
for (Map.Entry<Long, List<Tablet>> entry : beToTabletsGlobal.entrySet()) {
LOG.info("after global balance be {} tablet num {}", entry.getKey(), entry.getValue().size());
}
for (Map.Entry<Long, List<Tablet>> entry : futureBeToTabletsGlobal.entrySet()) {
LOG.info("after global balance be {} tablet num(current + pre heating inflight) {}",
entry.getKey(), entry.getValue().size());
}
}
public void checkInflghtWarmUpCacheAsync() {
Map<Long, List<InfightTask>> beToInfightTasks = new HashMap<Long, List<InfightTask>>();
for (Map.Entry<InfightTablet, InfightTask> entry : tabletToInfightTask.entrySet()) {
beToInfightTasks.putIfAbsent(entry.getValue().destBe, new ArrayList<>());
beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue());
}
List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
long needRehashDeadTime = System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L;
for (Map.Entry<Long, List<InfightTask>> entry : beToInfightTasks.entrySet()) {
LOG.info("before pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size());
Backend destBackend = cloudSystemInfoService.getBackend(entry.getKey());
if (destBackend == null || (!destBackend.isAlive() && destBackend.getLastUpdateMs() < needRehashDeadTime)) {
for (InfightTask task : entry.getValue()) {
for (InfightTablet key : tabletToInfightTask.keySet()) {
tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), key.clusterId));
}
}
continue;
}
if (!destBackend.isAlive()) {
continue;
}
List<Long> tablets = entry.getValue().stream()
.map(task -> task.pickedTablet.getId()).collect(Collectors.toList());
Map<Long, Boolean> taskDone = sendCheckWarmUpCacheAsyncRpc(tablets, entry.getKey());
if (taskDone == null) {
LOG.warn("sendCheckWarmUpCacheAsyncRpc return null be {}, inFight tasks {}",
entry.getKey(), entry.getValue());
continue;
}
String clusterId = cloudSystemInfoService.getBackend(entry.getKey()).getCloudClusterId();
for (Map.Entry<Long, Boolean> result : taskDone.entrySet()) {
InfightTask task = tabletToInfightTask
.getOrDefault(new InfightTablet(result.getKey(), clusterId), null);
if (task != null && (result.getValue() || System.currentTimeMillis() / 1000 - task.startTimestamp
> Config.cloud_pre_heating_time_limit_sec)) {
if (!result.getValue()) {
LOG.info("{} pre cache timeout, forced to change the mapping", result.getKey());
}
updateClusterToBeMap(task.pickedTablet, task.destBe, clusterId, infos);
tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), clusterId));
}
}
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
LOG.info("collect to editlog warmup before size={} after size={} infos", oldSize, infos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
// edit log failed, try next time
return;
}
// recalculate inflight beToTablets, just for print the log
beToInfightTasks.clear();
for (Map.Entry<InfightTablet, InfightTask> entry : tabletToInfightTask.entrySet()) {
beToInfightTasks.putIfAbsent(entry.getValue().destBe, new ArrayList<>());
beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue());
}
for (Map.Entry<Long, List<InfightTask>> entry : beToInfightTasks.entrySet()) {
LOG.info("after pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size());
}
}
public void checkDecommissionState(Map<String, List<Long>> clusterToBes) {
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
List<Long> beList = entry.getValue();
for (long beId : beList) {
List<Tablet> tablets = beToTabletsGlobal.get(beId);
int tabletNum = tablets == null ? 0 : tablets.size();
Backend backend = cloudSystemInfoService.getBackend(beId);
if (backend == null) {
LOG.info("backend {} not found", beId);
continue;
}
if (!backend.isDecommissioning()) {
continue;
}
// here check wal
long walNum = Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend);
LOG.info("check decommissioning be {} state {} tabletNum {} isActive {} beList {}, wal num {}",
backend.getId(), backend.isDecommissioning(), tabletNum, backend.isActive(), beList, walNum);
if ((tabletNum != 0 || backend.isActive() || walNum != 0) && beList.size() != 1) {
continue;
}
if (beToDecommissionedTime.containsKey(beId)) {
continue;
}
LOG.info("prepare to notify meta service be {} decommissioned", backend.getAddress());
Cloud.AlterClusterRequest.Builder builder =
Cloud.AlterClusterRequest.newBuilder();
builder.setCloudUniqueId(Config.cloud_unique_id);
builder.setOp(Cloud.AlterClusterRequest.Operation.NOTIFY_DECOMMISSIONED);
Cloud.ClusterPB.Builder clusterBuilder =
Cloud.ClusterPB.newBuilder();
clusterBuilder.setClusterName(backend.getCloudClusterName());
clusterBuilder.setClusterId(backend.getCloudClusterId());
clusterBuilder.setType(Cloud.ClusterPB.Type.COMPUTE);
Cloud.NodeInfoPB.Builder nodeBuilder = Cloud.NodeInfoPB.newBuilder();
nodeBuilder.setIp(backend.getHost());
nodeBuilder.setHeartbeatPort(backend.getHeartbeatPort());
nodeBuilder.setCloudUniqueId(backend.getCloudUniqueId());
nodeBuilder.setStatus(Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED);
clusterBuilder.addNodes(nodeBuilder);
builder.setCluster(clusterBuilder);
Cloud.AlterClusterResponse response;
try {
response = MetaServiceProxy.getInstance().alterCluster(builder.build());
if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
LOG.warn("notify decommission response: {}", response);
continue;
}
LOG.info("notify decommission response: {} ", response);
} catch (RpcException e) {
LOG.warn("failed to notify decommission", e);
continue;
}
beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000);
}
}
}
private boolean completeRouteInfo() {
List<UpdateCloudReplicaInfo> updateReplicaInfos = new ArrayList<UpdateCloudReplicaInfo>();
long[] assignedErrNum = {0L};
long needRehashDeadTime = System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L;
loopCloudReplica((Database db, Table table, Partition partition, MaterializedIndex index, String cluster) -> {
boolean assigned = false;
List<Long> beIds = new ArrayList<Long>();
List<Long> tabletIds = new ArrayList<Long>();
boolean isColocated = Env.getCurrentColocateIndex().isColocateTable(table.getId());
for (Tablet tablet : index.getTablets()) {
for (Replica r : tablet.getReplicas()) {
CloudReplica replica = (CloudReplica) r;
// clean secondary map
replica.checkAndClearSecondaryClusterToBe(cluster, needRehashDeadTime);
InfightTablet taskKey = new InfightTablet(tablet.getId(), cluster);
// colocate table no need to update primary backends
if (isColocated) {
replica.clearClusterToBe(cluster);
tabletToInfightTask.remove(taskKey);
continue;
}
// primary backend is alive or dead not long
Backend be = replica.getPrimaryBackend(cluster);
if (be != null && (be.isQueryAvailable()
|| (!be.isQueryDisabled() && be.getLastUpdateMs() > needRehashDeadTime))) {
beIds.add(be.getId());
tabletIds.add(tablet.getId());
continue;
}
// primary backend not available too long, change one
long beId = -1L;
be = replica.getSecondaryBackend(cluster);
if (be != null && be.isQueryAvailable()) {
beId = be.getId();
} else {
InfightTask task = tabletToInfightTask.get(taskKey);
be = task == null ? null : Env.getCurrentSystemInfo().getBackend(task.destBe);
if (be != null && be.isQueryAvailable()) {
beId = be.getId();
} else {
try {
beId = ((CloudReplica) replica).hashReplicaToBe(cluster, true);
} catch (ComputeGroupException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("failed to hash replica to be {}", cluster, e);
}
beId = -1;
}
}
}
if (beId <= 0) {
assignedErrNum[0]++;
continue;
}
tabletToInfightTask.remove(taskKey);
((CloudReplica) replica).updateClusterToPrimaryBe(cluster, beId);
beIds.add(beId);
tabletIds.add(tablet.getId());
assigned = true;
}
}
if (assigned) {
UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(db.getId(), table.getId(),
partition.getId(), index.getId(), cluster, beIds, tabletIds);
updateReplicaInfos.add(info);
}
});
LOG.info("collect to editlog route {} infos, error num {}", updateReplicaInfos.size(), assignedErrNum[0]);
if (updateReplicaInfos.isEmpty()) {
return true;
}
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(updateReplicaInfos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
// edit log failed, try next time
return false;
}
return true;
}
public void fillBeToTablets(long be, long tableId, long partId, long indexId, Tablet tablet,
Map<Long, List<Tablet>> globalBeToTablets,
Map<Long, Map<Long, List<Tablet>>> beToTabletsInTable,
Map<Long, Map<Long, Map<Long, List<Tablet>>>> partToTablets) {
// global
globalBeToTablets.putIfAbsent(be, new ArrayList<Tablet>());
globalBeToTablets.get(be).add(tablet);
// table
beToTabletsInTable.putIfAbsent(tableId, new HashMap<Long, List<Tablet>>());
Map<Long, List<Tablet>> beToTabletsOfTable = beToTabletsInTable.get(tableId);
beToTabletsOfTable.putIfAbsent(be, new ArrayList<Tablet>());
beToTabletsOfTable.get(be).add(tablet);
// partition
partToTablets.putIfAbsent(partId, new HashMap<Long, Map<Long, List<Tablet>>>());
Map<Long, Map<Long, List<Tablet>>> indexToTablets = partToTablets.get(partId);
indexToTablets.putIfAbsent(indexId, new HashMap<Long, List<Tablet>>());
Map<Long, List<Tablet>> beToTabletsOfIndex = indexToTablets.get(indexId);
beToTabletsOfIndex.putIfAbsent(be, new ArrayList<Tablet>());
beToTabletsOfIndex.get(be).add(tablet);
}
public void statRouteInfo() {
ConcurrentHashMap<Long, List<Tablet>> tmpBeToTabletsGlobal = new ConcurrentHashMap<Long, List<Tablet>>();
ConcurrentHashMap<Long, List<Tablet>> tmpBeToTabletsGlobalInSecondary
= new ConcurrentHashMap<Long, List<Tablet>>();
ConcurrentHashMap<Long, List<Tablet>> tmpBeToColocateTabletsGlobal
= new ConcurrentHashMap<Long, List<Tablet>>();
futureBeToTabletsGlobal = new HashMap<Long, List<Tablet>>();
partitionToTablets = new HashMap<Long, Map<Long, Map<Long, List<Tablet>>>>();
futurePartitionToTablets = new HashMap<Long, Map<Long, Map<Long, List<Tablet>>>>();
beToTabletsInTable = new HashMap<Long, Map<Long, List<Tablet>>>();
futureBeToTabletsInTable = new HashMap<Long, Map<Long, List<Tablet>>>();
loopCloudReplica((Database db, Table table, Partition partition, MaterializedIndex index, String cluster) -> {
boolean isColocated = Env.getCurrentColocateIndex().isColocateTable(table.getId());
for (Tablet tablet : index.getTablets()) {
for (Replica r : tablet.getReplicas()) {
CloudReplica replica = (CloudReplica) r;
if (isColocated) {
long beId = -1L;
try {
beId = replica.getColocatedBeId(cluster);
} catch (ComputeGroupException e) {
continue;
}
if (allBes.contains(beId)) {
List<Tablet> colocateTablets =
tmpBeToColocateTabletsGlobal.computeIfAbsent(beId, k -> new ArrayList<>());
colocateTablets.add(tablet);
}
continue;
}
Backend be = replica.getPrimaryBackend(cluster);
long beId = be == null ? -1L : be.getId();
if (!allBes.contains(beId)) {
continue;
}
Backend secondaryBe = replica.getSecondaryBackend(cluster);
long secondaryBeId = secondaryBe == null ? -1L : secondaryBe.getId();
if (allBes.contains(secondaryBeId)) {
List<Tablet> tablets = tmpBeToTabletsGlobalInSecondary
.computeIfAbsent(secondaryBeId, k -> new ArrayList<>());
tablets.add(tablet);
}
InfightTablet taskKey = new InfightTablet(tablet.getId(), cluster);
InfightTask task = tabletToInfightTask.get(taskKey);
long futureBeId = task == null ? beId : task.destBe;
fillBeToTablets(beId, table.getId(), partition.getId(), index.getId(), tablet,
tmpBeToTabletsGlobal, beToTabletsInTable, this.partitionToTablets);
fillBeToTablets(futureBeId, table.getId(), partition.getId(), index.getId(), tablet,
futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
}
}
});
beToTabletsGlobal = tmpBeToTabletsGlobal;
beToTabletsGlobalInSecondary = tmpBeToTabletsGlobalInSecondary;
beToColocateTabletsGlobal = tmpBeToColocateTabletsGlobal;
}
public void loopCloudReplica(Operator operator) {
List<Long> dbIds = Env.getCurrentInternalCatalog().getDbIds();
for (Long dbId : dbIds) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
continue;
}
List<Table> tableList = db.getTables();
for (Table table : tableList) {
if (!table.isManagedTable()) {
continue;
}
OlapTable olapTable = (OlapTable) table;
table.readLock();
try {
for (Partition partition : olapTable.getAllPartitions()) {
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
String cluster = entry.getKey();
operator.op(db, table, partition, index, cluster);
}
} // end for indices
} // end for partitions
} finally {
table.readUnlock();
}
}
}
}
public void balanceInPartition(List<Long> bes, String clusterId, List<UpdateCloudReplicaInfo> infos) {
// balance all partition
for (Map.Entry<Long, Map<Long, Map<Long, List<Tablet>>>> partitionEntry : futurePartitionToTablets.entrySet()) {
Map<Long, Map<Long, List<Tablet>>> indexToTablets = partitionEntry.getValue();
// balance all index of a partition
for (Map.Entry<Long, Map<Long, List<Tablet>>> entry : indexToTablets.entrySet()) {
// balance a index
balanceImpl(bes, clusterId, entry.getValue(), BalanceType.PARTITION, infos);
}
}
}
public void balanceInTable(List<Long> bes, String clusterId, List<UpdateCloudReplicaInfo> infos) {
// balance all tables
for (Map.Entry<Long, Map<Long, List<Tablet>>> entry : futureBeToTabletsInTable.entrySet()) {
balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE, infos);
}
}
private void sendPreHeatingRpc(Tablet pickedTablet, long srcBe, long destBe) throws Exception {
BackendService.Client client = null;
TNetworkAddress address = null;
Backend srcBackend = cloudSystemInfoService.getBackend(srcBe);
Backend destBackend = cloudSystemInfoService.getBackend(destBe);
boolean ok = true;
try {
address = new TNetworkAddress(destBackend.getHost(), destBackend.getBePort());
client = ClientPool.backendPool.borrowObject(address);
TWarmUpCacheAsyncRequest req = new TWarmUpCacheAsyncRequest();
req.setHost(srcBackend.getHost());
req.setBrpcPort(srcBackend.getBrpcPort());
List<Long> tablets = new ArrayList<Long>();
tablets.add(pickedTablet.getId());
req.setTabletIds(tablets);
TWarmUpCacheAsyncResponse result = client.warmUpCacheAsync(req);
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
LOG.warn("pre cache failed status {} {}", result.getStatus().getStatusCode(),
result.getStatus().getErrorMsgs());
}
} catch (Exception e) {
LOG.warn("send pre heating rpc error. backend[{}]", destBackend.getId(), e);
ok = false;
throw e;
} finally {
if (ok) {
ClientPool.backendPool.returnObject(address, client);
} else {
ClientPool.backendPool.invalidateObject(address, client);
}
}
}
private Map<Long, Boolean> sendCheckWarmUpCacheAsyncRpc(List<Long> tabletIds, long be) {
BackendService.Client client = null;
TNetworkAddress address = null;
Backend destBackend = cloudSystemInfoService.getBackend(be);
boolean ok = true;
try {
address = new TNetworkAddress(destBackend.getHost(), destBackend.getBePort());
client = ClientPool.backendPool.borrowObject(address);
TCheckWarmUpCacheAsyncRequest req = new TCheckWarmUpCacheAsyncRequest();
req.setTablets(tabletIds);
TCheckWarmUpCacheAsyncResponse result = client.checkWarmUpCacheAsync(req);
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
LOG.warn("check pre cache status {} {}", result.getStatus().getStatusCode(),
result.getStatus().getErrorMsgs());
} else {
LOG.info("check pre cache succ status {} {}", result.getStatus().getStatusCode(),
result.getStatus().getErrorMsgs());
}
return result.getTaskDone();
} catch (Exception e) {
LOG.warn("send check pre cache rpc error. backend[{}]", destBackend.getId(), e);
ok = false;
} finally {
if (ok) {
ClientPool.backendPool.returnObject(address, client);
} else {
ClientPool.backendPool.invalidateObject(address, client);
}
}
return null;
}
private void updateBeToTablets(Tablet pickedTablet, long srcBe, long destBe, BalanceType balanceType,
Map<Long, List<Tablet>> globalBeToTablets,
Map<Long, Map<Long, List<Tablet>>> beToTabletsInTable,
Map<Long, Map<Long, Map<Long, List<Tablet>>>> partToTablets) {
CloudReplica replica = (CloudReplica) pickedTablet.getReplicas().get(0);
long tableId = replica.getTableId();
long partId = replica.getPartitionId();
long indexId = replica.getIndexId();
globalBeToTablets.get(srcBe).remove(pickedTablet);
beToTabletsInTable.get(tableId).get(srcBe).remove(pickedTablet);
partToTablets.get(partId).get(indexId).get(srcBe).remove(pickedTablet);
fillBeToTablets(destBe, tableId, partId, indexId, pickedTablet, globalBeToTablets, beToTabletsInTable,
partToTablets);
}
private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clusterId,
List<UpdateCloudReplicaInfo> infos) {
CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0);
Database db = Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId());
if (db == null) {
return;
}
OlapTable table = (OlapTable) db.getTableNullable(cloudReplica.getTableId());
if (table == null) {
return;
}
table.readLock();
try {
if (db.getTableNullable(cloudReplica.getTableId()) == null) {
return;
}
cloudReplica.updateClusterToPrimaryBe(clusterId, destBe);
UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(cloudReplica.getDbId(),
cloudReplica.getTableId(), cloudReplica.getPartitionId(), cloudReplica.getIndexId(),
pickedTablet.getId(), cloudReplica.getId(), clusterId, destBe);
infos.add(info);
} finally {
table.readUnlock();
}
}
private boolean getTransferPair(List<Long> bes, Map<Long, List<Tablet>> beToTablets, long avgNum,
TransferPairInfo pairInfo) {
long destBe = bes.get(0);
long srcBe = bes.get(0);
long minTabletsNum = Long.MAX_VALUE;
long maxTabletsNum = 0;
boolean srcDecommissioned = false;
for (Long be : bes) {
long tabletNum = beToTablets.get(be) == null ? 0 : beToTablets.get(be).size();
if (tabletNum > maxTabletsNum) {
srcBe = be;
maxTabletsNum = tabletNum;
}
Backend backend = cloudSystemInfoService.getBackend(be);
if (backend == null) {
LOG.info("backend {} not found", be);
continue;
}
if (tabletNum < minTabletsNum && backend.isAlive() && !backend.isDecommissioning()
&& !backend.isSmoothUpgradeSrc()) {
destBe = be;
minTabletsNum = tabletNum;
}
}
for (Long be : bes) {
long tabletNum = beToTablets.get(be) == null ? 0 : beToTablets.get(be).size();
Backend backend = cloudSystemInfoService.getBackend(be);
if (backend == null) {
LOG.info("backend {} not found", be);
continue;
}
if (backend.isDecommissioning() && tabletNum > 0) {
srcBe = be;
srcDecommissioned = true;
break;
}
}
if (!srcDecommissioned) {
if ((maxTabletsNum < avgNum * (1 + Config.cloud_rebalance_percent_threshold)
&& minTabletsNum > avgNum * (1 - Config.cloud_rebalance_percent_threshold))
|| minTabletsNum > maxTabletsNum - Config.cloud_rebalance_number_threshold) {
return false;
}
}
pairInfo.srcBe = srcBe;
pairInfo.destBe = destBe;
pairInfo.minTabletsNum = minTabletsNum;
pairInfo.maxTabletsNum = maxTabletsNum;
return true;
}
private boolean isConflict(long srcBe, long destBe, CloudReplica cloudReplica, BalanceType balanceType,
Map<Long, Map<Long, Map<Long, List<Tablet>>>> beToTabletsInParts,
Map<Long, Map<Long, List<Tablet>>> beToTabletsInTables) {
if (balanceType == balanceType.GLOBAL) {
// check is conflict with partition balance
long maxBeSize = beToTabletsInParts.get(cloudReplica.getPartitionId())
.get(cloudReplica.getIndexId()).get(srcBe).size();
List<Tablet> destBeTablets = beToTabletsInParts.get(cloudReplica.getPartitionId())
.get(cloudReplica.getIndexId()).get(destBe);
long minBeSize = destBeTablets == null ? 0 : destBeTablets.size();
if (minBeSize >= maxBeSize) {
return true;
}
// check is conflict with table balance
maxBeSize = beToTabletsInTables.get(cloudReplica.getTableId()).get(srcBe).size();
destBeTablets = beToTabletsInTables.get(cloudReplica.getTableId()).get(destBe);
minBeSize = destBeTablets == null ? 0 : destBeTablets.size();
if (minBeSize >= maxBeSize) {
return true;
}
}
if (balanceType == balanceType.TABLE) {
// check is conflict with partition balance
long maxBeSize = beToTabletsInParts.get(cloudReplica.getPartitionId())
.get(cloudReplica.getIndexId()).get(srcBe).size();
List<Tablet> destBeTablets = beToTabletsInParts.get(cloudReplica.getPartitionId())
.get(cloudReplica.getIndexId()).get(destBe);
long minBeSize = destBeTablets == null ? 0 : destBeTablets.size();
return minBeSize >= maxBeSize;
}
return false;
}
private void balanceImpl(List<Long> bes, String clusterId, Map<Long, List<Tablet>> beToTablets,
BalanceType balanceType, List<UpdateCloudReplicaInfo> infos) {
if (bes == null || bes.isEmpty() || beToTablets == null || beToTablets.isEmpty()) {
return;
}
long totalTabletsNum = 0;
long beNum = 0;
for (Long be : bes) {
long tabletNum = beToTablets.get(be) == null ? 0 : beToTablets.get(be).size();
Backend backend = cloudSystemInfoService.getBackend(be);
if (backend != null && !backend.isDecommissioning()) {
beNum++;
}
totalTabletsNum += tabletNum;
}
if (beNum == 0) {
LOG.warn("zero be, but want balance, skip");
return;
}
long avgNum = totalTabletsNum / beNum;
long transferNum = Math.max(Math.round(avgNum * Config.cloud_balance_tablet_percent_per_run),
Config.cloud_min_balance_tablet_num_per_run);
for (int i = 0; i < transferNum; i++) {
TransferPairInfo pairInfo = new TransferPairInfo();
if (!getTransferPair(bes, beToTablets, avgNum, pairInfo)) {
// no need balance;
break;
}
if (balanceType == balanceType.PARTITION) {
indexBalanced = false;
}
if (balanceType == balanceType.TABLE) {
tableBalanced = false;
}
long srcBe = pairInfo.srcBe;
long destBe = pairInfo.destBe;
long minTabletsNum = pairInfo.minTabletsNum;
long maxTabletsNum = pairInfo.maxTabletsNum;
int randomIndex = rand.nextInt(beToTablets.get(srcBe).size());
Tablet pickedTablet = beToTablets.get(srcBe).get(randomIndex);
CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0);
Backend srcBackend = Env.getCurrentSystemInfo().getBackend(srcBe);
// if srcBe is dead, destBe cann't download cache from it, preheating will failed
if (Config.enable_cloud_warm_up_for_rebalance && srcBackend != null && srcBackend.isAlive()) {
if (isConflict(srcBe, destBe, cloudReplica, balanceType, futurePartitionToTablets,
futureBeToTabletsInTable)) {
continue;
}
try {
sendPreHeatingRpc(pickedTablet, srcBe, destBe);
} catch (Exception e) {
break;
}
InfightTask task = new InfightTask();
task.pickedTablet = pickedTablet;
task.srcBe = srcBe;
task.destBe = destBe;
task.balanceType = balanceType;
task.beToTablets = beToTablets;
task.startTimestamp = System.currentTimeMillis() / 1000;
tabletToInfightTask.put(new InfightTablet(pickedTablet.getId(), clusterId), task);
LOG.info("pre cache {} from {} to {}, cluster {} minNum {} maxNum {} beNum {} tabletsNum {}, part {}",
pickedTablet.getId(), srcBe, destBe, clusterId,
minTabletsNum, maxTabletsNum, beNum, totalTabletsNum, cloudReplica.getPartitionId());
updateBeToTablets(pickedTablet, srcBe, destBe, balanceType,
futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
} else {
if (isConflict(srcBe, destBe, cloudReplica, balanceType, partitionToTablets, beToTabletsInTable)) {
continue;
}
LOG.info("transfer {} from {} to {}, cluster {} minNum {} maxNum {} beNum {} tabletsNum {}, part {}",
pickedTablet.getId(), srcBe, destBe, clusterId,
minTabletsNum, maxTabletsNum, beNum, totalTabletsNum, cloudReplica.getPartitionId());
updateBeToTablets(pickedTablet, srcBe, destBe, balanceType, beToTabletsGlobal,
beToTabletsInTable, partitionToTablets);
updateBeToTablets(pickedTablet, srcBe, destBe, balanceType,
futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
updateClusterToBeMap(pickedTablet, destBe, clusterId, infos);
}
}
}
public void addTabletMigrationTask(Long srcBe, Long dstBe) {
tabletsMigrateTasks.offer(Pair.of(srcBe, dstBe));
}
/* Migrate tablet replicas from srcBe to dstBe
* replica location info will be updated in both master and follower FEs.
*/
private void migrateTablets(Long srcBe, Long dstBe) {
// get tablets
List<Tablet> tablets = beToTabletsGlobal.get(srcBe);
if (tablets == null || tablets.isEmpty()) {
LOG.info("smooth upgrade srcBe={} does not have any tablets, set inactive", srcBe);
((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
return;
}
List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
for (Tablet tablet : tablets) {
// get replica
CloudReplica cloudReplica = (CloudReplica) tablet.getReplicas().get(0);
Backend be = cloudSystemInfoService.getBackend(srcBe);
if (be == null) {
LOG.info("src backend {} not found", srcBe);
continue;
}
// populate to followers
Database db = Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId());
if (db == null) {
long beId;
try {
beId = cloudReplica.getBackendId();
} catch (ComputeGroupException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("get backend failed cloudReplica {}", cloudReplica, e);
}
beId = -1;
}
LOG.error("get null db from replica, tabletId={}, partitionId={}, beId={}",
cloudReplica.getTableId(), cloudReplica.getPartitionId(), beId);
continue;
}
OlapTable table = (OlapTable) db.getTableNullable(cloudReplica.getTableId());
if (table == null) {
continue;
}
String clusterId = be.getCloudClusterId();
String clusterName = be.getCloudClusterName();
table.readLock();
try {
if (db.getTableNullable(cloudReplica.getTableId()) == null) {
continue;
}
// update replica location info
cloudReplica.updateClusterToPrimaryBe(clusterId, dstBe);
UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(cloudReplica.getDbId(),
cloudReplica.getTableId(), cloudReplica.getPartitionId(), cloudReplica.getIndexId(),
tablet.getId(), cloudReplica.getId(), clusterId, dstBe);
infos.add(info);
} finally {
table.readUnlock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("cloud be migrate tablet {} from srcBe={} to dstBe={}, clusterId={}, clusterName={}",
tablet.getId(), srcBe, dstBe, clusterId, clusterName);
}
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
LOG.info("collect to editlog migrate before size={} after size={} infos", oldSize, infos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("update cloud replicas failed", e);
// edit log failed, try next time
throw new RuntimeException(e);
}
try {
((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().registerWaterShedTxnId(srcBe);
} catch (UserException e) {
LOG.warn("registerWaterShedTxnId get exception", e);
throw new RuntimeException(e);
}
}
private List<UpdateCloudReplicaInfo> batchUpdateCloudReplicaInfoEditlogs(List<UpdateCloudReplicaInfo> infos) {
long start = System.currentTimeMillis();
List<UpdateCloudReplicaInfo> rets = new ArrayList<>();
// clusterId, infos
Map<String, List<UpdateCloudReplicaInfo>> clusterIdToInfos = infos.stream()
.collect(Collectors.groupingBy(UpdateCloudReplicaInfo::getClusterId));
for (Map.Entry<String, List<UpdateCloudReplicaInfo>> entry : clusterIdToInfos.entrySet()) {
// same cluster
String clusterId = entry.getKey();
List<UpdateCloudReplicaInfo> infoList = entry.getValue();
Map<Long, List<UpdateCloudReplicaInfo>> sameLocationInfos = infoList.stream()
.collect(Collectors.groupingBy(
info -> info.getDbId()
+ info.getTableId() + info.getPartitionId() + info.getIndexId()));
sameLocationInfos.forEach((location, locationInfos) -> {
UpdateCloudReplicaInfo newInfo = new UpdateCloudReplicaInfo();
long dbId = -1;
long tableId = -1;
long partitionId = -1;
long indexId = -1;
for (UpdateCloudReplicaInfo info : locationInfos) {
Preconditions.checkState(clusterId.equals(info.getClusterId()),
"impossible, cluster id not eq outer=" + clusterId + ", inner=" + info.getClusterId());
dbId = info.getDbId();
tableId = info.getTableId();
partitionId = info.getPartitionId();
indexId = info.getIndexId();
StringBuilder sb = new StringBuilder("impossible, some locations do not match location");
sb.append(", location=").append(location).append(", dbId=").append(dbId)
.append(", tableId=").append(tableId).append(", partitionId=").append(partitionId)
.append(", indexId=").append(indexId);
Preconditions.checkState(location == dbId + tableId + partitionId + indexId, sb.toString());
long tabletId = info.getTabletId();
long replicaId = info.getReplicaId();
long beId = info.getBeId();
newInfo.getTabletIds().add(tabletId);
newInfo.getReplicaIds().add(replicaId);
newInfo.getBeIds().add(beId);
}
newInfo.setDbId(dbId);
newInfo.setTableId(tableId);
newInfo.setPartitionId(partitionId);
newInfo.setIndexId(indexId);
newInfo.setClusterId(clusterId);
// ATTN: in unprotectUpdateCloudReplica, use batch must set tabletId = -1
newInfo.setTabletId(-1);
rets.add(newInfo);
});
}
if (LOG.isDebugEnabled()) {
LOG.debug("batchUpdateCloudReplicaInfoEditlogs old size {}, cur size {} cost {} ms",
infos.size(), rets.size(), System.currentTimeMillis() - start);
}
return rets;
}
}