MysqlDataDescription.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands.load;

import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.Separator;
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

/**
 * Describes the data source of a MySQL-protocol {@code LOAD DATA [LOCAL] INFILE} command.
 *
 * <p>Holds the parsed clauses (file path(s), target table, partitions, column/line separators,
 * skip lines, column list, SET mappings and properties). {@link #analyze(String)} checks the LOAD
 * privilege and then validates those clauses and builds the {@link FileFormatProperties}.
 */
public class MysqlDataDescription {
    private final List<String> filePaths;
    // May be null/empty at construction; resolved from the session database in analyzeFullDbName().
    private String dbName;
    private final String tableName;
    private final PartitionNamesInfo partitionNamesInfo;
    // Never null: when the clause is omitted the Separator wraps a null original value.
    private final Separator columnSeparator;
    private final Separator lineDelimiter;
    private final int skipLines;
    private final List<String> columns;
    private final List<Expression> columnMappingList;
    private final Map<String, String> properties;
    private final boolean clientLocal;
    private final List<ImportColumnDesc> parsedColumnExprList = Lists.newArrayList();
    private FileFormatProperties fileFormatProperties;

    // This map is used to collect information of file format properties.
    // The map should be only used in `constructor` and `analyzeWithoutCheckPriv` method.
    private final Map<String, String> analysisMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);

    /**
     * Creates a description from the parsed clauses of a MySQL load command.
     *
     * @param filePaths source file paths; copied, so the caller's list is never modified
     * @param tableNameInfo target table (database part may be empty)
     * @param clientLocal whether {@code LOCAL} was specified
     * @param partitionNamesInfo target partitions
     * @param columnSeparator optional {@code COLUMNS TERMINATED BY} value
     * @param lineDelimiter optional {@code LINES TERMINATED BY} value
     * @param skipLines number of leading lines to skip
     * @param columns explicit column list (may be empty)
     * @param columnMappingList {@code SET (...)} expressions (may be empty)
     * @param properties load properties; also copied into a case-insensitive analysis map
     * @throws NullPointerException if any reference argument is null
     */
    public MysqlDataDescription(List<String> filePaths,
                                TableNameInfo tableNameInfo,
                                boolean clientLocal,
                                PartitionNamesInfo partitionNamesInfo,
                                Optional<String> columnSeparator,
                                Optional<String> lineDelimiter,
                                int skipLines,
                                List<String> columns,
                                List<Expression> columnMappingList,
                                Map<String, String> properties) {
        Objects.requireNonNull(filePaths, "filePaths is null");
        Objects.requireNonNull(tableNameInfo, "tableNameInfo is null");
        Objects.requireNonNull(partitionNamesInfo, "partitionNamesInfo is null");
        Objects.requireNonNull(columnSeparator, "columnSeparator is null");
        Objects.requireNonNull(lineDelimiter, "lineDelimiter is null");
        Objects.requireNonNull(columns, "columns is null");
        Objects.requireNonNull(columnMappingList, "columnMappingList is null");
        Objects.requireNonNull(properties, "properties is null");

        // Defensive mutable copy: analyzeFilePaths() trims entries in place, which would fail
        // (or silently mutate caller state) if we kept the caller's list.
        this.filePaths = Lists.newArrayList(filePaths);
        this.dbName = tableNameInfo.getDb();
        this.tableName = tableNameInfo.getTbl();
        this.clientLocal = clientLocal;
        this.partitionNamesInfo = partitionNamesInfo;
        this.columnSeparator = new Separator(columnSeparator.orElse(null));
        this.lineDelimiter = new Separator(lineDelimiter.orElse(null));
        this.skipLines = skipLines;
        this.columns = columns;
        this.columnMappingList = columnMappingList;
        this.properties = properties;
        this.analysisMap.putAll(properties);
    }

    public List<String> getFilePaths() {
        return filePaths;
    }

    public String getDbName() {
        return dbName;
    }

    public String getTableName() {
        return tableName;
    }

    public PartitionNamesInfo getPartitionNamesInfo() {
        return partitionNamesInfo;
    }

    /**
     * Returns the column separator as reported by {@link Separator#getSeparator()}.
     * NOTE(review): presumably null until analyzed or when the clause was omitted — confirm in Separator.
     */
    public String getColumnSeparator() {
        return columnSeparator.getSeparator();
    }

    /**
     * Returns the line delimiter as reported by {@link Separator#getSeparator()}.
     * NOTE(review): presumably null until analyzed or when the clause was omitted — confirm in Separator.
     */
    public String getLineDelimiter() {
        return lineDelimiter.getSeparator();
    }

    public int getSkipLines() {
        return skipLines;
    }

    public List<String> getColumns() {
        return columns;
    }

    public List<Expression> getColumnMappingList() {
        return columnMappingList;
    }

    public Map<String, String> getProperties() {
        return properties;
    }

    public boolean isClientLocal() {
        return clientLocal;
    }

    /**
     * Resolves the target database: the explicitly given one, otherwise the session's current database.
     * The resolved name is stored back into this description.
     *
     * @param ctx connection context supplying the current database
     * @return the resolved database name
     * @throws AnalysisException (ERR_NO_DB_ERROR) if neither source provides a database
     */
    public String analyzeFullDbName(ConnectContext ctx) throws AnalysisException {
        String resolved = Strings.isNullOrEmpty(getDbName()) ? ctx.getDatabase() : getDbName();
        if (Strings.isNullOrEmpty(resolved)) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
        }
        this.dbName = resolved;
        return this.dbName;
    }

    /**
     * Checks the LOAD privilege on the target table, then validates the description.
     *
     * @param fullDbName database name used for the privilege check
     * @throws UserException if the privilege check or any validation step fails
     */
    public void analyze(String fullDbName) throws UserException {
        checkLoadPriv(fullDbName);
        analyzeWithoutCheckPriv(fullDbName);
    }

    // Requires a table name and the LOAD privilege on fullDbName.tableName in the internal catalog.
    private void checkLoadPriv(String fullDbName) throws AnalysisException {
        if (Strings.isNullOrEmpty(tableName)) {
            throw new AnalysisException("No table name in load statement.");
        }

        // check auth
        if (!Env.getCurrentEnv().getAccessManager()
                .checkTblPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, fullDbName, tableName,
                    PrivPredicate.LOAD)) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
                    ConnectContext.get().getQualifiedUser(),
                    ConnectContext.get().getRemoteIP(), fullDbName + ": " + tableName);
        }
    }

    /**
     * Validates file paths, separators, partitions and columns, then builds the file format
     * properties from the case-insensitive copy of the load properties. Does not check privileges.
     *
     * @param fullDbName database name (currently unused by the validation itself)
     * @throws UserException if any validation step fails
     */
    public void analyzeWithoutCheckPriv(String fullDbName) throws UserException {
        analyzeFilePaths();
        analyzeLoadAttributes();
        analyzeColumns();

        fileFormatProperties = FileFormatProperties.createFileFormatProperties(analysisMap);
        fileFormatProperties.analyzeFileFormatProperties(analysisMap, false);
    }

    // Rejects an empty path list and trims whitespace around every path.
    private void analyzeFilePaths() throws AnalysisException {
        if (filePaths.isEmpty()) {
            throw new AnalysisException("No file path in load command.");
        }
        filePaths.replaceAll(String::trim);
    }

    // Analyzes only the separators the user actually supplied, and validates non-empty partition lists.
    private void analyzeLoadAttributes() throws UserException {
        if (columnSeparator.getOriSeparator() != null) {
            columnSeparator.analyze(false);
        }

        if (lineDelimiter.getOriSeparator() != null) {
            lineDelimiter.analyze(true);
        }

        if (partitionNamesInfo.getPartitionNames() != null && !partitionNamesInfo.getPartitionNames().isEmpty()) {
            partitionNamesInfo.validate();
        }
    }

    // Rejects duplicate column names (case-insensitively) and rebuilds parsedColumnExprList.
    private void analyzeColumns() throws AnalysisException {
        // Start from scratch so analyzing twice does not accumulate duplicate entries.
        parsedColumnExprList.clear();
        Set<String> columnNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        for (String columnName : columns) {
            if (!columnNames.add(columnName)) {
                throw new AnalysisException("Duplicate column: " + columnName);
            }
            parsedColumnExprList.add(new ImportColumnDesc(columnName, null));
        }
    }

    public FileFormatProperties getFileFormatProperties() {
        return fileFormatProperties;
    }

    /**
     * Renders this description back to SQL. Optional clauses (separators, column list, SET) are
     * emitted only when they were specified; the database qualifier is omitted when not yet known.
     */
    public String toSql() {
        StringBuilder sb = new StringBuilder();
        sb.append("DATA ").append(isClientLocal() ? "LOCAL " : "");
        sb.append("INFILE '").append(filePaths.get(0)).append("'");
        sb.append(" INTO TABLE ");
        if (!Strings.isNullOrEmpty(dbName)) {
            sb.append(ClusterNamespace.getNameFromFullName(dbName)).append('.');
        }
        sb.append(tableName);
        sb.append(" ");
        sb.append(partitionNamesInfo.toSql());

        // The Separator objects always exist; only the original value tells us whether the clause was given.
        if (columnSeparator.getOriSeparator() != null) {
            sb.append(" COLUMNS TERMINATED BY ").append(columnSeparator.toSql());
        }
        if (lineDelimiter.getOriSeparator() != null) {
            sb.append(" LINES TERMINATED BY ").append(lineDelimiter.toSql());
        }
        if (!columns.isEmpty()) {
            sb.append(" (");
            Joiner.on(", ").appendTo(sb, columns).append(")");
        }

        if (!columnMappingList.isEmpty()) {
            Function<Expression, String> exprToSql = Expression::toSql;
            sb.append(" SET (");
            Joiner.on(", ").appendTo(sb, Lists.transform(columnMappingList, exprToSql)).append(")");
        }
        return sb.toString();
    }
}