// GroupCommitManager.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.SlidingWindowCounter;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.MasterOpExecutor;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
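/**
 * Manages FE-side state for group commit loads: which tables are temporarily blocked
 * (e.g. during schema change), which backend each table's group commit traffic is routed to,
 * and a sliding-window counter of recently received bytes per table used as a load signal.
 *
 * A rough, illustrative caller flow (only this class's public methods are assumed; the
 * variable names are hypothetical):
 * <pre>
 * GroupCommitManager manager = Env.getCurrentEnv().getGroupCommitManager();
 * Backend be = manager.selectBackendForGroupCommit(tableId, ConnectContext.get());
 * // ... send the load to that backend, then report how many bytes were received ...
 * manager.updateLoadData(tableId, receivedBytes);
 * </pre>
 */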
public class GroupCommitManager {
private static final Logger LOG = LogManager.getLogger(GroupCommitManager.class);
    // Tables whose group commit is temporarily blocked, e.g. during schema change.
    // Read and written from different threads, so use a concurrent set.
    private final Set<Long> blockedTableIds = ConcurrentHashMap.newKeySet();
// Encoded <Cluster and Table id> to BE id map. Only for group commit.
private final Map<String, Long> tableToBeMap = new ConcurrentHashMap<>();
// Table id to pressure map. Only for group commit.
private final Map<Long, SlidingWindowCounter> tableToPressureMap = new ConcurrentHashMap<>();
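    // blockTable/unblockTable are called around schema change; callers check isBlock() to avoid
    // starting new group commit loads on a blocked table, and waitWalFinished() drains its WALs.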
public boolean isBlock(long tableId) {
return blockedTableIds.contains(tableId);
}
public void blockTable(long tableId) {
LOG.info("block group commit for table={} when schema change", tableId);
blockedTableIds.add(tableId);
}
public void unblockTable(long tableId) {
blockedTableIds.remove(tableId);
LOG.info("unblock group commit for table={} when schema change", tableId);
}
    /**
     * Wait until all WAL files of the table are finished on the alive backends,
     * or until Config.check_wal_queue_timeout_threshold expires.
     */
public void waitWalFinished(long tableId) {
List<Long> aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
while (true) {
LOG.info("wait for wal queue size to be empty");
boolean walFinished = Env.getCurrentEnv().getGroupCommitManager()
.isPreviousWalFinished(tableId, aliveBeIds);
if (walFinished) {
LOG.info("all wal is finished for table={}", tableId);
break;
} else if (System.currentTimeMillis() > expireTime) {
LOG.warn("waitWalFinished time out for table={}", tableId);
break;
} else {
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
LOG.warn("failed to wait for wal for table={} when schema change", tableId, ie);
}
}
}
}
    /**
     * Check whether all WALs of the table have been replayed, i.e. the WAL queue
     * is empty on every alive backend.
     */
private boolean isPreviousWalFinished(long tableId, List<Long> aliveBeIds) {
boolean empty = true;
for (int i = 0; i < aliveBeIds.size(); i++) {
            Backend backend = Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i));
            // The backend may have been dropped between listing and lookup.
            if (backend == null) {
                continue;
            }
            // In unit tests the brpc port is -1, skip the check.
            if (backend.getBrpcPort() < 0) {
                return true;
            }
PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
.setTableId(tableId)
.build();
long size = getWalQueueSize(backend, request);
if (size > 0) {
LOG.info("backend id:" + backend.getId() + ",wal size:" + size);
empty = false;
}
}
return empty;
}
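    /**
     * Get the total WAL queue size of all tables on the given backend (tableId = -1 means all).
     * The debug point "FE.GET_ALL_WAL_QUEUE_SIZE" can override the returned value in tests.
     */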
public long getAllWalQueueSize(Backend backend) {
long getAllWalQueueSizeDP = DebugPointUtil.getDebugParamOrDefault("FE.GET_ALL_WAL_QUEUE_SIZE", -1L);
if (getAllWalQueueSizeDP > 0) {
LOG.info("backend id:" + backend.getHost() + ",use dp all wal size:" + getAllWalQueueSizeDP);
return getAllWalQueueSizeDP;
}
PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
.setTableId(-1)
.build();
long size = getWalQueueSize(backend, request);
if (size > 0) {
LOG.info("backend id:" + backend.getId() + ",all wal size:" + size);
}
return size;
}
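    /**
     * Query one backend for its WAL queue size, retrying every 100 ms until it answers
     * successfully or Config.check_wal_queue_timeout_threshold elapses. Returns 0 on timeout
     * or if the backend does not implement the RPC.
     */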
private long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) {
if (FeConstants.runningUnitTest) {
return 0;
}
PGetWalQueueSizeResponse response = null;
long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
long size = 0;
while (System.currentTimeMillis() <= expireTime) {
if (!backend.isAlive()) {
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
LOG.info("group commit manager sleep wait InterruptedException: ", ie);
}
continue;
}
try {
Future<PGetWalQueueSizeResponse> future = BackendServiceProxy.getInstance()
.getWalQueueSize(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
response = future.get();
            } catch (Exception e) {
                LOG.warn("encounter exception while getting wal queue size on backend id: {}",
                        backend.getId(), e);
                String msg = e.getMessage();
                // Old backends may not implement this RPC; stop retrying in that case.
                if (msg != null && msg.contains("Method") && msg.contains("unimplemented")) {
                    break;
                }
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
LOG.info("group commit manager sleep wait InterruptedException: ", ie);
}
continue;
}
TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
String msg = "get all queue size fail,backend id: " + backend.getId() + ", status: "
+ response.getStatus();
LOG.warn(msg);
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
LOG.info("group commit manager sleep wait InterruptedException: ", ie);
}
continue;
}
size = response.getSize();
break;
}
return size;
}
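    /**
     * Select the backend that should receive a group commit load for the given table.
     * On a follower FE the request is forwarded to the master; the master performs the
     * actual selection in selectBackendForGroupCommitInternal().
     */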
public Backend selectBackendForGroupCommit(long tableId, ConnectContext context)
throws LoadException, DdlException {
        // If a group commit request is sent to a follower FE, forward it to the master FE.
        // The master FE selects a BE and returns its id to the follower FE.
String clusterName = "";
if (Config.isCloudMode()) {
try {
clusterName = context.getCloudCluster();
} catch (Exception e) {
LOG.warn("failed to get cluster name", e);
throw new LoadException(e.getMessage());
}
}
if (!Env.getCurrentEnv().isMaster()) {
try {
long backendId = new MasterOpExecutor(context)
.getGroupCommitLoadBeId(tableId, clusterName);
return Env.getCurrentSystemInfo().getBackend(backendId);
} catch (Exception e) {
throw new LoadException(e.getMessage());
}
} else {
try {
// Master FE will select BE by itself.
return Env.getCurrentSystemInfo()
.getBackend(selectBackendForGroupCommitInternal(tableId, clusterName));
} catch (Exception e) {
LOG.warn("get backend failed, tableId: {}, exception", tableId, e);
throw new LoadException(e.getMessage());
}
}
}
public long selectBackendForGroupCommitInternal(long tableId, String cluster)
throws LoadException, DdlException {
// Understanding Group Commit and Backend Selection Logic
//
// Group commit is a server-side technique used for batching data imports.
// The primary purpose of group commit is to enhance import performance by
// reducing the number of versions created for high-frequency, small-batch imports.
// Without batching, each import operation creates a separate version, similar to a rowset in an LSM Tree,
// which can consume significant compaction resources and degrade system performance.
// By batching data, fewer versions are generated from the same amount of data,
// thus minimizing compaction and improving performance. For detailed usage,
// you can refer to the Group Commit Manual
// (https://doris.incubator.apache.org/docs/data-operate/import/group-commit-manual/) .
//
// The specific backend (BE) selection logic for group commits aims to
// direct data belonging to the same table to the same BE for batching.
// This is because group commit batches data imported to the same table
// on the same BE into a single version, which is then flushed periodically.
// For example, if data for the same table is distributed across three BEs,
// it will result in three versions.
// Conversely, if data for four different tables is directed to the same BE,
// it will create four versions. However,
// directing all data for the same table to a single BE will only produce one version.
//
// To optimize performance and avoid overloading a single BE, the strategy for selecting a BE works as follows:
//
// If a BE is already handling imports for table A and is not under significant load,
// the data is sent to this BE.
// If the BE is overloaded or if there is no existing record of a BE handling imports for table A,
// a BE is chosen at random. This BE is then recorded along with the mapping of table A and its load level.
// This approach ensures that group commits can effectively batch data together
// while managing the load on each BE efficiently.
return Config.isCloudMode() ? selectBackendForCloudGroupCommitInternal(tableId, cluster)
: selectBackendForLocalGroupCommitInternal(tableId);
}
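    // Cloud mode: pick a backend from the requested compute cluster, preferring the cached one.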
private long selectBackendForCloudGroupCommitInternal(long tableId, String cluster)
throws DdlException, LoadException {
LOG.debug("cloud group commit select be info, tableToBeMap {}, tablePressureMap {}",
tableToBeMap.toString(), tableToPressureMap.toString());
if (Strings.isNullOrEmpty(cluster)) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR);
}
Long cachedBackendId = getCachedBackend(cluster, tableId);
if (cachedBackendId != null) {
return cachedBackendId;
}
List<Backend> backends = new ArrayList<>(
((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudIdToBackend(cluster)
.values());
if (backends.isEmpty()) {
throw new LoadException("No alive backend");
}
// If the cached backend is not active or decommissioned, select a random new backend.
Long randomBackendId = getRandomBackend(cluster, tableId, backends);
if (randomBackendId != null) {
return randomBackendId;
}
List<String> backendsInfo = backends.stream()
.map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive()
+ ", decommission=" + be.isDecommissioned() + " }")
.collect(Collectors.toList());
throw new LoadException("No suitable backend for cloud cluster=" + cluster + ", backends = " + backendsInfo);
}
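    // Local (non-cloud) mode: pick a backend across all clusters, preferring the cached one.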
private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException {
LOG.debug("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(),
tableToPressureMap.toString());
Long cachedBackendId = getCachedBackend(null, tableId);
if (cachedBackendId != null) {
return cachedBackendId;
}
List<Backend> backends = new ArrayList<>();
try {
backends = new ArrayList<>(Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList());
} catch (AnalysisException e) {
LOG.warn("failed to get backends by all cluster", e);
throw new LoadException(e.getMessage());
}
if (backends.isEmpty()) {
throw new LoadException("No alive backend");
}
// If the cached backend is not active or decommissioned, select a random new backend.
Long randomBackendId = getRandomBackend(null, tableId, backends);
if (randomBackendId != null) {
return randomBackendId;
}
List<String> backendsInfo = backends.stream()
.map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive()
+ ", decommission=" + be.isDecommissioned() + " }")
.collect(Collectors.toList());
throw new LoadException("No suitable backend " + ", backends = " + backendsInfo);
}
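    /**
     * Return the backend already assigned to this table if it is still alive, not decommissioned,
     * and the table's recent load (tracked in tableToPressureMap) is below its group commit
     * data bytes threshold; otherwise drop the stale mapping and return null.
     */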
@Nullable
private Long getCachedBackend(String cluster, long tableId) {
        OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
        // The table may have been dropped concurrently; drop any stale mapping and bail out.
        if (table == null) {
            tableToBeMap.remove(encode(cluster, tableId));
            return null;
        }
        if (tableToBeMap.containsKey(encode(cluster, tableId))) {
            if (tableToPressureMap.get(tableId) == null) {
                return null;
            } else if (tableToPressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) {
                // Multiple threads may look up the cached backend for the same table concurrently.
                // Another thread may remove the key from tableToBeMap after the containsKey check,
                // so the get below can return null; the caller then falls back to a random backend.
Long backendId = tableToBeMap.get(encode(cluster, tableId));
if (backendId == null) {
return null;
}
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
if (backend != null && backend.isAlive() && !backend.isDecommissioned()) {
return backend.getId();
} else {
tableToBeMap.remove(encode(cluster, tableId));
}
} else {
tableToBeMap.remove(encode(cluster, tableId));
}
}
return null;
}
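    /**
     * Pick a random alive, non-decommissioned backend, record it in tableToBeMap, and reset the
     * table's pressure counter with a window of roughly one group commit interval. Returns null
     * if no backend qualifies.
     */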
@Nullable
private Long getRandomBackend(String cluster, long tableId, List<Backend> backends) {
        OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
        // The table may have been dropped concurrently; no backend can be selected for it.
        if (table == null) {
            return null;
        }
        Collections.shuffle(backends);
for (Backend backend : backends) {
if (backend.isAlive() && !backend.isDecommissioned()) {
tableToBeMap.put(encode(cluster, tableId), backend.getId());
tableToPressureMap.put(tableId,
new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1));
return backend.getId();
}
}
return null;
}
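    // Build the tableToBeMap key: "<cluster><tableId>" in cloud mode, otherwise just the table id.
    // It is only an in-memory cache key and is never persisted.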
private String encode(String cluster, long tableId) {
if (cluster == null) {
return String.valueOf(tableId);
} else {
return cluster + tableId;
}
}
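    /**
     * Record that a group commit load received {@code receiveData} bytes for the table.
     * Non-master FEs forward the update to the master, so the pressure map stays up to date
     * on the node that performs backend selection.
     */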
public void updateLoadData(long tableId, long receiveData) {
if (tableId == -1) {
LOG.warn("invalid table id: " + tableId);
}
if (!Env.getCurrentEnv().isMaster()) {
            ConnectContext ctx = new ConnectContext();
            ctx.setEnv(Env.getCurrentEnv());
            // set user to ADMIN_USER, so that we can get the proper resource tag
            ctx.setQualifiedUser(Auth.ADMIN_USER);
            ctx.setThreadLocalInfo();
try {
new MasterOpExecutor(ctx).updateLoadData(tableId, receiveData);
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
updateLoadDataInternal(tableId, receiveData);
}
}
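    // Master-side update: add the received bytes to the table's sliding window counter if present.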
private void updateLoadDataInternal(long tableId, long receiveData) {
if (tableToPressureMap.containsKey(tableId)) {
tableToPressureMap.get(tableId).add(receiveData);
LOG.info("Update load data for table {}, receiveData {}, tablePressureMap {}", tableId, receiveData,
tableToPressureMap.toString());
} else if (LOG.isDebugEnabled()) {
LOG.debug("can not find table id {}", tableId);
}
}
}