// RequestPropertyDeriver.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.properties;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.PlanContext;
import org.apache.doris.nereids.hint.DistributeHint;
import org.apache.doris.nereids.hint.Hint;
import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.DistributeType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDictionarySink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.JoinUtils;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Derives, for each physical plan node, the physical properties it should request
 * from its children, given the properties its parent requested from it.
 */
public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
    /*
     * requestPropertyFromParent
     * │
     * ▼
     * curNode (current plan node in current CostAndEnforcerJob)
     * │
     * ▼
     * requestPropertyToChildren
     */

    // Session context; may be null in some paths — the sink visitors explicitly
    // null-check it before dereferencing session variables.
    private final ConnectContext connectContext;

    // The physical properties the parent requires from the node being visited.
    private final PhysicalProperties requestPropertyFromParent;

    // Result of the derivation. Each inner list is one ALTERNATIVE: a list holding
    // exactly one requested PhysicalProperties per child of the visited node.
    // (e.g. hash join may add two alternatives: shuffle and broadcast.)
    private List<List<PhysicalProperties>> requestPropertyToChildren;

    public RequestPropertyDeriver(ConnectContext connectContext, JobContext context) {
        this.connectContext = connectContext;
        this.requestPropertyFromParent = context.getRequiredProperties();
    }

    public RequestPropertyDeriver(ConnectContext connectContext, PhysicalProperties requestPropertyFromParent) {
        this.connectContext = connectContext;
        this.requestPropertyFromParent = requestPropertyFromParent;
    }

    /**
     * get request children property list
     *
     * <p>Resets the accumulator, visits the group expression's plan (each visitor
     * method below appends one or more alternatives), and returns the collected
     * alternatives.
     */
    public List<List<PhysicalProperties>> getRequestChildrenPropertyList(GroupExpression groupExpression) {
        requestPropertyToChildren = Lists.newArrayList();
        groupExpression.getPlan().accept(this, new PlanContext(connectContext, groupExpression));
        return requestPropertyToChildren;
    }

    /**
     * Default handling for plans without a dedicated visitor.
     * Plans implementing {@link RequirePropertiesSupplier} compute their own child
     * requirements; everything else places no constraint and requests ANY from
     * every child.
     */
    @Override
    public Void visit(Plan plan, PlanContext context) {
        if (plan instanceof RequirePropertiesSupplier) {
            RequireProperties requireProperties = ((RequirePropertiesSupplier<?>) plan).getRequireProperties();
            List<PhysicalProperties> requestPhysicalProperties =
                    requireProperties.computeRequirePhysicalProperties(plan, requestPropertyFromParent);
            addRequestPropertyToChildren(requestPhysicalProperties);
            return null;
        }
        // One ANY per child.
        List<PhysicalProperties> requiredPropertyList =
                Lists.newArrayListWithCapacity(context.arity());
        for (int i = context.arity(); i > 0; --i) {
            requiredPropertyList.add(PhysicalProperties.ANY);
        }
        addRequestPropertyToChildren(requiredPropertyList);
        return null;
    }

    /* ********************************************************************************************
     * sink Node, in lexicographical order
     * ******************************************************************************************** */

    /**
     * OLAP table sink: when strict-consistency DML is disabled via session variable,
     * any distribution is acceptable; otherwise the sink dictates its own
     * required distribution (e.g. to match target table partitioning — see
     * getRequirePhysicalProperties on the sink).
     */
    @Override
    public Void visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends Plan> olapTableSink, PlanContext context) {
        if (connectContext != null && !connectContext.getSessionVariable().enableStrictConsistencyDml) {
            addRequestPropertyToChildren(PhysicalProperties.ANY);
        } else {
            addRequestPropertyToChildren(olapTableSink.getRequirePhysicalProperties());
        }
        return null;
    }

    /** Hive table sink: same strict-consistency-DML toggle as the OLAP sink. */
    @Override
    public Void visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends Plan> hiveTableSink, PlanContext context) {
        if (connectContext != null && !connectContext.getSessionVariable().enableStrictConsistencyDml) {
            addRequestPropertyToChildren(PhysicalProperties.ANY);
        } else {
            addRequestPropertyToChildren(hiveTableSink.getRequirePhysicalProperties());
        }
        return null;
    }

    /** Iceberg table sink: same strict-consistency-DML toggle as the OLAP sink. */
    @Override
    public Void visitPhysicalIcebergTableSink(
            PhysicalIcebergTableSink<? extends Plan> icebergTableSink, PlanContext context) {
        if (connectContext != null && !connectContext.getSessionVariable().enableStrictConsistencyDml) {
            addRequestPropertyToChildren(PhysicalProperties.ANY);
        } else {
            addRequestPropertyToChildren(icebergTableSink.getRequirePhysicalProperties());
        }
        return null;
    }

    @Override
    public Void visitPhysicalJdbcTableSink(
            PhysicalJdbcTableSink<? extends Plan> jdbcTableSink, PlanContext context) {
        // Always use gather properties for jdbcTableSink
        addRequestPropertyToChildren(PhysicalProperties.GATHER);
        return null;
    }

    /** Dictionary sink: always delegates to the sink's own required distribution. */
    @Override
    public Void visitPhysicalDictionarySink(PhysicalDictionarySink<? extends Plan> dictionarySink,
            PlanContext context) {
        addRequestPropertyToChildren(dictionarySink.getRequirePhysicalProperties());
        return null;
    }

    /**
     * Result sink: when parallel result sink is enabled and this is not a
     * short-circuit query, results may be produced anywhere (ANY); otherwise all
     * rows must be gathered to a single node.
     */
    @Override
    public Void visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, PlanContext context) {
        if (context.getSessionVariable().enableParallelResultSink()
                && !context.getStatementContext().isShortCircuitQuery()) {
            addRequestPropertyToChildren(PhysicalProperties.ANY);
        } else {
            addRequestPropertyToChildren(PhysicalProperties.GATHER);
        }
        return null;
    }

    /** Deferred-materialization result sink: always gather to one node. */
    @Override
    public Void visitPhysicalDeferMaterializeResultSink(
            PhysicalDeferMaterializeResultSink<? extends Plan> sink,
            PlanContext context) {
        addRequestPropertyToChildren(PhysicalProperties.GATHER);
        return null;
    }

    /* ********************************************************************************************
     * Other Node, in lexicographical order
     * ******************************************************************************************** */

    /** Row-count assertion must see all rows, so the child is gathered. */
    @Override
    public Void visitPhysicalAssertNumRows(PhysicalAssertNumRows<? extends Plan> assertNumRows, PlanContext context) {
        addRequestPropertyToChildren(PhysicalProperties.GATHER);
        return null;
    }

    /**
     * CTE anchor: left child is the CTE producer (no constraint, ANY), the right
     * child is the consumer side and inherits the parent's request unchanged.
     */
    @Override
    public Void visitPhysicalCTEAnchor(PhysicalCTEAnchor<? extends Plan, ? extends Plan> cteAnchor,
            PlanContext context) {
        addRequestPropertyToChildren(PhysicalProperties.ANY, requestPropertyFromParent);
        return null;
    }

    /**
     * Hash join: generates distribution alternatives.
     * A valid distribute hint (broadcast/shuffle) pins the single alternative and
     * marks the hint SUCCESS. Without a hint, both shuffle and broadcast
     * alternatives may be added; broadcast is only added when its stats check
     * passes, or as a fallback when no shuffle alternative was added.
     */
    @Override
    public Void visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, PlanContext context) {
        DistributeHint hint = hashJoin.getDistributeHint();
        if (hint.distributeType == DistributeType.BROADCAST_RIGHT && JoinUtils.couldBroadcast(hashJoin)) {
            addBroadcastJoinRequestProperty();
            hint.setStatus(Hint.HintStatus.SUCCESS);
            return null;
        }
        if (hint.distributeType == DistributeType.SHUFFLE_RIGHT && JoinUtils.couldShuffle(hashJoin)) {
            addShuffleJoinRequestProperty(hashJoin);
            hint.setStatus(Hint.HintStatus.SUCCESS);
            return null;
        }
        // for shuffle join
        if (JoinUtils.couldShuffle(hashJoin)) {
            addShuffleJoinRequestProperty(hashJoin);
        }
        // for broadcast join
        // requestPropertyToChildren.isEmpty() means shuffle was not possible, so
        // broadcast is added regardless of the stats check to guarantee at least
        // one alternative exists.
        if (JoinUtils.couldBroadcast(hashJoin)
                && (JoinUtils.checkBroadcastJoinStats(hashJoin) || requestPropertyToChildren.isEmpty())) {
            addBroadcastJoinRequestProperty();
        }
        return null;
    }

    /** Global limit needs all rows on one node (gather); local limit does not. */
    @Override
    public Void visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, PlanContext context) {
        if (limit.isGlobal()) {
            addRequestPropertyToChildren(PhysicalProperties.GATHER);
        } else {
            addRequestPropertyToChildren(PhysicalProperties.ANY);
        }
        return null;
    }

    /**
     * Nested-loop join: cross/inner/left joins can run in parallel with the build
     * side replicated to every instance; other join types fall back to gathering
     * both sides onto a single node.
     */
    @Override
    public Void visitPhysicalNestedLoopJoin(
            PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> nestedLoopJoin, PlanContext context) {
        // see canParallelize() in NestedLoopJoinNode
        if (nestedLoopJoin.getJoinType().isCrossJoin() || nestedLoopJoin.getJoinType().isInnerJoin()
                || nestedLoopJoin.getJoinType().isLeftJoin()) {
            addRequestPropertyToChildren(PhysicalProperties.ANY, PhysicalProperties.REPLICATED);
        } else {
            addRequestPropertyToChildren(PhysicalProperties.GATHER, PhysicalProperties.GATHER);
        }
        return null;
    }

    /**
     * Set operation (intersect/except/union without constants): if the parent
     * already requires a hash distribution, translate that requirement to each
     * child's corresponding output slots; otherwise shuffle every child on all of
     * its output columns.
     */
    @Override
    public Void visitPhysicalSetOperation(PhysicalSetOperation setOperation, PlanContext context) {
        // intersect and except need do distinct, so we must do distribution on it.
        DistributionSpec distributionRequestFromParent = requestPropertyFromParent.getDistributionSpec();
        if (distributionRequestFromParent instanceof DistributionSpecHash) {
            // shuffle according to parent require
            DistributionSpecHash distributionSpecHash = (DistributionSpecHash) distributionRequestFromParent;
            addRequestPropertyToChildren(createHashRequestAccordingToParent(
                    setOperation, distributionSpecHash, context));
        } else {
            // shuffle all column
            // TODO: for wide table, may be we should add a upper limit of shuffle columns
            addRequestPropertyToChildren(setOperation.getRegularChildrenOutputs().stream()
                    .map(childOutputs -> childOutputs.stream()
                            .map(SlotReference::getExprId)
                            .collect(ImmutableList.toImmutableList()))
                    .map(l -> PhysicalProperties.createHash(l, ShuffleType.EXECUTION_BUCKETED))
                    .collect(Collectors.toList()));
        }
        return null;
    }

    /**
     * Union: with no constant expression rows, a parent hash requirement is
     * translated to each child; otherwise (or when the parent requirement is not
     * hash) every child gets ANY — constant rows force the union's output
     * distribution to be ANY anyway, so stricter child requests would be wasted.
     */
    @Override
    public Void visitPhysicalUnion(PhysicalUnion union, PlanContext context) {
        // TODO: we do not generate gather union until we could do better cost computation on set operation
        List<PhysicalProperties> requiredPropertyList =
                Lists.newArrayListWithCapacity(context.arity());
        if (union.getConstantExprsList().isEmpty()) {
            // translate requestPropertyFromParent to other children's request.
            DistributionSpec distributionRequestFromParent = requestPropertyFromParent.getDistributionSpec();
            if (distributionRequestFromParent instanceof DistributionSpecHash) {
                DistributionSpecHash distributionSpecHash = (DistributionSpecHash) distributionRequestFromParent;
                requiredPropertyList = createHashRequestAccordingToParent(union, distributionSpecHash, context);
            } else {
                for (int i = context.arity(); i > 0; --i) {
                    requiredPropertyList.add(PhysicalProperties.ANY);
                }
            }
        } else {
            // current be could not run const expr on appropriate node,
            // so if we have constant exprs on union, the output of union always any
            // then any other request on children is useless.
            for (int i = context.arity(); i > 0; --i) {
                requiredPropertyList.add(PhysicalProperties.ANY);
            }
        }
        addRequestPropertyToChildren(requiredPropertyList);
        return null;
    }

    /** Non-local (global/merge) sort phases gather all rows; a local sort does not. */
    @Override
    public Void visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> sort, PlanContext context) {
        if (!sort.getSortPhase().isLocal()) {
            addRequestPropertyToChildren(PhysicalProperties.GATHER);
        } else {
            addRequestPropertyToChildren(PhysicalProperties.ANY);
        }
        return null;
    }

    /**
     * Partition top-n: the local phase of a two-phase plan accepts any
     * distribution; the global phase (two-phase global or one-phase global)
     * requires a hash distribution on the partition keys.
     */
    @Override
    public Void visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN, PlanContext context) {
        if (partitionTopN.getPhase().isTwoPhaseLocal()) {
            addRequestPropertyToChildren(PhysicalProperties.ANY);
        } else {
            Preconditions.checkState(partitionTopN.getPhase().isTwoPhaseGlobal()
                            || partitionTopN.getPhase().isOnePhaseGlobal(),
                    "partition topn phase is not two phase global or one phase global");
            PhysicalProperties properties = PhysicalProperties.createHash(partitionTopN.getPartitionKeys(),
                    ShuffleType.REQUIRE);
            addRequestPropertyToChildren(properties);
        }
        return null;
    }

    /**
     * Project: tries to push a parent hash requirement through the projection by
     * mapping each required ExprId to the underlying slot it projects
     * (plain slot maps to itself; Alias over a slot maps to the aliased slot).
     * If any required column is produced by a computed expression (not a plain
     * slot or a slot alias), falls back to the default visit (ANY for the child).
     * On success, offers two alternatives: ANY, and the translated hash spec.
     */
    @Override
    public Void visitPhysicalProject(PhysicalProject<? extends Plan> project, PlanContext context) {
        DistributionSpec parentDist = requestPropertyFromParent.getDistributionSpec();
        if (!(parentDist instanceof DistributionSpecHash)) {
            return super.visitPhysicalProject(project, context);
        }
        DistributionSpecHash hashDist = (DistributionSpecHash) parentDist;
        // Index projections by ExprId; on duplicates keep the first occurrence.
        Map<ExprId, NamedExpression> exprIdToProjection = project.getProjects().stream()
                .collect(Collectors.toMap(NamedExpression::getExprId, n -> n, (n1, n2) -> n1));
        Map<ExprId, ExprId> exprIdMap = Maps.newHashMap();
        for (ExprId exprId : hashDist.getExprIdToEquivalenceSet().keySet()) {
            if (!exprIdToProjection.containsKey(exprId)) {
                return super.visitPhysicalProject(project, context);
            }
            NamedExpression projection = exprIdToProjection.get(exprId);
            if (projection instanceof Alias) {
                if (((Alias) projection).child() instanceof SlotReference) {
                    exprIdMap.put(exprId, ((SlotReference) ((Alias) projection).child()).getExprId());
                } else {
                    // alias over a computed expression — cannot push through
                    return super.visitPhysicalProject(project, context);
                }
            } else if (projection instanceof SlotReference) {
                // pass-through slot: maps to itself
                exprIdMap.put(exprId, exprId);
            } else {
                return super.visitPhysicalProject(project, context);
            }
        }
        addRequestPropertyToChildren(PhysicalProperties.ANY);
        addRequestPropertyToChildren(new PhysicalProperties(
                hashDist.project(exprIdMap, ImmutableSet.of(), DistributionSpecAny.INSTANCE)));
        return null;
    }

    /**
     * Filter: a filter does not change slots, so a parent hash requirement can be
     * forwarded unchanged. Offers two alternatives: ANY, and the parent's spec.
     * Non-hash parent requirements use the default visit.
     */
    @Override
    public Void visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, PlanContext context) {
        DistributionSpec parentDist = requestPropertyFromParent.getDistributionSpec();
        if (!(parentDist instanceof DistributionSpecHash)) {
            return super.visitPhysicalFilter(filter, context);
        }
        addRequestPropertyToChildren(PhysicalProperties.ANY);
        addRequestPropertyToChildren(new PhysicalProperties(parentDist));
        return null;
    }

    /** File sink: the sink computes its own requested properties from the session. */
    @Override
    public Void visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink, PlanContext context) {
        addRequestPropertyToChildren(fileSink.requestProperties(connectContext));
        return null;
    }

    /**
     * Window: requests a distribution on the frame group's partition keys and a
     * sort order covering partition keys followed by order keys.
     * No partition keys and no order keys -> plain gather;
     * order keys only -> gather + sort;
     * partition keys present -> hash on partition keys + sort.
     */
    @Override
    public Void visitPhysicalWindow(PhysicalWindow<? extends Plan> window, PlanContext context) {
        // requiredProperties:
        // Distribution: partitionKeys
        // Order: requiredOrderKeys
        WindowFrameGroup windowFrameGroup = window.getWindowFrameGroup();
        // all keys that need to be sorted, which includes BOTH partitionKeys and orderKeys from this group
        List<OrderKey> keysNeedToBeSorted = Lists.newArrayList();
        if (!windowFrameGroup.getPartitionKeys().isEmpty()) {
            keysNeedToBeSorted.addAll(windowFrameGroup.getPartitionKeys().stream().map(partitionKey -> {
                // todo: haven't support isNullFirst, and its default value is false(see AnalyticPlanner,
                // but in LogicalPlanBuilder, its default value is true)
                return new OrderKey(partitionKey, true, false);
            }).collect(Collectors.toList()));
        }
        if (!windowFrameGroup.getOrderKeys().isEmpty()) {
            keysNeedToBeSorted.addAll(windowFrameGroup.getOrderKeys().stream()
                    .map(OrderExpression::getOrderKey)
                    .collect(Collectors.toList())
            );
        }
        if (windowFrameGroup.getPartitionKeys().isEmpty() && windowFrameGroup.getOrderKeys().isEmpty()) {
            addRequestPropertyToChildren(PhysicalProperties.GATHER);
        } else if (windowFrameGroup.getPartitionKeys().isEmpty() && !windowFrameGroup.getOrderKeys().isEmpty()) {
            addRequestPropertyToChildren(PhysicalProperties.GATHER.withOrderSpec(new OrderSpec(keysNeedToBeSorted)));
        } else if (!windowFrameGroup.getPartitionKeys().isEmpty()) {
            addRequestPropertyToChildren(PhysicalProperties.createHash(
                    windowFrameGroup.getPartitionKeys(), ShuffleType.REQUIRE)
                    .withOrderSpec(new OrderSpec(keysNeedToBeSorted)));
        }
        return null;
    }

    /**
     * Translates a parent hash requirement through a set operation: for each
     * bucketed column of the parent spec, finds its position among the set
     * operation's outputs, then requests a hash distribution on the corresponding
     * slot of every child, preserving the parent's shuffle type.
     *
     * <p>NOTE(review): if a parent shuffle column does not appear among the set
     * operation outputs, its entry in outputOffsets stays 0 (the array default)
     * and silently maps to output column 0 — presumably callers guarantee every
     * shuffle column comes from the set operation's output; verify.
     */
    private List<PhysicalProperties> createHashRequestAccordingToParent(
            SetOperation setOperation, DistributionSpecHash distributionRequestFromParent, PlanContext context) {
        List<PhysicalProperties> requiredPropertyList =
                Lists.newArrayListWithCapacity(context.arity());
        // outputOffsets[equivalenceSetIndex] = position in setOperation's output list
        int[] outputOffsets = new int[distributionRequestFromParent.getOrderedShuffledColumns().size()];
        List<NamedExpression> setOperationOutputs = setOperation.getOutputs();
        // get the offset of bucketed columns of set operation
        for (int i = 0; i < setOperationOutputs.size(); i++) {
            int offset = distributionRequestFromParent.getExprIdToEquivalenceSet()
                    .getOrDefault(setOperationOutputs.get(i).getExprId(), -1);
            if (offset >= 0) {
                outputOffsets[offset] = i;
            }
        }
        // use the offset to generate children's request
        for (int i = 0; i < context.arity(); i++) {
            List<SlotReference> childOutput = setOperation.getRegularChildOutput(i);
            ImmutableList.Builder<ExprId> childRequest = ImmutableList.builder();
            for (int offset : outputOffsets) {
                childRequest.add(childOutput.get(offset).getExprId());
            }
            requiredPropertyList.add(PhysicalProperties.createHash(
                    childRequest.build(), distributionRequestFromParent.getShuffleType()));
        }
        return requiredPropertyList;
    }

    /** Broadcast alternative: left child unconstrained, right child replicated. */
    private void addBroadcastJoinRequestProperty() {
        addRequestPropertyToChildren(PhysicalProperties.ANY, PhysicalProperties.REPLICATED);
    }

    /** Shuffle alternative: hash-distribute both sides on their equi-join key slots. */
    private void addShuffleJoinRequestProperty(PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin) {
        Pair<List<ExprId>, List<ExprId>> onClauseUsedSlots = hashJoin.getHashConjunctsExprIds();
        // shuffle join
        addRequestPropertyToChildren(
                PhysicalProperties.createHash(
                        new DistributionSpecHash(onClauseUsedSlots.first, ShuffleType.REQUIRE)),
                PhysicalProperties.createHash(
                        new DistributionSpecHash(onClauseUsedSlots.second, ShuffleType.REQUIRE)));
    }

    /**
     * helper function to assemble request children physical properties
     * @param physicalProperties one set request properties for children
     */
    private void addRequestPropertyToChildren(PhysicalProperties... physicalProperties) {
        requestPropertyToChildren.add(Lists.newArrayList(physicalProperties));
    }

    /** Same as the varargs overload, but adopts the given list directly. */
    private void addRequestPropertyToChildren(List<PhysicalProperties> physicalProperties) {
        requestPropertyToChildren.add(physicalProperties);
    }
}