// IvmNormalizeMtmvPlan.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.rules.rewrite;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.ivm.IvmContext;
import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.functions.scalar.MurmurHash364;
import org.apache.doris.nereids.trees.expressions.functions.scalar.UuidNumeric;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
import org.apache.doris.nereids.types.LargeIntType;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Rewriter that normalizes an MV definition plan for incremental view maintenance (IVM).
 * It is meant to run at both CREATE MV and REFRESH MV time.
 * What the rewrite does:
 * - Every OlapScan is wrapped in a LogicalProject whose first output is an alias named
 *   Column.IVM_ROW_ID_COL (__DORIS_IVM_ROW_ID_COL__), followed by the scan's own outputs:
 *   - MOW table (UNIQUE_KEYS + merge-on-write): cast(murmur_hash3_64(uk...) as LargeInt),
 *     registered as deterministic (stable across refreshes)
 *   - DUP_KEYS table: uuid_numeric(), registered as non-deterministic (random per insert)
 *   - any other key type: rejected with an AnalysisException
 * - Each row-id slot and its determinism flag are recorded in the IvmContext, which is
 *   installed on the CascadesContext before the traversal starts.
 * - A project gets the child's row-id slot prepended unless it already outputs a row-id slot.
 * - A filter only has its child normalized; the filter itself is kept.
 * - A result sink gets the row-id prepended to its output exprs unless already present.
 * - An unbound table sink only has its child normalized.
 * - Every other plan node type is rejected with an AnalysisException.
 * TODO: avg rewrite, join support.
 */
public class IvmNormalizeMtmvPlan extends DefaultPlanRewriter<IvmContext> implements CustomRewriter {

    /**
     * Entry point of the custom rewrite.
     * Returns the plan untouched when there is no ConnectContext or the session variable for the IVM
     * normal rewrite is off. Otherwise a fresh IvmContext is attached to the CascadesContext, the plan
     * is normalized, and the normalized plan is stored in that IvmContext before being returned.
     */
    @Override
    public Plan rewriteRoot(Plan plan, JobContext jobContext) {
        ConnectContext session = jobContext.getCascadesContext().getConnectContext();
        boolean rewriteEnabled = session != null && session.getSessionVariable().isEnableIvmNormalRewrite();
        if (!rewriteEnabled) {
            return plan;
        }
        IvmContext context = new IvmContext();
        // install the context before traversing so it is already reachable from the CascadesContext
        jobContext.getCascadesContext().setIvmContext(context);
        Plan normalized = plan.accept(this, context);
        context.setNormalizedPlan(normalized);
        return normalized;
    }

    // fallback for every node type without a dedicated visit method below: not supported by IVM
    @Override
    public Plan visit(Plan plan, IvmContext ivmContext) {
        String nodeName = plan.getClass().getSimpleName();
        throw new AnalysisException("IVM does not support plan node: " + nodeName);
    }

    // supported: OlapScan — wrap it in a project that exposes the IVM row-id as the first column
    @Override
    public Plan visitLogicalOlapScan(LogicalOlapScan scan, IvmContext ivmContext) {
        Pair<Expression, Boolean> exprAndDeterminism = buildRowId(scan.getTable(), scan);
        Alias rowIdColumn = new Alias(exprAndDeterminism.first, Column.IVM_ROW_ID_COL);
        ivmContext.addRowId(rowIdColumn.toSlot(), exprAndDeterminism.second);

        ImmutableList.Builder<NamedExpression> projections = ImmutableList.builder();
        projections.add(rowIdColumn);
        projections.addAll(scan.getOutput());
        List<NamedExpression> outputs = projections.build();
        return new LogicalProject<>(outputs, scan);
    }

    // supported: project — normalize the child first, then expose its row-id unless one is already projected
    @Override
    public Plan visitLogicalProject(LogicalProject<? extends Plan> project, IvmContext ivmContext) {
        Plan originalChild = project.child();
        Plan normalizedChild = originalChild.accept(this, ivmContext);
        if (!hasRowIdInOutputs(project.getProjects())) {
            List<NamedExpression> widenedProjects = prependRowId(normalizedChild, project.getProjects());
            return new LogicalProject<>(widenedProjects, normalizedChild);
        }
        return withNormalizedChild(project, originalChild, normalizedChild);
    }

    // supported: filter — only the child is normalized, the filter node itself is kept
    @Override
    public Plan visitLogicalFilter(LogicalFilter<? extends Plan> filter, IvmContext ivmContext) {
        Plan originalChild = filter.child();
        Plan normalizedChild = originalChild.accept(this, ivmContext);
        return withNormalizedChild(filter, originalChild, normalizedChild);
    }

    // supported: result sink — normalize the child, then put the row-id in front of the output exprs
    @Override
    public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> sink, IvmContext ivmContext) {
        Plan originalChild = sink.child();
        Plan normalizedChild = originalChild.accept(this, ivmContext);
        if (hasRowIdInOutputs(sink.getOutputExprs())) {
            return withNormalizedChild(sink, originalChild, normalizedChild);
        }
        List<NamedExpression> widenedOutputs = prependRowId(normalizedChild, sink.getOutputExprs());
        return sink.withOutputExprs(widenedOutputs).withChildren(ImmutableList.of(normalizedChild));
    }

    // supported: unbound table sink — only the child is normalized
    @Override
    public Plan visitUnboundTableSink(UnboundTableSink<? extends Plan> sink, IvmContext ivmContext) {
        Plan originalChild = sink.child();
        Plan normalizedChild = originalChild.accept(this, ivmContext);
        return withNormalizedChild(sink, originalChild, normalizedChild);
    }

    /**
     * Returns {@code parent} itself when normalization handed back the very same child instance,
     * otherwise a copy of {@code parent} whose single child is {@code normalizedChild}.
     */
    private Plan withNormalizedChild(Plan parent, Plan originalChild, Plan normalizedChild) {
        if (normalizedChild == originalChild) {
            return parent;
        }
        return parent.withChildren(ImmutableList.of(normalizedChild));
    }

    /**
     * Tells whether one of the outputs is a Slot named Column.IVM_ROW_ID_COL.
     * Only Slot instances are considered; other named expressions with that name do not count.
     */
    private boolean hasRowIdInOutputs(List<NamedExpression> outputs) {
        for (NamedExpression output : outputs) {
            if (output instanceof Slot && Column.IVM_ROW_ID_COL.equals(((Slot) output).getName())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns a new list made of the first row-id slot found in the normalized child's output,
     * followed by all of the given outputs in their original order.
     * Throws AnalysisException when the child's output contains no slot named Column.IVM_ROW_ID_COL.
     */
    private List<NamedExpression> prependRowId(Plan normalizedChild, List<NamedExpression> outputs) {
        Slot rowIdSlot = null;
        for (Slot candidate : normalizedChild.getOutput()) {
            if (Column.IVM_ROW_ID_COL.equals(candidate.getName())) {
                rowIdSlot = candidate;
                break;
            }
        }
        if (rowIdSlot == null) {
            throw new AnalysisException(
                    "IVM normalization error: child plan has no row-id slot after normalization");
        }
        ImmutableList.Builder<NamedExpression> combined = ImmutableList.builder();
        combined.add(rowIdSlot);
        combined.addAll(outputs);
        return combined.build();
    }

    /**
     * Chooses the row-id expression for a scanned table and pairs it with its determinism flag.
     * - DUP_KEYS: (UuidNumeric(), false)
     * - UNIQUE_KEYS with merge-on-write: (hash over the scan's key slots, true)
     * - anything else: throws AnalysisException
     * For a MOW table an AnalysisException is also thrown when none of the scan's output slots
     * is named like one of the table's base-schema key columns.
     */
    private Pair<Expression, Boolean> buildRowId(OlapTable table, LogicalOlapScan scan) {
        KeysType keysType = table.getKeysType();
        if (keysType == KeysType.DUP_KEYS) {
            return Pair.of(new UuidNumeric(), false);
        }
        if (keysType != KeysType.UNIQUE_KEYS || !table.getEnableUniqueKeyMergeOnWrite()) {
            throw new AnalysisException("IVM does not support table key type: " + keysType
                    + " for table: " + table.getName()
                    + ". Only MOW (UNIQUE_KEYS with merge-on-write) and DUP_KEYS are supported.");
        }

        // MOW table: hash the scan slots whose names match the table's key columns
        List<String> uniqueKeyNames = table.getBaseSchemaKeyColumns().stream()
                .map(keyColumn -> keyColumn.getName())
                .collect(Collectors.toList());
        ImmutableList.Builder<Expression> matched = ImmutableList.builder();
        // slots are collected in scan-output order, which fixes the argument order of the hash
        for (Slot slot : scan.getOutput()) {
            if (uniqueKeyNames.contains(slot.getName())) {
                matched.add(slot);
            }
        }
        List<Expression> keySlots = matched.build();
        if (keySlots.isEmpty()) {
            throw new AnalysisException("IVM: no unique key columns found for MOW table: "
                    + table.getName());
        }
        return Pair.of(buildRowIdHash(keySlots), true);
    }

    /**
     * Wraps murmur_hash3_64 over all given key slots in a cast to LargeInt; the result serves as the
     * deterministic row-id. The list must contain at least one slot.
     * The hash is only 64 bits wide, so it is not collision-safe for large tables.
     * TODO: replace with a 128-bit hash once BE supports it or a Java UDF is available.
     */
    private Expression buildRowIdHash(List<Expression> keySlots) {
        Expression leadingKey = keySlots.get(0);
        Expression[] remainingKeys = keySlots.stream().skip(1).toArray(Expression[]::new);
        MurmurHash364 hash = new MurmurHash364(leadingKey, remainingKeys);
        return new Cast(hash, LargeIntType.INSTANCE);
    }
}