InsertIntoTableCommand.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.ProfileManager.ProfileType;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.dictionary.Dictionary;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.TVFRelation;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.commands.NeedAuditEncryption;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDictionarySink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.planner.DataSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.Backend;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * insert into select command implementation
 * insert into select command support the grammar: explain? insert into table columns? partitions? hints? query
 * InsertIntoTableCommand is a command to represent insert the answer of a query into a table.
 * class structure's:
 * InsertIntoTableCommand(Query())
 * ExplainCommand(Query())
 */
public class InsertIntoTableCommand extends Command implements NeedAuditEncryption, ForwardWithSync, Explainable {

    public static final Logger LOG = LogManager.getLogger(InsertIntoTableCommand.class);

    private LogicalPlan originLogicalQuery;
    private Optional<LogicalPlan> logicalQuery;
    private Optional<String> labelName;
    /**
     * When source it's from job scheduler,it will be set.
     */
    private long jobId;

    // default is empty. only for OlapInsertExecutor#finalizeSink will construct one for check allow auto partition
    private final Optional<InsertCommandContext> insertCtx;
    private final Optional<LogicalPlan> cte;

    /**
     * constructor
     */
    public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> labelName,
                                  Optional<InsertCommandContext> insertCtx, Optional<LogicalPlan> cte) {
        super(PlanType.INSERT_INTO_TABLE_COMMAND);
        this.originLogicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null");
        this.labelName = Objects.requireNonNull(labelName, "labelName should not be null");
        this.logicalQuery = Optional.empty();
        this.insertCtx = insertCtx;
        this.cte = cte;
    }

    /**
     * constructor for derived class
     */
    public InsertIntoTableCommand(InsertIntoTableCommand command, PlanType planType) {
        super(planType);
        this.originLogicalQuery = command.originLogicalQuery;
        this.labelName = command.labelName;
        this.logicalQuery = command.logicalQuery;
        this.insertCtx = command.insertCtx;
        this.cte = command.cte;
        this.jobId = command.jobId;
    }

    public LogicalPlan getLogicalQuery() {
        return logicalQuery.orElse(originLogicalQuery);
    }

    protected void setLogicalQuery(LogicalPlan logicalQuery) {
        this.logicalQuery = Optional.of(logicalQuery);
    }

    protected void setOriginLogicalQuery(LogicalPlan logicalQuery) {
        this.originLogicalQuery = logicalQuery;
    }

    public Optional<String> getLabelName() {
        return labelName;
    }

    public void setLabelName(Optional<String> labelName) {
        this.labelName = labelName;
    }

    public long getJobId() {
        return jobId;
    }

    public void setJobId(long jobId) {
        this.jobId = jobId;
    }

    @Override
    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
        runInternal(ctx, executor);
    }

    public void runWithUpdateInfo(ConnectContext ctx, StmtExecutor executor,
                                  LoadStatistic loadStatistic) throws Exception {
        // TODO: add coordinator statistic
        runInternal(ctx, executor);
    }

    // may be overridden
    protected TableIf getTargetTableIf(ConnectContext ctx, List<String> qualifiedTargetTableName) {
        return RelationUtil.getTable(qualifiedTargetTableName, ctx.getEnv());
    }

    public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor) throws Exception {
        return initPlan(ctx, executor, true);
    }

    /**
     * This function is used to generate the plan for Nereids.
     * There are some load functions that only need to the plan, such as stream_load.
     * Therefore, this section will be presented separately.
     * @param needBeginTransaction whether to start a transaction.
     *       For external uses such as creating a job, only basic analysis is needed without starting a transaction,
     *       in which case this can be set to false.
     */
    public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor stmtExecutor,
                                           boolean needBeginTransaction) throws Exception {
        List<String> qualifiedTargetTableName = InsertUtils.getTargetTableQualified(originLogicalQuery, ctx);

        AbstractInsertExecutor insertExecutor;
        int retryTimes = 0;
        while (++retryTimes < Math.max(ctx.getSessionVariable().dmlPlanRetryTimes, 3)) {
            TableIf targetTableIf = getTargetTableIf(ctx, qualifiedTargetTableName);
            // check auth
            if (!Env.getCurrentEnv().getAccessManager()
                    .checkTblPriv(ConnectContext.get(), targetTableIf.getDatabase().getCatalog().getName(),
                            targetTableIf.getDatabase().getFullName(), targetTableIf.getName(),
                            PrivPredicate.LOAD)) {
                ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
                        ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
                        targetTableIf.getDatabase().getFullName() + "." + targetTableIf.getName());
            }
            BuildInsertExecutorResult buildResult;
            try {
                // use originLogicalQuery to build logicalQuery again.
                buildResult = initPlanOnce(ctx, stmtExecutor, targetTableIf);
            } catch (Throwable e) {
                Throwables.throwIfInstanceOf(e, RuntimeException.class);
                throw new IllegalStateException(e.getMessage(), e);
            }
            insertExecutor = buildResult.executor;
            if (!needBeginTransaction) {
                return insertExecutor;
            }

            // lock after plan and check does table's schema changed to ensure we lock table order by id.
            TableIf newestTargetTableIf = getTargetTableIf(ctx, qualifiedTargetTableName);
            newestTargetTableIf.readLock();
            try {
                if (targetTableIf.getId() != newestTargetTableIf.getId()) {
                    LOG.warn("insert plan failed {} times. query id is {}. table id changed from {} to {}",
                            retryTimes, DebugUtil.printId(ctx.queryId()),
                            targetTableIf.getId(), newestTargetTableIf.getId());
                    newestTargetTableIf.readUnlock();
                    continue;
                }
                // Use the schema saved during planning as the schema of the original target table.
                if (!ctx.getStatementContext().getInsertTargetSchema().equals(newestTargetTableIf.getFullSchema())) {
                    LOG.warn("insert plan failed {} times. query id is {}. table schema changed from {} to {}",
                            retryTimes, DebugUtil.printId(ctx.queryId()),
                            ctx.getStatementContext().getInsertTargetSchema(), newestTargetTableIf.getFullSchema());
                    newestTargetTableIf.readUnlock();
                    continue;
                }
                if (!insertExecutor.isEmptyInsert()) {
                    insertExecutor.beginTransaction();
                    insertExecutor.finalizeSink(
                            buildResult.planner.getFragments().get(0), buildResult.dataSink,
                            buildResult.physicalSink
                    );
                }
                newestTargetTableIf.readUnlock();
            } catch (Throwable e) {
                newestTargetTableIf.readUnlock();
                // the abortTxn in onFail need to acquire table write lock
                if (insertExecutor != null) {
                    insertExecutor.onFail(e);
                }
                Throwables.throwIfInstanceOf(e, RuntimeException.class);
                throw new IllegalStateException(e.getMessage(), e);
            }
            stmtExecutor.setProfileType(ProfileType.LOAD);
            // We exposed @StmtExecutor#cancel as a unified entry point for statement interruption,
            // so we need to set this here
            insertExecutor.getCoordinator().setTxnId(insertExecutor.getTxnId());
            stmtExecutor.setCoord(insertExecutor.getCoordinator());
            return insertExecutor;
        }
        LOG.warn("insert plan failed {} times. query id is {}.", retryTimes, DebugUtil.printId(ctx.queryId()));
        throw new AnalysisException("Insert plan failed. Could not get target table lock.");
    }

    private BuildInsertExecutorResult initPlanOnce(ConnectContext ctx,
            StmtExecutor stmtExecutor, TableIf targetTableIf) throws Throwable {
        targetTableIf.readLock();
        try {
            Optional<CascadesContext> analyzeContext = Optional.of(
                    CascadesContext.initContext(ctx.getStatementContext(), originLogicalQuery, PhysicalProperties.ANY)
            );
            if (!(this instanceof InsertIntoDictionaryCommand)) {
                // process inline table (default values, empty values)
                this.logicalQuery = Optional.of((LogicalPlan) InsertUtils.normalizePlan(originLogicalQuery,
                        targetTableIf, analyzeContext, insertCtx));
            }
            if (cte.isPresent()) {
                this.logicalQuery = Optional.of((LogicalPlan) cte.get().withChildren(logicalQuery.get()));
            }
            OlapGroupCommitInsertExecutor.analyzeGroupCommit(
                    ctx, targetTableIf, this.logicalQuery.get(), this.insertCtx);
        } finally {
            targetTableIf.readUnlock();
        }

        LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery.get(), ctx.getStatementContext());
        return planInsertExecutor(ctx, stmtExecutor, logicalPlanAdapter, targetTableIf);
    }

    // we should select the factory type first, but we can not initial InsertExecutor at this time,
    // because Nereids's DistributePlan are not gernerated, so we return factory and after the
    // DistributePlan have been generated, we can create InsertExecutor
    private ExecutorFactory selectInsertExecutorFactory(
            NereidsPlanner planner, ConnectContext ctx, StmtExecutor stmtExecutor, TableIf targetTableIf) {
        try {
            stmtExecutor.setPlanner(planner);
            stmtExecutor.checkBlockRules();
            if (ctx.getConnectType() == ConnectType.MYSQL && ctx.getMysqlChannel() != null) {
                ctx.getMysqlChannel().reset();
            }
            Optional<PhysicalSink<?>> plan = (planner.getPhysicalPlan()
                    .<PhysicalSink<?>>collect(PhysicalSink.class::isInstance)).stream()
                    .findAny();
            Preconditions.checkArgument(plan.isPresent(), "insert into command must contain target table");
            PhysicalSink<?> physicalSink = plan.get();
            DataSink dataSink = planner.getFragments().get(0).getSink();
            // Transaction insert should reuse the label in the transaction.
            String label = this.labelName.orElse(
                    ctx.isTxnModel() ? null : String.format("label_%x_%x", ctx.queryId().hi, ctx.queryId().lo));

            if (physicalSink instanceof PhysicalOlapTableSink) {
                boolean emptyInsert = childIsEmptyRelation(physicalSink);
                OlapTable olapTable = (OlapTable) targetTableIf;

                ExecutorFactory executorFactory;
                // the insertCtx contains some variables to adjust SinkNode
                if (ctx.isTxnModel()) {
                    executorFactory = ExecutorFactory.from(
                            planner,
                            dataSink,
                            physicalSink,
                            () -> new OlapTxnInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert)
                    );
                } else if (ctx.isGroupCommit()) {
                    Backend groupCommitBackend = Env.getCurrentEnv()
                            .getGroupCommitManager()
                            .selectBackendForGroupCommit(targetTableIf.getId(), ctx);
                    // set groupCommitBackend for Nereids's DistributePlanner
                    planner.getCascadesContext().getStatementContext().setGroupCommitMergeBackend(groupCommitBackend);
                    executorFactory = ExecutorFactory.from(
                            planner,
                            dataSink,
                            physicalSink,
                            () -> new OlapGroupCommitInsertExecutor(
                                    ctx, olapTable, label, planner, insertCtx, emptyInsert, groupCommitBackend
                            )
                    );
                } else {
                    executorFactory = ExecutorFactory.from(
                            planner,
                            dataSink,
                            physicalSink,
                            () -> new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert)
                    );
                }

                return executorFactory.onCreate(executor -> {
                    Coordinator coordinator = executor.getCoordinator();
                    boolean isEnableMemtableOnSinkNode = olapTable.getTableProperty().getUseSchemaLightChange()
                                    && coordinator.getQueryOptions().isEnableMemtableOnSinkNode();
                    coordinator.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
                });
            } else if (physicalSink instanceof PhysicalHiveTableSink) {
                boolean emptyInsert = childIsEmptyRelation(physicalSink);
                HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf;
                if (hiveExternalTable.isHiveTransactionalTable()) {
                    throw new UserException("Not supported insert into hive transactional table.");
                }

                return ExecutorFactory.from(
                        planner,
                        dataSink,
                        physicalSink,
                        () -> new HiveInsertExecutor(ctx, hiveExternalTable, label, planner,
                                Optional.of(insertCtx.orElse((new HiveInsertCommandContext()))), emptyInsert)
                );
                // set hive query options
            } else if (physicalSink instanceof PhysicalIcebergTableSink) {
                boolean emptyInsert = childIsEmptyRelation(physicalSink);
                IcebergExternalTable icebergExternalTable = (IcebergExternalTable) targetTableIf;
                return ExecutorFactory.from(
                        planner,
                        dataSink,
                        physicalSink,
                        () -> new IcebergInsertExecutor(ctx, icebergExternalTable, label, planner,
                                Optional.of(insertCtx.orElse((new BaseExternalTableInsertCommandContext()))),
                                emptyInsert
                        )
                );
            } else if (physicalSink instanceof PhysicalJdbcTableSink) {
                boolean emptyInsert = childIsEmptyRelation(physicalSink);
                List<Column> cols = ((PhysicalJdbcTableSink<?>) physicalSink).getCols();
                List<Slot> slots = physicalSink.getOutput();
                if (physicalSink.children().size() == 1) {
                    if (physicalSink.child(0) instanceof PhysicalOneRowRelation
                            || physicalSink.child(0) instanceof PhysicalUnion) {
                        for (int i = 0; i < cols.size(); i++) {
                            if (!(cols.get(i).isAllowNull()) && slots.get(i).nullable()) {
                                throw new AnalysisException("Column `" + cols.get(i).getName()
                                        + "` is not nullable, but the inserted value is nullable.");
                            }
                        }
                    }
                }
                JdbcExternalTable jdbcExternalTable = (JdbcExternalTable) targetTableIf;
                return ExecutorFactory.from(
                        planner,
                        dataSink,
                        physicalSink,
                        () -> new JdbcInsertExecutor(ctx, jdbcExternalTable, label, planner,
                                Optional.of(insertCtx.orElse((new JdbcInsertCommandContext()))), emptyInsert)
                );
            } else if (physicalSink instanceof PhysicalDictionarySink) {
                boolean emptyInsert = childIsEmptyRelation(physicalSink);
                Dictionary dictionary = (Dictionary) targetTableIf;
                // insertCtx is not useful for dictionary. so keep it empty is ok.
                return ExecutorFactory.from(planner, dataSink, physicalSink,
                        () -> new DictionaryInsertExecutor(ctx, dictionary, label, planner, insertCtx, emptyInsert));
            } else {
                // TODO: support other table types
                throw new AnalysisException(
                        "insert into command only support [olap, dictionary, hive, iceberg, jdbc] table");
            }
        } catch (Throwable t) {
            Throwables.propagateIfInstanceOf(t, RuntimeException.class);
            throw new IllegalStateException(t.getMessage(), t);
        }
    }

    private BuildInsertExecutorResult planInsertExecutor(
            ConnectContext ctx, StmtExecutor stmtExecutor,
            LogicalPlanAdapter logicalPlanAdapter, TableIf targetTableIf) throws Throwable {
        LogicalPlan logicalPlan = logicalPlanAdapter.getLogicalPlan();

        boolean supportFastInsertIntoValues = InsertUtils.supportFastInsertIntoValues(logicalPlan, targetTableIf, ctx);
        // the key logical when use new coordinator:
        // 1. use NereidsPlanner to generate PhysicalPlan
        // 2. use PhysicalPlan to select InsertExecutorFactory, some InsertExecutors want to control
        //    which backend should be used, for example, OlapGroupCommitInsertExecutor need select
        //    a backend to do group commit.
        //    Note: we can not initialize InsertExecutor at this time, because the DistributePlans
        //    have not been generated, so the NereidsSqlCoordinator can not initial too,
        // 3. NereidsPlanner use PhysicalPlan and the provided backend to generate DistributePlan
        // 4. ExecutorFactory use the DistributePlan to generate the NereidsSqlCoordinator and InsertExecutor

        AtomicReference<ExecutorFactory> executorFactoryRef = new AtomicReference<>();
        FastInsertIntoValuesPlanner planner = new FastInsertIntoValuesPlanner(
                ctx.getStatementContext(), supportFastInsertIntoValues) {
            @Override
            protected void doDistribute(boolean canUseNereidsDistributePlanner) {
                // when enter this method, the step 1 already executed

                // step 2
                executorFactoryRef.set(
                        selectInsertExecutorFactory(this, ctx, stmtExecutor, targetTableIf)
                );
                // step 3
                super.doDistribute(canUseNereidsDistributePlanner);
            }
        };

        // step 1, 2, 3
        planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
        if (LOG.isDebugEnabled()) {
            LOG.debug("insert into plan for query_id: {} is: {}.", DebugUtil.printId(ctx.queryId()),
                    planner.getPhysicalPlan().treeString());
        }
        // step 4
        return executorFactoryRef.get().build();
    }

    private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception {
        AbstractInsertExecutor insertExecutor = initPlan(ctx, executor);
        // if the insert stmt data source is empty, directly return, no need to be executed.
        if (insertExecutor.isEmptyInsert()) {
            return;
        }
        insertExecutor.executeSingleInsert(executor, jobId);
    }

    public boolean isExternalTableSink() {
        return !(getLogicalQuery() instanceof UnboundTableSink);
    }

    /**
     * get the target table of the insert command
     */
    public TableIf getTable(ConnectContext ctx) throws Exception {
        TableIf targetTableIf = InsertUtils.getTargetTable(originLogicalQuery, ctx);
        if (!Env.getCurrentEnv().getAccessManager()
                .checkTblPriv(ConnectContext.get(), targetTableIf.getDatabase().getCatalog().getName(),
                        targetTableIf.getDatabase().getFullName(), targetTableIf.getName(),
                        PrivPredicate.LOAD)) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
                    ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
                    targetTableIf.getDatabase().getFullName() + "." + targetTableIf.getName());
        }
        return targetTableIf;
    }

    /**
     * get the target columns of the insert command
     */
    public List<String> getTargetColumns() {
        if (originLogicalQuery instanceof UnboundTableSink) {
            UnboundLogicalSink<? extends Plan> unboundTableSink = (UnboundTableSink<? extends Plan>) originLogicalQuery;
            return CollectionUtils.isEmpty(unboundTableSink.getColNames()) ? null : unboundTableSink.getColNames();
        } else {
            throw new AnalysisException(
                    "the root of plan should be [UnboundTableSink], but it is " + originLogicalQuery.getType());
        }
    }

    @Override
    public Plan getExplainPlan(ConnectContext ctx) {
        Optional<CascadesContext> analyzeContext = Optional.of(
                CascadesContext.initContext(ctx.getStatementContext(), originLogicalQuery, PhysicalProperties.ANY)
        );
        Plan plan = InsertUtils.getPlanForExplain(ctx, analyzeContext, getLogicalQuery());
        if (cte.isPresent()) {
            plan = cte.get().withChildren(plan);
        }
        return plan;
    }

    @Override
    public Optional<NereidsPlanner> getExplainPlanner(LogicalPlan logicalPlan, StatementContext ctx) {
        ConnectContext connectContext = ctx.getConnectContext();
        TableIf targetTableIf = InsertUtils.getTargetTable(originLogicalQuery, connectContext);
        boolean supportFastInsertIntoValues
                = InsertUtils.supportFastInsertIntoValues(logicalPlan, targetTableIf, connectContext);
        return Optional.of(new FastInsertIntoValuesPlanner(ctx, supportFastInsertIntoValues));
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitInsertIntoTableCommand(this, context);
    }

    private boolean childIsEmptyRelation(PhysicalSink sink) {
        if (sink.children() != null && sink.children().size() == 1
                && sink.child(0) instanceof PhysicalEmptyRelation) {
            return true;
        }
        return false;
    }

    @Override
    public StmtType stmtType() {
        return StmtType.INSERT;
    }

    @Override
    public RedirectStatus toRedirectStatus() {
        if (ConnectContext.get().isGroupCommit()) {
            return RedirectStatus.NO_FORWARD;
        } else {
            return RedirectStatus.FORWARD_WITH_SYNC;
        }
    }

    @Override
    public boolean needAuditEncryption() {
        return originLogicalQuery.anyMatch(node -> node instanceof TVFRelation);
    }

    /**
     * this factory is used to delay create the AbstractInsertExecutor until the DistributePlan is generated
     * by NereidsPlanner
     */
    private static class ExecutorFactory {
        public final NereidsPlanner planner;
        public final DataSink dataSink;
        public final PhysicalSink<?> physicalSink;
        public final Supplier<AbstractInsertExecutor> executorSupplier;
        private List<Consumer<AbstractInsertExecutor>> createCallback;

        private ExecutorFactory(NereidsPlanner planner, DataSink dataSink, PhysicalSink<?> physicalSink,
                Supplier<AbstractInsertExecutor> executorSupplier) {
            this.planner = planner;
            this.dataSink = dataSink;
            this.physicalSink = physicalSink;
            this.executorSupplier = executorSupplier;
            this.createCallback = Lists.newArrayList();
        }

        public static ExecutorFactory from(
                 NereidsPlanner planner, DataSink dataSink, PhysicalSink<?> physicalSink,
                Supplier<AbstractInsertExecutor> executorSupplier) {
            return new ExecutorFactory(planner, dataSink, physicalSink, executorSupplier);
        }

        public ExecutorFactory onCreate(Consumer<AbstractInsertExecutor> onCreate) {
            this.createCallback.add(onCreate);
            return this;
        }

        public BuildInsertExecutorResult build() {
            AbstractInsertExecutor executor = executorSupplier.get();
            for (Consumer<AbstractInsertExecutor> callback : createCallback) {
                callback.accept(executor);
            }
            return new BuildInsertExecutorResult(planner, executor, dataSink, physicalSink);
        }
    }

    private static class BuildInsertExecutorResult {
        private final NereidsPlanner planner;
        private final AbstractInsertExecutor executor;
        private final DataSink dataSink;
        private final PhysicalSink<?> physicalSink;

        public BuildInsertExecutorResult(NereidsPlanner planner, AbstractInsertExecutor executor, DataSink dataSink,
                PhysicalSink<?> physicalSink) {
            this.planner = planner;
            this.executor = executor;
            this.dataSink = dataSink;
            this.physicalSink = physicalSink;
        }
    }
}