// ExportTaskExecutor.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load;

import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Status;
import org.apache.doris.load.ExportFailMsg.CancelType;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Executes a single task of an {@link ExportJob}.
 *
 * <p>{@link #execute()} moves the job to EXPORTING, optionally verifies that the tablets referenced by the
 * statement still have the partition versions recorded on the job, runs the statement through a
 * {@link StmtExecutor}, and reports either the produced outfile information or the failure back to the job.
 *
 * <p>{@link #cancel()} cancels the in-flight {@link StmtExecutor}, so it is expected to be called from a
 * different thread than {@link #execute()}; the cancel/finish flags are atomics and the executor reference
 * is volatile for that reason.
 */
@Slf4j
public class ExportTaskExecutor implements TransientTaskExecutor {
    private static final Logger LOG = LogManager.getLogger(ExportTaskExecutor.class);

    /** Statement to run for this task; when empty, {@link #execute()} only performs the state transitions. */
    Optional<StatementBase> selectStmt;

    /** Job this task belongs to; every state change is reported to it. */
    ExportJob exportJob;

    /** Task identifier, randomly generated in the constructor and replaceable via {@link #setTaskId(Long)}. */
    Long taskId;

    /** Executor of the running statement; written by {@link #execute()} and read by {@link #cancel()}. */
    private volatile StmtExecutor stmtExecutor;

    private final AtomicBoolean isCanceled;

    private final AtomicBoolean isFinished;

    /**
     * Creates an executor with a random task id.
     *
     * @param selectStmt statement to execute, may be empty
     * @param exportJob  owning export job
     */
    ExportTaskExecutor(Optional<StatementBase> selectStmt, ExportJob exportJob) {
        this.taskId = UUID.randomUUID().getMostSignificantBits();
        this.selectStmt = selectStmt;
        this.exportJob = exportJob;
        this.isCanceled = new AtomicBoolean(false);
        this.isFinished = new AtomicBoolean(false);
    }

    @Override
    public Long getId() {
        return taskId;
    }

    /**
     * Runs the task.
     *
     * <p>If the statement finishes with a MySQL {@code ERR} state, the job is marked CANCELLED and this
     * method returns normally without throwing.
     *
     * @throws JobException if the task was canceled, the tablet version check failed, a job state update
     *                      was rejected, or executing the statement threw an exception
     */
    @Override
    public void execute() throws JobException {
        LOG.debug("[Export Task] taskId: {} starting execution", taskId);
        if (isCanceled.get()) {
            LOG.debug("[Export Task] taskId: {} was already canceled before execution", taskId);
            throw new JobException("Export executor has been canceled, task id: {}", taskId);
        }
        LOG.debug("[Export Task] taskId: {} updating state to EXPORTING", taskId);
        exportJob.updateExportJobState(ExportJobState.EXPORTING, taskId, null, null, null);
        List<OutfileInfo> outfileInfoList = Lists.newArrayList();
        if (selectStmt.isPresent()) {
            if (isCanceled.get()) {
                LOG.debug("[Export Task] taskId: {} canceled during execution", taskId);
                throw new JobException("Export executor has been canceled, task id: {}", taskId);
            }
            // check the version of tablets, skip if the consistency is in partition level.
            if (exportJob.getExportTable().isManagedTable() && !exportJob.isPartitionConsistency()) {
                checkTabletVersions();
            }

            try (AutoCloseConnectContext r = buildConnectContext()) {
                LOG.debug("[Export Task] taskId: {} executing", taskId);
                stmtExecutor = new StmtExecutor(r.connectContext, selectStmt.get());
                stmtExecutor.execute();
                if (r.connectContext.getState().getStateType() == MysqlStateType.ERR) {
                    LOG.debug("[Export Task] taskId: {} failed with MySQL error: {}", taskId,
                            r.connectContext.getState().getErrorMessage());
                    markRunFailed(r.connectContext.getState().getErrorMessage());
                    return;
                }
                LOG.debug("[Export Task] taskId: {} executed successfully", taskId);
                outfileInfoList = getOutFileInfo(r.connectContext.getResultAttachedInfo());
            } catch (Exception e) {
                LOG.debug("[Export Task] taskId: {} failed with exception: {}",
                        taskId, e.getMessage(), e);
                markRunFailed(e.getMessage());
                throw new JobException(e);
            }
        }
        if (isCanceled.get()) {
            LOG.debug("[Export Task] taskId: {} canceled after processing all statements", taskId);
            throw new JobException("Export executor has been canceled, task id: {}", taskId);
        }
        LOG.debug("[Export Task] taskId: {} completed successfully, updating state to FINISHED", taskId);
        exportJob.updateExportJobState(ExportJobState.FINISHED, taskId, outfileInfoList, null, null);
        isFinished.set(true);
        LOG.debug("[Export Task] taskId: {} execution completed", taskId);
    }

    /**
     * Flags the task as canceled and cancels the running statement, if any.
     *
     * @throws JobException if the task has already finished
     */
    @Override
    public void cancel() throws JobException {
        if (isFinished.get()) {
            throw new JobException("Export executor has finished, task id: {}", taskId);
        }
        isCanceled.set(true);
        // Read the volatile field once so the null check and the call see the same reference.
        StmtExecutor running = stmtExecutor;
        if (running != null) {
            running.cancel(new Status(TStatusCode.CANCELLED, "export task cancelled"));
        }
    }

    /** Marks the job as CANCELLED with cancel type RUN_FAIL and the given message. */
    private void markRunFailed(String message) throws JobException {
        exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null, CancelType.RUN_FAIL, message);
    }

    /** Marks the job as failed with {@code reason} and returns the exception the caller should throw. */
    private JobException failVersionCheck(String reason) throws JobException {
        markRunFailed(reason);
        return new JobException("Export Job[{}]: {}", exportJob.getId(), reason);
    }

    /**
     * Resolves the exported table and, under its read lock, verifies the tablet versions.
     *
     * @throws JobException if the database/table cannot be resolved or the verification fails; the job
     *                      has been marked CANCELLED before the exception is thrown
     */
    private void checkTabletVersions() throws JobException {
        LOG.debug("[Export Task] taskId: {} checking tablet versions", taskId);
        try {
            Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(
                    exportJob.getTableName().getDb());
            OlapTable table = db.getOlapTableOrAnalysisException(exportJob.getTableName().getTbl());
            LOG.debug("[Export Lock] taskId: {}, table: {} about to acquire readLock",
                    taskId, table.getName());
            table.readLock();
            LOG.debug("[Export Lock] taskId: {}, table: {} acquired readLock", taskId, table.getName());
            try {
                verifyTabletVersions(table);
            } catch (JobException e) {
                // The failure was already recorded on the job where it was raised; do not record it
                // a second time or wrap the exception again.
                LOG.debug("[Export Lock] taskId: {}, table: {} about to release readLock "
                        + "due to exception: {}", taskId, table.getName(), e.getMessage());
                throw e;
            } catch (Exception e) {
                LOG.debug("[Export Lock] taskId: {}, table: {} about to release readLock "
                        + "due to exception: {}", taskId, table.getName(), e.getMessage());
                markRunFailed(e.getMessage());
                throw new JobException(e);
            } finally {
                LOG.debug("[Export Lock] taskId: {}, table: {} releasing readLock in finally block",
                        taskId, table.getName());
                table.readUnlock();
                LOG.debug("[Export Lock] taskId: {}, table: {} released readLock successfully",
                        taskId, table.getName());
            }
        } catch (AnalysisException e) {
            markRunFailed(e.getMessage());
            throw new JobException(e);
        }
    }

    /**
     * Compares the visible version of every partition referenced by the statement's tablets with the
     * version recorded on the job. Every {@link JobException} thrown here is preceded by a CANCELLED
     * state update. The caller is expected to hold the table's read lock.
     */
    private void verifyTabletVersions(OlapTable table) throws JobException {
        LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) selectStmt.get();
        Optional<UnboundRelation> unboundRelation = findUnboundRelation(logicalPlanAdapter.getLogicalPlan());
        if (!unboundRelation.isPresent()) {
            throw failVersionCheck("No table relation found in the export statement");
        }
        List<Long> tabletIds = unboundRelation.get().getTabletIds();

        for (Long tabletId : tabletIds) {
            // Guard each lookup: a missing entry would otherwise end up as a NullPointerException whose
            // null message becomes the job's failure reason.
            TabletMeta tabletMeta = Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta(tabletId);
            if (tabletMeta == null) {
                throw failVersionCheck("Tablet " + tabletId + " does not exist");
            }
            Partition partition = table.getPartition(tabletMeta.getPartitionId());
            if (partition == null) {
                throw failVersionCheck("Partition " + tabletMeta.getPartitionId() + " of tablet " + tabletId
                        + " does not exist");
            }
            Long recordedVersion = exportJob.getPartitionToVersion().get(partition.getName());
            if (recordedVersion == null) {
                throw failVersionCheck("No version recorded for partition " + partition.getName());
            }
            long nowVersion = partition.getVisibleVersion();
            long oldVersion = recordedVersion;
            if (nowVersion != oldVersion) {
                LOG.debug("[Export Lock] taskId: {}, table: {} about to release readLock "
                        + "due to version mismatch", taskId, table.getName());
                markRunFailed("The version of tablet {" + tabletId + "} has changed");
                throw new JobException("Export Job[{}]: Tablet {} has changed version, old version = {}"
                        + ", now version = {}", exportJob.getId(), tabletId, oldVersion, nowVersion);
            }
        }
    }

    /**
     * Builds the connect context the statement runs in: a clone of the job's session variables (after the
     * query timeout has been set on the job's own variables), the job's database and user, and a fresh
     * random query id.
     */
    private AutoCloseConnectContext buildConnectContext() {
        ConnectContext connectContext = new ConnectContext();
        exportJob.getSessionVariables().setQueryTimeoutS(exportJob.getTimeoutSecond());
        connectContext.setSessionVariable(VariableMgr.cloneSessionVariable(exportJob.getSessionVariables()));
        // NOTE(review): no planner-fallback setting is applied in this method; confirm whether falling
        // back to the old optimizer still has to be disabled for export statements.
        connectContext.setEnv(Env.getCurrentEnv());
        connectContext.setDatabase(exportJob.getTableName().getDb());
        connectContext.setQualifiedUser(exportJob.getQualifiedUser());
        connectContext.setCurrentUserIdentity(exportJob.getUserIdentity());
        UUID uuid = UUID.randomUUID();
        TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
        connectContext.setQueryId(queryId);
        connectContext.setStartTime();
        return new AutoCloseConnectContext(connectContext);
    }

    /**
     * Converts the result rows attached to the connect context into {@link OutfileInfo} objects, one per row,
     * reading each field by its {@link OutFileClause} key.
     */
    private List<OutfileInfo> getOutFileInfo(List<Map<String, String>> resultAttachedInfo) {
        List<OutfileInfo> outfileInfo = Lists.newArrayList();
        for (Map<String, String> row : resultAttachedInfo) {
            OutfileInfo outfileInfoOneRow = new OutfileInfo();
            outfileInfoOneRow.setFileNumber(row.get(OutFileClause.FILE_NUMBER));
            outfileInfoOneRow.setTotalRows(row.get(OutFileClause.TOTAL_ROWS));
            outfileInfoOneRow.setFileSize(row.get(OutFileClause.FILE_SIZE));
            outfileInfoOneRow.setUrl(row.get(OutFileClause.URL));
            outfileInfoOneRow.setWriteTime(row.get(OutFileClause.WRITE_TIME_SEC));
            outfileInfoOneRow.setWriteSpeed(row.get(OutFileClause.WRITE_SPEED_KB));
            outfileInfo.add(outfileInfoOneRow);
        }
        return outfileInfo;
    }

    /**
     * Depth-first search for the first {@link UnboundRelation} in {@code plan}.
     *
     * @return the first relation found, or empty if the plan contains none
     */
    private Optional<UnboundRelation> findUnboundRelation(LogicalPlan plan) {
        if (plan instanceof UnboundRelation) {
            return Optional.of((UnboundRelation) plan);
        }
        for (int i = 0; i < plan.children().size(); ++i) {
            Optional<UnboundRelation> optional = findUnboundRelation((LogicalPlan) plan.children().get(i));
            if (optional.isPresent()) {
                return optional;
            }
        }
        return Optional.empty();
    }

    /** Replaces the randomly generated task id. */
    public void setTaskId(Long taskId) {
        this.taskId = taskId;
    }
}