ExportCommand.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.ExportJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

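/**
 * EXPORT statement, export data of a table to files under a target path.
 *
 * syntax:
 *      EXPORT TABLE table_name [PARTITION (name1[, ...])]
 *          TO 'export_target_path'
 *          [PROPERTIES("key"="value")]
 *          [WITH BROKER 'broker_name' [( $broker_attrs)]]
 *
 * An optional WHERE expression may also be supplied; it is passed on to the export job.
 * When no broker description is given, the local file system is used.
 */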
public class ExportCommand extends Command implements NeedAuditEncryption, ForwardWithSync {
    public static final String PARALLELISM = "parallelism";
    public static final String LABEL = "label";
    public static final String DATA_CONSISTENCY = "data_consistency";
    public static final String COMPRESS_TYPE = "compress_type";
    private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
    private static final String DEFAULT_LINE_DELIMITER = "\n";
    private static final String DEFAULT_PARALLELISM = "1";
    private static final int DEFAULT_TIMEOUT = 7200;

    // Property keys accepted in PROPERTIES(...); compared against the lower-cased user key.
    private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
            .add(LABEL)
            .add(PARALLELISM)
            .add(DATA_CONSISTENCY)
            .add(LoadStmt.KEY_IN_PARAM_COLUMNS)
            .add(OutFileClause.PROP_MAX_FILE_SIZE)
            .add(OutFileClause.PROP_DELETE_EXISTING_FILES)
            .add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR)
            .add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER)
            .add(PropertyAnalyzer.PROPERTIES_TIMEOUT)
            // same key that generateExportJob reads the format from
            .add(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE)
            .add(OutFileClause.PROP_WITH_BOM)
            .add(COMPRESS_TYPE)
            .build();

    private final List<String> nameParts;
    private final Optional<Expression> expr;
    private final String path;
    private final List<String> partitionsNames;
    // user PROPERTIES, looked up case-insensitively
    private final Map<String, String> fileProperties;
    // always set: defaults to a "local" descriptor when no broker clause was given
    private final BrokerDesc brokerDesc;

    /**
     * constructor of ExportCommand
     *
     * @param nameParts table name parts as written by the user (resolved against the session in {@link #run})
     * @param partitions partition names to export; empty means no PARTITION clause
     * @param expr optional WHERE expression handed to the export job; must not be null itself
     * @param path export target path; surrounding whitespace is trimmed
     * @param fileProperties PROPERTIES(...) of the statement; keys are matched case-insensitively
     * @param brokerDesc optional storage/broker description; when absent a LOCAL descriptor named "local" is used
     */
    public ExportCommand(List<String> nameParts, List<String> partitions, Optional<Expression> expr,
            String path, Map<String, String> fileProperties, Optional<BrokerDesc> brokerDesc) {
        super(PlanType.EXPORT_COMMAND);
        this.nameParts = ImmutableList.copyOf(Objects.requireNonNull(nameParts, "nameParts should not be null"));
        // check for null before trim(), otherwise a null path fails with an unexplained NPE
        this.path = Objects.requireNonNull(path, "export path should not be null").trim();
        this.partitionsNames = ImmutableList.copyOf(
                Objects.requireNonNull(partitions, "partitions should not be null"));
        this.fileProperties = ImmutableSortedMap.copyOf(
                Objects.requireNonNull(fileProperties, "fileProperties should not be null"),
                String.CASE_INSENSITIVE_ORDER);
        this.expr = Objects.requireNonNull(expr, "expr should not be null");
        this.brokerDesc = Objects.requireNonNull(brokerDesc, "brokerDesc should not be null")
                .orElseGet(() -> new BrokerDesc("local", StorageBackend.StorageType.LOCAL, null));
    }

    /**
     * Validates the statement and registers an export job with the export manager.
     */
    @Override
    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
        // resolve the user-written name into catalog / db / table
        List<String> qualifiedTableName = RelationUtil.getQualifierName(ctx, this.nameParts);
        TableName tblName = new TableName(qualifiedTableName.get(0), qualifiedTableName.get(1),
                qualifiedTableName.get(2));

        // exporting requires SELECT privilege on the source table
        if (!Env.getCurrentEnv().getAccessManager()
                .checkTblPriv(ctx, tblName.getCtl(), tblName.getDb(), tblName.getTbl(),
                        PrivPredicate.SELECT)) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "EXPORT",
                    ctx.getQualifiedUser(),
                    ctx.getRemoteIP(),
                    tblName.getDb() + ": " + tblName.getTbl());
        }

        if (!Config.enable_outfile_to_local && path.startsWith(OutFileClause.LOCAL_FILE_PREFIX)) {
            throw new AnalysisException("`enable_outfile_to_local` = false, exporting file to local fs is disabled.");
        }

        // check phases
        checkAllParameters(ctx, tblName, fileProperties);

        ExportJob exportJob = generateExportJob(ctx, fileProperties, tblName);
        // register job
        ctx.getEnv().getExportMgr().addExportJobAndRegisterTask(exportJob);
    }

    /** Runs every validation step that must pass before the export job is built. */
    private void checkAllParameters(ConnectContext ctx, TableName tblName, Map<String, String> fileProperties)
            throws UserException {
        checkPropertyKey(fileProperties);
        checkPartitions(ctx, tblName);
        checkBrokerDesc(ctx);
        checkFileProperties(ctx, fileProperties, tblName);
    }

    /** Rejects any property key that is not in {@link #PROPERTIES_SET}. */
    private void checkPropertyKey(Map<String, String> properties) throws AnalysisException {
        for (String key : properties.keySet()) {
            // Locale.ROOT: default-locale lowercasing (e.g. Turkish dotless i) would reject valid keys
            if (!PROPERTIES_SET.contains(key.toLowerCase(Locale.ROOT))) {
                throw new AnalysisException("Invalid property key: [" + key + "]");
            }
        }
    }

    /**
     * Checks that the user-specified partitions can be exported: only for non-temporary, partitioned
     * internal-catalog tables of a supported type, within the configured partition-count limit, and
     * every named partition must exist. Does nothing when no partitions were given.
     */
    private void checkPartitions(ConnectContext ctx, TableName tblName) throws AnalysisException, UserException {
        if (this.partitionsNames.isEmpty()) {
            return;
        }

        CatalogIf catalog = ctx.getEnv().getCatalogMgr().getCatalogOrAnalysisException(tblName.getCtl());
        // As for external table, we do not support export PARTITION
        if (!(catalog instanceof InternalCatalog)) {
            throw new AnalysisException("Table[" + tblName.getTbl() + "] is EXTERNAL TABLE type, "
                    + "do not support export PARTITION.");
        }

        DatabaseIf db = catalog.getDbOrAnalysisException(tblName.getDb());
        Table table = (Table) db.getTableOrAnalysisException(tblName.getTbl());
        if (table.isTemporary()) {
            throw new AnalysisException("Table[" + tblName.getTbl() + "] is "
                + "temporary table, do not support EXPORT.");
        }

        if (this.partitionsNames.size() > Config.maximum_number_of_export_partitions) {
            throw new AnalysisException("The partitions number of this export job is larger than the maximum number"
                    + " of partitions allowed by an export job");
        }

        table.readLock();
        try {
            Table.TableType tblType = table.getType();
            switch (tblType) {
                case MYSQL:
                case ODBC:
                case JDBC:
                case OLAP:
                    break;
                case VIEW:
                    // Partitions are always specified at this point (empty list returned early),
                    // and a view cannot be exported by PARTITION.
                    throw new AnalysisException("Table[" + tblName.getTbl() + "] is " + tblType + " type, "
                            + "do not support export PARTITION.");
                case BROKER:
                case SCHEMA:
                case INLINE_VIEW:
                default:
                    throw new AnalysisException("Table[" + tblName.getTbl() + "] is "
                            + tblType + " type, do not support EXPORT.");
            }
            // check table
            if (!table.isPartitionedTable()) {
                throw new AnalysisException("Table[" + tblName.getTbl() + "] is not partitioned.");
            }
            for (String partitionName : this.partitionsNames) {
                Partition partition = table.getPartition(partitionName);
                if (partition == null) {
                    throw new AnalysisException("Partition [" + partitionName + "] does not exist "
                            + "in Table[" + tblName.getTbl() + "]");
                }
            }
        } finally {
            table.readUnlock();
        }
    }

    /**
     * Validates the target path for the storage type and, for BROKER storage, that the named broker
     * is registered and has an instance returned by {@code getAnyBroker}.
     */
    private void checkBrokerDesc(ConnectContext ctx) throws UserException {
        // check path is valid
        StorageBackend.checkPath(this.path, this.brokerDesc.getStorageType(), null);

        if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) {
            BrokerMgr brokerMgr = ctx.getEnv().getBrokerMgr();
            if (!brokerMgr.containsBroker(brokerDesc.getName())) {
                throw new AnalysisException("broker " + brokerDesc.getName() + " does not exist");
            }
            if (null == brokerMgr.getAnyBroker(brokerDesc.getName())) {
                throw new AnalysisException("failed to get alive broker");
            }
        }
    }

    /**
     * Builds the {@link ExportJob} from the statement and its properties, applying defaults for
     * unspecified properties.
     *
     * @throws UserException if the catalog/db/table cannot be resolved, the table is temporary,
     *         or a property value (parallelism, data_consistency, timeout) is invalid
     */
    private ExportJob generateExportJob(ConnectContext ctx, Map<String, String> fileProperties, TableName tblName)
            throws UserException {
        ExportJob exportJob = new ExportJob(Env.getCurrentEnv().getNextId());
        // set export job and check catalog/db/table
        CatalogIf catalog = ctx.getEnv().getCatalogMgr().getCatalogOrAnalysisException(tblName.getCtl());
        DatabaseIf db = catalog.getDbOrAnalysisException(tblName.getDb());
        TableIf table = db.getTableOrAnalysisException(tblName.getTbl());
        if (table.isTemporary()) {
            throw new AnalysisException("Table[" + tblName.getTbl() + "] is temporary table, do not support export.");
        }

        exportJob.setDbId(db.getId());
        exportJob.setTableName(tblName);
        exportJob.setExportTable(table);
        exportJob.setTableId(table.getId());
        exportJob.setPartitionNames(this.partitionsNames);
        exportJob.setWhereExpression(this.expr);
        exportJob.setExportPath(this.path);

        // separators are passed through Separator.convertSeparator before use
        String columnSeparator = Separator.convertSeparator(fileProperties.getOrDefault(
                PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR, DEFAULT_COLUMN_SEPARATOR));
        exportJob.setColumnSeparator(columnSeparator);

        String lineDelimiter = Separator.convertSeparator(fileProperties.getOrDefault(
                PropertyAnalyzer.PROPERTIES_LINE_DELIMITER, DEFAULT_LINE_DELIMITER));
        exportJob.setLineDelimiter(lineDelimiter);

        // format defaults to csv
        exportJob.setFormat(fileProperties.getOrDefault(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE, "csv")
                .toLowerCase(Locale.ROOT));

        exportJob.setWithBom(fileProperties.getOrDefault(OutFileClause.PROP_WITH_BOM, "false"));

        int parallelism;
        try {
            parallelism = Integer.parseInt(fileProperties.getOrDefault(PARALLELISM, DEFAULT_PARALLELISM));
        } catch (NumberFormatException e) {
            throw new AnalysisException("The value of parallelism is invalid!");
        }
        exportJob.setParallelism(parallelism);

        // set label
        // if fileProperties contains LABEL, the label has been checked in check phases
        String defaultLabel = "export_" + UUID.randomUUID();
        exportJob.setLabel(fileProperties.getOrDefault(LABEL, defaultLabel));

        exportJob.setMaxFileSize(fileProperties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, ""));
        exportJob.setDeleteExistingFiles(fileProperties.getOrDefault(
                OutFileClause.PROP_DELETE_EXISTING_FILES, ""));

        // null means not specified
        // "" means user specified zero columns
        // if fileProperties contains KEY_IN_PARAM_COLUMNS, the columns have been checked in check phases
        String columns = fileProperties.get(LoadStmt.KEY_IN_PARAM_COLUMNS);
        exportJob.setColumns(columns);
        if (columns != null) {
            Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
            exportJob.setExportColumns(split.splitToList(columns.toLowerCase(Locale.ROOT)));
        }

        exportJob.setBrokerDesc(this.brokerDesc);

        // set sessions
        exportJob.setQualifiedUser(ctx.getQualifiedUser());
        exportJob.setUserIdentity(ctx.getCurrentUserIdentity());

        // data_consistency, when given, must be one of the two supported values (case-insensitive)
        if (fileProperties.containsKey(DATA_CONSISTENCY)) {
            String dataConsistencyStr = fileProperties.get(DATA_CONSISTENCY);
            if (ExportJob.CONSISTENT_NONE.equalsIgnoreCase(dataConsistencyStr)) {
                exportJob.setDataConsistency(ExportJob.CONSISTENT_NONE);
            } else if (ExportJob.CONSISTENT_PARTITION.equalsIgnoreCase(dataConsistencyStr)) {
                exportJob.setDataConsistency(ExportJob.CONSISTENT_PARTITION);
            } else {
                throw new AnalysisException("The value of data_consistency is invalid, please use `"
                        + ExportJob.CONSISTENT_PARTITION + "`/`" + ExportJob.CONSISTENT_NONE + "`");
            }
        }

        // Must copy session variable, because session variable may be changed during export job running.
        // Use the ctx this command runs in rather than the thread-local ConnectContext.get(), which may be unset.
        SessionVariable clonedSessionVariable = VariableMgr.cloneSessionVariable(Optional.ofNullable(
                ctx.getSessionVariable()).orElseGet(VariableMgr::getDefaultSessionVariable));
        exportJob.setSessionVariables(clonedSessionVariable);

        int timeoutSecond;
        String timeoutString = fileProperties.getOrDefault(PropertyAnalyzer.PROPERTIES_TIMEOUT,
                String.valueOf(DEFAULT_TIMEOUT));
        try {
            timeoutSecond = Integer.parseInt(timeoutString);
        } catch (NumberFormatException e) {
            throw new UserException("The value of timeout is invalid!");
        }
        exportJob.setTimeoutSecond(timeoutSecond);

        if (fileProperties.containsKey(COMPRESS_TYPE)) {
            exportJob.setCompressType(fileProperties.get(COMPRESS_TYPE));
        }

        // exportJob generate outfile sql
        exportJob.generateOutfileLogicalPlans(RelationUtil.getQualifierName(ctx, this.nameParts));
        return exportJob;
    }

    /** Validates property values that must be checked before the job is built (currently the label). */
    private void checkFileProperties(ConnectContext ctx, Map<String, String> fileProperties, TableName tblName)
            throws UserException {
        // check user specified label
        if (fileProperties.containsKey(LABEL)) {
            FeNameFormat.checkLabel(fileProperties.get(LABEL));
        }
    }

    /** Returns the statement's PROPERTIES as an immutable, case-insensitive map. */
    public Map<String, String> getFileProperties() {
        return this.fileProperties;
    }

    /** Returns the table name parts as written by the user (not yet qualified). */
    public List<String> getNameParts() {
        return this.nameParts;
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitExportCommand(this, context);
    }

    @Override
    public StmtType stmtType() {
        return StmtType.EXPORT;
    }

    @Override
    public boolean needAuditEncryption() {
        return true;
    }
}