PlanFragment.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/PlanFragment.java
// and modified by Doris
package org.apache.doris.planner;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.JoinOperator;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.TreeNode;
import org.apache.doris.nereids.trees.plans.distribute.NereidsSpecifyInstances;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TPlanFragment;
import org.apache.doris.thrift.TQueryCacheParam;
import org.apache.doris.thrift.TResultSinkType;
import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
 * PlanFragments form a tree structure via their ExchangeNodes. A tree of fragments
 * connected in that way forms a plan. The output of a plan is produced by the root
 * fragment and is either the result of the query or an intermediate result
 * needed by a different plan (such as a hash table).
 *
 * Plans are grouped into cohorts based on the consumer of their output: all
 * plans that materialize intermediate results for a particular consumer plan
 * are grouped into a single cohort.
 *
 * A PlanFragment encapsulates the specific tree of execution nodes that
 * are used to produce the output of the plan fragment, as well as output exprs,
 * destination node, etc. If there are no output exprs, the full row that is
 * produced by the plan root is marked as materialized.
 *
 * A plan fragment can have one or many instances, each of which in turn is executed by
 * an individual node and the output sent to a specific instance of the destination
 * fragment (or, in the case of the root fragment, is materialized in some form).
 *
 * A hash-partitioned plan fragment is the result of one or more hash-partitioning data
 * streams being received by plan nodes in this fragment. In the future, a fragment's
 * data partition could also be hash partitioned based on a scan node that is reading
 * from a physically hash-partitioned table.
 *
 * The sequence of calls is:
 * - c'tor
 * - assemble with getters, etc.
 * - finalize()
 * - toThrift()
 *
 * TODO: the tree of PlanNodes is connected across fragment boundaries, which makes
 *   it impossible search for things within a fragment (using TreeNode functions);
 *   fix that
 */
public class PlanFragment extends TreeNode<PlanFragment> {
    private static final Logger LOG = LogManager.getLogger(PlanFragment.class);
    // id for this plan fragment
    private PlanFragmentId fragmentId;
    // nereids planner and original planner generate fragments in different order.
    // This makes nereids fragment id different from that of original planner, and
    // hence different from that in profile.
    // in original planner, fragmentSequenceNum is fragmentId, and in nereids planner,
    // fragmentSequenceNum is the id displayed in profile
    private int fragmentSequenceNum;
    // private PlanId planId_;
    // private CohortId cohortId_;
    // root of plan tree executed by this fragment
    private PlanNode planRoot;
    // exchange node which this fragment sends its output to
    private ExchangeNode destNode;
    // if null, set with the planRoot's output exprs when translate PhysicalPlan. see `translatePlan`
    private ArrayList<Expr> outputExprs;
    // created in finalize() or set in setSink()
    protected DataSink sink;
    // data source(or sender) of specific partition in the fragment;
    // an UNPARTITIONED fragment is executed on only a single node
    private DataPartition dataPartition;
    // specification of the actually input partition of this fragment when transmitting to be.
    // By default, the value of the data partition in planner and the data partition transmitted to be are the same.
    // So this attribute is empty.
    // But sometimes the planned value and the serialized value are inconsistent. You need to set this value.
    // At present, this situation only occurs in the fragment where the scan node is located.
    // Since the data partition expression of the scan node is actually constructed from the schema of the table,
    //   the expression is not analyzed.
    // This will cause this expression to not be serialized correctly and transmitted to be.
    // In this case, you need to set this attribute to DataPartition RANDOM to avoid the problem.
    private DataPartition dataPartitionForThrift;
    // specification of how the output of this fragment is partitioned (i.e., how
    // it's sent to its destination);
    // if the output is UNPARTITIONED, it is being broadcast
    protected DataPartition outputPartition;
    // Whether query statistics is sent with every batch. In order to get the query
    // statistics correctly when query contains limit, it is necessary to send query
    // statistics with every batch, or only in close.
    private boolean transferQueryStatisticsWithEveryBatch;
    // TODO: SubstitutionMap outputSmap;
    // substitution map to remap exprs onto the output of this fragment, to be applied
    // at destination fragment
    // specification of the number of parallel when fragment is executed
    // default value is 1
    private int parallelExecNum = 1;
    // The runtime filter id that produced
    private Set<RuntimeFilterId> builderRuntimeFilterIds;
    // The runtime filter id that is expected to be used
    private Set<RuntimeFilterId> targetRuntimeFilterIds;
    private int bucketNum;
    // has colocate plan node
    protected boolean hasColocatePlanNode = false;
    protected final Supplier<Boolean> hasBucketShuffleJoin;
    private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCOL;
    public Optional<NereidsSpecifyInstances<ScanSource>> specifyInstances = Optional.empty();
    public TQueryCacheParam queryCacheParam;
    private int numBackends = 0;
    private boolean forceSingleInstance = false;
    /**
     * C'tor for fragment with specific partition; the output is by default broadcast.
     */
    public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) {
        this.fragmentId = id;
        this.planRoot = root;
        this.dataPartition = partition;
        this.outputPartition = DataPartition.UNPARTITIONED;
        this.transferQueryStatisticsWithEveryBatch = false;
        this.builderRuntimeFilterIds = new HashSet<>();
        this.targetRuntimeFilterIds = new HashSet<>();
        this.hasBucketShuffleJoin = buildHasBucketShuffleJoin();
        setParallelExecNumIfExists();
        setFragmentInPlanTree(planRoot);
    }
    public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition, DataPartition partitionForThrift) {
        this(id, root, partition);
        this.dataPartitionForThrift = partitionForThrift;
    }
    public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition,
            Set<RuntimeFilterId> builderRuntimeFilterIds, Set<RuntimeFilterId> targetRuntimeFilterIds) {
        this(id, root, partition);
        this.builderRuntimeFilterIds = new HashSet<>(builderRuntimeFilterIds);
        this.targetRuntimeFilterIds = new HashSet<>(targetRuntimeFilterIds);
    }
    private Supplier<Boolean> buildHasBucketShuffleJoin() {
        return Suppliers.memoize(() -> {
            List<HashJoinNode> hashJoinNodes = getPlanRoot().collectInCurrentFragment(HashJoinNode.class::isInstance);
            for (HashJoinNode hashJoinNode : hashJoinNodes) {
                if (hashJoinNode.isBucketShuffle()) {
                    return true;
                }
            }
            return false;
        });
    }
    /**
     * Assigns 'this' as fragment of all PlanNodes in the plan tree rooted at node.
     * Does not traverse the children of ExchangeNodes because those must belong to a
     * different fragment.
     */
    public void setFragmentInPlanTree(PlanNode node) {
        if (node == null) {
            return;
        }
        node.setFragment(this);
        if (node instanceof ExchangeNode) {
            return;
        }
        for (PlanNode child : node.getChildren()) {
            setFragmentInPlanTree(child);
        }
    }
    /**
     * Assign ParallelExecNum by PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM in SessionVariable for synchronous request
     * Assign ParallelExecNum by default value for Asynchronous request
     */
    public void setParallelExecNumIfExists() {
        if (ConnectContext.get() != null) {
            parallelExecNum = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
        }
    }
    // Manually set parallel exec number
    // Currently for broker load
    public void setParallelExecNum(int parallelExecNum) {
        this.parallelExecNum = parallelExecNum;
    }
    public void setOutputExprs(List<Expr> outputExprs) {
        this.outputExprs = Expr.cloneList(outputExprs, null);
    }
    public void resetOutputExprs(TupleDescriptor tupleDescriptor) {
        this.outputExprs = Lists.newArrayList();
        for (SlotDescriptor slotDescriptor : tupleDescriptor.getSlots()) {
            SlotRef slotRef = new SlotRef(slotDescriptor);
            outputExprs.add(slotRef);
        }
    }
    public ArrayList<Expr> getOutputExprs() {
        return outputExprs;
    }
    public void setBuilderRuntimeFilterIds(RuntimeFilterId rid) {
        this.builderRuntimeFilterIds.add(rid);
    }
    public void setTargetRuntimeFilterIds(RuntimeFilterId rid) {
        this.targetRuntimeFilterIds.add(rid);
    }
    public void setHasColocatePlanNode(boolean hasColocatePlanNode) {
        this.hasColocatePlanNode = hasColocatePlanNode;
    }
    public boolean hasBucketShuffleJoin() {
        return hasBucketShuffleJoin.get();
    }
    public void setResultSinkType(TResultSinkType resultSinkType) {
        this.resultSinkType = resultSinkType;
    }
    public boolean hasColocatePlanNode() {
        return hasColocatePlanNode;
    }
    /**
     * Finalize plan tree and create stream sink, if needed.
     */
    public void finalize(StatementBase stmtBase) {
        if (sink != null) {
            return;
        }
        if (destNode != null) {
            Preconditions.checkState(sink == null);
            // we're streaming to an exchange node
            DataStreamSink streamSink = new DataStreamSink(destNode.getId());
            streamSink.setOutputPartition(outputPartition);
            streamSink.setFragment(this);
            sink = streamSink;
        } else {
            if (planRoot == null) {
                // only output expr, no FROM clause
                // "select 1 + 2"
                return;
            }
            Preconditions.checkState(sink == null);
            sink = new ResultSink(planRoot.getId(), resultSinkType);
        }
    }
    /**
     * Return the number of nodes on which the plan fragment will execute.
     * invalid: -1
     */
    public int getNumNodes() {
        return dataPartition == DataPartition.UNPARTITIONED ? 1 : planRoot.getNumNodes();
    }
    public int getParallelExecNum() {
        if (forceSingleInstance) {
            return 1;
        }
        return parallelExecNum;
    }
    public TPlanFragment toThrift() {
        TPlanFragment result = new TPlanFragment();
        if (planRoot != null) {
            result.setPlan(planRoot.treeToThrift());
        }
        if (outputExprs != null) {
            result.setOutputExprs(Expr.treesToThrift(outputExprs));
        }
        if (sink != null) {
            result.setOutputSink(sink.toThrift());
        }
        if (dataPartitionForThrift == null) {
            result.setPartition(dataPartition.toThrift());
        } else {
            result.setPartition(dataPartitionForThrift.toThrift());
        }
        // TODO chenhao , calculated by cost
        result.setMinReservationBytes(0);
        result.setInitialReservationTotalClaims(0);
        return result;
    }
    public String getExplainString(TExplainLevel explainLevel) {
        StringBuilder str = new StringBuilder();
        Preconditions.checkState(dataPartition != null);
        if (CollectionUtils.isNotEmpty(outputExprs)) {
            str.append("  OUTPUT EXPRS:\n    ");
            str.append(outputExprs.stream().map(Expr::toSql).collect(Collectors.joining("\n    ")));
        }
        str.append("\n");
        str.append("  PARTITION: " + dataPartition.getExplainString(explainLevel) + "\n");
        str.append("  HAS_COLO_PLAN_NODE: " + hasColocatePlanNode + "\n");
        if (queryCacheParam != null) {
            str.append("\n");
            str.append("  QUERY_CACHE:\n");
            str.append("    CACHE_NODE_ID: " + queryCacheParam.getNodeId() + "\n");
            str.append("    DIGEST: " + Hex.encodeHexString(queryCacheParam.getDigest()) + "\n");
        }
        str.append("\n");
        if (sink != null) {
            str.append(sink.getExplainString("  ", explainLevel) + "\n");
        }
        if (planRoot != null) {
            str.append(planRoot.getExplainString("  ", "  ", explainLevel));
        }
        return str.toString();
    }
    public void getExplainStringMap(Map<Integer, String> planNodeMap) {
        org.apache.doris.thrift.TExplainLevel explainLevel = org.apache.doris.thrift.TExplainLevel.NORMAL;
        if (planRoot != null) {
            planRoot.getExplainStringMap(explainLevel, planNodeMap);
        }
    }
    /**
     * Returns true if this fragment is partitioned.
     */
    public boolean isPartitioned() {
        return (dataPartition.getType() != TPartitionType.UNPARTITIONED);
    }
    public void updateDataPartition(DataPartition dataPartition) {
        if (this.dataPartition == DataPartition.UNPARTITIONED) {
            return;
        }
        this.dataPartition = dataPartition;
    }
    public PlanFragmentId getId() {
        return fragmentId;
    }
    public ExchangeNode getDestNode() {
        return destNode;
    }
    public PlanNode getDeepestLinearSource() {
        if (getChildren().size() > 1) {
            throw new IllegalStateException("getDeepestLinearSource() called on a fragment with multiple children");
        } else if (getChildren().isEmpty()) {
            // this is the root fragment
            return getPlanRoot();
        } else {
            // this is a non-root fragment
            return getChild(0).getDeepestLinearSource();
        }
    }
    public PlanFragment getDestFragment() {
        if (destNode == null) {
            return null;
        }
        return destNode.getFragment();
    }
    public void setDestination(ExchangeNode destNode) {
        this.destNode = destNode;
        PlanFragment dest = getDestFragment();
        Preconditions.checkNotNull(dest);
        dest.addChild(this);
    }
    public DataPartition getDataPartition() {
        return dataPartition;
    }
    public DataPartition getOutputPartition() {
        return outputPartition;
    }
    public void setOutputPartition(DataPartition outputPartition) {
        this.outputPartition = outputPartition;
    }
    public PlanNode getPlanRoot() {
        return planRoot;
    }
    public void setPlanRoot(PlanNode root) {
        planRoot = root;
        setFragmentInPlanTree(planRoot);
    }
    /**
     * Adds a node as the new root to the plan tree. Connects the existing
     * root as the child of newRoot.
     */
    public void addPlanRoot(PlanNode newRoot) {
        Preconditions.checkState(newRoot.getChildren().size() == 1);
        newRoot.setChild(0, planRoot);
        planRoot = newRoot;
        planRoot.setFragment(this);
    }
    public DataSink getSink() {
        return sink;
    }
    public void setSink(DataSink sink) {
        Preconditions.checkState(this.sink == null);
        Preconditions.checkNotNull(sink);
        sink.setFragment(this);
        this.sink = sink;
    }
    public void resetSink(DataSink sink) {
        sink.setFragment(this);
        this.sink = sink;
    }
    public PlanFragmentId getFragmentId() {
        return fragmentId;
    }
    public Set<RuntimeFilterId> getBuilderRuntimeFilterIds() {
        return builderRuntimeFilterIds;
    }
    public Set<RuntimeFilterId> getTargetRuntimeFilterIds() {
        return targetRuntimeFilterIds;
    }
    public void clearRuntimeFilters() {
        builderRuntimeFilterIds.clear();
        targetRuntimeFilterIds.clear();
    }
    public void setTransferQueryStatisticsWithEveryBatch(boolean value) {
        transferQueryStatisticsWithEveryBatch = value;
    }
    public boolean isTransferQueryStatisticsWithEveryBatch() {
        return transferQueryStatisticsWithEveryBatch;
    }
    public int getFragmentSequenceNum() {
        return fragmentSequenceNum;
    }
    public void setFragmentSequenceNum(int seq) {
        fragmentSequenceNum = seq;
    }
    public int getBucketNum() {
        return bucketNum;
    }
    public void setBucketNum(int bucketNum) {
        this.bucketNum = bucketNum;
    }
    public boolean hasNullAwareLeftAntiJoin() {
        return planRoot.anyMatch(plan -> plan instanceof JoinNodeBase
                && ((JoinNodeBase) plan).getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN);
    }
    public boolean useSerialSource(ConnectContext context) {
        return context != null
                && context.getSessionVariable().isIgnoreStorageDataDistribution()
                && queryCacheParam == null
                && !hasNullAwareLeftAntiJoin()
                // If planRoot is not a serial operator and has serial children, we can use serial source and improve
                // parallelism of non-serial operators.
                // For bucket shuffle / colocate join fragment, always use serial source if the bucket scan nodes are
                // serial.
                && (hasSerialScanNode() || (sink instanceof DataStreamSink && !planRoot.isSerialOperator()
                && planRoot.hasSerialChildren()));
    }
    public boolean hasSerialScanNode() {
        return planRoot.hasSerialScanChildren();
    }
    public void setForceSingleInstance() {
        this.forceSingleInstance = true;
    }
}