// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DataQualityException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.sparkdpp.DppResult;
import org.apache.doris.sparkdpp.EtlJobConfig;
import org.apache.doris.system.Backend;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.PushTask;
import org.apache.doris.thrift.TBrokerRangeDesc;
import org.apache.doris.thrift.TBrokerScanRange;
import org.apache.doris.thrift.TBrokerScanRangeParams;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TDescriptorTable;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.thrift.TPushType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TabletQuorumFailedException;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
/**
 * There are four steps in a SparkLoadJob:
 * Step 1: a SparkLoadPendingTask is created by the unprotectedExecuteJob method and submits the Spark ETL job.
 * Step 2: LoadEtlChecker checks the Spark ETL job status periodically
 * and sends push tasks to the BEs when the Spark ETL job is finished.
 * Step 3: LoadLoadingChecker checks the loading status periodically and commits the transaction when the push tasks are finished.
 * Step 4: PublishVersionDaemon sends publish version tasks to the BEs and finishes the transaction.
 */
@Deprecated
public class SparkLoadJob extends BulkLoadJob {
private static final Logger LOG = LogManager.getLogger(SparkLoadJob.class);
    // --- members below need to be persisted ---
    // created from resourceDesc when the job is created
@SerializedName(value = "sr")
private SparkResource sparkResource;
    // members below are updated when the job state changes to ETL
@SerializedName(value = "est")
private long etlStartTimestamp = -1;
// for spark yarn
@SerializedName(value = "appid")
private String appId = "";
// spark job outputPath
@SerializedName(value = "etlop")
private String etlOutputPath = "";
    // members below are updated when the job state changes to LOADING
// { tableId.partitionId.indexId.bucket.schemaHash -> (etlFilePath, etlFileSize) }
@SerializedName(value = "tm2fi")
private Map<String, Pair<String, Long>> tabletMetaToFileInfo = Maps.newHashMap();
    // --- members below are not persisted ---
private ResourceDesc resourceDesc;
// for spark standalone
@SerializedName(value = "slah")
private SparkLoadAppHandle sparkLoadAppHandle = new SparkLoadAppHandle();
    // records when quorum was reached, so a straggler replica does not delay the commit for too long
private long quorumFinishTimestamp = -1;
    // members below are for push tasks
private Map<Long, Set<Long>> tableToLoadPartitions = Maps.newHashMap();
private Map<Long, PushBrokerReaderParams> indexToPushBrokerReaderParams = Maps.newHashMap();
private Map<Long, Integer> indexToSchemaHash = Maps.newHashMap();
private Map<Long, Map<Long, PushTask>> tabletToSentReplicaPushTask = Maps.newHashMap();
private Set<Long> finishedReplicas = Sets.newHashSet();
private Set<Long> quorumTablets = Sets.newHashSet();
private Set<Long> fullTablets = Sets.newHashSet();
// only for log replay
public SparkLoadJob() {
super(EtlJobType.SPARK);
}
public SparkLoadJob(long dbId, String label, ResourceDesc resourceDesc, OriginStatement originStmt,
UserIdentity userInfo) throws MetaNotFoundException {
super(EtlJobType.SPARK, dbId, label, originStmt, userInfo);
this.resourceDesc = resourceDesc;
}
@Override
public void setJobProperties(Map<String, String> properties) throws DdlException {
super.setJobProperties(properties);
// set spark resource and broker desc
setResourceInfo();
}
    /**
     * Merge the system resource configuration with the properties specified in the load statement.
     *
     * @throws DdlException if the referenced resource does not exist or cannot be updated
     */
private void setResourceInfo() throws DdlException {
if (resourceDesc == null) {
            // A null resourceDesc means this is being replayed from the edit log.
            // resourceDesc is not persisted, so it is expected to be null here.
            // sparkResource and brokerDesc are both persisted, so there is no need
            // to rebuild them during replay.
return;
}
// set sparkResource and brokerDesc
String resourceName = resourceDesc.getName();
Resource oriResource = Env.getCurrentEnv().getResourceMgr().getResource(resourceName);
if (oriResource == null) {
throw new DdlException("Resource does not exist. name: " + resourceName);
}
sparkResource = ((SparkResource) oriResource).getCopiedResource();
sparkResource.update(resourceDesc);
Map<String, String> brokerProperties = sparkResource.getBrokerPropertiesWithoutPrefix();
brokerDesc = new BrokerDesc(sparkResource.getBroker(), brokerProperties);
}
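    /**
     * Begins a transaction in the global transaction manager for all tables involved in this load,
     * with this FE as the transaction coordinator.
     */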
@Override
public void beginTxn()
throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException,
QuotaExceedException, MetaNotFoundException {
transactionId = Env.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
LoadJobSourceType.FRONTEND, id, getTimeout());
}
@Override
protected void unprotectedExecuteJob() throws LoadException {
try {
beginTxn();
} catch (UserException e) {
LOG.warn("failed to begin transaction for spark load job {}", id, e);
throw new LoadException(e.getMessage());
}
// create pending task
LoadTask task = new SparkLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(), sparkResource,
brokerDesc, getPriority());
task.init();
idToTasks.put(task.getSignature(), task);
Env.getCurrentEnv().getPendingLoadTaskScheduler().submit(task);
}
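    /**
     * Callback invoked when a load task finishes. Only SparkPendingTaskAttachment is handled here.
     */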
@Override
public void onTaskFinished(TaskAttachment attachment) {
if (attachment instanceof SparkPendingTaskAttachment) {
onPendingTaskFinished((SparkPendingTaskAttachment) attachment);
}
}
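    /**
     * Records the Spark app handle, application id and ETL output path reported by the pending task,
     * moves the job to the ETL state, and writes the state change to the edit log.
     */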
private void onPendingTaskFinished(SparkPendingTaskAttachment attachment) {
writeLock();
try {
// check if job has been cancelled
if (isTxnDone()) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id).add("state", state)
.add("error_msg", "this task will be ignored when job is: " + state).build());
return;
}
if (finishedTaskIds.contains(attachment.getTaskId())) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id).add("task_id", attachment.getTaskId()).add("error_msg",
"this is a duplicated callback of pending task "
+ "when broker already has loading task")
.build());
return;
}
// add task id into finishedTaskIds
finishedTaskIds.add(attachment.getTaskId());
sparkLoadAppHandle = attachment.getHandle();
appId = attachment.getAppId();
etlOutputPath = attachment.getOutputPath();
executeEtl();
// log etl state
unprotectedLogUpdateStateInfo();
} finally {
writeUnlock();
}
}
/**
* update etl start time and state in spark load job
*/
private void executeEtl() {
etlStartTimestamp = System.currentTimeMillis();
state = JobState.ETL;
LOG.info("update to {} state success. job id: {}", state, id);
}
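    /**
     * Returns true if the current job state equals the expected state, read under the read lock.
     */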
    private boolean checkState(JobState expectState) {
        readLock();
        try {
            return state == expectState;
        } finally {
            readUnlock();
        }
    }
    /**
     * Check the status of the ETL job periodically:
     * 1. RUNNING: update the ETL job progress
     * 2. CANCELLED: cancel the load job
     * 3. FINISHED: get the ETL output file paths, update the job state to LOADING and log the job update info
     * <p>
     * Push tasks are sent once the job state has changed to LOADING
     */
public void updateEtlStatus() throws Exception {
if (!checkState(JobState.ETL)) {
return;
}
// get etl status
SparkEtlJobHandler handler = new SparkEtlJobHandler();
EtlStatus status = handler.getEtlJobStatus(sparkLoadAppHandle, appId, id, etlOutputPath, sparkResource,
brokerDesc);
writeLock();
try {
switch (status.getState()) {
case RUNNING:
unprotectedUpdateEtlStatusInternal(status);
break;
case FINISHED:
unprotectedProcessEtlFinish(status, handler);
break;
case CANCELLED:
throw new LoadException("spark etl job failed. msg: " + status.getFailMsg());
default:
LOG.warn("unknown etl state: {}", status.getState().name());
break;
}
} finally {
writeUnlock();
}
if (checkState(JobState.LOADING)) {
// create and send push tasks
submitPushTasks();
}
}
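    /**
     * Updates the loading status and progress from the ETL status, and fills the load statistics
     * and counters from the DPP result once it is available. For a standalone Spark cluster
     * the tracking url is set to the app id.
     */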
private void unprotectedUpdateEtlStatusInternal(EtlStatus etlStatus) {
loadingStatus = etlStatus;
progress = etlStatus.getProgress();
if (!sparkResource.isYarnMaster()) {
loadingStatus.setTrackingUrl(appId);
}
DppResult dppResult = etlStatus.getDppResult();
if (dppResult != null) {
            // update load statistics and counters when the Spark ETL job has finished.
            // The FE gets this information from Spark DPP, so a dummy load id and a dummy backend id are used here.
loadStatistic.fileNum = (int) dppResult.fileNumber;
loadStatistic.totalFileSizeB = dppResult.fileSize;
TUniqueId dummyId = new TUniqueId(0, 0);
long dummyBackendId = -1L;
loadStatistic.initLoad(dummyId, Sets.newHashSet(dummyId), Lists.newArrayList(dummyBackendId));
loadStatistic.updateLoadProgress(dummyBackendId, dummyId, dummyId, dppResult.scannedRows,
dppResult.scannedBytes, true);
Map<String, String> counters = loadingStatus.getCounters();
counters.put(DPP_NORMAL_ALL, String.valueOf(dppResult.normalRows));
counters.put(DPP_ABNORMAL_ALL, String.valueOf(dppResult.abnormalRows));
counters.put(UNSELECTED_ROWS, String.valueOf(dppResult.unselectRows));
}
}
private void unprotectedProcessEtlFinish(EtlStatus etlStatus, SparkEtlJobHandler handler) throws Exception {
unprotectedUpdateEtlStatusInternal(etlStatus);
// checkDataQuality
if (!checkDataQuality()) {
throw new DataQualityException(DataQualityException.QUALITY_FAIL_MSG);
}
// get etl output files and update loading state
unprotectedUpdateToLoadingState(etlStatus, handler.getEtlFilePaths(etlOutputPath, brokerDesc));
// log loading state
unprotectedLogUpdateStateInfo();
// prepare loading infos
unprotectedPrepareLoadingInfos();
}
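    /**
     * Collects the tablet meta to (file path, file size) mapping from the ETL output files
     * and switches the job state to LOADING.
     */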
private void unprotectedUpdateToLoadingState(EtlStatus etlStatus, Map<String, Long> filePathToSize)
throws LoadException {
try {
for (Map.Entry<String, Long> entry : filePathToSize.entrySet()) {
String filePath = entry.getKey();
if (!filePath.endsWith(EtlJobConfig.ETL_OUTPUT_FILE_FORMAT)) {
continue;
}
String tabletMetaStr = EtlJobConfig.getTabletMetaStr(filePath);
tabletMetaToFileInfo.put(tabletMetaStr, Pair.of(filePath, entry.getValue()));
}
loadingStatus = etlStatus;
progress = 0;
unprotectedUpdateState(JobState.LOADING);
LOG.info("update to {} state success. job id: {}", state, id);
} catch (Exception e) {
LOG.warn("update to {} state failed. job id: {}", state, id, e);
throw new LoadException(e.getMessage(), e);
}
}
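    /**
     * Parses each tablet meta key of the form tableId.partitionId.indexId.bucket.schemaHash
     * to fill tableToLoadPartitions and indexToSchemaHash.
     */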
private void unprotectedPrepareLoadingInfos() {
for (String tabletMetaStr : tabletMetaToFileInfo.keySet()) {
String[] fileNameArr = tabletMetaStr.split("\\.");
// tableId.partitionId.indexId.bucket.schemaHash
Preconditions.checkState(fileNameArr.length == 5);
long tableId = Long.parseLong(fileNameArr[0]);
long partitionId = Long.parseLong(fileNameArr[1]);
long indexId = Long.parseLong(fileNameArr[2]);
int schemaHash = Integer.parseInt(fileNameArr[4]);
if (!tableToLoadPartitions.containsKey(tableId)) {
tableToLoadPartitions.put(tableId, Sets.newHashSet());
}
tableToLoadPartitions.get(tableId).add(partitionId);
indexToSchemaHash.put(indexId, schemaHash);
}
}
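    /**
     * Lazily builds and caches the PushBrokerReaderParams for an index,
     * using the index schema with lower-cased column names.
     */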
private PushBrokerReaderParams getPushBrokerReaderParams(OlapTable table, long indexId) throws UserException {
if (!indexToPushBrokerReaderParams.containsKey(indexId)) {
PushBrokerReaderParams pushBrokerReaderParams = new PushBrokerReaderParams();
List<Column> columns = new ArrayList<>();
table.getSchemaByIndexId(indexId).forEach(col -> {
Column column = new Column(col);
column.setName(col.getName().toLowerCase(Locale.ROOT));
columns.add(column);
});
pushBrokerReaderParams.init(columns, brokerDesc);
indexToPushBrokerReaderParams.put(indexId, pushBrokerReaderParams);
}
return indexToPushBrokerReaderParams.get(indexId);
}
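    /**
     * Creates a push task for every replica that has not been sent one yet and submits them in a batch.
     * Also records which tablets have reached quorum and which are fully finished.
     *
     * @return the ids of all tablets involved in this load
     */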
private Set<Long> submitPushTasks() throws UserException {
        // check that the db exists
Database db = null;
try {
db = getDb();
} catch (MetaNotFoundException e) {
String errMsg = new LogBuilder(LogKey.LOAD_JOB, id).add("database_id", dbId).add("label", label)
.add("error_msg", "db has been deleted when job is loading").build();
throw new MetaNotFoundException(errMsg);
}
AgentBatchTask batchTask = new AgentBatchTask();
boolean hasLoadPartitions = false;
Set<Long> totalTablets = Sets.newHashSet();
List<? extends TableIf> tableList = db.getTablesOnIdOrderOrThrowException(
Lists.newArrayList(tableToLoadPartitions.keySet()));
MetaLockUtils.readLockTables(tableList);
try {
writeLock();
try {
                // Check that the state is still LOADING. If the state is CANCELLED or FINISHED, return early;
                // otherwise this method would throw an "all partitions have no load data" exception,
                // because tableToLoadPartitions has already been cleaned up.
if (state != JobState.LOADING) {
LOG.warn("job state is not loading. job id: {}, state: {}", id, state);
return totalTablets;
}
for (TableIf table : tableList) {
Set<Long> partitionIds = tableToLoadPartitions.get(table.getId());
OlapTable olapTable = (OlapTable) table;
String vaultId = olapTable.getStorageVaultId();
for (long partitionId : partitionIds) {
Partition partition = olapTable.getPartition(partitionId);
if (partition == null) {
LOG.warn("partition does not exist. id: {}", partitionId);
continue;
}
hasLoadPartitions = true;
int quorumReplicaNum =
olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum() / 2
+ 1;
List<MaterializedIndex> indexes = partition.getMaterializedIndices(IndexExtState.ALL);
for (MaterializedIndex index : indexes) {
long indexId = index.getId();
MaterializedIndexMeta indexMeta = olapTable.getIndexMetaByIndexId(indexId);
int schemaVersion = indexMeta.getSchemaVersion();
int schemaHash = indexMeta.getSchemaHash();
List<TColumn> columnsDesc = new ArrayList<TColumn>();
for (Column column : indexMeta.getSchema(true)) {
TColumn tColumn = column.toThrift();
tColumn.setColumnName(tColumn.getColumnName().toLowerCase(Locale.ROOT));
columnsDesc.add(tColumn);
}
int bucket = 0;
for (Tablet tablet : index.getTablets()) {
long tabletId = tablet.getId();
totalTablets.add(tabletId);
String tabletMetaStr = String.format("%d.%d.%d.%d.%d", olapTable.getId(), partitionId,
indexId, bucket++, schemaHash);
Set<Long> tabletAllReplicas = Sets.newHashSet();
Set<Long> tabletFinishedReplicas = Sets.newHashSet();
for (Replica replica : tablet.getReplicas()) {
long replicaId = replica.getId();
tabletAllReplicas.add(replicaId);
if (!tabletToSentReplicaPushTask.containsKey(tabletId)
|| !tabletToSentReplicaPushTask.get(tabletId).containsKey(replicaId)) {
long backendId = replica.getBackendId();
long taskSignature = Env.getCurrentGlobalTransactionMgr()
.getNextTransactionId();
PushBrokerReaderParams params = getPushBrokerReaderParams(olapTable, indexId);
// deep copy TBrokerScanRange because filePath and fileSize will be updated
// in different tablet push task
TBrokerScanRange tBrokerScanRange = new TBrokerScanRange(
params.tBrokerScanRange);
// update filePath fileSize
TBrokerRangeDesc tBrokerRangeDesc = tBrokerScanRange.getRanges().get(0);
tBrokerRangeDesc.setPath("");
tBrokerRangeDesc.setFileSize(-1);
if (tabletMetaToFileInfo.containsKey(tabletMetaStr)) {
Pair<String, Long> fileInfo = tabletMetaToFileInfo.get(tabletMetaStr);
tBrokerRangeDesc.setPath(fileInfo.first);
tBrokerRangeDesc.setFileSize(fileInfo.second);
}
// update broker address
Backend backend = Env.getCurrentEnv().getCurrentSystemInfo()
.getBackend(backendId);
FsBroker fsBroker = Env.getCurrentEnv().getBrokerMgr().getBroker(
brokerDesc.getName(), backend.getHost());
tBrokerScanRange.getBrokerAddresses().add(
new TNetworkAddress(fsBroker.host, fsBroker.port));
if (LOG.isDebugEnabled()) {
LOG.debug("push task for replica {}, broker {}:{},"
+ " backendId {}, filePath {}, fileSize {}",
replicaId, fsBroker.host,
fsBroker.port, backendId, tBrokerRangeDesc.path,
tBrokerRangeDesc.file_size);
}
PushTask pushTask = new PushTask(backendId, dbId, olapTable.getId(),
partitionId, indexId, tabletId, replicaId, schemaHash, 0, id,
TPushType.LOAD_V2, TPriority.NORMAL, transactionId, taskSignature,
tBrokerScanRange, params.tDescriptorTable, columnsDesc,
vaultId, schemaVersion);
if (AgentTaskQueue.addTask(pushTask)) {
batchTask.addTask(pushTask);
if (!tabletToSentReplicaPushTask.containsKey(tabletId)) {
tabletToSentReplicaPushTask.put(tabletId, Maps.newHashMap());
}
tabletToSentReplicaPushTask.get(tabletId).put(replicaId, pushTask);
}
}
if (finishedReplicas.contains(replicaId) && replica.getLastFailedVersion() < 0) {
tabletFinishedReplicas.add(replicaId);
}
}
if (tabletAllReplicas.size() == 0) {
LOG.error("invalid situation. tablet is empty. id: {}", tabletId);
}
// check tablet push states
if (tabletFinishedReplicas.size() >= quorumReplicaNum) {
quorumTablets.add(tabletId);
if (tabletFinishedReplicas.size() == tabletAllReplicas.size()) {
fullTablets.add(tabletId);
}
}
}
}
}
}
if (batchTask.getTaskNum() > 0) {
AgentTaskExecutor.submit(batchTask);
}
if (!hasLoadPartitions) {
String errMsg = new LogBuilder(LogKey.LOAD_JOB, id).add("database_id", dbId).add("label", label)
.add("error_msg", "all partitions have no load data").build();
throw new LoadException(errMsg);
}
return totalTablets;
} finally {
writeUnlock();
}
} finally {
MetaLockUtils.readUnlockTables(tableList);
}
}
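    /**
     * Called when a push task succeeds on a replica: records the replica as finished,
     * adds its tablet commit info, and nulls out the cached push task reference.
     */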
public void addFinishedReplica(long replicaId, long tabletId, long backendId) {
writeLock();
try {
if (finishedReplicas.add(replicaId)) {
commitInfos.add(new TabletCommitInfo(tabletId, backendId));
                // null out the sent push task; the map key still marks the replica as sent
Map<Long, PushTask> sentReplicaPushTask = tabletToSentReplicaPushTask.get(tabletId);
if (sentReplicaPushTask != null) {
if (sentReplicaPushTask.containsKey(replicaId)) {
sentReplicaPushTask.put(replicaId, null);
}
}
}
} finally {
writeUnlock();
}
}
    /**
     * 1. Sends push tasks to the BEs
     * 2. Commits the transaction after all push tasks have executed successfully
     */
public void updateLoadingStatus() throws UserException {
if (!checkState(JobState.LOADING)) {
return;
}
// submit push tasks
Set<Long> totalTablets = submitPushTasks();
if (totalTablets.isEmpty()) {
LOG.warn("total tablets set is empty. job id: {}, state: {}", id, state);
return;
}
// update status
boolean canCommitJob = false;
writeLock();
try {
// loading progress
// 100: txn status is visible and load has been finished
progress = fullTablets.size() * 100 / totalTablets.size();
if (progress == 100) {
progress = 99;
}
// quorum finish ts
if (quorumFinishTimestamp < 0 && quorumTablets.containsAll(totalTablets)) {
quorumFinishTimestamp = System.currentTimeMillis();
}
            // if all replicas are finished, or the job has stayed quorum-finished for a long time, try to commit
long stragglerTimeout = 300 * 1000;
if ((quorumFinishTimestamp > 0 && System.currentTimeMillis() - quorumFinishTimestamp > stragglerTimeout)
|| fullTablets.containsAll(totalTablets)) {
canCommitJob = true;
}
} finally {
writeUnlock();
}
// try commit transaction
if (canCommitJob) {
tryCommitJob();
}
}
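    /**
     * Commits the transaction under the table locks (commit lock in cloud mode, write lock otherwise).
     * On a delete-bitmap lock error the commit is retried, up to Config.mow_calculate_delete_bitmap_retry_times.
     */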
private void tryCommitJob() throws UserException {
int retryTimes = 0;
while (true) {
Database db = getDb();
List<Table> tableList = db.getTablesOnIdOrderOrThrowException(
Lists.newArrayList(tableToLoadPartitions.keySet()));
if (Config.isCloudMode()) {
MetaLockUtils.commitLockTables(tableList);
} else {
MetaLockUtils.writeLockTablesOrMetaException(tableList);
}
try {
LOG.info(new LogBuilder(LogKey.LOAD_JOB, id).add("txn_id", transactionId)
.add("msg", "Load job try to commit txn").build());
Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
finishTimestamp, state, failMsg));
return;
} catch (TabletQuorumFailedException e) {
// retry in next loop
return;
} catch (UserException e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("txn_id", transactionId)
.add("database_id", dbId)
.add("retry_times", retryTimes)
.add("error_msg", "Failed to commit txn with error:" + e.getMessage())
.build(), e);
if (e.getErrorCode() == InternalErrorCode.DELETE_BITMAP_LOCK_ERR) {
retryTimes++;
if (retryTimes >= Config.mow_calculate_delete_bitmap_retry_times) {
LOG.warn("cancelJob {} because up to max retry time, exception {}", id, e);
throw e;
}
} else {
throw e;
}
} finally {
if (Config.isCloudMode()) {
MetaLockUtils.commitUnlockTables(tableList);
} else {
MetaLockUtils.writeUnlockTables(tableList);
}
}
}
}
    /**
     * The load job has already been cancelled or finished; clean it up:
     * 1. kill the ETL job and delete the ETL files
     * 2. clear push tasks and the info that is not persisted
     */
private void clearJob() {
Preconditions.checkState(state == JobState.FINISHED || state == JobState.CANCELLED);
if (LOG.isDebugEnabled()) {
LOG.debug("kill etl job and delete etl files. id: {}, state: {}", id, state);
}
SparkEtlJobHandler handler = new SparkEtlJobHandler();
if (state == JobState.CANCELLED) {
if ((!Strings.isNullOrEmpty(appId) && sparkResource.isYarnMaster()) || sparkLoadAppHandle != null) {
try {
handler.killEtlJob(sparkLoadAppHandle, appId, id, sparkResource);
} catch (Exception e) {
LOG.warn("kill etl job failed. id: {}, state: {}", id, state, e);
}
}
}
if (!Strings.isNullOrEmpty(etlOutputPath)) {
try {
                // delete the label dir by dropping the trailing taskId dir from the output path
String outputPath = etlOutputPath.substring(0, etlOutputPath.lastIndexOf("/"));
handler.deleteEtlOutputPath(outputPath, brokerDesc);
} catch (Exception e) {
LOG.warn("delete etl files failed. id: {}, state: {}", id, state, e);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("clear push tasks and infos that not persist. id: {}, state: {}", id, state);
}
writeLock();
try {
// clear push task first
for (Map<Long, PushTask> sentReplicaPushTask : tabletToSentReplicaPushTask.values()) {
for (PushTask pushTask : sentReplicaPushTask.values()) {
if (pushTask == null) {
continue;
}
AgentTaskQueue.removeTask(pushTask.getBackendId(), pushTask.getTaskType(), pushTask.getSignature());
}
}
            // clear job info that is not persisted
sparkLoadAppHandle = null;
resourceDesc = null;
etlOutputPath = "";
appId = "";
tableToLoadPartitions.clear();
indexToPushBrokerReaderParams.clear();
indexToSchemaHash.clear();
tabletToSentReplicaPushTask.clear();
finishedReplicas.clear();
quorumTablets.clear();
fullTablets.clear();
} finally {
writeUnlock();
}
}
@Override
public void afterVisible(TransactionState txnState, boolean txnOperated) {
super.afterVisible(txnState, txnOperated);
clearJob();
}
@Override
public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason)
throws UserException {
super.afterAborted(txnState, txnOperated, txnStatusChangeReason);
clearJob();
}
@Override
public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn, boolean needLog) {
super.cancelJobWithoutCheck(failMsg, abortTxn, needLog);
clearJob();
}
@Override
public void cancelJob(FailMsg failMsg) throws DdlException {
super.cancelJob(failMsg);
clearJob();
}
@Override
public String getResourceName() {
return sparkResource.getName();
}
@Override
protected long getEtlStartTimestamp() {
return etlStartTimestamp;
}
public SparkLoadAppHandle getHandle() {
return sparkLoadAppHandle;
}
public void clearSparkLauncherLog() {
if (sparkLoadAppHandle != null) {
String logPath = sparkLoadAppHandle.getLogPath();
if (!Strings.isNullOrEmpty(logPath)) {
File file = new File(logPath);
if (file.exists()) {
file.delete();
}
}
}
}
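    /**
     * Restores the persisted members (spark resource, app handle, ETL info and the
     * tablet meta to file info map) from the legacy binary metadata format.
     */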
public void readFields(DataInput in) throws IOException {
super.readFields(in);
sparkResource = (SparkResource) Resource.read(in);
sparkLoadAppHandle = SparkLoadAppHandle.read(in);
etlStartTimestamp = in.readLong();
appId = Text.readString(in);
etlOutputPath = Text.readString(in);
int size = in.readInt();
for (int i = 0; i < size; i++) {
String tabletMetaStr = Text.readString(in);
Pair<String, Long> fileInfo = Pair.of(Text.readString(in), in.readLong());
tabletMetaToFileInfo.put(tabletMetaStr, fileInfo);
}
}
    /**
     * Log the load job update info when the job state changes to ETL or LOADING
     */
private void unprotectedLogUpdateStateInfo() {
SparkLoadJobStateUpdateInfo info = new SparkLoadJobStateUpdateInfo(
id, state, transactionId, sparkLoadAppHandle, etlStartTimestamp, appId, etlOutputPath,
loadStartTimestamp, tabletMetaToFileInfo);
Env.getCurrentEnv().getEditLog().logUpdateLoadJob(info);
}
@Override
public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) {
super.replayUpdateStateInfo(info);
SparkLoadJobStateUpdateInfo sparkJobStateInfo = (SparkLoadJobStateUpdateInfo) info;
sparkLoadAppHandle = sparkJobStateInfo.getSparkLoadAppHandle();
etlStartTimestamp = sparkJobStateInfo.getEtlStartTimestamp();
appId = sparkJobStateInfo.getAppId();
etlOutputPath = sparkJobStateInfo.getEtlOutputPath();
tabletMetaToFileInfo = sparkJobStateInfo.getTabletMetaToFileInfo();
switch (state) {
case ETL:
// nothing to do
break;
case LOADING:
unprotectedPrepareLoadingInfos();
break;
default:
LOG.warn("replay update load job state info failed. error: wrong state. job id: {}, state: {}", id,
state);
break;
}
}
    /**
     * Journal log entry for a Spark load job, written when the job state changes to ETL or LOADING
     */
public static class SparkLoadJobStateUpdateInfo extends LoadJobStateUpdateInfo {
@SerializedName(value = "sparkLoadAppHandle")
private SparkLoadAppHandle sparkLoadAppHandle;
@SerializedName(value = "etlStartTimestamp")
private long etlStartTimestamp;
@SerializedName(value = "appId")
private String appId;
@SerializedName(value = "etlOutputPath")
private String etlOutputPath;
@SerializedName(value = "tabletMetaToFileInfo")
private Map<String, Pair<String, Long>> tabletMetaToFileInfo;
public SparkLoadJobStateUpdateInfo(long jobId, JobState state, long transactionId,
SparkLoadAppHandle sparkLoadAppHandle, long etlStartTimestamp, String appId, String etlOutputPath,
long loadStartTimestamp, Map<String, Pair<String, Long>> tabletMetaToFileInfo) {
super(jobId, state, transactionId, loadStartTimestamp);
this.sparkLoadAppHandle = sparkLoadAppHandle;
this.etlStartTimestamp = etlStartTimestamp;
this.appId = appId;
this.etlOutputPath = etlOutputPath;
this.tabletMetaToFileInfo = tabletMetaToFileInfo;
}
public SparkLoadAppHandle getSparkLoadAppHandle() {
return sparkLoadAppHandle;
}
public long getEtlStartTimestamp() {
return etlStartTimestamp;
}
public String getAppId() {
return appId;
}
public String getEtlOutputPath() {
return etlOutputPath;
}
public Map<String, Pair<String, Long>> getTabletMetaToFileInfo() {
return tabletMetaToFileInfo;
}
}
    /**
     * Params for the BE push broker reader
     * 1. TBrokerScanRange: file path and size, broker address, transform exprs
     * 2. TDescriptorTable: src and dest SlotDescriptors, src and dest TupleDescriptors
     * <p>
     * These params are sent to the BE through the push task
     */
private static class PushBrokerReaderParams {
TBrokerScanRange tBrokerScanRange;
TDescriptorTable tDescriptorTable;
public PushBrokerReaderParams() {
this.tBrokerScanRange = new TBrokerScanRange();
this.tDescriptorTable = null;
}
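        /**
         * Builds the destination tuple descriptor from the index schema, then initializes
         * the broker scan range and the thrift descriptor table.
         */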
public void init(List<Column> columns, BrokerDesc brokerDesc) throws UserException {
// Generate tuple descriptor
DescriptorTable descTable = new DescriptorTable();
TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
// use index schema to fill the descriptor table
for (Column column : columns) {
SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
destSlotDesc.setIsMaterialized(true);
destSlotDesc.setColumn(column);
destSlotDesc.setIsNullable(column.isAllowNull());
}
initTBrokerScanRange(descTable, destTupleDesc, columns, brokerDesc);
initTDescriptorTable(descTable);
}
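        /**
         * Builds the source tuple (VARCHAR or BITMAP slots), the cast expressions from source
         * to destination slots, and a parquet TBrokerRangeDesc template whose path and file size
         * are filled in later for each tablet.
         */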
private void initTBrokerScanRange(DescriptorTable descTable, TupleDescriptor destTupleDesc,
List<Column> columns, BrokerDesc brokerDesc) throws AnalysisException {
// scan range params
TBrokerScanRangeParams params = new TBrokerScanRangeParams();
params.setStrictMode(false);
params.setProperties(brokerDesc.getBackendConfigProperties());
TupleDescriptor srcTupleDesc = descTable.createTupleDescriptor();
Map<String, SlotDescriptor> srcSlotDescByName = Maps.newHashMap();
for (Column column : columns) {
SlotDescriptor srcSlotDesc = descTable.addSlotDescriptor(srcTupleDesc);
srcSlotDesc.setIsMaterialized(true);
srcSlotDesc.setIsNullable(true);
if (column.getDataType() == PrimitiveType.BITMAP) {
                    // read the source column as BITMAP directly when the destination column type is BITMAP
srcSlotDesc.setType(ScalarType.createType(PrimitiveType.BITMAP));
srcSlotDesc.setColumn(new Column(column.getName(), PrimitiveType.BITMAP));
} else {
srcSlotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
srcSlotDesc.setColumn(new Column(column.getName(), PrimitiveType.VARCHAR));
}
params.addToSrcSlotIds(srcSlotDesc.getId().asInt());
srcSlotDescByName.put(column.getName(), srcSlotDesc);
}
Map<Integer, Integer> destSidToSrcSidWithoutTrans = Maps.newHashMap();
for (SlotDescriptor destSlotDesc : destTupleDesc.getSlots()) {
if (!destSlotDesc.isMaterialized()) {
continue;
}
SlotDescriptor srcSlotDesc = srcSlotDescByName.get(destSlotDesc.getColumn().getName());
destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt());
Expr expr = new SlotRef(srcSlotDesc);
expr = castToSlot(destSlotDesc, expr);
params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift());
}
params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans);
params.setSrcTupleId(srcTupleDesc.getId().asInt());
params.setDestTupleId(destTupleDesc.getId().asInt());
tBrokerScanRange.setParams(params);
// broker address updated for each replica
tBrokerScanRange.setBrokerAddresses(Lists.newArrayList());
// broker range desc
TBrokerRangeDesc tBrokerRangeDesc = new TBrokerRangeDesc();
tBrokerRangeDesc.setFileType(TFileType.FILE_BROKER);
tBrokerRangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET);
tBrokerRangeDesc.setSplittable(false);
tBrokerRangeDesc.setStartOffset(0);
tBrokerRangeDesc.setSize(-1);
// path and file size updated for each replica
tBrokerScanRange.setRanges(Lists.newArrayList(tBrokerRangeDesc));
}
private Expr castToSlot(SlotDescriptor slotDesc, Expr expr) throws AnalysisException {
PrimitiveType dstType = slotDesc.getType().getPrimitiveType();
PrimitiveType srcType = expr.getType().getPrimitiveType();
if (dstType == PrimitiveType.BOOLEAN && srcType == PrimitiveType.VARCHAR) {
// there is no cast VARCHAR to BOOLEAN function
// so we cast VARCHAR to TINYINT first, then cast TINYINT to BOOLEAN
return new CastExpr(Type.BOOLEAN, new CastExpr(Type.TINYINT, expr));
}
if (dstType != srcType) {
return expr.castTo(slotDesc.getType());
}
return expr;
}
private void initTDescriptorTable(DescriptorTable descTable) {
descTable.computeStatAndMemLayout();
tDescriptorTable = descTable.toThrift();
}
}
}