EnforceMissingPropertiesHelper.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.properties;
import org.apache.doris.nereids.cost.Cost;
import org.apache.doris.nereids.cost.CostCalculator;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.metrics.EventChannel;
import org.apache.doris.nereids.metrics.EventProducer;
import org.apache.doris.nereids.metrics.consumer.LogConsumer;
import org.apache.doris.nereids.metrics.event.EnforcerEvent;
import org.apache.doris.nereids.minidump.NereidsTracer;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
/**
* When parent node request some properties but children don't have.
* Enforce add missing properties for child.
*/
public class EnforceMissingPropertiesHelper {
private static final EventProducer ENFORCER_TRACER = new EventProducer(EnforcerEvent.class,
EventChannel.getDefaultChannel().addConsumers(new LogConsumer(EnforcerEvent.class, EventChannel.LOG)));
private final ConnectContext connectContext;
private final GroupExpression groupExpression;
private Cost curTotalCost;
public EnforceMissingPropertiesHelper(ConnectContext connectContext, GroupExpression groupExpression,
Cost curTotalCost) {
this.connectContext = connectContext;
this.groupExpression = groupExpression;
this.curTotalCost = curTotalCost;
}
public Cost getCurTotalCost() {
return curTotalCost;
}
/**
* Enforce missing property.
*/
public PhysicalProperties enforceProperty(PhysicalProperties output, PhysicalProperties required) {
boolean isSatisfyOrder = output.getOrderSpec().satisfy(required.getOrderSpec());
boolean isSatisfyDistribution = output.getDistributionSpec().satisfy(required.getDistributionSpec());
if (isSatisfyDistribution && isSatisfyOrder) {
return output;
}
if (!isSatisfyDistribution && !isSatisfyOrder) {
return enforceSortAndDistribution(output, required);
}
if (!isSatisfyOrder) {
return enforceLocalSort(output, required);
}
if (!required.getOrderSpec().getOrderKeys().isEmpty()) {
// After redistribute data , original order required may be wrong.
return enforceDistributionButMeetSort(output, required);
}
return enforceDistribution(output, required);
}
/**
* When requestProperty include sort, enforce distribution may break the original sort.
* <p>
* But if we add enforce sort, it may cause infinite loop.
* <p>
* trick, use {[empty order], Any} to eliminate the original property.
*/
private PhysicalProperties enforceDistributionButMeetSort(PhysicalProperties output, PhysicalProperties request) {
groupExpression.getOwnerGroup()
.replaceBestPlanProperty(
output, PhysicalProperties.ANY, groupExpression.getCostValueByProperties(output));
return enforceSortAndDistribution(PhysicalProperties.ANY, request);
}
private PhysicalProperties enforceGlobalSort(PhysicalProperties oldOutputProperty, PhysicalProperties required) {
// keep consistent in DistributionSpec with the oldOutputProperty
PhysicalProperties newOutputProperty = new PhysicalProperties(
oldOutputProperty.getDistributionSpec(), required.getOrderSpec());
GroupExpression enforcer = required.getOrderSpec().addGlobalQuickSortEnforcer(groupExpression.getOwnerGroup());
addEnforcerUpdateCost(enforcer, oldOutputProperty, newOutputProperty);
return newOutputProperty;
}
private PhysicalProperties enforceLocalSort(PhysicalProperties oldOutputProperty, PhysicalProperties required) {
// keep consistent in DistributionSpec with the oldOutputProperty
PhysicalProperties newOutputProperty = new PhysicalProperties(
oldOutputProperty.getDistributionSpec(), required.getOrderSpec());
GroupExpression enforcer = required.getOrderSpec().addLocalQuickSortEnforcer(groupExpression.getOwnerGroup());
addEnforcerUpdateCost(enforcer, oldOutputProperty, newOutputProperty);
return newOutputProperty;
}
private PhysicalProperties enforceDistribution(PhysicalProperties oldOutputProperty, PhysicalProperties required) {
DistributionSpec outputDistributionSpec;
DistributionSpec requiredDistributionSpec = required.getDistributionSpec();
if (requiredDistributionSpec instanceof DistributionSpecHash) {
DistributionSpecHash requiredDistributionSpecHash = (DistributionSpecHash) requiredDistributionSpec;
outputDistributionSpec = requiredDistributionSpecHash.withShuffleType(ShuffleType.EXECUTION_BUCKETED);
} else {
outputDistributionSpec = requiredDistributionSpec;
}
PhysicalProperties newOutputProperty = new PhysicalProperties(outputDistributionSpec);
GroupExpression enforcer = outputDistributionSpec.addEnforcer(groupExpression.getOwnerGroup());
addEnforcerUpdateCost(enforcer, oldOutputProperty, newOutputProperty);
return newOutputProperty;
}
private PhysicalProperties enforceSortAndDistribution(PhysicalProperties outputProperty,
PhysicalProperties requiredProperty) {
PhysicalProperties enforcedProperty = outputProperty;
if (requiredProperty.getDistributionSpec().equals(new DistributionSpecGather())) {
// NOTICE: if output is must shuffle, we must do distribution first. so add a random shuffle here.
if (outputProperty.getDistributionSpec() instanceof DistributionSpecMustShuffle) {
enforcedProperty = enforceDistribution(enforcedProperty, PhysicalProperties.EXECUTION_ANY);
}
enforcedProperty = enforceLocalSort(enforcedProperty, requiredProperty);
enforcedProperty = enforceDistribution(enforcedProperty, requiredProperty);
enforcedProperty = enforceGlobalSort(enforcedProperty, requiredProperty);
} else {
enforcedProperty = enforceDistribution(outputProperty, requiredProperty);
enforcedProperty = enforceLocalSort(enforcedProperty, requiredProperty);
}
return enforcedProperty;
}
/**
* Add enforcer plan, update cost, update property of enforcer, and setBestPlan
*/
private void addEnforcerUpdateCost(GroupExpression enforcer,
PhysicalProperties oldOutputProperty,
PhysicalProperties newOutputProperty) {
groupExpression.getOwnerGroup().addEnforcer(enforcer);
NereidsTracer.logEnforcerEvent(enforcer.getOwnerGroup().getGroupId(), groupExpression.getPlan(),
oldOutputProperty, newOutputProperty);
ENFORCER_TRACER.log(EnforcerEvent.of(groupExpression, ((PhysicalPlan) enforcer.getPlan()),
oldOutputProperty, newOutputProperty));
enforcer.setEstOutputRowCount(enforcer.getOwnerGroup().getStatistics().getRowCount());
Cost enforcerCost = CostCalculator.calculateCost(connectContext, enforcer,
Lists.newArrayList(oldOutputProperty));
enforcer.setCost(enforcerCost);
curTotalCost = CostCalculator.addChildCost(
connectContext,
enforcer.getPlan(),
enforcerCost,
curTotalCost,
0);
if (enforcer.updateLowestCostTable(newOutputProperty,
Lists.newArrayList(oldOutputProperty), curTotalCost)) {
enforcer.putOutputPropertiesMap(newOutputProperty, newOutputProperty);
}
groupExpression.getOwnerGroup().setBestPlan(enforcer, curTotalCost, newOutputProperty);
}
}