IvmAggDeltaStrategy.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.mtmv.ivm;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.mtmv.ivm.IvmAggMeta.AggTarget;
import org.apache.doris.mtmv.ivm.IvmAggMeta.AggType;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.rules.analysis.BindRelation;
import org.apache.doris.nereids.rules.exploration.join.JoinReorderContext;
import org.apache.doris.nereids.trees.expressions.Add;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.CaseWhen;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.Divide;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.GreaterThan;
import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
import org.apache.doris.nereids.trees.expressions.IsNull;
import org.apache.doris.nereids.trees.expressions.LessThan;
import org.apache.doris.nereids.trees.expressions.LessThanEqual;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Or;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.Subtract;
import org.apache.doris.nereids.trees.expressions.WhenClause;
import org.apache.doris.nereids.trees.expressions.functions.agg.Max;
import org.apache.doris.nereids.trees.expressions.functions.agg.Min;
import org.apache.doris.nereids.trees.expressions.functions.agg.Sum;
import org.apache.doris.nereids.trees.expressions.functions.scalar.AssertTrue;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Coalesce;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Greatest;
import org.apache.doris.nereids.trees.expressions.functions.scalar.If;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Least;
import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.types.DataType;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Aggregate delta rewrite strategy for IVM.
 *
 * <p>Handles single-table aggregate MVs with count/sum/avg/min/max.
 * Min/max use an assert_true guard: if a deleted row matches the current extreme,
 * execution fails and IvmRefreshManager falls back to COMPLETE refresh.
 *
 * <h3>Overall flow</h3>
 * <ol>
 *   <li><b>Delta sub-plan</b>: transforms the normalized aggregate into a signed delta aggregate
 *       where each output is weighted by {@code dml_factor} (+1 for inserts, -1 for deletes).</li>
 *   <li><b>Apply plan</b>: LEFT JOINs the delta against the MV's current state on {@code row_id},
 *       computes new hidden states (COALESCE(old,0) + delta), derives visible values, and
 *       determines the {@code __DORIS_DELETE_SIGN__}.</li>
 *   <li><b>Insert command</b>: wraps the result in an {@code InsertIntoTableCommand} that writes
 *       back to the MV via MOW upsert semantics.</li>
 * </ol>
 *
 * <h3>Visitor integration</h3>
 * <p>Inherits from {@link IvmLinearDeltaStrategy} and overrides:
 * <ul>
 *   <li>{@code visitLogicalProject}: skips the normalize top-project when its child is an aggregate,
 *       since the aggregate visitor produces a complete replacement plan.</li>
 *   <li>{@code visitLogicalAggregate}: main entry point that builds delta + apply + insert.</li>
 * </ul>
 *
 * @see IvmLinearDeltaStrategy
 */
public class IvmAggDeltaStrategy extends IvmLinearDeltaStrategy {

    /** Transient semantic key for MIN of deleted values (not stored in MV). */
    private static final String DELMIN = "DELMIN";
    /** Transient semantic key for MAX of deleted values (not stored in MV). */
    private static final String DELMAX = "DELMAX";

    /** Set via constructor, used by visitor methods. Single-use: create a fresh instance per rewrite. */

    public IvmAggDeltaStrategy(IvmDeltaRewriteContext ctx) {
        super(ctx);
    }

    /**
     * Intermediate result from {@link #buildDeltaSubPlan}.
     * Carries the delta aggregate project plus slot mappings needed by {@link #buildApplyPlan}.
     */
    private static final class DeltaPlanParts {
        /** Top project above the delta aggregate: [row_id, group_keys, delta_agg_outputs...] */
        private final LogicalProject<?> topDeltaProject;
        /** Row-id slot from the top project (hash of group keys, or 0 for scalar). */
        private final Slot rowIdSlot;
        /**
         * Semantic slot map keyed by "{ordinal}:{stateType}" (e.g. "2:SUM").
         * Maps each per-target delta output to the corresponding slot in topDeltaProject.
         * Also contains the delta_group_count under key {@link Column#IVM_DELTA_GROUP_COUNT_COL}.
         */
        private final Map<String, Slot> semanticSlots;
        /** Group key slots resolved from topDeltaProject output, keyed by column name. */
        private final Map<String, Slot> groupKeySlotsByName;

        private DeltaPlanParts(LogicalProject<?> topDeltaProject, Slot rowIdSlot,
                Map<String, Slot> semanticSlots, Map<String, Slot> groupKeySlotsByName) {
            this.topDeltaProject = topDeltaProject;
            this.rowIdSlot = rowIdSlot;
            this.semanticSlots = semanticSlots;
            this.groupKeySlotsByName = groupKeySlotsByName;
        }
    }

    @Override
    public List<Command> rewrite(Plan normalizedPlan) {
        RewriteResult result = rewritePlan(normalizedPlan);
        Command insertCommand = buildInsertCommandWithDeleteSign(result.plan);
        return ImmutableList.of(insertCommand);
    }

    /**
     * When the normalize top project sits above an aggregate, skip it entirely ���
     * the aggregate visitor produces the complete apply plan.
     */
    /**
     * Handles projects in the agg plan tree.
     *
     * <p>There are four roles a project can play in the agg normalized plan:
     * <ol>
     *   <li><b>Directly above Agg</b>: The normalize-added project that maps user-facing
     *       and IVM hidden columns. Skip it and dispatch directly to the aggregate visitor,
     *       which builds a complete apply plan (including DELETE_SIGN).</li>
     *   <li><b>Above the agg path</b> (e.g., an extra top-level project): The child subtree
     *       eventually reaches the aggregate visitor and returns a terminal apply plan
     *       ({@code isTerminal == true}). We must return that result as-is ��� wrapping it
     *       in this project's original projections would drop the DELETE_SIGN column.</li>
     *   <li><b>Below the Agg, snapshot side</b> (e.g., in a join subtree, snapshot child):
     *       {@code dmlFactorSlot == null} but NOT terminal. Preserve the normalize-added
     *       project (which carries row_id and other hidden columns) by wrapping like the
     *       parent class does.</li>
     *   <li><b>Below the Agg, delta side</b>: The child has a non-null dmlFactorSlot.
     *       Propagate it through the project using the parent's helper.</li>
     * </ol>
     */
    @Override
    public RewriteResult visitLogicalProject(LogicalProject<? extends Plan> project, Void context) {
        if (project.child() instanceof LogicalAggregate) {
            return project.child().accept(this, context);
        }
        RewriteResult childResult = project.child().accept(this, context);
        if (childResult.isTerminal) {
            // Terminal apply plan from the agg visitor ��� return as-is.
            return childResult;
        }
        if (childResult.dmlFactorSlot == null) {
            // Snapshot-side project (below agg, e.g. in a join subtree):
            // preserve normalize-added hidden columns such as row_id.
            LogicalProject<?> newProject = project.withProjectsAndChild(
                    project.getProjects(), childResult.plan);
            return new RewriteResult(newProject, null);
        }
        // Below the agg: propagate dml_factor through this project.
        return propagateDmlFactorThroughProject(project, childResult);
    }

    /**
     * Core entry point: builds the entire agg delta + apply plan.
     *
     * <p>Steps:
     * 1. Validates normalize result and agg metadata exist.
     * 2. Walks the aggregate's child subtree to inject dml_factor (via super's visitor).
     * 3. Builds the delta sub-plan (signed aggregate).
     * 4. Builds the apply plan (LEFT JOIN + state merge + visible derivation).
     * 5. Returns RewriteResult with null dmlFactorSlot (apply plan is terminal).
     */
    @Override
    public RewriteResult visitLogicalAggregate(LogicalAggregate<? extends Plan> agg, Void context) {
        IvmNormalizeResult normalizeResult = ctx.getNormalizeResult();
        if (normalizeResult == null) {
            throw new AnalysisException("IVM agg delta rewrite requires normalize result");
        }
        IvmAggMeta aggMeta = normalizeResult.getAggMeta();
        if (aggMeta == null) {
            throw new AnalysisException("IVM agg delta rewrite requires agg metadata");
        }

        // Walk agg child to inject dml_factor
        RewriteResult childResult = agg.child().accept(this, context);

        DeltaPlanParts delta = buildDeltaSubPlan(agg, childResult, aggMeta);
        LogicalProject<?> applyProject = buildApplyPlan(delta, aggMeta, ctx);
        return new RewriteResult(applyProject, null, true);
    }

    /**
     * Builds the delta sub-plan: a signed aggregate over the base table's changes.
     *
     * <p>Input shape (from normalize):
     * <pre>
     *   Aggregate(normalized) ��� child subtree (with dml_factor injected)
     * </pre>
     *
     * <p>Output shape:
     * <pre>
     *   Project(row_id, group_keys, coalesced delta outputs...)
     *     ��������� Aggregate(delta: SUM(signed_expr), SUM(case_when_not_null), ...)
     *           ��������� child subtree with dml_factor
     * </pre>
     *
     * <p>The delta aggregate replaces each original agg function with signed delta expressions:
     * <ul>
     *   <li>COUNT(*): delta = SUM(dml_factor)</li>
     *   <li>COUNT(expr): delta = SUM(IF(expr IS NULL, 0, dml_factor))</li>
     *   <li>SUM(expr): delta_sum = SUM(IF(dml_factor > 0, expr, -expr)),
     *                  delta_count = SUM(IF(expr IS NULL, 0, dml_factor))</li>
     *   <li>AVG(expr): same as SUM (visible value derived later from hidden sum/count)</li>
     * </ul>
     *
     * <p>A top project wraps the aggregate to:
     * 1. Compute row_id (hash of group keys for grouped, 0 for scalar).
     * 2. Apply COALESCE to NULL-susceptible outputs (SUM may return NULL for all-NULL groups).
     */
    private DeltaPlanParts buildDeltaSubPlan(LogicalAggregate<?> normalizedAgg,
            RewriteResult childResult, IvmAggMeta aggMeta) {
        Plan newAggChild = childResult.plan;
        Slot dmlFactorSlot = childResult.dmlFactorSlot;

        List<NamedExpression> deltaAggOutputs = new ArrayList<>();
        int groupKeySize = aggMeta.getGroupKeySlots().size();
        for (Expression groupByExpr : normalizedAgg.getGroupByExpressions()) {
            if (!(groupByExpr instanceof NamedExpression)) {
                throw new AnalysisException("IVM agg delta rewrite requires slot-like group key, but got: "
                        + groupByExpr);
            }
            deltaAggOutputs.add((NamedExpression) groupByExpr);
        }

        Alias deltaGroupCount = new Alias(new Sum(dmlFactorSlot), Column.IVM_DELTA_GROUP_COUNT_COL);
        deltaAggOutputs.add(deltaGroupCount);

        for (AggTarget target : aggMeta.getAggTargets()) {
            switch (target.getAggType()) {
                case COUNT:
                    if (!target.isCountStar()) {
                        deltaAggOutputs.add(new Alias(
                                new Sum(ifExprNotNull(target.getExprArgs().get(0), dmlFactorSlot)),
                                target.stateColumnName(AggType.COUNT)));
                    }
                    break;
                case SUM:
                case AVG:
                    deltaAggOutputs.add(new Alias(
                            new Sum(signedExpr(target.getExprArgs().get(0), dmlFactorSlot)),
                            target.stateColumnName(AggType.SUM)));
                    deltaAggOutputs.add(new Alias(
                            new Sum(ifExprNotNull(target.getExprArgs().get(0), dmlFactorSlot)),
                            target.stateColumnName(AggType.COUNT)));
                    break;
                case MIN:
                case MAX:
                    buildExtremalDeltaOutputs(deltaAggOutputs, target, dmlFactorSlot);
                    break;
                default:
                    throw new AnalysisException("IVM agg delta rewrite does not support aggregate type: "
                            + target.getAggType());
            }
        }

        LogicalAggregate<?> deltaAgg = normalizedAgg.withAggOutputChild(deltaAggOutputs, newAggChild);
        List<NamedExpression> topOutputs = new ArrayList<>();
        Alias rowIdAlias = new Alias(
                IvmUtil.buildRowIdHash(deltaAgg.getOutput().subList(0, groupKeySize)), Column.IVM_ROW_ID_COL);
        topOutputs.add(rowIdAlias);

        for (Slot slot : deltaAgg.getOutput()) {
            if (needsCoalesceInTopProject(slot, aggMeta)) {
                topOutputs.add(new Alias(new Coalesce(slot, zeroOf(slot.getDataType())), slot.getName()));
            } else {
                topOutputs.add(slot);
            }
        }

        LogicalProject<?> topDeltaProject = new LogicalProject<>(ImmutableList.copyOf(topOutputs), deltaAgg);
        Map<String, Slot> outputByName = indexSlotsByName(topDeltaProject.getOutput());
        Map<String, Slot> semanticSlots = new LinkedHashMap<>();
        semanticSlots.put(Column.IVM_DELTA_GROUP_COUNT_COL,
                outputByName.get(Column.IVM_DELTA_GROUP_COUNT_COL));
        for (AggTarget target : aggMeta.getAggTargets()) {
            switch (target.getAggType()) {
                case COUNT:
                    if (target.isCountStar()) {
                        semanticSlots.put(hiddenKey(target, AggType.COUNT),
                                outputByName.get(Column.IVM_DELTA_GROUP_COUNT_COL));
                    } else {
                        semanticSlots.put(hiddenKey(target, AggType.COUNT),
                                outputByName.get(target.stateColumnName(AggType.COUNT)));
                    }
                    break;
                case SUM:
                case AVG:
                    semanticSlots.put(hiddenKey(target, AggType.SUM),
                            outputByName.get(target.stateColumnName(AggType.SUM)));
                    semanticSlots.put(hiddenKey(target, AggType.COUNT),
                            outputByName.get(target.stateColumnName(AggType.COUNT)));
                    break;
                case MIN:
                case MAX:
                    putExtremalSemanticSlots(semanticSlots, outputByName, target);
                    break;
                default:
                    throw new AnalysisException("IVM agg delta rewrite does not support aggregate type: "
                            + target.getAggType());
            }
        }

        Map<String, Slot> groupKeySlotsByName = new LinkedHashMap<>();
        for (Slot groupKey : aggMeta.getGroupKeySlots()) {
            Slot resolved = outputByName.get(groupKey.getName());
            if (resolved == null) {
                throw new AnalysisException("IVM agg delta rewrite failed to resolve delta group key slot: "
                        + groupKey.getName());
            }
            groupKeySlotsByName.put(groupKey.getName(), resolved);
        }

        return new DeltaPlanParts(topDeltaProject, outputByName.get(Column.IVM_ROW_ID_COL),
                semanticSlots, groupKeySlotsByName);
    }

    /**
     * Builds the apply plan: merges delta into MV current state.
     *
     * <p>Plan shape:
     * <pre>
     *   Project(final sink output: [inserted_cols..., __DORIS_DELETE_SIGN__])
     *     ��������� Filter(net-zero)            // grouped agg only
     *         ��������� RightOuterJoin(mv.row_id = delta.row_id)
     *             ��������� MV current-state scan (with delete-sign filter)  [large, probe side]
     *             ��������� delta sub-plan                                   [small, build side]
     * </pre>
     *
     * <p>For each column in the MV's inserted column list, computes:
     * <ul>
     *   <li>row_id: from delta side</li>
     *   <li>group keys: from delta side</li>
     *   <li>hidden state: COALESCE(mv_old, 0) + delta (with assert_true for non-negative counts)</li>
     *   <li>visible value: derived from new hidden state (see {@link #buildTargetExpressions})</li>
     * </ul>
     *
     * <p>Delete sign: grouped agg uses IF(new_group_count <= 0, 1, 0);
     * scalar agg always 0 (single row never deleted).
     *
     * <p>Net-zero filter (grouped only): NOT(mv.row_id IS NULL AND delta_group_count <= 0)
     * prevents inserting delete-sign rows for groups that never existed in the MV.
     */
    private LogicalProject<?> buildApplyPlan(DeltaPlanParts delta, IvmAggMeta aggMeta, IvmDeltaRewriteContext ctx) {
        LogicalOlapScan rawMvScan = buildMvScan(ctx.getMtmv(), ctx);
        LogicalPlan mvPlan = BindRelation.checkAndAddDeleteSignFilter(
                rawMvScan, ctx.getConnectContext(), ctx.getMtmv());
        Slot mvRowId = findSlotByName(rawMvScan.getOutput(), Column.IVM_ROW_ID_COL);
        // MV (large) on left as probe side, delta (small) on right as build side.
        LogicalJoin<Plan, Plan> join = new LogicalJoin<>(JoinType.RIGHT_OUTER_JOIN,
                ImmutableList.of(new EqualTo(mvRowId, delta.rowIdSlot)),
                mvPlan, delta.topDeltaProject, JoinReorderContext.EMPTY);
        Plan joinInput = aggMeta.isScalarAgg() ? join : buildNetZeroFilter(join, delta, mvRowId);

        Map<String, Expression> finalByColumnName = new LinkedHashMap<>();
        Expression newGroupCount = assertNonNegative(
                new Add(coalesceMvSlot(rawMvScan, aggMeta.getGroupCountSlot().getName()), deltaGroupCount(delta)),
                "negative group count");
        finalByColumnName.put(Column.IVM_ROW_ID_COL, delta.rowIdSlot);
        finalByColumnName.put(aggMeta.getGroupCountSlot().getName(), newGroupCount);
        for (Slot groupKey : aggMeta.getGroupKeySlots()) {
            finalByColumnName.put(groupKey.getName(), deltaGroupKey(delta, groupKey.getName()));
        }

        for (AggTarget target : aggMeta.getAggTargets()) {
            buildTargetExpressions(finalByColumnName, rawMvScan, delta, target, newGroupCount);
        }

        Expression deleteSign = aggMeta.isScalarAgg()
                ? new TinyIntLiteral((byte) 0)
                : new If(new LessThanEqual(newGroupCount, new BigIntLiteral(0)),
                        new TinyIntLiteral((byte) 1), new TinyIntLiteral((byte) 0));

        List<NamedExpression> finalOutputs = new ArrayList<>();
        for (String columnName : ctx.getMtmv().getInsertedColumnNames()) {
            Expression expr = finalByColumnName.get(columnName);
            if (expr == null) {
                throw new AnalysisException("IVM agg delta rewrite missing sink expression for column: " + columnName);
            }
            finalOutputs.add(aliasIfNeeded(expr, columnName));
        }
        finalOutputs.add(new Alias(deleteSign, Column.DELETE_SIGN));
        return new LogicalProject<>(ImmutableList.copyOf(finalOutputs), joinInput);
    }

    /**
     * Computes new hidden state and visible value for one aggregate target.
     *
     * <p>State merging formula: {@code new_X = COALESCE(mv_old_X, 0) + delta_X}
     *
     * <p>Visible value derivation per type:
     * <ul>
     *   <li>COUNT(*): new_group_count (cast if needed) ��� no hidden columns</li>
     *   <li>COUNT(expr): IF(new_count > 0, new_count, 0) ��� no hidden columns,
     *       old count read from visible column</li>
     *   <li>SUM(expr): IF(new_count > 0, new_sum, NULL) ��� no hidden SUM,
     *       old sum read from visible column; hidden COUNT persisted</li>
     *   <li>AVG(expr): IF(new_count > 0, CAST(new_sum / new_count AS visible_type), NULL)
     *       ��� hidden SUM + COUNT persisted</li>
     * </ul>
     *
     * <p>Count values are wrapped with {@link #assertNonNegative} to catch data corruption.
     */
    private void buildTargetExpressions(Map<String, Expression> finalByColumnName, LogicalOlapScan rawMvScan,
            DeltaPlanParts delta, AggTarget target, Expression newGroupCount) {
        switch (target.getAggType()) {
            case COUNT: {
                if (target.isCountStar()) {
                    // No hidden columns. Visible value equals the global group count.
                    finalByColumnName.put(target.getVisibleSlot().getName(),
                            castIfNeeded(newGroupCount, target.getVisibleSlot().getDataType()));
                } else {
                    // No hidden columns. Old count read from visible column.
                    Expression newCount = assertNonNegative(new Add(
                            coalesceMvSlot(rawMvScan, target.getVisibleSlot().getName()),
                            delta.semanticSlots.get(hiddenKey(target, AggType.COUNT))),
                            "negative count for " + target.getVisibleSlot().getName());
                    finalByColumnName.put(target.getVisibleSlot().getName(),
                            new If(isPositive(newCount),
                                    castIfNeeded(newCount, target.getVisibleSlot().getDataType()),
                                    zeroOf(target.getVisibleSlot().getDataType())));
                }
                return;
            }
            case SUM: {
                // No hidden SUM column. Old sum read from visible column.
                // Hidden COUNT is persisted for assertNonNegative and null-count logic.
                Expression newSum = new Add(
                        coalesceMvSlot(rawMvScan, target.getVisibleSlot().getName()),
                        delta.semanticSlots.get(hiddenKey(target, AggType.SUM)));
                Expression newCount = buildNewCount(rawMvScan, delta, target);
                finalByColumnName.put(target.getHiddenStateSlot(AggType.COUNT).getName(), newCount);
                Expression visibleValue = castIfNeeded(newSum, target.getVisibleSlot().getDataType());
                finalByColumnName.put(target.getVisibleSlot().getName(),
                        new If(isPositive(newCount), visibleValue,
                                new NullLiteral(target.getVisibleSlot().getDataType())));
                return;
            }
            case AVG: {
                // Both hidden SUM and COUNT are persisted (visible is AVG ��� SUM or COUNT).
                Expression newSum = new Add(
                        coalesceMvSlot(rawMvScan, target.getHiddenStateSlot(AggType.SUM).getName()),
                        delta.semanticSlots.get(hiddenKey(target, AggType.SUM)));
                Expression newCount = buildNewCount(rawMvScan, delta, target);
                finalByColumnName.put(target.getHiddenStateSlot(AggType.SUM).getName(), newSum);
                finalByColumnName.put(target.getHiddenStateSlot(AggType.COUNT).getName(), newCount);
                Expression divisor = castIfNeeded(newCount, newSum.getDataType());
                Expression visibleValue = castIfNeeded(new Divide(newSum, divisor),
                        target.getVisibleSlot().getDataType());
                finalByColumnName.put(target.getVisibleSlot().getName(),
                        new If(isPositive(newCount), visibleValue,
                                new NullLiteral(target.getVisibleSlot().getDataType())));
                return;
            }
            case MIN:
            case MAX:
                buildExtremalTargetExpressions(finalByColumnName, rawMvScan, delta, target);
                return;
            default:
                throw new AnalysisException("IVM agg delta rewrite does not support aggregate type: "
                        + target.getAggType());
        }
    }

    private LogicalFilter<Plan> buildNetZeroFilter(LogicalJoin<Plan, Plan> join, DeltaPlanParts delta, Slot mvRowId) {
        Expression filter = new Not(new And(new IsNull(mvRowId),
                new LessThanEqual(deltaGroupCount(delta), new BigIntLiteral(0))));
        return new LogicalFilter<>(ImmutableSet.of(filter), join);
    }

    private LogicalOlapScan buildMvScan(MTMV mtmv, IvmDeltaRewriteContext ctx) {
        return new LogicalOlapScan(
                ctx.getConnectContext().getStatementContext().getNextRelationId(),
                mtmv,
                ImmutableList.of(mtmv.getQualifiedDbName()),
                ImmutableList.of(),
                mtmv.getPartitionIds(),
                mtmv.getBaseIndexId(),
                PreAggStatus.unset(),
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                ImmutableList.of());
    }

    /**
     * Signs an expression by dml_factor: positive factor ��� expr, negative ��� -expr.
     * Uses conditional branch (not multiplication) to avoid TinyInt �� Decimal precision loss.
     */
    private Expression signedExpr(Expression expr, Slot dmlFactorSlot) {
        return new If(new GreaterThan(dmlFactorSlot, new TinyIntLiteral((byte) 0)),
                expr, new Subtract(zeroOf(expr.getDataType()), expr));
    }

    /**
     * Produces a NULL-aware count contribution:
     * IF(expr IS NULL, 0, dml_factor).
     * Used for COUNT(expr) and hidden count of SUM/AVG targets.
     */
    private Expression ifExprNotNull(Expression expr, Slot dmlFactorSlot) {
        return new If(new IsNull(expr), new TinyIntLiteral((byte) 0), dmlFactorSlot);
    }

    /**
     * Wraps expr in assert_true(expr >= 0, message); throws at runtime if violated.
     *
     * <p>IMPORTANT: the false branch must differ from the true branch to prevent
     * {@code FoldConstantRuleOnFE.visitIf} from collapsing {@code IF(cond, x, x)} into {@code x},
     * which would silently discard the assert_true guard. Since assert_true either returns TRUE
     * (condition satisfied) or throws (condition violated), the false branch is unreachable ���
     * we use a NullLiteral as a distinct, never-reached placeholder.
     */
    private Expression assertNonNegative(Expression expr, String message) {
        return new If(new AssertTrue(new GreaterThanEqual(expr,
                new BigIntLiteral(0)), new StringLiteral(message)),
                expr, new NullLiteral(expr.getDataType()));
    }

    /** Predicate: expr > 0. Used to guard visible value derivation. */
    private Expression isPositive(Expression expr) {
        return new GreaterThan(expr, new BigIntLiteral(0));
    }

    /** Looks up the delta_group_count slot from delta plan parts. */
    private Expression deltaGroupCount(DeltaPlanParts delta) {
        return delta.semanticSlots.get(Column.IVM_DELTA_GROUP_COUNT_COL);
    }

    private Expression deltaGroupKey(DeltaPlanParts delta, String name) {
        Slot slot = delta.groupKeySlotsByName.get(name);
        if (slot == null) {
            throw new AnalysisException("IVM agg delta rewrite failed to resolve delta group key: " + name);
        }
        return slot;
    }

    /** Reads old MV hidden state with NULL-safe default: COALESCE(mv_slot, 0). */
    private Expression coalesceMvSlot(LogicalOlapScan rawMvScan, String slotName) {
        Slot slot = findSlotByName(rawMvScan.getOutput(), slotName);
        return new Coalesce(slot, zeroOf(slot.getDataType()));
    }

    /**
     * Determines whether a delta aggregate output slot needs COALESCE wrapping.
     *
     * <p>Needed when the SUM might return NULL:
     * <ul>
     *   <li>Scalar agg: all outputs can be NULL when base table is empty</li>
     *   <li>SUM/AVG hidden sum: SUM(signedExpr) returns NULL when all input exprs are NULL</li>
     * </ul>
     */
    private boolean needsCoalesceInTopProject(Slot slot, IvmAggMeta aggMeta) {
        if (aggMeta.isScalarAgg() && Column.IVM_DELTA_GROUP_COUNT_COL.equals(slot.getName())) {
            return true;
        }
        for (AggTarget target : aggMeta.getAggTargets()) {
            if (aggMeta.isScalarAgg()
                    && slot.getName().equals(target.stateColumnName(AggType.COUNT))) {
                return true;
            }
            if ((target.getAggType() == AggType.SUM || target.getAggType() == AggType.AVG)
                    && slot.getName().equals(target.stateColumnName(AggType.SUM))) {
                return true;
            }
            // MIN/MAX visible values and their transient DELMIN/DELMAX slots carry semantic NULLs
            // and must NOT be coalesced ��� fall through to default false.
        }
        return false;
    }

    private Map<String, Slot> indexSlotsByName(List<Slot> slots) {
        Map<String, Slot> slotByName = new LinkedHashMap<>();
        for (Slot slot : slots) {
            slotByName.put(slot.getName(), slot);
        }
        return slotByName;
    }

    /** Semantic key for per-target delta slots: "{ordinal}:{aggType}", e.g. "2:SUM". */
    private String hiddenKey(AggTarget target, AggType aggType) {
        return target.getOrdinal() + ":" + aggType.name();
    }

    /** Semantic key for transient delta slots: "{ordinal}:{suffix}", e.g. "0:DELMIN". */
    private String hiddenKey(AggTarget target, String transientSuffix) {
        return target.getOrdinal() + ":" + transientSuffix;
    }

    /** Produces a zero literal of the given numeric type via checked cast from TinyInt(0). */
    private Expression zeroOf(DataType dataType) {
        return new TinyIntLiteral((byte) 0).checkedCastTo(dataType);
    }

    private Expression castIfNeeded(Expression expr, DataType dataType) {
        return expr.getDataType().equals(dataType) ? expr : new Cast(expr, dataType);
    }

    private NamedExpression aliasIfNeeded(Expression expr, String name) {
        if (expr instanceof NamedExpression && name.equals(((NamedExpression) expr).getName())) {
            return (NamedExpression) expr;
        }
        return new Alias(expr, name);
    }

    /**
     * Expression for the insert-only stream: IF(dml_factor > 0, expr, NULL).
     * Used for MIN/MAX delta aggregates ��� only insert rows contribute to the new extreme.
     */
    private Expression insertOnlyExpr(Expression expr, Slot dmlFactorSlot) {
        return new If(new GreaterThan(dmlFactorSlot, new TinyIntLiteral((byte) 0)),
                expr, new NullLiteral(expr.getDataType()));
    }

    /**
     * Expression for the delete-only stream: IF(dml_factor < 0, expr, NULL).
     * Used as input to MIN/MAX over deleted values, to detect boundary violations.
     */
    private Expression deleteOnlyExpr(Expression expr, Slot dmlFactorSlot) {
        return new If(new LessThan(dmlFactorSlot, new TinyIntLiteral((byte) 0)),
                expr, new NullLiteral(expr.getDataType()));
    }

    /**
     * Transient column name for min/max of deleted values.
     * This column exists only in the delta sub-plan aggregate and is NOT stored in the MV.
     */
    private String transientDelHiddenName(AggTarget target, String suffix) {
        return Column.IVM_HIDDEN_COLUMN_PREFIX + "TRANSIENT_" + target.getOrdinal() + "_" + suffix + "_COL__";
    }

    // ---- Extracted common methods for MIN/MAX and SUM/AVG deduplication ----

    /**
     * Builds delta aggregate outputs for a MIN or MAX target.
     *
     * <p>For MIN, produces: MIN(insertOnly), MIN(deleteOnly), SUM(ifExprNotNull).
     * For MAX, produces: MAX(insertOnly), MAX(deleteOnly), SUM(ifExprNotNull).
     * The insert-only agg computes the new extremal from inserted rows; the delete-only
     * agg captures deleted extremal values for the boundary guard check.
     */
    private void buildExtremalDeltaOutputs(List<NamedExpression> deltaAggOutputs,
            AggTarget target, Slot dmlFactorSlot) {
        boolean isMin = target.getAggType() == AggType.MIN;
        AggType stateType = isMin ? AggType.MIN : AggType.MAX;
        String delKey = isMin ? DELMIN : DELMAX;
        Expression exprArg = target.getExprArgs().get(0);

        Expression insertAgg = isMin
                ? new Min(insertOnlyExpr(exprArg, dmlFactorSlot))
                : new Max(insertOnlyExpr(exprArg, dmlFactorSlot));
        Expression deleteAgg = isMin
                ? new Min(deleteOnlyExpr(exprArg, dmlFactorSlot))
                : new Max(deleteOnlyExpr(exprArg, dmlFactorSlot));

        deltaAggOutputs.add(new Alias(insertAgg,
                IvmUtil.ivmAggHiddenColumnName(target.getOrdinal(), stateType.name())));
        deltaAggOutputs.add(new Alias(deleteAgg, transientDelHiddenName(target, delKey)));
        deltaAggOutputs.add(new Alias(
                new Sum(ifExprNotNull(exprArg, dmlFactorSlot)),
                target.getHiddenStateSlot(AggType.COUNT).getName()));
    }

    /**
     * Puts semantic slot mappings for a MIN or MAX target into the semantic slots map.
     *
     * <p>Maps: stateType (MIN/MAX), delKey (DELMIN/DELMAX), and COUNT.
     */
    private void putExtremalSemanticSlots(Map<String, Slot> semanticSlots,
            Map<String, Slot> outputByName, AggTarget target) {
        boolean isMin = target.getAggType() == AggType.MIN;
        AggType stateType = isMin ? AggType.MIN : AggType.MAX;
        String delKey = isMin ? DELMIN : DELMAX;

        semanticSlots.put(hiddenKey(target, stateType),
                outputByName.get(IvmUtil.ivmAggHiddenColumnName(target.getOrdinal(), stateType.name())));
        semanticSlots.put(hiddenKey(target, delKey),
                outputByName.get(transientDelHiddenName(target, delKey)));
        semanticSlots.put(hiddenKey(target, AggType.COUNT),
                outputByName.get(target.getHiddenStateSlot(AggType.COUNT).getName()));
    }

    /**
     * Computes the new count for a target: assertNonNegative(COALESCE(old, 0) + delta).
     *
     * <p>Only called for targets that have a physical hidden COUNT column
     * (SUM, AVG, MIN, MAX). COUNT targets handle their counts
     * directly in {@link #buildTargetExpressions}.
     */
    private Expression buildNewCount(LogicalOlapScan rawMvScan, DeltaPlanParts delta, AggTarget target) {
        return assertNonNegative(new Add(
                coalesceMvSlot(rawMvScan, target.getHiddenStateSlot(AggType.COUNT).getName()),
                delta.semanticSlots.get(hiddenKey(target, AggType.COUNT))),
                "negative hidden count for " + target.getVisibleSlot().getName());
    }

    /**
     * Builds target expressions for a MIN or MAX aggregate target.
     *
     * <p>The structure is identical for MIN and MAX; only the comparison direction,
     * merge function (LEAST vs GREATEST), and aggregate type differ:
     * <ul>
     *   <li><b>Guard</b>: assert_true(newCount == 0 OR deltaDelExtreme IS NULL
     *       OR oldExtreme IS NULL OR deltaDelExtreme {&gt;|&lt;} oldExtreme) ��� bypassed
     *       when all non-null rows are deleted (count drops to 0), since the result
     *       is NULL regardless; otherwise, if a deleted row matches the current
     *       extreme, incremental computation is impossible and falls back to COMPLETE.</li>
     *   <li><b>Merge</b>: CASE WHEN newCount=0 THEN NULL WHEN old IS NULL THEN deltaInsert
     *       WHEN deltaInsert IS NULL THEN old ELSE {LEAST|GREATEST}(old, deltaInsert).</li>
     *   <li><b>Outputs</b>: visible value (guarded extreme or NULL when count=0), and count.</li>
     * </ul>
     */
    private void buildExtremalTargetExpressions(Map<String, Expression> finalByColumnName,
            LogicalOlapScan rawMvScan, DeltaPlanParts delta, AggTarget target) {
        boolean isMin = target.getAggType() == AggType.MIN;
        AggType stateType = isMin ? AggType.MIN : AggType.MAX;
        String delKey = isMin ? DELMIN : DELMAX;
        String guardMsg = isMin
                ? "IVM: deleted row may be current MIN value, fallback to COMPLETE"
                : "IVM: deleted row may be current MAX value, fallback to COMPLETE";

        Slot oldExtreme = findSlotByName(rawMvScan.getOutput(),
                target.getVisibleSlot().getName());
        Expression deltaInsert = delta.semanticSlots.get(hiddenKey(target, stateType));
        Expression deltaDel = delta.semanticSlots.get(hiddenKey(target, delKey));

        // Compute new non-null count first ��� needed for both guard bypass and visible value.
        Expression newCount = buildNewCount(rawMvScan, delta, target);

        // Guard: when the non-null count drops to 0 (all non-null rows deleted), skip the
        // boundary check because visible will be set to NULL regardless. Otherwise, assert
        // that the deleted extremal value does not match the current extreme.
        // For MIN: deleted value must be > current min (otherwise we lose the min).
        // For MAX: deleted value must be < current max (otherwise we lose the max).
        Expression guardComparison = isMin
                ? new GreaterThan(deltaDel, oldExtreme)
                : new LessThan(deltaDel, oldExtreme);
        Expression guardCond = new Or(ImmutableList.of(
                new EqualTo(newCount, new BigIntLiteral(0L)),
                new IsNull(deltaDel),
                new IsNull(oldExtreme),
                guardComparison
        ));
        Expression guard = new AssertTrue(guardCond, new StringLiteral(guardMsg));

        // Null-safe merge via CASE WHEN with count-zero fallback to NULL:
        Expression mergeFunc = isMin
                ? new Least(oldExtreme, deltaInsert)
                : new Greatest(oldExtreme, deltaInsert);
        Expression newExtremeRaw = new CaseWhen(
                ImmutableList.of(
                        new WhenClause(new EqualTo(newCount, new BigIntLiteral(0L)),
                                new NullLiteral(oldExtreme.getDataType())),
                        new WhenClause(new IsNull(oldExtreme), deltaInsert),
                        new WhenClause(new IsNull(deltaInsert), oldExtreme)
                ),
                mergeFunc
        );
        // Embed guard: false branch uses NullLiteral to prevent IF constant folding.
        // assert_true either returns TRUE (pass) or throws (fail), so false branch is unreachable.
        Expression newExtremeGuarded = new If(guard, newExtremeRaw,
                new NullLiteral(newExtremeRaw.getDataType()));

        // No hidden MIN/MAX column: the visible column stores the extremal value directly.
        // When count drops to 0, visible becomes NULL; next refresh reads NULL as old,
        // which is correctly handled by the merge logic (IF old IS NULL, take deltaInsert).
        finalByColumnName.put(target.getHiddenStateSlot(AggType.COUNT).getName(), newCount);
        finalByColumnName.put(target.getVisibleSlot().getName(),
                new If(isPositive(newCount),
                        castIfNeeded(newExtremeGuarded, target.getVisibleSlot().getDataType()),
                        new NullLiteral(target.getVisibleSlot().getDataType())));
    }
}