Memo.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.memo;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.cost.Cost;
import org.apache.doris.nereids.cost.CostCalculator;
import org.apache.doris.nereids.metrics.EventChannel;
import org.apache.doris.nereids.metrics.EventProducer;
import org.apache.doris.nereids.metrics.consumer.LogConsumer;
import org.apache.doris.nereids.metrics.event.GroupMergeEvent;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.properties.RequestPropertyDeriver;
import org.apache.doris.nereids.properties.RequirePropertiesSupplier;
import org.apache.doris.nereids.rules.exploration.mv.AbstractMaterializedViewRule;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.LeafPlan;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
/**
* Representation for memo in cascades optimizer.
*/
public class Memo {
// Logger of the memo; doCopyIn() uses it to report a logical-properties mismatch before failing.
public static final Logger LOG = LogManager.getLogger(Memo.class);
// Event producer used by mergeGroup() to log a GroupMergeEvent for every group merge.
private static final EventProducer GROUP_MERGE_TRACER = new EventProducer(GroupMergeEvent.class,
EventChannel.getDefaultChannel().addConsumers(new LogConsumer(GroupMergeEvent.class, EventChannel.LOG)));
// Counter shared by all Memo instances; maybeAddStateId() bumps it when nereids trace is enabled and a
// copyIn generated a new group expression.
// NOTE(review): plain static long incremented without synchronization - confirm it is only used for tracing.
private static long stateId = 0;
// Session context; null when the memo is built by the test-only constructor, so every use is null-checked.
private final ConnectContext connectContext;
// Version counter starting at 1; doCopyIn() increments it when an MTMV relation without a group expression
// is copied in while materialized view nest rewrite is enabled.
private final AtomicLong refreshVersion = new AtomicLong(1);
// rule class -> ids of materializations whose check succeeded, see recordMaterializationCheckResult().
private final Map<Class<? extends AbstractMaterializedViewRule>, Set<Long>> materializationCheckSuccessMap =
new LinkedHashMap<>();
// rule class -> ids of materializations whose check failed, see recordMaterializationCheckResult().
private final Map<Class<? extends AbstractMaterializedViewRule>, Set<Long>> materializationCheckFailMap =
new LinkedHashMap<>();
// generate group id in memo is better for test, since we can reproduce exactly same Memo.
private final IdGenerator<GroupId> groupIdGenerator = GroupId.createGenerator();
// All groups currently registered in the memo, keyed by group id, in insertion order.
private final Map<GroupId, Group> groups = Maps.newLinkedHashMap();
// Every group expression maps to itself so that an equal, already registered instance can be looked up;
// we could not use Set, because Set does not have get method.
private final Map<GroupExpression, GroupExpression> groupExpressions = Maps.newHashMap();
// Root group of the memo; null for the test-only constructor, replaced by setRoot() and mergeGroup().
private Group root;
/**
 * FOR TEST ONLY: creates an empty memo that has neither a root group nor a connect context.
 */
public Memo() {
    this.connectContext = null;
    this.root = null;
}
/**
 * Creates a memo bound to a session and seeds it with an initial plan.
 *
 * @param connectContext session context consulted for session variables, may be null
 * @param plan the first plan; it is registered recursively and its top node's group becomes the root
 */
public Memo(ConnectContext connectContext, Plan plan) {
    this.connectContext = connectContext;
    // init() only touches the id generator and the group/groupExpression maps
    this.root = init(plan);
}
/**
 * @return the current value of the static state counter that copyIn advances when tracing is enabled
 */
public static long getStateId() {
    return Memo.stateId;
}
/**
 * @return the root group of this memo, or null if the memo was created without a plan
 */
public Group getRoot() {
    return this.root;
}
/**
 * Replaces the root group of the memo.
 * Used to update the root when DPHyp changes the root Group; it is only meant to be used by DPHyp.
 *
 * @param root the group that becomes the new root
 */
public void setRoot(Group root) {
    this.root = root;
}
/**
 * @return an immutable snapshot of all groups currently registered, in registration order
 */
public List<Group> getGroups() {
    ImmutableList<Group> snapshot = ImmutableList.copyOf(groups.values());
    return snapshot;
}
/**
 * Looks up a registered group by id.
 *
 * @param groupId id of the wanted group
 * @return the group, or null if no group with this id is registered
 */
public Group getGroup(GroupId groupId) {
    Group found = groups.get(groupId);
    return found;
}
/**
 * @return the live (not copied) map in which every registered group expression is mapped to itself
 */
public Map<GroupExpression, GroupExpression> getGroupExpressions() {
    return this.groupExpressions;
}
/**
 * @return the number of group expressions currently registered in the memo
 */
public int getGroupExpressionsSize() {
    int registered = groupExpressions.size();
    return registered;
}
/**
 * @return the current refresh version; it starts at 1 and is advanced by doCopyIn
 */
public long getRefreshVersion() {
    long version = refreshVersion.get();
    return version;
}
/**
 * Remembers the outcome of a materialization check so it does not have to be repeated.
 *
 * @param target the materialized view rule class the check was performed for
 * @param checkedMaterializationId id of the checked materialization
 * @param isSuccess true to record the id as checked successfully, false to record it as failed
 */
public void recordMaterializationCheckResult(Class<? extends AbstractMaterializedViewRule> target,
        Long checkedMaterializationId, boolean isSuccess) {
    Map<Class<? extends AbstractMaterializedViewRule>, Set<Long>> resultMap = isSuccess
            ? materializationCheckSuccessMap
            : materializationCheckFailMap;
    // lazily create the id set of this rule class on first use
    resultMap.computeIfAbsent(target, rule -> new HashSet<>()).add(checkedMaterializationId);
}
/**
 * Tells whether a materialization was already checked for the given rule and with which outcome.
 * A recorded success takes precedence over a recorded failure.
 *
 * @param target the materialized view rule class
 * @param materializationId id of the materialization
 * @return true if it was checked successfully, false if the check failed, null if it was never checked
 */
public Boolean materializationHasChecked(Class<? extends AbstractMaterializedViewRule> target,
        long materializationId) {
    Set<Long> succeeded = materializationCheckSuccessMap.get(target);
    boolean knownSuccess = succeeded != null && succeeded.contains(materializationId);
    if (knownSuccess) {
        return Boolean.TRUE;
    }
    Set<Long> failed = materializationCheckFailMap.get(target);
    boolean knownFailure = failed != null && failed.contains(materializationId);
    return knownFailure ? Boolean.FALSE : null;
}
/**
 * @return the largest number of continuous joins found anywhere below the root group
 */
public int countMaxContinuousJoin() {
    Pair<Integer, Integer> rootCounts = countGroupJoin(root);
    return rootCounts.second;
}
/**
 * Counts continuous join operators in the plan tree below {@code group}, following only the group's
 * first logical expression ({@code group.getLogicalExpression()}).
 * <p>
 * A project group is transparent: it returns the counts of its first child unchanged.
 *
 * @param group the group to inspect
 * @return a pair whose first element is the number of continuous joins ending at this group
 *         (0 if this group's logical expression is not a join) and whose second element is the
 *         maximum continuous join count seen in this group's subtree
 */
public Pair<Integer, Integer> countGroupJoin(Group group) {
    GroupExpression logicalExpr = group.getLogicalExpression();
    List<Pair<Integer, Integer>> children = new ArrayList<>();
    for (Group child : logicalExpr.children()) {
        children.add(countGroupJoin(child));
    }
    // a project does not break a join chain, so it simply forwards what its child reports
    if (group.isProjectGroup()) {
        return children.get(0);
    }
    int maxJoinCount = 0;
    for (Pair<Integer, Integer> child : children) {
        maxJoinCount = Math.max(maxJoinCount, child.second);
    }
    int continuousJoinCount = 0;
    if (logicalExpr.getPlan() instanceof LogicalJoin) {
        // this join extends the chains of all its children by one
        continuousJoinCount = 1;
        for (Pair<Integer, Integer> child : children) {
            continuousJoinCount += child.first;
        }
    }
    return Pair.of(continuousJoinCount, Math.max(continuousJoinCount, maxJoinCount));
}
/**
 * Adds a plan to the memo, optionally keeping a plan table in sync with group merges.
 *
 * @param plan the plan to add
 * @param target target group to add the plan to; null to generate a new group
 * @param rewrite true to rewrite the target group with the plan, false to copy the plan in
 * @param planTable table handed to the copy-in path; ignored when {@code rewrite} is true
 * @return the result describing whether a new group expression was generated and which
 *         group expression corresponds to the plan
 */
public CopyInResult copyIn(Plan plan, @Nullable Group target, boolean rewrite, HashMap<Long, Group> planTable) {
    CopyInResult result = rewrite
            ? doRewrite(plan, target)
            : doCopyIn(plan, target, planTable);
    maybeAddStateId(result);
    return result;
}
/**
 * Adds a plan to the memo without a plan table.
 *
 * @param plan {@link Plan} or {@link Expression} to be added
 * @param target target group to add node. null to generate new Group
 * @param rewrite whether to rewrite the node to the target group
 * @return CopyInResult, in which the generateNewExpression is true if a newly generated
 *         groupExpression added into memo, and the correspondingExpression
 *         is the corresponding group expression of the plan
 */
public CopyInResult copyIn(Plan plan, @Nullable Group target, boolean rewrite) {
    CopyInResult result = rewrite
            ? doRewrite(plan, target)
            : doCopyIn(plan, target, null);
    maybeAddStateId(result);
    return result;
}
/**
 * Advances the static state id when a session exists, nereids trace is enabled for it,
 * and the copy-in generated a new group expression.
 */
private void maybeAddStateId(CopyInResult result) {
    if (connectContext == null) {
        return;
    }
    if (!connectContext.getSessionVariable().isEnableNereidsTrace()) {
        return;
    }
    if (result.generateNewExpression) {
        stateId++;
    }
}
/**
 * @return every logical plan that can be assembled from the logical expressions reachable from the root
 */
public List<Plan> copyOutAll() {
    List<Plan> allPlans = copyOutAll(root);
    return allPlans;
}
/**
 * Collects, in order, the plans produced by each logical expression of {@code group}.
 */
private List<Plan> copyOutAll(Group group) {
    List<Plan> plans = new ArrayList<>();
    for (GroupExpression groupExpr : group.getLogicalExpressions()) {
        plans.addAll(copyOutAll(groupExpr));
    }
    return plans;
}
/**
 * Enumerates every plan rooted at {@code logicalExpression} by combining all plans of its children.
 * <ul>
 *   <li>arity 0: a single plan with no children</li>
 *   <li>arity 1: one plan per plan of the only child</li>
 *   <li>arity 2: the cross product of the left and right child plans, left-major order</li>
 * </ul>
 *
 * @param logicalExpression the group expression to expand
 * @return a mutable list of the enumerated plans
 * @throws RuntimeException if the expression has more than two children
 */
private List<Plan> copyOutAll(GroupExpression logicalExpression) {
    int arity = logicalExpression.arity();
    if (arity == 0) {
        return Lists.newArrayList(logicalExpression.getPlan().withChildren(ImmutableList.of()));
    }
    if (arity == 1) {
        return copyOutAll(logicalExpression.child(0)).stream()
                .map(child -> logicalExpression.getPlan().withChildren(child))
                .collect(Collectors.toList());
    }
    if (arity == 2) {
        List<Plan> leftChildren = copyOutAll(logicalExpression.child(0));
        List<Plan> rightChildren = copyOutAll(logicalExpression.child(1));
        // Size the result from the plans actually produced by the children. The product is computed
        // as long and clamped so an overflow can never turn into a negative initial capacity.
        long combinations = (long) leftChildren.size() * rightChildren.size();
        List<Plan> result = new ArrayList<>((int) Math.min(combinations, Integer.MAX_VALUE));
        for (Plan leftChild : leftChildren) {
            for (Plan rightChild : rightChildren) {
                result.add(logicalExpression.getPlan().withChildren(leftChild, rightChild));
            }
        }
        return result;
    }
    throw new RuntimeException("arity > 2");
}
/**
 * Copies out the plan rooted at the root group, without attaching group expressions to the plan nodes.
 *
 * @return plan
 */
public Plan copyOut() {
    final boolean includeGroupExpression = false;
    return copyOut(root, includeGroupExpression);
}
/**
 * Copies out the plan of a group, using the expression returned by {@code group.getLogicalExpression()}.
 *
 * @param group the group to copy out
 * @param includeGroupExpression whether the produced plan nodes carry their group expression
 * @return plan
 */
public Plan copyOut(Group group, boolean includeGroupExpression) {
    return copyOut(group.getLogicalExpression(), includeGroupExpression);
}
/**
 * Copies out the plan of a logical group expression, recursively copying out each child group first.
 *
 * @param logicalExpression the group expression to copy out
 * @param includeGroupExpression whether the produced plan nodes carry their group expression
 * @return plan
 */
public Plan copyOut(GroupExpression logicalExpression, boolean includeGroupExpression) {
    List<Plan> childPlans = Lists.newArrayList();
    logicalExpression.children()
            .forEach(childGroup -> childPlans.add(copyOut(childGroup, includeGroupExpression)));
    Plan assembled = logicalExpression.getPlan().withChildren(childPlans);
    Optional<GroupExpression> attached;
    if (includeGroupExpression) {
        attached = Optional.of(logicalExpression);
    } else {
        attached = Optional.empty();
    }
    return assembled.withGroupExpression(attached);
}
/**
 * Seeds the memo with a first plan: children are registered bottom-up, then one new group
 * holding one new group expression is created for the plan itself.
 *
 * @param plan first plan, must not be a GroupPlan
 * @return plan's corresponding group
 * @throws IllegalStateException if an equal group expression is already registered
 */
private Group init(Plan plan) {
    Preconditions.checkArgument(!(plan instanceof GroupPlan), "Cannot init memo by a GroupPlan");

    // register the children first so that this node can refer to their groups
    List<Group> childGroups = new ArrayList<>(plan.arity());
    for (Plan childPlan : plan.children()) {
        childGroups.add(init(childPlan));
    }

    Plan planOverGroups = replaceChildrenToGroupPlan(plan, childGroups);
    GroupExpression expression = new GroupExpression(planOverGroups, childGroups);
    Group owner = new Group(groupIdGenerator.getNextId(), expression, planOverGroups.getLogicalProperties());
    groups.put(owner.getGroupId(), owner);

    // putIfAbsent leaves the map untouched and returns the previous entry when one exists
    if (groupExpressions.putIfAbsent(expression, expression) != null) {
        throw new IllegalStateException("groupExpression already exists in memo, maybe a bug");
    }
    return owner;
}
/**
 * Adds the plan to, or replaces the content of, the target group.
 * <p>
 * the result truth table:
 * <pre>
 * +---------------------------------------+-----------------------------------+--------------------------------+
 * | case                                  | is generated new group expression | corresponding group expression |
 * +---------------------------------------+-----------------------------------+--------------------------------+
 * | case 1:                               |                                   |                                |
 * | if plan is GroupPlan                  |              false                |    existed group expression    |
 * | or plan has groupExpression           |                                   |                                |
 * +---------------------------------------+-----------------------------------+--------------------------------+
 * | case 2:                               |                                   |                                |
 * | if targetGroup is null                |              true                 |      new group expression      |
 * | and same group expression not exist   |                                   |                                |
 * +---------------------------------------+-----------------------------------+--------------------------------+
 * | case 3:                               |                                   |                                |
 * | if targetGroup is not null            |              true                 |      new group expression      |
 * | and same group expression not exist   |                                   |                                |
 * +---------------------------------------+-----------------------------------+--------------------------------+
 * | case 4:                               |                                   |                                |
 * | if targetGroup is not null and not    |              true                 |      new group expression      |
 * | equal to the existed group            |                                   |                                |
 * | expression's owner group              |                                   |                                |
 * +---------------------------------------+-----------------------------------+--------------------------------+
 * | case 5:                               |                                   |                                |
 * | if targetGroup is null or equal to    |              false                |    existed group expression    |
 * | the existed group expression's owner  |                                   |                                |
 * | group                                 |                                   |                                |
 * +---------------------------------------+-----------------------------------+--------------------------------+
 * </pre>
 *
 * @param plan the plan to rewrite or add; must be a non-null logical plan
 * @param targetGroup target group to replace plan. null to generate new Group. It should be the ancestors
 *                    of the plan's group, or equals to the plan's group, we do not check this constraint
 *                    completely because of performance.
 * @return a CopyInResult whose flag is true if a newly generated groupExpression was added into memo,
 *         and whose expression is the group expression in the memo that corresponds to the plan
 */
private CopyInResult doRewrite(Plan plan, @Nullable Group targetGroup) {
    Preconditions.checkArgument(plan != null, "plan can not be null");
    Preconditions.checkArgument(plan instanceof LogicalPlan, "only logical plan can be rewrite");

    // case 1: the plan is already bound to the memo, no lookup needed
    boolean alreadyInMemo = plan instanceof GroupPlan || plan.getGroupExpression().isPresent();
    if (alreadyInMemo) {
        return rewriteByExistedPlan(targetGroup, plan);
    }

    List<Group> childGroups = rewriteChildrenPlansToGroups(plan, targetGroup);
    Plan planOverGroups = replaceChildrenToGroupPlan(plan, childGroups);

    // build a candidate expression and look for an equal one that is already registered
    GroupExpression candidate = new GroupExpression(planOverGroups, childGroups);
    GroupExpression registered = groupExpressions.get(candidate);
    if (registered != null) {
        // case 4 or case 5
        return rewriteByExistedGroupExpression(targetGroup, planOverGroups, registered, candidate);
    }
    // case 2 or case 3
    return rewriteByNewGroupExpression(targetGroup, planOverGroups, candidate);
}
/**
 * Copies the plan into the target group without replacing what the group already holds.
 *
 * @param plan the plan to add, must not be a GroupPlan
 * @param targetGroup target group to add plan. null to generate new Group. It should be the ancestors
 *                    of the plan's group, or equals to the plan's group, we do not check this constraint
 *                    completely because of performance.
 * @param planTable passed through to child copy-ins and to the insertion, may be null
 * @return a CopyInResult whose flag is true if a newly generated groupExpression was added into memo,
 *         and whose expression is the group expression in the memo that corresponds to the plan
 * @throws IllegalStateException if the plan's logical properties differ from those of the target group
 */
private CopyInResult doCopyIn(Plan plan, @Nullable Group targetGroup, @Nullable HashMap<Long, Group> planTable) {
    Preconditions.checkArgument(!(plan instanceof GroupPlan), "plan can not be GroupPlan");

    // every member of a group must produce the same output
    if (targetGroup != null) {
        if (!plan.getLogicalProperties().equals(targetGroup.getLogicalProperties())) {
            LOG.info("Insert a plan into targetGroup but differ in logicalproperties."
                            + "\nPlan logicalproperties: {}\n targetGroup logicalproperties: {}",
                    plan.getLogicalProperties(), targetGroup.getLogicalProperties());
            throw new IllegalStateException("Insert a plan into targetGroup but differ in logicalproperties");
        }
    }

    // TODO Support sync materialized view in the future
    if (connectContext != null
            && connectContext.getSessionVariable().isEnableMaterializedViewNestRewrite()) {
        if (plan instanceof LogicalCatalogRelation
                && ((CatalogRelation) plan).getTable() instanceof MTMV
                && !plan.getGroupExpression().isPresent()) {
            refreshVersion.incrementAndGet();
        }
    }

    // a plan that already carries its group expression must be known to the memo
    Optional<GroupExpression> attached = plan.getGroupExpression();
    if (attached.isPresent()) {
        GroupExpression known = attached.get();
        Preconditions.checkState(groupExpressions.containsKey(known));
        return CopyInResult.of(false, known);
    }

    // resolve each child to a group, copying in children that are not in the memo yet
    List<Group> childGroups = Lists.newArrayList();
    int childCount = plan.children().size();
    for (int idx = 0; idx < childCount; idx++) {
        Plan child = plan.child(idx);
        Group childGroup;
        if (child instanceof GroupPlan) {
            childGroup = ((GroupPlan) child).getGroup();
        } else if (child.getGroupExpression().isPresent()) {
            childGroup = child.getGroupExpression().get().getOwnerGroup();
        } else {
            childGroup = doCopyIn(child, null, planTable).correspondingExpression.getOwnerGroup();
        }
        childGroups.add(childGroup);
    }

    Plan planOverGroups = replaceChildrenToGroupPlan(plan, childGroups);
    GroupExpression candidate = new GroupExpression(planOverGroups, childGroups);
    // TODO: need to derive logical property if generate new group.
    return insertGroupExpression(candidate, targetGroup, planOverGroups.getLogicalProperties(), planTable);
}
/**
 * Resolves every child of {@code plan} to a group for the rewrite path.
 * Children already bound to the memo are validated against the target group; other children
 * are rewritten recursively into a new group.
 */
private List<Group> rewriteChildrenPlansToGroups(Plan plan, Group targetGroup) {
    List<Group> resolved = Lists.newArrayList();
    int childCount = plan.children().size();
    for (int idx = 0; idx < childCount; idx++) {
        Plan child = plan.children().get(idx);
        if (child instanceof GroupPlan) {
            Group boundGroup = ((GroupPlan) child).getGroup();
            validateRewriteChildGroup(boundGroup, targetGroup);
            resolved.add(((GroupPlan) child).getGroup());
            continue;
        }
        if (child.getGroupExpression().isPresent()) {
            Group ownerGroup = child.getGroupExpression().get().getOwnerGroup();
            validateRewriteChildGroup(ownerGroup, targetGroup);
            resolved.add(ownerGroup);
            continue;
        }
        resolved.add(doRewrite(child, null).correspondingExpression.getOwnerGroup());
    }
    return resolved;
}
/**
 * Rejects a rewrite whose child group is the very group being rewritten.
 * 'A => B(A)' is invalid equivalent transform because of dead loop, see 'MemoTest.a2ba()'.
 *
 * @throws IllegalStateException if {@code childGroup} is the same instance as {@code targetGroup}
 */
private void validateRewriteChildGroup(Group childGroup, Group targetGroup) {
    if (childGroup != targetGroup) {
        return;
    }
    throw new IllegalStateException("Can not add plan which is ancestor of the target plan");
}
/**
 * Inserts a group expression into the memo.
 * <ul>
 *   <li>If an equal expression is already registered, nothing is added; when a target is given and
 *       differs from the existing owner group, the two groups are merged.</li>
 *   <li>Otherwise the expression is added to the target group, or to a newly generated group
 *       when the target is null.</li>
 * </ul>
 *
 * @param groupExpression groupExpression to insert
 * @param target target group to insert groupExpression, may be null
 * @param logicalProperties logical properties used when a new group has to be generated
 * @param planTable handed to mergeGroup when a merge is needed, may be null
 * @return a CopyInResult whose flag is true if a newly generated groupExpression was added into memo,
 *         and whose expression is the group expression registered in the memo
 */
private CopyInResult insertGroupExpression(GroupExpression groupExpression, Group target,
        LogicalProperties logicalProperties, HashMap<Long, Group> planTable) {
    GroupExpression registered = groupExpressions.get(groupExpression);
    if (registered == null) {
        if (target == null) {
            Group created = new Group(groupIdGenerator.getNextId(), groupExpression, logicalProperties);
            groups.put(created.getGroupId(), created);
        } else {
            target.addGroupExpression(groupExpression);
        }
        groupExpressions.put(groupExpression, groupExpression);
        return CopyInResult.of(true, groupExpression);
    }

    if (target != null) {
        boolean sameOwner = target.getGroupId().equals(registered.getOwnerGroup().getGroupId());
        if (!sameOwner) {
            mergeGroup(target, registered.getOwnerGroup(), planTable);
        }
    }
    // When we create a GroupExpression, we will add it into ParentExpression of childGroup.
    // But if it already exists, we should remove it from ParentExpression of childGroup.
    for (Group childGroup : groupExpression.children()) {
        childGroup.removeParentExpression(groupExpression);
    }
    return CopyInResult.of(false, registered);
}
/**
 * Merge the source group into the destination group.
 * 1. find all group expression which has source as child
 * 2. replace its child with destination
 * 3. remove redundant group expression after replace child
 * 4. move all group expression in source to destination
 * <p>
 * The method returns without changing anything when the two groups are equal, or when one group
 * owns a parent expression of the other (merging would create a cycle).
 * Replacing a child may make a parent expression equal to an already registered one; in that case
 * the parent is marked unused and either merged into the existing expression (same owner group)
 * or the two owner groups are merged recursively.
 *
 * @param source source group; it is removed from the memo after the merge
 * @param destination destination group; becomes the root if source was the root
 * @param planTable optional table (may be null); every value equal to source is replaced by destination
 */
public void mergeGroup(Group source, Group destination, HashMap<Long, Group> planTable) {
if (source.equals(destination)) {
return;
}
// parent expressions of source whose child reference must be redirected to destination
List<GroupExpression> needReplaceChild = Lists.newArrayList();
for (GroupExpression dstParent : destination.getParentGroupExpressions()) {
if (dstParent.getOwnerGroup().equals(source)) {
// cycle, we should not merge
return;
}
}
for (GroupExpression srcParent : source.getParentGroupExpressions()) {
if (srcParent.getOwnerGroup().equals(destination)) {
// cycle, we should not merge
return;
}
Group parentOwnerGroup = srcParent.getOwnerGroup();
// parents that are registered as enforcers of their owner group are left untouched
if (parentOwnerGroup.getEnforcers().containsKey(srcParent)) {
continue;
}
needReplaceChild.add(srcParent);
}
GROUP_MERGE_TRACER.log(GroupMergeEvent.of(source, destination, needReplaceChild));
for (GroupExpression reinsertGroupExpr : needReplaceChild) {
// After change GroupExpression children, hashcode will change, so need to reinsert into map.
// The removal must happen before replaceChild(), while the old hashcode is still valid.
groupExpressions.remove(reinsertGroupExpr);
reinsertGroupExpr.replaceChild(source, destination);
GroupExpression existGroupExpr = groupExpressions.get(reinsertGroupExpr);
if (existGroupExpr != null) {
Preconditions.checkState(existGroupExpr.getOwnerGroup() != null);
// remove reinsertGroupExpr from its owner group to avoid adding it to existGroupExpr.getOwnerGroup()
// existGroupExpr.getOwnerGroup() already has this reinsertGroupExpr.
reinsertGroupExpr.setUnused(true);
if (existGroupExpr.getOwnerGroup().equals(reinsertGroupExpr.getOwnerGroup())) {
// reinsertGroupExpr & existGroupExpr are in same Group, so merge them.
if (reinsertGroupExpr.getPlan() instanceof PhysicalPlan) {
reinsertGroupExpr.getOwnerGroup().replaceBestPlanGroupExpr(reinsertGroupExpr, existGroupExpr);
}
// existingGroupExpression merge the state of reinsertGroupExpr
reinsertGroupExpr.mergeTo(existGroupExpr);
} else {
// reinsertGroupExpr & existGroupExpr aren't in same group, need to merge their OwnerGroup.
mergeGroup(reinsertGroupExpr.getOwnerGroup(), existGroupExpr.getOwnerGroup(), planTable);
}
} else {
// no equal expression registered: put it back under its new hashcode
groupExpressions.put(reinsertGroupExpr, reinsertGroupExpr);
}
}
// replace source with destination in groups of planTable
// NOTE(review): put() is called while iterating with forEach(); it only overwrites values of keys
// that already exist - confirm this stays safe if planTable is ever not a plain HashMap.
if (planTable != null) {
planTable.forEach((bitset, group) -> {
if (group.equals(source)) {
planTable.put(bitset, destination);
}
});
}
source.mergeTo(destination);
if (source == root) {
root = destination;
}
groups.remove(source.getGroupId());
}
/**
 * Case 1 of doRewrite: the plan is a GroupPlan or already carries its group expression.
 * When a target group is given, the existing group is eliminated and its logical expressions
 * and the plan's logical properties are moved into the target group.
 *
 * @return a result that never reports a newly generated expression and refers to the existing one
 */
private CopyInResult rewriteByExistedPlan(Group targetGroup, Plan existedPlan) {
    GroupExpression existedLogicalExpression;
    if (existedPlan instanceof GroupPlan) {
        // get first logicalGroupExpression
        existedLogicalExpression = ((GroupPlan) existedPlan).getGroup().getLogicalExpression();
    } else {
        existedLogicalExpression = existedPlan.getGroupExpression().get();
    }
    if (targetGroup == null) {
        return CopyInResult.of(false, existedLogicalExpression);
    }
    // clear targetGroup, from exist group move all logical groupExpression
    // and logicalProperties to target group
    Group existedGroup = existedLogicalExpression.getOwnerGroup();
    eliminateFromGroupAndMoveToTargetGroup(existedGroup, targetGroup, existedPlan.getLogicalProperties());
    return CopyInResult.of(false, existedLogicalExpression);
}
/**
 * Creates a group with a freshly generated id and the given logical properties and registers it.
 *
 * @param logicalProperties logical properties of the new group
 * @return the newly registered group
 */
public Group newGroup(LogicalProperties logicalProperties) {
    Group created = new Group(groupIdGenerator.getNextId(), logicalProperties);
    groups.put(created.getGroupId(), created);
    return created;
}
/**
 * Cases 2 and 3 of doRewrite: no equal group expression is registered yet.
 *
 * @return a result that reports a newly generated expression and refers to {@code newGroupExpression}
 */
private CopyInResult rewriteByNewGroupExpression(Group targetGroup, Plan newPlan,
        GroupExpression newGroupExpression) {
    if (targetGroup != null) {
        // case 3:
        // if exist the target group, clear all origin group expressions in the
        // target group and reset logical properties, the
        // newGroupExpression is the init logical group expression.
        reInitGroup(targetGroup, newGroupExpression, newPlan.getLogicalProperties());
    } else {
        // case 2:
        // if not exist target group and not exist the same group expression,
        // then create new group with the newGroupExpression
        Group created = new Group(groupIdGenerator.getNextId(), newGroupExpression,
                newPlan.getLogicalProperties());
        groups.put(created.getGroupId(), created);
    }
    // note: registering newGroupExpression must happen after reInitGroup, because a recycled
    //       expression may be equal to newGroupExpression and recycling removes it from the map
    groupExpressions.put(newGroupExpression, newGroupExpression);
    return CopyInResult.of(true, newGroupExpression);
}
/**
 * Cases 4 and 5 of doRewrite: an equal group expression is already registered.
 *
 * @param targetGroup group to rewrite, may be null
 * @param transformedPlan the plan whose logical properties the target group receives in case 4
 * @param existedExpression the registered expression equal to {@code newExpression}
 * @param newExpression the temporary expression built for the plan
 * @return case 4: a result reporting a new expression and referring to {@code newExpression};
 *         case 5: a result reporting no new expression and referring to {@code existedExpression}
 */
private CopyInResult rewriteByExistedGroupExpression(Group targetGroup, Plan transformedPlan,
        GroupExpression existedExpression, GroupExpression newExpression) {
    boolean noRealTarget = targetGroup == null || targetGroup.equals(existedExpression.getOwnerGroup());
    if (noRealTarget) {
        // case 5:
        // if targetGroup is null or targetGroup equal to the existedExpression's ownerGroup,
        // then recycle the temporary new group expression
        // No ownerGroup, don't need ownerGroup.removeChild()
        recycleExpression(newExpression);
        return CopyInResult.of(false, existedExpression);
    }

    // case 4:
    existedExpression.propagateApplied(newExpression);
    moveParentExpressionsReference(existedExpression.getOwnerGroup(), targetGroup);
    recycleGroup(existedExpression.getOwnerGroup());
    reInitGroup(targetGroup, newExpression, transformedPlan.getLogicalProperties());
    // note: registering newExpression must happen after the recycling above, because
    //       existedExpression is equal to newExpression and recycling removes it from the map
    groupExpressions.put(newExpression, newExpression);
    return CopyInResult.of(true, newExpression);
}
/**
 * Eliminates fromGroup, clears targetGroup, then moves the logical group expressions of fromGroup
 * into targetGroup. Nothing happens when both arguments are the same group.
 * <p>
 * the scenario is:
 * <pre>
 *  Group 1(project, the targetGroup)                  Group 1(logicalOlapScan, the targetGroup)
 *               |                             =>
 *  Group 0(logicalOlapScan, the fromGroup)
 * </pre>
 * we should recycle the group 0, and recycle all group expressions in group 1, then move the logicalOlapScan to
 * the group 1, and reset logical properties of the group 1.
 *
 * @throws IllegalStateException if fromGroup is the root group
 */
private void eliminateFromGroupAndMoveToTargetGroup(Group fromGroup, Group targetGroup,
        LogicalProperties logicalProperties) {
    if (fromGroup == targetGroup) {
        return;
    }
    // simple check targetGroup is the ancestors of the fromGroup, not check completely because of performance
    boolean fromIsRoot = fromGroup == root;
    if (fromIsRoot) {
        throw new IllegalStateException(
                "TargetGroup should be ancestors of fromGroup, but fromGroup is root. Maybe a bug");
    }

    // detach the logical expressions first so that recycling fromGroup does not recycle them
    List<GroupExpression> detached = fromGroup.clearLogicalExpressions();
    recycleGroup(fromGroup);
    recycleLogicalAndPhysicalExpressions(targetGroup);
    for (GroupExpression moved : detached) {
        targetGroup.addLogicalExpression(moved);
    }
    targetGroup.setLogicalProperties(logicalProperties);
}
/**
 * Resets a group: recycles all its logical and physical expressions, assigns new logical
 * properties, then adds {@code initLogicalExpression} as its logical expression.
 */
private void reInitGroup(Group group, GroupExpression initLogicalExpression, LogicalProperties logicalProperties) {
    // old content has to go before the group takes its new identity
    this.recycleLogicalAndPhysicalExpressions(group);

    group.setLogicalProperties(logicalProperties);
    group.addLogicalExpression(initLogicalExpression);
}
/**
 * Returns the plan with its children replaced by GroupPlans wrapping the given groups.
 * A plan with no children groups is returned as is.
 */
private Plan replaceChildrenToGroupPlan(Plan plan, List<Group> childrenGroups) {
    if (childrenGroups.isEmpty()) {
        return plan;
    }
    ImmutableList.Builder<Plan> wrapped = ImmutableList.builder();
    for (Group childGroup : childrenGroups) {
        wrapped.add(new GroupPlan(childGroup));
    }
    return plan.withChildren(wrapped.build());
}
/*
 * Redirects every parent group expression of fromGroup so that it references toGroup as its child
 * instead, except for parent expressions owned by toGroup itself.
 *
 * The exception covers the scenario of eliminating a group directly below toGroup:
 * the fromGroup is group 1, the toGroup is group 2. We must not redirect group 2's own
 * group expressions, otherwise they would reference group 2 as their child (reference itself).
 *
 *   A(group 2)            B(group 2)
 *   |                     |
 *   B(group 1)      =>    C(group 0)
 *   |
 *   C(group 0)
 *
 *
 * note: the method don't save group and groupExpression to the memo, so you need
 *       save group and groupExpression to the memo at other place.
 */
private void moveParentExpressionsReference(Group fromGroup, Group toGroup) {
// NOTE(review): replaceChild() may modify fromGroup's parent expression collection while it is
// being iterated - verify that getParentGroupExpressions() returns a copy.
for (GroupExpression parentGroupExpression : fromGroup.getParentGroupExpressions()) {
if (parentGroupExpression.getOwnerGroup() != toGroup) {
parentGroupExpression.replaceChild(fromGroup, toGroup);
}
}
}
/**
 * Removes a group from the memo (only if the memo holds this very instance under its id)
 * and recycles all logical and physical expressions of the group.
 * Notice: this func don't replace { Parent GroupExpressions -> this Group }.
 */
private void recycleGroup(Group group) {
    // recycle in memo.
    boolean registeredInstance = groups.get(group.getGroupId()) == group;
    if (registeredInstance) {
        groups.remove(group.getGroupId());
    }
    // recycle children GroupExpression
    this.recycleLogicalAndPhysicalExpressions(group);
}
/**
 * Recycles every logical expression of the group and clears them from the group, then does the
 * same for the physical expressions. The group itself stays registered in the memo.
 */
private void recycleLogicalAndPhysicalExpressions(Group group) {
// recycleExpression() may recursively recycle child groups that are no longer referenced
group.getLogicalExpressions().forEach(this::recycleExpression);
group.clearLogicalExpressions();
group.getPhysicalExpressions().forEach(this::recycleExpression);
group.clearPhysicalExpressions();
}
/**
 * Removes a group expression from the memo (only if the memo holds this very instance),
 * unregisters it as parent expression from each of its child groups, recycling a child group
 * when removeParentExpression() reports 0, and finally clears its owner group reference.
 * Notice: this func don't clear { OwnerGroup() -> this GroupExpression }.
 */
private void recycleExpression(GroupExpression groupExpression) {
// recycle in memo.
// identity check: an equal but different instance registered in the memo must stay registered
if (groupExpressions.get(groupExpression) == groupExpression) {
groupExpressions.remove(groupExpression);
}
// recycle parentGroupExpr in childGroup
groupExpression.children().forEach(childGroup -> {
// if not any groupExpression reference child group, then recycle the child group
// (presumably removeParentExpression returns the number of remaining parents - confirm in Group)
if (childGroup.removeParentExpression(groupExpression) == 0) {
recycleGroup(childGroup);
}
});
groupExpression.setOwnerGroup(null);
}
/**
 * Renders the root group followed by every registered group, each group preceded by two
 * line breaks and followed by one.
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("root:");
    text.append(getRoot()).append("\n");
    groups.values().forEach(group -> text.append("\n\n").append(group).append("\n"));
    return text.toString();
}
/**
 * Ranks all plans and selects the n-th one, following the paper
 * "Counting, Enumerating, and Sampling of Execution Plans in a Cost-Based Query Optimizer".
 * Each physical plan in the memo is assigned a unique ID by rankGroup(); the plans with a finite
 * cost are then ordered by cost (costs closer than 1e-9 are ordered by ID) and the n-th cheapest
 * is chosen. If fewer than n plans are available, the most expensive available plan is returned.
 * Note we don't generate any physical plan in rank function.
 * <p>
 * In unrank() function, we will extract the actual physical function according the unique ID
 *
 * @param n 1-based rank of the wanted plan, must be greater than 0
 * @return a pair of the selected plan's ID and its cost value
 * @throws IllegalArgumentException if n is not positive or no plan with a finite cost exists
 */
public Pair<Long, Double> rank(long n) {
    double threshold = 0.000000001;
    // Guava's Preconditions only substitutes %s placeholders, %d would be left in the message as is
    Preconditions.checkArgument(n > 0, "the n %s must be greater than 0 in nthPlan", n);
    List<Pair<Long, Cost>> plans = rankGroup(root, PhysicalProperties.GATHER).stream()
            .filter(p -> Double.isFinite(p.second.getValue()))
            .collect(Collectors.toList());
    // This is big heap, it always pops the element with larger cost or larger id.
    PriorityQueue<Pair<Long, Cost>> pq = new PriorityQueue<>((l, r) ->
            Math.abs(l.second.getValue() - r.second.getValue()) < threshold
                    ? -Long.compare(l.first, r.first)
                    : -Double.compare(l.second.getValue(), r.second.getValue()));
    // keep only the n cheapest plans; the head of the heap is then the n-th cheapest
    for (Pair<Long, Cost> p : plans) {
        pq.add(p);
        if (pq.size() > n) {
            pq.poll();
        }
    }
    Pair<Long, Cost> nth = pq.peek();
    Preconditions.checkArgument(nth != null, "rank error because there is no valid plan");
    return Pair.of(nth.first, nth.second.getValue());
}
/**
 * Returns the number of plans that can be ranked, i.e. the plans produced by rankGroup() for the
 * root group whose cost value is finite (neither NaN nor infinite). This is the same filter that
 * rank() applies, so the result is the number of candidates rank() chooses from.
 *
 * @return number of rankable plans
 */
public int getRankSize() {
    // The cost value has to be inspected: comparing the Cost object itself with a Double constant
    // compares objects of different types and would not filter anything.
    return (int) rankGroup(root, PhysicalProperties.GATHER).stream()
            .filter(p -> Double.isFinite(p.second.getValue()))
            .count();
}
/**
 * Ranks the plans of a group for the required physical properties.
 * The IDs reported by each group expression are shifted by the number of plans collected so far,
 * so IDs are unique within the returned list. Collection stops once more than 100 plans were
 * gathered, to avoid ranking all plans.
 */
private List<Pair<Long, Cost>> rankGroup(Group group, PhysicalProperties prop) {
    List<Pair<Long, Cost>> ranked = new ArrayList<>();
    int idOffset = 0;
    for (GroupExpression candidate : extractGroupExpressionSatisfyProp(group, prop)) {
        List<Pair<Long, Cost>> localRanks = rankGroupExpression(candidate, prop);
        for (Pair<Long, Cost> local : localRanks) {
            ranked.add(Pair.of(local.first + idOffset, local.second));
        }
        idOffset = ranked.size();
        // avoid ranking all plans
        boolean enough = ranked.size() > 1e2;
        if (enough) {
            break;
        }
    }
    return ranked;
}
/**
 * Enumerate (id, cost) pairs for the plans rooted at the given group expression.
 * <p>
 * An empty list is returned when the lowest cost table of the expression has no entry for prop.
 * A leaf plan yields a single pair with id 0. Otherwise, for every input properties list returned by
 * extractInputProperties, each combination of the children's ranked plans yields one pair whose id is
 * the combination's unique id shifted by the number of pairs produced for the preceding lists.
 *
 * @param groupExpression the group expression to rank
 * @param prop the required physical properties
 * @return the (id, cost) pairs of the enumerated plans
 */
private List<Pair<Long, Cost>> rankGroupExpression(GroupExpression groupExpression,
PhysicalProperties prop) {
if (!groupExpression.getLowestCostTable().containsKey(prop)) {
return new ArrayList<>();
}
List<Pair<Long, Cost>> res = new ArrayList<>();
if (groupExpression.getPlan() instanceof LeafPlan) {
res.add(Pair.of(0L, groupExpression.getCostValueByProperties(prop)));
return res;
}
List<List<PhysicalProperties>> inputPropertiesList = extractInputProperties(groupExpression, prop);
for (List<PhysicalProperties> inputProperties : inputPropertiesList) {
// ids of this input properties list start after the pairs already produced
int prefix = res.size();
// ranked plans of every child under the properties requested from it
List<List<Pair<Long, Cost>>> children = new ArrayList<>();
for (int i = 0; i < inputProperties.size(); i++) {
// To avoid a cycle, we don't allow ranking the same group with the same physical properties.
Preconditions.checkArgument(!groupExpression.child(i).equals(groupExpression.getOwnerGroup())
|| !prop.equals(inputProperties.get(i)));
List<Pair<Long, Cost>> idCostPair
= rankGroup(groupExpression.child(i), inputProperties.get(i));
children.add(idCostPair);
}
// every combination of child plan indices, paired with its unique id
List<Pair<Long, List<Integer>>> childrenId = new ArrayList<>();
permute(children, 0, childrenId, new ArrayList<>());
// NOTE(review): presumably the cost of this operator alone; the children's costs are added below
Cost cost = CostCalculator.calculateCost(connectContext, groupExpression, inputProperties);
for (Pair<Long, List<Integer>> c : childrenId) {
Cost totalCost = cost;
for (int i = 0; i < children.size(); i++) {
totalCost = CostCalculator.addChildCost(connectContext,
groupExpression.getPlan(),
totalCost,
children.get(i).get(c.second.get(i)).second,
i);
}
// only the very first enumerated plan is checked against the cost recorded for prop
if (res.isEmpty()) {
Preconditions.checkArgument(
Math.abs(totalCost.getValue() - groupExpression.getCostByProperties(prop)) < 0.0001,
"Please check operator %s, expected cost %s but found %s",
groupExpression.getPlan().shapeInfo(), totalCost.getValue(),
groupExpression.getCostByProperties(prop));
}
res.add(Pair.of(prefix + c.first, totalCost));
}
}
return res;
}
/**
 * Recursively enumerate every combination that picks one index per child list.
 * <p>
 * Combinations are generated with the last child varying fastest, e.g. for children of sizes 2 and 3:
 * [0, 0] [0, 1] [0, 2] [1, 0] [1, 1] [1, 2]. Each finished combination is stored in result together
 * with the id computed by getUniqueId, which is not necessarily its position in this enumeration.
 *
 * @param children the ranked plans of every child
 * @param index the child position to choose next
 * @param result output list receiving (unique id, chosen indices) pairs
 * @param current the indices chosen for the children before index
 */
private void permute(List<List<Pair<Long, Cost>>> children, int index,
        List<Pair<Long, List<Integer>>> result, List<Integer> current) {
    if (index != children.size()) {
        int choiceCount = children.get(index).size();
        for (int choice = 0; choice < choiceCount; choice++) {
            // every branch works on its own copy, so stored combinations never share state
            List<Integer> extended = new ArrayList<>(current);
            extended.add(choice);
            permute(children, index + 1, result, extended);
        }
        return;
    }
    // one index has been chosen for every child: record the combination
    result.add(Pair.of(getUniqueId(children, current), current));
}
/**
 * Encode one combination of child indices into a single id, using the child list sizes as a mixed
 * radix in which the first child is the least significant digit, e.g. for child sizes [2, 3]:
 * [0, 0] -> 0, [1, 0] -> 1, [0, 1] -> 2, [1, 1] -> 3, [0, 2] -> 4, [1, 2] -> 5
 *
 * @param lists the ranked plans of every child; only their sizes are used
 * @param current the chosen index for every child
 * @return the id of the combination
 */
private static long getUniqueId(List<List<Pair<Long, Cost>>> lists, List<Integer> current) {
    long uniqueId = 0;
    // product of the sizes of all children handled so far
    long weight = 1;
    int position = 0;
    for (List<Pair<Long, Cost>> choices : lists) {
        uniqueId += weight * current.get(position);
        weight *= choices.size();
        position++;
    }
    return uniqueId;
}
/**
 * Collect the group expressions of a group that take part in ranking for the required properties.
 * <p>
 * The lowest cost expression for prop always comes first. It is followed by every physical expression
 * and then every enforcer of the group that has a non-empty input properties list for prop, each
 * expression appearing at most once.
 *
 * @param group the group to inspect
 * @param prop the required physical properties
 * @return the group expressions in ranking order
 */
private List<GroupExpression> extractGroupExpressionSatisfyProp(Group group, PhysicalProperties prop) {
    GroupExpression cheapest = group.getLowestCostPlan(prop).get().second;
    List<GroupExpression> ordered = Lists.newArrayList(cheapest);
    Set<GroupExpression> seen = new HashSet<>();
    seen.add(cheapest);
    List<GroupExpression> candidates = Stream.concat(
                    group.getPhysicalExpressions().stream(), group.getEnforcers().keySet().stream())
            .collect(Collectors.toList());
    for (GroupExpression candidate : candidates) {
        if (candidate.getInputPropertiesListOrEmpty(prop).isEmpty()) {
            continue;
        }
        if (candidate.equals(cheapest) || !seen.add(candidate)) {
            continue;
        }
        ordered.add(candidate);
    }
    return ordered;
}
// ----------------------------------------------------------------
// Collect the candidate input properties lists of a group expression for the required output
// properties. There are three cases:
// 1. The group expression is an enforcer of its owner group: only the input properties recorded
//    for prop are returned
// 2. One of the derived requests asks ANY from every child (and the plan is neither a
//    RequirePropertiesSupplier nor a SetOperation): only that request is returned
// 3. Otherwise: the input properties recorded for prop, followed by the input properties recorded
//    for every key of the lowest cost table that satisfies prop
private List<List<PhysicalProperties>> extractInputProperties(GroupExpression groupExpression,
        PhysicalProperties prop) {
    List<List<PhysicalProperties>> candidates = new ArrayList<>();
    candidates.add(groupExpression.getInputPropertiesList(prop));

    // case 1: return optimized input for enforcer
    boolean isEnforcer = groupExpression.getOwnerGroup().getEnforcers().containsKey(groupExpression);
    if (isEnforcer) {
        return candidates;
    }

    // case 2: return any if it exists, except for RequirePropertiesSupplier and SetOperation,
    // because PropRegulator could change their input properties
    List<List<PhysicalProperties>> requests = new RequestPropertyDeriver(connectContext, prop)
            .getRequestChildrenPropertyList(groupExpression);
    Optional<List<PhysicalProperties>> allAnyRequest = requests.stream()
            .filter(request -> request.stream().allMatch(PhysicalProperties.ANY::equals))
            .findAny();
    if (allAnyRequest.isPresent()) {
        boolean inputsMayBeRegulated = groupExpression.getPlan() instanceof RequirePropertiesSupplier
                || groupExpression.getPlan() instanceof SetOperation;
        if (!inputsMayBeRegulated) {
            List<List<PhysicalProperties>> onlyAny = new ArrayList<>();
            onlyAny.add(allAnyRequest.get());
            return onlyAny;
        }
    }

    // case 3: return all optimized inputs
    Set<List<PhysicalProperties>> optimizedInputs = groupExpression.getLowestCostTable().keySet().stream()
            .filter(outputProp -> outputProp.satisfy(prop))
            .map(outputProp -> groupExpression.getInputPropertiesList(outputProp))
            .collect(Collectors.toSet());
    candidates.addAll(optimizedInputs);
    return candidates;
}
/**
 * Count the plans of a group under the required properties, filling exprSizeCache on the way.
 * <p>
 * Counting stops visiting further group expressions once the running total exceeds 100.
 *
 * @param group the group to count
 * @param prop the required physical properties
 * @param exprSizeCache output cache filled by getExprSize for every visited group expression
 * @return the number of counted plans
 */
private int getGroupSize(Group group, PhysicalProperties prop,
        Map<GroupExpression, List<List<Integer>>> exprSizeCache) {
    List<GroupExpression> candidates = extractGroupExpressionSatisfyProp(group, prop);
    int planCount = 0;
    // the loop condition stops the walk as soon as more than 100 plans have been counted
    for (int i = 0; i < candidates.size() && planCount <= 1e2; i++) {
        planCount += getExprSize(candidates.get(i), prop, exprSizeCache);
    }
    return planCount;
}
/**
 * Count the plans rooted at a group expression and record the per-child counts in the cache.
 * <p>
 * The cached value holds one list per input properties list, containing the plan count of every
 * child. An expression without an entry for the properties in its lowest cost table is recorded as
 * [[0]] and a leaf plan as [[1]]. The returned size is the sum, over all lists, of the product of
 * the list's entries.
 *
 * @param groupExpression the group expression to count
 * @param properties the required physical properties
 * @param exprChildSizeCache output cache receiving the per-child counts of this group expression
 * @return the number of plans rooted at the group expression
 */
private int getExprSize(GroupExpression groupExpression, PhysicalProperties properties,
        Map<GroupExpression, List<List<Integer>>> exprChildSizeCache) {
    List<List<Integer>> sizesPerInput = new ArrayList<>();
    boolean hasCost = groupExpression.getLowestCostTable().containsKey(properties);
    if (hasCost && !(groupExpression.getPlan() instanceof LeafPlan)) {
        for (List<PhysicalProperties> childProps : extractInputProperties(groupExpression, properties)) {
            List<Integer> childSizes = new ArrayList<>();
            for (int childIdx = 0; childIdx < childProps.size(); childIdx++) {
                int childSize = getGroupSize(groupExpression.child(childIdx), childProps.get(childIdx),
                        exprChildSizeCache);
                childSizes.add(childSize);
            }
            sizesPerInput.add(childSizes);
        }
    } else {
        // no recorded cost -> no plan at all; leaf plan -> exactly one plan
        sizesPerInput.add(Lists.newArrayList(hasCost ? 1 : 0));
    }
    exprChildSizeCache.put(groupExpression, sizesPerInput);

    int total = 0;
    for (List<Integer> childSizes : sizesPerInput) {
        int combinations = 1;
        for (int childSize : childSizes) {
            combinations *= childSize;
        }
        total += combinations;
    }
    return total;
}
/**
 * Rebuild the physical plan that has the given rank within a group.
 * <p>
 * The group expressions are visited in the order given by extractGroupExpressionSatisfyProp; each one
 * owns a consecutive range of ranks whose length is its plan count derived from exprSizeCache.
 *
 * @param group the group to extract the plan from
 * @param prop the required physical properties
 * @param rank the rank inside this group, starting from 0
 * @param exprSizeCache per group expression, the children's plan counts for each input properties
 *                      list; it is expected to hold an entry for every visited group expression
 * @return the physical plan with the given rank
 * @throws RuntimeException if no group expression covers the rank
 */
private PhysicalPlan unrankGroup(Group group, PhysicalProperties prop, long rank,
        Map<GroupExpression, List<List<Integer>>> exprSizeCache) {
    int prefix = 0;
    for (GroupExpression groupExpression : extractGroupExpressionSatisfyProp(group, prop)) {
        // number of plans rooted at this expression: sum over input properties lists of the
        // product of the children's plan counts
        int exprCount = exprSizeCache.get(groupExpression).stream()
                .mapToInt(s -> s.stream().reduce(1, (a, b) -> a * b))
                .sum();
        // rank is start from 0
        if (exprCount != 0 && rank + 1 - prefix <= exprCount) {
            return unrankGroupExpression(groupExpression, prop, rank - prefix,
                    exprSizeCache);
        }
        prefix += exprCount;
    }
    // the message template was previously thrown verbatim with a literal %s; format it with prop
    throw new RuntimeException(String.format("the group has no plan for prop %s in rank job", prop));
}
/**
 * Rebuild the physical plan that has the given rank among the plans rooted at a group expression.
 * <p>
 * A leaf plan only accepts rank 0. Otherwise the input properties lists are walked in the order
 * returned by extractInputProperties; each list owns as many ranks as the product of its children's
 * plan counts. Inside the matching list the remaining rank is decoded into one rank per child and
 * the children are rebuilt recursively.
 *
 * @param groupExpression the group expression to extract the plan from
 * @param prop the required physical properties
 * @param rank the rank inside this group expression, starting from 0
 * @param exprSizeCache per group expression, the children's plan counts for each input properties list
 * @return the physical plan with the given rank
 * @throws RuntimeException if no input properties list covers the rank
 */
private PhysicalPlan unrankGroupExpression(GroupExpression groupExpression, PhysicalProperties prop, long rank,
        Map<GroupExpression, List<List<Integer>>> exprSizeCache) {
    if (groupExpression.getPlan() instanceof LeafPlan) {
        // Preconditions message templates only expand %s; with %d the rank was never substituted
        Preconditions.checkArgument(rank == 0,
                "leaf plan's %s rank must be 0 but is %s", groupExpression, rank);
        return ((PhysicalPlan) groupExpression.getPlan()).withPhysicalPropertiesAndStats(
                groupExpression.getOutputProperties(prop),
                groupExpression.getOwnerGroup().getStatistics());
    }

    List<List<PhysicalProperties>> inputPropertiesList = extractInputProperties(groupExpression, prop);
    for (int i = 0; i < inputPropertiesList.size(); i++) {
        List<PhysicalProperties> properties = inputPropertiesList.get(i);
        List<Integer> childrenSize = exprSizeCache.get(groupExpression).get(i);
        int count = childrenSize.stream().reduce(1, (a, b) -> a * b);
        // the rank belongs to a later input properties list: skip the ranks owned by this one
        if (rank >= count) {
            rank -= count;
            continue;
        }
        List<Long> childrenRanks = extractChildRanks(rank, childrenSize);
        List<Plan> childrenPlan = new ArrayList<>();
        for (int j = 0; j < properties.size(); j++) {
            Plan plan = unrankGroup(groupExpression.child(j), properties.get(j),
                    childrenRanks.get(j), exprSizeCache);
            Preconditions.checkArgument(plan != null, "rank group get null");
            childrenPlan.add(plan);
        }
        Plan plan = groupExpression.getPlan().withChildren(childrenPlan);
        return ((PhysicalPlan) plan).withPhysicalPropertiesAndStats(
                groupExpression.getOutputProperties(prop),
                groupExpression.getOwnerGroup().getStatistics());
    }
    throw new RuntimeException("the groupExpr has no plan for prop in rank job");
}
/**
 * Decode a combined rank into one rank per child, which is the opposite of getUniqueId: the children
 * sizes act as a mixed radix whose first child is the least significant digit, e.g. for children
 * sizes [2, 3]:
 * 0 -> [0, 0], 1 -> [1, 0], 2 -> [0, 1], 3 -> [1, 1], 4 -> [0, 2], 5 -> [1, 2]
 *
 * @param rank the combined rank to decode
 * @param childrenSize the plan count of every child; must not be empty
 * @return the rank of every child, in child order
 */
private List<Long> extractChildRanks(long rank, List<Integer> childrenSize) {
    Preconditions.checkArgument(!childrenSize.isEmpty(), "children should not empty in extractChildRanks");
    List<Long> childRanks = new ArrayList<>();
    long remaining = rank;
    for (int radix : childrenSize) {
        // the lowest digit is this child's rank; the quotient carries on to the next child
        childRanks.add(remaining % radix);
        remaining /= radix;
    }
    return childRanks;
}
/**
 * Extract the physical plan identified by the given id from the root group under
 * {@link PhysicalProperties#GATHER}.
 *
 * @param id the id of the plan to extract
 * @return the physical plan with the given id
 */
public PhysicalPlan unrank(long id) {
    Group rootGroup = getRoot();
    Map<GroupExpression, List<List<Integer>>> sizesByExpression = new HashMap<>();
    // the first pass only fills the size cache that the extraction pass relies on
    getGroupSize(rootGroup, PhysicalProperties.GATHER, sizesByExpression);
    return unrankGroup(rootGroup, PhysicalProperties.GATHER, id, sizesByExpression);
}
}