// IvmNormalizeMtmv.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.rewrite;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.info.TableNameInfo;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.info.TableNameInfoUtils;
import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mtmv.ivm.IvmAggMeta;
import org.apache.doris.mtmv.ivm.IvmAggMeta.AggTarget;
import org.apache.doris.mtmv.ivm.IvmAggMeta.AggType;
import org.apache.doris.mtmv.ivm.IvmException;
import org.apache.doris.mtmv.ivm.IvmFailureReason;
import org.apache.doris.mtmv.ivm.IvmNormalizeResult;
import org.apache.doris.mtmv.ivm.IvmUtil;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.expressions.functions.agg.Avg;
import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
import org.apache.doris.nereids.trees.expressions.functions.agg.Max;
import org.apache.doris.nereids.trees.expressions.functions.agg.Min;
import org.apache.doris.nereids.trees.expressions.functions.agg.Sum;
import org.apache.doris.nereids.trees.expressions.functions.scalar.UuidNumeric;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
import org.apache.doris.nereids.types.LargeIntType;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Normalizes the MV define plan for IVM at both CREATE MV and REFRESH MV time.
*
* <h3>Example: aggregate MV rewrite</h3>
* <p>Given MV definition:
* <pre>{@code
* SELECT sum(v1+v2), count(v3+v4), avg(v5+v6), min(v7+v8)
* FROM t GROUP BY k1, k2
* }</pre>
*
* <p>After IvmNormalizeMtmv the plan shape is:
* <pre>{@code
* ResultSink [row_id, visible outputs, hidden state cols]
* └── Project [
* __DORIS_IVM_ROW_ID__ = hash(k1, k2),
* k1, k2,
* sum(v1+v2), -- ordinal 0 visible (SUM)
* count(v3+v4), -- ordinal 1 visible (COUNT(expr), no hidden col)
* avg(v5+v6), -- ordinal 2 visible (AVG)
* min(v7+v8), -- ordinal 3 visible (MIN)
* __DORIS_IVM_AGG_COUNT_COL__, -- group COUNT(*)
* __DORIS_IVM_AGG_0_COUNT__, -- SUM: hidden COUNT(v1+v2) (no hidden SUM; visible stores it)
* __DORIS_IVM_AGG_2_SUM__, -- AVG: hidden SUM(v5+v6)
* __DORIS_IVM_AGG_2_COUNT__, -- AVG: hidden COUNT(v5+v6)
* __DORIS_IVM_AGG_3_COUNT__ -- MIN: hidden COUNT(v7+v8) (no hidden MIN; visible stores it)
* ]
* └── Aggregate [GROUP BY k1, k2]
* outputs: [k1, k2,
* sum(v1+v2), count(v3+v4), avg(v5+v6), min(v7+v8),
* COUNT(*), COUNT(v1+v2),
* SUM(v5+v6), COUNT(v5+v6),
* COUNT(v7+v8)]
* └── Scan(t) with base-table row-id
* }</pre>
*
* <h3>Hidden column strategy per aggregate type</h3>
* <ul>
* <li><b>COUNT(*)</b>: no hidden columns (visible = global group count)</li>
* <li><b>COUNT(expr)</b>: no hidden columns (visible stores the count directly)</li>
* <li><b>SUM</b>: hidden COUNT only (visible stores SUM; COUNT for guard)</li>
* <li><b>AVG</b>: hidden SUM + COUNT (visible is AVG, which is neither SUM nor COUNT)</li>
* <li><b>MIN/MAX</b>: hidden COUNT only (visible stores extremal value)</li>
* </ul>
*
* <h3>Scan-level row-id injection</h3>
* <ul>
* <li>MOW (UNIQUE_KEYS + merge-on-write): hash(uk columns) — deterministic
* <li>Excluded AGG_KEYS table: hash(agg key columns) — deterministic
* <li>DUP_KEYS: uuid_numeric() — non-deterministic
* <li>Other key types: not supported, throws.
* </ul>
*
* <p>IVM row-id invariant: every row that really exists in a normalized child plan must have
* a non-null {@code __DORIS_IVM_ROW_ID_COL__}. Outer join null-padding is the only place where
* a child row-id slot may become NULL, and that NULL means the corresponding side has no
* matching row. This lets left outer join compose the MV row-id as
* {@code hash(left_row_id, right_row_id)} and use {@code right_row_id = NULL} for padded rows
* without confusing them with real right rows.
*
* <h3>Supported plan nodes</h3>
* OlapScan, filter, project, aggregate, inner/cross join, root left outer join, UNION ALL,
* result sink, logical olap table sink.
*/
public class IvmNormalizeMtmv extends DefaultPlanRewriter<Boolean> implements CustomRewriter {
private static final Set<Class<? extends AggregateFunction>> SUPPORTED_AGG_FUNCTIONS =
ImmutableSet.of(Count.class, Sum.class, Avg.class, Min.class, Max.class);
private final IvmNormalizeResult normalizeResult = new IvmNormalizeResult();
private StatementContext statementContext;
/**
 * Entry point of the rule: normalizes the whole plan for IVM.
 *
 * <p>The plan is returned untouched when there is no connect context, when the
 * session variable behind {@code isEnableIvmNormalRewrite()} is off, or when the cascades
 * context already carries an IVM normalize result (which makes the rule idempotent).
 * Otherwise the result holder is registered on the cascades context first, the plan is
 * visited with {@code isFirstNonSink = true}, and the rewritten plan is stored in the holder.
 *
 * @param plan the plan to normalize
 * @param jobContext supplies the cascades context, connect context and statement context
 * @return the normalized plan, or {@code plan} itself when the rule is skipped
 */
@Override
public Plan rewriteRoot(Plan plan, JobContext jobContext) {
    ConnectContext ctx = jobContext.getCascadesContext().getConnectContext();
    boolean rewriteEnabled = ctx != null && ctx.getSessionVariable().isEnableIvmNormalRewrite();
    // Second condition is the idempotency guard: a recorded result means the plan
    // (e.g. a rewritten plan re-entering the rule) has been normalized already.
    if (!rewriteEnabled || jobContext.getCascadesContext().getIvmNormalizeResult().isPresent()) {
        return plan;
    }
    statementContext = jobContext.getCascadesContext().getStatementContext();
    jobContext.getCascadesContext().setIvmNormalizeResult(normalizeResult);
    Plan normalized = plan.accept(this, true);
    normalizeResult.setNormalizedPlan(normalized);
    return normalized;
}
/**
 * Fallback for every plan node that has no dedicated visitor in this class: such nodes are
 * not whitelisted for IVM and are rejected.
 *
 * @throws IvmException always, with reason {@code PLAN_PATTERN_UNSUPPORTED}
 */
@Override
public Plan visit(Plan plan, Boolean isFirstNonSink) {
    String nodeName = plan.getClass().getSimpleName();
    throw new IvmException(IvmFailureReason.PLAN_PATTERN_UNSUPPORTED,
            "IVM does not support plan node: " + nodeName);
}
/**
 * Whitelisted leaf: wraps the scan in a Project whose first output (index 0) is the IVM
 * row-id, followed by all scan outputs.
 *
 * <p>The row-id expression is built first (which also rejects unsupported key types), then
 * the binlog requirement is validated, and finally the row-id slot is recorded in
 * {@code normalizeResult} together with its determinism flag.
 *
 * @param scan the OLAP scan to normalize
 * @param isFirstNonSink unused by this visitor
 * @return {@code Project[row_id, scan outputs...]} over the original scan
 */
@Override
public Plan visitLogicalOlapScan(LogicalOlapScan scan, Boolean isFirstNonSink) {
    Pair<Expression, Boolean> rowIdAndDeterminism = buildRowId(scan.getTable(), scan);
    validateBinlogEnabled(scan);
    Alias rowIdColumn = new Alias(rowIdAndDeterminism.first, Column.IVM_ROW_ID_COL);
    normalizeResult.addRowId(rowIdColumn.toSlot(), rowIdAndDeterminism.second);
    ImmutableList.Builder<NamedExpression> projections = ImmutableList.builder();
    projections.add(rowIdColumn);
    projections.addAll(scan.getOutput());
    return new LogicalProject<>(projections.build(), scan);
}
/**
 * Whitelisted: normalizes the child (forwarding {@code isFirstNonSink} unchanged) and then
 * rewrites the projection list so that the child's IVM hidden columns are exposed.
 *
 * @return the original project when neither the child nor the projection list changed,
 *         otherwise a copy with the new projections and child
 */
@Override
public Plan visitLogicalProject(LogicalProject<? extends Plan> project, Boolean isFirstNonSink) {
    Plan rewrittenChild = project.child().accept(this, isFirstNonSink);
    List<NamedExpression> projections =
            rewriteOutputsWithIvmHiddenColumns(rewrittenChild, project.getProjects());
    boolean unchanged = rewrittenChild == project.child() && projections.equals(project.getProjects());
    return unchanged ? project : project.withProjectsAndChild(projections, rewrittenChild);
}
/**
 * Whitelisted: normalizes the filter's child. The child is always visited with
 * {@code isFirstNonSink = false}, so operators that must be top-level (aggregate, outer join)
 * are rejected when they appear below a filter.
 *
 * @return the original filter when the child is unchanged, otherwise a copy over the new child
 */
@Override
public Plan visitLogicalFilter(LogicalFilter<? extends Plan> filter, Boolean isFirstNonSink) {
    Plan rewrittenChild = filter.child().accept(this, false);
    if (rewrittenChild == filter.child()) {
        return filter;
    }
    return filter.withChildren(ImmutableList.of(rewrittenChild));
}
/**
 * Normalizes an inner join, a cross join, or a root-level left outer join.
 *
 * <p>Steps, in order:
 * <ol>
 * <li>Rejects every join type other than INNER_JOIN, CROSS_JOIN and LEFT_OUTER_JOIN, and
 * rejects mark joins.</li>
 * <li>For an outer join: requires {@code isFirstNonSink} to be true, validates the nullable
 * (right) side via {@link #checkNullableSideSnapshotSupported} and flags the result as an
 * outer-join MV.</li>
 * <li>Normalizes both children with {@code isFirstNonSink = false}.</li>
 * <li>Wraps the rebuilt join in a Project whose first output is the composed row-id
 * {@code hash(left_row_id, right_row_id)}. Every join output named
 * {@code Column.IVM_ROW_ID_COL} is dropped so that a single row-id column remains and
 * {@link #collectIvmHiddenSlots} never has to merge several of them.</li>
 * </ol>
 *
 * <p>The composed row-id is recorded as deterministic only when both child row-ids are.
 * The child entries already recorded in {@code normalizeResult} are kept (not cleared) so
 * they can still be looked up individually later.
 *
 * @param join the join to normalize
 * @param isFirstNonSink true while only sinks and projects have been seen above this node
 * @return a Project over the rebuilt join
 * @throws IvmException for unsupported join types, mark joins, non-root outer joins, or a
 *         left outer join whose preserved (left) side has a non-deterministic row-id
 */
@Override
public Plan visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> join, Boolean isFirstNonSink) {
    JoinType joinType = join.getJoinType();
    boolean supportedType = joinType == JoinType.INNER_JOIN
            || joinType == JoinType.CROSS_JOIN
            || joinType == JoinType.LEFT_OUTER_JOIN;
    if (!supportedType) {
        throw new IvmException(IvmFailureReason.OUTER_JOIN_RETRACTION_UNSUPPORTED,
                "IVM does not support join type: " + joinType
                        + ". Only INNER_JOIN, CROSS_JOIN and LEFT_OUTER_JOIN are supported.");
    }
    if (join.isMarkJoin()) {
        throw new IvmException(IvmFailureReason.PLAN_PATTERN_UNSUPPORTED,
                "IVM does not support mark join (subquery with disjunction).");
    }
    if (joinType.isOuterJoin()) {
        if (!isFirstNonSink) {
            throw new IvmException(IvmFailureReason.OUTER_JOIN_RETRACTION_UNSUPPORTED,
                    "IVM OUTER JOIN must be the top-level operator "
                            + "(only sinks and projects allowed above it)");
        }
        // TODO: tighten nullable-side snapshot validation to an explicit allowlist:
        // OlapScan, Project, Filter, Inner/Cross Join, and excluded-only UNION ALL.
        checkNullableSideSnapshotSupported(join.right());
        normalizeResult.setOuterJoinMv(true);
    }

    Plan normalizedLeft = join.left().accept(this, false);
    Plan normalizedRight = join.right().accept(this, false);
    LogicalJoin<Plan, Plan> rebuiltJoin =
            (LogicalJoin<Plan, Plan>) join.withChildren(normalizedLeft, normalizedRight);

    // Each normalized child exposes exactly the row-id that was recorded for it.
    Slot leftRowId = IvmUtil.findRowIdSlot(normalizedLeft.getOutput(), "left child of join");
    Slot rightRowId = IvmUtil.findRowIdSlot(normalizedRight.getOutput(), "right child of join");
    boolean leftDeterministic = normalizeResult.isDeterministic(leftRowId);
    boolean rightDeterministic = normalizeResult.isDeterministic(rightRowId);
    if (!leftDeterministic && joinType == JoinType.LEFT_OUTER_JOIN) {
        // Nullable-side deltas may emit +1/-1 pad-null repair rows keyed by
        // hash(left_row_id, NULL). A non-deterministic preserved-side row_id cannot
        // be reproduced across refreshes, so LEFT OUTER JOIN IVM cannot maintain it.
        throw new IvmException(IvmFailureReason.NON_DETERMINISTIC_ROW_ID,
                "IVM LEFT OUTER JOIN requires deterministic row_id on preserved side");
    }

    // Valid child row_ids are non-null by the normalize invariant. For LEFT OUTER JOIN,
    // a NULL right row_id can only come from join null-padding (no matching right row).
    Alias composedRowId = new Alias(
            IvmUtil.buildRowIdHash(ImmutableList.of(leftRowId, rightRowId)), Column.IVM_ROW_ID_COL);

    // Project layout: [composed row_id, join outputs without the child row_ids]
    List<Slot> passThroughSlots = rebuiltJoin.getOutput().stream()
            .filter(slot -> !Column.IVM_ROW_ID_COL.equals(slot.getName()))
            .collect(Collectors.toList());
    ImmutableList<NamedExpression> projections = ImmutableList.<NamedExpression>builder()
            .add(composedRowId)
            .addAll(passThroughSlots)
            .build();

    // Register the composed row_id; child entries stay in the map for later lookup.
    normalizeResult.addRowId(composedRowId.toSlot(), leftDeterministic && rightDeterministic);
    return new LogicalProject<>(projections, rebuiltJoin);
}
/**
 * Normalizes a UNION ALL.
 *
 * <p>UNION DISTINCT and unions carrying constant-expression arms are rejected.
 *
 * <p>Every child arm is processed as follows:
 * <ol>
 * <li>The arm is normalized with {@code isFirstNonSink = false}.</li>
 * <li>A Project is placed on top that emits {@code hash(arm_index, child_row_id)} as the new
 * row-id; the arm-index literal keeps row-ids of different arms apart (e.g. self-union).</li>
 * <li>The arm's own row-id column is dropped from that Project's output.</li>
 * </ol>
 *
 * <p>The union is then rebuilt with a fresh union-level row-id slot prepended to its outputs
 * and the hashed arm row-id prepended to each arm's output mapping. The union row-id is
 * recorded as deterministic only when every arm's row-id is deterministic.
 *
 * @param union the union to normalize
 * @param isFirstNonSink unused by this visitor
 * @return the rebuilt union
 * @throws IvmException for UNION DISTINCT or constant-expression arms
 */
@Override
public Plan visitLogicalUnion(LogicalUnion union, Boolean isFirstNonSink) {
    if (union.getQualifier() != Qualifier.ALL) {
        throw new IvmException(IvmFailureReason.PLAN_PATTERN_UNSUPPORTED,
                "IVM does not support UNION DISTINCT. Only UNION ALL is supported.");
    }
    if (!union.getConstantExprsList().isEmpty()) {
        throw new IvmException(IvmFailureReason.PLAN_PATTERN_UNSUPPORTED,
                "IVM does not support UNION ALL with constant expressions.");
    }

    List<Plan> hashedArms = new ArrayList<>();
    List<List<SlotReference>> armOutputMappings = new ArrayList<>();
    boolean everyArmDeterministic = true;
    int armCount = union.children().size();
    for (int armIndex = 0; armIndex < armCount; armIndex++) {
        Plan normalizedArm = union.child(armIndex).accept(this, false);
        Slot armRowId = IvmUtil.findRowIdSlot(normalizedArm.getOutput(),
                "child " + armIndex + " of union");
        if (!normalizeResult.isDeterministic(armRowId)) {
            everyArmDeterministic = false;
        }

        // Project on top of the arm: [hash(arm_index, child_row_id), other columns...]
        Alias hashedRowId = new Alias(
                IvmUtil.buildRowIdHash(ImmutableList.of(new IntegerLiteral(armIndex), armRowId)),
                Column.IVM_ROW_ID_COL);
        List<Slot> armColumns = normalizedArm.getOutput().stream()
                .filter(slot -> !Column.IVM_ROW_ID_COL.equals(slot.getName()))
                .collect(Collectors.toList());
        ImmutableList<NamedExpression> armProjections = ImmutableList.<NamedExpression>builder()
                .add(hashedRowId)
                .addAll(armColumns)
                .build();
        LogicalProject<Plan> hashedArm = new LogicalProject<>(armProjections, normalizedArm);
        hashedArms.add(hashedArm);

        // Arm mapping for the rebuilt union: [hashed_row_id, ...original mapping...]
        SlotReference hashedRowIdSlot = (SlotReference) hashedArm.getOutput().get(0);
        armOutputMappings.add(ImmutableList.<SlotReference>builder()
                .add(hashedRowIdSlot)
                .addAll(union.getRegularChildrenOutputs().get(armIndex))
                .build());
    }

    // Fresh union-level row-id output slot
    SlotReference unionRowId = new SlotReference(
            StatementScopeIdGenerator.newExprId(),
            Column.IVM_ROW_ID_COL, LargeIntType.INSTANCE, false, ImmutableList.of());
    normalizeResult.addRowId(unionRowId, everyArmDeterministic);

    // Rebuilt union outputs: [union_row_id, ...original outputs...]
    ImmutableList<NamedExpression> unionOutputs = ImmutableList.<NamedExpression>builder()
            .add(unionRowId)
            .addAll(union.getOutputs())
            .build();
    return union.withNewOutputsChildrenAndConstExprsList(
            unionOutputs, hashedArms, armOutputMappings, union.getConstantExprsList());
}
/**
 * Normalizes an aggregate MV. Expected post-NormalizeAggregate plan shape:
 * {@code Project(top) -> Aggregate(normalized) -> Project(bottom) -> ... -> Scan}
 *
 * <p>Steps, in order:
 * <ol>
 * <li>Requires the aggregate to be top-level ({@code isFirstNonSink} must be true).</li>
 * <li>Normalizes the child with {@code isFirstNonSink = false}.</li>
 * <li>Checks that every aggregate output is either a Slot (group key) or an Alias over an
 * AggregateFunction, and validates the functions via {@link #checkAggFunctions}.</li>
 * <li>Appends the hidden group COUNT(*) and the per-aggregate hidden state columns after the
 * original aggregate outputs.</li>
 * <li>Wraps the rebuilt aggregate in a Project laid out as
 * {@code [row_id, original outputs, hidden outputs]}, where the row-id is built from the
 * group-by expressions.</li>
 * <li>Stores the {@link IvmAggMeta} in {@code normalizeResult}.</li>
 * </ol>
 *
 * <p>The aggregate-level row-id is recorded as deterministic only for grouped (non-scalar)
 * aggregates.
 *
 * @param agg the aggregate to normalize
 * @param isFirstNonSink true while only sinks and projects have been seen above this node
 * @return {@code Project(row_id + aggregate slots)} over the aggregate with hidden outputs
 * @throws IvmException when the aggregate is not top-level, an output has an unexpected
 *         shape, or an aggregate function is unsupported
 */
@Override
public Plan visitLogicalAggregate(LogicalAggregate<? extends Plan> agg, Boolean isFirstNonSink) {
    if (!isFirstNonSink) {
        throw new IvmException(IvmFailureReason.AGG_UNSUPPORTED,
                "IVM aggregate must be the top-level operator (only sinks and projects allowed above it)");
    }
    Plan normalizedChild = agg.child().accept(this, false);

    // After NormalizeAggregate, outputs are: group-by key Slots + Alias(AggFunc)
    List<NamedExpression> visibleOutputs = agg.getOutputExpressions();
    List<Expression> groupKeys = agg.getGroupByExpressions();
    boolean isScalar = groupKeys.isEmpty();

    List<Alias> aggregateAliases = new ArrayList<>();
    for (NamedExpression output : visibleOutputs) {
        if (output instanceof Slot) {
            // group-by key slot: accepted, nothing to collect
            continue;
        }
        boolean isAggregateAlias = output instanceof Alias
                && ((Alias) output).child() instanceof AggregateFunction;
        if (!isAggregateAlias) {
            throw new IvmException(IvmFailureReason.AGG_UNSUPPORTED,
                    "IVM: unexpected expression in normalized aggregate output: " + output);
        }
        aggregateAliases.add((Alias) output);
    }
    checkAggFunctions(aggregateAliases.stream()
            .map(alias -> (AggregateFunction) alias.child())
            .collect(Collectors.toList()));

    // Hidden outputs start with the group multiplicity COUNT(*), followed by the
    // per-aggregate state columns in ordinal order.
    List<NamedExpression> hiddenOutputs = new ArrayList<>();
    hiddenOutputs.add(new Alias(new Count(), Column.IVM_AGG_COUNT_COL));
    List<AggTarget> placeholderTargets = new ArrayList<>();
    int ordinal = 0;
    for (Alias visibleAlias : aggregateAliases) {
        buildHiddenStateForAgg(ordinal, (AggregateFunction) visibleAlias.child(), visibleAlias,
                hiddenOutputs, placeholderTargets);
        ordinal++;
    }

    // Rebuilt Aggregate: hidden outputs come AFTER the original outputs
    ImmutableList<NamedExpression> aggOutputs = ImmutableList.<NamedExpression>builder()
            .addAll(visibleOutputs)
            .addAll(hiddenOutputs)
            .build();
    LogicalAggregate<Plan> rebuiltAgg = agg.withAggOutputChild(aggOutputs, normalizedChild);

    // groupKeys are already Slots after NormalizeAggregate
    Alias rowIdAlias = new Alias(IvmUtil.buildRowIdHash(groupKeys), Column.IVM_ROW_ID_COL);
    // Child row-id entries are kept in normalizeResult for later lookup.
    normalizeResult.addRowId(rowIdAlias.toSlot(), !isScalar);

    // Wrapping Project: [row_id, every Aggregate output slot (original + hidden)]
    List<Slot> aggOutputSlots = rebuiltAgg.getOutputExpressions().stream()
            .map(aggOutput -> aggOutput.toSlot())
            .collect(Collectors.toList());
    ImmutableList<NamedExpression> projections = ImmutableList.<NamedExpression>builder()
            .add(rowIdAlias)
            .addAll(aggOutputSlots)
            .build();

    // The group count is the first hidden output, i.e. it sits right after the originals.
    List<Slot> rebuiltSlots = rebuiltAgg.getOutput();
    Slot groupCountSlot = rebuiltSlots.get(visibleOutputs.size());
    List<AggTarget> resolvedTargets = resolveAggTargetSlots(placeholderTargets, rebuiltSlots);

    // After NormalizeAggregate, group-by exprs are all Slots; cast directly
    ImmutableList.Builder<Slot> groupKeySlots = ImmutableList.builder();
    for (Expression groupKey : groupKeys) {
        groupKeySlots.add((Slot) groupKey);
    }
    List<Slot> resolvedGroupKeys = groupKeySlots.build();

    normalizeResult.setAggMeta(new IvmAggMeta(isScalar, resolvedGroupKeys,
            groupCountSlot, resolvedTargets));
    return new LogicalProject<>(projections, rebuiltAgg);
}
/**
 * Rejects nullable sides of an outer join that cannot be snapshotted reliably.
 *
 * <p>Outer-join pad-null repair needs the nullable side's full pre/post snapshot.
 * For UNION ALL, the linear delta strategy prunes non-delta arms and keeps only
 * the changed arm. If such a UNION ALL contains a trigger-table OlapScan, O1 cannot
 * reconstruct the full nullable-side snapshot from the rewritten delta plan reliably.
 * Excluded trigger tables are ignored because their changes do not produce delta arms.
 *
 * @param nullableSide the nullable child of the outer join
 * @throws IvmException with reason {@code SNAPSHOT_ALIGNMENT_UNSUPPORTED}
 */
private void checkNullableSideSnapshotSupported(Plan nullableSide) {
    if (!containsUnionAllWithOlapScan(nullableSide)) {
        return;
    }
    throw new IvmException(IvmFailureReason.SNAPSHOT_ALIGNMENT_UNSUPPORTED,
            "IVM OUTER JOIN does not support UNION ALL with OlapScan on nullable side");
}
/**
 * Tells whether {@code plan} contains a UNION ALL that has, somewhere inside it, an OlapScan
 * of a table that is not an excluded trigger table.
 */
private boolean containsUnionAllWithOlapScan(Plan plan) {
    // Cheap pre-check before walking the tree.
    if (!plan.containsType(LogicalUnion.class)) {
        return false;
    }
    return plan.<LogicalUnion>collectFirst(node -> {
        if (!(node instanceof LogicalUnion)) {
            return false;
        }
        LogicalUnion candidate = (LogicalUnion) node;
        boolean unionAllOverScan = candidate.containsType(LogicalOlapScan.class)
                && candidate.getQualifier() == Qualifier.ALL;
        if (!unionAllOverScan) {
            return false;
        }
        // Only scans of non-excluded tables count.
        return candidate.<LogicalOlapScan>collectFirst(
                descendant -> descendant instanceof LogicalOlapScan
                        && !isExcludedTriggerTable((LogicalOlapScan) descendant)).isPresent();
    }).isPresent();
}
/**
 * Creates the hidden state columns that IVM delta maintenance needs for one user-visible
 * aggregate.
 *
 * <p>Hidden states per visible aggregate type:
 * <ul>
 * <li>COUNT (both COUNT(*) and COUNT(expr)): none</li>
 * <li>SUM: hidden COUNT</li>
 * <li>AVG: hidden SUM, then hidden COUNT</li>
 * <li>MIN / MAX: hidden COUNT</li>
 * </ul>
 * Every hidden state is computed over the aggregate's first argument.
 *
 * <p>The hidden aliases are appended to {@code hiddenAggOutputs}, and one {@link AggTarget}
 * is appended to {@code aggTargets}. The target carries placeholder slots derived from the
 * aliases; they are resolved later from the rebuilt Aggregate output.
 *
 * @param ordinal position of the aggregate among the visible aggregates
 * @param aggFunc the visible aggregate function
 * @param origAlias the alias wrapping {@code aggFunc} in the aggregate output
 * @param hiddenAggOutputs receives the hidden alias expressions
 * @param aggTargets receives the placeholder target
 * @throws IvmException when {@code aggFunc} is not COUNT, SUM, AVG, MIN or MAX
 */
private void buildHiddenStateForAgg(int ordinal, AggregateFunction aggFunc, Alias origAlias,
        List<NamedExpression> hiddenAggOutputs, List<AggTarget> aggTargets) {
    AggType visibleType;
    List<AggType> hiddenStates;
    if (aggFunc instanceof Count) {
        // COUNT(*) visible = global group count; COUNT(expr) visible stores count directly.
        visibleType = AggType.COUNT;
        hiddenStates = ImmutableList.of();
    } else if (aggFunc instanceof Sum) {
        // Visible column stores SUM; hidden COUNT serves the assertNonNegative guard
        // and the null-count logic.
        visibleType = AggType.SUM;
        hiddenStates = ImmutableList.of(AggType.COUNT);
    } else if (aggFunc instanceof Avg) {
        visibleType = AggType.AVG;
        hiddenStates = ImmutableList.of(AggType.SUM, AggType.COUNT);
    } else if (aggFunc instanceof Min) {
        // Visible column already stores the extremal value; hidden COUNT is only needed
        // for the guard / zero-count NULL logic.
        visibleType = AggType.MIN;
        hiddenStates = ImmutableList.of(AggType.COUNT);
    } else if (aggFunc instanceof Max) {
        visibleType = AggType.MAX;
        hiddenStates = ImmutableList.of(AggType.COUNT);
    } else {
        throw new IvmException(IvmFailureReason.AGG_UNSUPPORTED,
                "IVM: unsupported aggregate function: " + aggFunc.getName());
    }

    // Insertion order matters: it fixes the order of the hidden output columns.
    Map<AggType, Alias> hiddenAliases = new LinkedHashMap<>();
    for (AggType state : hiddenStates) {
        AggregateFunction stateFunction;
        if (state == AggType.SUM) {
            stateFunction = new Sum(aggFunc.child(0));
        } else {
            stateFunction = new Count(aggFunc.child(0));
        }
        addHiddenAlias(hiddenAliases, ordinal, state, stateFunction);
    }
    hiddenAggOutputs.addAll(hiddenAliases.values());

    // Placeholder slots, to be resolved once the Aggregate has been rebuilt
    ImmutableMap.Builder<AggType, Slot> placeholderSlots = ImmutableMap.builder();
    hiddenAliases.forEach((state, alias) -> placeholderSlots.put(state, alias.toSlot()));

    // COUNT(*) has no argument; every other aggregate contributes its first argument.
    boolean countStar = aggFunc instanceof Count && ((Count) aggFunc).isCountStar();
    List<Expression> exprArgs;
    if (countStar) {
        exprArgs = ImmutableList.of();
    } else {
        exprArgs = ImmutableList.of(aggFunc.child(0));
    }
    aggTargets.add(new AggTarget(ordinal, visibleType, origAlias.toSlot(),
            placeholderSlots.build(), exprArgs));
}
/**
 * Registers one hidden state alias under {@code stateType}; the column name is derived from
 * the aggregate ordinal and the state type name via {@code IvmUtil.ivmAggHiddenColumnName}.
 */
private void addHiddenAlias(Map<AggType, Alias> hiddenAliases, int ordinal,
        AggType stateType, AggregateFunction aggFunc) {
    String hiddenColumnName = IvmUtil.ivmAggHiddenColumnName(ordinal, stateType.name());
    Alias hiddenColumn = new Alias(aggFunc, hiddenColumnName);
    hiddenAliases.put(stateType, hiddenColumn);
}
/**
 * Replaces the placeholder slots of each AggTarget with the matching slots of the rebuilt
 * Aggregate output. Slots are matched by column name; when several output slots share a
 * name, the last one wins.
 *
 * @param placeholderTargets targets carrying placeholder slots
 * @param newAggSlots output slots of the rebuilt Aggregate
 * @return new targets, in the same order, pointing at the rebuilt slots
 * @throws IvmException when a visible or hidden slot name is missing from the rebuilt output
 */
private List<AggTarget> resolveAggTargetSlots(List<AggTarget> placeholderTargets,
        List<Slot> newAggSlots) {
    Map<String, Slot> rebuiltByName = new LinkedHashMap<>();
    newAggSlots.forEach(slot -> rebuiltByName.put(slot.getName(), slot));

    List<AggTarget> resolved = new ArrayList<>();
    for (AggTarget placeholder : placeholderTargets) {
        Slot visible = requireRebuiltSlot(rebuiltByName,
                placeholder.getVisibleSlot().getName(), "visible");
        ImmutableMap.Builder<AggType, Slot> hidden = ImmutableMap.builder();
        for (Map.Entry<AggType, Slot> state : placeholder.getHiddenStateSlots().entrySet()) {
            hidden.put(state.getKey(),
                    requireRebuiltSlot(rebuiltByName, state.getValue().getName(), "hidden state"));
        }
        resolved.add(new AggTarget(placeholder.getOrdinal(), placeholder.getAggType(),
                visible, hidden.build(), placeholder.getExprArgs()));
    }
    return resolved;
}

/** Looks up a rebuilt slot by name, failing with the standard resolution error if absent. */
private Slot requireRebuiltSlot(Map<String, Slot> rebuiltByName, String slotName, String slotKind) {
    Slot rebuilt = rebuiltByName.get(slotName);
    if (rebuilt == null) {
        throw new IvmException(IvmFailureReason.PLAN_PATTERN_UNSUPPORTED,
                "IVM: failed to resolve " + slotKind + " slot '"
                        + slotName + "' from rebuilt aggregate output");
    }
    return rebuilt;
}
/**
 * Whitelisted: normalizes the sink's child (forwarding {@code isFirstNonSink} unchanged) and
 * rewrites the sink's output expressions so they expose the child's IVM hidden columns.
 *
 * @return the original sink when nothing changed, otherwise a copy with the new outputs
 *         and child
 */
@Override
public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> sink, Boolean isFirstNonSink) {
    Plan rewrittenChild = sink.child().accept(this, isFirstNonSink);
    List<NamedExpression> sinkOutputs =
            rewriteOutputsWithIvmHiddenColumns(rewrittenChild, sink.getOutputExprs());
    boolean unchanged = rewrittenChild == sink.child() && sinkOutputs.equals(sink.getOutputExprs());
    if (unchanged) {
        return sink;
    }
    return sink.withOutputExprs(sinkOutputs).withChildren(ImmutableList.of(rewrittenChild));
}
/**
 * Whitelisted: normalizes the table sink's child (forwarding {@code isFirstNonSink}
 * unchanged). When the child changed, the sink is rebuilt over it while keeping its
 * partition expressions, sync-MV where clauses and target table slots.
 */
@Override
public Plan visitLogicalOlapTableSink(LogicalOlapTableSink<? extends Plan> sink,
        Boolean isFirstNonSink) {
    Plan rewrittenChild = sink.child().accept(this, isFirstNonSink);
    return rewrittenChild == sink.child()
            ? sink
            : sink.withChildAndUpdateOutput(rewrittenChild, sink.getPartitionExprList(),
                    sink.getSyncMvWhereClauses(), sink.getTargetTableSlots());
}
/**
 * Rewrites an output list so that it exposes the IVM hidden columns of the normalized child.
 *
 * <p>Two cases:
 * <ul>
 * <li>{@code outputs} contains no IVM hidden column: the result is
 * {@code [row_id, original outputs, remaining hidden slots of the child]}.</li>
 * <li>{@code outputs} already contains hidden columns (e.g. BindSink placeholders): each
 * hidden output is replaced in place (keeping its position and, for aliases, its ExprId),
 * and every hidden slot of the child whose name does not occur in {@code outputs} is
 * appended at the end.</li>
 * </ul>
 *
 * @param normalizedChild the already normalized child plan
 * @param outputs the original output expressions of the parent node
 * @return the rewritten output list
 * @throws IvmException when the child exposes no row-id slot, or a hidden output cannot be
 *         mapped to a hidden slot of the child
 */
private List<NamedExpression> rewriteOutputsWithIvmHiddenColumns(
        Plan normalizedChild, List<NamedExpression> outputs) {
    Map<String, Slot> ivmHiddenSlotsByName = collectIvmHiddenSlots(normalizedChild);
    if (!ivmHiddenSlotsByName.containsKey(Column.IVM_ROW_ID_COL)) {
        throw new IvmException(IvmFailureReason.PLAN_PATTERN_UNSUPPORTED,
                "IVM normalization error: child plan has no row-id slot after normalization");
    }
    ImmutableList.Builder<NamedExpression> rewrittenOutputs = ImmutableList.builder();

    boolean hasHiddenOutput = outputs.stream().anyMatch(o -> IvmUtil.isIvmHiddenColumn(o.getName()));
    if (!hasHiddenOutput) {
        // No hidden outputs in original list: prepend row_id, then originals, then the
        // other hidden slots in the child's order.
        rewrittenOutputs.add(ivmHiddenSlotsByName.get(Column.IVM_ROW_ID_COL));
        rewrittenOutputs.addAll(outputs);
        for (Map.Entry<String, Slot> hidden : ivmHiddenSlotsByName.entrySet()) {
            if (!Column.IVM_ROW_ID_COL.equals(hidden.getKey())) {
                rewrittenOutputs.add(hidden.getValue());
            }
        }
        return rewrittenOutputs.build();
    }

    // Outputs already contain some hidden columns (e.g. BindSink placeholders).
    // Replace hidden outputs in-place to preserve positions and ExprIds.
    for (NamedExpression output : outputs) {
        if (IvmUtil.isIvmHiddenColumn(output.getName())) {
            rewrittenOutputs.add(rewriteIvmHiddenOutput(output, ivmHiddenSlotsByName));
        } else {
            rewrittenOutputs.add(output);
        }
    }

    // Append any new hidden slots from child that weren't in the original outputs.
    // The output names are collected once instead of re-scanning outputs per hidden slot.
    Set<String> outputNames = outputs.stream()
            .map(NamedExpression::getName)
            .collect(Collectors.toSet());
    for (Map.Entry<String, Slot> hidden : ivmHiddenSlotsByName.entrySet()) {
        if (!outputNames.contains(hidden.getKey())) {
            rewrittenOutputs.add(hidden.getValue());
        }
    }
    return rewrittenOutputs.build();
}
/**
 * Collects the child's output slots that are IVM hidden columns, keyed by name and kept in
 * output order. When several slots share a name, the first one is kept.
 */
private Map<String, Slot> collectIvmHiddenSlots(Plan normalizedChild) {
    Map<String, Slot> hiddenSlotsByName = new LinkedHashMap<>();
    for (Slot slot : normalizedChild.getOutput()) {
        if (IvmUtil.isIvmHiddenColumn(slot.getName())) {
            hiddenSlotsByName.putIfAbsent(slot.getName(), slot);
        }
    }
    return hiddenSlotsByName;
}
/**
 * Re-points one hidden output expression at the child's hidden slot of the same name.
 *
 * <p>An Alias keeps its ExprId, name, qualifier and name-from-child flag and only gets the
 * child's slot as its new input; a Slot is replaced by the child's slot.
 *
 * @param output the hidden output expression of the parent node
 * @param ivmHiddenSlotsByName hidden slots of the normalized child, keyed by name
 * @return the rewritten output expression
 * @throws IvmException when the child has no hidden slot with that name, or {@code output}
 *         is neither a Slot nor an Alias
 */
private NamedExpression rewriteIvmHiddenOutput(NamedExpression output, Map<String, Slot> ivmHiddenSlotsByName) {
    Slot childSlot = ivmHiddenSlotsByName.get(output.getName());
    if (childSlot == null) {
        throw new IvmException(IvmFailureReason.PLAN_PATTERN_UNSUPPORTED,
                "IVM normalization error: child plan has no hidden slot named "
                        + output.getName() + " after normalization");
    }
    if (output instanceof Alias) {
        Alias placeholder = (Alias) output;
        return new Alias(placeholder.getExprId(), ImmutableList.of(childSlot), placeholder.getName(),
                placeholder.getQualifier(), placeholder.isNameFromChild());
    }
    if (output instanceof Slot) {
        return childSlot;
    }
    throw new IvmException(IvmFailureReason.PLAN_PATTERN_UNSUPPORTED,
            "IVM normalization error: unsupported hidden output expression: "
                    + output.getClass().getSimpleName());
}
/**
 * Builds the scan-level row-id expression and reports whether it is deterministic.
 *
 * <ul>
 * <li>DUP_KEYS: {@code (UuidNumeric(), false)}</li>
 * <li>UNIQUE_KEYS with Merge-On-Write, or UNIQUE_KEYS on an excluded trigger table:
 * {@code (hash(key columns), true)}</li>
 * <li>AGG_KEYS on an excluded trigger table: {@code (hash(key columns), true)}</li>
 * <li>anything else: rejected</li>
 * </ul>
 *
 * @param table the scanned table
 * @param scan the scan providing the key slots
 * @return pair of (row-id expression, is deterministic)
 * @throws IvmException with reason {@code PLAN_PATTERN_UNSUPPORTED} for unsupported key types
 */
private Pair<Expression, Boolean> buildRowId(OlapTable table, LogicalOlapScan scan) {
    KeysType keysType = table.getKeysType();
    boolean excludedFromTrigger = isExcludedTriggerTable(scan);

    if (keysType == KeysType.UNIQUE_KEYS) {
        boolean keysUsable = table.getEnableUniqueKeyMergeOnWrite() || excludedFromTrigger;
        if (keysUsable) {
            return buildDeterministicRowIdFromBaseKeys(table, scan);
        }
        throw new IvmException(IvmFailureReason.PLAN_PATTERN_UNSUPPORTED,
                "INCREMENTAL materialized view requires UNIQUE_KEYS base tables "
                        + "to enable Merge-On-Write. Table '"
                        + table.getName() + "' has MOW disabled."
                        + " If this table does not participate in incremental refresh, "
                        + "add it to 'excluded_trigger_tables'.");
    }
    if (keysType == KeysType.DUP_KEYS) {
        // No stable key: fall back to a random id, flagged as non-deterministic.
        return Pair.of(new UuidNumeric(), false);
    }
    boolean excludedAggTable = excludedFromTrigger && keysType == KeysType.AGG_KEYS;
    if (excludedAggTable) {
        return buildDeterministicRowIdFromBaseKeys(table, scan);
    }
    throw new IvmException(IvmFailureReason.PLAN_PATTERN_UNSUPPORTED,
            "INCREMENTAL materialized view requires base tables to be "
                    + "UNIQUE_KEYS with Merge-On-Write or DUP_KEYS. Table '"
                    + table.getName() + "' is " + keysType
                    + ". If this table does not participate in incremental refresh, "
                    + "add it to 'excluded_trigger_tables'.");
}
/**
 * Builds a deterministic row-id as the hash of the scan output slots whose names match the
 * table's base-schema key columns (in scan output order).
 *
 * @return pair of (hash over key slots, {@code true})
 * @throws IvmException when none of the key columns is present in the scan output
 */
private Pair<Expression, Boolean> buildDeterministicRowIdFromBaseKeys(OlapTable table, LogicalOlapScan scan) {
    Set<String> keyColumnNames = table.getBaseSchemaKeyColumns().stream()
            .map(Column::getName)
            .collect(Collectors.toSet());
    List<Expression> keySlots = new ArrayList<>();
    for (Slot slot : scan.getOutput()) {
        if (keyColumnNames.contains(slot.getName())) {
            keySlots.add(slot);
        }
    }
    // NOTE(review): only a completely missing key set is rejected; a scan output that
    // exposes just part of the key columns would still be hashed - confirm that the scan
    // output always carries every key column at this stage.
    if (keySlots.isEmpty()) {
        throw new IvmException(IvmFailureReason.PLAN_PATTERN_UNSUPPORTED,
                "IVM: no key columns found for "
                        + table.getKeysType() + " table: " + table.getName());
    }
    return Pair.of(IvmUtil.buildRowIdHash(keySlots), true);
}
/**
 * Requires the scanned table to have its binlog enabled for streaming, unless the table is
 * an excluded trigger table.
 *
 * @throws IvmException with reason {@code BINLOG_NOT_ENABLED}
 */
private void validateBinlogEnabled(LogicalOlapScan scan) {
    boolean binlogUsableOrNotNeeded = isExcludedTriggerTable(scan)
            || scan.getTable().getBinlogConfig().isEnableForStreaming();
    if (binlogUsableOrNotNeeded) {
        return;
    }
    throw new IvmException(IvmFailureReason.BINLOG_NOT_ENABLED,
            "SQL can be incrementally refreshed, but row binlog is not enabled for table: "
                    + scan.getTable().getName()
                    + ". Please set 'binlog.enable' = 'true' and 'binlog.format' = 'ROW'.");
}
/**
 * Tells whether the scanned table is listed in the statement's excluded trigger tables.
 *
 * <p>The table name info is taken from the table itself when available. Otherwise it is
 * assembled from the scan qualifier: the last element is used as the database name and the
 * second-to-last as the catalog name, falling back to the table's own database name and the
 * internal catalog when the qualifier is too short.
 *
 * @return false when there is no statement context, no excluded table is configured, or no
 *         database name can be determined
 */
private boolean isExcludedTriggerTable(LogicalOlapScan scan) {
    if (statementContext == null) {
        return false;
    }
    if (statementContext.getExcludedTriggerTables().isEmpty()) {
        return false;
    }
    OlapTable table = scan.getTable();
    TableNameInfo resolvedName = TableNameInfoUtils.fromTableOrNull(table);
    if (resolvedName == null) {
        List<String> qualifier = scan.getQualifier();
        int parts = qualifier.size();
        String dbName = parts == 0 ? table.getDBName() : qualifier.get(parts - 1);
        if (dbName == null) {
            return false;
        }
        String catalogName = parts >= 2 ? qualifier.get(parts - 2) : InternalCatalog.INTERNAL_CATALOG_NAME;
        resolvedName = new TableNameInfo(catalogName, dbName, table.getName());
    }
    return MTMVPartitionUtil.isTableExcluded(statementContext.getExcludedTriggerTables(), resolvedName);
}
/**
 * Validates that every aggregate function is supported for IVM.
 *
 * <p>Rules enforced, checked per function in this order:
 * <ol>
 * <li>DISTINCT aggregates are rejected.</li>
 * <li>Only the classes listed in {@code SUPPORTED_AGG_FUNCTIONS} (count, sum, avg, min,
 * max) are accepted; the match is on the exact class.</li>
 * </ol>
 * An empty list (bare GROUP BY without aggregate functions) passes: the group-level count
 * hidden column alone is sufficient for incremental maintenance.
 *
 * @param aggFunctions the aggregate functions to validate
 * @throws IvmException with reason {@code AGG_UNSUPPORTED} on the first violation
 */
private static void checkAggFunctions(List<AggregateFunction> aggFunctions) {
    for (AggregateFunction function : aggFunctions) {
        String rejection = null;
        if (function.isDistinct()) {
            rejection = "Aggregate DISTINCT is not supported for IVM: " + function.toSql();
        } else if (!SUPPORTED_AGG_FUNCTIONS.contains(function.getClass())) {
            rejection = "Unsupported aggregate function for IVM: " + function.getName();
        }
        if (rejection != null) {
            throw new IvmException(IvmFailureReason.AGG_UNSUPPORTED, rejection);
        }
    }
}
}