Window.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.algebra;
import org.apache.doris.analysis.AnalyticWindow;
import org.apache.doris.analysis.Expr;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.translator.ExpressionTranslator;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.expressions.WindowFrame;
import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameBoundType;
import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameBoundary;
import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameUnitsType;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* interface for LogicalWindow and PhysicalWindow
*/
public interface Window {
List<NamedExpression> getWindowExpressions();
/**
* translate WindowFrame to AnalyticWindow
*/
default AnalyticWindow translateWindowFrame(WindowFrame windowFrame, PlanTranslatorContext context) {
FrameUnitsType frameUnits = windowFrame.getFrameUnits();
FrameBoundary leftBoundary = windowFrame.getLeftBoundary();
FrameBoundary rightBoundary = windowFrame.getRightBoundary();
AnalyticWindow.Type type = frameUnits == FrameUnitsType.ROWS
? AnalyticWindow.Type.ROWS : AnalyticWindow.Type.RANGE;
AnalyticWindow.Boundary left = withFrameBoundary(leftBoundary, context);
AnalyticWindow.Boundary right = withFrameBoundary(rightBoundary, context);
return new AnalyticWindow(type, left, right);
}
/**
* translate FrameBoundary to Boundary
*/
default AnalyticWindow.Boundary withFrameBoundary(FrameBoundary boundary, PlanTranslatorContext context) {
FrameBoundType boundType = boundary.getFrameBoundType();
BigDecimal offsetValue = null;
Expr e = null;
if (boundary.hasOffset()) {
Expression boundOffset = boundary.getBoundOffset().get();
offsetValue = new BigDecimal(((Literal) boundOffset).getDouble());
e = ExpressionTranslator.translate(boundOffset, context);
}
switch (boundType) {
case UNBOUNDED_PRECEDING:
return new AnalyticWindow.Boundary(AnalyticWindow.BoundaryType.UNBOUNDED_PRECEDING, null);
case UNBOUNDED_FOLLOWING:
return new AnalyticWindow.Boundary(AnalyticWindow.BoundaryType.UNBOUNDED_FOLLOWING, null);
case CURRENT_ROW:
return new AnalyticWindow.Boundary(AnalyticWindow.BoundaryType.CURRENT_ROW, null);
case PRECEDING:
return new AnalyticWindow.Boundary(AnalyticWindow.BoundaryType.PRECEDING, e, offsetValue);
case FOLLOWING:
return new AnalyticWindow.Boundary(AnalyticWindow.BoundaryType.FOLLOWING, e, offsetValue);
default:
throw new AnalysisException("This WindowFrame hasn't be resolved in REWRITE");
}
}
/**
*
* select rank() over (partition by A, B) as r, sum(x) over(A, C) as s from T;
* A is a common partition key for all windowExpressions.
* for a common Partition key A, we could push filter A=1 through this window.
*/
default Set<SlotReference> getCommonPartitionKeyFromWindowExpressions() {
ImmutableSet.Builder<SlotReference> commonPartitionKeySet = ImmutableSet.builder();
Map<Expression, Integer> partitionKeyCount = Maps.newHashMap();
for (Expression expr : getWindowExpressions()) {
if (expr instanceof Alias && expr.child(0) instanceof WindowExpression) {
WindowExpression winExpr = (WindowExpression) expr.child(0);
for (Expression partitionKey : winExpr.getPartitionKeys()) {
int count = partitionKeyCount.getOrDefault(partitionKey, 0);
partitionKeyCount.put(partitionKey, count + 1);
}
}
}
int winExprCount = getWindowExpressions().size();
for (Map.Entry<Expression, Integer> entry : partitionKeyCount.entrySet()) {
if (entry.getValue() == winExprCount && entry.getKey() instanceof SlotReference) {
SlotReference slot = (SlotReference) entry.getKey();
if (this instanceof LogicalWindow) {
LogicalWindow lw = (LogicalWindow) this;
Set<Slot> equalSlots = lw.getLogicalProperties().getTrait().calEqualSet(slot);
for (Slot other : equalSlots) {
if (other instanceof SlotReference) {
commonPartitionKeySet.add((SlotReference) other);
}
}
}
commonPartitionKeySet.add((SlotReference) entry.getKey());
}
}
return commonPartitionKeySet.build();
}
}