NereidsBrokerFileGroup.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.load;

import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.HiveTable;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
import org.apache.doris.datasource.property.fileformat.OrcFileFormatProperties;
import org.apache.doris.datasource.property.fileformat.ParquetFileFormatProperties;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.nereids.trees.expressions.Expression;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Broker file group information for Nereids. One {@link NereidsDataDescription} produces
 * one NereidsBrokerFileGroup. After the broker lists and parses the files, the detailed
 * broker file information will be saved here.
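 *
 * <p>A minimal usage sketch (hypothetical caller code, for illustration only):
 * <pre>{@code
 * NereidsBrokerFileGroup fileGroup = new NereidsBrokerFileGroup(dataDescription);
 * fileGroup.parse(db, dataDescription); // resolves table id, partitions, format and paths
 * }</pre>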
 */
public class NereidsBrokerFileGroup implements Writable {
    private long tableId;
    private String columnSeparator;
    private String lineDelimiter;
    // fileFormat may be null, which means the format will be determined by the file suffix
    private String fileFormat;
    private boolean isNegative;
    private List<Long> partitionIds; // can be null, means no partition specified
    private List<String> filePaths;

    // this is only used in multi load; in that case all filePaths are files, not directories
    private List<Long> fileSize;

    private List<String> fileFieldNames;
    // column names parsed from the file path (partition columns)
    private List<String> columnNamesFromPath;
    // columnExprList includes all fileFieldNames, columnsFromPath and column mappings
    // this field is recreated from the data description during edit log replay
    private List<NereidsImportColumnDesc> columnExprList;
    // this is only for hadoop function check
    private Map<String, Pair<String, List<String>>> columnToHadoopFunction;
    // filter the data from source directly
    private Expression precedingFilterExpr;
    // filter the data which has been mapped and transformed
    private Expression whereExpr;
    private Expression deleteCondition;
    private LoadTask.MergeType mergeType;
    // sequence column name
    private String sequenceCol;

    // load from table
    private long srcTableId = -1;
    private boolean isLoadFromTable = false;

    private boolean ignoreCsvRedundantCol = false;

    private FileFormatProperties fileFormatProperties;

    // for unit test and edit log persistence
    private NereidsBrokerFileGroup() {
    }

    /**
     * Construct a NereidsBrokerFileGroup from a NereidsDataDescription.
     */
    public NereidsBrokerFileGroup(NereidsDataDescription dataDescription) {
        this.fileFieldNames = dataDescription.getFileFieldNames();
        this.columnNamesFromPath = dataDescription.getColumnsFromPath();
        this.columnExprList = dataDescription.getParsedColumnExprList() != null
                ? dataDescription.getParsedColumnExprList()
                : new ArrayList<>();
        this.columnToHadoopFunction = dataDescription.getColumnToHadoopFunction();
        this.precedingFilterExpr = dataDescription.getPrecdingFilterExpr();
        this.whereExpr = dataDescription.getWhereExpr();
        this.deleteCondition = dataDescription.getDeleteCondition();
        this.mergeType = dataDescription.getMergeType();
        this.sequenceCol = dataDescription.getSequenceCol();
        this.filePaths = dataDescription.getFilePaths();
        // used for cloud copy into
        this.ignoreCsvRedundantCol = dataDescription.getIgnoreCsvRedundantCol();
    }

    /**
     * Construct a NereidsBrokerFileGroup with all fields specified explicitly.
     */
    public NereidsBrokerFileGroup(long tableId, boolean isNegative,
            List<Long> partitionIds, List<String> filePaths, List<Long> fileSize, List<String> fileFieldNames,
            List<String> columnNamesFromPath, List<NereidsImportColumnDesc> columnExprList,
            Map<String, Pair<String, List<String>>> columnToHadoopFunction,
            Expression precedingFilterExpr, Expression whereExpr, Expression deleteCondition,
            LoadTask.MergeType mergeType, String sequenceCol, long srcTableId,
            boolean isLoadFromTable, boolean ignoreCsvRedundantCol, FileFormatProperties fileFormatProperties) {
        this.tableId = tableId;
        this.isNegative = isNegative;
        this.partitionIds = partitionIds;
        this.filePaths = filePaths;
        this.fileSize = fileSize;
        this.fileFieldNames = fileFieldNames;
        this.columnNamesFromPath = columnNamesFromPath;
        this.columnExprList = columnExprList != null ? columnExprList : new ArrayList<>();
        this.columnToHadoopFunction = columnToHadoopFunction;
        this.precedingFilterExpr = precedingFilterExpr;
        this.whereExpr = whereExpr;
        this.deleteCondition = deleteCondition;
        this.mergeType = mergeType;
        this.sequenceCol = sequenceCol;
        this.srcTableId = srcTableId;
        this.isLoadFromTable = isLoadFromTable;
        this.ignoreCsvRedundantCol = ignoreCsvRedundantCol;
        this.fileFormatProperties = fileFormatProperties;
    }

    /**
     * Parse the input NereidsDataDescription and fill this NereidsBrokerFileGroup
     * with the resolved table id, partition ids, file format and file paths.
     */
    public void parse(Database db, NereidsDataDescription dataDescription) throws DdlException {
        // tableId
        OlapTable olapTable = db.getOlapTableOrDdlException(dataDescription.getTableName());
        tableId = olapTable.getId();
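        // take the table read lock so partition states and the base schema cannot change during the checks below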
        olapTable.readLock();
        try {
            // partitionId
            PartitionNames partitionNames = dataDescription.getPartitionNames();
            if (partitionNames != null) {
                partitionIds = Lists.newArrayList();
                for (String pName : partitionNames.getPartitionNames()) {
                    Partition partition = olapTable.getPartition(pName, partitionNames.isTemp());
                    if (partition == null) {
                        throw new DdlException("Unknown partition '" + pName
                                + "' in table '" + olapTable.getName() + "'");
                    }
                    // partitions that will receive data must not be under restore
                    if (partition.getState() == Partition.PartitionState.RESTORE) {
                        throw new DdlException("Table [" + olapTable.getName()
                                + "], Partition[" + partition.getName() + "] is under restore");
                    }
                    partitionIds.add(partition.getId());
                }
            }

            // only do this check when there is a restore on this table now
            if (olapTable.getState() == OlapTable.OlapTableState.RESTORE) {
                boolean hasPartitionRestoring = olapTable.getPartitions().stream()
                        .anyMatch(partition -> partition.getState() == Partition.PartitionState.RESTORE);
                // tbl RESTORE && all partitions NOT RESTORE -> whole table restore
                // tbl RESTORE && some partitions RESTORE -> only those partitions restore, NOT the whole table
                // so check whether the whole table is under restore here
                if (!hasPartitionRestoring) {
                    throw new DdlException("Table [" + olapTable.getName() + "] is under restore");
                }
            }

            if (olapTable.getKeysType() != KeysType.AGG_KEYS && dataDescription.isNegative()) {
                throw new DdlException("Load for AGG_KEYS table should not specify NEGATIVE");
            }

            // check negative for sum aggregate type
            if (dataDescription.isNegative()) {
                for (Column column : olapTable.getBaseSchema()) {
                    if (!column.isKey() && column.getAggregationType() != AggregateType.SUM) {
                        throw new DdlException("Column is not SUM AggregateType. column:" + column.getName());
                    }
                }
            }
        } finally {
            olapTable.readUnlock();
        }

        fileFormatProperties = dataDescription.getFileFormatProperties();
        fileFormat = fileFormatProperties.getFormatName();
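        // only CSV carries a column separator and line delimiter; they stay null for other formats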
        if (fileFormatProperties instanceof CsvFileFormatProperties) {
            columnSeparator = ((CsvFileFormatProperties) fileFormatProperties).getColumnSeparator();
            lineDelimiter = ((CsvFileFormatProperties) fileFormatProperties).getLineDelimiter();
        }

        isNegative = dataDescription.isNegative();

        // FilePath
        filePaths = dataDescription.getFilePaths();
        fileSize = dataDescription.getFileSize();

        if (dataDescription.isLoadFromTable()) {
            String srcTableName = dataDescription.getSrcTableName();
            // src table should be hive table
            Table srcTable = db.getTableOrDdlException(srcTableName);
            if (!(srcTable instanceof HiveTable)) {
                throw new DdlException("Source table " + srcTableName + " is not HiveTable");
            }
            // src table columns should include all columns of loaded table
            for (Column column : olapTable.getBaseSchema()) {
                boolean isIncluded = false;
                for (Column srcColumn : srcTable.getBaseSchema()) {
                    if (srcColumn.getName().equalsIgnoreCase(column.getName())) {
                        isIncluded = true;
                        break;
                    }
                }
                if (!isIncluded) {
                    throw new DdlException("Column " + column.getName() + " is not in Source table");
                }
            }
            srcTableId = srcTable.getId();
            isLoadFromTable = true;
        }
    }

    public long getTableId() {
        return tableId;
    }

    public boolean isNegative() {
        return isNegative;
    }

    public List<Long> getPartitionIds() {
        return partitionIds;
    }

    public Expression getPrecedingFilterExpr() {
        return precedingFilterExpr;
    }

    public Expression getWhereExpr() {
        return whereExpr;
    }

    public void setWhereExpr(Expression whereExpr) {
        this.whereExpr = whereExpr;
    }

    public List<String> getFilePaths() {
        return filePaths;
    }

    public List<String> getColumnNamesFromPath() {
        return columnNamesFromPath;
    }

    public List<NereidsImportColumnDesc> getColumnExprList() {
        return columnExprList;
    }

    public List<String> getFileFieldNames() {
        return fileFieldNames;
    }

    public Map<String, Pair<String, List<String>>> getColumnToHadoopFunction() {
        return columnToHadoopFunction;
    }

    public long getSrcTableId() {
        return srcTableId;
    }

    public boolean isLoadFromTable() {
        return isLoadFromTable;
    }

    public Expression getDeleteCondition() {
        return deleteCondition;
    }

    public LoadTask.MergeType getMergeType() {
        return mergeType;
    }

    public String getSequenceCol() {
        return sequenceCol;
    }

    public boolean hasSequenceCol() {
        return !Strings.isNullOrEmpty(sequenceCol);
    }

    public List<Long> getFileSize() {
        return fileSize;
    }

    public void setFileSize(List<Long> fileSize) {
        this.fileSize = fileSize;
    }

    public boolean isBinaryFileFormat() {
        return fileFormatProperties instanceof ParquetFileFormatProperties
                || fileFormatProperties instanceof OrcFileFormatProperties;
    }

    public FileFormatProperties getFileFormatProperties() {
        return fileFormatProperties;
    }

    public boolean getIgnoreCsvRedundantCol() {
        return ignoreCsvRedundantCol;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("BrokerFileGroup{tableId=").append(tableId);
        if (partitionIds != null) {
            sb.append(",partitionIds=[");
            int idx = 0;
            for (long id : partitionIds) {
                if (idx++ != 0) {
                    sb.append(",");
                }
                sb.append(id);
            }
            sb.append("]");
        }
        if (columnNamesFromPath != null) {
            sb.append(",columnsFromPath=[");
            int idx = 0;
            for (String name : columnNamesFromPath) {
                if (idx++ != 0) {
                    sb.append(",");
                }
                sb.append(name);
            }
            sb.append("]");
        }
        if (fileFieldNames != null) {
            sb.append(",fileFieldNames=[");
            int idx = 0;
            for (String name : fileFieldNames) {
                if (idx++ != 0) {
                    sb.append(",");
                }
                sb.append(name);
            }
            sb.append("]");
        }
        sb.append(",valueSeparator=").append(columnSeparator)
                .append(",lineDelimiter=").append(lineDelimiter)
                .append(",fileFormat=").append(fileFormat)
                .append(",isNegative=").append(isNegative);
        sb.append(",fileInfos=[");
        int idx = 0;
        for (String path : filePaths) {
            if (idx++ != 0) {
                sb.append(",");
            }
            sb.append(path);
        }
        sb.append("]");
        sb.append(",srcTableId=").append(srcTableId);
        sb.append(",isLoadFromTable=").append(isLoadFromTable);
        sb.append("}");

        return sb.toString();
    }

    @Deprecated
    @Override
    public void write(DataOutput out) throws IOException {
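        // The field order below must stay in sync with readFields():
        // tableId, columnSeparator, lineDelimiter, isNegative, partitionIds, fileFieldNames,
        // filePaths, expr column map (always written as empty), optional fileFormat,
        // srcTableId, isLoadFromTable.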
        // tableId
        out.writeLong(tableId);
        // valueSeparator
        Text.writeString(out, columnSeparator);
        // lineDelimiter
        Text.writeString(out, lineDelimiter);
        // isNegative
        out.writeBoolean(isNegative);
        // partitionIds
        if (partitionIds == null) {
            out.writeInt(0);
        } else {
            out.writeInt(partitionIds.size());
            for (long id : partitionIds) {
                out.writeLong(id);
            }
        }
        // fileFieldNames
        if (fileFieldNames == null) {
            out.writeInt(0);
        } else {
            out.writeInt(fileFieldNames.size());
            for (String name : fileFieldNames) {
                Text.writeString(out, name);
            }
        }
        // filePaths
        out.writeInt(filePaths.size());
        for (String path : filePaths) {
            Text.writeString(out, path);
        }
        // the expr column map is no longer used since broker load supports functions, so always write it as empty
        out.writeInt(0);

        // fileFormat
        if (fileFormat == null) {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
            Text.writeString(out, fileFormat);
        }

        // src table
        out.writeLong(srcTableId);
        out.writeBoolean(isLoadFromTable);
    }

    /**
     * Deserialize from the edit log; the counterpart of {@link #write(DataOutput)}.
     */
    @Deprecated
    public void readFields(DataInput in) throws IOException {
        tableId = in.readLong();
        columnSeparator = Text.readString(in);
        lineDelimiter = Text.readString(in);
        isNegative = in.readBoolean();
        // partitionIds
        int partSize = in.readInt();
        if (partSize > 0) {
            partitionIds = Lists.newArrayList();
            for (int i = 0; i < partSize; ++i) {
                partitionIds.add(in.readLong());
            }
        }
        // fileFieldNames
        int fileFieldNameSize = in.readInt();
        if (fileFieldNameSize > 0) {
            fileFieldNames = Lists.newArrayList();
            for (int i = 0; i < fileFieldNameSize; ++i) {
                fileFieldNames.add(Text.readString(in));
            }
        }
        // fileInfos
        int size = in.readInt();
        filePaths = Lists.newArrayList();
        for (int i = 0; i < size; ++i) {
            filePaths.add(Text.readString(in));
        }
        // expr column map
        Map<String, Expr> exprColumnMap = Maps.newHashMap();
        size = in.readInt();
        for (int i = 0; i < size; ++i) {
            final String name = Text.readString(in);
            exprColumnMap.put(name, Expr.readIn(in));
        }
        // file format
        if (in.readBoolean()) {
            fileFormat = Text.readString(in);
        }
        srcTableId = in.readLong();
        isLoadFromTable = in.readBoolean();

        // Load jobs created before function support have no persisted columnExprList,
        // and it cannot be re-analyzed without the origin stmt of those old jobs.
        // So the columnExprList is rebuilt here from fileFieldNames and exprColumnMap.
        if (fileFieldNames == null || fileFieldNames.isEmpty()) {
            return;
        }
        // Order of columnExprList: fileFieldNames + columnsFromPath
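        // e.g. (illustrative) fileFieldNames [k1, k2] with exprColumnMap {v1 -> k1 + k2}
        // produce columnExprList [k1, k2, v1 = k1 + k2]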
        columnExprList = Lists.newArrayList();
        for (String columnName : fileFieldNames) {
            columnExprList.add(new NereidsImportColumnDesc(columnName, null));
        }
        if (exprColumnMap == null || exprColumnMap.isEmpty()) {
            return;
        }
        for (Map.Entry<String, Expr> columnExpr : exprColumnMap.entrySet()) {
            List<Expression> exprs;
            try {
                exprs = NereidsLoadUtils.parseExpressionSeq(columnExpr.getValue().toSql());
            } catch (UserException e) {
                throw new IOException(e);
            }
            columnExprList.add(new NereidsImportColumnDesc(columnExpr.getKey(), exprs.get(0)));
        }
    }

    @Deprecated
    public static NereidsBrokerFileGroup read(DataInput in) throws IOException {
        NereidsBrokerFileGroup fileGroup = new NereidsBrokerFileGroup();
        fileGroup.readFields(in);
        return fileGroup;
    }
}