LoadManager.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.loadv2;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CompoundPredicate.Operator;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DataQualityException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.FailMsg.CancelType;
import org.apache.doris.load.Load;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.persist.CleanLabelOperationLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Deque;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
 * This class manages the broker and mini load jobs (v2).
 * The lock sequence:
 * Database.lock
 * LoadManager.lock
 * LoadJob.lock
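 * e.g. code that needs more than one of these locks must take Database.lock first,
 * then LoadManager.lock, then LoadJob.lock, never in the reverse order.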
 */
public class LoadManager implements Writable {
    private static final Logger LOG = LogManager.getLogger(LoadManager.class);

    protected Map<Long, LoadJob> idToLoadJob = Maps.newConcurrentMap();
    protected Map<Long, Map<String, List<LoadJob>>> dbIdToLabelToLoadJobs = Maps.newConcurrentMap();
    protected LoadJobScheduler loadJobScheduler;

    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private MysqlLoadManager mysqlLoadManager;

    public LoadManager(LoadJobScheduler loadJobScheduler) {
        this.loadJobScheduler = loadJobScheduler;
        this.mysqlLoadManager = new MysqlLoadManager();
    }

    public void start() {
        mysqlLoadManager.start();
    }

    /**
     * Create a load job from a LoadStmt. Currently this is invoked by broker load (v2).
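     * Usage sketch (illustrative): callers typically obtain this manager from Env, e.g.
     *   long jobId = Env.getCurrentEnv().getLoadManager().createLoadJobFromStmt(stmt);
     * The job is persisted via edit log and then handed to the load job scheduler.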
     */
    public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException, UserException {
        List<TPipelineWorkloadGroup> twgList = null;
        if (Config.enable_workload_group) {
            try {
                twgList = Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(ConnectContext.get());
            } catch (Throwable t) {
                LOG.info("Get workload group failed when create load job,", t);
                throw t;
            }
        }

        Database database = checkDb(stmt.getLabel().getDbName());
        long dbId = database.getId();
        LoadJob loadJob;
        writeLock();
        try {
            if (stmt.getBrokerDesc() != null && stmt.getBrokerDesc().isMultiLoadBroker()) {
                if (!Env.getCurrentEnv().getLoadInstance()
                        .isUncommittedLabel(dbId, stmt.getLabel().getLabelName())) {
                    throw new DdlException("label: " + stmt.getLabel().getLabelName() + " not found!");
                }
            } else {
                checkLabelUsed(dbId, stmt.getLabel().getLabelName());
                if (stmt.getBrokerDesc() == null && stmt.getResourceDesc() == null) {
                    throw new DdlException("LoadManager only support the broker and spark load.");
                }
                if (unprotectedGetUnfinishedJobNum() >= Config.desired_max_waiting_jobs) {
                    throw new DdlException(
                            "There are more than " + Config.desired_max_waiting_jobs
                                    + " unfinished load jobs, please retry later. "
                                    + "You can use `SHOW LOAD` to view submitted jobs");
                }
            }

            loadJob = BulkLoadJob.fromLoadStmt(stmt);

            if (twgList != null) {
                loadJob.settWorkloadGroups(twgList);
            }

            createLoadJob(loadJob);
        } finally {
            writeUnlock();
        }

        Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);

        // The job must be submitted after the edit log is written.
        // This guarantees that the load job has not been changed before it is persisted.
        loadJobScheduler.submitJob(loadJob);
        return loadJob.getId();
    }

    private long unprotectedGetUnfinishedJobNum() {
        return idToLoadJob.values().stream()
                .filter(j -> (j.getState() != JobState.FINISHED && j.getState() != JobState.CANCELLED)).count();
    }

    /**
     * Used by MultiLoadMgr.
     **/
    public void createLoadJobV1FromMultiStart(String fullDbName, String label) throws DdlException {
        Database database = checkDb(fullDbName);
        writeLock();
        try {
            checkLabelUsed(database.getId(), label);
            Env.getCurrentEnv().getLoadInstance()
                    .registerMiniLabel(fullDbName, label, System.currentTimeMillis());
        } finally {
            writeUnlock();
        }
    }

    public MysqlLoadManager getMysqlLoadManager() {
        return mysqlLoadManager;
    }

    public void replayCreateLoadJob(LoadJob loadJob) {
        createLoadJob(loadJob);
        LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()).add("msg", "replay create load job").build());
    }

    // add load job and also add to callback factory
    protected void createLoadJob(LoadJob loadJob) {
        if (loadJob.isExpired(System.currentTimeMillis())) {
            // This can happen in replay logic.
            return;
        }
        addLoadJob(loadJob);
        // Register the txn state listener before the txn begins if the load job is not completed,
        // because on replay the callback is performed without a txn begin.
        if (!loadJob.isCompleted()) {
            Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
        }
    }

    // add the load job to idToLoadJob and index it by db id and label
    private void addLoadJob(LoadJob loadJob) {
        idToLoadJob.put(loadJob.getId(), loadJob);
        dbIdToLabelToLoadJobs.computeIfAbsent(loadJob.getDbId(), k -> new ConcurrentHashMap<>())
                .computeIfAbsent(loadJob.getLabel(), k -> new ArrayList<>())
                .add(loadJob);
    }

    /**
     * Record finished load job by editLog.
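     * Only INSERT and INSERT_JOB job types are recorded; other types are silently ignored.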
     **/
    public void recordFinishedLoadJob(String label, long transactionId, String dbName, long tableId, EtlJobType jobType,
                                      long createTimestamp, String failMsg, String trackingUrl,
                                      UserIdentity userInfo, long jobId) throws MetaNotFoundException {

        // get db id
        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbName);

        LoadJob loadJob;
        switch (jobType) {
            case INSERT:
                loadJob = new InsertLoadJob(label, transactionId, db.getId(), tableId, createTimestamp, failMsg,
                        trackingUrl, userInfo);
                break;
            case INSERT_JOB:
                loadJob = new InsertLoadJob(label, transactionId, db.getId(), tableId, createTimestamp, failMsg,
                        trackingUrl, userInfo, jobId);
                break;
            default:
                return;
        }
        addLoadJob(loadJob);
        // persist to edit log
        Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);
    }

    /**
     * Collect the load jobs that match the cancel condition.
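     * e.g. a label containing '%' is matched with MySQL LIKE semantics, other labels must
     * match case-insensitively; with an AND operator both label and state must match,
     * otherwise either one is sufficient.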
     **/
    @VisibleForTesting
    public static void addNeedCancelLoadJob(String label, String state, Expression operator,
                                            List<LoadJob> loadJobs, List<LoadJob> matchLoadJobs)
            throws AnalysisException {
        PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
                CaseSensibility.LABEL.getCaseSensibility());
        matchLoadJobs.addAll(
                loadJobs.stream()
                        .filter(job -> job.getState() != JobState.CANCELLED)
                        .filter(job -> {
                            if (operator != null) {
                                // compound
                                boolean labelFilter =
                                        label.contains("%") ? matcher.match(job.getLabel())
                                                : job.getLabel().equalsIgnoreCase(label);
                                boolean stateFilter = job.getState().name().equalsIgnoreCase(state);
                                return operator instanceof And ? labelFilter && stateFilter :
                                        labelFilter || stateFilter;
                            }
                            if (StringUtils.isNotEmpty(label)) {
                                return label.contains("%") ? matcher.match(job.getLabel())
                                        : job.getLabel().equalsIgnoreCase(label);
                            }
                            if (StringUtils.isNotEmpty(state)) {
                                return job.getState().name().equalsIgnoreCase(state);
                            }
                            return false;
                        }).collect(Collectors.toList())
        );
    }

    /**
     * Cancel load job by stmt.
     **/
    public void cancelLoadJob(String dbName, String label, String state, Expression operator)
            throws DdlException, AnalysisException {
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
        // List of load jobs waiting to be cancelled
        List<LoadJob> unfinishedLoadJob;
        readLock();
        try {
            Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
            if (labelToLoadJobs == null) {
                throw new DdlException("Load job does not exist");
            }
            List<LoadJob> matchLoadJobs = Lists.newArrayList();
            addNeedCancelLoadJob(label, state, operator,
                    labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
                    matchLoadJobs);
            if (matchLoadJobs.isEmpty()) {
                throw new DdlException("Load job does not exist");
            }
            // check state here
            unfinishedLoadJob =
                    matchLoadJobs.stream().filter(entity -> !entity.isTxnDone()).collect(Collectors.toList());
            if (unfinishedLoadJob.isEmpty()) {
                throw new DdlException("There is no uncompleted job");
            }
        } finally {
            readUnlock();
        }
        for (LoadJob loadJob : unfinishedLoadJob) {
            try {
                loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"));
            } catch (DdlException e) {
                throw new DdlException(
                        "Cancel load job [" + loadJob.getId() + "] fail, label=["
                                + loadJob.getLabel() + "] failed msg=" + e.getMessage());
            }
        }
    }

    /**
     * Collect the load jobs that match the cancel condition in the stmt.
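     * The matching semantics are the same as in the Expression-based overload above.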
     **/
    @VisibleForTesting
    public static void addNeedCancelLoadJob(CancelLoadStmt stmt, List<LoadJob> loadJobs, List<LoadJob> matchLoadJobs)
            throws AnalysisException {
        String label = stmt.getLabel();
        String state = stmt.getState();
        PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
                CaseSensibility.LABEL.getCaseSensibility());
        matchLoadJobs.addAll(
                loadJobs.stream()
                .filter(job -> job.getState() != JobState.CANCELLED)
                .filter(job -> {
                    if (stmt.getOperator() != null) {
                        // compound
                        boolean labelFilter =
                                label.contains("%") ? matcher.match(job.getLabel())
                                : job.getLabel().equalsIgnoreCase(label);
                        boolean stateFilter = job.getState().name().equalsIgnoreCase(state);
                        return Operator.AND.equals(stmt.getOperator()) ? labelFilter && stateFilter :
                            labelFilter || stateFilter;
                    }
                    if (StringUtils.isNotEmpty(label)) {
                        return label.contains("%") ? matcher.match(job.getLabel())
                            : job.getLabel().equalsIgnoreCase(label);
                    }
                    if (StringUtils.isNotEmpty(state)) {
                        return job.getState().name().equalsIgnoreCase(state);
                    }
                    return false;
                }).collect(Collectors.toList())
        );
    }

    /**
     * Cancel load job by stmt.
     **/
    public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisException {
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDbName());
        // List of load jobs waiting to be cancelled
        List<LoadJob> unfinishedLoadJob;
        readLock();
        try {
            Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
            if (labelToLoadJobs == null) {
                throw new DdlException("Load job does not exist");
            }
            List<LoadJob> matchLoadJobs = Lists.newArrayList();
            addNeedCancelLoadJob(stmt,
                    labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
                    matchLoadJobs);
            if (matchLoadJobs.isEmpty()) {
                throw new DdlException("Load job does not exist");
            }
            // check state here
            unfinishedLoadJob =
                matchLoadJobs.stream().filter(entity -> !entity.isTxnDone()).collect(Collectors.toList());
            if (unfinishedLoadJob.isEmpty()) {
                throw new DdlException("There is no uncompleted job");
            }
        } finally {
            readUnlock();
        }
        for (LoadJob loadJob : unfinishedLoadJob) {
            try {
                loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"));
            } catch (DdlException e) {
                throw new DdlException(
                        "Cancel load job [" + loadJob.getId() + "] fail, label=["
                                + loadJob.getLabel() + "] failed msg=" + e.getMessage());
            }
        }
    }

    /**
     * Replay end load job.
     **/
    public void replayEndLoadJob(LoadJobFinalOperation operation) {
        LoadJob job = idToLoadJob.get(operation.getId());
        if (job == null) {
            // This should not happen.
            // Previously, when a user submitted a job with an already used label, an END_LOAD_JOB
            // edit log was written even though the job was never added to 'idToLoadJob', so the
            // lookup here returned null. That bug has been fixed.
            // Keep this log to observe.
            LOG.warn("job does not exist when replaying end load job edit log: {}", operation);
            return;
        }
        job.unprotectReadEndOperation(operation);
        LOG.info(new LogBuilder(LogKey.LOAD_JOB, operation.getId()).add("operation", operation)
                .add("msg", "replay end load job").build());

        Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(operation.getId());
        // Roughly every time idToLoadJob grows by another 10000 entries, run removeOldLoadJob to reduce memory usage.
        if ((idToLoadJob.size() > 0) && (idToLoadJob.size() % 10000 == 0)) {
            removeOldLoadJob();
        }
    }

    /**
     * Replay update load job.
     **/
    public void replayUpdateLoadJobStateInfo(LoadJob.LoadJobStateUpdateInfo info) {
        long jobId = info.getJobId();
        LoadJob job = idToLoadJob.get(jobId);
        if (job == null) {
            LOG.warn("replay update load job state failed. error: job not found, id: {}", jobId);
            return;
        }

        job.replayUpdateStateInfo(info);
    }

    /**
     * Get load job num, used by proc.
     **/
    public int getLoadJobNum(JobState jobState, long dbId) {
        readLock();
        try {
            Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(dbId);
            if (labelToLoadJobs == null) {
                return 0;
            }
            List<LoadJob> loadJobList =
                    labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
            return (int) loadJobList.stream().filter(entity -> entity.getState() == jobState).count();
        } finally {
            readUnlock();
        }
    }

    /**
     * Get load job num, used by proc.
     **/
    public int getLoadJobNum(JobState jobState) {
        readLock();
        try {
            Map<String, List<LoadJob>> labelToLoadJobs = new HashMap<>();
            for (Long dbId : dbIdToLabelToLoadJobs.keySet()) {
                if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
                        InternalCatalog.INTERNAL_CATALOG_NAME,
                        Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(),
                        PrivPredicate.LOAD)) {
                    continue;
                }

                labelToLoadJobs.putAll(dbIdToLabelToLoadJobs.get(dbId));
            }

            List<LoadJob> loadJobList =
                    labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
            return (int) loadJobList.stream().filter(entity -> entity.getState() == jobState).count();
        } finally {
            readUnlock();
        }
    }

    /**
     * Get load job num, used by metric.
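     * e.g. the returned map may look like {(BROKER, LOADING)=2, (INSERT, FINISHED)=10}
     * (illustrative values): one counter per (job type, state) pair.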
     **/
    public Map<Pair<EtlJobType, JobState>, Long> getLoadJobNum() {
        return idToLoadJob.values().stream().collect(Collectors.groupingBy(
                loadJob -> Pair.of(loadJob.getJobType(), loadJob.getState()),
                Collectors.counting()));
    }

    /**
     * Remove old load job.
     **/
    public void removeOldLoadJob() {
        long currentTimeMs = System.currentTimeMillis();
        removeLoadJobIf(job -> job.isExpired(currentTimeMs));
    }

    /**
     * Remove completed jobs if the total job num exceeds Config.label_num_threshold.
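     * Completed jobs are evicted oldest-finished-first until the count drops back to
     * the threshold; unfinished jobs are never evicted here.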
     */
    public void removeOverLimitLoadJob() {
        if (Config.label_num_threshold < 0 || idToLoadJob.size() <= Config.label_num_threshold) {
            return;
        }
        writeLock();
        try {
            Deque<LoadJob> finishedJobs = idToLoadJob
                    .values()
                    .stream()
                    .filter(LoadJob::isCompleted)
                    .sorted(Comparator.comparingLong(o -> o.finishTimestamp))
                    .collect(Collectors.toCollection(ArrayDeque::new));
            while (!finishedJobs.isEmpty()
                    && idToLoadJob.size() > Config.label_num_threshold) {
                LoadJob loadJob = finishedJobs.pollFirst();
                idToLoadJob.remove(loadJob.getId());
                jobRemovedTrigger(loadJob);
            }
        } finally {
            writeUnlock();
        }
    }

    private void jobRemovedTrigger(LoadJob job) {
        if (job instanceof SparkLoadJob) {
            ((SparkLoadJob) job).clearSparkLauncherLog();
        }
        if (job instanceof BulkLoadJob) {
            ((BulkLoadJob) job).recycleProgress();
        }
        Map<String, List<LoadJob>> map = dbIdToLabelToLoadJobs.get(job.getDbId());
        if (map == null) {
            return;
        }
        List<LoadJob> list = map.get(job.getLabel());
        if (list == null) {
            return;
        }
        list.remove(job);
        if (list.isEmpty()) {
            map.remove(job.getLabel());
        }
        if (map.isEmpty()) {
            dbIdToLabelToLoadJobs.remove(job.getDbId());
        }
    }

    private void removeLoadJobIf(Predicate<LoadJob> pred) {
        long removeJobNum = 0;
        StopWatch stopWatch = StopWatch.createStarted();
        writeLock();
        try {
            Iterator<Map.Entry<Long, LoadJob>> iter = idToLoadJob.entrySet().iterator();
            while (iter.hasNext()) {
                LoadJob job = iter.next().getValue();
                if (pred.test(job)) {
                    iter.remove();
                    jobRemovedTrigger(job);
                    removeJobNum++;
                }
            }
        } finally {
            writeUnlock();
            stopWatch.stop();
            LOG.info("end to removeOldLoadJob, removeJobNum:{} cost:{} ms",
                    removeJobNum, stopWatch.getTime());
        }
    }

    /**
     * Only for jobs that have an ETL state, such as SparkLoadJob.
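     * A DataQualityException cancels the job with ETL_QUALITY_UNSATISFIED, any other
     * UserException cancels it with ETL_RUN_FAIL, and unexpected exceptions are only logged.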
     **/
    public void processEtlStateJobs() {
        idToLoadJob.values().stream()
                .filter(job -> ((job.jobType == EtlJobType.SPARK || job.jobType == EtlJobType.INGESTION)
                        && job.state == JobState.ETL))
                .forEach(job -> {
                    try {
                        if (job instanceof SparkLoadJob) {
                            ((SparkLoadJob) job).updateEtlStatus();
                        } else if (job instanceof IngestionLoadJob) {
                            ((IngestionLoadJob) job).updateEtlStatus();
                        }
                    } catch (DataQualityException e) {
                        LOG.info("update load job etl status failed. job id: {}", job.getId(), e);
                        job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED,
                                DataQualityException.QUALITY_FAIL_MSG), true, true);
                    } catch (UserException e) {
                        LOG.warn("update load job etl status failed. job id: {}", job.getId(), e);
                        job.cancelJobWithoutCheck(new FailMsg(CancelType.ETL_RUN_FAIL, e.getMessage()), true, true);
                    } catch (Exception e) {
                        LOG.warn("update load job etl status failed. job id: {}", job.getId(), e);
                    }
                });
    }

    /**
     * Only for jobs that are loaded by PushTask.
     **/
    public void processLoadingStateJobs() {
        idToLoadJob.values().stream()
                .filter(job -> ((job.jobType == EtlJobType.SPARK || job.jobType == EtlJobType.INGESTION)
                        && job.state == JobState.LOADING))
                .forEach(job -> {
                    try {
                        if (job instanceof SparkLoadJob) {
                            ((SparkLoadJob) job).updateLoadingStatus();
                        } else if (job instanceof IngestionLoadJob) {
                            ((IngestionLoadJob) job).updateLoadingStatus();
                        }
                    } catch (UserException e) {
                        LOG.warn("update load job loading status failed. job id: {}", job.getId(), e);
                        job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
                    } catch (Exception e) {
                        LOG.warn("update load job loading status failed. job id: {}", job.getId(), e);
                    }
                });
    }

    public List<Pair<Long, String>> getCreateLoadStmt(long dbId, String label) throws DdlException {
        List<Pair<Long, String>> result = new ArrayList<>();
        readLock();
        try {
            if (dbIdToLabelToLoadJobs.containsKey(dbId)) {
                Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(dbId);
                if (labelToLoadJobs.containsKey(label)) {
                    List<LoadJob> labelLoadJobs = labelToLoadJobs.get(label);
                    for (LoadJob job : labelLoadJobs) {
                        try {
                            // getMethod throws NoSuchMethodException rather than returning null,
                            // so a null check on the Method is unnecessary.
                            Method getOriginStmt = job.getClass().getMethod("getOriginStmt");
                            result.add(Pair.of(job.getId(),
                                    ((OriginStatement) getOriginStmt.invoke(job)).originStmt));
                        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
                            throw new DdlException("Unsupported load job type: " + job.getClass().getName());
                        }
                    }
                } else {
                    throw new DdlException("Label does not exist: " + label);
                }
            } else {
                throw new DdlException("Database does not exist");
            }
            return result;
        } finally {
            readUnlock();
        }
    }

    /**
     * This method will return the jobs info which can meet the condition of input param.
     *
     * @param dbId          used to filter jobs which belong to this db
     * @param labelValue    used to filter jobs which's label is or like labelValue.
     * @param accurateMatch true: filter jobs which's label is labelValue. false: filter jobs which's label like itself.
     * @param statesValue   used to filter jobs which's state within the statesValue set.
     * @return The result is the list of jobInfo.
     * JobInfo is a list which includes the comparable object: jobId, label, state etc.
     * The result is unordered.
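     * e.g. labelValue "my_label%" with accurateMatch=false matches labels by MySQL LIKE pattern,
     * while accurateMatch=true requires the label to equal labelValue exactly.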
     */
    public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String labelValue, boolean accurateMatch,
                                                      Set<String> statesValue) throws AnalysisException {
        LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
        if (!dbIdToLabelToLoadJobs.containsKey(dbId)) {
            return loadJobInfos;
        }

        Set<JobState> states = Sets.newHashSet();
        if (statesValue == null || statesValue.isEmpty()) {
            states.addAll(EnumSet.allOf(JobState.class));
        } else {
            for (String stateValue : statesValue) {
                try {
                    states.add(JobState.valueOf(stateValue));
                } catch (IllegalArgumentException e) {
                    // ignore this state
                }
            }
        }

        readLock();
        try {
            Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(dbId);
            List<LoadJob> loadJobList = Lists.newArrayList();
            if (Strings.isNullOrEmpty(labelValue)) {
                loadJobList.addAll(
                        labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()));
            } else {
                // check label value
                if (accurateMatch) {
                    if (!labelToLoadJobs.containsKey(labelValue)) {
                        return loadJobInfos;
                    }
                    loadJobList.addAll(labelToLoadJobs.get(labelValue));
                } else {
                    // non-accurate match
                    PatternMatcher matcher =
                            PatternMatcherWrapper.createMysqlPattern(labelValue,
                                    CaseSensibility.LABEL.getCaseSensibility());
                    for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
                        if (matcher.match(entry.getKey())) {
                            loadJobList.addAll(entry.getValue());
                        }
                    }
                }
            }

            // check state
            for (LoadJob loadJob : loadJobList) {
                try {
                    if (!states.contains(loadJob.getState())) {
                        continue;
                    }
                    // check auth
                    try {
                        loadJob.checkAuth("show load");
                    } catch (DdlException e) {
                        continue;
                    }
                    // add load job info
                    loadJobInfos.add(loadJob.getShowInfo());
                } catch (RuntimeException | DdlException e) {
                    // ignore this load job
                    LOG.warn("get load job info failed. job id: {}", loadJob.getId(), e);
                }
            }
            return loadJobInfos;
        } finally {
            readUnlock();
        }
    }

    public List<List<Comparable>> getAllLoadJobInfos() {
        LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();

        readLock();
        try {
            Map<String, List<LoadJob>> labelToLoadJobs = new HashMap<>();
            for (Long dbId : dbIdToLabelToLoadJobs.keySet()) {
                if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
                        InternalCatalog.INTERNAL_CATALOG_NAME,
                        Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(),
                        PrivPredicate.LOAD)) {
                    continue;
                }

                labelToLoadJobs.putAll(dbIdToLabelToLoadJobs.get(dbId));
            }
            List<LoadJob> loadJobList = Lists.newArrayList();
            loadJobList.addAll(
                    labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()));

            // check state
            for (LoadJob loadJob : loadJobList) {
                try {
                    // add load job info
                    loadJobInfos.add(loadJob.getShowInfo());
                } catch (DdlException e) {
                    continue;
                }
            }
            return loadJobInfos;
        } finally {
            readUnlock();
        }
    }

    /**
     * Get load job info.
     **/
    public void getLoadJobInfo(Load.JobInfo info) throws DdlException {
        Database database = checkDb(info.dbName);
        readLock();
        try {
            // find the latest load job by info
            Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(database.getId());
            if (labelToLoadJobs == null) {
                throw new DdlException("No jobs belong to database(" + info.dbName + ")");
            }
            List<LoadJob> loadJobList = labelToLoadJobs.get(info.label);
            if (loadJobList == null || loadJobList.isEmpty()) {
                throw new DdlException("Unknown job(" + info.label + ")");
            }

            LoadJob loadJob = loadJobList.get(loadJobList.size() - 1);
            loadJob.getJobInfo(info);
        } finally {
            readUnlock();
        }
    }

    public LoadJob getLoadJob(long jobId) {
        return idToLoadJob.get(jobId);
    }

    public List<LoadJob> queryLoadJobsByJobIds(List<Long> jobIds) {
        if (CollectionUtils.isEmpty(jobIds)) {
            return new ArrayList<>();
        }
        List<LoadJob> jobs = new ArrayList<>();
        jobIds.forEach(id -> {
            if (null != idToLoadJob.get(id)) {
                jobs.add(idToLoadJob.get(id));
            }
        });
        return jobs;
    }

    public void prepareJobs() {
        analyzeLoadJobs();
        submitJobs();
    }

    private void submitJobs() {
        loadJobScheduler.submitJob(idToLoadJob.values().stream().filter(loadJob -> loadJob.state == JobState.PENDING)
                .collect(Collectors.toList()));
    }

    private void analyzeLoadJobs() {
        for (LoadJob loadJob : idToLoadJob.values()) {
            if (loadJob.getState() == JobState.PENDING) {
                loadJob.analyze();
            }
        }
    }

    protected Database checkDb(String dbName) throws DdlException {
        return Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
    }

    /**
     * step1: check whether the label has been used by old load jobs belonging to the Load class.
     * step2: check whether the label has been used by v2 load jobs.
     * step2.1: if the label has been used by v2 load jobs, the create timestamp will be checked.
     *
     * @throws LabelAlreadyUsedException throw exception when label has been used by an unfinished job.
     */
    private void checkLabelUsed(long dbId, String label) throws DdlException {
        // if label has been used in old load jobs
        Env.getCurrentEnv().getLoadInstance().isLabelUsed(dbId, label);
        // if label has been used in v2 of load jobs
        if (dbIdToLabelToLoadJobs.containsKey(dbId)) {
            Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(dbId);
            if (labelToLoadJobs.containsKey(label)) {
                List<LoadJob> labelLoadJobs = labelToLoadJobs.get(label);
                Optional<LoadJob> loadJobOptional = labelLoadJobs.stream()
                        .filter(entity -> entity.getState() != JobState.CANCELLED).findFirst();
                if (loadJobOptional.isPresent()) {
                    LOG.warn("Failed to add load job when label {} has been used.", label);
                    throw new LabelAlreadyUsedException(label);
                }
            }
        }
    }

    public void cleanLabel(String dbName, String label) throws DdlException {
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
        cleanLabelInternal(db.getId(), label, false);
    }

    public void replayCleanLabel(CleanLabelOperationLog log) {
        cleanLabelInternal(log.getDbId(), log.getLabel(), true);
    }

    /**
     * Clean labels in the given database.
     * Only load jobs which are already done will be removed.
     * 1. Remove from LoadManager
     * 2. Remove from DatabaseTransactionMgr
     *
     * @param dbId     id of the database to clean
     * @param label    label to clean; if null or empty, all labels in the database are cleaned
     * @param isReplay whether this is invoked during edit log replay
     */
    private void cleanLabelInternal(long dbId, String label, boolean isReplay) {
        // 1. Remove from LoadManager
        int counter = 0;
        writeLock();
        try {
            if (dbIdToLabelToLoadJobs.containsKey(dbId)) {
                Map<String, List<LoadJob>> labelToJob = dbIdToLabelToLoadJobs.get(dbId);
                if (Strings.isNullOrEmpty(label)) {
                    // clean all labels in this db
                    Iterator<Map.Entry<String, List<LoadJob>>> iter = labelToJob.entrySet().iterator();
                    while (iter.hasNext()) {
                        List<LoadJob> jobs = iter.next().getValue();
                        Iterator<LoadJob> innerIter = jobs.iterator();
                        while (innerIter.hasNext()) {
                            LoadJob job = innerIter.next();
                            if (!job.isCompleted()) {
                                continue;
                            }
                            if (job instanceof BulkLoadJob) {
                                ((BulkLoadJob) job).recycleProgress();
                            }
                            innerIter.remove();
                            idToLoadJob.remove(job.getId());
                            ++counter;
                        }
                        if (jobs.isEmpty()) {
                            iter.remove();
                        }
                    }
                } else {
                    List<LoadJob> jobs = labelToJob.get(label);
                    if (jobs != null) {
                        // jobs may be null, e.g. stream load labels are not recorded here
                        Iterator<LoadJob> iter = jobs.iterator();
                        while (iter.hasNext()) {
                            LoadJob job = iter.next();
                            if (!job.isCompleted()) {
                                continue;
                            }
                            if (job instanceof BulkLoadJob) {
                                ((BulkLoadJob) job).recycleProgress();
                            }
                            iter.remove();
                            idToLoadJob.remove(job.getId());
                            ++counter;
                        }
                        if (jobs.isEmpty()) {
                            labelToJob.remove(label);
                        }
                    }
                }
            }
        } finally {
            writeUnlock();
        }
        // 2. Remove from DatabaseTransactionMgr
        try {
            Env.getCurrentGlobalTransactionMgr().cleanLabel(dbId, label, isReplay);
        } catch (Exception e) {
            // just ignore, because we don't want to throw any exception here.
            LOG.warn("Exception:", e);
        }

        LOG.info("finished to clean {} labels on db {} with label '{}' in load mgr. is replay: {}",
                counter, dbId, label, isReplay);
    }

    protected void readLock() {
        lock.readLock().lock();
    }

    protected void readUnlock() {
        lock.readLock().unlock();
    }

    protected void writeLock() {
        lock.writeLock().lock();
    }

    protected void writeUnlock() {
        lock.writeLock().unlock();
    }

    /**
     * Initialize the progress of the given load job.
     **/
    public void initJobProgress(Long jobId, TUniqueId loadId, Set<TUniqueId> fragmentIds,
                                List<Long> relatedBackendIds) {
        LoadJob job = idToLoadJob.get(jobId);
        if (job != null) {
            job.initLoadProgress(loadId, fragmentIds, relatedBackendIds);
        }
    }

    /**
     * Update the progress of the given load job.
     **/
    public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
                                  long scannedBytes, boolean isDone) {
        LoadJob job = idToLoadJob.get(jobId);
        if (job != null) {
            job.updateProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        long currentTimeMs = System.currentTimeMillis();
        List<LoadJob> loadJobs =
                idToLoadJob.values().stream().filter(t -> !t.isExpired(currentTimeMs))
                        .filter(t -> !(t instanceof MiniLoadJob)).collect(Collectors.toList());

        LOG.info("write load job size: {}", loadJobs.size());
        out.writeInt(loadJobs.size());
        for (LoadJob loadJob : loadJobs) {
            LOG.info("write load job: {}", loadJob.getId());
            loadJob.write(out);
        }
    }

    /**
     * Read load jobs from an image file.
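     * The layout matches {@link #write(DataOutput)}: an int job count followed by that many
     * serialized LoadJob entries; expired jobs and MINI jobs are skipped on read.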
     **/
    public void readFields(DataInput in) throws IOException {
        long currentTimeMs = System.currentTimeMillis();
        int size = in.readInt();
        LOG.info("load job num {} ", size);
        for (int i = 0; i < size; i++) {
            LoadJob loadJob = LoadJob.read(in);
            if (loadJob.isExpired(currentTimeMs)) {
                continue;
            }

            if (loadJob.getJobType() == EtlJobType.MINI) {
                LOG.warn("skip mini load job {} in db {} as it is no longer supported", loadJob.getId(),
                        loadJob.getDbId());
                continue;
            }
            idToLoadJob.put(loadJob.getId(), loadJob);
            Map<String, List<LoadJob>> map = dbIdToLabelToLoadJobs.get(loadJob.getDbId());
            if (map == null) {
                map = Maps.newConcurrentMap();
                dbIdToLabelToLoadJobs.put(loadJob.getDbId(), map);
            }

            List<LoadJob> jobs = map.get(loadJob.getLabel());
            if (jobs == null) {
                jobs = Lists.newArrayList();
                map.put(loadJob.getLabel(), jobs);
            }
            jobs.add(loadJob);
            // The callbacks of load jobs replayed from the image need to be registered in the callback factory.
            // The txn commit and visible events will call back the unfinished load jobs.
            // Otherwise, a load job would never be completed even after its txn becomes visible.
            if (!loadJob.isCompleted()) {
                Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
            }
        }
    }

    // ------------------------ for load refactor ------------------------
    public long createLoadJobFromStmt(InsertStmt insertStmt) throws DdlException {
        Database database = checkDb(insertStmt.getLoadLabel().getDbName());
        long dbId = database.getId();
        LoadJob loadJob;
        // Resolve the broker desc before taking the lock so that a failure here cannot leak the lock.
        BrokerDesc brokerDesc = (BrokerDesc) insertStmt.getResourceDesc();
        writeLock();
        try {
            if (brokerDesc != null && brokerDesc.isMultiLoadBroker()) {
                if (!Env.getCurrentEnv().getLoadInstance()
                        .isUncommittedLabel(dbId, insertStmt.getLoadLabel().getLabelName())) {
                    throw new DdlException("label: " + insertStmt.getLoadLabel().getLabelName() + " not found!");
                }
            } else {
                checkLabelUsed(dbId, insertStmt.getLoadLabel().getLabelName());
                if (brokerDesc == null && insertStmt.getResourceDesc() == null) {
                    throw new DdlException("LoadManager only support the broker and spark load.");
                }
                if (unprotectedGetUnfinishedJobNum() >= Config.desired_max_waiting_jobs) {
                    throw new DdlException(
                            "There are more than " + Config.desired_max_waiting_jobs
                                    + " unfinished load jobs, please retry later. "
                                    + "You can use `SHOW LOAD` to view submitted jobs");
                }
            }

            loadJob = BulkLoadJob.fromInsertStmt(insertStmt);
            createLoadJob(loadJob);
        } finally {
            writeUnlock();
        }
        Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);

        // The job must be submitted after the edit log is written.
        // This guarantees that the load job has not been changed before it is persisted.
        loadJobScheduler.submitJob(loadJob);
        return loadJob.getId();
    }

    public long createIngestionLoadJob(String dbName, String label, List<String> tableNames,
                                       Map<String, String> properties,
                                       UserIdentity userInfo)
            throws DdlException, LoadException {
        Database db = checkDb(dbName);
        long dbId = db.getId();
        LoadJob loadJob;
        writeLock();
        try {
            checkLabelUsed(dbId, label);
            if (unprotectedGetUnfinishedJobNum() >= Config.desired_max_waiting_jobs) {
                throw new DdlException("There are more than " + Config.desired_max_waiting_jobs
                        + " unfinished load jobs, please retry later. You can use `SHOW LOAD` to view submitted jobs");
            }
            loadJob = new IngestionLoadJob(dbId, label, tableNames, userInfo);
            loadJob.setJobProperties(properties);
            createLoadJob(loadJob);
        } finally {
            writeUnlock();
        }
        Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);
        return loadJob.getId();
    }

}