// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.ExportStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FromClause;
import org.apache.doris.analysis.LimitElement;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.SelectList;
import org.apache.doris.analysis.SelectListItem;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundStar;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
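/**
* In-memory representation of an EXPORT job. The job splits the target table's
* tablets into 'parallelism' groups, generates one "select ... into outfile"
* statement (or Nereids logical plan) per group, and drives the resulting
* ExportTaskExecutors through the PENDING -> EXPORTING -> FINISHED/CANCELLED states.
*/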
@Data
public class ExportJob implements Writable {
private static final Logger LOG = LogManager.getLogger(ExportJob.class);
private static final String BROKER_PROPERTY_PREFIXES = "broker.";
public static final String CONSISTENT_NONE = "none";
public static final String CONSISTENT_PARTITION = "partition";
@SerializedName("id")
private long id;
@SerializedName("label")
private String label;
@SerializedName("dbId")
private long dbId;
@SerializedName("tableId")
private long tableId;
@SerializedName("brokerDesc")
private BrokerDesc brokerDesc;
@SerializedName("exportPath")
private String exportPath;
@SerializedName("columnSeparator")
private String columnSeparator;
@SerializedName("lineDelimiter")
private String lineDelimiter;
@SerializedName(value = "partitionNames", alternate = {"partitions"})
private List<String> partitionNames;
@SerializedName("tableName")
private TableName tableName;
@SerializedName("state")
private ExportJobState state;
@SerializedName("createTimeMs")
private long createTimeMs;
// This is the origin stmt of the ExportStmt. We use it to persist the where expr
// of the export job, because we cannot serialize the Expr objects contained in the job.
@SerializedName("origStmt")
private OriginStatement origStmt;
@SerializedName("qualifiedUser")
private String qualifiedUser;
@SerializedName("userIdentity")
private UserIdentity userIdentity;
@SerializedName("columns")
private String columns;
@SerializedName("format")
private String format;
@SerializedName("timeoutSecond")
private int timeoutSecond;
@SerializedName("maxFileSize")
private String maxFileSize;
@SerializedName("deleteExistingFiles")
private String deleteExistingFiles;
@SerializedName("startTimeMs")
private long startTimeMs;
@SerializedName("finishTimeMs")
private long finishTimeMs;
@SerializedName("failMsg")
private ExportFailMsg failMsg;
@SerializedName("outfileInfo")
private String outfileInfo;
// progress serves two purposes during the EXPORTING stage:
// 1. when progress < 100, it indicates that the job is still exporting
// 2. progress is set to 100 ONLY when the export work is completely done
@SerializedName("progress")
private int progress;
@SerializedName("tabletsNum")
private Integer tabletsNum;
@SerializedName("withBom")
private String withBom;
@SerializedName("dataConsistency")
private String dataConsistency;
@SerializedName("compressType")
private String compressType;
private TableRef tableRef;
private Expr whereExpr;
private Optional<Expression> whereExpression;
private int parallelism;
private Map<String, Long> partitionToVersion = Maps.newHashMap();
/**
* Each parallel task has an associated "select ... into outfile" statement
* (or outfile logical plan); the i-th entry of this list is executed
* by the i-th ExportTaskExecutor.
*/
private List<Optional<StatementBase>> selectStmtPerParallel = Lists.newArrayList();
private List<String> exportColumns = Lists.newArrayList();
private TableIf exportTable;
// When set to true, this job instance was created by the replay thread (FE restarted or master changed).
private boolean isReplayed = false;
private SessionVariable sessionVariables;
// backend_address => snapshot path
private List<Pair<TNetworkAddress, String>> snapshotPaths = Lists.newArrayList();
private List<ExportTaskExecutor> jobExecutorList = Lists.newArrayList();
private Integer finishedTaskCount = 0;
private List<List<OutfileInfo>> allOutfileInfo = Lists.newArrayList();
public ExportJob() {
this.id = -1;
this.dbId = -1;
this.tableId = -1;
this.state = ExportJobState.PENDING;
this.progress = 0;
this.createTimeMs = System.currentTimeMillis();
this.startTimeMs = -1;
this.finishTimeMs = -1;
this.failMsg = new ExportFailMsg(ExportFailMsg.CancelType.UNKNOWN, "");
this.outfileInfo = "";
this.exportPath = "";
this.columnSeparator = "\t";
this.lineDelimiter = "\n";
this.columns = "";
this.withBom = "false";
this.dataConsistency = CONSISTENT_PARTITION;
}
public ExportJob(long jobId) {
this();
this.id = jobId;
}
public boolean isPartitionConsistency() {
return CONSISTENT_PARTITION.equals(dataConsistency);
}
public void generateOutfileStatement() throws UserException {
exportTable.readLock();
try {
generateQueryStmt();
} finally {
exportTable.readUnlock();
}
generateExportJobExecutor();
}
/**
* For an ExportJob:
* The ExportJob is divided into multiple 'ExportTaskExecutor's
* according to the 'parallelism' set by the user.
* The tablets to be exported by this ExportJob are divided into 'parallelism' groups,
* and each ExportTaskExecutor is responsible for one group of tablets.
*
* @throws UserException
*/
public void generateOutfileLogicalPlans(List<String> qualifiedTableName)
throws UserException {
String catalogType = Env.getCurrentEnv().getCatalogMgr().getCatalog(this.tableName.getCtl()).getType();
exportTable.readLock();
try {
if (InternalCatalog.INTERNAL_CATALOG_NAME.equals(catalogType)) {
if (exportTable.getType() == TableType.VIEW) {
// view table
generateViewOrExternalTableOutfile(qualifiedTableName);
} else if (exportTable.isManagedTable()) {
// olap table
generateOlapTableOutfile(qualifiedTableName);
} else {
throw new UserException("Do not support export table type [" + exportTable.getType() + "]");
}
} else {
// external table
generateViewOrExternalTableOutfile(qualifiedTableName);
}
// debug LOG output
if (LOG.isDebugEnabled()) {
for (int i = 0; i < selectStmtPerParallel.size(); ++i) {
LOG.debug("ExportTaskExecutor {} is responsible for outfile:", i);
LOG.debug("outfile sql: [{}]", selectStmtPerParallel.get(i).get().toSql());
}
}
} finally {
exportTable.readUnlock();
}
generateExportJobExecutor();
}
private void generateOlapTableOutfile(List<String> qualifiedTableName) throws UserException {
// build source columns
List<NamedExpression> selectLists = Lists.newArrayList();
if (exportColumns.isEmpty()) {
selectLists.add(new UnboundStar(ImmutableList.of()));
} else {
this.exportColumns.forEach(col -> selectLists.add(new UnboundSlot(this.tableName.getTbl(), col)));
}
// get all tablets
List<List<Long>> tabletsListPerParallel = splitTablets();
// Each group of tablets produced by splitTablets() gets its own outfile logical plan.
for (List<Long> tabletsList : tabletsListPerParallel) {
// generate LogicalPlan
LogicalPlan plan = generateOneLogicalPlan(qualifiedTableName, tabletsList,
this.partitionNames, selectLists);
// generate LogicalPlanAdapter
StatementBase statementBase = generateLogicalPlanAdapter(plan);
selectStmtPerParallel.add(Optional.of(statementBase));
}
}
/**
* This method is used to generate the outfile logical plan for a view or an external table.
*/
private void generateViewOrExternalTableOutfile(List<String> qualifiedTableName) {
// Because there is no division of tablets for views and external tables,
// we set parallelism = 1.
this.parallelism = 1;
if (LOG.isDebugEnabled()) {
LOG.debug("Because there is no division of tablets for views and external tables, parallelism is set to 1");
}
// build source columns
List<NamedExpression> selectLists = Lists.newArrayList();
if (exportColumns.isEmpty()) {
selectLists.add(new UnboundStar(ImmutableList.of()));
} else {
this.exportColumns.forEach(col -> selectLists.add(new UnboundSlot(this.tableName.getTbl(), col)));
}
// generate LogicalPlan
LogicalPlan plan = generateOneLogicalPlan(qualifiedTableName, ImmutableList.of(),
ImmutableList.of(), selectLists);
// generate LogicalPlanAdapter
StatementBase statementBase = generateLogicalPlanAdapter(plan);
selectStmtPerParallel.add(Optional.of(statementBase));
}
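// Build one unbound Nereids plan per tablet group, shaped bottom-up as:
//   UnboundRelation -> LogicalCheckPolicy -> [LogicalFilter] -> LogicalProject -> LogicalFileSink
// which corresponds to a "SELECT ... [WHERE ...] INTO OUTFILE ..." statement.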
private LogicalPlan generateOneLogicalPlan(List<String> qualifiedTableName, List<Long> tabletIds,
List<String> partitions, List<NamedExpression> selectLists) {
// UnboundRelation
LogicalPlan plan = new UnboundRelation(StatementScopeIdGenerator.newRelationId(), qualifiedTableName,
partitions, false, tabletIds, ImmutableList.of(), Optional.empty(), Optional.empty());
// LogicalCheckPolicy
plan = new LogicalCheckPolicy<>(plan);
// LogicalFilter
if (this.whereExpression.isPresent()) {
plan = new LogicalFilter<>(ExpressionUtils.extractConjunctionToSet(this.whereExpression.get()), plan);
}
// LogicalProject
plan = new LogicalProject<>(selectLists, plan);
// LogicalFileSink
plan = new LogicalFileSink<>(this.exportPath, this.format, convertOutfileProperties(),
ImmutableList.of(), plan);
return plan;
}
private StatementBase generateLogicalPlanAdapter(LogicalPlan outfileLogicalPlan) {
StatementContext statementContext = new StatementContext();
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null) {
statementContext.setConnectContext(connectContext);
}
StatementBase statementBase = new LogicalPlanAdapter(outfileLogicalPlan, statementContext);
statementBase.setOrigStmt(new OriginStatement(statementBase.toSql(), 0));
return statementBase;
}
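// Return a copy of the executor list so callers can iterate it safely
// while the job's own list is being cleared or modified.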
public List<? extends TransientTaskExecutor> getCopiedTaskExecutors() {
return Lists.newArrayList(jobExecutorList);
}
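// Create one ExportTaskExecutor per generated statement / logical plan.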
private void generateExportJobExecutor() {
jobExecutorList = Lists.newArrayList();
for (Optional<StatementBase> selectStmt : selectStmtPerParallel) {
ExportTaskExecutor executor = new ExportTaskExecutor(selectStmt, this);
jobExecutorList.add(executor);
}
// Add an empty task so that the export job can still finish
// when jobExecutorList is empty, i.e. the exported table has no data.
if (jobExecutorList.isEmpty()) {
ExportTaskExecutor executor = new ExportTaskExecutor(Optional.empty(), this);
jobExecutorList.add(executor);
}
}
/**
* Generate the 'select ... into outfile' statements, one per parallel task.
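*
* An illustrative example of one generated statement (hypothetical table, path
* and properties; the exact toSql() output may differ):
*
* SELECT `c1`, `c2` FROM `db1`.`tbl1`
* INTO OUTFILE "hdfs://host:8020/export/data_"
* FORMAT AS CSV
* PROPERTIES ("column_separator" = "\t", "line_delimiter" = "\n")
*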
* @throws UserException
*/
private void generateQueryStmt() throws UserException {
SelectList list = new SelectList();
if (exportColumns.isEmpty()) {
list.addItem(SelectListItem.createStarItem(this.tableName));
} else {
for (Column column : exportTable.getBaseSchema()) {
String colName = column.getName();
if (exportColumns.contains(colName.toLowerCase())) {
SlotRef slotRef = new SlotRef(this.tableName, colName);
SelectListItem selectListItem = new SelectListItem(slotRef, null);
list.addItem(selectListItem);
}
}
}
List<TableRef> tableRefPerParallel = getTableRefListPerParallel();
LOG.info("Export Job [{}] is split into {} Export Task Executor.", id, tableRefPerParallel.size());
// debug LOG output
if (LOG.isDebugEnabled()) {
for (int i = 0; i < tableRefPerParallel.size(); i++) {
LOG.debug("ExportTaskExecutor {} is responsible for tablets:", i);
LOG.debug("Tablet id: [{}]", tableRefPerParallel.get(i).getSampleTabletIds());
}
}
// generate 'select..outfile..' statement
for (TableRef tableReferences : tableRefPerParallel) {
FromClause fromClause = new FromClause(Lists.newArrayList(tableReferences));
// generate outfile clause
OutFileClause outfile = new OutFileClause(this.exportPath, this.format, convertOutfileProperties());
SelectStmt selectStmt = new SelectStmt(list, fromClause, this.whereExpr, null,
null, null, LimitElement.NO_LIMIT);
selectStmt.setOutFileClause(outfile);
selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0));
selectStmtPerParallel.add(Optional.of(selectStmt));
}
// debug LOG output
if (LOG.isDebugEnabled()) {
for (int i = 0; i < selectStmtPerParallel.size(); ++i) {
LOG.debug("ExportTaskExecutor {} is responsible for outfile:", i);
LOG.debug("outfile sql: [{}]", selectStmtPerParallel.get(i).get().toSql());
}
}
}
private List<TableRef> getTableRefListPerParallel() throws UserException {
List<List<Long>> tabletsListPerParallel = splitTablets();
List<TableRef> tableRefPerParallel = Lists.newArrayList();
for (List<Long> tabletsList : tabletsListPerParallel) {
// Since export does not support aliases, we pass null here.
// We cannot use this.tableRef.getAlias(), because the constructor of `TableRef`
// converts the alias to lower case when lower_case_table_names = 1.
TableRef tblRef = new TableRef(this.tableRef.getName(), null,
this.tableRef.getPartitionNames(), (ArrayList<Long>) tabletsList,
this.tableRef.getTableSample(), this.tableRef.getCommonHints());
tableRefPerParallel.add(tblRef);
}
return tableRefPerParallel;
}
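/**
* Split the tablets to be exported into at most 'parallelism' groups.
* With partition consistency, whole partitions are assigned to tasks so that
* all tablets of one partition are exported by the same task; otherwise the
* flattened tablet list is distributed evenly across tasks and each partition's
* visible version is recorded so it can be verified when the tasks run.
*/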
private List<List<Long>> splitTablets() throws UserException {
// get tablets
Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(this.tableName.getDb());
OlapTable table = db.getOlapTableOrAnalysisException(this.tableName.getTbl());
Integer tabletsAllNum = 0;
List<List<Long>> tabletIdList = Lists.newArrayList();
table.readLock();
try {
final Collection<Partition> partitions = new ArrayList<Partition>();
// get partitions
// user specifies partitions, already checked in ExportCommand
if (!this.partitionNames.isEmpty()) {
this.partitionNames.forEach(partitionName -> {
Partition partition = table.getPartition(partitionName);
if (partition.hasData()) {
partitions.add(partition);
}
});
} else {
table.getPartitions().forEach(partition -> {
if (partition.hasData()) {
partitions.add(partition);
}
});
}
if (partitions.size() > Config.maximum_number_of_export_partitions) {
throw new UserException("The partitions number of this export job is larger than the maximum number"
+ " of partitions allowed by a export job");
}
// get tablets
for (Partition partition : partitions) {
// When not using partition-level consistency, record each partition's visible
// version so it can be verified when the export tasks actually run.
if (!isPartitionConsistency()) {
partitionToVersion.put(partition.getName(), partition.getVisibleVersion());
}
MaterializedIndex index = partition.getBaseIndex();
List<Long> tablets = index.getTabletIdsInOrder();
tabletsAllNum += tablets.size();
tabletIdList.add(tablets);
}
} finally {
table.readUnlock();
}
if (isPartitionConsistency()) {
// Assign all tablets of one partition to the same parallel task.
int totalPartitions = tabletIdList.size();
int numPerParallel = totalPartitions / this.parallelism;
int numPerQueryRemainder = totalPartitions - numPerParallel * this.parallelism;
int realParallelism = this.parallelism;
if (totalPartitions < this.parallelism) {
realParallelism = totalPartitions;
LOG.warn("Export Job [{}]: The number of partitions ({}) is smaller than parallelism ({}), "
+ "set parallelism to partition num.", id, totalPartitions, this.parallelism);
}
int start = 0;
List<List<Long>> tabletsListPerParallel = new ArrayList<>();
for (int i = 0; i < realParallelism; ++i) {
int partitionNum = numPerParallel;
if (numPerQueryRemainder > 0) {
partitionNum += 1;
--numPerQueryRemainder;
}
List<List<Long>> tablets = new ArrayList<>(tabletIdList.subList(start, start + partitionNum));
List<Long> flatTablets = tablets.stream().flatMap(List::stream).collect(Collectors.toList());
start += partitionNum;
tabletsListPerParallel.add(flatTablets);
}
return tabletsListPerParallel;
}
/**
* Assign tablets to each parallel task, for example:
* if the total number of tablets is 10 and the real parallelism is 4,
* then the number of tablets per parallel task is: 3 3 2 2.
*/
tabletsNum = tabletsAllNum;
Integer tabletsNumPerParallel = tabletsAllNum / this.parallelism;
Integer tabletsNumPerQueryRemainder = tabletsAllNum - tabletsNumPerParallel * this.parallelism;
List<List<Long>> tabletsListPerParallel = Lists.newArrayList();
Integer realParallelism = this.parallelism;
if (tabletsAllNum < this.parallelism) {
realParallelism = tabletsAllNum;
LOG.warn("Export Job [{}]: The number of tablets ({}) is smaller than parallelism ({}), "
+ "set parallelism to tablets num.", id, tabletsAllNum, this.parallelism);
}
Integer start = 0;
List<Long> flatTabletIdList = tabletIdList.stream().flatMap(List::stream).collect(Collectors.toList());
for (int j = 0; j < realParallelism; ++j) {
int tabletsNumThisTask = tabletsNumPerParallel;
if (tabletsNumPerQueryRemainder > 0) {
tabletsNumThisTask += 1;
--tabletsNumPerQueryRemainder;
}
List<Long> tabletsList = new ArrayList<>(flatTabletIdList.subList(start, start + tabletsNumThisTask));
start += tabletsNumThisTask;
tabletsListPerParallel.add(tabletsList);
}
return tabletsListPerParallel;
}
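// Translate the job-level options (separator/delimiter for csv, compression for
// orc/parquet, max file size, BOM, storage properties) into the property map
// expected by the outfile clause.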
private Map<String, String> convertOutfileProperties() {
final Map<String, String> outfileProperties = Maps.newHashMap();
// file properties
if (format.equals("csv") || format.equals("csv_with_names") || format.equals("csv_with_names_and_types")) {
outfileProperties.put(OutFileClause.PROP_COLUMN_SEPARATOR, columnSeparator);
outfileProperties.put(OutFileClause.PROP_LINE_DELIMITER, lineDelimiter);
} else {
// orc / parquet
// compressType == null means outfile will use default compression type
if (compressType != null) {
outfileProperties.put(ExportCommand.COMPRESS_TYPE, compressType);
}
}
if (!maxFileSize.isEmpty()) {
outfileProperties.put(OutFileClause.PROP_MAX_FILE_SIZE, maxFileSize);
}
outfileProperties.put(OutFileClause.PROP_WITH_BOM, withBom);
// broker properties
// outfile clause's broker properties need 'broker.' prefix
if (brokerDesc.getStorageType() == StorageType.BROKER) {
outfileProperties.put(BROKER_PROPERTY_PREFIXES + "name", brokerDesc.getName());
brokerDesc.getProperties().forEach((k, v) -> outfileProperties.put(BROKER_PROPERTY_PREFIXES + k, v));
} else {
for (Entry<String, String> kv : brokerDesc.getBackendConfigProperties().entrySet()) {
outfileProperties.put(kv.getKey(), kv.getValue());
}
}
return outfileProperties;
}
public synchronized ExportJobState getState() {
return state;
}
private void setExportJobState(ExportJobState newState) {
this.state = newState;
}
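// Callback used by ExportTaskExecutors to drive the job state machine:
// PENDING is never a legal target state, EXPORTING is entered by the first
// running task, and CANCELLED / FINISHED are terminal.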
public synchronized void updateExportJobState(ExportJobState newState, Long taskId,
List<OutfileInfo> outfileInfoList, ExportFailMsg.CancelType type, String msg) throws JobException {
switch (newState) {
case PENDING:
throw new JobException("Can not update ExportJob state to 'PENDING', job id: [{}], task id: [{}]",
id, taskId);
case EXPORTING:
exportExportJob();
break;
case CANCELLED:
cancelExportTask(type, msg);
break;
case FINISHED:
finishExportTask(taskId, outfileInfoList);
break;
default:
return;
}
}
private void cancelExportTask(ExportFailMsg.CancelType type, String msg) throws JobException {
if (getState() == ExportJobState.CANCELLED) {
return;
}
if (getState() == ExportJobState.FINISHED) {
throw new JobException("Job {} has finished, can not been cancelled", id);
}
if (getState() == ExportJobState.PENDING) {
startTimeMs = System.currentTimeMillis();
}
// we need to cancel all tasks
jobExecutorList.forEach(executor -> {
try {
Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(executor.getId());
} catch (JobException e) {
LOG.warn("cancel export task {} exception: {}", executor.getId(), e);
}
});
cancelExportJobUnprotected(type, msg);
}
private void cancelExportJobUnprotected(ExportFailMsg.CancelType type, String msg) {
setExportJobState(ExportJobState.CANCELLED);
finishTimeMs = System.currentTimeMillis();
failMsg = new ExportFailMsg(type, msg);
jobExecutorList.clear();
selectStmtPerParallel.clear();
allOutfileInfo.clear();
partitionToVersion.clear();
if (FeConstants.runningUnitTest) {
return;
}
Env.getCurrentEnv().getEditLog().logExportUpdateState(this, ExportJobState.CANCELLED);
LOG.info("cancel export job {}", id);
}
private void exportExportJob() throws JobException {
if (getState() == ExportJobState.CANCELLED || getState() == ExportJobState.FINISHED) {
throw new JobException("export job has been {}, can not be update to `EXPORTING` state", getState());
}
// The first exportTaskExecutor to run will set the state to EXPORTING;
// the other exportTaskExecutors do not need to set it again.
if (getState() == ExportJobState.EXPORTING) {
return;
}
setExportJobState(ExportJobState.EXPORTING);
// if isReplay == true, startTimeMs will be read from the edit log
startTimeMs = System.currentTimeMillis();
}
private void finishExportTask(Long taskId, List<OutfileInfo> outfileInfoList) throws JobException {
if (getState() == ExportJobState.CANCELLED) {
throw new JobException("Job [{}] has been cancelled, can not finish this task: {}", id, taskId);
}
allOutfileInfo.add(outfileInfoList);
++finishedTaskCount;
// calculate progress; cap it at 99 until the whole job is marked finished
int tmpProgress = finishedTaskCount * 100 / jobExecutorList.size();
progress = Math.min(tmpProgress, 99);
// if all tasks have finished
if (finishedTaskCount == jobExecutorList.size()) {
finishExportJobUnprotected();
}
}
private void finishExportJobUnprotected() {
progress = 100;
setExportJobState(ExportJobState.FINISHED);
finishTimeMs = System.currentTimeMillis();
outfileInfo = GsonUtils.GSON.toJson(allOutfileInfo);
// Clear the jobExecutorList to release memory.
jobExecutorList.clear();
selectStmtPerParallel.clear();
allOutfileInfo.clear();
partitionToVersion.clear();
Env.getCurrentEnv().getEditLog().logExportUpdateState(this, ExportJobState.FINISHED);
LOG.info("finish export job {}", id);
}
public void replayExportJobState(ExportJobState newState) {
switch (newState) {
// We do not persist EXPORTING state in new version of metadata,
// but EXPORTING state may still exist in older versions of metadata.
// So if isReplay == true and newState == EXPORTING, we set newState = CANCELLED.
case EXPORTING:
// We do not need IN_QUEUE state in new version of export
// but IN_QUEUE state may still exist in older versions of metadata.
// So if isReplay == true and newState == IN_QUEUE, we set newState = CANCELLED.
case IN_QUEUE:
newState = ExportJobState.CANCELLED;
break;
case PENDING:
case CANCELLED:
progress = 0;
break;
case FINISHED:
progress = 100;
break;
default:
Preconditions.checkState(false, "wrong job state: " + newState.name());
break;
}
setExportJobState(newState);
}
/**
* If there are export which state is PENDING or EXPORTING or IN_QUEUE
* in checkpoint, we translate their state to CANCELLED.
*
* This function is only used in replay catalog phase.
*/
public void cancelReplayedExportJob() {
if (state == ExportJobState.PENDING || state == ExportJobState.EXPORTING || state == ExportJobState.IN_QUEUE) {
final String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled.";
this.failMsg = new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
setExportJobState(ExportJobState.CANCELLED);
}
}
public synchronized boolean isFinalState() {
return this.state == ExportJobState.CANCELLED || this.state == ExportJobState.FINISHED;
}
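// A job is expired once it has reached a terminal state and has been kept
// longer than Config.history_job_keep_max_second.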
public boolean isExpired(long curTime) {
return (curTime - createTimeMs) / 1000 > Config.history_job_keep_max_second
&& (state == ExportJobState.CANCELLED || state == ExportJobState.FINISHED);
}
@Override
public String toString() {
return "ExportJob [jobId=" + id
+ ", label=" + label
+ ", dbId=" + dbId
+ ", tableId=" + tableId
+ ", state=" + state
+ ", path=" + exportPath
+ ", partitions=(" + StringUtils.join(partitionNames, ",") + ")"
+ ", progress=" + progress
+ ", createTimeMs=" + TimeUtils.longToTimeString(createTimeMs)
+ ", exportStartTimeMs=" + TimeUtils.longToTimeString(startTimeMs)
+ ", exportFinishTimeMs=" + TimeUtils.longToTimeString(finishTimeMs)
+ ", failMsg=" + failMsg
+ "]";
}
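// Deserialize an ExportJob: jobs written before FeMetaVersion.VERSION_120 use the
// legacy readFields() format; newer ones are persisted as GSON json.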
public static ExportJob read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_120) {
ExportJob job = new ExportJob();
job.readFields(in);
return job;
}
String json = Text.readString(in);
ExportJob job = GsonUtils.GSON.fromJson(json, ExportJob.class);
job.isReplayed = true;
return job;
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
@Deprecated
private void readFields(DataInput in) throws IOException {
isReplayed = true;
id = in.readLong();
dbId = in.readLong();
tableId = in.readLong();
exportPath = Text.readString(in);
columnSeparator = Text.readString(in);
lineDelimiter = Text.readString(in);
// properties
Map<String, String> properties = Maps.newHashMap();
int count = in.readInt();
for (int i = 0; i < count; i++) {
String propertyKey = Text.readString(in);
String propertyValue = Text.readString(in);
properties.put(propertyKey, propertyValue);
}
// Before version 0.15, export jobs did not contain label information,
// so for compatibility a label is added here for historical jobs.
// The label must be deterministic so that it is the same
// every time the job is replayed.
properties.putIfAbsent(ExportStmt.LABEL, "export_" + id);
this.label = properties.get(ExportStmt.LABEL);
this.columns = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS);
if (!Strings.isNullOrEmpty(this.columns)) {
Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
this.exportColumns = split.splitToList(this.columns.toLowerCase());
}
boolean hasPartition = in.readBoolean();
if (hasPartition) {
partitionNames = Lists.newArrayList();
int partitionSize = in.readInt();
for (int i = 0; i < partitionSize; ++i) {
String partitionName = Text.readString(in);
partitionNames.add(partitionName);
}
}
state = ExportJobState.valueOf(Text.readString(in));
createTimeMs = in.readLong();
startTimeMs = in.readLong();
finishTimeMs = in.readLong();
progress = in.readInt();
failMsg.readFields(in);
if (in.readBoolean()) {
brokerDesc = BrokerDesc.read(in);
}
tableName = new TableName();
tableName.readFields(in);
origStmt = OriginStatement.read(in);
Map<String, String> tmpSessionVariables = Maps.newHashMap();
int size = in.readInt();
for (int i = 0; i < size; i++) {
String key = Text.readString(in);
String value = Text.readString(in);
tmpSessionVariables.put(key, value);
}
if (origStmt.originStmt.isEmpty()) {
return;
}
// parse the origin stmt to get where expr
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt.originStmt),
Long.valueOf(tmpSessionVariables.get(SessionVariable.SQL_MODE))));
ExportStmt stmt = null;
try {
stmt = (ExportStmt) SqlParserUtils.getStmt(parser, origStmt.idx);
this.whereExpr = stmt.getWhereExpr();
} catch (Exception e) {
throw new IOException("error happens when parsing export stmt: " + origStmt, e);
}
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof ExportJob)) {
return false;
}
ExportJob job = (ExportJob) obj;
return this.id == job.id;
}
// Keep hashCode consistent with equals (which compares only the job id);
// otherwise Lombok's @Data would generate a hashCode over all fields.
@Override
public int hashCode() {
return Long.hashCode(id);
}
}