BulkLoadJob.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.loadv2;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UnifiedLoadStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.annotation.LogException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.plugin.audit.LoadAuditEvent;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
 * Parent class of BrokerLoadJob and SparkLoadJob, both of which are created from a load statement.
 */
public abstract class BulkLoadJob extends LoadJob implements GsonPostProcessable {
    private static final Logger LOG = LogManager.getLogger(BulkLoadJob.class);

    // input params
    @SerializedName(value = "bd")
    protected BrokerDesc brokerDesc;
    // This field persists the column expressions indirectly: the origin statement is persisted
    // instead of the column expressions themselves, and the expressions are re-analyzed
    // when the edit log is replayed.
    @SerializedName(value = "os")
    private OriginStatement originStmt;

    // include broker desc and data desc
    protected BrokerFileGroupAggInfo fileGroupAggInfo = new BrokerFileGroupAggInfo();
    protected List<TabletCommitInfo> commitInfos = Lists.newArrayList();

    // session variable name -> session variable value
    // These session variables are persisted because the session is not available when the job is replayed.
    @SerializedName(value = "svs")
    protected Map<String, String> sessionVariables = Maps.newHashMap();

    public BulkLoadJob(EtlJobType jobType) {
        super(jobType);
    }

    public BulkLoadJob(EtlJobType jobType, long dbId, String label,
            OriginStatement originStmt, UserIdentity userInfo) throws MetaNotFoundException {
        super(jobType, dbId, label);
        this.originStmt = originStmt;
        this.authorizationInfo = gatherAuthInfo();
        this.userInfo = userInfo;

        if (ConnectContext.get() != null) {
            SessionVariable var = ConnectContext.get().getSessionVariable();
            sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode()));
            sessionVariables.put(SessionVariable.AUTO_PROFILE_THRESHOLD_MS,
                                    Long.toString(var.getAutoProfileThresholdMs()));
            sessionVariables.put(SessionVariable.PROFILE_LEVEL, Long.toString(var.getProfileLevel()));
        } else {
            sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
            sessionVariables.put(SessionVariable.AUTO_PROFILE_THRESHOLD_MS, Long.toString(-1));
            sessionVariables.put(SessionVariable.PROFILE_LEVEL, Long.toString(1));
        }
    }

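    /**
     * Creates a BulkLoadJob from a LOAD statement.
     * Only BROKER and SPARK job types are supported here; other types are rejected with a DdlException.
     */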
    public static BulkLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException {
        // get db id
        String dbName = stmt.getLabel().getDbName();
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);

        // create job
        BulkLoadJob bulkLoadJob;
        try {
            switch (stmt.getEtlJobType()) {
                case BROKER:
                    bulkLoadJob = EnvFactory.getInstance().createBrokerLoadJob(db.getId(),
                            stmt.getLabel().getLabelName(), stmt.getBrokerDesc(), stmt.getOrigStmt(),
                            stmt.getUserInfo());
                    break;
                case SPARK:
                    bulkLoadJob = new SparkLoadJob(db.getId(), stmt.getLabel().getLabelName(), stmt.getResourceDesc(),
                            stmt.getOrigStmt(), stmt.getUserInfo());
                    break;
                case MINI:
                case DELETE:
                case HADOOP:
                case INSERT:
                    throw new DdlException(
                            "LoadManager only supports creating broker and spark load jobs from a load stmt.");
                default:
                    throw new DdlException("Unknown load job type.");
            }
            bulkLoadJob.setComment(stmt.getComment());
            bulkLoadJob.setJobProperties(stmt.getProperties());
            bulkLoadJob.checkAndSetDataSourceInfo(db, stmt.getDataDescriptions());
            // Table information may not be available yet when the constructor runs,
            // so rebuild the authorization info here.
            bulkLoadJob.rebuildAuthorizationInfo();
            return bulkLoadJob;
        } catch (MetaNotFoundException e) {
            throw new DdlException(e.getMessage());
        }
    }

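    /**
     * Converts the data descriptions of a load statement into broker file groups and registers them
     * in fileGroupAggInfo, holding the database read lock. All data descriptions in one statement
     * must share the same merge type.
     */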
    public void checkAndSetDataSourceInfo(Database db, List<DataDescription> dataDescriptions) throws DdlException {
        // check data source info
        db.readLock();
        try {
            LoadTask.MergeType mergeType = null;
            for (DataDescription dataDescription : dataDescriptions) {
                if (mergeType == null) {
                    mergeType = dataDescription.getMergeType();
                }
                if (mergeType != dataDescription.getMergeType()) {
                    throw new DdlException("merge type of all data descriptions must be the same.");
                }
                BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription);
                fileGroup.parse(db, dataDescription);
                fileGroupAggInfo.addFileGroup(fileGroup);
            }
        } finally {
            db.readUnlock();
        }
    }

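    // Collects the database name and the job's table names into an AuthorizationInfo.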
    private AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException {
        Database database = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
        return new AuthorizationInfo(database.getFullName(), getTableNames());
    }

    public void rebuildAuthorizationInfo() throws MetaNotFoundException {
        this.authorizationInfo = gatherAuthInfo();
    }

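    // Best-effort table name resolution for display purposes: if a table no longer exists,
    // its id is shown instead of its name.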
    @Override
    public Set<String> getTableNamesForShow() {
        Optional<Database> db = Env.getCurrentInternalCatalog().getDb(dbId);
        return fileGroupAggInfo.getAllTableIds().stream()
                .map(tableId -> db.flatMap(d -> d.getTable(tableId)).map(TableIf::getName)
                        .orElse(String.valueOf(tableId)))
                .collect(Collectors.toSet());
    }

    @LogException
    @Override
    public Set<String> getTableNames() throws MetaNotFoundException {
        Set<String> result = Sets.newHashSet();
        Database database = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
        // The database is not locked here.
        // getTableOrMetaException is thread-safe and can be called without holding the database read lock.
        for (long tableId : fileGroupAggInfo.getAllTableIds()) {
            Table table = database.getTableOrMetaException(tableId);
            result.add(table.getName());
        }
        return result;
    }

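    /**
     * Handles a failed load task: the task is retried if it still has retries left and has not
     * timed out; otherwise the whole job is cancelled.
     */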
    @Override
    public void onTaskFailed(long taskId, FailMsg failMsg) {
        List<LoadTask> retriedTasks = Lists.newArrayList();
        writeLock();
        try {
            // check if job has been completed
            if (isTxnDone()) {
                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
                        .add("state", state)
                        .add("error_msg", "this task will be ignored when job is: " + state)
                        .build());
                return;
            }
            LoadTask loadTask = idToTasks.get(taskId);
            if (loadTask == null) {
                return;
            }
            Predicate<LoadTask> isTaskTimeout =
                    (LoadTask task) -> task instanceof LoadLoadingTask
                            && ((LoadLoadingTask) task).getLeftTimeMs() <= 0;
            if (loadTask.getRetryTime() <= 0 || isTaskTimeout.test(loadTask)) {
                unprotectedExecuteCancel(failMsg, true);
                logFinalOperation();
                return;
            } else {
                // retry task
                idToTasks.remove(loadTask.getSignature());
                if (loadTask instanceof LoadLoadingTask) {
                    loadStatistic.removeLoad(((LoadLoadingTask) loadTask).getLoadId());
                }
                loadTask.updateRetryInfo();
                idToTasks.put(loadTask.getSignature(), loadTask);
                // load id will be added to loadStatistic when executing this task
                retriedTasks.add(loadTask);
            }
        } finally {
            writeUnlock();
        }

        // Submit the retried tasks outside the job's lock, because task submission may block for a while.
        for (LoadTask loadTask : retriedTasks) {
            try {
                if (loadTask.getTaskType() == LoadTask.TaskType.PENDING) {
                    Env.getCurrentEnv().getPendingLoadTaskScheduler().submit(loadTask);
                } else if (loadTask.getTaskType() == LoadTask.TaskType.LOADING) {
                    Env.getCurrentEnv().getLoadingLoadTaskScheduler().submit(loadTask);
                }
            } catch (RejectedExecutionException e) {
                writeLock();
                try {
                    unprotectedExecuteCancel(failMsg, true);
                    logFinalOperation();
                    return;
                } finally {
                    writeUnlock();
                }
            }
        }
    }

    /**
     * If the db or table cannot be found, the load job will be cancelled.
     */
    @Override
    public void analyze() {
        if (originStmt == null || Strings.isNullOrEmpty(originStmt.originStmt)) {
            return;
        }
        // Reset the data source info (fileGroupAggInfo); it will be re-created during analysis.
        fileGroupAggInfo = new BrokerFileGroupAggInfo();
        SqlParser parser = new SqlParser(new SqlScanner(new StringReader(originStmt.originStmt),
                Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
        try {
            Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
            analyzeStmt(SqlParserUtils.getStmt(parser, originStmt.idx), db);
        } catch (Exception e) {
            LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
                    .add("origin_stmt", originStmt)
                    .add("msg", "The failure happens in analyze, the load job will be cancelled with error:"
                            + e.getMessage())
                    .build(), e);
            cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), false, true);
        }
    }

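    // Re-analyzes the data descriptions of the (possibly unified) load statement and rebuilds the file groups.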
    protected void analyzeStmt(StatementBase stmtBase, Database db) throws UserException {
        LoadStmt stmt = null;
        if (stmtBase instanceof UnifiedLoadStmt) {
            stmt = (LoadStmt) ((UnifiedLoadStmt) stmtBase).getProxyStmt();
        } else {
            stmt = (LoadStmt) stmtBase;
        }

        for (DataDescription dataDescription : stmt.getDataDescriptions()) {
            dataDescription.analyzeWithoutCheckPriv(db.getFullName());
        }
        checkAndSetDataSourceInfo(db, stmt.getDataDescriptions());
    }

    @Override
    protected void replayTxnAttachment(TransactionState txnState) {
        if (txnState.getTxnCommitAttachment() == null) {
            // The txn attachment may be null when the broker load was cancelled without an attachment.
            // In that case the end log of the broker load has been recorded, but the callback id of the
            // txnState has not been removed, so this callback is still executed when the txn-aborted log is replayed.
            return;
        }
        unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment());
    }

    public OriginStatement getOriginStmt() {
        return this.originStmt;
    }

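    // Compatibility with meta images written before VERSION_117: mark the deserialized user info as analyzed,
    // because it will be checked when the user info is written back to the meta image.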
    @Override
    public void gsonPostProcess() throws IOException {
        super.gsonPostProcess();
        if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_117) {
            userInfo.setIsAnalyzed();
        }
    }

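    // Deserializes this job from the legacy binary format (the GSON path uses gsonPostProcess above).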
    public void readFields(DataInput in) throws IOException {
        super.readFields(in);
        brokerDesc = BrokerDesc.read(in);
        originStmt = OriginStatement.read(in);
        // The origin stmt is not analyzed here, because analysis would throw MetaNotFoundException
        // when a table id cannot be resolved from its table name during replay.
        // The origin stmt will be analyzed after the replay is completed.

        if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_117) {
            userInfo = UserIdentity.read(in);
            // Must set it as analyzed, because it will be checked when the user info is written to the meta image.
            userInfo.setIsAnalyzed();
        }
        int size = in.readInt();
        for (int i = 0; i < size; i++) {
            String key = Text.readString(in);
            String value = Text.readString(in);
            sessionVariables.put(key, value);
        }
    }

    public UserIdentity getUserInfo() {
        return userInfo;
    }

    public void recycleProgress() {
        // Recycle memory occupied by Progress.
        Env.getCurrentProgressManager().removeProgress(String.valueOf(id));
    }

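    // Emits a LOAD_SUCCEED audit event containing the db, tables, file paths, broker user and load statistics.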
    @Override
    protected void auditFinishedLoadJob() {
        try {
            String dbName = getDb().getFullName();
            String tableListName = StringUtils.join(getTableNames(), ",");
            List<String> filePathList = Lists.newArrayList();
            for (List<BrokerFileGroup> brokerFileGroups : fileGroupAggInfo.getAggKeyToFileGroups().values()) {
                for (BrokerFileGroup brokerFileGroup : brokerFileGroups) {
                    filePathList.add("(" + StringUtils.join(brokerFileGroup.getFilePaths(), ",") + ")");
                }
            }
            String filePathListName = StringUtils.join(filePathList, ",");
            String brokerUserName = getBrokerUserName();
            AuditEvent auditEvent = new LoadAuditEvent.AuditEventBuilder()
                    .setEventType(AuditEvent.EventType.LOAD_SUCCEED)
                    .setJobId(id).setLabel(label).setLoadType(jobType.name()).setDb(dbName).setTableList(tableListName)
                    .setFilePathList(filePathListName).setBrokerUser(brokerUserName).setTimestamp(createTimestamp)
                    .setLoadStartTime(loadStartTimestamp).setLoadFinishTime(finishTimestamp)
                    .setScanRows(loadStatistic.getScannedRows()).setScanBytes(loadStatistic.totalFileSizeB)
                    .setFileNumber(loadStatistic.fileNum)
                    .build();
            Env.getCurrentEnv().getAuditEventProcessor().handleAuditEvent(auditEvent);
        } catch (Exception e) {
            LOG.warn("audit finished load job info failed", e);
        }
    }

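    // Best-effort extraction of a user identity from the broker properties for auditing;
    // returns null if none of the known credential keys is present.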
    private String getBrokerUserName() {
        Map<String, String> properties = brokerDesc.getProperties();
        if (properties.containsKey("kerberos_principal")) {
            return properties.get("kerberos_principal");
        } else if (properties.containsKey("username")) {
            return properties.get("username");
        } else if (properties.containsKey("bos_accesskey")) {
            return properties.get("bos_accesskey");
        } else if (properties.containsKey("fs.s3a.access.key")) {
            return properties.get("fs.s3a.access.key");
        }
        return null;
    }

    // ---------------- for load stmt ----------------
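    /**
     * Creates a BulkLoadJob from an InsertStmt (the unified load path).
     * Only BROKER_LOAD and SPARK_LOAD types are supported.
     */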
    public static BulkLoadJob fromInsertStmt(InsertStmt insertStmt) throws DdlException {
        // get db id
        String dbName = insertStmt.getLoadLabel().getDbName();
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);

        // create job
        BulkLoadJob bulkLoadJob;
        try {
            switch (insertStmt.getLoadType()) {
                case BROKER_LOAD:
                    bulkLoadJob = EnvFactory.getInstance().createBrokerLoadJob(db.getId(),
                            insertStmt.getLoadLabel().getLabelName(), (BrokerDesc) insertStmt.getResourceDesc(),
                            insertStmt.getOrigStmt(), insertStmt.getUserInfo());
                    break;
                case SPARK_LOAD:
                    bulkLoadJob = new SparkLoadJob(db.getId(), insertStmt.getLoadLabel().getLabelName(),
                            insertStmt.getResourceDesc(),
                            insertStmt.getOrigStmt(), insertStmt.getUserInfo());
                    break;
                default:
                    throw new DdlException("Unknown load job type.");
            }
            bulkLoadJob.setComment(insertStmt.getComments());
            bulkLoadJob.setJobProperties(insertStmt.getProperties());
            // TODO(tsy): use generics and change the parameter type of checkAndSetDataSourceInfo
            bulkLoadJob.checkAndSetDataSourceInfo(db, (List<DataDescription>) insertStmt.getDataDescList());
            return bulkLoadJob;
        } catch (MetaNotFoundException e) {
            throw new DdlException(e.getMessage());
        }
    }
}