// CopyJob.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.cloud.load;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.CopyStmt;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.datasource.CloudInternalCatalog;
import org.apache.doris.cloud.proto.Cloud.FinishCopyRequest.Action;
import org.apache.doris.cloud.proto.Cloud.StagePB;
import org.apache.doris.cloud.proto.Cloud.StagePB.StageType;
import org.apache.doris.cloud.stage.StageUtil;
import org.apache.doris.cloud.storage.RemoteBase.ObjectInfo;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TUniqueId;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import com.google.gson.reflect.TypeToken;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
public class CopyJob extends CloudBrokerLoadJob {
private static final Logger LOG = LogManager.getLogger(CopyJob.class);
private static final String TABLE_NAME_KEY = "TableName";
private static final String USER_NAME_KEY = "UserName";
@Getter
private String stageId;
@Getter
private StagePB.StageType stageType;
@Getter
private String stagePrefix;
@Getter
private long sizeLimit;
@Getter
private String pattern;
@Getter
private ObjectInfo objectInfo;
@SerializedName("cid")
@Getter
private String copyId;
@Getter
private boolean forceCopy;
@SerializedName("lfp")
private String loadFilePaths = "";
@SerializedName("pty")
private Map<String, String> properties = new HashMap<>();
private volatile boolean abortedCopy = false;
private boolean isReplay = false;
private List<String> loadFiles = null;
public CopyJob() {
super(EtlJobType.COPY);
}
public CopyJob(long dbId, String label, TUniqueId queryId, BrokerDesc brokerDesc, OriginStatement originStmt,
UserIdentity userInfo, String stageId, StagePB.StageType stageType, String stagePrefix, long sizeLimit,
String pattern, ObjectInfo objectInfo, boolean forceCopy, String user) throws MetaNotFoundException {
super(EtlJobType.COPY, dbId, label, brokerDesc, originStmt, userInfo);
this.stageId = stageId;
this.stageType = stageType;
this.stagePrefix = stagePrefix;
this.sizeLimit = sizeLimit;
this.pattern = pattern;
this.objectInfo = objectInfo;
this.forceCopy = forceCopy;
this.copyId = DebugUtil.printId(queryId);
this.properties.put(USER_NAME_KEY, user);
}
@Override
public void checkAndSetDataSourceInfo(Database db, List<DataDescription> dataDescriptions) throws DdlException {
super.checkAndSetDataSourceInfo(db, dataDescriptions);
// now, copy into only support one table
for (DataDescription dataDescription : dataDescriptions) {
properties.put(TABLE_NAME_KEY, dataDescription.getTableName());
}
}
@Override
protected LoadTask createPendingTask() {
return new CopyLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(), brokerDesc);
}
@Override
protected void afterCommit() throws DdlException {
super.afterCommit();
if (forceCopy) {
return;
}
for (Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : fileGroupAggInfo.getAggKeyToFileGroups()
.entrySet()) {
long tableId = entry.getKey().getTableId();
LOG.debug("Start finish copy for stage={}, table={}, queryId={}", stageId, tableId, getCopyId());
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
.finishCopy(stageId, stageType, tableId, getCopyId(), 0, Action.COMMIT);
// delete internal stage files and copy job
if (Config.cloud_delete_loaded_internal_stage_files && loadFiles != null && stageType == StageType.INTERNAL
&& !isForceCopy()) {
CleanCopyJobTask copyJobCleanTask = new CleanCopyJobTask(objectInfo, stageId, stageType, tableId,
copyId, loadFiles);
((CloudLoadManager) Env.getCurrentEnv().getLoadManager()).createCleanCopyJobTask(copyJobCleanTask);
}
}
}
@Override
public void cancelJob(FailMsg failMsg) throws DdlException {
super.cancelJob(failMsg);
loadFiles = null;
if (forceCopy || abortedCopy) {
return;
}
for (Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : fileGroupAggInfo.getAggKeyToFileGroups()
.entrySet()) {
long tableId = entry.getKey().getTableId();
LOG.info("Cancel copy for stage={}, table={}, queryId={}", stageId, tableId, getCopyId());
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
.finishCopy(stageId, stageType, tableId, getCopyId(), 0, Action.ABORT);
}
abortedCopy = true;
}
@Override
public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn, boolean needLog) {
super.cancelJobWithoutCheck(failMsg, abortTxn, needLog);
loadFiles = null;
if (forceCopy || abortedCopy) {
return;
}
abortCopy();
}
@Override
protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn) {
super.unprotectedExecuteCancel(failMsg, abortTxn);
loadFiles = null;
if (forceCopy || abortedCopy) {
return;
}
abortCopy();
}
private void abortCopy() {
for (Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : fileGroupAggInfo.getAggKeyToFileGroups()
.entrySet()) {
long tableId = entry.getKey().getTableId();
try {
LOG.info("Cancel copy for stage={}, table={}, queryId={}", stageId, tableId, getCopyId());
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
.finishCopy(stageId, stageType, tableId, getCopyId(), 0, Action.ABORT);
abortedCopy = true;
} catch (DdlException e) {
// if cancel copy failed, kvs in fdb will be cleaned when expired
LOG.warn("Failed to cancel copy for stage={}, table={}, queryId={}", stageId, tableId, getCopyId(), e);
}
}
}
public void setAbortedCopy(boolean abortedCopy) {
this.abortedCopy = abortedCopy;
}
@Override
protected List<Comparable> getShowInfoUnderLock() throws DdlException {
List<Comparable> showInfos = new ArrayList<>();
showInfos.add(getCopyId());
showInfos.addAll(super.getShowInfoUnderLock());
// table name
showInfos.add(getTableName());
showInfos.add(loadFilePaths);
return showInfos;
}
@Override
protected void logFinalOperation() {
Env.getCurrentEnv().getEditLog().logEndLoadJob(getLoadJobFinalOperation());
}
@Override
public void unprotectReadEndOperation(LoadJobFinalOperation loadJobFinalOperation) {
super.unprotectReadEndOperation(loadJobFinalOperation);
this.copyId = loadJobFinalOperation.getCopyId();
this.loadFilePaths = loadJobFinalOperation.getLoadFilePaths();
this.properties.putAll(loadJobFinalOperation.getProperties());
}
@Override
protected LoadJobFinalOperation getLoadJobFinalOperation() {
return new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
finishTimestamp, state, failMsg, copyId, loadFilePaths, properties);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
copyId = Text.readString(in);
loadFilePaths = Text.readString(in);
String property = Text.readString(in);
properties = property.isEmpty() ? new HashMap<>()
: (new Gson().fromJson(property, new TypeToken<Map<String, String>>() {
}.getType()));
}
@Override
protected void analyzeStmt(StatementBase stmtBase, Database db) throws UserException {
CopyStmt stmt = (CopyStmt) stmtBase;
stmt.analyzeWhenReplay(getUser(), db.getFullName());
// check if begin copy happened
checkAndSetDataSourceInfo(db, stmt.getDataDescriptions());
this.stageId = stmt.getStageId();
this.stageType = stmt.getStageType();
this.sizeLimit = stmt.getSizeLimit();
this.pattern = stmt.getPattern();
this.objectInfo = stmt.getObjectInfo();
this.forceCopy = stmt.isForce();
this.isReplay = true;
}
protected void setSelectedFiles(Map<FileGroupAggKey, List<List<TBrokerFileStatus>>> fileStatusMap) {
this.loadFilePaths = selectedFilesToJson(fileStatusMap);
}
private String selectedFilesToJson(Map<FileGroupAggKey, List<List<TBrokerFileStatus>>> selectedFiles) {
if (selectedFiles == null) {
return "";
}
List<String> paths = new ArrayList<>();
for (Entry<FileGroupAggKey, List<List<TBrokerFileStatus>>> entry : selectedFiles.entrySet()) {
for (List<TBrokerFileStatus> fileStatuses : entry.getValue()) {
paths.addAll(fileStatuses.stream().map(e -> e.path).collect(Collectors.toList()));
}
}
if (stageType == StageType.INTERNAL) {
loadFiles = StageUtil.parseLoadFiles(paths, objectInfo.getBucket(), stagePrefix);
}
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
return gson.toJson(paths);
}
public String getTableName() {
return properties.containsKey(TABLE_NAME_KEY) ? properties.get(TABLE_NAME_KEY) : "";
}
public String getFiles() {
return loadFilePaths == null ? "" : loadFilePaths;
}
public String getUser() {
return properties.get(USER_NAME_KEY);
}
protected boolean isReplay() {
return this.isReplay;
}
@Override
protected void unprotectedExecuteRetry(FailMsg failMsg) {
LOG.info("CopyJob.unprotectedExecuteRetry(): forceCopy={}, abortedCopy={}", forceCopy, abortedCopy);
super.unprotectedExecuteRetry(failMsg);
if (forceCopy || abortedCopy) {
return;
}
abortCopy();
}
}