IvmLinearDeltaStrategy.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.mtmv.ivm;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
import org.apache.doris.nereids.trees.expressions.LessThan;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.scalar.AssertTrue;
import org.apache.doris.nereids.trees.expressions.functions.scalar.If;
import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;

import com.google.common.collect.ImmutableList;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
 * Delta rewrite strategy for MVs whose delta propagates linearly through the plan,
 * i.e. the output delta is a direct combination of input deltas without needing to
 * look up base-table rows or replay aggregate state. Currently covers
 * Scan / Project / Filter / Inner Join / Union All. Aggregate MVs are handled by
 * {@link IvmAggDeltaStrategy}; outer joins (which require null-padding lookups) are
 * intended to live in their own strategy.
 *
 * <p>Extends {@link PlanVisitor} with return type {@link RewriteResult} that carries both
 * the rewritten plan and the propagated dml_factor Slot. The visitor injects
 * {@code dml_factor} at the OlapScan level ��� either derived from the base table's
 * {@code binlog_op} column ({@code IF(binlog_op = 0, 1, -1)}) when present,
 * or as the literal
 * {@code 1} (insert-only assumption) when absent ��� and propagates it upward through
 * Projects, Filters, Inner Joins and Union All. Unsupported node types cause an
 * immediate {@link AnalysisException}.
 *
 * <p>Each instance is single-use: create a fresh instance per rewrite invocation.
 *
 * @see IvmAggDeltaStrategy
 */
public class IvmLinearDeltaStrategy extends PlanVisitor<IvmLinearDeltaStrategy.RewriteResult, Void>
        implements IvmDeltaStrategy {

    /** Result of a visitor rewrite step: the rewritten plan plus the dml_factor Slot. */
    protected static class RewriteResult {
        protected final Plan plan;
        protected final Slot dmlFactorSlot;
        /** True only for the terminal apply plan produced by the aggregate visitor. */
        protected final boolean isTerminal;

        protected RewriteResult(Plan plan, Slot dmlFactorSlot) {
            this(plan, dmlFactorSlot, false);
        }

        protected RewriteResult(Plan plan, Slot dmlFactorSlot, boolean isTerminal) {
            this.plan = plan;
            this.dmlFactorSlot = dmlFactorSlot;
            this.isTerminal = isTerminal;
        }
    }

    private static final String NON_DET_ROW_ID_MSG_PREFIX =
            "IVM fallback: delete on non-deterministic row_id in ";

    protected final IvmRefreshContext ctx;

    public IvmLinearDeltaStrategy(IvmRefreshContext ctx) {
        this.ctx = Objects.requireNonNull(ctx, "ctx can not be null");
    }

    @Override
    public List<Command> rewrite(Plan normalizedPlan) {
        RewriteResult result = rewritePlan(normalizedPlan);
        Plan finalPlan = buildSinkProject(result);
        Command insertCommand = buildInsertCommandWithDeleteSign(finalPlan);
        return Collections.singletonList(insertCommand);
    }

    /** Strips ResultSink and walks the plan tree via the visitor. */
    protected RewriteResult rewritePlan(Plan normalizedPlan) {
        Plan queryPlan = stripResultSink(normalizedPlan);
        return queryPlan.accept(this, null);
    }

    // ---- Visitor methods ----

    /** Unsupported node types throw immediately. */
    @Override
    public RewriteResult visit(Plan plan, Void ctx) {
        throw new AnalysisException(
                "IVM delta rewrite does not support: " + plan.getClass().getSimpleName());
    }

    /**
     * Wraps scan with Project(scan_output + dml_factor).
     *
     * <p>If the base table has a {@code binlog_op} column (following the delete-sign convention:
     * 0 = insert, 1 = delete), dml_factor is derived as {@code IF(binlog_op = 0, 1, -1)}.
     * Otherwise, falls back to the literal {@code dml_factor = 1} (insert-only assumption).
     */
    @Override
    public RewriteResult visitLogicalOlapScan(LogicalOlapScan scan, Void ctx) {
        if (!scan.isDelta()) {
            // Snapshot scan: no dml_factor injection; return the scan unchanged.
            return new RewriteResult(scan, null);
        }
        Expression factorExpr = buildDmlFactorExpr(scan);
        Alias factorAlias = new Alias(factorExpr, Column.IVM_DML_FACTOR_COL);
        ImmutableList.Builder<NamedExpression> outputs = ImmutableList.builderWithExpectedSize(
                scan.getOutput().size() + 1);
        scan.getOutput().forEach(slot -> outputs.add((NamedExpression) slot));
        outputs.add(factorAlias);
        LogicalProject<?> project = new LogicalProject<>(outputs.build(), scan);
        Slot dmlFactorSlot = project.getOutput().get(project.getOutput().size() - 1);
        return new RewriteResult(project, dmlFactorSlot);
    }

    /**
     * Builds the dml_factor expression for the given scan.
     *
     * <p>If the table contains a {@code binlog_op} column, returns
     * {@code IF(binlog_op = 0, 1, -1)}.
     * Otherwise, returns the literal {@code 1} (insert-only).
     */
    private Expression buildDmlFactorExpr(LogicalOlapScan scan) {
        if (scan.getTable().getColumn(Column.IVM_MOCK_BINLOG_OPERATION_COL) == null) {
            return new TinyIntLiteral((byte) 1);
        }
        Slot opSlot = findSlotByName(scan.getOutput(), Column.IVM_MOCK_BINLOG_OPERATION_COL);
        return new If(
                new EqualTo(opSlot, new TinyIntLiteral((byte) 0)),
                new TinyIntLiteral((byte) 1),
                new TinyIntLiteral((byte) -1));
    }

    /** Propagates dml_factor slot by appending it to the project output, unless already present. */
    @Override
    public RewriteResult visitLogicalProject(LogicalProject<? extends Plan> project, Void ctx) {
        RewriteResult childResult = project.child().accept(this, ctx);
        // Preserve normalize-added hidden columns (for example row_id on snapshot-side projects)
        // even when there is no dml_factor to propagate.
        if (childResult.dmlFactorSlot == null) {
            LogicalProject<?> newProject = project.withProjectsAndChild(project.getProjects(), childResult.plan);
            return new RewriteResult(newProject, null);
        }
        return propagateDmlFactorThroughProject(project, childResult);
    }

    /**
     * Appends (or reuses) dml_factor in the project's output list.
     * Called when the child result carries a non-null dml_factor slot.
     */
    protected RewriteResult propagateDmlFactorThroughProject(
            LogicalProject<? extends Plan> project, RewriteResult childResult) {
        // If this project already carries dml_factor, just update the child
        for (int i = 0; i < project.getProjects().size(); i++) {
            NamedExpression expr = project.getProjects().get(i);
            if (Column.IVM_DML_FACTOR_COL.equals(expr.getName())) {
                LogicalProject<?> newProject = project.withProjectsAndChild(
                        project.getProjects(), childResult.plan);
                return new RewriteResult(newProject, newProject.getOutput().get(i));
            }
        }
        ImmutableList.Builder<NamedExpression> newOutputs = ImmutableList.builderWithExpectedSize(
                project.getProjects().size() + 1);
        newOutputs.addAll(project.getProjects());
        newOutputs.add(childResult.dmlFactorSlot);
        LogicalProject<?> newProject = project.withProjectsAndChild(newOutputs.build(), childResult.plan);
        return new RewriteResult(newProject, childResult.dmlFactorSlot);
    }

    /** Filter: recurse into child, propagate dml_factor unchanged. */
    @Override
    public RewriteResult visitLogicalFilter(LogicalFilter<? extends Plan> filter, Void ctx) {
        RewriteResult childResult = filter.child().accept(this, ctx);
        Plan newFilter = filter.withChildren(ImmutableList.of(childResult.plan));
        return new RewriteResult(newFilter, childResult.dmlFactorSlot);
    }

    /**
     * Join: visit both children, propagate dml_factor from the delta side.
     * Only INNER_JOIN and CROSS_JOIN are supported (validated at normalize time).
     *
     * <p>At most one child has dml_factor (the delta side); the other (snapshot side)
     * has dml_factor = null. If the snapshot side's row_id is non-deterministic,
     * the dml_factor is wrapped with an assert_true guard so that delete deltas
     * trigger a runtime fallback to full refresh.
     */
    @Override
    public RewriteResult visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> join, Void ctx) {
        JoinType joinType = join.getJoinType();
        if (joinType != JoinType.INNER_JOIN && joinType != JoinType.CROSS_JOIN) {
            throw new AnalysisException(
                    "IVM delta rewrite does not support join type: " + joinType);
        }
        if (join.isMarkJoin()) {
            throw new AnalysisException(
                    "IVM delta rewrite does not support mark join (subquery with disjunction).");
        }

        RewriteResult leftResult = join.left().accept(this, ctx);
        RewriteResult rightResult = join.right().accept(this, ctx);

        if (leftResult.dmlFactorSlot != null && rightResult.dmlFactorSlot != null) {
            throw new AnalysisException(
                    "IVM: both sides of join have dml_factor ��� expected at most one delta side");
        }

        LogicalJoin<Plan, Plan> newJoin = (LogicalJoin<Plan, Plan>) join.withChildren(
                leftResult.plan, rightResult.plan);

        if (leftResult.dmlFactorSlot == null && rightResult.dmlFactorSlot == null) {
            return new RewriteResult(newJoin, null);
        }

        return addNonDetGuardForJoinDelta(newJoin, leftResult, rightResult);
    }

    protected RewriteResult addNonDetGuardForJoinDelta(LogicalJoin<Plan, Plan> join,
            RewriteResult leftResult, RewriteResult rightResult) {
        // IMPORTANT: We ONLY guard the SNAPSHOT side, NOT the delta side.
        //
        // Proof by induction that the delta side's row_id is always deterministic when
        // dml_factor < 0 (delete):
        //   Base case: dml_factor < 0 at a base scan ��� the table is MOW (Merge-on-Write),
        //     which always produces deterministic row_id (unique key).
        //   Inductive step: for a join, dml_factor < 0 can only come from the delta side.
        //     The delta child's row_id is deterministic (by induction hypothesis), and the
        //     snapshot side's row_id is enforced deterministic by THIS guard. So the composed
        //     row_id = hash(left_row_id, right_row_id) is also deterministic.
        //
        // Therefore, delta-side row_id non-determinism check is unnecessary and must NOT be
        // added ��� doing so would cause false-positive fallbacks.
        //
        boolean deltaOnLeft = leftResult.dmlFactorSlot != null;
        Slot dmlFactorSlot = deltaOnLeft ? leftResult.dmlFactorSlot : rightResult.dmlFactorSlot;
        Plan snapshotSidePlan = deltaOnLeft ? join.right() : join.left();

        if (needNonDetGuard(snapshotSidePlan)) {
            return wrapDmlFactorWithNonDetGuard(new RewriteResult(join, dmlFactorSlot), join.getJoinType());
        }
        return new RewriteResult(join, dmlFactorSlot);
    }

    /**
     * UNION ALL: visit all children, eliminate non-delta arms.
     *
     * <p>Semantics: {@code ��(a UNION ALL b) = ��a UNION ALL ��b}. Each arm's delta is independent.
     * Since IvmDeltaRewriter generates exactly one delta scan per plan, at most one arm has
     * dml_factor (the delta arm). Non-delta arms are eliminated entirely ��� unlike JOIN which
     * keeps the snapshot side.
     *
     * <p>When all children are snapshot (dmlFactorSlot == null for all), the UNION is rebuilt
     * with rewritten children and returned with null dmlFactorSlot (snapshot side of a higher join).
     *
     * <p>When exactly one child has dml_factor, the UNION is collapsed to that single arm
     * with a column-mapping Project that remaps child slots to the union's output ExprIds.
     *
     * <p><b>Why no non-deterministic row_id guard here (unlike JOIN):</b>
     * Non-delta arms are eliminated entirely, so there is no "snapshot side" whose non-deterministic
     * row_id could cause spurious dml_factor &lt; 0.  DUP tables (non-deterministic row_id) never
     * produce dml_factor &lt; 0 because they have no binlog_op column (always insert-only).
     * MOW tables always have deterministic row_id.  Therefore no assert_true guard is needed.
     */
    @Override
    public RewriteResult visitLogicalUnion(LogicalUnion union, Void ctx) {
        List<RewriteResult> childResults = new ArrayList<>();
        for (Plan child : union.children()) {
            childResults.add(child.accept(this, ctx));
        }

        int deltaIdx = -1;
        for (int i = 0; i < childResults.size(); i++) {
            if (childResults.get(i).dmlFactorSlot != null) {
                if (deltaIdx != -1) {
                    throw new AnalysisException(
                            "IVM: multiple UNION ALL arms have dml_factor ��� expected at most one delta arm");
                }
                deltaIdx = i;
            }
        }

        if (deltaIdx == -1) {
            // All snapshot: rebuild UNION with rewritten children
            ImmutableList.Builder<Plan> newChildren = ImmutableList.builder();
            for (RewriteResult r : childResults) {
                newChildren.add(r.plan);
            }
            Plan newUnion = union.withChildren(newChildren.build());
            return new RewriteResult(newUnion, null);
        }

        // Eliminate non-delta arms. Map delta child's slots to union's output ExprIds.
        RewriteResult deltaChild = childResults.get(deltaIdx);
        List<SlotReference> childMapping = union.getRegularChildrenOutputs().get(deltaIdx);
        List<NamedExpression> unionOutputs = union.getOutputs();

        // Create projection: for each union output, alias the child's corresponding slot
        // with the union's ExprId so parent references remain valid.
        ImmutableList.Builder<NamedExpression> projections = ImmutableList.builder();
        for (int j = 0; j < unionOutputs.size(); j++) {
            NamedExpression unionOut = unionOutputs.get(j);
            SlotReference childSlot = childMapping.get(j);
            projections.add(new Alias(unionOut.getExprId(), childSlot, unionOut.getName()));
        }
        // Pass through dml_factor
        projections.add(deltaChild.dmlFactorSlot);

        LogicalProject<Plan> mappedProject = new LogicalProject<>(projections.build(), deltaChild.plan);
        Slot newDmlFactor = mappedProject.getOutput().get(mappedProject.getOutput().size() - 1);
        return new RewriteResult(mappedProject, newDmlFactor);
    }

    /**
     * Checks if the snapshot side's row_id slot is non-deterministic.
     * Returns true (conservatively add guard) when normalizeResult or row_id slot is unavailable.
     */
    protected boolean needNonDetGuard(Plan snapshotSidePlan) {
        IvmNormalizeResult normalizeResult = this.ctx.getNormalizeResult();
        if (normalizeResult == null) {
            return true;
        }
        Slot rowIdSlot = IvmUtil.findRowIdSlotOrNull(snapshotSidePlan.getOutput());
        if (rowIdSlot == null) {
            return true;
        }
        return !normalizeResult.isDeterministic(rowIdSlot);
    }

    // ---- Helpers ----

    /**
     * Wraps the dml_factor slot with an assert_true guard that triggers a runtime exception
     * when dml_factor < 0 (i.e., delete delta), causing fallback to full refresh.
     *
     * <p>This is used when the MV's row_id is non-deterministic (e.g., join with DUP_KEYS table).
     * Insert deltas (dml_factor >= 0) pass through; delete deltas cause:
     * {@code IF(assert_true(dml_factor >= 0, msg), dml_factor, NULL)}.
     *
     * <p>The false branch (NullLiteral) must differ from the true branch to prevent
     * FoldConstantRuleOnFE from collapsing the IF.
     */
    protected RewriteResult wrapDmlFactorWithNonDetGuard(RewriteResult result, JoinType joinType) {
        String msg = NON_DET_ROW_ID_MSG_PREFIX + joinType;
        Expression guardedExpr = new If(
                new AssertTrue(new GreaterThanEqual(result.dmlFactorSlot,
                        new TinyIntLiteral((byte) 0)), new StringLiteral(msg)),
                result.dmlFactorSlot, new NullLiteral(result.dmlFactorSlot.getDataType()));
        Alias guardedAlias = new Alias(guardedExpr, Column.IVM_DML_FACTOR_COL);

        ImmutableList.Builder<NamedExpression> projectOutputs = ImmutableList.builder();
        for (Slot slot : result.plan.getOutput()) {
            if (Column.IVM_DML_FACTOR_COL.equals(slot.getName())) {
                projectOutputs.add(guardedAlias);
            } else {
                projectOutputs.add(slot);
            }
        }
        LogicalProject<?> guardProject = new LogicalProject<>(projectOutputs.build(), result.plan);
        Slot newDmlFactorSlot = guardProject.getOutput().stream()
                .filter(s -> Column.IVM_DML_FACTOR_COL.equals(s.getName()))
                .findFirst()
                .orElseThrow(() -> new AnalysisException("IVM: lost dml_factor after non-det guard"));
        return new RewriteResult(guardProject, newDmlFactorSlot);
    }

    /**
     * Wraps the visitor-rewritten plan with a final project that:
     * 1. Passes through columns matching mtmv.getInsertedColumnNames() in order
     * 2. Maps dml_factor to __DORIS_DELETE_SIGN__: CASE WHEN dml_factor < 0 THEN 1 ELSE 0 END
     */
    protected Plan buildSinkProject(RewriteResult result) {
        List<Slot> output = result.plan.getOutput();
        List<String> insertedColumns = ctx.getMtmv().getInsertedColumnNames();
        ImmutableList.Builder<NamedExpression> sinkOutputs = ImmutableList.builderWithExpectedSize(
                insertedColumns.size() + 1);
        for (String colName : insertedColumns) {
            sinkOutputs.add(findSlotByName(output, colName));
        }
        sinkOutputs.add(new Alias(
                new If(new LessThan(result.dmlFactorSlot, new TinyIntLiteral((byte) 0)),
                        new TinyIntLiteral((byte) 1), new TinyIntLiteral((byte) 0)),
                Column.DELETE_SIGN));
        return new LogicalProject<>(sinkOutputs.build(), result.plan);
    }

    protected Slot findSlotByName(List<Slot> slots, String name) {
        for (Slot slot : slots) {
            if (name.equals(slot.getName())) {
                return slot;
            }
        }
        throw new AnalysisException("IVM failed to find slot: " + name);
    }

    protected Plan stripResultSink(Plan plan) {
        while (plan instanceof LogicalResultSink) {
            plan = ((LogicalResultSink<?>) plan).child();
        }
        return plan;
    }

    protected Command buildInsertCommandWithDeleteSign(Plan queryPlan) {
        MTMV mtmv = ctx.getMtmv();
        List<String> sinkColumns = new ArrayList<>(mtmv.getInsertedColumnNames());
        sinkColumns.add(Column.DELETE_SIGN);
        List<String> mvNameParts = ImmutableList.of(
                InternalCatalog.INTERNAL_CATALOG_NAME,
                mtmv.getQualifiedDbName(),
                mtmv.getName());
        UnboundTableSink<LogicalPlan> sink = new UnboundTableSink<>(
                mvNameParts, sinkColumns, ImmutableList.of(),
                false, ImmutableList.of(), false,
                TPartialUpdateNewRowPolicy.APPEND, DMLCommandType.INSERT,
                Optional.empty(), Optional.empty(), (LogicalPlan) queryPlan);
        return new InsertIntoTableCommand(sink, Optional.empty(), Optional.empty(), Optional.empty());
    }
}