CreateTableCommand.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeConstants;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
import org.apache.doris.nereids.annotation.Developing;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.types.CharType;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.DecimalV2Type;
import org.apache.doris.nereids.types.NullType;
import org.apache.doris.nereids.types.StringType;
import org.apache.doris.nereids.types.TinyIntType;
import org.apache.doris.nereids.types.VarcharType;
import org.apache.doris.nereids.types.coercion.CharacterType;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.nereids.util.TypeCoercionUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Command for {@code CREATE TABLE} and {@code CREATE TABLE ... AS SELECT} (CTAS).
 *
 * <p>Without a CTAS query the command validates the {@link CreateTableInfo}, translates it to a legacy
 * {@link CreateTableStmt} and passes it to {@code Env.createTable}. With a CTAS query it plans the query
 * to derive the column definitions, creates the table, and then runs an {@link InsertIntoTableCommand}
 * to fill it; when the insert fails, the table created by this command is dropped again.
 */
@Developing
public class CreateTableCommand extends Command implements NeedAuditEncryption, ForwardWithSync {
    public static final Logger LOG = LogManager.getLogger(CreateTableCommand.class);

    private final Optional<LogicalPlan> ctasQuery;
    private final CreateTableInfo createTableInfo;

    /**
     * Creates the command.
     *
     * @param ctasQuery the query of a CTAS statement, or empty for a plain {@code CREATE TABLE}
     * @param createTableInfo the table definition, must not be null
     * @throws NullPointerException if {@code createTableInfo} is null
     */
    public CreateTableCommand(Optional<LogicalPlan> ctasQuery, CreateTableInfo createTableInfo) {
        super(PlanType.CREATE_TABLE_COMMAND);
        this.ctasQuery = ctasQuery;
        this.createTableInfo = Objects.requireNonNull(createTableInfo, "require CreateTableInfo object");
    }

    /**
     * Executes the command: either a plain table creation, or the three CTAS steps
     * (derive columns from the planned query, create the table, insert the query result).
     *
     * @param ctx the connect context of the current session
     * @param executor the statement executor, forwarded to the insert command of a CTAS
     * @throws AnalysisException if the CTAS column count does not match the query output, if creating
     *         the CTAS table fails, or if the CTAS insert throws
     * @throws Exception any other failure raised by validation or table creation
     */
    @Override
    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
        if (!ctasQuery.isPresent()) {
            createTableWithoutQuery(ctx);
            return;
        }
        LogicalPlan query = ctasQuery.get();
        List<String> ctasCols = createTableInfo.getCtasColumns();
        NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
        // must disable constant folding by be, because be constant folding may return wrong type
        ctx.getSessionVariable().setVarOnce(SessionVariable.ENABLE_FOLD_CONSTANT_BY_BE, "false");
        Plan plan = planner.planWithLock(new UnboundResultSink<>(query), PhysicalProperties.ANY, ExplainLevel.NONE);
        List<Slot> slots = plan.getOutput();
        if (ctasCols == null) {
            // no explicit column list: the plan must be analyzed first to learn the output column names.
            ctasCols = slots.stream().map(NamedExpression::getName).collect(Collectors.toList());
        }
        if (slots.size() != ctasCols.size()) {
            throw new AnalysisException("ctas column size is not equal to the query's");
        }
        List<ColumnDefinition> columnsOfQuery = buildCtasColumnDefinitions(ctx, slots);
        List<String> qualifierTableName = RelationUtil.getQualifierName(ctx, createTableInfo.getTableNameParts());
        createTableInfo.validateCreateTableAsSelect(qualifierTableName, columnsOfQuery, ctx);
        CreateTableStmt createTableStmt = createTableInfo.translateToLegacyStmt();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Nereids start to execute the ctas command, query id: {}, tableName: {}",
                    ctx.queryId(), createTableInfo.getTableName());
        }
        try {
            // NOTE(review): a true result presumably means the table already existed and nothing
            // was created; in that case no data is inserted. Confirm against Env.createTable.
            if (Env.getCurrentEnv().createTable(createTableStmt)) {
                return;
            }
        } catch (Exception e) {
            // chain the caught exception itself so its stack trace is not lost.
            throw new AnalysisException(e.getMessage(), e);
        }

        query = UnboundTableSinkCreator.createUnboundTableSink(createTableInfo.getTableNameParts(),
                ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), query);
        try {
            // the insert is skipped when FeConstants.runningUnitTest is set.
            if (!FeConstants.runningUnitTest) {
                new InsertIntoTableCommand(query, Optional.empty(), Optional.empty(), Optional.empty()).run(
                        ctx, executor);
            }
            // the insert may report failure through the connection state instead of throwing.
            if (ctx.getState().getStateType() == MysqlStateType.ERR) {
                handleFallbackFailedCtas(ctx);
            }
        } catch (Exception e) {
            handleFallbackFailedCtas(ctx);
            throw new AnalysisException("Failed to execute CTAS Reason: " + e.getMessage(), e);
        }
    }

    /**
     * Handles a plain {@code CREATE TABLE}: validate, translate to the legacy statement and create.
     */
    private void createTableWithoutQuery(ConnectContext ctx) throws Exception {
        createTableInfo.validate(ctx);
        CreateTableStmt createTableStmt = createTableInfo.translateToLegacyStmt();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Nereids start to execute the create table command, query id: {}, tableName: {}",
                    ctx.queryId(), createTableInfo.getTableName());
        }

        Env.getCurrentEnv().createTable(createTableStmt);
    }

    /**
     * Builds one column definition per output slot of the planned CTAS query.
     */
    private List<ColumnDefinition> buildCtasColumnDefinitions(ConnectContext ctx, List<Slot> slots) {
        String autoRangePartitionName = getAutoRangePartitionNameOrNull();
        ImmutableList.Builder<ColumnDefinition> columnsOfQuery = ImmutableList.builder();
        for (int i = 0; i < slots.size(); i++) {
            Slot slot = slots.get(i);
            DataType dataType = resolveCtasColumnType(ctx, slot, i == 0);
            boolean nullable;
            if (autoRangePartitionName != null && autoRangePartitionName.equalsIgnoreCase(slot.getName())) {
                // for auto range partition column, it must be not nullable. so keep its origin.
                nullable = slot.nullable();
            } else {
                // if the column is an expression, we set it to nullable, otherwise according to the
                // nullable of the slot.
                nullable = !slot.isColumnFromTable() || slot.nullable();
            }
            columnsOfQuery.add(new ColumnDefinition(slot.getName(), dataType, nullable));
        }
        return columnsOfQuery.build();
    }

    /**
     * Chooses the column type of the new table for one output slot of the CTAS query.
     *
     * @param ctx the connect context, consulted for {@code useMaxLengthOfVarcharInCtas}
     * @param slot the output slot of the planned query
     * @param isFirstColumn whether the slot is the first output column
     * @return the data type to use in the column definition
     */
    private DataType resolveCtasColumnType(ConnectContext ctx, Slot slot, boolean isFirstColumn) {
        DataType dataType = slot.getDataType().conversion();
        if (isFirstColumn && dataType.isStringType()) {
            // first column of olap table can not be string type.
            // So change it to varchar type.
            return VarcharType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH);
        }
        dataType = TypeCoercionUtils.replaceSpecifiedType(dataType,
                NullType.class, TinyIntType.INSTANCE);
        dataType = TypeCoercionUtils.replaceSpecifiedType(dataType,
                DecimalV2Type.class, DecimalV2Type.SYSTEM_DEFAULT);

        if (!slot.isColumnFromTable()) {
            // the column is an expression: optionally widen char/varchar to the maximum varchar length.
            if (ctx.getSessionVariable().useMaxLengthOfVarcharInCtas) {
                dataType = TypeCoercionUtils.replaceSpecifiedType(dataType,
                        VarcharType.class, VarcharType.MAX_VARCHAR_TYPE);
                dataType = TypeCoercionUtils.replaceSpecifiedType(dataType,
                        CharType.class, VarcharType.MAX_VARCHAR_TYPE);
            }
            return dataType;
        }

        SlotReference slotReference = (SlotReference) slot;
        boolean fromManagedTable = slotReference.getOriginalTable().isPresent()
                && slotReference.getOriginalTable().get().isManagedTable();
        if (fromManagedTable) {
            // columns read from a managed table keep their character types untouched.
            return dataType;
        }

        boolean partitionOrDistributionColumn =
                createTableInfo.getPartitionTableInfo().inIdentifierPartitions(slot.getName())
                        || (createTableInfo.getDistribution() != null
                        && createTableInfo.getDistribution().inDistributionColumns(slot.getName()));
        if (partitionOrDistributionColumn || isFirstColumn) {
            // String type can not be used in partition/distributed column, and the first column of
            // olap table can not be string type either, so we replace it to varchar.
            return TypeCoercionUtils.replaceSpecifiedType(dataType,
                    CharacterType.class, VarcharType.MAX_VARCHAR_TYPE);
        }
        // change varchar/char column from external table to string type
        return TypeCoercionUtils.replaceSpecifiedType(dataType,
                CharacterType.class, StringType.INSTANCE);
    }

    /**
     * Rolls back a failed CTAS by dropping the table described by this command.
     *
     * <p>A failure of the drop itself is logged and recorded as an error on the connection state;
     * it is not rethrown.
     *
     * @param ctx the connect context whose state receives the error if the drop fails
     */
    void handleFallbackFailedCtas(ConnectContext ctx) {
        try {
            Env.getCurrentEnv().dropTable(new DropTableStmt(false,
                    new TableName(createTableInfo.getCtlName(),
                            createTableInfo.getDbName(), createTableInfo.getTableName()), true));
        } catch (Exception e) {
            LOG.warn("failed to drop table {} after CTAS failure", createTableInfo.getTableName(), e);
            // TODO: refactor it with normal error process.
            ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, e.getMessage());
        }
    }

    /**
     * Returns the first identifier partition column when the table uses auto RANGE partitioning.
     *
     * @return the partition column name, or null when the table is not auto range partitioned or the
     *         partition info cannot be read
     */
    private String getAutoRangePartitionNameOrNull() {
        try {
            if (createTableInfo.getPartitionTableInfo().isAutoPartition()
                    && createTableInfo.getPartitionTableInfo().getPartitionType().equalsIgnoreCase("RANGE")) {
                // should collect first before use them.
                createTableInfo.getPartitionTableInfo().extractPartitionColumns();
                return createTableInfo.getPartitionTableInfo().getIdentifierPartitionColumns().get(0);
            }
        } catch (Exception e) {
            // best effort: any failure here means "no auto range partition column is known".
            if (LOG.isDebugEnabled()) {
                LOG.debug("can not resolve auto range partition column of table {}",
                        createTableInfo.getTableName(), e);
            }
            return null;
        }
        return null;
    }

    /**
     * Tells whether this command carries a CTAS query.
     */
    public boolean isCtasCommand() {
        return ctasQuery.isPresent();
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitCreateTableCommand(this, context);
    }

    public CreateTableInfo getCreateTableInfo() {
        return createTableInfo;
    }

    public Optional<LogicalPlan> getCtasQuery() {
        return ctasQuery;
    }

    @Override
    public StmtType stmtType() {
        return StmtType.CREATE;
    }

    /**
     * Requests audit encryption for every engine other than {@link CreateTableInfo#ENGINE_OLAP}.
     *
     * <p>The constant is used as the receiver of the comparison so that an unset engine name does not
     * raise a {@link NullPointerException}; in that case encryption is requested.
     */
    @Override
    public boolean needAuditEncryption() {
        return !CreateTableInfo.ENGINE_OLAP.equalsIgnoreCase(createTableInfo.getEngineName());
    }
}