PushDownLimit.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.rewrite;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Limit;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
import com.google.common.collect.ImmutableList;
import java.util.List;
/**
* Rules to push {@link org.apache.doris.nereids.trees.plans.logical.LogicalLimit} down.
* <p>
* Limit can't be push down if it has a valid offset info.
* splitLimit run before this rule, so the limit match the patterns is local limit
*/
public class PushDownLimit implements RewriteRuleFactory {
@Override
public List<Rule> buildRules() {
return ImmutableList.of(
// limit -> join
logicalLimit(logicalJoin(any(), any())).whenNot(Limit::hasValidOffset)
.then(limit -> {
Plan newJoin = pushLimitThroughJoin(limit, limit.child());
if (newJoin == null || limit.child().children().equals(newJoin.children())) {
return null;
}
return limit.withChildren(newJoin);
})
.toRule(RuleType.PUSH_LIMIT_THROUGH_JOIN),
// limit -> project -> join
logicalLimit(logicalProject(logicalJoin(any(), any()))).whenNot(Limit::hasValidOffset)
.then(limit -> {
LogicalProject<LogicalJoin<Plan, Plan>> project = limit.child();
LogicalJoin<Plan, Plan> join = project.child();
Plan newJoin = pushLimitThroughJoin(limit, join);
if (newJoin == null || join.children().equals(newJoin.children())) {
return null;
}
return limit.withChildren(project.withChildren(newJoin));
}).toRule(RuleType.PUSH_LIMIT_THROUGH_PROJECT_JOIN),
// limit -> window
logicalLimit(logicalWindow())
.then(limit -> {
LogicalWindow<Plan> window = limit.child();
long partitionLimit = limit.getLimit() + limit.getOffset();
if (partitionLimit <= 0) {
return limit;
}
Pair<WindowExpression, Long> windowFuncLongPair = window
.getPushDownWindowFuncAndLimit(null, partitionLimit);
if (windowFuncLongPair == null) {
return limit;
}
Plan newWindow = window.pushPartitionLimitThroughWindow(windowFuncLongPair.first,
windowFuncLongPair.second, true);
return limit.withChildren(newWindow);
}).toRule(RuleType.PUSH_LIMIT_THROUGH_WINDOW),
// limit -> project -> window
logicalLimit(logicalProject(logicalWindow()))
.then(limit -> {
LogicalProject<LogicalWindow<Plan>> project = limit.child();
LogicalWindow<Plan> window = project.child();
long partitionLimit = limit.getLimit() + limit.getOffset();
if (partitionLimit <= 0) {
return limit;
}
Pair<WindowExpression, Long> windowFuncLongPair = window
.getPushDownWindowFuncAndLimit(null, partitionLimit);
if (windowFuncLongPair == null) {
return limit;
}
Plan newWindow = window.pushPartitionLimitThroughWindow(windowFuncLongPair.first,
windowFuncLongPair.second, true);
return limit.withChildren(project.withChildren(newWindow));
}).toRule(RuleType.PUSH_LIMIT_THROUGH_PROJECT_WINDOW),
// limit -> union
logicalLimit(logicalUnion(multi()).when(union -> union.getQualifier() == Qualifier.ALL))
.whenNot(Limit::hasValidOffset)
.then(limit -> {
LogicalUnion union = limit.child();
ImmutableList<Plan> newUnionChildren = union.children()
.stream()
.map(limit::withChildren)
.collect(ImmutableList.toImmutableList());
if (union.children().equals(newUnionChildren)) {
return null;
}
return limit.withChildren(union.withChildren(newUnionChildren));
})
.toRule(RuleType.PUSH_LIMIT_THROUGH_UNION),
new MergeLimits().build()
);
}
private Plan pushLimitThroughJoin(LogicalLimit<? extends Plan> limit, LogicalJoin<Plan, Plan> join) {
switch (join.getJoinType()) {
case LEFT_OUTER_JOIN:
return join.withChildren(
limit.withChildren(join.left()),
join.right()
);
case RIGHT_OUTER_JOIN:
return join.withChildren(
join.left(),
limit.withChildren(join.right())
);
case CROSS_JOIN:
return join.withChildren(
limit.withChildren(join.left()),
limit.withChildren(join.right())
);
default:
// don't push limit.
return null;
}
}
}