InsertIntoTableCommand.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.profile.ProfileManager.ProfileType;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.planner.DataSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.StmtExecutor;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
 * insert into select command implementation
 * insert into select command support the grammar: explain? insert into table columns? partitions? hints? query
 * InsertIntoTableCommand is a command to represent insert the answer of a query into a table.
 * class structure's:
 * InsertIntoTableCommand(Query())
 * ExplainCommand(Query())
 */
public class InsertIntoTableCommand extends Command implements ForwardWithSync, Explainable {

    public static final Logger LOG = LogManager.getLogger(InsertIntoTableCommand.class);

    private LogicalPlan originalLogicalQuery;
    private LogicalPlan logicalQuery;
    private Optional<String> labelName;
    /**
     * When source it's from job scheduler,it will be set.
     */
    private long jobId;
    private final Optional<InsertCommandContext> insertCtx;
    private final Optional<LogicalPlan> cte;

    /**
     * constructor
     */
    public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> labelName,
                                  Optional<InsertCommandContext> insertCtx, Optional<LogicalPlan> cte) {
        super(PlanType.INSERT_INTO_TABLE_COMMAND);
        this.originalLogicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null");
        this.logicalQuery = originalLogicalQuery;
        this.labelName = Objects.requireNonNull(labelName, "labelName should not be null");
        this.insertCtx = insertCtx;
        this.cte = cte;
    }

    public LogicalPlan getLogicalQuery() {
        return logicalQuery;
    }

    public Optional<String> getLabelName() {
        return labelName;
    }

    public void setLabelName(Optional<String> labelName) {
        this.labelName = labelName;
    }

    public void setJobId(long jobId) {
        this.jobId = jobId;
    }

    @Override
    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
        runInternal(ctx, executor);
    }

    public void runWithUpdateInfo(ConnectContext ctx, StmtExecutor executor,
                                  LoadStatistic loadStatistic) throws Exception {
        // TODO: add coordinator statistic
        runInternal(ctx, executor);
    }

    public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor) throws Exception {
        return initPlan(ctx, executor, true);
    }

    /**
     * This function is used to generate the plan for Nereids.
     * There are some load functions that only need to the plan, such as stream_load.
     * Therefore, this section will be presented separately.
     * @param needBeginTransaction whether to start a transaction.
     *       For external uses such as creating a job, only basic analysis is needed without starting a transaction,
     *       in which case this can be set to false.
     */
    public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor,
                                           boolean needBeginTransaction) throws Exception {
        List<String> qualifiedTargetTableName = InsertUtils.getTargetTableQualified(logicalQuery, ctx);
        AbstractInsertExecutor insertExecutor;
        int retryTimes = 0;
        while (++retryTimes < Math.max(ctx.getSessionVariable().dmlPlanRetryTimes, 3)) {
            TableIf targetTableIf = RelationUtil.getTable(qualifiedTargetTableName, ctx.getEnv());
            // check auth
            if (!Env.getCurrentEnv().getAccessManager()
                    .checkTblPriv(ConnectContext.get(), targetTableIf.getDatabase().getCatalog().getName(),
                            targetTableIf.getDatabase().getFullName(), targetTableIf.getName(),
                            PrivPredicate.LOAD)) {
                ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
                        ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
                        targetTableIf.getDatabase().getFullName() + "." + targetTableIf.getName());
            }
            BuildInsertExecutorResult buildResult;
            try {
                buildResult = initPlanOnce(ctx, executor, targetTableIf);
            } catch (Throwable e) {
                Throwables.throwIfInstanceOf(e, RuntimeException.class);
                throw new IllegalStateException(e.getMessage(), e);
            }
            insertExecutor = buildResult.executor;
            if (!needBeginTransaction) {
                return insertExecutor;
            }
            // lock after plan and check does table's schema changed to ensure we lock table order by id.
            TableIf newestTargetTableIf = RelationUtil.getTable(qualifiedTargetTableName, ctx.getEnv());
            newestTargetTableIf.readLock();
            try {
                if (targetTableIf.getId() != newestTargetTableIf.getId()) {
                    LOG.warn("insert plan failed {} times. query id is {}. table id changed from {} to {}",
                            retryTimes, DebugUtil.printId(ctx.queryId()),
                            targetTableIf.getId(), newestTargetTableIf.getId());
                    newestTargetTableIf.readUnlock();
                    continue;
                }
                // Use the schema saved during planning as the schema of the original target table.
                if (!ctx.getStatementContext().getInsertTargetSchema().equals(newestTargetTableIf.getFullSchema())) {
                    LOG.warn("insert plan failed {} times. query id is {}. table schema changed from {} to {}",
                            retryTimes, DebugUtil.printId(ctx.queryId()),
                            ctx.getStatementContext().getInsertTargetSchema(), newestTargetTableIf.getFullSchema());
                    newestTargetTableIf.readUnlock();
                    continue;
                }
                if (!insertExecutor.isEmptyInsert()) {
                    insertExecutor.beginTransaction();
                    insertExecutor.finalizeSink(
                            buildResult.planner.getFragments().get(0), buildResult.dataSink,
                            buildResult.physicalSink
                    );
                }
                newestTargetTableIf.readUnlock();
            } catch (Throwable e) {
                newestTargetTableIf.readUnlock();
                // the abortTxn in onFail need to acquire table write lock
                if (insertExecutor != null) {
                    insertExecutor.onFail(e);
                }
                Throwables.throwIfInstanceOf(e, RuntimeException.class);
                throw new IllegalStateException(e.getMessage(), e);
            }
            executor.setProfileType(ProfileType.LOAD);
            // We exposed @StmtExecutor#cancel as a unified entry point for statement interruption,
            // so we need to set this here
            insertExecutor.getCoordinator().setTxnId(insertExecutor.getTxnId());
            executor.setCoord(insertExecutor.getCoordinator());
            // for prepare and execute, avoiding normalization for every execute command
            this.originalLogicalQuery = this.logicalQuery;
            return insertExecutor;
        }
        LOG.warn("insert plan failed {} times. query id is {}.", retryTimes, DebugUtil.printId(ctx.queryId()));
        throw new AnalysisException("Insert plan failed. Could not get target table lock.");
    }

    private BuildInsertExecutorResult initPlanOnce(ConnectContext ctx,
            StmtExecutor executor, TableIf targetTableIf) throws Throwable {
        AbstractInsertExecutor insertExecutor;
        // 1. process inline table (default values, empty values)
        this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf, insertCtx);
        if (cte.isPresent()) {
            this.logicalQuery = ((LogicalPlan) cte.get().withChildren(logicalQuery));
        }
        OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, targetTableIf, this.logicalQuery, this.insertCtx);
        LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
        NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
        planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
        executor.setPlanner(planner);
        executor.checkBlockRules();
        if (ctx.getConnectType() == ConnectType.MYSQL && ctx.getMysqlChannel() != null) {
            ctx.getMysqlChannel().reset();
        }
        Optional<PhysicalSink<?>> plan = (planner.getPhysicalPlan()
                .<PhysicalSink<?>>collect(PhysicalSink.class::isInstance)).stream()
                .findAny();
        Preconditions.checkArgument(plan.isPresent(), "insert into command must contain target table");
        PhysicalSink physicalSink = plan.get();
        DataSink sink = planner.getFragments().get(0).getSink();
        // Transaction insert should reuse the label in the transaction.
        String label = this.labelName.orElse(
                ctx.isTxnModel() ? null : String.format("label_%x_%x", ctx.queryId().hi, ctx.queryId().lo));

        if (physicalSink instanceof PhysicalOlapTableSink) {
            boolean emptyInsert = childIsEmptyRelation(physicalSink);
            OlapTable olapTable = (OlapTable) targetTableIf;
            // the insertCtx contains some variables to adjust SinkNode
            if (ctx.isTxnModel()) {
                insertExecutor = new OlapTxnInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert);
            } else if (ctx.isGroupCommit()) {
                insertExecutor = new OlapGroupCommitInsertExecutor(ctx, olapTable, label, planner, insertCtx,
                        emptyInsert);
            } else {
                insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert);
            }

            boolean isEnableMemtableOnSinkNode =
                    olapTable.getTableProperty().getUseSchemaLightChange()
                            ? insertExecutor.getCoordinator().getQueryOptions().isEnableMemtableOnSinkNode()
                            : false;
            insertExecutor.getCoordinator().getQueryOptions()
                    .setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
        } else if (physicalSink instanceof PhysicalHiveTableSink) {
            boolean emptyInsert = childIsEmptyRelation(physicalSink);
            HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf;
            if (hiveExternalTable.isHiveTransactionalTable()) {
                throw new AnalysisException("Not supported insert into hive transactional table.");
            }
            insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner,
                    Optional.of(insertCtx.orElse((new HiveInsertCommandContext()))), emptyInsert);
            // set hive query options
        } else if (physicalSink instanceof PhysicalIcebergTableSink) {
            boolean emptyInsert = childIsEmptyRelation(physicalSink);
            IcebergExternalTable icebergExternalTable = (IcebergExternalTable) targetTableIf;
            insertExecutor = new IcebergInsertExecutor(ctx, icebergExternalTable, label, planner,
                    Optional.of(insertCtx.orElse((new BaseExternalTableInsertCommandContext()))), emptyInsert);
        } else if (physicalSink instanceof PhysicalJdbcTableSink) {
            boolean emptyInsert = childIsEmptyRelation(physicalSink);
            List<Column> cols = ((PhysicalJdbcTableSink<?>) physicalSink).getCols();
            List<Slot> slots = ((PhysicalJdbcTableSink<?>) physicalSink).getOutput();
            if (physicalSink.children().size() == 1) {
                if (physicalSink.child(0) instanceof PhysicalOneRowRelation
                        || physicalSink.child(0) instanceof PhysicalUnion) {
                    for (int i = 0; i < cols.size(); i++) {
                        if (!(cols.get(i).isAllowNull()) && slots.get(i).nullable()) {
                            throw new AnalysisException("Column `" + cols.get(i).getName()
                                    + "` is not nullable, but the inserted value is nullable.");
                        }
                    }
                }
            }
            JdbcExternalTable jdbcExternalTable = (JdbcExternalTable) targetTableIf;
            insertExecutor = new JdbcInsertExecutor(ctx, jdbcExternalTable, label, planner,
                    Optional.of(insertCtx.orElse((new JdbcInsertCommandContext()))), emptyInsert);
        } else {
            // TODO: support other table types
            throw new AnalysisException("insert into command only support [olap, hive, iceberg, jdbc] table");
        }
        return new BuildInsertExecutorResult(planner, insertExecutor, sink, physicalSink);
    }

    private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception {
        AbstractInsertExecutor insertExecutor = initPlan(ctx, executor);
        // if the insert stmt data source is empty, directly return, no need to be executed.
        if (insertExecutor.isEmptyInsert()) {
            return;
        }
        insertExecutor.executeSingleInsert(executor, jobId);
    }

    public boolean isExternalTableSink() {
        return !(logicalQuery instanceof UnboundTableSink);
    }

    /**
     * get the target table of the insert command
     */
    public TableIf getTable(ConnectContext ctx) throws Exception {
        TableIf targetTableIf = InsertUtils.getTargetTable(originalLogicalQuery, ctx);
        if (!Env.getCurrentEnv().getAccessManager()
                .checkTblPriv(ConnectContext.get(), targetTableIf.getDatabase().getCatalog().getName(),
                        targetTableIf.getDatabase().getFullName(), targetTableIf.getName(),
                        PrivPredicate.LOAD)) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
                    ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
                    targetTableIf.getDatabase().getFullName() + "." + targetTableIf.getName());
        }
        return targetTableIf;
    }

    /**
     * get the target columns of the insert command
     */
    public List<String> getTargetColumns() {
        if (originalLogicalQuery instanceof UnboundTableSink) {
            UnboundLogicalSink<? extends Plan> unboundTableSink
                    = (UnboundTableSink<? extends Plan>) originalLogicalQuery;
            return CollectionUtils.isEmpty(unboundTableSink.getColNames()) ? null : unboundTableSink.getColNames();
        } else {
            throw new AnalysisException(
                    "the root of plan should be [UnboundTableSink], but it is " + originalLogicalQuery.getType());
        }
    }

    @Override
    public Plan getExplainPlan(ConnectContext ctx) {
        Plan plan = InsertUtils.getPlanForExplain(ctx, this.logicalQuery);
        if (cte.isPresent()) {
            plan = cte.get().withChildren(plan);
        }
        return plan;
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitInsertIntoTableCommand(this, context);
    }

    private boolean childIsEmptyRelation(PhysicalSink sink) {
        if (sink.children() != null && sink.children().size() == 1
                && sink.child(0) instanceof PhysicalEmptyRelation) {
            return true;
        }
        return false;
    }

    @Override
    public StmtType stmtType() {
        return StmtType.INSERT;
    }

    @Override
    public RedirectStatus toRedirectStatus() {
        if (ConnectContext.get().isGroupCommit()) {
            return RedirectStatus.NO_FORWARD;
        } else {
            return RedirectStatus.FORWARD_WITH_SYNC;
        }
    }

    private static class BuildInsertExecutorResult {
        private final NereidsPlanner planner;
        private final AbstractInsertExecutor executor;
        private final DataSink dataSink;
        private final PhysicalSink<?> physicalSink;

        public BuildInsertExecutorResult(NereidsPlanner planner, AbstractInsertExecutor executor, DataSink dataSink,
                PhysicalSink<?> physicalSink) {
            this.planner = planner;
            this.executor = executor;
            this.dataSink = dataSink;
            this.physicalSink = physicalSink;
        }
    }
}