// DeferMaterializeTopNResult.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.rewrite;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Type;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Rewrites a simple TopN query so that slots which are not used for sorting or filtering
 * are materialized lazily (deferred).
 *
 * <p>Seven plan shapes are matched, all rooted at a result sink and ending in an OLAP scan:
 * <ul>
 *   <li>ResultSink -> TopN -> OlapScan</li>
 *   <li>ResultSink -> TopN -> Project -> OlapScan</li>
 *   <li>ResultSink -> TopN -> Filter -> OlapScan</li>
 *   <li>ResultSink -> TopN -> Project -> Filter -> OlapScan</li>
 *   <li>ResultSink -> Project -> TopN -> Project -> OlapScan</li>
 *   <li>ResultSink -> Project -> TopN -> OlapScan</li>
 *   <li>ResultSink -> Project -> TopN -> Project -> Filter -> OlapScan</li>
 * </ul>
 */
public class DeferMaterializeTopNResult implements RewriteRuleFactory {

    /**
     * Builds one rule per supported plan shape. The order of the returned list is the order
     * in which the shapes are listed in the class documentation.
     */
    @Override
    public List<Rule> buildRules() {
        return ImmutableList.of(
                topNOverScan(),
                topNOverProject(),
                topNOverFilter(),
                topNOverProjectFilter(),
                projectOverTopNOverProject(),
                projectOverTopNOverScan(),
                projectOverTopNOverProjectFilter()
        );
    }

    /** ResultSink -> TopN -> OlapScan. */
    private Rule topNOverScan() {
        return RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
                logicalResultSink(
                        logicalTopN(
                                logicalOlapScan().when(this::supportsDeferMaterialize)
                        ).when(this::hasBoundedSort)
                                .when(this::sortsOnTableColumns)
                ).then(r -> deferMaterialize(r, r.child(),
                        Optional.empty(), Optional.empty(), r.child().child()))
        );
    }

    /** ResultSink -> TopN -> Project -> OlapScan. */
    private Rule topNOverProject() {
        return RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
                logicalResultSink(
                        logicalTopN(
                                logicalProject(
                                        logicalOlapScan().when(this::supportsDeferMaterialize)
                                )
                        ).when(this::hasBoundedSort)
                                .when(this::sortsOnProjectedColumnSlots)
                ).then(r -> {
                    LogicalProject<LogicalOlapScan> project = r.child().child();
                    return deferMaterialize(r, r.child(), Optional.of(project),
                            Optional.empty(), project.child());
                })
        );
    }

    /** ResultSink -> TopN -> Filter -> OlapScan. */
    private Rule topNOverFilter() {
        return RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
                logicalResultSink(
                        logicalTopN(
                                logicalFilter(
                                        logicalOlapScan().when(this::supportsDeferMaterialize)
                                )
                        ).when(this::hasBoundedSort)
                                .when(this::sortsOnTableColumns)
                ).then(r -> {
                    LogicalFilter<LogicalOlapScan> filter = r.child().child();
                    return deferMaterialize(r, r.child(), Optional.empty(),
                            Optional.of(filter), filter.child());
                })
        );
    }

    /** ResultSink -> TopN -> Project -> Filter -> OlapScan. */
    private Rule topNOverProjectFilter() {
        return RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
                logicalResultSink(
                        logicalTopN(
                                logicalProject(
                                        logicalFilter(
                                                logicalOlapScan().when(this::supportsDeferMaterialize)
                                        )
                                )
                        ).when(this::hasBoundedSort)
                                .when(this::sortsOnProjectedColumnSlots)
                ).then(r -> {
                    LogicalProject<LogicalFilter<LogicalOlapScan>> project = r.child().child();
                    LogicalFilter<LogicalOlapScan> filter = project.child();
                    return deferMaterialize(r, r.child(), Optional.of(project),
                            Optional.of(filter), filter.child());
                })
        );
    }

    /**
     * ResultSink -> Project -> TopN -> Project -> OlapScan.
     * Only matches when the upper project can be merged with the lower one; the merged
     * projections are handed to {@code deferMaterialize} as the single project.
     */
    private Rule projectOverTopNOverProject() {
        return RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
                logicalResultSink(logicalProject(
                        logicalTopN(
                                logicalProject(
                                        logicalOlapScan().when(this::supportsDeferMaterialize)
                                )
                        ).when(this::hasBoundedSort)
                                .when(this::sortsOnProjectedColumnSlots)
                ).when(project -> project.canMergeProjections(project.child().child()))).then(r -> {
                    LogicalProject<?> upperProject = r.child();
                    LogicalProject<LogicalOlapScan> bottomProject = r.child().child().child();
                    List<NamedExpression> projections = upperProject.mergeProjections(bottomProject);
                    LogicalProject<?> project = upperProject.withProjects(projections);
                    return deferMaterialize(r, r.child().child(), Optional.of(project),
                            Optional.empty(), bottomProject.child());
                })
        );
    }

    /** ResultSink -> Project -> TopN -> OlapScan. */
    private Rule projectOverTopNOverScan() {
        return RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
                logicalResultSink(logicalProject(
                        logicalTopN(
                                logicalOlapScan().when(this::supportsDeferMaterialize)
                        ).when(this::hasBoundedSort)
                                .when(this::sortsOnColumnSlots)
                )).then(r -> deferMaterialize(r, r.child().child(), Optional.of(r.child()),
                        Optional.empty(), r.child().child().child()))
        );
    }

    /**
     * ResultSink -> Project -> TopN -> Project -> Filter -> OlapScan.
     * Only matches when the upper project can be merged with the lower one; the merged
     * projections are handed to {@code deferMaterialize} as the single project.
     */
    private Rule projectOverTopNOverProjectFilter() {
        return RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
                logicalResultSink(logicalProject(
                        logicalTopN(
                                logicalProject(logicalFilter(
                                        logicalOlapScan().when(this::supportsDeferMaterialize)
                                ))
                        ).when(this::hasBoundedSort)
                                .when(this::sortsOnProjectedColumnSlots)
                ).when(project -> project.canMergeProjections(project.child().child()))).then(r -> {
                    LogicalProject<?> upperProject = r.child();
                    LogicalProject<LogicalFilter<LogicalOlapScan>> bottomProject = r.child().child().child();
                    List<NamedExpression> projections = upperProject.mergeProjections(bottomProject);
                    LogicalProject<?> project = upperProject.withProjects(projections);
                    LogicalFilter<LogicalOlapScan> filter = bottomProject.child();
                    return deferMaterialize(r, r.child().child(), Optional.of(project),
                            Optional.of(filter), filter.child());
                })
        );
    }

    /**
     * Scan-side precondition shared by every rule: the table must have light schema change
     * enabled and must report {@code isDupKeysOrMergeOnWrite()}.
     */
    private boolean supportsDeferMaterialize(LogicalOlapScan scan) {
        return scan.getTable().getEnableLightSchemaChange()
                && scan.getTable().isDupKeysOrMergeOnWrite();
    }

    /**
     * TopN-side precondition shared by every rule: the limit is strictly below the session
     * threshold and there is at least one order key.
     */
    private boolean hasBoundedSort(LogicalTopN<? extends Plan> topN) {
        return topN.getLimit() < getTopNOptLimitThreshold()
                && !topN.getOrderKeys().isEmpty();
    }

    /** Returns true when every order key expression reports {@code isColumnFromTable()}. */
    private boolean sortsOnTableColumns(LogicalTopN<? extends Plan> topN) {
        return topN.getOrderKeys().stream()
                .map(OrderKey::getExpr)
                .allMatch(Expression::isColumnFromTable);
    }

    /** Returns true when every order key is a {@link SlotReference} that comes from a table column. */
    private boolean sortsOnColumnSlots(LogicalTopN<? extends Plan> topN) {
        for (OrderKey orderKey : topN.getOrderKeys()) {
            if (!isColumnSlot(orderKey.getExpr())) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns true when every order key is a {@link SlotReference} that comes from a table column
     * and is listed as-is in the projections of the project directly below the TopN.
     */
    private boolean sortsOnProjectedColumnSlots(LogicalTopN<? extends LogicalProject<? extends Plan>> topN) {
        List<NamedExpression> projects = topN.child().getProjects();
        for (OrderKey orderKey : topN.getOrderKeys()) {
            Expression expr = orderKey.getExpr();
            if (!isColumnSlot(expr)) {
                return false;
            }
            // do not support alias in project now
            if (!projects.contains(expr)) {
                return false;
            }
        }
        return true;
    }

    /** Returns true when {@code expr} comes from a table column and is a {@link SlotReference}. */
    private boolean isColumnSlot(Expression expr) {
        return expr.isColumnFromTable() && expr instanceof SlotReference;
    }

    /**
     * Builds the deferred-materialization plan for one matched shape.
     *
     * @param logicalResultSink the matched result sink (root of the matched plan)
     * @param logicalTopN the matched TopN; every order key expression must be a {@link Slot}
     * @param logicalProject the project to keep in the rewritten plan, if the shape has one
     * @param logicalFilter the filter directly above the scan, if the shape has one
     * @param logicalOlapScan the matched scan
     * @return a {@link LogicalDeferMaterializeResultSink} wrapping the rewritten plan, or
     *         {@code null} when the session has {@code enableTopnLazyMaterialization} set or when
     *         no slot is left to defer (presumably the rule framework treats {@code null} as
     *         "no rewrite" -- confirm against the rule executor)
     */
    private Plan deferMaterialize(LogicalResultSink<? extends Plan> logicalResultSink,
            LogicalTopN<? extends Plan> logicalTopN, Optional<LogicalProject<? extends Plan>> logicalProject,
            Optional<LogicalFilter<? extends Plan>> logicalFilter, LogicalOlapScan logicalOlapScan) {
        // Guard the session variable the same way getTopNOptLimitThreshold() does.
        ConnectContext context = ConnectContext.get();
        if (context != null && context.getSessionVariable() != null
                && context.getSessionVariable().enableTopnLazyMaterialization) {
            return null;
        }

        Column rowId = new Column(Column.ROWID_COL, Type.STRING, false, null, false, "", "rowid column");
        SlotReference columnId = SlotReference.fromColumn(
                logicalOlapScan.getTable(), rowId, logicalOlapScan.getQualifier());

        // Start from every scan output, then drop whatever the filter or the sort needs.
        Set<ExprId> deferredMaterializedExprIds = Sets.newHashSet(logicalOlapScan.getOutputExprIdSet());
        logicalFilter.ifPresent(filter -> filter.getConjuncts()
                .forEach(e -> deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds())));
        Set<Slot> orderKeys = logicalTopN.getOrderKeys().stream()
                .map(OrderKey::getExpr)
                .map(Slot.class::cast)
                .collect(Collectors.toSet());
        orderKeys.stream()
                .map(NamedExpression::getExprId)
                .filter(Objects::nonNull)
                .forEach(deferredMaterializedExprIds::remove);
        if (logicalProject.isPresent()) {
            // Only slots the project actually reads are worth deferring.
            deferredMaterializedExprIds.retainAll(logicalProject.get().getInputSlots().stream()
                    .map(NamedExpression::getExprId).collect(Collectors.toSet()));
        }
        if (deferredMaterializedExprIds.isEmpty()) {
            // nothing to deferred materialize
            return null;
        }

        LogicalDeferMaterializeOlapScan deferOlapScan = new LogicalDeferMaterializeOlapScan(
                logicalOlapScan, deferredMaterializedExprIds, columnId);
        Plan root = logicalFilter.map(f -> f.withChildren(deferOlapScan)).orElse(deferOlapScan);

        if (logicalProject.isPresent()) {
            // Below the TopN, keep only the slots read by the filter/project or used as order keys,
            // plus the row id column.
            Set<Slot> inputSlots = Sets.newHashSet();
            logicalFilter.ifPresent(filter -> inputSlots.addAll(filter.getInputSlots()));
            inputSlots.addAll(logicalProject.get().getInputSlots());
            ImmutableList.Builder<NamedExpression> requiredSlots = ImmutableList.builder();
            for (Slot output : root.getOutput()) {
                if (inputSlots.contains(output) || orderKeys.contains(output)) {
                    requiredSlots.add(output);
                }
            }
            requiredSlots.add(columnId);
            root = new LogicalProject<>(requiredSlots.build(), root);
        }

        root = new LogicalDeferMaterializeTopN<>((LogicalTopN<? extends Plan>) logicalTopN.withChildren(root),
                deferredMaterializedExprIds, columnId);

        if (logicalProject.isPresent()) {
            // generate projections with the order exactly same as result output's
            Map<Slot, NamedExpression> projectsMap = Maps.newHashMap();
            logicalProject.get().getProjects().forEach(p -> projectsMap.put(p.toSlot(), p));
            List<NamedExpression> outputProjects = logicalResultSink.getOutput().stream()
                    .map(projectsMap::get)
                    .collect(ImmutableList.toImmutableList());
            root = logicalProject.get().withProjectsAndChild(outputProjects, root);
        }

        root = logicalResultSink.withChildren(root);
        return new LogicalDeferMaterializeResultSink<>((LogicalResultSink<? extends Plan>) root,
                logicalOlapScan.getTable(), logicalOlapScan.getSelectedIndexId());
    }

    /**
     * Returns the session's {@code topnOptLimitThreshold}, or {@code -1} when there is no connect
     * context, no session variable, or {@code enableTwoPhaseReadOpt} is off.
     */
    private long getTopNOptLimitThreshold() {
        ConnectContext context = ConnectContext.get();
        if (context == null || context.getSessionVariable() == null) {
            return -1;
        }
        if (!context.getSessionVariable().enableTwoPhaseReadOpt) {
            return -1;
        }
        return context.getSessionVariable().topnOptLimitThreshold;
    }
}