ChildOutputPropertyDeriver.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.properties;

import org.apache.doris.nereids.PlanContext;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.util.JoinUtils;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Derives the output physical properties of a plan node from the output properties of its children.
 */
public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties, PlanContext> {
    /*
     *   parentPlanNode
     *         ▲
     *         │
     * childOutputProperty
     */
    private final List<PhysicalProperties> childrenOutputProperties;

    /**
     * Creates a deriver working on the given children output properties.
     *
     * @param childrenOutputProperties output properties of the children, accessed by child index
     *                                 (index 0 is the first/left child); must not be null
     * @throws NullPointerException if {@code childrenOutputProperties} is null
     */
    public ChildOutputPropertyDeriver(List<PhysicalProperties> childrenOutputProperties) {
        Objects.requireNonNull(childrenOutputProperties);
        this.childrenOutputProperties = childrenOutputProperties;
    }

    /**
     * Derives the output properties of the plan held by the given group expression.
     *
     * @param connectContext connect context handed to the newly created {@link PlanContext}
     * @param groupExpression group expression whose plan is visited
     * @return the properties produced by the visit method matching the plan's type
     */
    public PhysicalProperties getOutputProperties(ConnectContext connectContext, GroupExpression groupExpression) {
        Plan plan = groupExpression.getPlan();
        PlanContext planContext = new PlanContext(connectContext, groupExpression);
        return plan.accept(this, planContext);
    }

    /**
     * Generic derivation; also invoked explicitly by {@code visitPhysicalUnion} for unions with constant
     * expressions. Presumably it is the fallback for plan types without a dedicated visit method as well.
     *
     * <p>If there is at least one child and the first child is hash distributed, the result is
     * built by {@link PhysicalProperties#createAnyFromHash}; in every other case it is
     * {@link PhysicalProperties#ANY}.
     *
     * @param plan the visited plan (not inspected here)
     * @param context plan context (unused)
     * @return the derived output properties
     */
    @Override
    public PhysicalProperties visit(Plan plan, PlanContext context) {
        if (!childrenOutputProperties.isEmpty()) {
            DistributionSpec firstChildSpec = childrenOutputProperties.get(0).getDistributionSpec();
            if (firstChildSpec instanceof DistributionSpecHash) {
                return PhysicalProperties.createAnyFromHash((DistributionSpecHash) firstChildSpec);
            }
        }
        return PhysicalProperties.ANY;
    }

    /* ********************************************************************************************
     * sink Node, in lexicographical order
     * ******************************************************************************************** */

    /**
     * Derives the output properties of a sink node.
     *
     * @param physicalSink the visited sink (not inspected here)
     * @param context plan context (unused)
     * @return always {@link PhysicalProperties#GATHER}, regardless of the children's properties
     */
    @Override
    public PhysicalProperties visitPhysicalSink(PhysicalSink<? extends Plan> physicalSink, PlanContext context) {
        return PhysicalProperties.GATHER;
    }

    /* ********************************************************************************************
     * Leaf Plan Node, in lexicographical order
     * ******************************************************************************************** */

    /**
     * Derives the output properties of a CTE consumer, which must be a leaf node.
     *
     * @param cteConsumer the visited CTE consumer (not inspected here)
     * @param context plan context (unused)
     * @return always {@link PhysicalProperties#MUST_SHUFFLE}
     * @throws IllegalStateException if any child output property was supplied to this deriver
     */
    @Override
    public PhysicalProperties visitPhysicalCTEConsumer(
            PhysicalCTEConsumer cteConsumer, PlanContext context) {
        Preconditions.checkState(childrenOutputProperties.isEmpty(), "cte consumer should be leaf node");
        return PhysicalProperties.MUST_SHUFFLE;
    }

    /**
     * Derives the output properties of an empty relation.
     *
     * @param emptyRelation the visited relation (not inspected here)
     * @param context plan context (unused)
     * @return always {@link PhysicalProperties#GATHER}
     */
    @Override
    public PhysicalProperties visitPhysicalEmptyRelation(PhysicalEmptyRelation emptyRelation, PlanContext context) {
        return PhysicalProperties.GATHER;
    }

    /**
     * Derives the output properties of an ES scan.
     *
     * @param esScan the visited scan (not inspected here)
     * @param context plan context (unused)
     * @return always {@link PhysicalProperties#STORAGE_ANY}
     */
    @Override
    public PhysicalProperties visitPhysicalEsScan(PhysicalEsScan esScan, PlanContext context) {
        return PhysicalProperties.STORAGE_ANY;
    }

    /**
     * Derives the output properties of a file scan.
     *
     * @param fileScan the visited scan (not inspected here)
     * @param context plan context (unused)
     * @return always {@link PhysicalProperties#STORAGE_ANY}
     */
    @Override
    public PhysicalProperties visitPhysicalFileScan(PhysicalFileScan fileScan, PlanContext context) {
        return PhysicalProperties.STORAGE_ANY;
    }

    /**
     * Derives the output properties of a JDBC scan.
     *
     * <p>TODO: return ANY once the coordinator has been refactored.
     *
     * <p>STORAGE_ANY is returned instead of ANY so that a distribute node is generated on top of
     * the jdbc scan. Consider:
     * <pre>
     * select * from (select * from external.T) as A union all (select * from external.T)
     * </pre>
     * If this method returned ANY, the plan would be
     * <pre>
     * union
     *  |--- JDBCSCAN
     *  +--- JDBCSCAN
     * </pre>
     * which breaks the coordinator's assumption that one fragment contains at most one scan.
     *
     * @param jdbcScan the visited scan (not inspected here)
     * @param context plan context (unused)
     * @return always {@link PhysicalProperties#STORAGE_ANY}
     */
    @Override
    public PhysicalProperties visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, PlanContext context) {
        return PhysicalProperties.STORAGE_ANY;
    }

    /**
     * Derives the output properties of an ODBC scan.
     *
     * @param odbcScan the visited scan (not inspected here)
     * @param context plan context (unused)
     * @return always {@link PhysicalProperties#STORAGE_ANY}
     */
    @Override
    public PhysicalProperties visitPhysicalOdbcScan(PhysicalOdbcScan odbcScan, PlanContext context) {
        return PhysicalProperties.STORAGE_ANY;
    }

    /**
     * Derives the output properties of an OLAP scan from the distribution spec carried by the scan itself.
     *
     * @param olapScan the visited scan, source of the distribution spec
     * @param context plan context (unused)
     * @return new properties wrapping {@code olapScan.getDistributionSpec()}
     */
    @Override
    public PhysicalProperties visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanContext context) {
        return new PhysicalProperties(olapScan.getDistributionSpec());
    }

    /**
     * Derives the output properties of a defer-materialize OLAP scan by delegating to
     * {@link #visitPhysicalOlapScan} with the OLAP scan it wraps.
     *
     * @param deferMaterializeOlapScan the visited scan
     * @param context plan context, forwarded unchanged
     * @return the properties derived for the wrapped OLAP scan
     */
    @Override
    public PhysicalProperties visitPhysicalDeferMaterializeOlapScan(
            PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, PlanContext context) {
        PhysicalOlapScan wrappedScan = deferMaterializeOlapScan.getPhysicalOlapScan();
        return visitPhysicalOlapScan(wrappedScan, context);
    }

    /**
     * Derives the output properties of a one-row relation.
     *
     * @param oneRowRelation the visited relation (not inspected here)
     * @param context plan context (unused)
     * @return always {@link PhysicalProperties#GATHER}
     */
    @Override
    public PhysicalProperties visitPhysicalOneRowRelation(PhysicalOneRowRelation oneRowRelation, PlanContext context) {
        return PhysicalProperties.GATHER;
    }

    /**
     * Derives the output properties of a table-valued-function relation by asking the function itself.
     *
     * @param tvfRelation the visited relation
     * @param context plan context (unused)
     * @return the physical properties reported by the relation's function
     */
    @Override
    public PhysicalProperties visitPhysicalTVFRelation(PhysicalTVFRelation tvfRelation, PlanContext context) {
        return tvfRelation.getFunction().getPhysicalProperties();
    }

    /* ********************************************************************************************
     * Other Node, in lexicographical order
     * ******************************************************************************************** */

    /**
     * Derives the output properties of a hash aggregate: for every supported aggregate phase only the
     * child's distribution spec is carried over into a new {@link PhysicalProperties}.
     *
     * @param agg the visited aggregate
     * @param context plan context (unused)
     * @return new properties wrapping the child's distribution spec
     * @throws IllegalStateException if the aggregate does not have exactly one child output property
     * @throws RuntimeException if the aggregate phase is not one of the supported phases
     */
    @Override
    public PhysicalProperties visitPhysicalHashAggregate(
            PhysicalHashAggregate<? extends Plan> agg, PlanContext context) {
        Preconditions.checkState(childrenOutputProperties.size() == 1);
        PhysicalProperties childOutputProperty = childrenOutputProperties.get(0);
        // the switch only validates the phase; all accepted phases share the same result
        switch (agg.getAggPhase()) {
            case LOCAL:
            case GLOBAL:
            case DISTINCT_LOCAL:
            case DISTINCT_GLOBAL:
                break;
            default:
                throw new RuntimeException("Could not derive output properties for agg phase: " + agg.getAggPhase());
        }
        return new PhysicalProperties(childOutputProperty.getDistributionSpec());
    }

    /**
     * Derives the output properties of an assert-num-rows node: only the child's distribution spec
     * is carried over into a new {@link PhysicalProperties}.
     *
     * @param assertNumRows the visited node (not inspected here)
     * @param context plan context (unused)
     * @return new properties wrapping the child's distribution spec
     * @throws IllegalStateException if there is not exactly one child output property
     */
    @Override
    public PhysicalProperties visitPhysicalAssertNumRows(PhysicalAssertNumRows<? extends Plan> assertNumRows,
            PlanContext context) {
        Preconditions.checkState(childrenOutputProperties.size() == 1);
        return new PhysicalProperties(childrenOutputProperties.get(0).getDistributionSpec());
    }

    /**
     * Derives the output properties of a CTE anchor, which has two children.
     *
     * @param cteAnchor the visited anchor (not inspected here)
     * @param context plan context (unused)
     * @return the output properties of the second child (index 1), unchanged
     * @throws IllegalStateException if there are not exactly two child output properties
     */
    @Override
    public PhysicalProperties visitPhysicalCTEAnchor(
            PhysicalCTEAnchor<? extends Plan, ? extends Plan> cteAnchor, PlanContext context) {
        Preconditions.checkState(childrenOutputProperties.size() == 2);
        // return properties inherited from consumer side which may further be used at upper layer
        return childrenOutputProperties.get(1);
    }

    /**
     * Derives the output properties of a CTE producer by passing its single child's properties through.
     *
     * @param cteProducer the visited producer (not inspected here)
     * @param context plan context (unused)
     * @return the child's output properties, unchanged
     * @throws IllegalStateException if there is not exactly one child output property
     */
    @Override
    public PhysicalProperties visitPhysicalCTEProducer(
            PhysicalCTEProducer<? extends Plan> cteProducer, PlanContext context) {
        Preconditions.checkState(childrenOutputProperties.size() == 1);
        return childrenOutputProperties.get(0);
    }

    /**
     * Derives the output properties of a distribute node from the node itself; the children's
     * output properties are not consulted.
     *
     * @param distribute the visited distribute node
     * @param context plan context (unused)
     * @return {@code distribute.getPhysicalProperties()}
     */
    @Override
    public PhysicalProperties visitPhysicalDistribute(
            PhysicalDistribute<? extends Plan> distribute, PlanContext context) {
        return distribute.getPhysicalProperties();
    }

    /**
     * Derives the output properties of a filter by passing its single child's properties through.
     *
     * @param filter the visited filter (not inspected here)
     * @param context plan context (unused)
     * @return the child's output properties, unchanged
     * @throws IllegalStateException if there is not exactly one child output property
     */
    @Override
    public PhysicalProperties visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, PlanContext context) {
        Preconditions.checkState(childrenOutputProperties.size() == 1);
        return childrenOutputProperties.get(0);
    }

    /**
     * Derives the output properties of a generate node by passing its single child's properties through.
     *
     * @param generate the visited generate node (not inspected here)
     * @param context plan context (unused)
     * @return the child's output properties, unchanged
     * @throws IllegalStateException if there is not exactly one child output property
     */
    @Override
    public PhysicalProperties visitPhysicalGenerate(PhysicalGenerate<? extends Plan> generate, PlanContext context) {
        Preconditions.checkState(childrenOutputProperties.size() == 1);
        return childrenOutputProperties.get(0);
    }

    /**
     * Derives the output properties of a hash join from the distribution specs of its two children.
     *
     * <ul>
     *   <li>Right child replicated (broadcast): if the left child is hash distributed and its keys
     *       satisfy the join keys, a right-side hash spec is mocked from the join conjuncts and the
     *       result is derived as for a shuffle join; otherwise only the left distribution spec is
     *       carried over.</li>
     *   <li>Both children hash distributed (shuffle): the result is derived from both hash specs.</li>
     *   <li>Anything else is rejected.</li>
     * </ul>
     *
     * @param hashJoin the visited join
     * @param context plan context (unused)
     * @return the derived output properties
     * @throws IllegalStateException if there are not exactly two child output properties
     * @throws RuntimeException if the children's distribution specs match none of the cases above
     */
    @Override
    public PhysicalProperties visitPhysicalHashJoin(
            PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, PlanContext context) {
        Preconditions.checkState(childrenOutputProperties.size() == 2);
        DistributionSpec leftSpec = childrenOutputProperties.get(0).getDistributionSpec();
        DistributionSpec rightSpec = childrenOutputProperties.get(1).getDistributionSpec();

        // broadcast
        if (rightSpec instanceof DistributionSpecReplicated) {
            if (!canRefineBroadcastOutputWithJoinKeys(hashJoin, leftSpec)) {
                return new PhysicalProperties(leftSpec);
            }
            // mock a right side hash spec with the corresponding join keys, so that the returned
            // spec is filled with refined EquivalenceExprIds.
            DistributionSpecHash leftHashSpec = (DistributionSpecHash) leftSpec;
            DistributionSpecHash mockedRightHashSpec = mockAnotherSideSpecFromConjuncts(hashJoin, leftHashSpec);
            return deriveHashDistributedJoinOutput(hashJoin, leftHashSpec, mockedRightHashSpec);
        }

        // shuffle
        if (leftSpec instanceof DistributionSpecHash && rightSpec instanceof DistributionSpecHash) {
            return deriveHashDistributedJoinOutput(
                    hashJoin, (DistributionSpecHash) leftSpec, (DistributionSpecHash) rightSpec);
        }

        throw new RuntimeException("Could not derive hash join's output properties. join: " + hashJoin);
    }

    /**
     * Tells whether, for a broadcast join, the left side is hash distributed on keys that satisfy
     * the join keys, so a right-side hash spec can be mocked from the hash conjuncts.
     * The checks run in a fixed order: the hash conjunct expr ids are only requested after the
     * standalone-mark-join case has been ruled out.
     */
    private boolean canRefineBroadcastOutputWithJoinKeys(
            PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, DistributionSpec leftSpec) {
        if (!(leftSpec instanceof DistributionSpecHash)) {
            return false;
        }
        if (hashJoin.isMarkJoin() && hashJoin.getHashJoinConjuncts().isEmpty()) {
            return false;
        }
        List<ExprId> leftJoinKeys = hashJoin.getHashConjunctsExprIds().first;
        if (leftJoinKeys.isEmpty()) {
            return false;
        }
        List<ExprId> rightJoinKeys = hashJoin.getHashConjunctsExprIds().second;
        if (rightJoinKeys.isEmpty() || leftJoinKeys.size() != rightJoinKeys.size()) {
            return false;
        }
        return leftSpec.satisfy(new DistributionSpecHash(leftJoinKeys, ShuffleType.REQUIRE));
    }

    /**
     * Picks the shuffle-join derivation matching the active distribute planner and applies it.
     */
    private PhysicalProperties deriveHashDistributedJoinOutput(
            PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin,
            DistributionSpecHash leftHashSpec, DistributionSpecHash rightHashSpec) {
        return SessionVariable.canUseNereidsDistributePlanner()
                ? computeShuffleJoinOutputProperties(hashJoin, leftHashSpec, rightHashSpec)
                : legacyComputeShuffleJoinOutputProperties(hashJoin, leftHashSpec, rightHashSpec);
    }

    /**
     * Derives the output properties of a limit by passing its single child's properties through.
     *
     * @param limit the visited limit (not inspected here)
     * @param context plan context (unused)
     * @return the child's output properties, unchanged
     * @throws IllegalStateException if there is not exactly one child output property
     */
    @Override
    public PhysicalProperties visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, PlanContext context) {
        Preconditions.checkState(childrenOutputProperties.size() == 1);
        return childrenOutputProperties.get(0);
    }

    /**
     * Derives the output properties of a nested loop join: only the left (first) child's
     * distribution spec is carried over into a new {@link PhysicalProperties}.
     *
     * @param nestedLoopJoin the visited join (not inspected here)
     * @param context plan context (unused)
     * @return new properties wrapping the left child's distribution spec
     * @throws IllegalStateException if there are not exactly two child output properties
     */
    @Override
    public PhysicalProperties visitPhysicalNestedLoopJoin(
            PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> nestedLoopJoin,
            PlanContext context) {
        Preconditions.checkState(childrenOutputProperties.size() == 2);
        return new PhysicalProperties(childrenOutputProperties.get(0).getDistributionSpec());
    }

    /**
     * Derives the output properties of a project.
     *
     * <p>A child that is not hash distributed is passed through unchanged. For a hash distributed
     * child, the aliases of the project are scanned:
     * <ul>
     *   <li>an alias of a slot reference, or of a cast of a slot for which {@code isSameHashValue}
     *       holds, is recorded as a mapping from the input expr id to the alias expr id;</li>
     *   <li>any other aliased expression marks all of its input slots as obstructions.</li>
     * </ul>
     * If every recorded mapping maps an expr id to itself (including when there is none), the child's
     * properties are returned unchanged; otherwise the child's hash spec is projected through the
     * mappings and obstructions, keeping the child's order spec.
     *
     * @param project the visited project
     * @param context plan context (unused)
     * @return the derived output properties
     * @throws IllegalStateException if there is not exactly one child output property
     */
    @Override
    public PhysicalProperties visitPhysicalProject(PhysicalProject<? extends Plan> project, PlanContext context) {
        // TODO: order spec do not process since we do not use it.
        Preconditions.checkState(childrenOutputProperties.size() == 1);
        PhysicalProperties childProperties = childrenOutputProperties.get(0);
        DistributionSpec childDistributionSpec = childProperties.getDistributionSpec();
        if (!(childDistributionSpec instanceof DistributionSpecHash)) {
            return childProperties;
        }

        Map<ExprId, ExprId> renamedExprIds = Maps.newHashMap();
        Set<ExprId> obstructedExprIds = Sets.newHashSet();
        for (NamedExpression projectItem : project.getProjects()) {
            if (!(projectItem instanceof Alias)) {
                continue;
            }
            Alias alias = (Alias) projectItem;
            Expression aliased = alias.child();
            if (aliased instanceof SlotReference) {
                renamedExprIds.put(((SlotReference) aliased).getExprId(), alias.getExprId());
                continue;
            }
            // cast(slot as varchar(10)) can do projection if slot is varchar(3)
            boolean hashPreservingCast = aliased instanceof Cast && aliased.child(0) instanceof Slot
                    && isSameHashValue(aliased.child(0).getDataType(), aliased.getDataType());
            if (hashPreservingCast) {
                renamedExprIds.put(((Slot) aliased.child(0)).getExprId(), alias.getExprId());
            } else {
                for (NamedExpression inputSlot : aliased.getInputSlots()) {
                    obstructedExprIds.add(inputSlot.getExprId());
                }
            }
        }

        // nothing was actually renamed: the child's properties still describe the output
        boolean identityOnly = true;
        for (Map.Entry<ExprId, ExprId> renaming : renamedExprIds.entrySet()) {
            if (!renaming.getKey().equals(renaming.getValue())) {
                identityOnly = false;
                break;
            }
        }
        if (identityOnly) {
            return childProperties;
        }

        DistributionSpecHash childHashSpec = (DistributionSpecHash) childDistributionSpec;
        DistributionSpec fallbackSpec;
        if (childHashSpec.getShuffleType() == ShuffleType.NATURAL) {
            fallbackSpec = DistributionSpecStorageAny.INSTANCE;
        } else {
            fallbackSpec = DistributionSpecAny.INSTANCE;
        }
        DistributionSpec projectedSpec = childHashSpec.project(renamedExprIds, obstructedExprIds, fallbackSpec);
        return new PhysicalProperties(projectedSpec, childProperties.getOrderSpec());
    }

    /**
     * Derives the output properties of a repeat (grouping sets) node.
     *
     * <p>A child that is not hash distributed keeps its properties (with its own order spec re-applied).
     * For a hash distributed child, the grouping keys common to all grouping sets are computed; if the
     * slot expr ids collected from them cover every shuffled column of the child, the child's properties
     * are returned unchanged. Otherwise the result is built by
     * {@link PhysicalProperties#createAnyFromHash} with the child's order spec.
     *
     * @param repeat the visited repeat node
     * @param context plan context (unused)
     * @return the derived output properties
     * @throws IllegalStateException if there is not exactly one child output property
     */
    @Override
    public PhysicalProperties visitPhysicalRepeat(PhysicalRepeat<? extends Plan> repeat, PlanContext context) {
        Preconditions.checkState(childrenOutputProperties.size() == 1);
        PhysicalProperties childProperties = childrenOutputProperties.get(0);
        DistributionSpec childDistributionSpec = childProperties.getDistributionSpec();
        if (!(childDistributionSpec instanceof DistributionSpecHash)) {
            return childProperties.withOrderSpec(childProperties.getOrderSpec());
        }

        DistributionSpecHash childHashSpec = (DistributionSpecHash) childDistributionSpec;
        List<List<Expression>> groupingSets = repeat.getGroupingSets();
        if (!groupingSets.isEmpty()) {
            // intersect all grouping sets, stopping early once nothing is left in common
            Set<Expression> commonGroupingKeys = Utils.fastToImmutableSet(groupingSets.get(0));
            int next = 1;
            while (next < groupingSets.size() && !commonGroupingKeys.isEmpty()) {
                commonGroupingKeys = Sets.intersection(
                        commonGroupingKeys, Utils.fastToImmutableSet(groupingSets.get(next)));
                next++;
            }
            List<ExprId> shuffledColumns = childHashSpec.getOrderedShuffledColumns();
            Set<ExprId> commonSlotIds = new HashSet<>();
            for (Expression groupingKey : commonGroupingKeys) {
                // NOTE(review): collection stops at the first non-slot key, so slot keys iterated
                // after it are not collected; this can only make the check below more conservative.
                if (!(groupingKey instanceof SlotReference)) {
                    break;
                }
                commonSlotIds.add(((SlotReference) groupingKey).getExprId());
            }
            if (commonSlotIds.containsAll(shuffledColumns)) {
                return childProperties;
            }
        }
        return PhysicalProperties.createAnyFromHash(childHashSpec)
                .withOrderSpec(childProperties.getOrderSpec());
    }

    /**
     * Derives the output properties of a partition top-n node.
     *
     * <p>For the two-phase-local and one-phase-global phases only the child's distribution spec is
     * carried over. Otherwise the phase must be two-phase-global and the child must be hash
     * distributed; the result then also carries an order spec built from the node's order keys.
     *
     * @param partitionTopN the visited partition top-n node
     * @param context plan context (unused)
     * @return the derived output properties
     * @throws IllegalStateException if there is not exactly one child output property, or the
     *         phase / child distribution does not match the expectations above
     */
    @Override
    public PhysicalProperties visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN,
            PlanContext context) {
        Preconditions.checkState(childrenOutputProperties.size() == 1);
        DistributionSpec childDistSpec = childrenOutputProperties.get(0).getDistributionSpec();

        boolean distributionOnly = partitionTopN.getPhase().isTwoPhaseLocal()
                || partitionTopN.getPhase().isOnePhaseGlobal();
        if (distributionOnly) {
            return new PhysicalProperties(childDistSpec);
        }

        Preconditions.checkState(partitionTopN.getPhase().isTwoPhaseGlobal(),
                "partition topn phase is not two phase global");
        Preconditions.checkState(childDistSpec instanceof DistributionSpecHash,
                "child dist spec is not hash spec");
        OrderSpec topNOrder = new OrderSpec(partitionTopN.getOrderKeys());
        return new PhysicalProperties(childDistSpec, topNOrder);
    }

    /**
     * Derives the output properties of a set operation from the distribution specs of all children.
     *
     * <ul>
     *   <li>No children, or every child gathered: {@link PhysicalProperties#GATHER}.</li>
     *   <li>First child not hash distributed: new properties wrapping the first child's spec.</li>
     *   <li>Every child hash distributed with the same shuffle type and with shuffle columns at the
     *       same output positions: a hash spec on the set operation's outputs at those positions.</li>
     *   <li>Any other combination: {@link PhysicalProperties#createAnyFromHash} of the first child's
     *       hash spec.</li>
     * </ul>
     *
     * @param setOperation the visited set operation
     * @param context plan context (unused)
     * @return the derived output properties
     */
    @Override
    public PhysicalProperties visitPhysicalSetOperation(PhysicalSetOperation setOperation, PlanContext context) {
        // output positions of the first child's shuffle columns; null until the first child is processed
        int[] offsetsOfFirstChild = null;
        // shuffle type of the first child; every later child must have the same one
        ShuffleType firstType = null;
        List<DistributionSpec> childrenDistribution = childrenOutputProperties.stream()
                .map(PhysicalProperties::getDistributionSpec)
                .collect(Collectors.toList());
        if (childrenDistribution.isEmpty()) {
            // no child means the set operation only has some one-row-relations
            return PhysicalProperties.GATHER;
        }
        if (childrenDistribution.stream().allMatch(DistributionSpecGather.class::isInstance)) {
            return PhysicalProperties.GATHER;
        }
        for (int i = 0; i < childrenDistribution.size(); i++) {
            DistributionSpec childDistribution = childrenDistribution.get(i);
            if (!(childDistribution instanceof DistributionSpecHash)) {
                if (i != 0) {
                    // NOTICE: if come here, the first child output must be DistributionSpecHash
                    return PhysicalProperties.createAnyFromHash((DistributionSpecHash) childrenDistribution.get(0));
                } else {
                    return new PhysicalProperties(childDistribution);
                }
            }
            DistributionSpecHash distributionSpecHash = (DistributionSpecHash) childDistribution;
            // one slot per shuffle column, holding the position of that column in this child's regular output
            int[] offsetsOfCurrentChild = new int[distributionSpecHash.getOrderedShuffledColumns().size()];
            for (int j = 0; j < setOperation.getRegularChildOutput(i).size(); j++) {
                // presumably the index of the shuffle column (equivalence set) this output belongs to;
                // -1 when the output column is not part of the child's hash spec
                int offset = distributionSpecHash.getExprIdToEquivalenceSet()
                        .getOrDefault(setOperation.getRegularChildOutput(i).get(j).getExprId(), -1);
                if (offset >= 0) {
                    offsetsOfCurrentChild[offset] = j;
                } else {
                    // NOTICE: if come here, the first child output must be DistributionSpecHash
                    return PhysicalProperties.createAnyFromHash((DistributionSpecHash) childrenDistribution.get(0));
                }
            }
            if (offsetsOfFirstChild == null) {
                // first child: remember its layout as the reference for the remaining children
                firstType = ((DistributionSpecHash) childDistribution).getShuffleType();
                offsetsOfFirstChild = offsetsOfCurrentChild;
            } else if (!Arrays.equals(offsetsOfFirstChild, offsetsOfCurrentChild)
                    || firstType != ((DistributionSpecHash) childDistribution).getShuffleType()) {
                // NOTICE: if come here, the first child output must be DistributionSpecHash
                return PhysicalProperties.createAnyFromHash((DistributionSpecHash) childrenDistribution.get(0));
            }
        }
        // bucket: all children agree, so hash on the set operation's outputs at the shared positions
        List<ExprId> request = Lists.newArrayList();
        for (int offset : offsetsOfFirstChild) {
            request.add(setOperation.getOutput().get(offset).getExprId());
        }
        return PhysicalProperties.createHash(request, firstType);
    }

    /**
     * Derives the output properties of a union.
     *
     * <p>A union without constant expressions is handled like any other set operation. A union with
     * constant expressions falls back to the generic {@link #visit} derivation.
     *
     * @param union the visited union
     * @param context plan context, forwarded unchanged
     * @return the derived output properties
     */
    @Override
    public PhysicalProperties visitPhysicalUnion(PhysicalUnion union, PlanContext context) {
        if (!union.getConstantExprsList().isEmpty()) {
            // current be could not run const expr on appropriate node,
            // so if we have constant exprs on union, the output of union always any
            return visit(union, context);
        }
        return visitPhysicalSetOperation(union, context);
    }

    /**
     * Derives the output properties of a sort node. The order spec is always built from the sort's
     * order keys; the distribution is the child's for a local sort phase and gather otherwise.
     *
     * @param sort the visited sort node
     * @param context plan context (unused)
     * @return new properties with the chosen distribution and the sort's order spec
     * @throws IllegalStateException if there is not exactly one child output property
     */
    @Override
    public PhysicalProperties visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> sort,
            PlanContext context) {
        Preconditions.checkState(childrenOutputProperties.size() == 1);
        DistributionSpec outputDistribution = sort.getSortPhase().isLocal()
                ? childrenOutputProperties.get(0).getDistributionSpec()
                : DistributionSpecGather.INSTANCE;
        return new PhysicalProperties(outputDistribution, new OrderSpec(sort.getOrderKeys()));
    }

    /**
     * Derives the output properties of a window node by passing its single child's properties through.
     *
     * @param window the visited window node (not inspected here)
     * @param context plan context (unused)
     * @return the child's output properties, unchanged
     * @throws IllegalStateException if there is not exactly one child output property
     */
    @Override
    public PhysicalProperties visitPhysicalWindow(PhysicalWindow<? extends Plan> window, PlanContext context) {
        Preconditions.checkState(childrenOutputProperties.size() == 1);
        return childrenOutputProperties.get(0);
    }

    /**
     * Derives the output properties of a join whose two sides are both described by hash specs.
     *
     * <p>The shuffle side is computed from the two shuffle types. The output shuffle type is the
     * right side's type when the shuffle side is LEFT, and the left side's type otherwise.
     * Depending on the join type the result is a merge of both specs (inner/cross), the left spec
     * (left semi/anti/outer), the right spec (right semi/anti/outer), or an "any" spec built from
     * both hash specs (full outer). Some shuffle sides additionally apply
     * {@code withShuffleTypeAndForbidColocateJoin}, as coded below.
     *
     * @param hashJoin the join being derived
     * @param leftHashSpec hash spec of the left side
     * @param rightHashSpec hash spec of the right side (possibly mocked for a broadcast join)
     * @return the derived output properties
     * @throws AnalysisException if the join type or shuffle side is not handled
     */
    private PhysicalProperties computeShuffleJoinOutputProperties(
            PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin,
            DistributionSpecHash leftHashSpec, DistributionSpecHash rightHashSpec) {

        ShuffleSide shuffleSide = computeShuffleSide(leftHashSpec, rightHashSpec);

        ShuffleType outputShuffleType;
        if (shuffleSide == ShuffleSide.LEFT) {
            outputShuffleType = rightHashSpec.getShuffleType();
        } else {
            outputShuffleType = leftHashSpec.getShuffleType();
        }

        switch (hashJoin.getJoinType()) {
            case INNER_JOIN:
            case CROSS_JOIN:
                switch (shuffleSide) {
                    case LEFT:
                        return new PhysicalProperties(
                                DistributionSpecHash.merge(rightHashSpec, leftHashSpec, outputShuffleType));
                    case RIGHT:
                    case NONE:
                        return new PhysicalProperties(
                                DistributionSpecHash.merge(leftHashSpec, rightHashSpec, outputShuffleType));
                    case BOTH:
                        return new PhysicalProperties(
                                DistributionSpecHash.merge(leftHashSpec, rightHashSpec, outputShuffleType)
                                        .withShuffleTypeAndForbidColocateJoin(leftHashSpec.getShuffleType()));
                    default:
                        throw new AnalysisException("unknown shuffle side " + shuffleSide);
                }
            case LEFT_SEMI_JOIN:
            case LEFT_ANTI_JOIN:
            case NULL_AWARE_LEFT_ANTI_JOIN:
            case LEFT_OUTER_JOIN:
                switch (shuffleSide) {
                    case LEFT:
                    case BOTH:
                        return new PhysicalProperties(
                                leftHashSpec.withShuffleTypeAndForbidColocateJoin(outputShuffleType));
                    case RIGHT:
                    case NONE:
                        return new PhysicalProperties(leftHashSpec);
                    default:
                        throw new AnalysisException("unknown shuffle side " + shuffleSide);
                }
            case RIGHT_SEMI_JOIN:
            case RIGHT_ANTI_JOIN:
            case RIGHT_OUTER_JOIN:
                switch (shuffleSide) {
                    case RIGHT:
                    case BOTH:
                        return new PhysicalProperties(
                                rightHashSpec.withShuffleTypeAndForbidColocateJoin(outputShuffleType));
                    case LEFT:
                    case NONE:
                        return new PhysicalProperties(rightHashSpec);
                    default:
                        throw new AnalysisException("unknown shuffle side " + shuffleSide);
                }
            case FULL_OUTER_JOIN:
                return PhysicalProperties.createAnyFromHash(leftHashSpec, rightHashSpec);
            default:
                throw new AnalysisException("unknown join type " + hashJoin.getJoinType());
        }
    }

    /**
     * Classifies a join by the shuffle types of its two sides.
     *
     * <ul>
     *   <li>left EXECUTION_BUCKETED or STORAGE_BUCKETED: LEFT if the right is NATURAL, else BOTH;</li>
     *   <li>left NATURAL: NONE if the right is NATURAL, else RIGHT.</li>
     * </ul>
     *
     * @param leftHashSpec hash spec of the left side
     * @param rightHashSpec hash spec of the right side
     * @return the shuffle side matching the two shuffle types
     * @throws IllegalStateException if the left shuffle type is none of the types listed above
     */
    private ShuffleSide computeShuffleSide(DistributionSpecHash leftHashSpec, DistributionSpecHash rightHashSpec) {
        ShuffleType leftShuffleType = leftHashSpec.getShuffleType();
        ShuffleType rightShuffleType = rightHashSpec.getShuffleType();
        boolean rightIsNatural = rightShuffleType == ShuffleType.NATURAL;
        switch (leftShuffleType) {
            case NATURAL:
                return rightIsNatural ? ShuffleSide.NONE : ShuffleSide.RIGHT;
            case EXECUTION_BUCKETED:
            case STORAGE_BUCKETED:
                return rightIsNatural ? ShuffleSide.LEFT : ShuffleSide.BOTH;
            default:
                throw new IllegalStateException(
                        "Illegal join with wrong distribution, left: " + leftShuffleType
                                + ", right: " + rightShuffleType);
        }
    }

    /**
     * Shuffle-join derivation used when {@code SessionVariable.canUseNereidsDistributePlanner()}
     * is false.
     *
     * <p>Inner/cross joins merge both specs using the left shuffle type; left semi/anti/outer joins
     * keep the left spec; right semi/anti/outer joins keep the right spec, switching it to the left
     * shuffle type and forbidding colocate join unless the join could be colocated; full outer joins
     * yield an "any" spec built from the left hash spec.
     *
     * @param hashJoin the join being derived
     * @param leftHashSpec hash spec of the left side
     * @param rightHashSpec hash spec of the right side (possibly mocked for a broadcast join)
     * @return the derived output properties
     * @throws AnalysisException if the join type is not handled
     */
    private PhysicalProperties legacyComputeShuffleJoinOutputProperties(
            PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin,
            DistributionSpecHash leftHashSpec, DistributionSpecHash rightHashSpec) {
        switch (hashJoin.getJoinType()) {
            case INNER_JOIN:
            case CROSS_JOIN: {
                DistributionSpec mergedSpec = DistributionSpecHash.merge(
                        leftHashSpec, rightHashSpec, leftHashSpec.getShuffleType());
                return new PhysicalProperties(mergedSpec);
            }
            case LEFT_SEMI_JOIN:
            case LEFT_ANTI_JOIN:
            case NULL_AWARE_LEFT_ANTI_JOIN:
            case LEFT_OUTER_JOIN:
                return new PhysicalProperties(leftHashSpec);
            case RIGHT_SEMI_JOIN:
            case RIGHT_ANTI_JOIN:
            case RIGHT_OUTER_JOIN: {
                boolean colocatable = JoinUtils.couldColocateJoin(
                        leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts());
                if (!colocatable) {
                    // retain left shuffle type, since coordinator use left most node to schedule fragment
                    // forbid colocate join, since right table already shuffle
                    DistributionSpec rescheduledRightSpec =
                            rightHashSpec.withShuffleTypeAndForbidColocateJoin(leftHashSpec.getShuffleType());
                    return new PhysicalProperties(rescheduledRightSpec);
                }
                return new PhysicalProperties(rightHashSpec);
            }
            case FULL_OUTER_JOIN:
                return PhysicalProperties.createAnyFromHash(leftHashSpec);
            default:
                throw new AnalysisException("unknown join type " + hashJoin.getJoinType());
        }
    }

    /**
     * Builds a hash spec for the other join side that mirrors {@code oneSideSpec}.
     *
     * <p>Each shuffled column of {@code oneSideSpec} is located among the first (left) list of hash
     * conjunct expr ids, either directly or through one of its equivalent expr ids, and replaced by
     * the expr id at the same position in the second (right) list. The resulting spec keeps the
     * shuffle type of {@code oneSideSpec}.
     *
     * @param hashJoin join providing the hash conjunct expr id pairs
     * @param oneSideSpec hash spec of the known side
     * @return a hash spec on the matching expr ids of the other side
     * @throws IllegalStateException if the conjunct expr id lists are empty or differ in size, or a
     *         shuffled column (and all its equivalents) is missing from the first list
     */
    private DistributionSpecHash mockAnotherSideSpecFromConjuncts(
            PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, DistributionSpecHash oneSideSpec) {
        List<ExprId> leftExprIds = hashJoin.getHashConjunctsExprIds().first;
        List<ExprId> rightExprIds = hashJoin.getHashConjunctsExprIds().second;
        boolean conjunctsUsable = !leftExprIds.isEmpty() && !rightExprIds.isEmpty()
                && leftExprIds.size() == rightExprIds.size();
        Preconditions.checkState(conjunctsUsable, "invalid hash join conjuncts");

        List<ExprId> mirroredExprIds = Lists.newArrayList();
        for (ExprId shuffledColumn : oneSideSpec.getOrderedShuffledColumns()) {
            int position = leftExprIds.indexOf(shuffledColumn);
            if (position < 0) {
                // not a join key itself: try the expr ids known to be equivalent to it
                for (ExprId equivalent : oneSideSpec.getEquivalenceExprIdsOf(shuffledColumn)) {
                    position = leftExprIds.indexOf(equivalent);
                    if (position >= 0) {
                        break;
                    }
                }
                Preconditions.checkState(position >= 0, "can't find exprId in equivalence set");
            }
            mirroredExprIds.add(rightExprIds.get(position));
        }
        return new DistributionSpecHash(mirroredExprIds, oneSideSpec.getShuffleType());
    }

    /**
     * Decides whether a cast from {@code originType} to {@code castType} is treated as keeping the
     * hash value, so that a hash distribution on the original slot can be projected through the cast.
     *
     * @param originType type of the slot being cast; must be string-like to qualify
     * @param castType target type; must be varchar or string to qualify
     * @return true when both type conditions hold and the target width is either not smaller than
     *         the origin width or negative
     */
    private boolean isSameHashValue(DataType originType, DataType castType) {
        if (!originType.isStringLikeType()) {
            return false;
        }
        if (!(castType.isVarcharType() || castType.isStringType())) {
            return false;
        }
        return castType.width() >= originType.width() || castType.width() < 0;
    }

    /**
     * Classification of a join's two sides by shuffle type, as produced by {@code computeShuffleSide}:
     * LEFT  - left is EXECUTION_BUCKETED or STORAGE_BUCKETED and right is NATURAL;
     * RIGHT - left is NATURAL and right is not NATURAL;
     * BOTH  - left is EXECUTION_BUCKETED or STORAGE_BUCKETED and right is not NATURAL;
     * NONE  - both sides are NATURAL.
     */
    private enum ShuffleSide {
        LEFT, RIGHT, BOTH, NONE
    }
}