ChildrenPropertiesRegulator.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.properties;

import org.apache.doris.common.Pair;
import org.apache.doris.nereids.cost.Cost;
import org.apache.doris.nereids.cost.CostCalculator;
import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.trees.expressions.AggregateExpression;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction;
import org.apache.doris.nereids.trees.plans.AggMode;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.JoinUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

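/**
 * Ensures that each child has enough distribution enforcement, and updates the children's properties
 * whenever such regulation is applied.
 * NOTICE: every visitor should call visit(plan, context) at the proper place to process must-shuffle
 * children, except for project and filter, which intentionally do not.
 */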
public class ChildrenPropertiesRegulator extends PlanVisitor<List<List<PhysicalProperties>>, Void> {

    // the group expression whose plan is visited by this regulator
    private final GroupExpression parent;
    // child group expressions of parent; index i lines up with originChildrenProperties / requiredProperties
    private final List<GroupExpression> children;
    // current output properties of each child; updated when regulation adds enforcement
    private final List<PhysicalProperties> originChildrenProperties;
    // properties that parent requires from each child
    private final List<PhysicalProperties> requiredProperties;
    // context of the optimization job that created this regulator
    private final JobContext jobContext;

    /**
     * Creates a regulator for a single parent group expression.
     *
     * @param parent the group expression whose plan is visited
     * @param children the child group expressions of {@code parent}
     * @param originChildrenProperties the output properties of each child
     * @param requiredProperties the properties {@code parent} requires from each child
     * @param jobContext the current optimization job context
     */
    public ChildrenPropertiesRegulator(GroupExpression parent, List<GroupExpression> children,
            List<PhysicalProperties> originChildrenProperties, List<PhysicalProperties> requiredProperties,
            JobContext jobContext) {
        this.jobContext = jobContext;
        this.requiredProperties = requiredProperties;
        this.originChildrenProperties = originChildrenProperties;
        this.children = children;
        this.parent = parent;
    }

    /**
     * Runs this visitor over the parent's plan and regulates the children properties.
     *
     * @return the accepted alternatives of children properties; each inner list holds one
     *         property per child. An empty list means no acceptable alternative exists.
     */
    public List<List<PhysicalProperties>> adjustChildrenProperties() {
        Plan parentPlan = parent.getPlan();
        return parentPlan.accept(this, null);
    }

    /**
     * Default handling: every child whose distribution is must-shuffle is enforced to EXECUTION_ANY.
     * The (possibly updated) children properties are returned as the single alternative.
     */
    @Override
    public List<List<PhysicalProperties>> visit(Plan plan, Void context) {
        for (int childIndex = 0; childIndex < children.size(); childIndex++) {
            boolean mustShuffle = originChildrenProperties.get(childIndex).getDistributionSpec()
                    instanceof DistributionSpecMustShuffle;
            if (!mustShuffle) {
                continue;
            }
            updateChildEnforceAndCost(childIndex, PhysicalProperties.EXECUTION_ANY);
        }
        return ImmutableList.of(originChildrenProperties);
    }

    /**
     * Prunes hash-aggregate candidates whose child distribution leads to a known-bad plan shape.
     *
     * <p>Returning an empty list discards the candidate. Otherwise must-shuffle children are processed
     * through {@code visit(agg, context)} and the (possibly updated) children properties are returned.
     */
    @Override
    public List<List<PhysicalProperties>> visitPhysicalHashAggregate(
            PhysicalHashAggregate<? extends Plan> agg, Void context) {
        // an aggregate with neither group-by keys nor output expressions is rejected
        if (agg.getGroupByExpressions().isEmpty() && agg.getOutputExpressions().isEmpty()) {
            return ImmutableList.of();
        }
        // aggregates that may not be banned are accepted as-is.
        // NOTE: this early return skips must-shuffle processing (visit is not called here)
        if (!agg.getAggregateParam().canBeBanned) {
            return ImmutableList.of(originChildrenProperties);
        }
        // forbid one phase agg on distribute
        if (agg.getAggMode() == AggMode.INPUT_TO_RESULT && children.get(0).getPlan() instanceof PhysicalDistribute) {
            // a one-stage aggregate directly on top of an enforced exchange is usually a bad pattern
            return ImmutableList.of();
        }

        // forbid TWO_PHASE_AGGREGATE_WITH_DISTINCT after shuffle:
        // an INPUT_TO_BUFFER phase that is required to be hash distributed and sits on an enforced exchange
        // TODO: this by mistake also forbids good plans after cte reuse
        if (agg.getAggMode() == AggMode.INPUT_TO_BUFFER
                && requiredProperties.get(0).getDistributionSpec() instanceof DistributionSpecHash
                && children.get(0).getPlan() instanceof PhysicalDistribute) {
            return ImmutableList.of();
        }

        // agg(group by x)-union all(A, B)
        // no matter x.ndv is high or not, it is not worthwhile to shuffle A and B by x
        // and hence we forbid one phase agg
        if (agg.getAggMode() == AggMode.INPUT_TO_RESULT
                && children.get(0).getPlan() instanceof PhysicalUnion
                && !((PhysicalUnion) children.get(0).getPlan()).isDistinct()) {
            return ImmutableList.of();
        }
        // forbid the multi-distinct plan when it is worse than the multi-stage version, i.e. when the
        // multi-stage version can be executed in one fragment
        if (agg.getAggMode() == AggMode.INPUT_TO_BUFFER || agg.getAggMode() == AggMode.INPUT_TO_RESULT) {
            // collect the MultiDistinction functions appearing as `alias(aggregate_expression(fn))` outputs
            List<MultiDistinction> multiDistinctions = agg.getOutputExpressions().stream()
                    .filter(Alias.class::isInstance)
                    .map(a -> ((Alias) a).child())
                    .filter(AggregateExpression.class::isInstance)
                    .map(a -> ((AggregateExpression) a).getFunction())
                    .filter(MultiDistinction.class::isInstance)
                    .map(MultiDistinction.class::cast)
                    .collect(Collectors.toList());
            if (multiDistinctions.size() == 1) {
                Expression distinctChild = multiDistinctions.get(0).child(0);
                DistributionSpec childDistribution = originChildrenProperties.get(0).getDistributionSpec();
                if (distinctChild instanceof SlotReference && childDistribution instanceof DistributionSpecHash) {
                    SlotReference slotReference = (SlotReference) distinctChild;
                    DistributionSpecHash distributionSpecHash = (DistributionSpecHash) childDistribution;
                    // NOTE(review): assumes every group-by expression is a SlotReference (cast would fail otherwise)
                    List<ExprId> groupByColumns = agg.getGroupByExpressions().stream()
                            .map(SlotReference.class::cast)
                            .map(SlotReference::getExprId)
                            .collect(Collectors.toList());
                    DistributionSpecHash groupByRequire = new DistributionSpecHash(
                            groupByColumns, ShuffleType.REQUIRE);
                    // NOTE(review): the same ExprId is added twice below; looks redundant — confirm intent
                    List<ExprId> distinctChildColumns = Lists.newArrayList(slotReference.getExprId());
                    distinctChildColumns.add(slotReference.getExprId());
                    DistributionSpecHash distinctChildRequire = new DistributionSpecHash(
                            distinctChildColumns, ShuffleType.REQUIRE);
                    // the child is already hashed on the group-by keys (or, without group by, on the distinct
                    // column): the multi-stage plan needs no extra shuffle, so drop the multi-distinct candidate
                    // unless it is mandatory
                    if ((!groupByColumns.isEmpty() && distributionSpecHash.satisfy(groupByRequire))
                            || (groupByColumns.isEmpty() && distributionSpecHash.satisfy(distinctChildRequire))) {
                        if (!agg.mustUseMultiDistinctAgg()) {
                            return ImmutableList.of();
                        }
                    }
                }
                // if distinct without group by key, we prefer three or four stage distinct agg
                // because the second phase of multi-distinct only have one instance, and it is slow generally.
                if (agg.getOutputExpressions().size() == 1 && agg.getGroupByExpressions().isEmpty()
                        && !agg.mustUseMultiDistinctAgg()) {
                    return ImmutableList.of();
                }
            }
        }
        // process must shuffle
        visit(agg, context);
        // process agg
        return ImmutableList.of(originChildrenProperties);
    }

    /**
     * Keeps only the sensible partition-topn candidates.
     *
     * <p>A one-phase global partition topn directly over an enforced distribution is discarded in favour
     * of the two-phase candidate. The global phase of a two-phase partition topn whose child is not an
     * enforced distribution is also discarded, because the local phase already meets the final
     * requirement. All other candidates get must-shuffle processing.
     */
    @Override
    public List<List<PhysicalProperties>> visitPhysicalPartitionTopN(
            PhysicalPartitionTopN<? extends Plan> partitionTopN, Void context) {
        if (partitionTopN.getPhase().isOnePhaseGlobal()
                && children.get(0).getPlan() instanceof PhysicalDistribute) {
            return ImmutableList.of();
        }
        if (partitionTopN.getPhase().isTwoPhaseGlobal()
                && !(children.get(0).getPlan() instanceof PhysicalDistribute)) {
            return ImmutableList.of();
        }
        visit(partitionTopN, context);
        return ImmutableList.of(originChildrenProperties);
    }

    /**
     * Filters do not process must-shuffle. A filter placed directly on an enforced distribute is
     * rejected; otherwise the children properties are accepted unchanged.
     */
    @Override
    public List<List<PhysicalProperties>> visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, Void context) {
        Plan onlyChild = children.get(0).getPlan();
        if (!(onlyChild instanceof PhysicalDistribute)) {
            return ImmutableList.of(originChildrenProperties);
        }
        return ImmutableList.of();
    }

    /**
     * Decides whether a bucket shuffle join should be downgraded to a shuffle join in which both sides
     * are shuffled with {@code ShuffleType.EXECUTION_BUCKETED}.
     *
     * <p>Bucket shuffle join is considered improper when both of these hold:
     * <ul>
     *   <li>oneSide is, possibly under a chain of projects/filters, an olap scan whose total tablet count
     *       (selected partitions * default bucket number) is smaller than the effective parallelism,
     *       i.e. {@code min(10, backendCount * parallelExecInstanceNum)};</li>
     *   <li>otherSide is already distributed with {@code ShuffleType.EXECUTION_BUCKETED}.</li>
     * </ul>
     * When bucket shuffle join is disabled in the session variables, this always returns true.
     *
     * @param oneSidePlan the join child that would be the bucket-shuffle target
     * @param otherSideSpec the hash distribution of the opposite join child
     * @return true if both sides should be execution-bucketed shuffled instead
     */
    private boolean isBucketShuffleDownGrade(Plan oneSidePlan, DistributionSpecHash otherSideSpec) {
        boolean isEnableBucketShuffleJoin = ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin();
        if (!isEnableBucketShuffleJoin) {
            return true;
        } else if (otherSideSpec.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
                || !(oneSidePlan instanceof GroupPlan)) {
            // the other-side condition was documented but previously never checked
            return false;
        } else {
            PhysicalOlapScan candidate = findDownGradeBucketShuffleCandidate((GroupPlan) oneSidePlan);
            if (candidate == null || candidate.getTable() == null
                    || candidate.getTable().getDefaultDistributionInfo() == null) {
                return false;
            } else {
                int prunedPartNum = candidate.getSelectedPartitionIds().size();
                int bucketNum = candidate.getTable().getDefaultDistributionInfo().getBucketNum();
                int totalBucketNum = prunedPartNum * bucketNum;
                int backEndNum = Math.max(1, ConnectContext.get().getEnv().getClusterInfo()
                        .getBackendsNumber(true));
                int paraNum = Math.max(1, ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
                // effective parallelism is capped at 10
                int totalParaNum = Math.min(10, backEndNum * paraNum);
                return totalBucketNum < totalParaNum;
            }
        }
    }

    /**
     * Follows the first physical expression of each group downward, passing through projects and
     * filters, and returns the olap scan reached at the bottom.
     *
     * @param groupPlan the group to start from; may be null
     * @return the olap scan below a (possibly empty) chain of projects/filters, or null if there is none
     */
    private PhysicalOlapScan findDownGradeBucketShuffleCandidate(GroupPlan groupPlan) {
        if (groupPlan == null || groupPlan.getGroup() == null) {
            return null;
        }
        if (groupPlan.getGroup().getPhysicalExpressions().isEmpty()) {
            return null;
        }
        Plan current = groupPlan.getGroup().getPhysicalExpressions().get(0).getPlan();
        while (current instanceof PhysicalProject || current instanceof PhysicalFilter) {
            GroupPlan childGroupPlan = (GroupPlan) current.child(0);
            if (childGroupPlan.getGroup().getPhysicalExpressions().isEmpty()) {
                // no physical expression to descend into; stop at the current project/filter
                break;
            }
            current = childGroupPlan.getGroup().getPhysicalExpressions().get(0).getPlan();
        }
        return current instanceof PhysicalOlapScan ? (PhysicalOlapScan) current : null;
    }

    /**
     * Tells whether a bucket shuffle join must be avoided for a right anti / right outer / full outer join.
     * This applies when at least one side is NATURAL distributed and the legacy coordinator is in use.
     */
    private boolean couldNotRightBucketShuffleJoin(JoinType joinType, DistributionSpecHash leftHashSpec,
            DistributionSpecHash rightHashSpec) {
        boolean rightOrFullJoin = joinType == JoinType.RIGHT_ANTI_JOIN
                || joinType == JoinType.RIGHT_OUTER_JOIN
                || joinType == JoinType.FULL_OUTER_JOIN;
        boolean anySideNatural = leftHashSpec.getShuffleType() == ShuffleType.NATURAL
                || rightHashSpec.getShuffleType() == ShuffleType.NATURAL;
        if (!rightOrFullJoin || !anySideNatural) {
            return false;
        }
        // only the legacy coordinator has this limitation
        return !SessionVariable.canUseNereidsDistributePlanner();
    }

    /**
     * Regulates the distributions of the two children of a hash join.
     *
     * <p>Must-shuffle children are processed first. A replicated (broadcast) right child and colocated
     * children are accepted as-is. Otherwise both children must be hash distributed, and one or both
     * sides get extra shuffle enforcement, chosen from the pair of shuffle types
     * (NATURAL / EXECUTION_BUCKETED / STORAGE_BUCKETED), the join type, and the bucket-shuffle downgrade
     * heuristic. In some cases, when the Nereids distribute planner is usable, two alternatives are
     * returned: shuffle right to left, and shuffle left to right.
     *
     * @throws RuntimeException if a non-broadcast join does not have hash distribution on both children
     */
    @Override
    public List<List<PhysicalProperties>> visitPhysicalHashJoin(
            PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, Void context) {
        Preconditions.checkArgument(children.size() == 2, "children.size() != 2");
        Preconditions.checkArgument(originChildrenProperties.size() == 2);
        Preconditions.checkArgument(requiredProperties.size() == 2);
        // process must shuffle
        visit(hashJoin, context);
        // process hash join
        DistributionSpec leftDistributionSpec = originChildrenProperties.get(0).getDistributionSpec();
        DistributionSpec rightDistributionSpec = originChildrenProperties.get(1).getDistributionSpec();

        // broadcast do not need regular
        if (rightDistributionSpec instanceof DistributionSpecReplicated) {
            return ImmutableList.of(originChildrenProperties);
        }

        // shuffle
        if (!(leftDistributionSpec instanceof DistributionSpecHash)
                || !(rightDistributionSpec instanceof DistributionSpecHash)) {
            throw new RuntimeException("should not come here, two children of shuffle join should all be shuffle");
        }

        Plan leftChild = hashJoin.child(0);
        Plan rightChild = hashJoin.child(1);

        DistributionSpecHash leftHashSpec = (DistributionSpecHash) leftDistributionSpec;
        DistributionSpecHash rightHashSpec = (DistributionSpecHash) rightDistributionSpec;

        // enforcement to apply to each side after the decision table below (empty = keep as-is)
        Optional<PhysicalProperties> updatedForLeft = Optional.empty();
        Optional<PhysicalProperties> updatedForRight = Optional.empty();

        if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts())) {
            // check colocate join with scan
            return ImmutableList.of(originChildrenProperties);
        } else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType(), leftHashSpec, rightHashSpec)) {
            // right anti, right outer, full outer join could not do bucket shuffle join
            // TODO remove this after we refactor coordinator
            // => shuffle both sides by EXECUTION_BUCKETED, aligned to the left side
            updatedForLeft = Optional.of(calAnotherSideRequired(
                    ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec,
                    (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
                    (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
            updatedForRight = Optional.of(calAnotherSideRequired(
                    ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
                    (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
                    (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
        } else if (isBucketShuffleDownGrade(leftChild, rightHashSpec)) {
            // bucket shuffle to the left side is improper (or disabled):
            // shuffle both sides by EXECUTION_BUCKETED, aligned to the left side
            updatedForLeft = Optional.of(calAnotherSideRequired(
                    ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec,
                    (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
                    (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
            updatedForRight = Optional.of(calAnotherSideRequired(
                    ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
                    (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
                    (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
        } else if (isBucketShuffleDownGrade(rightChild, leftHashSpec)) {
            // bucket shuffle to the right side is improper:
            // shuffle both sides by EXECUTION_BUCKETED, aligned to the right side
            updatedForLeft = Optional.of(calAnotherSideRequired(
                    ShuffleType.EXECUTION_BUCKETED, rightHashSpec, leftHashSpec,
                    (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
                    (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
            updatedForRight = Optional.of(calAnotherSideRequired(
                    ShuffleType.EXECUTION_BUCKETED, rightHashSpec, rightHashSpec,
                    (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
                    (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
        } else if ((leftHashSpec.getShuffleType() == ShuffleType.NATURAL
                && rightHashSpec.getShuffleType() == ShuffleType.NATURAL)) {
            // NATURAL / NATURAL: bucket shuffle the right side to the left side's storage buckets
            updatedForRight = Optional.of(calAnotherSideRequired(
                    ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
                    (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
                    (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
        } else if (leftHashSpec.getShuffleType() == ShuffleType.NATURAL
                && rightHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED) {
            if (SessionVariable.canUseNereidsDistributePlanner()) {
                // offer two alternatives:
                //  1) shuffle right to left (STORAGE_BUCKETED by the left side)
                //  2) shuffle left to right (EXECUTION_BUCKETED by the right side)
                List<PhysicalProperties> shuffleToLeft = Lists.newArrayList(originChildrenProperties);
                PhysicalProperties enforceShuffleRight = calAnotherSideRequired(
                        ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
                        (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec());
                updateChildEnforceAndCost(1, enforceShuffleRight, shuffleToLeft);

                List<PhysicalProperties> shuffleToRight = Lists.newArrayList(originChildrenProperties);
                PhysicalProperties enforceShuffleLeft = calAnotherSideRequired(
                        ShuffleType.EXECUTION_BUCKETED, rightHashSpec, leftHashSpec,
                        (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()
                );
                updateChildEnforceAndCost(0, enforceShuffleLeft, shuffleToRight);
                return ImmutableList.of(shuffleToLeft, shuffleToRight);
            }

            // must add enforce because shuffle algorithm is not same between NATURAL and BUCKETED
            updatedForRight = Optional.of(calAnotherSideRequired(
                    ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
                    (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
                    (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
        } else if (leftHashSpec.getShuffleType() == ShuffleType.NATURAL
                && rightHashSpec.getShuffleType() == ShuffleType.STORAGE_BUCKETED) {
            // NATURAL / STORAGE_BUCKETED: already aligned if both sides hash on keys in the same order
            if (bothSideShuffleKeysAreSameOrder(leftHashSpec, rightHashSpec,
                    (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
                    (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec())) {
                return ImmutableList.of(originChildrenProperties);
            }
            updatedForRight = Optional.of(calAnotherSideRequired(
                    ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
                    (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
                    (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
        } else if (leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED
                && rightHashSpec.getShuffleType() == ShuffleType.NATURAL) {
            if (SessionVariable.canUseNereidsDistributePlanner()) {
                // nereids coordinator can exchange left side to right side to do bucket shuffle join
                // TODO: maybe we should check if left child is PhysicalDistribute.
                //  If so, add storage bucketed shuffle on left side. Otherwise,
                //  add execution bucketed shuffle on right side.
                // updatedForLeft = Optional.of(calAnotherSideRequired(
                //         ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec,
                //         (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
                //         (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
                // for now, offer two alternatives:
                //  1) shuffle right to left (EXECUTION_BUCKETED by the left side)
                //  2) shuffle left to right (STORAGE_BUCKETED by the right side)
                List<PhysicalProperties> shuffleToLeft = Lists.newArrayList(originChildrenProperties);
                PhysicalProperties enforceShuffleRight = calAnotherSideRequired(
                        ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
                        (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec());
                updateChildEnforceAndCost(1, enforceShuffleRight, shuffleToLeft);

                List<PhysicalProperties> shuffleToRight = Lists.newArrayList(originChildrenProperties);
                PhysicalProperties enforceShuffleLeft = calAnotherSideRequired(
                        ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec,
                        (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()
                );
                updateChildEnforceAndCost(0, enforceShuffleLeft, shuffleToRight);
                return ImmutableList.of(shuffleToLeft, shuffleToRight);
            } else {
                // the legacy coordinator cannot select backends by the right side in this case,
                // since it always checks whether the left-most node is an olap scan node.
                // so we can only shuffle the right side to the left side to do a normal shuffle join
                updatedForRight = Optional.of(calAnotherSideRequired(
                        ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
                        (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
            }
        } else if (leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED
                && rightHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED) {
            // EXECUTION_BUCKETED / EXECUTION_BUCKETED: keep as-is if the key orders match,
            // otherwise reshuffle the right side to follow the left side
            if (bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
                    (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
                    (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec())) {
                return ImmutableList.of(originChildrenProperties);
            }
            updatedForRight = Optional.of(calAnotherSideRequired(
                    ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
                    (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
                    (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
        } else if ((leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED
                && rightHashSpec.getShuffleType() == ShuffleType.STORAGE_BUCKETED)) {
            // if the left side is itself an enforced exchange, re-target it to the right side's storage
            // buckets; otherwise shuffle the right side to follow the left side
            if (children.get(0).getPlan() instanceof PhysicalDistribute) {
                updatedForLeft = Optional.of(calAnotherSideRequired(
                        ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec,
                        (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
            } else {
                updatedForRight = Optional.of(calAnotherSideRequired(
                        ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
                        (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
            }
        } else if ((leftHashSpec.getShuffleType() == ShuffleType.STORAGE_BUCKETED
                && rightHashSpec.getShuffleType() == ShuffleType.NATURAL)) {
            // TODO: we must shuffle the right side because the coordinator cannot select backends by the
            //  right side in this case, since it always checks whether the left-most node is an olap scan node.
            //  after we fix the coordinator problem, we could do right-to-left bucket shuffle
            updatedForRight = Optional.of(calAnotherSideRequired(
                    ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
                    (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
                    (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
        } else if ((leftHashSpec.getShuffleType() == ShuffleType.STORAGE_BUCKETED
                && rightHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED)) {
            // if the left side is itself an enforced exchange, re-target it to the right side's execution
            // buckets; otherwise bucket shuffle the right side to the left side's storage buckets
            if (children.get(0).getPlan() instanceof PhysicalDistribute) {
                updatedForLeft = Optional.of(calAnotherSideRequired(
                        ShuffleType.EXECUTION_BUCKETED, rightHashSpec, leftHashSpec,
                        (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
            } else {
                updatedForRight = Optional.of(calAnotherSideRequired(
                        ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
                        (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
            }

        } else if ((leftHashSpec.getShuffleType() == ShuffleType.STORAGE_BUCKETED
                && rightHashSpec.getShuffleType() == ShuffleType.STORAGE_BUCKETED)) {
            // STORAGE_BUCKETED / STORAGE_BUCKETED: keep as-is if the key orders match
            if (bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
                    (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
                    (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec())) {
                return ImmutableList.of(originChildrenProperties);
            }
            if (children.get(0).getPlan() instanceof PhysicalDistribute) {
                updatedForLeft = Optional.of(calAnotherSideRequired(
                        ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec,
                        (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
            } else {
                updatedForRight = Optional.of(calAnotherSideRequired(
                        ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
                        (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
            }
        }

        // apply the chosen enforcement(s) to originChildrenProperties
        updatedForLeft.ifPresent(physicalProperties -> updateChildEnforceAndCost(0, physicalProperties));
        updatedForRight.ifPresent(physicalProperties -> updateChildEnforceAndCost(1, physicalProperties));

        return ImmutableList.of(originChildrenProperties);
    }

    /**
     * Regulates the children of a nested loop join.
     *
     * <p>Must-shuffle children are processed first. Then, if the right child is distributed as storage
     * gather, it is enforced to {@code PhysicalProperties.GATHER}.
     */
    @Override
    public List<List<PhysicalProperties>> visitPhysicalNestedLoopJoin(
            PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> nestedLoopJoin, Void context) {
        // Use Guava's lazy message template instead of String.format, so that no message string is
        // built on the success path. The failure message is unchanged.
        Preconditions.checkArgument(children.size() == 2, "children.size() is %s", children.size());
        Preconditions.checkArgument(originChildrenProperties.size() == 2);
        Preconditions.checkArgument(requiredProperties.size() == 2);
        // process must shuffle
        visit(nestedLoopJoin, context);
        // a storage-gather right child is re-enforced as GATHER
        DistributionSpec rightDistributionSpec = originChildrenProperties.get(1).getDistributionSpec();
        if (rightDistributionSpec instanceof DistributionSpecStorageGather) {
            updateChildEnforceAndCost(1, PhysicalProperties.GATHER);
        }
        return ImmutableList.of(originChildrenProperties);
    }

    /**
     * Projects do not process must-shuffle. A project placed directly on an enforced distribute is
     * rejected; otherwise the children properties are accepted unchanged.
     */
    @Override
    public List<List<PhysicalProperties>> visitPhysicalProject(PhysicalProject<? extends Plan> project, Void context) {
        Plan onlyChild = children.get(0).getPlan();
        if (!(onlyChild instanceof PhysicalDistribute)) {
            return ImmutableList.of(originChildrenProperties);
        }
        return ImmutableList.of();
    }

    /**
     * Regulates the children of a set operation according to the distribution required of its first child.
     *
     * <ul>
     *   <li>GATHER required: storage-gather children are enforced to GATHER.</li>
     *   <li>ANY required: storage-any, storage-gather, gather and NATURAL-hash children are enforced to
     *       EXECUTION_ANY.</li>
     *   <li>HASH required: every child that is not EXECUTION_BUCKETED with keys in the same order as the
     *       required spec is reshuffled by EXECUTION_BUCKETED.</li>
     * </ul>
     */
    @Override
    public List<List<PhysicalProperties>> visitPhysicalSetOperation(PhysicalSetOperation setOperation, Void context) {
        // enforce must-shuffle children first
        visit(setOperation, context);
        if (children.isEmpty()) {
            // a union made only of constant expression lists has nothing to regulate
            return ImmutableList.of(originChildrenProperties);
        }
        DistributionSpec required = requiredProperties.get(0).getDistributionSpec();
        if (required instanceof DistributionSpecGather) {
            for (int idx = 0; idx < originChildrenProperties.size(); idx++) {
                DistributionSpec childSpec = originChildrenProperties.get(idx).getDistributionSpec();
                if (childSpec instanceof DistributionSpecStorageGather) {
                    updateChildEnforceAndCost(idx, PhysicalProperties.GATHER);
                }
            }
        } else if (required instanceof DistributionSpecAny) {
            for (int idx = 0; idx < originChildrenProperties.size(); idx++) {
                DistributionSpec childSpec = originChildrenProperties.get(idx).getDistributionSpec();
                boolean naturalHash = childSpec instanceof DistributionSpecHash
                        && ((DistributionSpecHash) childSpec).getShuffleType() == ShuffleType.NATURAL;
                boolean needsExecutionAny = childSpec instanceof DistributionSpecStorageAny
                        || childSpec instanceof DistributionSpecStorageGather
                        || childSpec instanceof DistributionSpecGather
                        || naturalHash;
                if (needsExecutionAny) {
                    updateChildEnforceAndCost(idx, PhysicalProperties.EXECUTION_ANY);
                }
            }
        } else if (required instanceof DistributionSpecHash) {
            // TODO: should use the most common hash spec as basic
            DistributionSpecHash basic = (DistributionSpecHash) required;
            for (int idx = 0; idx < originChildrenProperties.size(); idx++) {
                DistributionSpecHash current
                        = (DistributionSpecHash) originChildrenProperties.get(idx).getDistributionSpec();
                boolean executionBucketed = current.getShuffleType() == ShuffleType.EXECUTION_BUCKETED;
                DistributionSpecHash basicRequired
                        = (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec();
                DistributionSpecHash currentRequired
                        = (DistributionSpecHash) requiredProperties.get(idx).getDistributionSpec();
                if (executionBucketed
                        && bothSideShuffleKeysAreSameOrder(basic, current, basicRequired, currentRequired)) {
                    // already shuffled compatibly with the basic spec
                    continue;
                }
                PhysicalProperties target = calAnotherSideRequired(
                        ShuffleType.EXECUTION_BUCKETED, basic, current, basicRequired, currentRequired);
                updateChildEnforceAndCost(idx, target);
            }
        }
        return ImmutableList.of(originChildrenProperties);
    }

    /**
     * Regulate the children of a sort. After the generic must-shuffle handling, a one-phase
     * (GATHER_SORT) sort whose child group expression is an explicit PhysicalDistribute is
     * rejected, so that gathering is left to the two-phase sort alternative instead.
     */
    @Override
    public List<List<PhysicalProperties>> visitAbstractPhysicalSort(
            AbstractPhysicalSort<? extends Plan> sort, Void context) {
        // process must shuffle
        visit(sort, context);
        // The visited plan lives in the memo, so its own children are group placeholders rather than
        // concrete operators; sort.child() can therefore never be a PhysicalDistribute. Inspect the
        // child GroupExpression instead, exactly as visitPhysicalProject and visitPhysicalTopN do.
        if (sort.getSortPhase() == SortPhase.GATHER_SORT
                && children.get(0).getPlan() instanceof PhysicalDistribute) {
            // forbid gather sort need explicit shuffle
            return ImmutableList.of();
        }
        return ImmutableList.of(originChildrenProperties);
    }

    /**
     * Regulate the children of a TopN. When the session pins the number of sort phases
     * ({@code sortPhaseNum} is 1 or 2) nothing is forbidden. Otherwise two shapes are rejected:
     * a LOCAL_SORT TopN whose child is already gathered (a two-phase TopN is pointless there),
     * and a GATHER_SORT TopN whose child group expression is an explicit PhysicalDistribute.
     */
    @Override
    public List<List<PhysicalProperties>> visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, Void context) {
        // process must shuffle
        visit(topN, context);

        int sortPhaseNum = jobContext.getCascadesContext().getConnectContext().getSessionVariable().sortPhaseNum;
        if (sortPhaseNum != 1 && sortPhaseNum != 2) {
            SortPhase phase = topN.getSortPhase();
            boolean localTopNOverGather = phase == SortPhase.LOCAL_SORT
                    && originChildrenProperties.get(0).getDistributionSpec().equals(DistributionSpecGather.INSTANCE);
            boolean gatherTopNOverDistribute = phase == SortPhase.GATHER_SORT
                    && children.get(0).getPlan() instanceof PhysicalDistribute;
            if (localTopNOverGather || gatherTopNOverDistribute) {
                return ImmutableList.of();
            }
        }
        return ImmutableList.of(originChildrenProperties);
    }

    /**
     * Check whether both sides' real output hash keys are in the same order.
     * The not-shuffle side output is first translated into the shuffle side's key space via
     * {@code calAnotherSideRequiredShuffleIds}. The result must then match the shuffle side output
     * position by position. A position matches when the ids are equal, or when the translated id
     * is one of the shuffle side's equivalent ids for that position.
     *
     * @param notShuffleSideOutput not shuffle side real output used hash spec
     * @param shuffleSideOutput shuffle side real output used hash spec
     * @param notShuffleSideRequired not shuffle side required used hash spec
     * @param shuffleSideRequired shuffle side required hash spec
     * @return true if same
     */
    private boolean bothSideShuffleKeysAreSameOrder(
            DistributionSpecHash notShuffleSideOutput, DistributionSpecHash shuffleSideOutput,
            DistributionSpecHash notShuffleSideRequired, DistributionSpecHash shuffleSideRequired) {
        List<ExprId> actualOrder = shuffleSideOutput.getOrderedShuffledColumns();
        List<ExprId> expectedOrder = calAnotherSideRequiredShuffleIds(notShuffleSideOutput,
                notShuffleSideRequired, shuffleSideRequired);
        if (actualOrder.size() != expectedOrder.size()) {
            return false;
        }
        if (actualOrder.equals(expectedOrder)) {
            return true;
        }
        for (int pos = 0; pos < actualOrder.size(); pos++) {
            ExprId actualId = actualOrder.get(pos);
            ExprId expectedId = expectedOrder.get(pos);
            boolean matches = actualId.equals(expectedId)
                    || shuffleSideOutput.getEquivalenceExprIdsOf(actualId).contains(expectedId);
            if (!matches) {
                return false;
            }
        }
        return true;
    }

    /**
     * Calculate the order in which the shuffle side should output its hash keys.
     * Each key of the not-shuffle side's real output is located in the not-shuffle side's
     * requirement (directly, or through one of its equivalent ids). The key at the same position
     * in the shuffle side's requirement is then emitted.
     * For example,
     * if not shuffle side real hash key is 1 2 3.
     * the requirement of hash key of not shuffle side is 3 2 1.
     * the requirement of hash key of shuffle side is 6 5 4.
     * then we should let the shuffle side real output hash key order as 4 5 6
     *
     * @param notShuffleSideOutput not shuffle side real output used hash spec
     * @param notShuffleSideRequired not shuffle side required used hash spec
     * @param shuffleSideRequired shuffle side required hash spec
     * @return shuffle side real output used hash key order
     */
    private List<ExprId> calAnotherSideRequiredShuffleIds(DistributionSpecHash notShuffleSideOutput,
            DistributionSpecHash notShuffleSideRequired, DistributionSpecHash shuffleSideRequired) {
        List<ExprId> notShuffleRequiredIds = notShuffleSideRequired.getOrderedShuffledColumns();
        List<ExprId> shuffleRequiredIds = shuffleSideRequired.getOrderedShuffledColumns();
        ImmutableList.Builder<ExprId> orderedIds = ImmutableList.builder();
        for (ExprId outputId : notShuffleSideOutput.getOrderedShuffledColumns()) {
            int position = notShuffleRequiredIds.indexOf(outputId);
            if (position == -1) {
                // the output id itself is not in the requirement, so try the ids equivalent to it
                Set<ExprId> equivalents = notShuffleSideOutput.getEquivalenceExprIdsOf(outputId);
                for (ExprId candidate : equivalents) {
                    position = notShuffleRequiredIds.indexOf(candidate);
                    if (position != -1) {
                        break;
                    }
                }
            }
            Preconditions.checkState(position != -1, "index could not be -1");
            orderedIds.add(shuffleRequiredIds.get(position));
        }
        return orderedIds.build();
    }

    /**
     * Build the PhysicalProperties the shuffle side must output. The hash key order comes from
     * {@code calAnotherSideRequiredShuffleIds}. Table id, selected index id and partition ids are
     * copied from the shuffle side's current output spec.
     *
     * @param shuffleType real output shuffle type
     * @param notNeedShuffleSideOutput not shuffle side real output used hash spec
     * @param needShuffleSideOutput shuffle side real output used hash spec
     * @param notNeedShuffleSideRequired not shuffle side required used hash spec
     * @param needShuffleSideRequired shuffle side required hash spec
     * @return shuffle side new required hash spec
     */
    private PhysicalProperties calAnotherSideRequired(ShuffleType shuffleType,
            DistributionSpecHash notNeedShuffleSideOutput, DistributionSpecHash needShuffleSideOutput,
            DistributionSpecHash notNeedShuffleSideRequired, DistributionSpecHash needShuffleSideRequired) {
        List<ExprId> orderedIds = calAnotherSideRequiredShuffleIds(
                notNeedShuffleSideOutput, notNeedShuffleSideRequired, needShuffleSideRequired);
        DistributionSpecHash hashSpec = new DistributionSpecHash(orderedIds, shuffleType,
                needShuffleSideOutput.getTableId(),
                needShuffleSideOutput.getSelectedIndexId(),
                needShuffleSideOutput.getPartitionIds());
        return new PhysicalProperties(hashSpec);
    }

    /**
     * Enforce {@code targetProperties} on the child at {@code index}. The regulated entry is
     * written back into {@code originChildrenProperties}.
     */
    private void updateChildEnforceAndCost(int index, PhysicalProperties targetProperties) {
        List<PhysicalProperties> regulated = originChildrenProperties;
        updateChildEnforceAndCost(index, targetProperties, regulated);
    }

    /**
     * Add an enforcer above the child at {@code index} so that it provides {@code targetProperties}.
     * The child's lowest cost and its output properties are both looked up under its current entry
     * in {@code childrenProperties}. That entry is then replaced with {@code targetProperties}.
     */
    private void updateChildEnforceAndCost(
            int index, PhysicalProperties targetProperties, List<PhysicalProperties> childrenProperties) {
        GroupExpression childExpr = children.get(index);
        PhysicalProperties currentProperties = childrenProperties.get(index);
        Pair<Cost, List<PhysicalProperties>> lowestEntry = childExpr.getLowestCostTable().get(currentProperties);
        PhysicalProperties childOutput = childExpr.getOutputProperties(currentProperties);
        DistributionSpec wanted = targetProperties.getDistributionSpec();
        updateChildEnforceAndCost(childExpr, childOutput, wanted, lowestEntry.first);
        childrenProperties.set(index, targetProperties);
    }

    // TODO: why add enforcer according to target and target is from requiredProperties not regular
    /**
     * Put an enforcer for {@code target} on top of the child's group and make it that group's best
     * plan for the enforced properties.
     * <p>
     * If {@code child} is itself a PhysicalDistribute, the enforcer is instead built on the
     * lowest-cost plan for that distribute's input properties. This avoids stacking two distribute
     * operators.
     *
     * @param child group expression the enforcer is placed on
     * @param childOutput properties currently output by {@code child}
     * @param target distribution to enforce
     * @param currentCost cost of {@code child} producing {@code childOutput}
     */
    private void updateChildEnforceAndCost(GroupExpression child, PhysicalProperties childOutput,
            DistributionSpec target, Cost currentCost) {
        GroupExpression base = child;
        PhysicalProperties baseOutput = childOutput;
        Cost baseCost = currentCost;
        if (base.getPlan() instanceof PhysicalDistribute) {
            // To avoid continuous distribute operator, we just enforce the child's child
            baseOutput = base.getInputPropertiesList(baseOutput).get(0);
            Pair<Cost, GroupExpression> lowestBelow = base.getOwnerGroup().getLowestCostPlan(baseOutput).get();
            base = lowestBelow.second;
            baseCost = lowestBelow.first;
        }

        PhysicalProperties enforcedProperties = new PhysicalProperties(target);
        GroupExpression enforcer = target.addEnforcer(base.getOwnerGroup());
        base.getOwnerGroup().addEnforcer(enforcer);
        ConnectContext ctx = jobContext.getCascadesContext().getConnectContext();
        Cost enforcerCost = CostCalculator.calculateCost(ctx, enforcer, Lists.newArrayList(baseOutput));
        enforcer.setCost(enforcerCost);
        Cost accumulatedCost = CostCalculator.addChildCost(ctx, enforcer.getPlan(), enforcerCost, baseCost, 0);

        boolean improved = enforcer.updateLowestCostTable(
                enforcedProperties, Lists.newArrayList(baseOutput), accumulatedCost);
        if (improved) {
            enforcer.putOutputPropertiesMap(enforcedProperties, enforcedProperties);
        }
        base.getOwnerGroup().setBestPlan(enforcer, accumulatedCost, enforcedProperties);
    }
}