IvmLinearDeltaHandler.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.mtmv.ivm;

import org.apache.doris.catalog.Column;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.scalar.If;
import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;

import com.google.common.collect.ImmutableList;

import java.util.ArrayList;
import java.util.List;

/**
 * Rewrites plan nodes whose delta propagates linearly through the plan.
 */
class IvmLinearDeltaHandler {
    private final IvmDeltaRewriteHelper helper = IvmDeltaRewriteHelper.INSTANCE;

    /**
     * Wraps delta scan with Project(scan_output + dml_factor).
     */
    IvmDeltaRewriteResult rewriteScan(LogicalOlapScan scan) {
        if (!scan.isDelta()) {
            return new IvmDeltaRewriteResult(scan, null);
        }
        Expression factorExpr = buildDmlFactorExpr(scan);
        Alias factorAlias = new Alias(factorExpr, Column.IVM_DML_FACTOR_COL);
        ImmutableList.Builder<NamedExpression> outputs = ImmutableList.builderWithExpectedSize(
                scan.getOutput().size() + 1);
        scan.getOutput().forEach(slot -> outputs.add((NamedExpression) slot));
        outputs.add(factorAlias);
        LogicalProject<?> project = new LogicalProject<>(outputs.build(), scan);
        Slot dmlFactorSlot = project.getOutput().get(project.getOutput().size() - 1);
        return new IvmDeltaRewriteResult(project, dmlFactorSlot);
    }

    /**
     * Builds the dml_factor expression for the given scan.
     */
    private Expression buildDmlFactorExpr(LogicalOlapScan scan) {
        if (scan.getTable().getColumn(Column.IVM_MOCK_BINLOG_OPERATION_COL) == null) {
            return new TinyIntLiteral((byte) 1);
        }
        Slot opSlot = helper.findSlotByName(scan.getOutput(), Column.IVM_MOCK_BINLOG_OPERATION_COL);
        return new If(
                new EqualTo(opSlot, new TinyIntLiteral((byte) 0)),
                new TinyIntLiteral((byte) 1),
                new TinyIntLiteral((byte) -1));
    }

    IvmDeltaRewriteResult rewriteProject(LogicalProject<? extends Plan> project,
            IvmDeltaRewriteVisitor visitor, IvmRefreshContext ctx) {
        IvmDeltaRewriteResult childResult = project.child().accept(visitor, ctx);
        if (childResult.terminal) {
            return childResult;
        }
        if (childResult.dmlFactorSlot == null) {
            LogicalProject<?> newProject = project.withProjectsAndChild(project.getProjects(), childResult.plan);
            return new IvmDeltaRewriteResult(newProject, null);
        }
        return propagateDmlFactorThroughProject(project, childResult);
    }

    /**
     * Appends (or reuses) dml_factor in the project's output list.
     */
    IvmDeltaRewriteResult propagateDmlFactorThroughProject(
            LogicalProject<? extends Plan> project, IvmDeltaRewriteResult childResult) {
        for (int i = 0; i < project.getProjects().size(); i++) {
            NamedExpression expr = project.getProjects().get(i);
            if (Column.IVM_DML_FACTOR_COL.equals(expr.getName())) {
                LogicalProject<?> newProject = project.withProjectsAndChild(
                        project.getProjects(), childResult.plan);
                return new IvmDeltaRewriteResult(newProject, newProject.getOutput().get(i));
            }
        }
        ImmutableList.Builder<NamedExpression> newOutputs = ImmutableList.builderWithExpectedSize(
                project.getProjects().size() + 1);
        newOutputs.addAll(project.getProjects());
        newOutputs.add(childResult.dmlFactorSlot);
        LogicalProject<?> newProject = project.withProjectsAndChild(newOutputs.build(), childResult.plan);
        return new IvmDeltaRewriteResult(newProject, childResult.dmlFactorSlot);
    }

    IvmDeltaRewriteResult rewriteFilter(LogicalFilter<? extends Plan> filter,
            IvmDeltaRewriteVisitor visitor, IvmRefreshContext ctx) {
        IvmDeltaRewriteResult childResult = filter.child().accept(visitor, ctx);
        Plan newFilter = filter.withChildren(ImmutableList.of(childResult.plan));
        return new IvmDeltaRewriteResult(newFilter, childResult.dmlFactorSlot);
    }

    IvmDeltaRewriteResult rewriteJoin(LogicalJoin<? extends Plan, ? extends Plan> join,
            IvmDeltaRewriteVisitor visitor, IvmRefreshContext ctx) {
        JoinType joinType = join.getJoinType();
        if (joinType != JoinType.INNER_JOIN && joinType != JoinType.CROSS_JOIN) {
            throw new AnalysisException(
                    "IVM delta rewrite does not support join type: " + joinType);
        }
        if (join.isMarkJoin()) {
            throw new AnalysisException(
                    "IVM delta rewrite does not support mark join (subquery with disjunction).");
        }

        IvmDeltaRewriteResult leftResult = join.left().accept(visitor, ctx);
        IvmDeltaRewriteResult rightResult = join.right().accept(visitor, ctx);

        if (leftResult.dmlFactorSlot != null && rightResult.dmlFactorSlot != null) {
            throw new AnalysisException(
                    "IVM: both sides of join have dml_factor ��� expected at most one delta side");
        }

        LogicalJoin<Plan, Plan> newJoin = (LogicalJoin<Plan, Plan>) join.withChildren(
                leftResult.plan, rightResult.plan);

        if (leftResult.dmlFactorSlot == null && rightResult.dmlFactorSlot == null) {
            return new IvmDeltaRewriteResult(newJoin, null);
        } else {
            return helper.addNonDetGuardForJoinDelta(new JoinAdapter(newJoin), leftResult, rightResult, ctx);
        }
    }

    IvmDeltaRewriteResult rewriteUnion(LogicalUnion union, IvmDeltaRewriteVisitor visitor, IvmRefreshContext ctx) {
        List<IvmDeltaRewriteResult> childResults = new ArrayList<>();
        for (Plan child : union.children()) {
            childResults.add(child.accept(visitor, ctx));
        }

        int deltaIdx = -1;
        for (int i = 0; i < childResults.size(); i++) {
            if (childResults.get(i).dmlFactorSlot != null) {
                if (deltaIdx != -1) {
                    throw new AnalysisException(
                            "IVM: multiple UNION ALL arms have dml_factor ��� expected at most one delta arm");
                }
                deltaIdx = i;
            }
        }

        if (deltaIdx == -1) {
            ImmutableList.Builder<Plan> newChildren = ImmutableList.builder();
            for (IvmDeltaRewriteResult result : childResults) {
                newChildren.add(result.plan);
            }
            Plan newUnion = union.withChildren(newChildren.build());
            return new IvmDeltaRewriteResult(newUnion, null);
        }

        IvmDeltaRewriteResult deltaChild = childResults.get(deltaIdx);
        List<SlotReference> childMapping = union.getRegularChildrenOutputs().get(deltaIdx);
        List<NamedExpression> unionOutputs = union.getOutputs();

        ImmutableList.Builder<NamedExpression> projections = ImmutableList.builder();
        for (int j = 0; j < unionOutputs.size(); j++) {
            NamedExpression unionOut = unionOutputs.get(j);
            SlotReference childSlot = childMapping.get(j);
            projections.add(new Alias(unionOut.getExprId(), childSlot, unionOut.getName()));
        }
        projections.add(deltaChild.dmlFactorSlot);

        LogicalProject<Plan> mappedProject = new LogicalProject<>(projections.build(), deltaChild.plan);
        Slot newDmlFactor = mappedProject.getOutput().get(mappedProject.getOutput().size() - 1);
        return new IvmDeltaRewriteResult(mappedProject, newDmlFactor);
    }

    private static class JoinAdapter implements IvmDeltaRewriteHelper.JoinPlanView {
        private final LogicalJoin<Plan, Plan> join;

        private JoinAdapter(LogicalJoin<Plan, Plan> join) {
            this.join = join;
        }

        @Override
        public Plan plan() {
            return join;
        }

        @Override
        public Plan left() {
            return join.left();
        }

        @Override
        public Plan right() {
            return join.right();
        }

        @Override
        public JoinType joinType() {
            return join.getJoinType();
        }
    }
}