ExportMgr.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load;

import org.apache.doris.analysis.CancelExportStmt;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Or;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.scheduler.exception.JobException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class ExportMgr {
    // Logger is bound to this class so log lines are attributed to the manager, not to ExportJob.
    private static final Logger LOG = LogManager.getLogger(ExportMgr.class);
    // exportJobId -> exportJob
    private Map<Long, ExportJob> exportIdToJob = Maps.newHashMap();
    // dbId -> <label -> exportJobId>
    private Map<Long, Map<String, Long>> dbTolabelToExportJobId = Maps.newHashMap();

    // Fair read/write lock protecting the two maps above.
    // The write side must be held when creating, cancelling or removing export jobs.
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    /**
     * Creates an empty manager; both job indexes start out empty.
     */
    public ExportMgr() {
    }

    /**
     * Returns a snapshot of every export job currently registered in this manager.
     *
     * <p>The copy is taken under the read lock so it cannot race with a concurrent
     * add or remove on the underlying {@code HashMap}. The lock is reentrant, so a caller
     * that already holds the read lock (or the write lock) may still call this method.
     *
     * @return a new list holding all registered jobs; modifying it does not affect the manager
     */
    public List<ExportJob> getJobs() {
        readLock();
        try {
            return Lists.newArrayList(exportIdToJob.values());
        } finally {
            readUnlock();
        }
    }

    /** Acquires the shared (read) side of the job lock; pair with {@link #readUnlock()}. */
    private void readLock() {
        this.lock.readLock().lock();
    }

    /** Releases the shared (read) side of the job lock taken by {@link #readLock()}. */
    private void readUnlock() {
        this.lock.readLock().unlock();
    }

    /** Acquires the exclusive (write) side of the job lock; pair with {@link #writeUnlock()}. */
    private void writeLock() {
        this.lock.writeLock().lock();
    }

    /** Releases the exclusive (write) side of the job lock taken by {@link #writeLock()}. */
    private void writeUnlock() {
        this.lock.writeLock().unlock();
    }

    /**
     * Registers a new export job, writes its creation to the edit log, optionally clears the
     * target directory, and finally hands the job's task executors to the transient task manager.
     *
     * <p>A label may be reused only when the job previously registered under it is CANCELLED
     * (or no longer present in the id index).
     *
     * <p>Failure handling:
     * <ul>
     *   <li>An unsupported "delete existing files" request (no broker descriptor) is rejected
     *       <em>before</em> the job is registered, so no half-created job is left behind.</li>
     *   <li>If clearing the target directory fails after registration, the job is moved to
     *       CANCELLED and the original exception is rethrown.</li>
     *   <li>If adding a task fails, the job is moved to CANCELLED, the failure is logged and
     *       the method returns normally (the caller observes the CANCELLED job).</li>
     * </ul>
     *
     * @param job the job to register
     * @throws LabelAlreadyUsedException if a non-cancelled job already uses the same label in the same db
     * @throws AnalysisException if deleting existing files is requested without a broker descriptor
     * @throws Exception any failure from directory deletion or from updating the job state
     */
    public void addExportJobAndRegisterTask(ExportJob job) throws Exception {
        boolean deleteExistingFiles = Config.enable_delete_existing_files
                && Boolean.parseBoolean(job.getDeleteExistingFiles());

        writeLock();
        try {
            Map<String, Long> labelToJobId = dbTolabelToExportJobId.get(job.getDbId());
            Long oldJobId = labelToJobId == null ? null : labelToJobId.get(job.getLabel());
            if (oldJobId != null) {
                ExportJob oldJob = exportIdToJob.get(oldJobId);
                if (oldJob != null && oldJob.getState() != ExportJobState.CANCELLED) {
                    throw new LabelAlreadyUsedException(job.getLabel());
                }
            }
            // Validate before registering: throwing after unprotectAddJob/logExportCreate would
            // leave a job that never runs but still occupies its label.
            if (deleteExistingFiles && job.getBrokerDesc() == null) {
                throw new AnalysisException("Local file system does not support delete existing files");
            }
            unprotectAddJob(job);
            Env.getCurrentEnv().getEditLog().logExportCreate(job);
        } finally {
            writeUnlock();
        }

        // delete existing files
        if (deleteExistingFiles) {
            try {
                String fullPath = job.getExportPath();
                BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1),
                        job.getBrokerDesc());
            } catch (Exception e) {
                // The job is already registered; cancel it so it does not stay in a non-final state.
                try {
                    job.updateExportJobState(ExportJobState.CANCELLED, 0L, null,
                            ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
                } catch (Exception cancelError) {
                    e.addSuppressed(cancelError);
                }
                throw e;
            }
        }

        // ATTN: Must add task after edit log, otherwise the job may finish before adding job.
        try {
            // NOTE(review): the executor list is re-fetched on every iteration, exactly as before;
            // presumably this lets a concurrent cancel be observed - confirm before hoisting it.
            for (int i = 0; i < job.getCopiedTaskExecutors().size(); i++) {
                Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(job.getCopiedTaskExecutors().get(i));
            }
        } catch (Exception e) {
            // If there happens exceptions in `addMemoryTask`
            // we must update the state of export job to `CANCELLED`
            // because we have added this export in `ExportMgr`
            LOG.warn("failed to register tasks of export job {}, cancel it", job.getId(), e);
            job.updateExportJobState(ExportJobState.CANCELLED, 0L, null,
                    ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
        }
        LOG.info("add export job. {}", job);
    }

    /**
     * Cancels every non-final export job matched by the given CANCEL EXPORT statement.
     *
     * @param stmt the statement carrying the label/state filter and the database name
     * @throws DdlException if no job matches, or every matching job is already in a final state
     * @throws AnalysisException if the privilege check fails, or a job rejects the state change
     *         (the underlying {@link JobException} is kept as the cause)
     */
    public void cancelExportJob(CancelExportStmt stmt) throws DdlException, AnalysisException {
        // List of export jobs waiting to be cancelled
        List<ExportJob> matchExportJobs = getWaitingCancelJobs(stmt);
        if (matchExportJobs.isEmpty()) {
            throw new DdlException("Export job(s) do not exist");
        }
        matchExportJobs = matchExportJobs.stream()
                .filter(job -> !job.isFinalState()).collect(Collectors.toList());
        if (matchExportJobs.isEmpty()) {
            throw new DdlException("All export job(s) are at final state (CANCELLED/FINISHED)");
        }

        // check auth
        checkCancelExportJobAuth(InternalCatalog.INTERNAL_CATALOG_NAME, stmt.getDbName(), matchExportJobs);
        // Must add lock to protect export job.
        // Because job may be cancelled when generating task executors,
        // the cancel process may clear the task executor list at same time,
        // which will cause ConcurrentModificationException
        writeLock();
        try {
            for (ExportJob exportJob : matchExportJobs) {
                exportJob.updateExportJobState(ExportJobState.CANCELLED, 0L, null,
                        ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
            }
        } catch (JobException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new AnalysisException(e.getMessage(), e);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Collects, under the read lock, the jobs accepted by the cancel filter built from the
     * given label / state / operator.
     */
    private List<ExportJob> getWaitingCancelJobs(
            String label, String state,
            Expression operator)
            throws AnalysisException {
        final Predicate<ExportJob> cancelFilter = buildCancelJobFilter(label, state, operator);
        final List<ExportJob> matched = new ArrayList<>();
        readLock();
        try {
            for (ExportJob candidate : getJobs()) {
                if (cancelFilter.test(candidate)) {
                    matched.add(candidate);
                }
            }
        } finally {
            readUnlock();
        }
        return matched;
    }

    /**
     * Builds the predicate that selects the jobs targeted by a cancel request.
     *
     * <p>An empty label or state always matches. A label containing {@code %} is matched through
     * the MySQL-style pattern, otherwise it is compared ignoring case. The two partial results are
     * OR-ed when {@code operator} is an {@link Or}, and AND-ed in every other case.
     */
    @VisibleForTesting
    public static Predicate<ExportJob> buildCancelJobFilter(
            String label, String state,
            Expression operator)
            throws AnalysisException {
        PatternMatcher labelPattern = PatternMatcherWrapper.createMysqlPattern(label,
                CaseSensibility.LABEL.getCaseSensibility());

        return job -> {
            boolean labelMatches;
            if (StringUtils.isEmpty(label)) {
                labelMatches = true;
            } else if (label.contains("%")) {
                labelMatches = labelPattern.match(job.getLabel());
            } else {
                labelMatches = job.getLabel().equalsIgnoreCase(label);
            }

            boolean stateMatches = StringUtils.isEmpty(state)
                    || job.getState().name().equalsIgnoreCase(state);

            // instanceof is already false for null, so no separate null check is needed
            return operator instanceof Or
                    ? labelMatches || stateMatches
                    : labelMatches && stateMatches;
        };
    }

    /**
     * Cancels every non-final export job matched by the given label/state filter.
     * Used for Nereids planner.
     *
     * @param label label filter; see {@link #buildCancelJobFilter(String, String, Expression)}
     * @param state state filter; see {@link #buildCancelJobFilter(String, String, Expression)}
     * @param operator how the label and state filters are combined
     * @param dbName database name used for the privilege check
     * @throws DdlException if no job matches, or every matching job is already in a final state
     * @throws AnalysisException if the privilege check fails, or a job rejects the state change
     *         (the underlying {@link JobException} is kept as the cause)
     */
    public void cancelExportJob(
            String label,
            String state,
            Expression operator, String dbName)
            throws DdlException, AnalysisException {
        // List of export jobs waiting to be cancelled
        List<ExportJob> matchExportJobs = getWaitingCancelJobs(label, state, operator);
        if (matchExportJobs.isEmpty()) {
            throw new DdlException("Export job(s) do not exist");
        }
        matchExportJobs = matchExportJobs.stream()
                .filter(job -> !job.isFinalState()).collect(Collectors.toList());
        if (matchExportJobs.isEmpty()) {
            throw new DdlException("All export job(s) are at final state (CANCELLED/FINISHED)");
        }

        // check auth
        checkCancelExportJobAuth(InternalCatalog.INTERNAL_CATALOG_NAME, dbName, matchExportJobs);
        // Must add lock to protect export job.
        // Because job may be cancelled when generating task executors,
        // the cancel process may clear the task executor list at same time,
        // which will cause ConcurrentModificationException
        writeLock();
        try {
            for (ExportJob exportJob : matchExportJobs) {
                exportJob.updateExportJobState(ExportJobState.CANCELLED, 0L, null,
                        ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
            }
        } catch (JobException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new AnalysisException(e.getMessage(), e);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Verifies that the current connection may cancel the given jobs.
     *
     * <p>With more than one job, SELECT privilege on the database is required. With a single job,
     * SELECT privilege on that job's table is required; a job without a table name is not checked.
     * A failed check is reported through {@link ErrorReport#reportAnalysisException}.
     */
    public void checkCancelExportJobAuth(String ctlName, String dbName, List<ExportJob> jobs) throws AnalysisException {
        if (jobs.size() > 1) {
            boolean dbAllowed = Env.getCurrentEnv().getAccessManager()
                    .checkDbPriv(ConnectContext.get(), ctlName, dbName, PrivPredicate.SELECT);
            if (!dbAllowed) {
                ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED_ERROR,
                        PrivPredicate.SELECT.getPrivs().toString(), dbName);
            }
            return;
        }

        // single job: check at table granularity
        TableName tableName = jobs.get(0).getTableName();
        if (tableName == null) {
            return;
        }
        boolean tblAllowed = Env.getCurrentEnv().getAccessManager()
                .checkTblPriv(ConnectContext.get(), ctlName, dbName, tableName.getTbl(), PrivPredicate.SELECT);
        if (!tblAllowed) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLE_ACCESS_DENIED_ERROR,
                    PrivPredicate.SELECT.getPrivs().toString(), tableName.getTbl());
        }
    }

    /**
     * Indexes the job by id and by (dbId, label). Takes no lock itself; the callers in this
     * class invoke it while holding the write lock.
     */
    public void unprotectAddJob(ExportJob job) {
        exportIdToJob.put(job.getId(), job);
        Map<String, Long> labelToJobId = dbTolabelToExportJobId.computeIfAbsent(job.getDbId(),
                dbId -> Maps.newHashMap());
        labelToJobId.put(job.getLabel(), job.getId());
    }

    /**
     * Collects, under the read lock, the jobs accepted by the cancel filter built from the statement.
     */
    private List<ExportJob> getWaitingCancelJobs(CancelExportStmt stmt) throws AnalysisException {
        final Predicate<ExportJob> cancelFilter = buildCancelJobFilter(stmt);
        final List<ExportJob> matched = new ArrayList<>();
        readLock();
        try {
            for (ExportJob candidate : getJobs()) {
                if (cancelFilter.test(candidate)) {
                    matched.add(candidate);
                }
            }
        } finally {
            readUnlock();
        }
        return matched;
    }

    /**
     * Builds the predicate that selects the jobs targeted by a CANCEL EXPORT statement.
     *
     * <p>An empty label or state always matches. A label containing {@code %} is matched through
     * the MySQL-style pattern, otherwise it is compared ignoring case. The two partial results are
     * OR-ed when the statement's operator is {@code OR}, and AND-ed in every other case.
     */
    @VisibleForTesting
    public static Predicate<ExportJob> buildCancelJobFilter(CancelExportStmt stmt) throws AnalysisException {
        String label = stmt.getLabel();
        String state = stmt.getState();
        PatternMatcher labelPattern = PatternMatcherWrapper.createMysqlPattern(label,
                CaseSensibility.LABEL.getCaseSensibility());

        return job -> {
            boolean labelMatches;
            if (StringUtils.isEmpty(label)) {
                labelMatches = true;
            } else if (label.contains("%")) {
                labelMatches = labelPattern.match(job.getLabel());
            } else {
                labelMatches = job.getLabel().equalsIgnoreCase(label);
            }

            boolean stateMatches = StringUtils.isEmpty(state)
                    || job.getState().name().equalsIgnoreCase(state);

            // equals() on the constant is false for a null operator
            boolean combineWithOr = CompoundPredicate.Operator.OR.equals(stmt.getOperator());
            return combineWithOr
                    ? labelMatches || stateMatches
                    : labelMatches && stateMatches;
        };
    }

    /**
     * Looks up a job by id under the read lock.
     *
     * @return the job registered under {@code jobId}, or {@code null} if there is none
     */
    public ExportJob getJob(long jobId) {
        readLock();
        try {
            return exportIdToJob.get(jobId);
        } finally {
            readUnlock();
        }
    }

    /**
     * Used for the `show export` statement: returns one row of strings per visible export job
     * of the given database, after applying the optional filters.
     *
     * <p>NOTE: jobId and states may both be specified, or only one of them, or neither.
     *
     * @param dbId only jobs of this database are considered
     * @param jobId job id filter; {@code 0} disables it
     * @param label label filter; {@code null} or empty disables it
     * @param isLabelUseLike whether {@code label} is a MySQL-style pattern instead of an exact value
     * @param states accepted states; {@code null} disables the filter
     * @param orderByPairs sort keys; {@code null} sorts by the first column (job id) ascending
     * @param limit maximum number of rows; {@code -1} means no limit
     * @return the rows, at most {@code limit} of them, containing only jobs the current user may see
     * @throws AnalysisException if the label pattern cannot be built
     */
    public List<List<String>> getExportJobInfosByIdOrState(
            long dbId, long jobId, String label, boolean isLabelUseLike, Set<ExportJobState> states,
            ArrayList<OrderByPair> orderByPairs, long limit) throws AnalysisException {

        long resultNum = limit == -1L ? Integer.MAX_VALUE : limit;
        List<List<Comparable>> exportJobInfos = new ArrayList<>();
        PatternMatcher matcher = null;
        if (isLabelUseLike) {
            matcher = PatternMatcherWrapper.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility());
        }

        readLock();
        try {
            for (ExportJob job : exportIdToJob.values()) {
                // Without an explicit ORDER BY we can stop as soon as enough rows were collected.
                // Only rows that were actually collected count, so jobs hidden by the privilege
                // check no longer consume the limit; checking first also makes limit 0 return nothing.
                if (orderByPairs == null && exportJobInfos.size() >= resultNum) {
                    break;
                }

                if (job.getDbId() != dbId) {
                    continue;
                }

                if (jobId != 0 && job.getId() != jobId) {
                    continue;
                }

                if (!Strings.isNullOrEmpty(label)) {
                    String jobLabel = job.getLabel();
                    boolean labelMatches = isLabelUseLike ? matcher.match(jobLabel) : jobLabel.equals(label);
                    if (!labelMatches) {
                        continue;
                    }
                }

                if (states != null && !states.contains(job.getState())) {
                    continue;
                }

                // check auth
                if (isJobShowable(job)) {
                    exportJobInfos.add(composeExportJobInfo(job));
                }
            }
        } finally {
            readUnlock();
        }

        // order by
        ListComparator<List<Comparable>> comparator;
        if (orderByPairs != null) {
            OrderByPair[] orderByPairArr = new OrderByPair[orderByPairs.size()];
            comparator = new ListComparator<List<Comparable>>(orderByPairs.toArray(orderByPairArr));
        } else {
            // sort by id asc
            comparator = new ListComparator<List<Comparable>>(0);
        }
        Collections.sort(exportJobInfos, comparator);

        List<List<String>> results = Lists.newArrayList();
        for (List<Comparable> list : exportJobInfos) {
            if (results.size() >= resultNum) {
                break;
            }
            results.add(list.stream().map(e -> e.toString()).collect(Collectors.toList()));
        }

        return results;
    }

    /**
     * Returns one row of strings per export job the current user may see, across all databases,
     * sorted by the first column (job id) ascending.
     *
     * @param limit maximum number of rows; {@code -1} means no limit
     * @return the rows, at most {@code limit} of them
     */
    public List<List<String>> getExportJobInfos(long limit) {
        long resultNum = limit == -1L ? Integer.MAX_VALUE : limit;
        List<List<Comparable>> exportJobInfos = new ArrayList<>();

        readLock();
        try {
            for (ExportJob job : exportIdToJob.values()) {
                // Count only rows that were actually collected: jobs hidden by the privilege check
                // must not consume the limit. Checking first also makes limit 0 return nothing.
                if (exportJobInfos.size() >= resultNum) {
                    break;
                }
                // check auth
                if (isJobShowable(job)) {
                    exportJobInfos.add(composeExportJobInfo(job));
                }
            }
        } finally {
            readUnlock();
        }

        // sort by id asc
        ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(0);
        Collections.sort(exportJobInfos, comparator);

        List<List<String>> results = Lists.newArrayList();
        for (List<Comparable> list : exportJobInfos) {
            results.add(list.stream().map(e -> e.toString()).collect(Collectors.toList()));
        }

        return results;
    }

    /**
     * Tells whether the current connection may see the given job.
     *
     * <p>A job with a real table name requires SHOW privilege on that table. A job without a table
     * name, or whose table is named "DUMMY", falls back to SHOW privilege on its database and is
     * hidden when that database no longer exists.
     */
    public boolean isJobShowable(ExportJob job) {
        TableName tableName = job.getTableName();
        boolean hasRealTable = tableName != null && !tableName.getTbl().equals("DUMMY");
        if (hasRealTable) {
            return Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), tableName.getCtl(),
                    tableName.getDb(), tableName.getTbl(),
                    PrivPredicate.SHOW);
        }

        // forward compatibility, no table name is saved before
        Database db = Env.getCurrentInternalCatalog().getDbNullable(job.getDbId());
        if (db == null) {
            return false;
        }
        return Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
                InternalCatalog.INTERNAL_CATALOG_NAME, db.getFullName(), PrivPredicate.SHOW);
    }

    /**
     * Builds the display row of a job: id, label, state, progress, a JSON blob with the task
     * settings, export path, create/start/finish time, timeout, error message and outfile info.
     *
     * <p>The table name and the broker descriptor are optional: {@link #isJobShowable} accepts
     * jobs without a table name, and {@link #addExportJobAndRegisterTask} handles jobs without a
     * broker descriptor. When either is missing, the corresponding JSON keys are simply omitted
     * instead of failing with a NullPointerException.
     */
    private List<Comparable> composeExportJobInfo(ExportJob job) {
        List<Comparable> jobInfo = new ArrayList<Comparable>();

        jobInfo.add(job.getId());
        jobInfo.add(job.getLabel());
        jobInfo.add(job.getState().name());
        jobInfo.add(job.getProgress() + "%");

        // task infos
        Map<String, Object> infoMap = Maps.newHashMap();
        List<String> partitions = job.getPartitionNames();
        if (partitions == null) {
            // no explicit partition list: show "*"
            partitions = Lists.newArrayList();
            partitions.add("*");
        }
        TableName tableName = job.getTableName();
        if (tableName != null) {
            infoMap.put("db", tableName.getDb());
            infoMap.put("tbl", tableName.getTbl());
        }
        if (job.getWhereExpr() != null) {
            infoMap.put("where expr", job.getWhereExpr().toSql());
        }
        infoMap.put("partitions", partitions);
        if (job.getBrokerDesc() != null) {
            infoMap.put("broker", job.getBrokerDesc().getName());
        }
        infoMap.put("column_separator", job.getColumnSeparator());
        infoMap.put("format", job.getFormat());
        infoMap.put("with_bom", job.getWithBom());
        infoMap.put("line_delimiter", job.getLineDelimiter());
        infoMap.put("columns", job.getColumns());
        infoMap.put("tablet_num", job.getTabletsNum());
        infoMap.put("max_file_size", job.getMaxFileSize());
        infoMap.put("delete_existing_files", job.getDeleteExistingFiles());
        infoMap.put("parallelism", job.getParallelism());
        infoMap.put("data_consistency", job.getDataConsistency());
        jobInfo.add(new Gson().toJson(infoMap));
        // path
        jobInfo.add(job.getExportPath());

        jobInfo.add(TimeUtils.longToTimeString(job.getCreateTimeMs()));
        jobInfo.add(TimeUtils.longToTimeString(job.getStartTimeMs()));
        jobInfo.add(TimeUtils.longToTimeString(job.getFinishTimeMs()));
        jobInfo.add(job.getTimeoutSecond());

        // error msg
        ExportFailMsg failMsg = job.getState() == ExportJobState.CANCELLED ? job.getFailMsg() : null;
        if (failMsg != null) {
            jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg());
        } else {
            jobInfo.add(FeConstants.null_string);
        }

        // outfileInfo
        if (job.getState() == ExportJobState.FINISHED) {
            jobInfo.add(job.getOutfileInfo());
        } else {
            jobInfo.add(FeConstants.null_string);
        }

        return jobInfo;
    }

    /**
     * Trims the job history in two passes, under the write lock:
     * <ol>
     *   <li>drops CANCELLED/FINISHED jobs created more than
     *       {@code Config.history_job_keep_max_second} seconds ago;</li>
     *   <li>while more than {@code Config.max_export_history_job_num} jobs remain, drops the
     *       jobs with the oldest creation time.</li>
     * </ol>
     *
     * <p>NOTE(review): the second pass evicts by creation time only and does not look at the job
     * state, exactly as before - confirm whether non-final jobs should be protected.
     */
    public void removeOldExportJobs() {
        long currentTimeMs = System.currentTimeMillis();

        writeLock();
        try {
            Iterator<Map.Entry<Long, ExportJob>> iter = exportIdToJob.entrySet().iterator();
            while (iter.hasNext()) {
                ExportJob job = iter.next().getValue();
                boolean expired =
                        (currentTimeMs - job.getCreateTimeMs()) / 1000 > Config.history_job_keep_max_second;
                boolean finalState = job.getState() == ExportJobState.CANCELLED
                        || job.getState() == ExportJobState.FINISHED;
                if (expired && finalState) {
                    iter.remove();
                    unprotectRemoveLabel(job);
                }
            }

            if (exportIdToJob.size() > Config.max_export_history_job_num) {
                List<Map.Entry<Long, ExportJob>> oldestFirst = new ArrayList<>(exportIdToJob.entrySet());
                oldestFirst.sort(Comparator.comparingLong(entry -> entry.getValue().getCreateTimeMs()));
                // Walk the sorted snapshot by index instead of remove(0), which shifts the whole
                // list on every call.
                int next = 0;
                while (exportIdToJob.size() > Config.max_export_history_job_num && next < oldestFirst.size()) {
                    // Remove the oldest job
                    Map.Entry<Long, ExportJob> oldestEntry = oldestFirst.get(next++);
                    exportIdToJob.remove(oldestEntry.getKey());
                    unprotectRemoveLabel(oldestEntry.getValue());
                }
            }
        } finally {
            writeUnlock();
        }
    }

    /**
     * Removes the (dbId, label) -> jobId mapping of a job that was just dropped from the id index.
     * Must be called with the write lock held.
     */
    private void unprotectRemoveLabel(ExportJob job) {
        Map<String, Long> labelJobs = dbTolabelToExportJobId.get(job.getDbId());
        if (labelJobs == null) {
            return;
        }
        // A label may be reused once its previous job is CANCELLED, in which case the mapping
        // already points at the newer job. Only remove it when it still refers to this job.
        labelJobs.remove(job.getLabel(), job.getId());
        if (labelJobs.isEmpty()) {
            dbTolabelToExportJobId.remove(job.getDbId());
        }
    }

    /**
     * Replay hook for a job creation: registers the job in both indexes under the write lock.
     * Nothing else is done here (no edit log write, no task registration).
     */
    public void replayCreateExportJob(ExportJob job) {
        this.writeLock();
        try {
            this.unprotectAddJob(job);
        } finally {
            this.writeUnlock();
        }
    }

    /**
     * Replay hook for a job state change: copies state, start/finish time, fail message and
     * outfile info from the transfer record onto the registered job.
     *
     * <p>If the job is not registered (for example because it has already been trimmed by
     * {@link #removeOldExportJobs()}), the record is logged and skipped instead of failing
     * with a NullPointerException.
     */
    public void replayUpdateJobState(ExportJobStateTransfer stateTransfer) {
        writeLock();
        try {
            LOG.info("replay update export job: {}, {}", stateTransfer.getJobId(), stateTransfer.getState());
            ExportJob job = exportIdToJob.get(stateTransfer.getJobId());
            if (job == null) {
                LOG.warn("export job {} does not exist when replaying state {}, skip it",
                        stateTransfer.getJobId(), stateTransfer.getState());
                return;
            }
            job.replayExportJobState(stateTransfer.getState());
            job.setStartTimeMs(stateTransfer.getStartTimeMs());
            job.setFinishTimeMs(stateTransfer.getFinishTimeMs());
            job.setFailMsg(stateTransfer.getFailMsg());
            job.setOutfileInfo(stateTransfer.getOutFileInfo());
        } finally {
            writeUnlock();
        }
    }

    /**
     * Counts, under the read lock, the jobs of the given database that are in the given state.
     */
    public long getJobNum(ExportJobState state, long dbId) {
        readLock();
        try {
            return exportIdToJob.values().stream()
                    .filter(job -> job.getState() == state && job.getDbId() == dbId)
                    .count();
        } finally {
            readUnlock();
        }
    }

    /**
     * Counts, under the read lock, the jobs in the given state whose database the current
     * connection holds LOAD privilege on.
     *
     * <p>Jobs whose database can no longer be resolved are skipped rather than causing a
     * NullPointerException.
     */
    public long getJobNum(ExportJobState state) {
        int size = 0;
        readLock();
        try {
            for (ExportJob job : exportIdToJob.values()) {
                // cheap state comparison first, so the privilege check only runs for candidates
                if (job.getState() != state) {
                    continue;
                }

                // getDbNullable may return null (e.g. the database was dropped); Optional is
                // referenced by its qualified name so no extra import is required.
                String dbName = java.util.Optional
                        .ofNullable(Env.getCurrentEnv().getCatalogMgr().getDbNullable(job.getDbId()))
                        .map(db -> db.getFullName())
                        .orElse(null);
                if (dbName == null) {
                    continue;
                }

                if (Env.getCurrentEnv().getAccessManager()
                        .checkDbPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME,
                                dbName, PrivPredicate.LOAD)) {
                    ++size;
                }
            }
        } finally {
            readUnlock();
        }
        return size;
    }
}