ExportStmt.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.analysis;

import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.ExportJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import lombok.Getter;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

// EXPORT statement, export data to dirs by broker.
//
// syntax:
//      EXPORT TABLE table_name [PARTITION (name1[, ...])]
//          TO 'export_target_path'
//          [PROPERTIES("key"="value")]
//          WITH BROKER 'broker_name' [( $broker_attrs)]
@Getter
public class ExportStmt extends StatementBase implements NotFallbackInParser {
    public static final String PARALLELISM = "parallelism";
    public static final String LABEL = "label";
    public static final String DATA_CONSISTENCY = "data_consistency";
    public static final String COMPRESS_TYPE = "compress_type";

    private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
    private static final String DEFAULT_LINE_DELIMITER = "\n";
    private static final String DEFAULT_PARALLELISM = "1";
    private static final Integer DEFAULT_TIMEOUT = 7200;

    private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
            .add(LABEL)
            .add(PARALLELISM)
            .add(DATA_CONSISTENCY)
            .add(LoadStmt.KEY_IN_PARAM_COLUMNS)
            .add(OutFileClause.PROP_MAX_FILE_SIZE)
            .add(OutFileClause.PROP_DELETE_EXISTING_FILES)
            .add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR)
            .add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER)
            .add(PropertyAnalyzer.PROPERTIES_TIMEOUT)
            .add("format")
            .add(COMPRESS_TYPE)
            .build();

    private TableName tblName;
    private List<String> partitionStringNames;
    private Expr whereExpr;
    private String path;
    private BrokerDesc brokerDesc;
    private Map<String, String> properties = Maps.newHashMap();
    private String columnSeparator;
    private String lineDelimiter;
    private String columns;

    private TableRef tableRef;

    private String format;

    private String label;

    private Integer parallelism;

    private Integer timeout;

    private String maxFileSize;
    private String deleteExistingFiles;
    private String withBom;
    private String dataConsistency = ExportJob.CONSISTENT_PARTITION;
    private String compressionType;
    private SessionVariable sessionVariables;

    private String qualifiedUser;

    private UserIdentity userIdentity;

    private ExportJob exportJob;

    public ExportStmt(TableRef tableRef, Expr whereExpr, String path,
                      Map<String, String> properties, BrokerDesc brokerDesc) {
        this.tableRef = tableRef;
        this.whereExpr = whereExpr;
        this.path = path.trim();
        if (properties != null) {
            this.properties = properties;
        }
        this.brokerDesc = brokerDesc;
        this.columnSeparator = DEFAULT_COLUMN_SEPARATOR;
        this.lineDelimiter = DEFAULT_LINE_DELIMITER;
        this.timeout = DEFAULT_TIMEOUT;

        // ConnectionContext may not exist when in replay thread
        if (ConnectContext.get() != null) {
            this.sessionVariables = VariableMgr.cloneSessionVariable(ConnectContext.get().getSessionVariable());
        } else {
            this.sessionVariables = VariableMgr.cloneSessionVariable(VariableMgr.getDefaultSessionVariable());
        }
    }

    @Override
    public boolean needAuditEncryption() {
        return brokerDesc != null;
    }

    @Override
    public void analyze(Analyzer analyzer) throws UserException {
        super.analyze(analyzer);

        if (!Config.enable_outfile_to_local && Objects.requireNonNull(path)
                .startsWith(OutFileClause.LOCAL_FILE_PREFIX)) {
            throw new AnalysisException("`enable_outfile_to_local` = false, exporting file to local fs is disabled.");
        }

        tableRef = analyzer.resolveTableRef(tableRef);
        Preconditions.checkNotNull(tableRef);
        tableRef.analyze(analyzer);

        // disallow external catalog
        tblName = tableRef.getName();
        Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName());

        // get partitions name
        Optional<PartitionNames> optionalPartitionNames = Optional.ofNullable(tableRef.getPartitionNames());
        if (optionalPartitionNames.isPresent()) {
            if (optionalPartitionNames.get().isTemp()) {
                throw new AnalysisException("Do not support exporting temporary partitions");
            }
            partitionStringNames = optionalPartitionNames.get().getPartitionNames();
        } else {
            partitionStringNames = ImmutableList.of();
        }

        // check auth
        if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), tblName.getCtl(),
                tblName.getDb(), tblName.getTbl(),
                PrivPredicate.SELECT)) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "EXPORT",
                    ConnectContext.get().getQualifiedUser(),
                    ConnectContext.get().getRemoteIP(),
                    tblName.getDb() + ": " + tblName.getTbl());
        }
        qualifiedUser = ConnectContext.get().getQualifiedUser();
        userIdentity = ConnectContext.get().getCurrentUserIdentity();

        // check table && partitions whether exist
        checkPartitions(analyzer.getEnv());

        // check broker whether exist
        if (brokerDesc == null) {
            brokerDesc = new BrokerDesc("local", StorageBackend.StorageType.LOCAL, null);
        }

        // check path is valid
        StorageBackend.checkPath(path, brokerDesc.getStorageType(), null);
        if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) {
            BrokerMgr brokerMgr = analyzer.getEnv().getBrokerMgr();
            if (!brokerMgr.containsBroker(brokerDesc.getName())) {
                throw new AnalysisException("broker " + brokerDesc.getName() + " does not exist");
            }
            if (null == brokerMgr.getAnyBroker(brokerDesc.getName())) {
                throw new AnalysisException("failed to get alive broker");
            }
        }

        // check properties
        checkProperties(properties);

        // create job and analyze job
        setJob();
        exportJob.generateOutfileStatement();
    }

    private void setJob() throws UserException {
        exportJob = new ExportJob(Env.getCurrentEnv().getNextId());

        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(this.tblName.getDb());
        exportJob.setDbId(db.getId());
        exportJob.setTableName(this.tblName);
        exportJob.setExportTable(db.getTableOrDdlException(this.tblName.getTbl()));
        exportJob.setTableId(db.getTableOrDdlException(this.tblName.getTbl()).getId());
        exportJob.setTableRef(this.tableRef);

        // set partitions
        exportJob.setPartitionNames(this.partitionStringNames);

        // set where expr
        exportJob.setWhereExpr(this.whereExpr);

        // set path
        exportJob.setExportPath(this.path);

        // set properties
        exportJob.setLabel(this.label);
        exportJob.setColumnSeparator(this.columnSeparator);
        exportJob.setLineDelimiter(this.lineDelimiter);
        exportJob.setFormat(this.format);
        exportJob.setColumns(this.columns);
        exportJob.setParallelism(this.parallelism);
        exportJob.setMaxFileSize(this.maxFileSize);
        exportJob.setDeleteExistingFiles(this.deleteExistingFiles);
        exportJob.setWithBom(this.withBom);
        exportJob.setDataConsistency(this.dataConsistency);
        exportJob.setCompressType(this.compressionType);

        if (columns != null) {
            Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
            exportJob.setExportColumns(split.splitToList(this.columns.toLowerCase()));
        }

        // set broker desc
        exportJob.setBrokerDesc(this.brokerDesc);

        // set sessions
        exportJob.setQualifiedUser(this.qualifiedUser);
        exportJob.setUserIdentity(this.userIdentity);
        SessionVariable clonedSessionVariable = VariableMgr.cloneSessionVariable(Optional.ofNullable(
                ConnectContext.get().getSessionVariable()).orElse(VariableMgr.getDefaultSessionVariable()));
        exportJob.setSessionVariables(clonedSessionVariable);
        exportJob.setTimeoutSecond(this.timeout);

        exportJob.setOrigStmt(this.getOrigStmt());
    }

    // check partitions specified by user are belonged to the table.
    private void checkPartitions(Env env) throws AnalysisException {
        if (partitionStringNames.isEmpty()) {
            return;
        }

        if (partitionStringNames.size() > Config.maximum_number_of_export_partitions) {
            throw new AnalysisException("The partitions number of this export job is larger than the maximum number"
                    + " of partitions allowed by an export job");
        }

        Database db = env.getInternalCatalog().getDbOrAnalysisException(tblName.getDb());
        Table table = db.getTableOrAnalysisException(tblName.getTbl());
        table.readLock();
        try {
            // check table
            if (!table.isPartitionedTable()) {
                throw new AnalysisException("Table[" + tblName.getTbl() + "] is not partitioned.");
            }
            Table.TableType tblType = table.getType();
            switch (tblType) {
                case MYSQL:
                case ODBC:
                case JDBC:
                case OLAP:
                    break;
                case BROKER:
                case SCHEMA:
                case INLINE_VIEW:
                case VIEW:
                default:
                    throw new AnalysisException("Table[" + tblName.getTbl() + "] is "
                            + tblType + " type, do not support EXPORT.");
            }

            for (String partitionName : partitionStringNames) {
                Partition partition = table.getPartition(partitionName);
                if (partition == null) {
                    throw new AnalysisException("Partition [" + partitionName + "] does not exist "
                            + "in Table[" + tblName.getTbl() + "]");
                }
            }
        } finally {
            table.readUnlock();
        }
    }

    private void checkProperties(Map<String, String> properties) throws UserException {
        for (String key : properties.keySet()) {
            if (!PROPERTIES_SET.contains(key.toLowerCase())) {
                throw new UserException("Invalid property key: [" + key + "]");
            }
        }

        // convert key to lowercase
        Map<String, String> tmpMap = Maps.newHashMap();
        for (String key : properties.keySet()) {
            tmpMap.put(key.toLowerCase(), properties.get(key));
        }
        properties = tmpMap;

        this.columnSeparator = Separator.convertSeparator(PropertyAnalyzer.analyzeColumnSeparator(
                properties, ExportStmt.DEFAULT_COLUMN_SEPARATOR));
        this.lineDelimiter = Separator.convertSeparator(PropertyAnalyzer.analyzeLineDelimiter(
                properties, ExportStmt.DEFAULT_LINE_DELIMITER));
        // null means not specified
        // "" means user specified zero columns
        this.columns = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_COLUMNS, null);

        // format
        this.format = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE, "csv").toLowerCase();

        // parallelism
        String parallelismString = properties.getOrDefault(PARALLELISM, DEFAULT_PARALLELISM);
        try {
            this.parallelism = Integer.parseInt(parallelismString);
        } catch (NumberFormatException e) {
            throw new UserException("The value of parallelism is invalid!");
        }

        // timeout
        String timeoutString = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_TIMEOUT,
                String.valueOf(DEFAULT_TIMEOUT));
        try {
            this.timeout = Integer.parseInt(timeoutString);
        } catch (NumberFormatException e) {
            throw new UserException("The value of timeout is invalid!");
        }

        // max_file_size
        this.maxFileSize = properties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, "");
        this.deleteExistingFiles = properties.getOrDefault(OutFileClause.PROP_DELETE_EXISTING_FILES, "");

        // label
        if (properties.containsKey(LABEL)) {
            FeNameFormat.checkLabel(properties.get(LABEL));
            this.label = properties.get(LABEL);
        } else {
            // generate a random label
            this.label = "export_" + UUID.randomUUID();
        }

        // with bom
        this.withBom = properties.getOrDefault(OutFileClause.PROP_WITH_BOM, "false");

        // data consistency
        if (properties.containsKey(DATA_CONSISTENCY)) {
            String dataConsistencyStr = properties.get(DATA_CONSISTENCY);
            if (ExportJob.CONSISTENT_NONE.equalsIgnoreCase(dataConsistencyStr)) {
                this.dataConsistency = ExportJob.CONSISTENT_NONE;
            } else if (ExportJob.CONSISTENT_PARTITION.equalsIgnoreCase(dataConsistencyStr)) {
                this.dataConsistency = ExportJob.CONSISTENT_PARTITION;
            } else {
                throw new AnalysisException("The value of data_consistency is invalid, please use `"
                        + ExportJob.CONSISTENT_PARTITION + "`/`" + ExportJob.CONSISTENT_NONE + "`");
            }
        }

        // compress_type
        this.compressionType = properties.getOrDefault(COMPRESS_TYPE, "");
    }

    @Override
    public String toSql() {
        StringBuilder sb = new StringBuilder();
        sb.append("EXPORT TABLE ");
        if (tblName == null) {
            sb.append("non-exist");
        } else {
            sb.append(tblName.toSql());
        }
        if (partitionStringNames != null && !partitionStringNames.isEmpty()) {
            sb.append(" PARTITION (");
            Joiner.on(", ").appendTo(sb, partitionStringNames);
            sb.append(")");
        }
        sb.append("\n");

        sb.append(" TO ").append("'");
        sb.append(path);
        sb.append("'");

        if (properties != null && !properties.isEmpty()) {
            sb.append("\nPROPERTIES (");
            sb.append(new PrintableMap<String, String>(properties, "=", true, false));
            sb.append(")");
        }

        if (brokerDesc != null) {
            sb.append("\n WITH BROKER '").append(brokerDesc.getName()).append("' (");
            sb.append(new PrintableMap<String, String>(brokerDesc.getProperties(), "=", true, false, true));
            sb.append(")");
        }

        return sb.toString();
    }

    @Override
    public RedirectStatus getRedirectStatus() {
        return RedirectStatus.FORWARD_WITH_SYNC;
    }

    @Override
    public String toString() {
        return toSql();
    }

    @Override
    public StmtType stmtType() {
        return StmtType.EXPORT;
    }
}