RewriteTableCommand.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.TVFRelation;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.commands.NeedAuditEncryption;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.planner.DataSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
 * RewriteTableCommand: dedicated command for rewrite operations (currently
 * Iceberg only).
 * This command is used to rewrite the data file of an iceberg table.
 */
public class RewriteTableCommand extends Command implements NeedAuditEncryption, ForwardWithSync, Explainable {

    public static final Logger LOG = LogManager.getLogger(RewriteTableCommand.class);

    private LogicalPlan originLogicalQuery;
    private Optional<LogicalPlan> logicalQuery;
    private Optional<String> labelName;
    private Optional<Plan> parsedPlan;
    private final Optional<InsertCommandContext> insertCtx;
    private final Optional<LogicalPlan> cte;
    private final Optional<String> branchName;
    private long jobId;

    /**
     * constructor for rewrite operation
     */
    public RewriteTableCommand(LogicalPlan logicalQuery, Optional<String> labelName,
            Optional<InsertCommandContext> insertCtx, Optional<LogicalPlan> cte, Optional<String> branchName) {
        super(PlanType.INSERT_INTO_TABLE_COMMAND);
        this.originLogicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null");
        this.labelName = Objects.requireNonNull(labelName, "labelName should not be null");
        this.logicalQuery = Optional.empty();
        this.insertCtx = insertCtx;
        this.cte = cte;
        this.branchName = branchName;
        if (Env.getCurrentEnv().isMaster()) {
            this.jobId = Env.getCurrentEnv().getNextId();
        } else {
            this.jobId = -1;
        }
    }

    public LogicalPlan getLogicalQuery() {
        return logicalQuery.orElse(originLogicalQuery);
    }

    public Optional<Plan> getParsedPlan() {
        return parsedPlan;
    }

    protected void setLogicalQuery(LogicalPlan plan) {
        this.logicalQuery = Optional.of(plan);
    }

    protected TableIf getTargetTableIf(ConnectContext ctx, List<String> qualifiedTargetTableName) {
        return RelationUtil.getTable(qualifiedTargetTableName, ctx.getEnv(), Optional.empty());
    }

    /**
     * For rewrite, we never begin transaction here. We always finalize sink in init
     * stage to keep parity with previous rewrite flow, and let external caller
     * inject txnId to coordinator.
     */
    public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor stmtExecutor) throws Exception {
        List<String> qualifiedTargetTableName = InsertUtils.getTargetTableQualified(originLogicalQuery, ctx);
        ctx.getStatementContext().setIsInsert(true);

        TableIf targetTableIf = getTargetTableIf(ctx, qualifiedTargetTableName);
        if (!Env.getCurrentEnv().getAccessManager()
                .checkTblPriv(ConnectContext.get(), targetTableIf.getDatabase().getCatalog().getName(),
                        targetTableIf.getDatabase().getFullName(), targetTableIf.getName(),
                        org.apache.doris.mysql.privilege.PrivPredicate.LOAD)) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
                    ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
                    targetTableIf.getDatabase().getFullName() + "."
                            + Util.getTempTableDisplayName(targetTableIf.getName()));
        }

        BuildResult buildResult;
        try {
            buildResult = initPlanOnce(ctx, stmtExecutor, targetTableIf);
        } catch (Throwable e) {
            Throwables.throwIfInstanceOf(e, RuntimeException.class);
            throw new IllegalStateException(e.getMessage(), e);
        }
        AbstractInsertExecutor insertExecutor = buildResult.executor;
        parsedPlan = Optional.ofNullable(buildResult.planner.getParsedPlan());

        // For rewrite: do not begin transaction here, but finalize sink
        insertExecutor.finalizeSink(
                buildResult.planner.getFragments().get(0), buildResult.dataSink, buildResult.physicalSink);
        return insertExecutor;
    }

    private BuildResult initPlanOnce(ConnectContext ctx, StmtExecutor stmtExecutor, TableIf targetTableIf)
            throws Throwable {
        targetTableIf.readLock();
        try {
            Optional<CascadesContext> analyzeContext = Optional.of(
                    CascadesContext.initContext(ctx.getStatementContext(), originLogicalQuery, PhysicalProperties.ANY));
            // rewrite does not need special logic like normalize insert-into-values, just
            // use normalize directly
            this.logicalQuery = Optional.of((LogicalPlan) InsertUtils.normalizePlan(originLogicalQuery,
                    targetTableIf, analyzeContext, insertCtx));
            if (cte.isPresent()) {
                this.logicalQuery = Optional.of((LogicalPlan) cte.get().withChildren(logicalQuery.get()));
            }
        } finally {
            targetTableIf.readUnlock();
        }

        LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery.get(), ctx.getStatementContext());
        return planInsertExecutor(ctx, stmtExecutor, logicalPlanAdapter, targetTableIf);
    }

    private ExecutorFactory selectInsertExecutorFactory(NereidsPlanner planner, ConnectContext ctx,
            StmtExecutor stmtExecutor, TableIf targetTableIf) {
        try {
            stmtExecutor.setPlanner(planner);
            stmtExecutor.checkBlockRules();
            if (ctx.getConnectType() == ConnectContext.ConnectType.MYSQL && ctx.getMysqlChannel() != null) {
                ctx.getMysqlChannel().reset();
            }
            Optional<PhysicalSink<?>> plan = (planner.getPhysicalPlan()
                    .<PhysicalSink<?>>collect(PhysicalSink.class::isInstance)).stream().findAny();
            Preconditions.checkArgument(plan.isPresent(), "rewrite command must contain target table");
            PhysicalSink<?> physicalSink = plan.get();
            DataSink dataSink = planner.getFragments().get(0).getSink();
            String label = this.labelName.orElse(String.format("label_%x_%x", ctx.queryId().hi, ctx.queryId().lo));

            if (physicalSink instanceof PhysicalIcebergTableSink) {
                boolean emptyInsert = childIsEmptyRelation(physicalSink);
                IcebergExternalTable icebergExternalTable = (IcebergExternalTable) targetTableIf;
                IcebergInsertCommandContext icebergInsertCtx = insertCtx
                        .map(c -> (IcebergInsertCommandContext) c)
                        .orElseGet(IcebergInsertCommandContext::new);
                branchName.ifPresent(notUsed -> icebergInsertCtx.setBranchName(branchName));
                return ExecutorFactory.from(planner, dataSink, physicalSink,
                        () -> new IcebergRewriteExecutor(ctx, icebergExternalTable, label, planner,
                                Optional.of(icebergInsertCtx), emptyInsert, jobId));
            }
            throw new AnalysisException("Rewrite only supports iceberg table");
        } catch (Throwable t) {
            Throwables.throwIfInstanceOf(t, RuntimeException.class);
            throw new IllegalStateException(t.getMessage(), t);
        }
    }

    private BuildResult planInsertExecutor(ConnectContext ctx, StmtExecutor stmtExecutor,
            LogicalPlanAdapter logicalPlanAdapter, TableIf targetTableIf) throws Throwable {
        LogicalPlan logicalPlan = logicalPlanAdapter.getLogicalPlan();
        boolean supportFastInsertIntoValues = InsertUtils.supportFastInsertIntoValues(logicalPlan, targetTableIf, ctx);
        AtomicReference<ExecutorFactory> executorFactoryRef = new AtomicReference<>();

        FastInsertIntoValuesPlanner planner = new FastInsertIntoValuesPlanner(
                ctx.getStatementContext(), supportFastInsertIntoValues) {
            @Override
            protected void doDistribute(boolean canUseNereidsDistributePlanner, ExplainLevel explainLevel) {
                executorFactoryRef.set(selectInsertExecutorFactory(this, ctx, stmtExecutor, targetTableIf));
                super.doDistribute(canUseNereidsDistributePlanner, explainLevel);
            }
        };

        planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
        if (LOG.isDebugEnabled()) {
            LOG.debug("rewrite plan for query_id: {} is: {}.", DebugUtil.printId(ctx.queryId()),
                    planner.getPhysicalPlan().treeString());
        }
        return executorFactoryRef.get().build();
    }

    @Override
    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
        AbstractInsertExecutor insertExecutor = initPlan(ctx, executor);
        if (insertExecutor.isEmptyInsert()) {
            return;
        }
        insertExecutor.executeSingleInsert(executor);
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitCommand(this, context);
    }

    @Override
    public Plan getExplainPlan(ConnectContext ctx) {
        Optional<CascadesContext> analyzeContext = Optional.of(
                CascadesContext.initContext(ctx.getStatementContext(), originLogicalQuery, PhysicalProperties.ANY));
        Plan plan = InsertUtils.getPlanForExplain(ctx, analyzeContext, getLogicalQuery());
        if (cte.isPresent()) {
            plan = cte.get().withChildren(plan);
        }
        return plan;
    }

    @Override
    public Optional<NereidsPlanner> getExplainPlanner(LogicalPlan logicalPlan, StatementContext ctx) {
        ConnectContext connectContext = ctx.getConnectContext();
        TableIf targetTableIf = InsertUtils.getTargetTable(originLogicalQuery, connectContext);
        boolean supportFastInsertIntoValues = InsertUtils.supportFastInsertIntoValues(logicalPlan, targetTableIf,
                connectContext);
        return Optional.of(new FastInsertIntoValuesPlanner(ctx, supportFastInsertIntoValues));
    }

    @Override
    public StmtType stmtType() {
        return StmtType.INSERT;
    }

    @Override
    public RedirectStatus toRedirectStatus() {
        return RedirectStatus.FORWARD_WITH_SYNC;
    }

    @Override
    public boolean needAuditEncryption() {
        return originLogicalQuery
                .anyMatch(node -> node instanceof TVFRelation);
    }

    private boolean childIsEmptyRelation(PhysicalSink<? extends Plan> sink) {
        if (sink.children() != null && sink.children().size() == 1
                && sink.child(0) instanceof PhysicalEmptyRelation) {
            return true;
        }
        return false;
    }

    private static class ExecutorFactory {
        public final NereidsPlanner planner;
        public final DataSink dataSink;
        public final PhysicalSink<?> physicalSink;
        public final java.util.function.Supplier<AbstractInsertExecutor> executorSupplier;

        private ExecutorFactory(NereidsPlanner planner, DataSink dataSink, PhysicalSink<?> physicalSink,
                java.util.function.Supplier<AbstractInsertExecutor> executorSupplier) {
            this.planner = planner;
            this.dataSink = dataSink;
            this.physicalSink = physicalSink;
            this.executorSupplier = executorSupplier;
        }

        public static ExecutorFactory from(NereidsPlanner planner, DataSink dataSink, PhysicalSink<?> physicalSink,
                java.util.function.Supplier<AbstractInsertExecutor> executorSupplier) {
            return new ExecutorFactory(planner, dataSink, physicalSink, executorSupplier);
        }

        public BuildResult build() {
            AbstractInsertExecutor executor = executorSupplier.get();
            return new BuildResult(planner, executor, dataSink, physicalSink);
        }
    }

    private static class BuildResult {
        private final NereidsPlanner planner;
        private final AbstractInsertExecutor executor;
        private final DataSink dataSink;
        private final PhysicalSink<?> physicalSink;

        public BuildResult(NereidsPlanner planner, AbstractInsertExecutor executor, DataSink dataSink,
                PhysicalSink<?> physicalSink) {
            this.planner = planner;
            this.executor = executor;
            this.dataSink = dataSink;
            this.physicalSink = physicalSink;
        }
    }
}