CloudBrokerLoadJob.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.cloud.load;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.FailMsg.CancelType;
import org.apache.doris.load.loadv2.BrokerLoadJob;
import org.apache.doris.load.loadv2.BrokerPendingTaskAttachment;
import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.load.loadv2.LoadLoadingTask;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
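/**
 * Cloud-mode variant of {@link BrokerLoadJob}.
 * It pins the job to the compute group (cluster) of the issuing connection by recording the
 * cluster id in the job's session variables, rebuilds a connect context carrying that compute
 * group and the job's user identity when tasks are created, and retries failed loading tasks
 * a bounded number of times before cancelling the job.
 */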
public class CloudBrokerLoadJob extends BrokerLoadJob {
private static final Logger LOG = LogManager.getLogger(CloudBrokerLoadJob.class);
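    // session-variable key under which the compute group (cluster) id is persisted with the job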
protected static final String CLOUD_CLUSTER_ID = "clusterId";
protected String cloudClusterId;
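    // remaining task-failure retries; the job is cancelled once this reaches 0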
protected int retryTimes = 3;
public CloudBrokerLoadJob() {
}
public CloudBrokerLoadJob(EtlJobType type) {
super(type);
}
public CloudBrokerLoadJob(EtlJobType type, long dbId, String label, BrokerDesc brokerDesc,
OriginStatement originStmt, UserIdentity userInfo)
throws MetaNotFoundException {
super(type, dbId, label, brokerDesc, originStmt, userInfo);
setCloudClusterId();
}
public CloudBrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, OriginStatement originStmt,
UserIdentity userInfo) throws MetaNotFoundException {
super(dbId, label, brokerDesc, originStmt, userInfo);
setCloudClusterId();
}
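    /**
     * Resolves the compute group of the current connection to a cluster id and records it in
     * the job's session variables, so that the job keeps targeting the same compute group
     * across FE restarts and replays.
     */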
private void setCloudClusterId() throws MetaNotFoundException {
ConnectContext context = ConnectContext.get();
if (context != null) {
String clusterName = "";
try {
clusterName = context.getCloudCluster();
} catch (ComputeGroupException e) {
LOG.warn("failed to get compute group name", e);
throw new MetaNotFoundException("failed to get compute group name " + e);
}
if (Strings.isNullOrEmpty(clusterName)) {
LOG.warn("compute group name is empty");
throw new MetaNotFoundException("compute group name is empty");
}
this.cloudClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getCloudClusterIdByName(clusterName);
if (Strings.isNullOrEmpty(this.cloudClusterId)) {
LOG.warn("can not find compute group: {}", clusterName);
throw new MetaNotFoundException("can not find compute group: " + clusterName);
}
sessionVariables.put(CLOUD_CLUSTER_ID, this.cloudClusterId);
}
}
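    /**
     * Makes sure a ConnectContext carrying this job's compute group and user identity exists,
     * which CloudReplica needs for its auth check. Returns an AutoCloseConnectContext wrapping
     * a newly created context, or null when the caller's existing context was updated in place
     * and there is nothing to close.
     */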
private AutoCloseConnectContext buildConnectContext() throws UserException {
cloudClusterId = sessionVariables.get(CLOUD_CLUSTER_ID);
String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getClusterNameByClusterId(cloudClusterId);
if (Strings.isNullOrEmpty(clusterName)) {
LOG.warn("cluster name is empty, cluster id is {}", cloudClusterId);
throw new UserException("cluster name is empty, cluster id is: " + cloudClusterId);
}
        // NOTE: set the user info in the context for the auth check in CloudReplica
if (ConnectContext.get() == null) {
ConnectContext connectContext = new ConnectContext();
connectContext.setCloudCluster(clusterName);
connectContext.setCurrentUserIdentity(this.userInfo);
connectContext.setQualifiedUser(this.userInfo.getQualifiedUser());
if (connectContext.getEnv() == null) {
connectContext.setEnv(Env.getCurrentEnv());
}
return new AutoCloseConnectContext(connectContext);
} else {
ConnectContext.get().setCloudCluster(clusterName);
ConnectContext.get().setCurrentUserIdentity(this.userInfo);
ConnectContext.get().setQualifiedUser(this.userInfo.getQualifiedUser());
if (ConnectContext.get().getEnv() == null) {
ConnectContext.get().setEnv(Env.getCurrentEnv());
}
return null;
}
}
    // override BulkLoadJob.analyze: restore the compute group id from the persisted
    // session variables before re-analyzing the statement (e.g. on replay after an FE restart)
@Override
public void analyze() {
cloudClusterId = sessionVariables.get(CLOUD_CLUSTER_ID);
super.analyze();
}
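    /**
     * Creates a CloudLoadLoadingTask bound to this job's compute group and initializes it
     * under a connect context built by buildConnectContext(), so the auth check in
     * CloudReplica can pass.
     */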
@Override
protected LoadLoadingTask createTask(Database db, OlapTable table, List<BrokerFileGroup> brokerFileGroups,
boolean isEnableMemtableOnSinkNode, int batchSize, FileGroupAggKey aggKey,
BrokerPendingTaskAttachment attachment) throws UserException {
cloudClusterId = sessionVariables.get(CLOUD_CLUSTER_ID);
LoadLoadingTask task = new CloudLoadLoadingTask(this.userInfo, db, table, brokerDesc,
brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
isStrictMode(), isPartialUpdate(), transactionId, this, getTimeZone(), getTimeout(),
getLoadParallelism(), getSendBatchParallelism(),
getMaxFilterRatio() <= 0, enableProfile ? jobProfile : null, isSingleTabletLoadPerSink(),
getPriority(), isEnableMemtableOnSinkNode, batchSize, cloudClusterId);
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
        // build a connect context so task.init() can pass the auth check in CloudReplica;
        // buildConnectContext() returns null when the caller's context was reused, which
        // try-with-resources handles by simply skipping close()
        try (AutoCloseConnectContext r = buildConnectContext()) {
            task.init(loadId, attachment.getFileStatusByTable(aggKey),
                    attachment.getFileNumByTable(aggKey), getUserInfo());
            task.settWorkloadGroups(tWorkloadGroups);
        }
return task;
}
@Override
protected void executeFinish() {
super.executeFinish();
        // When replaying a load job in local mode, the state of the job can be obtained by
        // replaying the transaction status information. However, in cloud mode there is no
        // edit log of the transaction in FE, so write an edit log here to persist the final
        // state of the job.
logFinalOperation();
}
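    /**
     * After the loading transaction is committed, if multi-cluster sync load is enabled,
     * triggers a sync load of all tablets of the loaded tables on the backends of every
     * other compute group.
     */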
@Override
protected void afterLoadingTaskCommitTransaction(List<Table> tableList) {
ConnectContext ctx = null;
if (ConnectContext.get() == null) {
ctx = new ConnectContext();
ctx.setThreadLocalInfo();
} else {
ctx = ConnectContext.get();
}
if (ctx.getSessionVariable().enableMultiClusterSyncLoad()) {
            // get the backends of every cluster except the one that served this load
CloudSystemInfoService infoService = (CloudSystemInfoService) Env.getCurrentSystemInfo();
List<List<Backend>> backendsList = infoService.getCloudClusterIds()
.stream()
.filter(id -> !id.equals(cloudClusterId))
.map(id -> infoService.getBackendsByClusterId(id))
.collect(Collectors.toList());
            // for each loaded table, sync all of its tablets to the other clusters
tableList.forEach(table -> {
List<Long> allTabletIds = ((OlapTable) table).getAllTabletIds();
StmtExecutor.syncLoadForTablets(backendsList, allTabletIds);
});
}
}
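    /**
     * Cloud-mode failure handling: instead of cancelling the job immediately, consume one
     * retry from the budget. While retries remain, abort the current transaction, wait for
     * all other tasks of this job to finish, then reset the job to PENDING and reschedule a
     * pending task with a 60-second backoff. Once the budget is exhausted, cancel the job
     * and persist its final state.
     */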
@Override
public void onTaskFailed(long taskId, FailMsg failMsg) {
if (Strings.isNullOrEmpty(this.cloudClusterId)) {
super.onTaskFailed(taskId, failMsg);
return;
}
try {
writeLock();
if (isTxnDone()) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("label", label)
.add("transactionId", transactionId)
.add("state", state)
.add("error_msg", "this task will be ignored when job is: " + state)
.build());
return;
}
LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
.add("label", label)
.add("transactionId", transactionId)
.add("state", state)
.add("retryTimes", retryTimes)
.add("failMsg", failMsg.getMsg())
.build());
this.retryTimes--;
if (this.retryTimes <= 0) {
                boolean abortTxn = this.transactionId > 0;
unprotectedExecuteCancel(failMsg, abortTxn);
logFinalOperation();
return;
} else {
unprotectedExecuteRetry(failMsg);
}
} finally {
writeUnlock();
}
boolean allTaskDone = false;
while (!allTaskDone) {
try {
writeLock();
                // check if all tasks have been done;
                // unprotectedExecuteRetry() will cancel all running tasks
allTaskDone = true;
for (Map.Entry<Long, LoadTask> entry : idToTasks.entrySet()) {
if (entry.getKey() != taskId && !entry.getValue().isDone()) {
LOG.info("LoadTask({}) has not been done", entry.getKey());
allTaskDone = false;
}
}
} finally {
writeUnlock();
}
            if (!allTaskDone) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    LOG.warn("interrupted while waiting for running load tasks to finish", e);
                }
            }
}
try {
writeLock();
this.state = JobState.PENDING;
this.idToTasks.clear();
this.failMsg = null;
this.finishedTaskIds.clear();
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
LoadTask task = createPendingTask();
            // retry with a default backoff of 60 seconds, because BE restart is slow
task.setStartTimeMs(System.currentTimeMillis() + 60 * 1000);
idToTasks.put(task.getSignature(), task);
Env.getCurrentEnv().getPendingLoadTaskScheduler().submit(task);
} finally {
writeUnlock();
}
}
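    /**
     * Prepares the job for a retry. Callers must hold the write lock. Removes the txn
     * callback so afterAborted() is not invoked again, aborts the transaction by label
     * (the transaction id may still be -1), cancels all running coordinators of loading
     * tasks, and moves the job to RETRY.
     */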
protected void unprotectedExecuteRetry(FailMsg failMsg) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id).add("transaction_id", transactionId)
.add("error_msg", "Failed to execute load with error: " + failMsg.getMsg()).build());
// get load ids of all loading tasks, we will cancel their coordinator process later
List<TUniqueId> loadIds = Lists.newArrayList();
for (LoadTask loadTask : idToTasks.values()) {
if (loadTask instanceof LoadLoadingTask) {
loadIds.add(((LoadLoadingTask) loadTask).getLoadId());
}
}
// set failMsg and state
this.failMsg = failMsg;
if (failMsg.getCancelType() == CancelType.TXN_UNKNOWN) {
// for bug fix, see LoadManager's fixLoadJobMetaBugs() method
finishTimestamp = createTimestamp;
} else {
finishTimestamp = System.currentTimeMillis();
}
// remove callback before abortTransaction(), so that the afterAborted() callback will not be called again
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id);
        // abort txn by label, because transactionId here may be -1
try {
LOG.debug(new LogBuilder(LogKey.LOAD_JOB, id)
.add("label", label)
.add("msg", "begin to abort txn")
.build());
Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, label, failMsg.getMsg());
} catch (UserException e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("label", label)
.add("error_msg", "failed to abort txn when job is cancelled. " + e.getMessage())
.build());
}
// cancel all running coordinators, so that the scheduler's worker thread will be released
for (TUniqueId loadId : loadIds) {
Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId);
if (coordinator != null) {
coordinator.cancel(new Status(TStatusCode.CANCELLED, "load job failed"));
}
}
// change state
state = JobState.RETRY;
}
}