IvmAggFunctionRegistry.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv.ivm.agg;
import org.apache.doris.mtmv.ivm.IvmException;
import org.apache.doris.mtmv.ivm.IvmFailureReason;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import com.google.common.collect.ImmutableList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Central, explicit lookup of the aggregate functions that IVM is able to maintain.
 *
 * <p>All function-specific work across the IVM aggregate lifecycle belongs to a processor: normalizing hidden
 * state, building delta aggregate outputs, mapping delta outputs for apply, and building apply expressions.
 * The registry itself only picks the responsible processor and forwards the call to it.
 *
 * <p>Data flow:
 * <ol>
 *   <li>{@code IvmNormalizeMtmv}: {@link #buildTargetSpec} captures the visible output, the hidden state
 *       outputs, and the original aggregate arguments in an {@link IvmAggTargetSpec}. Once normalize has
 *       resolved the final aggregate output slots, that spec turns into the stable {@link IvmAggTarget}
 *       kept in {@link IvmAggMeta}.</li>
 *   <li>{@code IvmAggDeltaHandler#buildDeltaSubPlan}: {@link #appendDeltaAggregateOutputs} produces the signed
 *       delta aggregate columns, while {@link #collectZeroDefaultDeltaOutputNames} reports to the top delta
 *       project which arithmetic operands have to be wrapped in {@code COALESCE(x, 0)}.</li>
 *   <li>{@code IvmAggDeltaHandler#buildApplyPlan}: {@link #mapApplyDeltaSlots} turns delta output slots into
 *       {@link IvmAggDeltaSlotRef} keys, after which {@link #appendApplyExpressions} derives the final MV
 *       columns.</li>
 * </ol>
 */
public class IvmAggFunctionRegistry {
    /** Shared singleton; the constructor is private, so this is the only instance created by this class. */
    public static final IvmAggFunctionRegistry INSTANCE = new IvmAggFunctionRegistry();

    /** Registered processors, in the order {@link #findProcessor} probes them. */
    private final List<IvmAggFunctionProcessor> processors;

    /** Index from the kind a processor reports via {@code handledFunctionKind()} to that processor. */
    private final Map<IvmAggFunctionKind, IvmAggFunctionProcessor> processorByKind;

    /**
     * Registers the built-in processors and indexes each one under the kind it reports.
     * If two processors reported the same kind, the later entry would replace the earlier one in the index.
     */
    private IvmAggFunctionRegistry() {
        List<IvmAggFunctionProcessor> registered = ImmutableList.of(
                new IvmAggCountProcessor(),
                new IvmAggSumProcessor(),
                new IvmAggAvgProcessor(),
                new IvmAggMinProcessor(),
                new IvmAggMaxProcessor(),
                new IvmAggBitmapUnionProcessor(),
                new IvmAggBitmapUnionCountProcessor());
        Map<IvmAggFunctionKind, IvmAggFunctionProcessor> kindIndex = new EnumMap<>(IvmAggFunctionKind.class);
        for (IvmAggFunctionProcessor entry : registered) {
            kindIndex.put(entry.handledFunctionKind(), entry);
        }
        this.processors = registered;
        this.processorByKind = kindIndex;
    }

    /**
     * Creates the target metadata describing one original aggregate function together with its optional
     * hidden state columns.
     *
     * @throws IvmException with {@link IvmFailureReason#AGG_UNSUPPORTED} if the function is DISTINCT or no
     *         registered processor supports it
     */
    public IvmAggTargetSpec buildTargetSpec(int ordinal, AggregateFunction function, Alias visibleAlias) {
        IvmAggFunctionProcessor owner = findProcessor(function);
        return owner.buildTargetSpec(ordinal, function, visibleAlias);
    }

    /**
     * Adds to {@code outputs} the aggregate expressions that the delta aggregate produces for one visible MV
     * aggregate column.
     *
     * @throws IvmException with {@link IvmFailureReason#AGG_UNSUPPORTED} if the target's kind has no processor
     */
    public void appendDeltaAggregateOutputs(IvmAggTarget target, Slot dmlFactorSlot, List<NamedExpression> outputs,
            IvmAggExpressionBuilder ctx) {
        IvmAggFunctionProcessor owner = processorFor(target);
        owner.appendDeltaAggregateOutputs(target, dmlFactorSlot, outputs, ctx);
    }

    /**
     * Translates the delta plan's output slots into the stable logical keys that the apply expressions consume.
     *
     * @throws IvmException with {@link IvmFailureReason#AGG_UNSUPPORTED} if the target's kind has no processor
     */
    public void mapApplyDeltaSlots(IvmAggTarget target, Map<String, Slot> outputByName,
            Map<IvmAggDeltaSlotRef, Slot> applyDeltaSlots, Slot deltaGroupCountSlot, IvmAggExpressionBuilder ctx) {
        IvmAggFunctionProcessor owner = processorFor(target);
        owner.mapApplyDeltaSlots(target, outputByName, applyDeltaSlots, deltaGroupCountSlot, ctx);
    }

    /**
     * Collects into {@code outputNames} the delta output names whose NULL values should be normalized to zero
     * before apply reads them.
     *
     * @throws IvmException with {@link IvmFailureReason#AGG_UNSUPPORTED} if the target's kind has no processor
     */
    public void collectZeroDefaultDeltaOutputNames(IvmAggTarget target, boolean scalarAgg,
            Set<String> outputNames, IvmAggExpressionBuilder ctx) {
        IvmAggFunctionProcessor owner = processorFor(target);
        owner.collectZeroDefaultDeltaOutputNames(target, scalarAgg, outputNames, ctx);
    }

    /**
     * Adds the final visible and hidden-state expressions of this aggregate target to the apply projection.
     *
     * @throws IvmException with {@link IvmFailureReason#AGG_UNSUPPORTED} if the target's kind has no processor
     */
    public void appendApplyExpressions(IvmAggTarget target, IvmAggApplyContext ctx) {
        IvmAggFunctionProcessor owner = processorFor(target);
        owner.appendApplyExpressions(target, ctx);
    }

    /**
     * Looks up the processor indexed under the function kind recorded on an already-normalized target.
     */
    private IvmAggFunctionProcessor processorFor(IvmAggTarget target) {
        IvmAggFunctionProcessor indexed = processorByKind.get(target.getFunctionKind());
        if (indexed != null) {
            return indexed;
        }
        throw new IvmException(IvmFailureReason.AGG_UNSUPPORTED,
                "Unsupported aggregate function for IVM: " + target.getFunctionKind());
    }

    /**
     * Picks the first registered processor that accepts the original aggregate function.
     * DISTINCT aggregates are rejected up front, before any processor is consulted.
     */
    private IvmAggFunctionProcessor findProcessor(AggregateFunction function) {
        if (function.isDistinct()) {
            throw new IvmException(IvmFailureReason.AGG_UNSUPPORTED,
                    "Aggregate DISTINCT is not supported for IVM: " + function.toSql());
        }
        IvmAggFunctionProcessor matched = null;
        for (IvmAggFunctionProcessor candidate : processors) {
            if (candidate.supportsOriginalFunction(function)) {
                // Registration order decides ties: stop at the first processor that accepts the function.
                matched = candidate;
                break;
            }
        }
        if (matched == null) {
            throw new IvmException(IvmFailureReason.AGG_UNSUPPORTED,
                    "Unsupported aggregate function for IVM: " + function.getName());
        }
        return matched;
    }
}