PhysicalPlanTranslator.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.glue.translator;

import org.apache.doris.analysis.AggregateInfo;
import org.apache.doris.analysis.AnalyticWindow;
import org.apache.doris.analysis.BaseTableRef;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.BoolLiteral;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.GroupByClause.GroupingType;
import org.apache.doris.analysis.GroupingInfo;
import org.apache.doris.analysis.IsNullPredicate;
import org.apache.doris.analysis.JoinOperator;
import org.apache.doris.analysis.OrderByElement;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SortInfo;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Function.NullableMode;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.es.EsExternalTable;
import org.apache.doris.datasource.es.source.EsScanNode;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hudi.source.HudiScanNode;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.datasource.jdbc.sink.JdbcTableSink;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
import org.apache.doris.datasource.lakesoul.source.LakeSoulScanNode;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode;
import org.apache.doris.datasource.odbc.source.OdbcScanNode;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.datasource.paimon.source.PaimonScanNode;
import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable;
import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode;
import org.apache.doris.fsv2.DirectoryLister;
import org.apache.doris.fsv2.FileSystemDirectoryLister;
import org.apache.doris.fsv2.TransactionScopeCachingDirectoryListerFactory;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.DistributionSpec;
import org.apache.doris.nereids.properties.DistributionSpecAllSingleton;
import org.apache.doris.nereids.properties.DistributionSpecAny;
import org.apache.doris.nereids.properties.DistributionSpecExecutionAny;
import org.apache.doris.nereids.properties.DistributionSpecGather;
import org.apache.doris.nereids.properties.DistributionSpecHash;
import org.apache.doris.nereids.properties.DistributionSpecHiveTableSinkHashPartitioned;
import org.apache.doris.nereids.properties.DistributionSpecHiveTableSinkUnPartitioned;
import org.apache.doris.nereids.properties.DistributionSpecOlapTableSinkHashPartitioned;
import org.apache.doris.nereids.properties.DistributionSpecReplicated;
import org.apache.doris.nereids.properties.DistributionSpecStorageAny;
import org.apache.doris.nereids.properties.DistributionSpecStorageGather;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup;
import org.apache.doris.nereids.rules.rewrite.MergeLimits;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.UnaryNode;
import org.apache.doris.nereids.trees.expressions.AggregateExpression;
import org.apache.doris.nereids.trees.expressions.CTEId;
import org.apache.doris.nereids.trees.expressions.EqualPredicate;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.VirtualSlotReference;
import org.apache.doris.nereids.trees.expressions.WindowFrame;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.AggMode;
import org.apache.doris.nereids.trees.plans.AggPhase;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDictionarySink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalExcept;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHudiScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterialize;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSchemaScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
import org.apache.doris.nereids.types.ArrayType;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.JsonType;
import org.apache.doris.nereids.types.MapType;
import org.apache.doris.nereids.types.StructType;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.JoinUtils;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.AggregationNode;
import org.apache.doris.planner.AnalyticEvalNode;
import org.apache.doris.planner.AssertNumRowsNode;
import org.apache.doris.planner.BackendPartitionedSchemaScanNode;
import org.apache.doris.planner.CTEScanNode;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.DictionarySink;
import org.apache.doris.planner.EmptySetNode;
import org.apache.doris.planner.ExceptNode;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.GroupCommitBlockSink;
import org.apache.doris.planner.HashJoinNode;
import org.apache.doris.planner.HashJoinNode.DistributionMode;
import org.apache.doris.planner.HiveTableSink;
import org.apache.doris.planner.IcebergTableSink;
import org.apache.doris.planner.IntersectNode;
import org.apache.doris.planner.JoinNodeBase;
import org.apache.doris.planner.MaterializationNode;
import org.apache.doris.planner.MultiCastDataSink;
import org.apache.doris.planner.MultiCastPlanFragment;
import org.apache.doris.planner.NestedLoopJoinNode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PartitionSortNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.RepeatNode;
import org.apache.doris.planner.ResultFileSink;
import org.apache.doris.planner.ResultSink;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SchemaScanNode;
import org.apache.doris.planner.SelectNode;
import org.apache.doris.planner.SetOperationNode;
import org.apache.doris.planner.SortNode;
import org.apache.doris.planner.TableFunctionNode;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import org.apache.doris.thrift.TFetchOption;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TPushAggOp;
import org.apache.doris.thrift.TResultSinkType;
import org.apache.doris.thrift.TRuntimeFilterType;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Used to translate to physical plan generated by new optimizer to the plan fragments.
 * <STRONG>
 * ATTENTION:
 * Must always visit plan's children first when you implement a method to translate from PhysicalPlan to PlanNode.
 * </STRONG>
 */
public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, PlanTranslatorContext> {

    private static final Logger LOG = LogManager.getLogger(PhysicalPlanTranslator.class);
    private final StatsErrorEstimator statsErrorEstimator;
    private final PlanTranslatorContext context;

    private DirectoryLister directoryLister;

    public PhysicalPlanTranslator() {
        this(null, null);
    }

    public PhysicalPlanTranslator(PlanTranslatorContext context) {
        this(context, null);
    }

    public PhysicalPlanTranslator(PlanTranslatorContext context, StatsErrorEstimator statsErrorEstimator) {
        this.context = context;
        this.statsErrorEstimator = statsErrorEstimator;
    }

    /**
     * Translate Nereids Physical Plan tree to Stale Planner PlanFragment tree.
     *
     * @param physicalPlan Nereids Physical Plan tree
     * @return Stale Planner PlanFragment tree
     */
    public PlanFragment translatePlan(PhysicalPlan physicalPlan) {
        PlanFragment rootFragment = physicalPlan.accept(this, context);
        if (CollectionUtils.isEmpty(rootFragment.getOutputExprs())) {
            List<Expr> outputExprs = Lists.newArrayList();
            physicalPlan.getOutput().stream().map(Slot::getExprId)
                    .forEach(exprId -> outputExprs.add(context.findSlotRef(exprId)));
            rootFragment.setOutputExprs(outputExprs);
        }
        Collections.reverse(context.getPlanFragments());
        // TODO: maybe we need to trans nullable directly? and then we could remove call computeMemLayout
        context.getDescTable().computeMemLayout();
        if (context.getSessionVariable() != null && context.getSessionVariable().forbidUnknownColStats) {
            Set<ScanNode> scans = context.getScanNodeWithUnknownColumnStats();
            if (!scans.isEmpty()) {
                StringBuilder builder = new StringBuilder();
                scans.forEach(builder::append);
                throw new AnalysisException("tables with unknown column stats: " + builder);
            }
        }
        for (ScanNode scanNode : context.getScanNodes()) {
            try {
                scanNode.finalizeForNereids();
            } catch (Exception e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
        return rootFragment;
    }

    /* ********************************************************************************************
     * distribute node
     * ******************************************************************************************** */

    @Override
    public PlanFragment visitPhysicalDistribute(PhysicalDistribute<? extends Plan> distribute,
            PlanTranslatorContext context) {
        Plan upstream = distribute.child(); // now they're in one fragment but will be split by ExchangeNode.
        PlanFragment upstreamFragment = upstream.accept(this, context);
        List<List<Expr>> upstreamDistributeExprs = getDistributeExprs(upstream);

        DistributionSpec targetDistribution = distribute.getDistributionSpec();

        // TODO: why need set streaming here? should remove this.
        if (upstreamFragment.getPlanRoot() instanceof AggregationNode && upstream instanceof PhysicalHashAggregate
                && context.getFirstAggregateInFragment(upstreamFragment) == upstream) {
            PhysicalHashAggregate<?> hashAggregate = (PhysicalHashAggregate<?>) upstream;
            if (hashAggregate.getAggPhase() == AggPhase.LOCAL
                    && hashAggregate.getAggMode() == AggMode.INPUT_TO_BUFFER
                    && hashAggregate.getTopnPushInfo() == null) {
                AggregationNode aggregationNode = (AggregationNode) upstreamFragment.getPlanRoot();
                aggregationNode.setUseStreamingPreagg(hashAggregate.isMaybeUsingStream());
            }
        }
        // all PhysicalDistribute translate to ExchangeNode. upstream as input.
        ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), upstreamFragment.getPlanRoot());
        updateLegacyPlanIdToPhysicalPlan(exchangeNode, distribute);
        List<ExprId> validOutputIds = distribute.getOutputExprIds();
        if (upstream instanceof PhysicalHashAggregate) {
            // we must add group by keys to output list,
            // otherwise we could not process aggregate's output without group by keys
            List<ExprId> keys = ((PhysicalHashAggregate<?>) upstream).getGroupByExpressions().stream()
                    .filter(SlotReference.class::isInstance)
                    .map(SlotReference.class::cast)
                    .map(SlotReference::getExprId)
                    .collect(Collectors.toList());
            keys.addAll(validOutputIds);
            validOutputIds = keys;
        }
        if (upstreamFragment instanceof MultiCastPlanFragment) {
            // TODO: remove this logic when we split to multi-window in logical window to physical window conversion
            MultiCastDataSink multiCastDataSink = (MultiCastDataSink) upstreamFragment.getSink();
            DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
                    multiCastDataSink.getDataStreamSinks().size() - 1);
            if (!(upstream instanceof PhysicalProject)) {
                List<Expr> projectionExprs = new ArrayList<>();
                PhysicalCTEConsumer consumer = getCTEConsumerChild(distribute);
                Preconditions.checkState(consumer != null, "consumer not found");
                for (Slot slot : distribute.getOutput()) {
                    projectionExprs.add(ExpressionTranslator.translate(consumer.getProducerSlot(slot), context));
                }
                TupleDescriptor projectionTuple = generateTupleDesc(distribute.getOutput(), null, context);
                dataStreamSink.setProjections(projectionExprs);
                dataStreamSink.setOutputTupleDesc(projectionTuple);
            }
        }
        // target data partition
        DataPartition targetDataPartition = toDataPartition(targetDistribution, validOutputIds, context);
        exchangeNode.setPartitionType(targetDataPartition.getType());
        exchangeNode.setChildrenDistributeExprLists(upstreamDistributeExprs);
        // its source partition is targetDataPartition. and outputPartition is UNPARTITIONED now, will be set when
        // visit its SinkNode
        PlanFragment downstreamFragment = new PlanFragment(context.nextFragmentId(), exchangeNode, targetDataPartition);
        if (targetDistribution instanceof DistributionSpecGather
                || targetDistribution instanceof DistributionSpecStorageGather) {
            // gather to one instance
            exchangeNode.setNumInstances(1);
        } else if (targetDistribution instanceof DistributionSpecAllSingleton) {
            // instances number = BE number now. assign one by one later.
            //ATTN: this number MAY BE CHANGED when we do distributing because when we finished physical planning,
            // we got the source table version. and in distribute planning, basing on the src version we may find
            // there's some BE whose dictionary already have newest data we dont have to reload.
            int aliveBENumber = Env.getCurrentSystemInfo().getAllClusterBackends(true).size();
            exchangeNode.setNumInstances(aliveBENumber);
        } else { // not change instances
            exchangeNode.setNumInstances(upstreamFragment.getPlanRoot().getNumInstances());
        }

        // process multicast sink
        if (upstreamFragment instanceof MultiCastPlanFragment) {
            MultiCastDataSink multiCastDataSink = (MultiCastDataSink) upstreamFragment.getSink();
            DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
                    multiCastDataSink.getDataStreamSinks().size() - 1);
            exchangeNode.updateTupleIds(dataStreamSink.getOutputTupleDesc());
            dataStreamSink.setExchNodeId(exchangeNode.getId());
            dataStreamSink.setOutputPartition(targetDataPartition);
            downstreamFragment.addChild(upstreamFragment);
            ((MultiCastPlanFragment) upstreamFragment).addToDest(exchangeNode);

            CTEScanNode cteScanNode = context.getCteScanNodeMap().get(upstreamFragment.getFragmentId());
            Preconditions.checkState(cteScanNode != null, "cte scan node is null");
            cteScanNode.setFragment(upstreamFragment);
            cteScanNode.setPlanNodeId(exchangeNode.getId());
            context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator ->
                    runtimeFilterTranslator.getContext().getPlanNodeIdToCTEDataSinkMap()
                            .put(cteScanNode.getId(), dataStreamSink));
        } else {
            /*
             * FragmentA (NodeA) ---> FragmentB (NodeB)
             * ↓
             * FragmentA (NodeA -> DataStreamSink) ---> FragmentB (ExchangeNode -> NodeB)
             *                                ↓-----------------------↑
             */
            upstreamFragment.setDestination(exchangeNode);
            // by exchange, upstreamFragment transform itselves partition to exchange's partition
            upstreamFragment.setOutputPartition(targetDataPartition);
            DataStreamSink streamSink = new DataStreamSink(exchangeNode.getId());
            streamSink.setOutputPartition(targetDataPartition);
            upstreamFragment.setSink(streamSink);
        }

        context.addPlanFragment(downstreamFragment);
        return downstreamFragment;
    }

    /* ********************************************************************************************
     * sink Node, in lexicographical order
     * ******************************************************************************************** */

    @Override
    public PlanFragment visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink,
            PlanTranslatorContext context) {
        PlanFragment planFragment = physicalResultSink.child().accept(this, context);
        TResultSinkType resultSinkType = context.getConnectContext() != null
                ? context.getConnectContext().getResultSinkType() : null;
        planFragment.setSink(new ResultSink(planFragment.getPlanRoot().getId(), resultSinkType));
        return planFragment;
    }

    @Override
    public PlanFragment visitPhysicalDeferMaterializeResultSink(
            PhysicalDeferMaterializeResultSink<? extends Plan> sink,
            PlanTranslatorContext context) {
        PlanFragment planFragment = visitPhysicalResultSink(sink.getPhysicalResultSink(), context);
        TFetchOption fetchOption = sink.getOlapTable().generateTwoPhaseReadOption(sink.getSelectedIndexId());
        ((ResultSink) planFragment.getSink()).setFetchOption(fetchOption);
        return planFragment;
    }

    @Override
    public PlanFragment visitPhysicalDictionarySink(PhysicalDictionarySink<? extends Plan> dictionarySink,
            PlanTranslatorContext context) {
        // Scan(ABCD) DataStreamSink(ABCD) -> Exchange(ABCD) Sink(CB)
        // source partition is UNPARTITIONED set by exchange node.
        // TODO: after changed ABCD to DCB. check what exchange do here.
        PlanFragment rootFragment = dictionarySink.child().accept(this, context);
        rootFragment.setOutputPartition(DataPartition.UNPARTITIONED); // only used for explain string
        // set rootFragment output expr

        DictionarySink sink = new DictionarySink(dictionarySink.getDictionary(), dictionarySink.allowAdaptiveLoad(),
                dictionarySink.getCols().stream().map(Column::getName).collect(Collectors.toList()));
        rootFragment.setSink(sink);
        return rootFragment;
    }

    @Override
    public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends Plan> olapTableSink,
            PlanTranslatorContext context) {
        PlanFragment rootFragment = olapTableSink.child().accept(this, context);
        rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);

        HashSet<String> partialUpdateCols = new HashSet<>();
        boolean isPartialUpdate = olapTableSink.isPartialUpdate();
        if (isPartialUpdate) {
            for (Column col : olapTableSink.getCols()) {
                partialUpdateCols.add(col.getName());
            }
        }
        TupleDescriptor olapTuple = generateTupleDesc(olapTableSink.getTargetTableSlots(),
                olapTableSink.getTargetTable(), context);
        List<Expr> partitionExprs = olapTableSink.getPartitionExprList().stream()
                .map(e -> ExpressionTranslator.translate(e, context)).collect(Collectors.toList());
        Map<Long, Expr> syncMvWhereClauses = new HashMap<>();
        for (Map.Entry<Long, Expression> entry : olapTableSink.getSyncMvWhereClauses().entrySet()) {
            syncMvWhereClauses.put(entry.getKey(), ExpressionTranslator.translate(entry.getValue(), context));
        }
        OlapTableSink sink;
        // This statement is only used in the group_commit mode
        if (context.getConnectContext().isGroupCommit()) {
            sink = new GroupCommitBlockSink(olapTableSink.getTargetTable(), olapTuple,
                    olapTableSink.getTargetTable().getPartitionIds(), olapTableSink.isSingleReplicaLoad(),
                    partitionExprs, syncMvWhereClauses,
                    context.getSessionVariable().getGroupCommit(),
                    ConnectContext.get().getSessionVariable().getEnableInsertStrict() ? 0 : 1);
        } else {
            sink = new OlapTableSink(
                olapTableSink.getTargetTable(),
                olapTuple,
                olapTableSink.getPartitionIds().isEmpty() ? null : olapTableSink.getPartitionIds(),
                olapTableSink.isSingleReplicaLoad(), partitionExprs, syncMvWhereClauses
            );
        }
        sink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateCols);
        rootFragment.setSink(sink);

        return rootFragment;
    }

    @Override
    public PlanFragment visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends Plan> hiveTableSink,
                                                   PlanTranslatorContext context) {
        PlanFragment rootFragment = hiveTableSink.child().accept(this, context);
        rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);

        TupleDescriptor hiveTuple = context.generateTupleDesc();
        List<Column> targetTableColumns = hiveTableSink.getTargetTable().getFullSchema();
        for (Column column : targetTableColumns) {
            SlotDescriptor slotDesc = context.addSlotDesc(hiveTuple);
            slotDesc.setIsMaterialized(true);
            slotDesc.setType(column.getType());
            slotDesc.setColumn(column);
            slotDesc.setIsNullable(column.isAllowNull());
            slotDesc.setAutoInc(column.isAutoInc());
        }
        HiveTableSink sink = new HiveTableSink((HMSExternalTable) hiveTableSink.getTargetTable());
        rootFragment.setSink(sink);
        return rootFragment;
    }

    @Override
    public PlanFragment visitPhysicalIcebergTableSink(PhysicalIcebergTableSink<? extends Plan> icebergTableSink,
                                                      PlanTranslatorContext context) {
        PlanFragment rootFragment = icebergTableSink.child().accept(this, context);
        rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);

        TupleDescriptor hiveTuple = context.generateTupleDesc();
        List<Column> targetTableColumns = icebergTableSink.getTargetTable().getFullSchema();
        for (Column column : targetTableColumns) {
            SlotDescriptor slotDesc = context.addSlotDesc(hiveTuple);
            slotDesc.setIsMaterialized(true);
            slotDesc.setType(column.getType());
            slotDesc.setColumn(column);
            slotDesc.setIsNullable(column.isAllowNull());
            slotDesc.setAutoInc(column.isAutoInc());
        }
        IcebergTableSink sink = new IcebergTableSink((IcebergExternalTable) icebergTableSink.getTargetTable());
        rootFragment.setSink(sink);
        return rootFragment;
    }

    @Override
    public PlanFragment visitPhysicalJdbcTableSink(PhysicalJdbcTableSink<? extends Plan> jdbcTableSink,
            PlanTranslatorContext context) {
        PlanFragment rootFragment = jdbcTableSink.child().accept(this, context);
        rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
        List<Column> targetTableColumns = jdbcTableSink.getCols();
        List<String> insertCols = targetTableColumns.stream()
                .map(Column::getName)
                .collect(Collectors.toList());

        JdbcTableSink sink = new JdbcTableSink(
                ((JdbcExternalTable) jdbcTableSink.getTargetTable()).getJdbcTable(),
                insertCols
        );
        rootFragment.setSink(sink);
        return rootFragment;
    }

    @Override
    public PlanFragment visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink,
            PlanTranslatorContext context) {
        PlanFragment sinkFragment = fileSink.child().accept(this, context);
        OutFileClause outFile = new OutFileClause(
                fileSink.getFilePath(),
                fileSink.getFormat(),
                fileSink.getProperties()
        );

        List<Expr> outputExprs = Lists.newArrayList();
        fileSink.getOutput().stream().map(Slot::getExprId)
                .forEach(exprId -> outputExprs.add(context.findSlotRef(exprId)));
        sinkFragment.setOutputExprs(outputExprs);

        // generate colLabels
        List<String> labels = fileSink.getOutput().stream().map(NamedExpression::getName).collect(Collectors.toList());

        // TODO: should not call legacy planner analyze in Nereids
        try {
            outFile.analyze(null, outputExprs, labels);
        } catch (Exception e) {
            throw new AnalysisException(e.getMessage(), e.getCause());
        }
        ResultFileSink resultFileSink = new ResultFileSink(sinkFragment.getPlanRoot().getId(), outFile,
                (ArrayList<String>) labels);

        sinkFragment.setSink(resultFileSink);
        return sinkFragment;
    }

    /* ********************************************************************************************
     * scan Node, in lexicographical order
     * ******************************************************************************************** */

    @Override
    public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTranslatorContext context) {
        List<Slot> slots = fileScan.getOutput();
        ExternalTable table = fileScan.getTable();
        TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context);

        SessionVariable sv = ConnectContext.get().getSessionVariable();
        // TODO(cmy): determine the needCheckColumnPriv param
        ScanNode scanNode;
        if (table instanceof HMSExternalTable) {
            if (directoryLister == null) {
                this.directoryLister = new TransactionScopeCachingDirectoryListerFactory(
                        Config.max_external_table_split_file_meta_cache_num).get(new FileSystemDirectoryLister());
            }
            switch (((HMSExternalTable) table).getDlaType()) {
                case ICEBERG:
                    scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv);
                    break;
                case HIVE:
                    scanNode = new HiveScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv, directoryLister);
                    HiveScanNode hiveScanNode = (HiveScanNode) scanNode;
                    hiveScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
                    if (fileScan.getTableSample().isPresent()) {
                        hiveScanNode.setTableSample(new TableSample(fileScan.getTableSample().get().isPercent,
                                fileScan.getTableSample().get().sampleValue, fileScan.getTableSample().get().seek));
                    }
                    break;
                default:
                    throw new RuntimeException("do not support DLA type " + ((HMSExternalTable) table).getDlaType());
            }
        } else if (table instanceof IcebergExternalTable) {
            scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv);
        } else if (table instanceof PaimonExternalTable) {
            scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv);
        } else if (table instanceof TrinoConnectorExternalTable) {
            scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv);
        } else if (table instanceof MaxComputeExternalTable) {
            scanNode = new MaxComputeScanNode(context.nextPlanNodeId(), tupleDescriptor,
                    fileScan.getSelectedPartitions(), false, sv);
        } else if (table instanceof LakeSoulExternalTable) {
            scanNode = new LakeSoulScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv);
        } else {
            throw new RuntimeException("do not support table type " + table.getType());
        }
        if (fileScan.getTableSnapshot().isPresent() && scanNode instanceof FileQueryScanNode) {
            ((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
        }
        return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor);
    }

    @Override
    public PlanFragment visitPhysicalEmptyRelation(PhysicalEmptyRelation emptyRelation, PlanTranslatorContext context) {
        List<Slot> output = emptyRelation.getOutput();
        TupleDescriptor tupleDescriptor = generateTupleDesc(output, null, context);
        for (Slot slot : output) {
            SlotRef slotRef = context.findSlotRef(slot.getExprId());
            slotRef.setLabel(slot.getName());
        }

        ArrayList<TupleId> tupleIds = new ArrayList<>();
        tupleIds.add(tupleDescriptor.getId());
        EmptySetNode emptySetNode = new EmptySetNode(context.nextPlanNodeId(), tupleIds);
        emptySetNode.setNereidsId(emptyRelation.getId());
        context.getNereidsIdToPlanNodeIdMap().put(emptyRelation.getId(), emptySetNode.getId());
        PlanFragment planFragment = createPlanFragment(emptySetNode,
                DataPartition.UNPARTITIONED, emptyRelation);
        context.addPlanFragment(planFragment);
        updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), emptyRelation);
        return planFragment;
    }

    @Override
    public PlanFragment visitPhysicalEsScan(PhysicalEsScan esScan, PlanTranslatorContext context) {
        List<Slot> slots = esScan.getOutput();
        TableIf table = esScan.getTable();
        TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context);
        EsScanNode esScanNode = new EsScanNode(context.nextPlanNodeId(), tupleDescriptor,
                table instanceof EsExternalTable);
        esScanNode.setNereidsId(esScan.getId());
        context.getNereidsIdToPlanNodeIdMap().put(esScan.getId(), esScanNode.getId());
        Utils.execWithUncheckedException(esScanNode::init);
        context.addScanNode(esScanNode, esScan);
        context.getRuntimeTranslator().ifPresent(
                runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(esScan).forEach(
                        expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, esScanNode, context)
                )
        );
        context.getTopnFilterContext().translateTarget(esScan, esScanNode, context);
        DataPartition dataPartition = DataPartition.RANDOM;
        PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), esScanNode, dataPartition);
        context.addPlanFragment(planFragment);
        updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), esScan);
        return planFragment;
    }

    @Override
    public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan, PlanTranslatorContext context) {
        if (directoryLister == null) {
            this.directoryLister = new TransactionScopeCachingDirectoryListerFactory(
                    Config.max_external_table_split_file_meta_cache_num).get(new FileSystemDirectoryLister());
        }
        List<Slot> slots = fileScan.getOutput();
        ExternalTable table = fileScan.getTable();
        TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context);

        if (!(table instanceof HMSExternalTable) || ((HMSExternalTable) table).getDlaType() != DLAType.HUDI) {
            throw new RuntimeException("Invalid table type for Hudi scan: " + table.getType());
        }
        Preconditions.checkState(fileScan instanceof PhysicalHudiScan,
                "Invalid physical scan: " + fileScan.getClass().getSimpleName()
                        + " for Hudi table");
        PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan;
        ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false,
                hudiScan.getScanParams(), hudiScan.getIncrementalRelation(), ConnectContext.get().getSessionVariable(),
                directoryLister);
        if (fileScan.getTableSnapshot().isPresent()) {
            ((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
        }
        HudiScanNode hudiScanNode = (HudiScanNode) scanNode;
        hudiScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
        return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor);
    }

    @NotNull
    private PlanFragment getPlanFragmentForPhysicalFileScan(PhysicalFileScan fileScan, PlanTranslatorContext context,
            ScanNode scanNode,
            ExternalTable table, TupleDescriptor tupleDescriptor) {
        scanNode.setNereidsId(fileScan.getId());
        context.getNereidsIdToPlanNodeIdMap().put(fileScan.getId(), scanNode.getId());
        scanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(fileScan.getRelationId()));

        TableName tableName = new TableName(null, "", "");
        TableRef ref = new TableRef(tableName, null, null);
        BaseTableRef tableRef = new BaseTableRef(ref, table, tableName);
        tupleDescriptor.setRef(tableRef);
        if (fileScan.getStats() != null) {
            scanNode.setCardinality((long) fileScan.getStats().getRowCount());
        }
        Utils.execWithUncheckedException(scanNode::init);
        context.addScanNode(scanNode, fileScan);
        ScanNode finalScanNode = scanNode;
        context.getRuntimeTranslator().ifPresent(
                runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(fileScan).forEach(
                        expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, context)
                )
        );
        context.getTopnFilterContext().translateTarget(fileScan, scanNode, context);
        // Create PlanFragment
        DataPartition dataPartition = DataPartition.RANDOM;
        PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan);
        context.addPlanFragment(planFragment);
        updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan);
        return planFragment;
    }

    @Override
    public PlanFragment visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, PlanTranslatorContext context) {
        List<Slot> slots = jdbcScan.getOutput();
        TableIf table = jdbcScan.getTable();
        TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context);
        JdbcScanNode jdbcScanNode = new JdbcScanNode(context.nextPlanNodeId(), tupleDescriptor,
                table instanceof JdbcExternalTable);
        jdbcScanNode.setNereidsId(jdbcScan.getId());
        context.getNereidsIdToPlanNodeIdMap().put(jdbcScan.getId(), jdbcScanNode.getId());
        Utils.execWithUncheckedException(jdbcScanNode::init);
        context.addScanNode(jdbcScanNode, jdbcScan);
        context.getRuntimeTranslator().ifPresent(
                runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(jdbcScan).forEach(
                        expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, jdbcScanNode, context)
                )
        );
        context.getTopnFilterContext().translateTarget(jdbcScan, jdbcScanNode, context);
        DataPartition dataPartition = DataPartition.RANDOM;
        PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), jdbcScanNode, dataPartition);
        context.addPlanFragment(planFragment);
        updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), jdbcScan);
        return planFragment;
    }

    @Override
    public PlanFragment visitPhysicalOdbcScan(PhysicalOdbcScan odbcScan, PlanTranslatorContext context) {
        List<Slot> slots = odbcScan.getOutput();
        TableIf table = odbcScan.getTable();
        TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context);
        OdbcScanNode odbcScanNode = new OdbcScanNode(context.nextPlanNodeId(), tupleDescriptor,
                (OdbcTable) table);
        odbcScanNode.setNereidsId(odbcScan.getId());
        context.getNereidsIdToPlanNodeIdMap().put(odbcScan.getId(), odbcScanNode.getId());
        Utils.execWithUncheckedException(odbcScanNode::init);
        context.addScanNode(odbcScanNode, odbcScan);
        context.getRuntimeTranslator().ifPresent(
                runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(odbcScan).forEach(
                        expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, odbcScanNode, context)
                )
        );
        context.getTopnFilterContext().translateTarget(odbcScan, odbcScanNode, context);
        context.getTopnFilterContext().translateTarget(odbcScan, odbcScanNode, context);
        DataPartition dataPartition = DataPartition.RANDOM;
        PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), odbcScanNode, dataPartition);
        context.addPlanFragment(planFragment);
        updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), odbcScan);
        return planFragment;
    }

    @Override
    public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTranslatorContext context) {
        return computePhysicalOlapScan(olapScan, false, context);
    }

    private PlanFragment computePhysicalOlapScan(PhysicalOlapScan olapScan,
            boolean lazyMaterialize, PlanTranslatorContext context) {
        List<Slot> slots = olapScan.getOutput();
        OlapTable olapTable = olapScan.getTable();
        // generate real output tuple
        TupleDescriptor tupleDescriptor = generateTupleDesc(slots, olapTable, context);
        // generate base index tuple because this fragment partitioned expr relay on slots of based index
        if (olapScan.getSelectedIndexId() != olapScan.getTable().getBaseIndexId()) {
            generateTupleDesc(olapScan.getBaseOutputs(), olapTable, context);
        }

        OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(), tupleDescriptor, "OlapScanNode");
        olapScanNode.setNereidsId(olapScan.getId());
        context.getNereidsIdToPlanNodeIdMap().put(olapScan.getId(), olapScanNode.getId());
        // TODO: move all node set cardinality into one place
        if (olapScan.getStats() != null) {
            // NOTICE: we should not set stats row count
            //   because it is whole table cardinality and will break block rules.
            // olapScanNode.setCardinality((long) olapScan.getStats().getRowCount());
            if (context.getSessionVariable() != null && context.getSessionVariable().forbidUnknownColStats) {
                for (int i = 0; i < slots.size(); i++) {
                    SlotReference slot = (SlotReference) slots.get(i);
                    boolean inVisibleCol = slot.getOriginalColumn().isPresent()
                            && StatisticConstants.shouldIgnoreCol(olapTable, slot.getOriginalColumn().get());
                    if (olapScan.getStats().findColumnStatistics(slot).isUnKnown()
                            && !isComplexDataType(slot.getDataType())
                            && !StatisticConstants.isSystemTable(olapTable)
                            && !inVisibleCol) {
                        context.addUnknownStatsColumn(olapScanNode, tupleDescriptor.getSlots().get(i).getId());
                    }
                }
            }
        }
        // TODO: Do we really need tableName here?
        TableName tableName = new TableName(null, "", "");
        TableRef ref = new TableRef(tableName, null, null);
        BaseTableRef tableRef = new BaseTableRef(ref, olapTable, tableName);
        tupleDescriptor.setRef(tableRef);
        olapScanNode.setSelectedPartitionIds(olapScan.getSelectedPartitionIds());
        olapScanNode.setSampleTabletIds(olapScan.getSelectedTabletIds());
        if (olapScan.getTableSample().isPresent()) {
            olapScanNode.setTableSample(new TableSample(olapScan.getTableSample().get().isPercent,
                    olapScan.getTableSample().get().sampleValue, olapScan.getTableSample().get().seek));
        }

        // TODO:  remove this switch?
        switch (olapScan.getTable().getKeysType()) {
            case AGG_KEYS:
            case UNIQUE_KEYS:
            case DUP_KEYS:
                PreAggStatus preAgg = olapScan.getPreAggStatus();
                olapScanNode.setSelectedIndexInfo(olapScan.getSelectedIndexId(), preAgg.isOn(), preAgg.getOffReason());
                break;
            default:
                throw new RuntimeException("Not supported key type: " + olapScan.getTable().getKeysType());
        }

        // create scan range
        Utils.execWithUncheckedException(olapScanNode::init);
        // TODO: process collect scan node in one place
        context.addScanNode(olapScanNode, olapScan);
        if (!lazyMaterialize) {
            // TODO: process translate runtime filter in one place
            //   use real plan node to present rf apply and rf generator
            context.getRuntimeTranslator().ifPresent(
                    runtimeFilterTranslator -> runtimeFilterTranslator.getContext().getTargetListByScan(olapScan)
                            .forEach(expr -> runtimeFilterTranslator.translateRuntimeFilterTarget(
                                    expr, olapScanNode, context)
                            )
            );
            context.getTopnFilterContext().translateTarget(olapScan, olapScanNode, context);
        }
        olapScanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(olapScan.getRelationId()));
        // Create PlanFragment
        // TODO: use a util function to convert distribution to DataPartition
        DataPartition dataPartition = DataPartition.RANDOM;
        if (olapScan.getDistributionSpec() instanceof DistributionSpecHash) {
            DistributionSpecHash distributionSpecHash = (DistributionSpecHash) olapScan.getDistributionSpec();
            List<Expr> partitionExprs = distributionSpecHash.getOrderedShuffledColumns().stream()
                    .map(context::findSlotRef).collect(Collectors.toList());
            dataPartition = new DataPartition(TPartitionType.HASH_PARTITIONED, partitionExprs);
        }
        if (olapScan.getStats() != null) {
            olapScanNode.setCardinality((long) olapScan.getStats().getRowCount());
        }
        // TODO: maybe we could have a better way to create fragment
        PlanFragment planFragment = createPlanFragment(olapScanNode, dataPartition, olapScan);
        context.addPlanFragment(planFragment);
        updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), olapScan);
        return planFragment;
    }

    @Override
    public PlanFragment visitPhysicalDeferMaterializeOlapScan(
            PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, PlanTranslatorContext context) {
        PlanFragment planFragment = visitPhysicalOlapScan(deferMaterializeOlapScan.getPhysicalOlapScan(), context);
        OlapScanNode olapScanNode = (OlapScanNode) planFragment.getPlanRoot();
        TupleDescriptor tupleDescriptor = context.getTupleDesc(olapScanNode.getTupleId());
        for (SlotDescriptor slotDescriptor : tupleDescriptor.getSlots()) {
            if (deferMaterializeOlapScan.getDeferMaterializeSlotIds()
                    .contains(context.findExprId(slotDescriptor.getId()))) {
                slotDescriptor.setNeedMaterialize(false);
            }
        }
        context.createSlotDesc(tupleDescriptor, deferMaterializeOlapScan.getColumnIdSlot());
        context.getTopnFilterContext().translateTarget(deferMaterializeOlapScan, olapScanNode, context);
        return planFragment;
    }

    @Override
    public PlanFragment visitPhysicalOneRowRelation(PhysicalOneRowRelation oneRowRelation,
            PlanTranslatorContext context) {
        List<Slot> slots = oneRowRelation.getLogicalProperties().getOutput();
        TupleDescriptor oneRowTuple = generateTupleDesc(slots, null, context);

        List<Expr> legacyExprs = oneRowRelation.getProjects()
                .stream()
                .map(expr -> ExpressionTranslator.translate(expr, context))
                .collect(Collectors.toList());

        for (int i = 0; i < legacyExprs.size(); i++) {
            SlotDescriptor slotDescriptor = oneRowTuple.getSlots().get(i);
            Expr expr = legacyExprs.get(i);
            slotDescriptor.setSourceExpr(expr);
            slotDescriptor.setIsNullable(slots.get(i).nullable());
        }

        UnionNode unionNode = new UnionNode(context.nextPlanNodeId(), oneRowTuple.getId());
        unionNode.setNereidsId(oneRowRelation.getId());
        context.getNereidsIdToPlanNodeIdMap().put(oneRowRelation.getId(), unionNode.getId());
        unionNode.setCardinality(1L);
        unionNode.addConstExprList(legacyExprs);
        unionNode.finalizeForNereids(oneRowTuple.getSlots(), new ArrayList<>());

        PlanFragment planFragment = createPlanFragment(unionNode, DataPartition.UNPARTITIONED, oneRowRelation);
        context.addPlanFragment(planFragment);
        updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), oneRowRelation);
        return planFragment;
    }

    @Override
    public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanTranslatorContext context) {
        TableIf table = schemaScan.getTable();
        List<Slot> slots = ImmutableList.copyOf(schemaScan.getOutput());
        TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context);

        // For the information_schema.rowsets table, the scan fragment needs to be sent to all BEs.
        // For other information_schema tables, the scan fragment only needs to be sent to one of the BEs.
        SchemaScanNode scanNode = null;
        if (BackendPartitionedSchemaScanNode.isBackendPartitionedSchemaTable(
                table.getName())) {
            scanNode = new BackendPartitionedSchemaScanNode(context.nextPlanNodeId(), table, tupleDescriptor,
                schemaScan.getSchemaCatalog().orElse(null), schemaScan.getSchemaDatabase().orElse(null),
                schemaScan.getSchemaTable().orElse(null));
        } else {
            scanNode = new SchemaScanNode(context.nextPlanNodeId(), tupleDescriptor,
                schemaScan.getSchemaCatalog().orElse(null), schemaScan.getSchemaDatabase().orElse(null),
                schemaScan.getSchemaTable().orElse(null));
        }
        scanNode.setNereidsId(schemaScan.getId());
        context.getNereidsIdToPlanNodeIdMap().put(schemaScan.getId(), scanNode.getId());
        SchemaScanNode finalScanNode = scanNode;
        context.getRuntimeTranslator().ifPresent(
                runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(schemaScan)
                        .forEach(expr -> runtimeFilterGenerator
                                .translateRuntimeFilterTarget(expr, finalScanNode, context)
                )
        );
        context.addScanNode(scanNode, schemaScan);
        PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, schemaScan);
        context.addPlanFragment(planFragment);
        updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), schemaScan);
        return planFragment;
    }

    @Override
    public PlanFragment visitPhysicalTVFRelation(PhysicalTVFRelation tvfRelation, PlanTranslatorContext context) {
        List<Slot> slots = tvfRelation.getLogicalProperties().getOutput();
        TupleDescriptor tupleDescriptor = generateTupleDesc(slots, tvfRelation.getFunction().getTable(), context);

        TableValuedFunctionIf catalogFunction = tvfRelation.getFunction().getCatalogFunction();
        SessionVariable sv = ConnectContext.get().getSessionVariable();
        ScanNode scanNode = catalogFunction.getScanNode(context.nextPlanNodeId(), tupleDescriptor, sv);
        scanNode.setNereidsId(tvfRelation.getId());
        context.getNereidsIdToPlanNodeIdMap().put(tvfRelation.getId(), scanNode.getId());
        Utils.execWithUncheckedException(scanNode::init);
        context.getRuntimeTranslator().ifPresent(
                runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(tvfRelation)
                        .forEach(expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context)
                )
        );
        context.addScanNode(scanNode, tvfRelation);

        // TODO: it is weird update label in this way
        // set label for explain
        for (Slot slot : slots) {
            String tableColumnName = TableValuedFunctionIf.TVF_TABLE_PREFIX + tvfRelation.getFunction().getName()
                    + "." + slot.getName();
            context.findSlotRef(slot.getExprId()).setLabel(tableColumnName);
        }

        PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, tvfRelation);
        context.addPlanFragment(planFragment);
        updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), tvfRelation);
        return planFragment;
    }


    /* ********************************************************************************************
     * other Node, in lexicographical order, ignore algorithm name. for example, HashAggregate -> Aggregate
     * ******************************************************************************************** */

    /**
     * Translate Agg.
     */
    @Override
    public PlanFragment visitPhysicalHashAggregate(
            PhysicalHashAggregate<? extends Plan> aggregate,
            PlanTranslatorContext context) {

        PlanFragment inputPlanFragment = aggregate.child(0).accept(this, context);
        List<List<Expr>> distributeExprLists = getDistributeExprs(aggregate.child(0));

        List<Expression> groupByExpressions = aggregate.getGroupByExpressions();
        List<NamedExpression> outputExpressions = aggregate.getOutputExpressions();

        // 1. generate slot reference for each group expression
        List<SlotReference> groupSlots = collectGroupBySlots(groupByExpressions, outputExpressions);
        ArrayList<Expr> execGroupingExpressions = groupByExpressions.stream()
                .map(e -> {
                    Expr result = ExpressionTranslator.translate(e, context);
                    if (result == null) {
                        throw new RuntimeException("translate " + e + " failed");
                    }
                    return result;
                })
                .collect(Collectors.toCollection(ArrayList::new));
        // 2. collect agg expressions and generate agg function to slot reference map
        List<Slot> aggFunctionOutput = Lists.newArrayList();
        List<AggregateExpression> aggregateExpressionList = outputExpressions.stream()
                .filter(o -> o.anyMatch(AggregateExpression.class::isInstance))
                .peek(o -> aggFunctionOutput.add(o.toSlot()))
                .map(o -> o.<AggregateExpression>collect(AggregateExpression.class::isInstance))
                .flatMap(Set::stream)
                .collect(Collectors.toList());
        ArrayList<FunctionCallExpr> execAggregateFunctions = aggregateExpressionList.stream()
                .map(aggregateFunction -> (FunctionCallExpr) ExpressionTranslator.translate(aggregateFunction, context))
                .collect(Collectors.toCollection(ArrayList::new));

        // 3. generate output tuple
        List<Slot> slotList = Lists.newArrayList();
        TupleDescriptor outputTupleDesc;
        slotList.addAll(groupSlots);
        slotList.addAll(aggFunctionOutput);
        outputTupleDesc = generateTupleDesc(slotList, null, context);

        List<Integer> aggFunOutputIds = ImmutableList.of();
        if (!aggFunctionOutput.isEmpty()) {
            aggFunOutputIds = outputTupleDesc
                    .getSlots()
                    .subList(groupSlots.size(), outputTupleDesc.getSlots().size())
                    .stream()
                    .map(slot -> slot.getId().asInt())
                    .collect(ImmutableList.toImmutableList());
        }
        boolean isPartial = aggregate.getAggregateParam().aggMode.productAggregateBuffer;
        AggregateInfo aggInfo = AggregateInfo.create(execGroupingExpressions, execAggregateFunctions,
                aggFunOutputIds, isPartial, outputTupleDesc, outputTupleDesc, aggregate.getAggPhase().toExec());
        AggregationNode aggregationNode = new AggregationNode(context.nextPlanNodeId(),
                inputPlanFragment.getPlanRoot(), aggInfo);

        aggregationNode.setChildrenDistributeExprLists(distributeExprLists);

        aggregationNode.setNereidsId(aggregate.getId());
        context.getNereidsIdToPlanNodeIdMap().put(aggregate.getId(), aggregationNode.getId());
        if (!aggregate.getAggMode().isFinalPhase) {
            aggregationNode.unsetNeedsFinalize();
        }

        switch (aggregate.getAggPhase()) {
            case LOCAL:
                // we should set is useStreamingAgg when has exchange,
                // so the `aggregationNode.setUseStreamingPreagg()` in the visitPhysicalDistribute
                break;
            case DISTINCT_LOCAL:
                aggregationNode.setIntermediateTuple();
                break;
            case GLOBAL:
            case DISTINCT_GLOBAL:
                break;
            default:
                throw new RuntimeException("Unsupported agg phase: " + aggregate.getAggPhase());
        }
        // TODO: use to set useStreamingAgg, we should remove it by set it in Nereids
        PhysicalHashAggregate firstAggregateInFragment = context.getFirstAggregateInFragment(inputPlanFragment);
        if (firstAggregateInFragment == null) {
            context.setFirstAggregateInFragment(inputPlanFragment, aggregate);
        }

        // in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution
        // so, if we do final phase or merge without exchange.
        // we need turn of parallel scan to ensure to get correct result.
        PlanNode leftMostNode = inputPlanFragment.getPlanRoot();
        while (leftMostNode.getChildren().size() != 0 && !(leftMostNode instanceof ExchangeNode)) {
            leftMostNode = leftMostNode.getChild(0);
        }
        // TODO: nereids forbid all parallel scan under aggregate temporary, because nereids could generate
        //  so complex aggregate plan than legacy planner, and should add forbid parallel scan hint when
        //  generate physical aggregate plan.
        //  There is one exception, we use some precondition in optimizer, input to buffer always require any for input,
        //  so when agg mode is INPUT_TO_BUFFER, we do not forbid parallel scan
        if (aggregate.getAggregateParam().aggMode != AggMode.INPUT_TO_BUFFER) {
            inputPlanFragment.setHasColocatePlanNode(true);
            // Set colocate info in agg node. This is a hint for local shuffling to decide which type of
            // local exchanger will be used.
            aggregationNode.setColocate(true);

            Plan child = aggregate.child();
            // we should set colocate = true, when the same LogicalAggregate generate two PhysicalHashAggregates
            // in one fragment:
            //
            // agg(merge finalize)   <- current, set colocate = true
            //          |
            // agg(update serialize) <- child, also set colocate = true
            if (aggregate.getAggregateParam().aggMode.consumeAggregateBuffer
                    && child instanceof PhysicalHashAggregate
                    && !((PhysicalHashAggregate<Plan>) child).getAggregateParam().aggMode.consumeAggregateBuffer
                    && inputPlanFragment.getPlanRoot() instanceof AggregationNode) {
                AggregationNode childAgg = (AggregationNode) inputPlanFragment.getPlanRoot();
                childAgg.setColocate(true);
            }
        }
        if (aggregate.getTopnPushInfo() != null) {
            List<Expr> orderingExprs = Lists.newArrayList();
            List<Boolean> ascOrders = Lists.newArrayList();
            List<Boolean> nullsFirstParams = Lists.newArrayList();
            aggregate.getTopnPushInfo().orderkeys.forEach(k -> {
                orderingExprs.add(ExpressionTranslator.translate(k.getExpr(), context));
                ascOrders.add(k.isAsc());
                nullsFirstParams.add(k.isNullFirst());
            });
            SortInfo sortInfo = new SortInfo(orderingExprs, ascOrders, nullsFirstParams, outputTupleDesc);
            aggregationNode.setSortByGroupKey(sortInfo);
            if (aggregationNode.getLimit() == -1) {
                aggregationNode.setLimit(aggregate.getTopnPushInfo().limit);
            }
        } else {
            aggregationNode.setSortByGroupKey(null);
        }
        setPlanRoot(inputPlanFragment, aggregationNode, aggregate);
        if (aggregate.getStats() != null) {
            aggregationNode.setCardinality((long) aggregate.getStats().getRowCount());
        }
        updateLegacyPlanIdToPhysicalPlan(inputPlanFragment.getPlanRoot(), aggregate);
        return inputPlanFragment;
    }

    @Override
    public PlanFragment visitPhysicalStorageLayerAggregate(
            PhysicalStorageLayerAggregate storageLayerAggregate, PlanTranslatorContext context) {
        Preconditions.checkState((storageLayerAggregate.getRelation() instanceof PhysicalOlapScan
                        || storageLayerAggregate.getRelation() instanceof PhysicalFileScan),
                "PhysicalStorageLayerAggregate only support PhysicalOlapScan and PhysicalFileScan: "
                        + storageLayerAggregate.getRelation().getClass().getName());

        TPushAggOp pushAggOp;
        switch (storageLayerAggregate.getAggOp()) {
            case COUNT:
                pushAggOp = TPushAggOp.COUNT;
                break;
            case COUNT_ON_MATCH:
                pushAggOp = TPushAggOp.COUNT_ON_INDEX;
                break;
            case MIN_MAX:
                pushAggOp = TPushAggOp.MINMAX;
                break;
            case MIX:
                pushAggOp = TPushAggOp.MIX;
                break;
            default:
                throw new AnalysisException("Unsupported storage layer aggregate: "
                        + storageLayerAggregate.getAggOp());
        }

        if (storageLayerAggregate.getRelation() instanceof PhysicalFileScan
                && pushAggOp.equals(TPushAggOp.COUNT)
                && !ConnectContext.get().getSessionVariable().isEnableCountPushDownForExternalTable()) {
            pushAggOp = TPushAggOp.NONE;
        }

        context.setRelationPushAggOp(
                storageLayerAggregate.getRelation().getRelationId(), pushAggOp);

        PlanFragment planFragment = storageLayerAggregate.getRelation().accept(this, context);

        updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), storageLayerAggregate);
        return planFragment;
    }

    @Override
    public PlanFragment visitPhysicalAssertNumRows(PhysicalAssertNumRows<? extends Plan> assertNumRows,
            PlanTranslatorContext context) {
        PlanFragment currentFragment = assertNumRows.child().accept(this, context);
        List<List<Expr>> distributeExprLists = getDistributeExprs(assertNumRows.child());

        // we need convert all columns to nullable in AssertNumRows node
        // create a tuple for AssertNumRowsNode
        TupleDescriptor tupleDescriptor = context.generateTupleDesc();
        // create assertNode
        AssertNumRowsNode assertNumRowsNode = new AssertNumRowsNode(context.nextPlanNodeId(),
                currentFragment.getPlanRoot(),
                ExpressionTranslator.translateAssert(assertNumRows.getAssertNumRowsElement()), true, tupleDescriptor);
        assertNumRowsNode.setChildrenDistributeExprLists(distributeExprLists);
        assertNumRowsNode.setNereidsId(assertNumRows.getId());
        context.getNereidsIdToPlanNodeIdMap().put(assertNumRows.getId(), assertNumRowsNode.getId());

        // collect all child output slots
        List<TupleDescriptor> childTuples = context.getTupleDesc(currentFragment.getPlanRoot());
        List<SlotDescriptor> childSlotDescriptors = childTuples.stream()
                .map(TupleDescriptor::getSlots)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());

        // create output slot based on child output
        Map<ExprId, SlotReference> childOutputMap = Maps.newHashMap();
        assertNumRows.child().getOutput().stream()
                .map(SlotReference.class::cast)
                .forEach(s -> childOutputMap.put(s.getExprId(), s));
        List<SlotDescriptor> slotDescriptors = Lists.newArrayList();
        for (SlotDescriptor slot : childSlotDescriptors) {
            SlotReference sf = childOutputMap.get(context.findExprId(slot.getId()));
            SlotDescriptor sd = context.createSlotDesc(tupleDescriptor, sf, slot.getParent().getTable());
            slotDescriptors.add(sd);
        }

        // set all output slot nullable
        slotDescriptors.forEach(sd -> sd.setIsNullable(true));

        addPlanRoot(currentFragment, assertNumRowsNode, assertNumRows);
        return currentFragment;
    }

    /**
     * NOTICE: Must translate left, which it's the producer of consumer.
     */
    @Override
    public PlanFragment visitPhysicalCTEAnchor(PhysicalCTEAnchor<? extends Plan, ? extends Plan> cteAnchor,
            PlanTranslatorContext context) {
        cteAnchor.child(0).accept(this, context);
        return cteAnchor.child(1).accept(this, context);
    }

    @Override
        public PlanFragment visitPhysicalCTEConsumer(PhysicalCTEConsumer cteConsumer,
            PlanTranslatorContext context) {
        CTEId cteId = cteConsumer.getCteId();

        MultiCastPlanFragment multiCastFragment = (MultiCastPlanFragment) context.getCteProduceFragments().get(cteId);
        Preconditions.checkState(multiCastFragment.getSink() instanceof MultiCastDataSink,
                "invalid multiCastFragment");

        MultiCastDataSink multiCastDataSink = (MultiCastDataSink) multiCastFragment.getSink();
        Preconditions.checkState(multiCastDataSink != null, "invalid multiCastDataSink");

        PhysicalCTEProducer<?> cteProducer = context.getCteProduceMap().get(cteId);
        Preconditions.checkState(cteProducer != null, "invalid cteProducer");

        context.getCteConsumerMap().put(cteId, cteConsumer);
        // set datasink to multicast data sink but do not set target now
        // target will be set when translate distribute
        DataStreamSink streamSink = new DataStreamSink();
        streamSink.setFragment(multiCastFragment);
        multiCastDataSink.getDataStreamSinks().add(streamSink);
        multiCastDataSink.getDestinations().add(Lists.newArrayList());

        // update expr to slot mapping
        TupleDescriptor tupleDescriptor = null;
        for (Slot producerSlot : cteProducer.getOutput()) {
            SlotRef slotRef = context.findSlotRef(producerSlot.getExprId());
            tupleDescriptor = slotRef.getDesc().getParent();
            for (Slot consumerSlot : cteConsumer.getProducerToConsumerSlotMap().get(producerSlot)) {
                context.addExprIdSlotRefPair(consumerSlot.getExprId(), slotRef);
            }
        }
        CTEScanNode cteScanNode = new CTEScanNode(tupleDescriptor);
        context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator ->
                    runtimeFilterTranslator.getContext().getTargetListByScan(cteConsumer).forEach(
                            expr -> runtimeFilterTranslator.translateRuntimeFilterTarget(expr, cteScanNode, context)));
        context.getCteScanNodeMap().put(multiCastFragment.getFragmentId(), cteScanNode);

        return multiCastFragment;
    }

    @Override
    public PlanFragment visitPhysicalCTEProducer(PhysicalCTEProducer<? extends Plan> cteProducer,
            PlanTranslatorContext context) {
        PlanFragment child = cteProducer.child().accept(this, context);
        CTEId cteId = cteProducer.getCteId();
        context.getPlanFragments().remove(child);

        MultiCastPlanFragment multiCastPlanFragment = new MultiCastPlanFragment(child);
        MultiCastDataSink multiCastDataSink = new MultiCastDataSink();
        multiCastPlanFragment.setSink(multiCastDataSink);

        List<Expr> outputs = cteProducer.getOutput().stream()
                .map(e -> ExpressionTranslator.translate(e, context))
                .collect(Collectors.toList());

        multiCastPlanFragment.setOutputExprs(outputs);
        context.getCteProduceFragments().put(cteId, multiCastPlanFragment);
        context.getCteProduceMap().put(cteId, cteProducer);
        context.getPlanFragments().add(multiCastPlanFragment);
        return child;
    }

    @Override
    public PlanFragment visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, PlanTranslatorContext context) {
        if (filter.child(0) instanceof AbstractPhysicalJoin) {
            AbstractPhysicalJoin<?, ?> join = (AbstractPhysicalJoin<?, ?>) filter.child();
            join.addFilterConjuncts(filter.getConjuncts());
        }
        PlanFragment inputFragment = filter.child(0).accept(this, context);

        // process multicast sink
        if (inputFragment instanceof MultiCastPlanFragment) {
            MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
            DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
                    multiCastDataSink.getDataStreamSinks().size() - 1);
            if (CollectionUtils.isNotEmpty(dataStreamSink.getConjuncts())
                    || CollectionUtils.isNotEmpty(dataStreamSink.getProjections())) {
                String errMsg = "generate invalid plan \n" + filter.treeString();
                LOG.warn(errMsg);
                throw new AnalysisException(errMsg);
            }
            filter.getConjuncts().stream()
                    .map(e -> ExpressionTranslator.translate(e, context))
                    .forEach(dataStreamSink::addConjunct);
            return inputFragment;
        }

        PlanNode planNode = inputFragment.getPlanRoot();
        // the three nodes don't support conjuncts, need create a SelectNode to filter data
        if (planNode instanceof ExchangeNode || planNode instanceof SortNode || planNode instanceof UnionNode) {
            SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), planNode);
            selectNode.setNereidsId(filter.getId());
            context.getNereidsIdToPlanNodeIdMap().put(filter.getId(), selectNode.getId());
            addConjunctsToPlanNode(filter, selectNode, context);
            addPlanRoot(inputFragment, selectNode, filter);
        } else {
            if (!(filter.child(0) instanceof AbstractPhysicalJoin)) {
                // already have filter on this node, we should not override it, so need a new node
                if (!planNode.getConjuncts().isEmpty()
                        // already have project on this node, filter need execute after project, so need a new node
                        || CollectionUtils.isNotEmpty(planNode.getProjectList())
                        // already have limit on this node, filter need execute after limit, so need a new node
                        || planNode.hasLimit()) {
                    planNode = new SelectNode(context.nextPlanNodeId(), planNode);
                    planNode.setNereidsId(filter.getId());
                    // NOTE: can't collect planNode.getId() on filter's child, such as scan node
                    // since if the filter is embedded into scan, the id mapping relation is not correct
                    // i.e, the physical filter's nereids's id will be mapped to final plan's scan node
                    context.getNereidsIdToPlanNodeIdMap().put(filter.getId(), planNode.getId());
                    addPlanRoot(inputFragment, planNode, filter);
                }
                addConjunctsToPlanNode(filter, planNode, context);
            }
        }
        updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), filter);
        // in ut, filter.stats may be null
        if (filter.getStats() != null) {
            inputFragment.getPlanRoot().setCardinalityAfterFilter((long) filter.getStats().getRowCount());
        }
        return inputFragment;
    }

    @Override
    public PlanFragment visitPhysicalGenerate(PhysicalGenerate<? extends Plan> generate,
            PlanTranslatorContext context) {
        PlanFragment currentFragment = generate.child().accept(this, context);
        ArrayList<Expr> functionCalls = generate.getGenerators().stream()
                .map(e -> ExpressionTranslator.translate(e, context))
                .collect(Collectors.toCollection(ArrayList::new));
        TupleDescriptor tupleDescriptor = generateTupleDesc(generate.getGeneratorOutput(), null, context);
        List<TupleId> childOutputTupleIds = currentFragment.getPlanRoot().getOutputTupleIds();
        if (childOutputTupleIds == null || childOutputTupleIds.isEmpty()) {
            childOutputTupleIds = currentFragment.getPlanRoot().getTupleIds();
        }
        List<SlotId> outputSlotIds = Stream.concat(childOutputTupleIds.stream(),
                        Stream.of(tupleDescriptor.getId()))
                .map(id -> context.getTupleDesc(id).getSlots())
                .flatMap(List::stream)
                .map(SlotDescriptor::getId)
                .collect(Collectors.toList());
        TableFunctionNode tableFunctionNode = new TableFunctionNode(context.nextPlanNodeId(),
                currentFragment.getPlanRoot(), tupleDescriptor.getId(), functionCalls, outputSlotIds);
        tableFunctionNode.setNereidsId(generate.getId());
        context.getNereidsIdToPlanNodeIdMap().put(generate.getId(), tableFunctionNode.getId());
        addPlanRoot(currentFragment, tableFunctionNode, generate);
        return currentFragment;
    }

    /**
     * the contract of hash join node with BE
     * 1. hash join contains 3 types of predicates:
     *   a. equal join conjuncts
     *   b. other join conjuncts
     *   c. other predicates (denoted by filter conjuncts in the rest of comments)
     * <p>
     * 2. hash join contains 3 tuple descriptors
     *   a. input tuple descriptors, corresponding to the left child output and right child output.
     *      If its column is selected, it will be displayed in explain by `tuple ids`.
     *      for example, select L.* from L join R on ..., because no column from R are selected, tuple ids only
     *      contains output tuple of L.
     *      equal join conjuncts is bound on input tuple descriptors.
     * <p>
     *   b.intermediate tuple.
     *      This tuple describes schema of the output block after evaluating equal join conjuncts
     *      and other join conjuncts.
     * <p>
     *      Other join conjuncts currently is bound on intermediate tuple. There are some historical reason, and it
     *      should be bound on input tuple in the future.
     * <p>
     *      filter conjuncts will be evaluated on the intermediate tuple. That means the input block of filter is
     *      described by intermediate tuple, and hence filter conjuncts should be bound on intermediate tuple.
     * <p>
     *      In order to be compatible with old version, intermediate tuple is not pruned. For example, intermediate
     *      tuple contains all slots from both sides of children. After probing hash-table, BE does not need to
     *      materialize all slots in intermediate tuple. The slots in HashJoinNode.hashOutputSlotIds will be
     *      materialized by BE. If `hashOutputSlotIds` is empty, all slots will be materialized.
     * <p>
     *      In case of outer join, the slots in intermediate should be set nullable.
     *      For example,
     *      select L.*, R.* from L left outer join R on ...
     *      All slots from R in intermediate tuple should be nullable.
     * <p>
     *   c. output tuple
     *      This describes the schema of hash join output block.
     * 3. Intermediate tuple
     *      for BE performance reason, the slots in intermediate tuple
     *      depends on the join type and other join conjuncts.
     *      In general, intermediate tuple contains all slots of both children, except one case.
     *      For left-semi/left-ant (right-semi/right-semi) join without other join conjuncts, intermediate tuple
     *      only contains left (right) children output slots.
     *
     */
    @Override
    public PlanFragment visitPhysicalHashJoin(
            PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin,
            PlanTranslatorContext context) {
        Preconditions.checkArgument(hashJoin.left() instanceof PhysicalPlan,
                "HashJoin's left child should be PhysicalPlan");
        Preconditions.checkArgument(hashJoin.right() instanceof PhysicalPlan,
                "HashJoin's left child should be PhysicalPlan");
        PhysicalHashJoin<PhysicalPlan, PhysicalPlan> physicalHashJoin
                = (PhysicalHashJoin<PhysicalPlan, PhysicalPlan>) hashJoin;
        // NOTICE: We must visit from right to left, to ensure the last fragment is root fragment
        PlanFragment rightFragment = hashJoin.child(1).accept(this, context);
        PlanFragment leftFragment = hashJoin.child(0).accept(this, context);
        List<List<Expr>> distributeExprLists = getDistributeExprs(physicalHashJoin.left(), physicalHashJoin.right());

        if (JoinUtils.shouldNestedLoopJoin(hashJoin)) {
            throw new RuntimeException("Physical hash join could not execute without equal join condition.");
        }

        PlanNode leftPlanRoot = leftFragment.getPlanRoot();
        PlanNode rightPlanRoot = rightFragment.getPlanRoot();
        JoinType joinType = hashJoin.getJoinType();

        List<Expr> execEqConjuncts = hashJoin.getHashJoinConjuncts().stream()
                .map(EqualPredicate.class::cast)
                .map(e -> JoinUtils.swapEqualToForChildrenOrder(e, hashJoin.left().getOutputSet()))
                .map(e -> ExpressionTranslator.translate(e, context))
                .collect(Collectors.toList());
        List<Expr> markConjuncts = ImmutableList.of();
        boolean isHashJoinConjunctsEmpty = hashJoin.getHashJoinConjuncts().isEmpty();
        boolean isMarkJoinConjunctsEmpty = hashJoin.getMarkJoinConjuncts().isEmpty();
        JoinOperator joinOperator = JoinType.toJoinOperator(joinType);
        if (isHashJoinConjunctsEmpty) {
            // if hash join conjuncts is empty, means mark join conjuncts must be EqualPredicate
            // BE should use mark join conjuncts to build hash table
            Preconditions.checkState(!isMarkJoinConjunctsEmpty, "mark join conjuncts should not be empty.");
            markConjuncts = hashJoin.getMarkJoinConjuncts().stream()
                    .map(EqualPredicate.class::cast)
                    .map(e -> JoinUtils.swapEqualToForChildrenOrder(e, hashJoin.left().getOutputSet()))
                    .map(e -> ExpressionTranslator.translate(e, context))
                    .collect(Collectors.toList());
            // in order to process semi/anti join with no hash conjunct and having mark conjunct effeciently
            // we can use mark conjunct as hash conjunct with slight different behavior if meets null values
            // so we use null aware semi/anti join to indicate it's a null aware hash conjunct
            // it's unnecessary to introduce new join type like NULL_AWARE_LEFT_SEMI_JOIN in nereids
            // so we translate the join type here to let be known if the hash conjunct is null aware
            if (joinOperator == JoinOperator.LEFT_ANTI_JOIN) {
                joinOperator = JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
            } else if (joinOperator == JoinOperator.LEFT_SEMI_JOIN) {
                joinOperator = JoinOperator.NULL_AWARE_LEFT_SEMI_JOIN;
            }
        }

        HashJoinNode hashJoinNode = new HashJoinNode(context.nextPlanNodeId(), leftPlanRoot,
                rightPlanRoot, joinOperator, execEqConjuncts, Lists.newArrayList(), markConjuncts,
                null, null, null, hashJoin.isMarkJoin());
        hashJoinNode.setNereidsId(hashJoin.getId());
        context.getNereidsIdToPlanNodeIdMap().put(hashJoin.getId(), hashJoinNode.getId());
        hashJoinNode.setChildrenDistributeExprLists(distributeExprLists);
        PlanFragment currentFragment = connectJoinNode(hashJoinNode, leftFragment, rightFragment, context, hashJoin);

        if (JoinUtils.shouldColocateJoin(physicalHashJoin)) {
            // TODO: add reason
            hashJoinNode.setColocate(true, "");
            leftFragment.setHasColocatePlanNode(true);
        } else if (JoinUtils.shouldBroadcastJoin(physicalHashJoin)) {
            Preconditions.checkState(rightPlanRoot instanceof ExchangeNode,
                    "right child of broadcast join must be ExchangeNode but it is " + rightFragment.getPlanRoot());
            Preconditions.checkState(rightFragment.getChildren().size() == 1,
                    "right child of broadcast join must have 1 child, but meet " + rightFragment.getChildren().size());
            ((ExchangeNode) rightPlanRoot).setRightChildOfBroadcastHashJoin(true);
            hashJoinNode.setDistributionMode(DistributionMode.BROADCAST);
        } else if (JoinUtils.shouldBucketShuffleJoin(physicalHashJoin)) {
            hashJoinNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
        } else {
            hashJoinNode.setDistributionMode(DistributionMode.PARTITIONED);
        }

        // Nereids does not care about output order of join,
        // but BE need left child's output must be before right child's output.
        // So we need to swap the output order of left and right child if necessary.
        // TODO: revert this after Nereids could ensure the output order is correct.
        List<TupleDescriptor> leftTuples = context.getTupleDesc(leftPlanRoot);
        List<SlotDescriptor> leftSlotDescriptors = leftTuples.stream()
                .map(TupleDescriptor::getSlots)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
        List<TupleDescriptor> rightTuples = context.getTupleDesc(rightPlanRoot);
        List<SlotDescriptor> rightSlotDescriptors = rightTuples.stream()
                .map(TupleDescriptor::getSlots)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
        Map<ExprId, SlotReference> outputSlotReferenceMap = hashJoin.getOutput().stream()
                .map(SlotReference.class::cast)
                .collect(Collectors.toMap(Slot::getExprId, s -> s, (existing, replacement) -> existing));
        List<SlotReference> outputSlotReferences = Stream.concat(leftTuples.stream(), rightTuples.stream())
                .map(TupleDescriptor::getSlots)
                .flatMap(Collection::stream)
                .map(sd -> context.findExprId(sd.getId()))
                .map(outputSlotReferenceMap::get)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

        Map<ExprId, SlotReference> hashOutputSlotReferenceMap = Maps.newHashMap(outputSlotReferenceMap);

        hashJoin.getOtherJoinConjuncts()
                .stream()
                .flatMap(e -> e.getInputSlots().stream())
                .map(SlotReference.class::cast)
                .forEach(s -> hashOutputSlotReferenceMap.put(s.getExprId(), s));
        if (!isHashJoinConjunctsEmpty && !isMarkJoinConjunctsEmpty) {
            // if hash join conjuncts is NOT empty, mark join conjuncts would be processed like other conjuncts
            // BE should deal with mark join conjuncts differently, its result is 3 value bool(true, false, null)
            hashJoin.getMarkJoinConjuncts()
                    .stream()
                    .flatMap(e -> e.getInputSlots().stream())
                    .map(SlotReference.class::cast)
                    .forEach(s -> hashOutputSlotReferenceMap.put(s.getExprId(), s));
        }
        hashJoin.getFilterConjuncts().stream()
                .flatMap(e -> e.getInputSlots().stream())
                .map(SlotReference.class::cast)
                .forEach(s -> hashOutputSlotReferenceMap.put(s.getExprId(), s));

        Map<ExprId, SlotReference> leftChildOutputMap = hashJoin.left().getOutput().stream()
                .map(SlotReference.class::cast)
                .collect(Collectors.toMap(Slot::getExprId, s -> s, (existing, replacement) -> existing));
        Map<ExprId, SlotReference> rightChildOutputMap = hashJoin.right().getOutput().stream()
                .map(SlotReference.class::cast)
                .collect(Collectors.toMap(Slot::getExprId, s -> s, (existing, replacement) -> existing));

        // translate runtime filter
        context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator -> physicalHashJoin.getRuntimeFilters()
                .forEach(filter -> runtimeFilterTranslator.createLegacyRuntimeFilter(filter, hashJoinNode, context)));

        // make intermediate tuple
        List<SlotDescriptor> leftIntermediateSlotDescriptor = Lists.newArrayList();
        List<SlotDescriptor> rightIntermediateSlotDescriptor = Lists.newArrayList();
        TupleDescriptor intermediateDescriptor = context.generateTupleDesc();

        if (hashJoin.getOtherJoinConjuncts().isEmpty() && (isHashJoinConjunctsEmpty != isMarkJoinConjunctsEmpty)
                && (joinType == JoinType.LEFT_ANTI_JOIN
                || joinType == JoinType.LEFT_SEMI_JOIN
                || joinType == JoinType.NULL_AWARE_LEFT_ANTI_JOIN)) {
            for (SlotDescriptor leftSlotDescriptor : leftSlotDescriptors) {
                if (!leftSlotDescriptor.isMaterialized()) {
                    continue;
                }
                SlotReference sf = leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId()));
                SlotDescriptor sd;
                if (sf == null && leftSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
                    // TODO: temporary code for two phase read, should remove it after refactor
                    sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, leftSlotDescriptor);
                } else {
                    sd = context.createSlotDesc(intermediateDescriptor, sf, leftSlotDescriptor.getParent().getTable());
                    if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
                        hashJoinNode.addSlotIdToHashOutputSlotIds(leftSlotDescriptor.getId());
                        hashJoinNode.getHashOutputExprSlotIdMap().put(sf.getExprId(), leftSlotDescriptor.getId());
                    }
                }
                leftIntermediateSlotDescriptor.add(sd);
            }
        } else if (hashJoin.getOtherJoinConjuncts().isEmpty() && (isHashJoinConjunctsEmpty != isMarkJoinConjunctsEmpty)
                && (joinType == JoinType.RIGHT_ANTI_JOIN || joinType == JoinType.RIGHT_SEMI_JOIN)) {
            for (SlotDescriptor rightSlotDescriptor : rightSlotDescriptors) {
                if (!rightSlotDescriptor.isMaterialized()) {
                    continue;
                }
                SlotReference sf = rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId()));
                SlotDescriptor sd;
                if (sf == null && rightSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
                    // TODO: temporary code for two phase read, should remove it after refactor
                    sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, rightSlotDescriptor);
                } else {
                    sd = context.createSlotDesc(intermediateDescriptor, sf, rightSlotDescriptor.getParent().getTable());
                    if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
                        hashJoinNode.addSlotIdToHashOutputSlotIds(rightSlotDescriptor.getId());
                        hashJoinNode.getHashOutputExprSlotIdMap().put(sf.getExprId(), rightSlotDescriptor.getId());
                    }
                }
                rightIntermediateSlotDescriptor.add(sd);
            }
        } else {
            for (SlotDescriptor leftSlotDescriptor : leftSlotDescriptors) {
                if (!leftSlotDescriptor.isMaterialized()) {
                    continue;
                }
                SlotReference sf = leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId()));
                SlotDescriptor sd;
                if (sf == null && leftSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
                    // TODO: temporary code for two phase read, should remove it after refactor
                    sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, leftSlotDescriptor);
                } else {
                    sd = context.createSlotDesc(intermediateDescriptor, sf, leftSlotDescriptor.getParent().getTable());
                    // sd = context.createSlotDesc(intermediateDescriptor, sf);
                    if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
                        hashJoinNode.addSlotIdToHashOutputSlotIds(leftSlotDescriptor.getId());
                        hashJoinNode.getHashOutputExprSlotIdMap().put(sf.getExprId(), leftSlotDescriptor.getId());
                    }
                }
                leftIntermediateSlotDescriptor.add(sd);
            }
            for (SlotDescriptor rightSlotDescriptor : rightSlotDescriptors) {
                if (!rightSlotDescriptor.isMaterialized()) {
                    continue;
                }
                SlotReference sf = rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId()));
                SlotDescriptor sd;
                if (sf == null && rightSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
                    // TODO: temporary code for two phase read, should remove it after refactor
                    sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, rightSlotDescriptor);
                } else {
                    sd = context.createSlotDesc(intermediateDescriptor, sf, rightSlotDescriptor.getParent().getTable());
                    if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
                        hashJoinNode.addSlotIdToHashOutputSlotIds(rightSlotDescriptor.getId());
                        hashJoinNode.getHashOutputExprSlotIdMap().put(sf.getExprId(), rightSlotDescriptor.getId());
                    }
                }
                rightIntermediateSlotDescriptor.add(sd);
            }
        }

        if (hashJoin.getMarkJoinSlotReference().isPresent()) {
            SlotReference sf = hashJoin.getMarkJoinSlotReference().get();
            outputSlotReferences.add(sf);
            context.createSlotDesc(intermediateDescriptor, sf);
            if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
                SlotRef markJoinSlotId = context.findSlotRef(sf.getExprId());
                Preconditions.checkState(markJoinSlotId != null);
                hashJoinNode.addSlotIdToHashOutputSlotIds(markJoinSlotId.getSlotId());
                hashJoinNode.getHashOutputExprSlotIdMap().put(sf.getExprId(), markJoinSlotId.getSlotId());
            }
        }

        // set slots as nullable for outer join
        if (joinType == JoinType.LEFT_OUTER_JOIN || joinType == JoinType.FULL_OUTER_JOIN) {
            rightIntermediateSlotDescriptor.forEach(sd -> sd.setIsNullable(true));
        }
        if (joinType == JoinType.RIGHT_OUTER_JOIN || joinType == JoinType.FULL_OUTER_JOIN) {
            leftIntermediateSlotDescriptor.forEach(sd -> sd.setIsNullable(true));
        }

        // Constant expr will cause be crash.
        // But EliminateJoinCondition and Expression Rewrite already eliminate true literal.
        List<Expr> otherJoinConjuncts = hashJoin.getOtherJoinConjuncts()
                .stream()
                .map(e -> ExpressionTranslator.translate(e, context))
                .collect(Collectors.toList());

        hashJoin.getFilterConjuncts().stream()
                .map(e -> ExpressionTranslator.translate(e, context))
                .forEach(hashJoinNode::addConjunct);

        hashJoinNode.setOtherJoinConjuncts(otherJoinConjuncts);

        if (!isHashJoinConjunctsEmpty && !isMarkJoinConjunctsEmpty) {
            // add mark join conjuncts to hash join node
            List<Expr> markJoinConjuncts = hashJoin.getMarkJoinConjuncts()
                    .stream()
                    .map(e -> ExpressionTranslator.translate(e, context))
                    .collect(Collectors.toList());
            hashJoinNode.setMarkJoinConjuncts(markJoinConjuncts);
        }

        hashJoinNode.setvIntermediateTupleDescList(Lists.newArrayList(intermediateDescriptor));

        if (hashJoin.isShouldTranslateOutput()) {
            // translate output expr on intermediate tuple
            List<Expr> srcToOutput = outputSlotReferences.stream()
                    .map(e -> ExpressionTranslator.translate(e, context))
                    .collect(Collectors.toList());

            TupleDescriptor outputDescriptor = context.generateTupleDesc();
            outputSlotReferences.forEach(s -> context.createSlotDesc(outputDescriptor, s));

            hashJoinNode.setOutputTupleDesc(outputDescriptor);
            hashJoinNode.setProjectList(srcToOutput);
        }
        if (hashJoin.getStats() != null) {
            hashJoinNode.setCardinality((long) hashJoin.getStats().getRowCount());
        }
        updateLegacyPlanIdToPhysicalPlan(currentFragment.getPlanRoot(), hashJoin);
        return currentFragment;
    }

    @Override
    public PlanFragment visitPhysicalNestedLoopJoin(
            PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> nestedLoopJoin,
            PlanTranslatorContext context) {
        // NOTICE: We must visit from right to left, to ensure the last fragment is root fragment
        // TODO: we should add a helper method to wrap this logic.
        //   Maybe something like private List<PlanFragment> postOrderVisitChildren(
        //       PhysicalPlan plan, PlanVisitor visitor, Context context).
        PlanFragment rightFragment = nestedLoopJoin.child(1).accept(this, context);
        PlanFragment leftFragment = nestedLoopJoin.child(0).accept(this, context);
        List<List<Expr>> distributeExprLists = getDistributeExprs(nestedLoopJoin.child(0), nestedLoopJoin.child(1));
        PlanNode leftFragmentPlanRoot = leftFragment.getPlanRoot();
        PlanNode rightFragmentPlanRoot = rightFragment.getPlanRoot();

        if (!JoinUtils.shouldNestedLoopJoin(nestedLoopJoin)) {
            throw new RuntimeException("Physical nested loop join could not execute with equal join condition.");
        }

        List<TupleDescriptor> leftTuples = context.getTupleDesc(leftFragmentPlanRoot);
        List<TupleDescriptor> rightTuples = context.getTupleDesc(rightFragmentPlanRoot);
        List<TupleId> tupleIds = Stream.concat(leftTuples.stream(), rightTuples.stream())
                .map(TupleDescriptor::getId)
                .collect(Collectors.toList());

        JoinType joinType = nestedLoopJoin.getJoinType();

        NestedLoopJoinNode nestedLoopJoinNode = new NestedLoopJoinNode(context.nextPlanNodeId(),
                leftFragmentPlanRoot, rightFragmentPlanRoot, tupleIds, JoinType.toJoinOperator(joinType),
                null, null, null, nestedLoopJoin.isMarkJoin());
        nestedLoopJoinNode.setNereidsId(nestedLoopJoin.getId());
        context.getNereidsIdToPlanNodeIdMap().put(nestedLoopJoin.getId(), nestedLoopJoinNode.getId());
        nestedLoopJoinNode.setChildrenDistributeExprLists(distributeExprLists);
        if (nestedLoopJoin.getStats() != null) {
            nestedLoopJoinNode.setCardinality((long) nestedLoopJoin.getStats().getRowCount());
        }
        nestedLoopJoinNode.setChild(0, leftFragment.getPlanRoot());
        nestedLoopJoinNode.setChild(1, rightFragment.getPlanRoot());
        setPlanRoot(leftFragment, nestedLoopJoinNode, nestedLoopJoin);
        // TODO: what's this? do we really need to set this?
        rightFragment.getPlanRoot().setCompactData(false);
        context.mergePlanFragment(rightFragment, leftFragment);
        for (PlanFragment rightChild : rightFragment.getChildren()) {
            leftFragment.addChild(rightChild);
        }
        // translate runtime filter
        context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator -> {
            List<RuntimeFilter> filters = nestedLoopJoin.getRuntimeFilters();
            filters.forEach(filter -> runtimeFilterTranslator
                    .createLegacyRuntimeFilter(filter, nestedLoopJoinNode, context));
            if (filters.stream().anyMatch(filter -> filter.getType() == TRuntimeFilterType.BITMAP)) {
                nestedLoopJoinNode.setOutputLeftSideOnly(true);
            }
        });

        Map<ExprId, SlotReference> leftChildOutputMap = nestedLoopJoin.child(0).getOutput().stream()
                .map(SlotReference.class::cast)
                .collect(Collectors.toMap(Slot::getExprId, s -> s, (existing, replacement) -> existing));
        Map<ExprId, SlotReference> rightChildOutputMap = nestedLoopJoin.child(1).getOutput().stream()
                .map(SlotReference.class::cast)
                .collect(Collectors.toMap(Slot::getExprId, s -> s, (existing, replacement) -> existing));
        // make intermediate tuple
        List<SlotDescriptor> leftIntermediateSlotDescriptor = Lists.newArrayList();
        List<SlotDescriptor> rightIntermediateSlotDescriptor = Lists.newArrayList();
        TupleDescriptor intermediateDescriptor = context.generateTupleDesc();

        // Nereids does not care about output order of join,
        // but BE need left child's output must be before right child's output.
        // So we need to swap the output order of left and right child if necessary.
        // TODO: revert this after Nereids could ensure the output order is correct.
        List<SlotDescriptor> leftSlotDescriptors = leftTuples.stream()
                .map(TupleDescriptor::getSlots)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
        List<SlotDescriptor> rightSlotDescriptors = rightTuples.stream()
                .map(TupleDescriptor::getSlots)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
        Map<ExprId, SlotReference> outputSlotReferenceMap = Maps.newHashMap();

        nestedLoopJoin.getOutput().stream()
                .map(SlotReference.class::cast)
                .forEach(s -> outputSlotReferenceMap.put(s.getExprId(), s));
        nestedLoopJoin.getFilterConjuncts().stream()
                .flatMap(e -> e.getInputSlots().stream())
                .map(SlotReference.class::cast)
                .forEach(s -> outputSlotReferenceMap.put(s.getExprId(), s));
        List<SlotReference> outputSlotReferences = Stream.concat(leftTuples.stream(), rightTuples.stream())
                .map(TupleDescriptor::getSlots)
                .flatMap(Collection::stream)
                .map(sd -> context.findExprId(sd.getId()))
                .map(outputSlotReferenceMap::get)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

        // TODO: because of the limitation of be, the VNestedLoopJoinNode will output column from both children
        // in the intermediate tuple, so fe have to do the same, if be fix the problem, we can change it back.
        for (SlotDescriptor leftSlotDescriptor : leftSlotDescriptors) {
            if (!leftSlotDescriptor.isMaterialized()) {
                continue;
            }
            SlotReference sf = leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId()));
            SlotDescriptor sd;
            if (sf == null && leftSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
                // TODO: temporary code for two phase read, should remove it after refactor
                sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, leftSlotDescriptor);
            } else {
                sd = context.createSlotDesc(intermediateDescriptor, sf, leftSlotDescriptor.getParent().getTable());
            }
            leftIntermediateSlotDescriptor.add(sd);
        }
        for (SlotDescriptor rightSlotDescriptor : rightSlotDescriptors) {
            if (!rightSlotDescriptor.isMaterialized()) {
                continue;
            }
            SlotReference sf = rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId()));
            SlotDescriptor sd;
            if (sf == null && rightSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
                // TODO: temporary code for two phase read, should remove it after refactor
                sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, rightSlotDescriptor);
            } else {
                sd = context.createSlotDesc(intermediateDescriptor, sf, rightSlotDescriptor.getParent().getTable());
            }
            rightIntermediateSlotDescriptor.add(sd);
        }

        if (nestedLoopJoin.getMarkJoinSlotReference().isPresent()) {
            outputSlotReferences.add(nestedLoopJoin.getMarkJoinSlotReference().get());
            context.createSlotDesc(intermediateDescriptor, nestedLoopJoin.getMarkJoinSlotReference().get());
        }

        // set slots as nullable for outer join
        if (joinType == JoinType.LEFT_OUTER_JOIN || joinType == JoinType.FULL_OUTER_JOIN) {
            rightIntermediateSlotDescriptor.forEach(sd -> sd.setIsNullable(true));
        }
        if (joinType == JoinType.RIGHT_OUTER_JOIN || joinType == JoinType.FULL_OUTER_JOIN) {
            leftIntermediateSlotDescriptor.forEach(sd -> sd.setIsNullable(true));
        }

        nestedLoopJoinNode.setvIntermediateTupleDescList(Lists.newArrayList(intermediateDescriptor));

        List<Expr> joinConjuncts = nestedLoopJoin.getOtherJoinConjuncts().stream()
                .filter(e -> !nestedLoopJoin.isBitmapRuntimeFilterCondition(e))
                .map(e -> ExpressionTranslator.translate(e, context)).collect(Collectors.toList());

        if (!nestedLoopJoin.isBitMapRuntimeFilterConditionsEmpty() && joinConjuncts.isEmpty()) {
            // left semi join need at least one conjunct. otherwise left-semi-join fallback to cross-join
            joinConjuncts.add(new BoolLiteral(true));
        }

        nestedLoopJoinNode.setJoinConjuncts(joinConjuncts);

        if (!nestedLoopJoin.getOtherJoinConjuncts().isEmpty()) {
            List<Expr> markJoinConjuncts = nestedLoopJoin.getMarkJoinConjuncts().stream()
                    .map(e -> ExpressionTranslator.translate(e, context)).collect(Collectors.toList());
            nestedLoopJoinNode.setMarkJoinConjuncts(markJoinConjuncts);
        }

        nestedLoopJoin.getFilterConjuncts().stream()
                .map(e -> ExpressionTranslator.translate(e, context))
                .forEach(nestedLoopJoinNode::addConjunct);

        if (nestedLoopJoin.isShouldTranslateOutput()) {
            // translate output expr on intermediate tuple
            List<Expr> srcToOutput = outputSlotReferences.stream()
                    .map(e -> ExpressionTranslator.translate(e, context))
                    .collect(Collectors.toList());

            TupleDescriptor outputDescriptor = context.generateTupleDesc();
            outputSlotReferences.forEach(s -> context.createSlotDesc(outputDescriptor, s));

            nestedLoopJoinNode.setOutputTupleDesc(outputDescriptor);
            nestedLoopJoinNode.setProjectList(srcToOutput);
        }
        if (nestedLoopJoin.getStats() != null) {
            nestedLoopJoinNode.setCardinality((long) nestedLoopJoin.getStats().getRowCount());
        }
        updateLegacyPlanIdToPhysicalPlan(leftFragment.getPlanRoot(), nestedLoopJoin);
        return leftFragment;
    }

    @Override
    public PlanFragment visitPhysicalLimit(PhysicalLimit<? extends Plan> physicalLimit, PlanTranslatorContext context) {
        PlanFragment inputFragment = physicalLimit.child(0).accept(this, context);
        PlanNode child = inputFragment.getPlanRoot();

        if (physicalLimit.getPhase().isLocal()) {
            long newLimit = MergeLimits.mergeLimit(physicalLimit.getLimit(), physicalLimit.getOffset(),
                    child.getLimit());
            child.setLimit(newLimit);
            if (newLimit != -1
                    && child instanceof AggregationNode && physicalLimit.child() instanceof PhysicalHashAggregate) {
                PhysicalHashAggregate<? extends Plan> agg
                        = (PhysicalHashAggregate<? extends Plan>) physicalLimit.child();
                if (agg.isDistinct()) {
                    if (agg.child(0) instanceof PhysicalDistribute
                            && agg.child(0).child(0) instanceof PhysicalHashAggregate
                            && ((Aggregate) agg.child(0).child(0)).isDistinct()
                            && child.getChild(0) instanceof ExchangeNode
                            && child.getChild(0).getChild(0) instanceof AggregationNode) {
                        child.getChild(0).getChild(0).setLimit(newLimit);
                    }
                }
            }
        } else if (physicalLimit.getPhase().isGlobal()) {
            if (!(child instanceof ExchangeNode)) {
                ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), child);
                exchangeNode.setLimit(physicalLimit.getLimit());
                exchangeNode.setOffset(physicalLimit.getOffset());
                exchangeNode.setPartitionType(TPartitionType.UNPARTITIONED);
                exchangeNode.setNumInstances(1);

                PlanFragment fragment = new PlanFragment(context.nextFragmentId(), exchangeNode,
                        DataPartition.UNPARTITIONED);
                inputFragment.setDestination(exchangeNode);
                inputFragment.setOutputPartition(DataPartition.UNPARTITIONED);

                DataStreamSink sink = new DataStreamSink(exchangeNode.getId());
                sink.setOutputPartition(DataPartition.UNPARTITIONED);
                inputFragment.setSink(sink);

                context.addPlanFragment(fragment);
                inputFragment = fragment;
            } else {
                ExchangeNode exchangeNode = (ExchangeNode) child;
                exchangeNode.setLimit(MergeLimits.mergeLimit(physicalLimit.getLimit(), physicalLimit.getOffset(),
                        exchangeNode.getLimit()));
                exchangeNode.setOffset(MergeLimits.mergeOffset(physicalLimit.getOffset(), exchangeNode.getOffset()));
            }
        }

        updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), physicalLimit);
        return inputFragment;
    }

    @Override
    public PlanFragment visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN,
            PlanTranslatorContext context) {
        PlanFragment inputFragment = partitionTopN.child(0).accept(this, context);
        List<List<Expr>> distributeExprLists = getDistributeExprs(partitionTopN.child(0));
        PartitionSortNode partitionSortNode = translatePartitionSortNode(
                partitionTopN, inputFragment.getPlanRoot(), context);
        partitionSortNode.setChildrenDistributeExprLists(distributeExprLists);
        addPlanRoot(inputFragment, partitionSortNode, partitionTopN);
        // in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution
        // we need turn of parallel scan to ensure to get correct result.
        if (partitionTopN.getPhase() == PartitionTopnPhase.ONE_PHASE_GLOBAL_PTOPN
                && findOlapScanNodesByPassExchangeAndJoinNode(inputFragment.getPlanRoot())) {
            inputFragment.setHasColocatePlanNode(true);
        }
        return inputFragment;
    }

    // TODO: generate expression mapping when be project could do in ExecNode.
    @Override
    public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project, PlanTranslatorContext context) {
        if (project.child(0) instanceof AbstractPhysicalJoin) {
            ((AbstractPhysicalJoin<?, ?>) project.child(0)).setShouldTranslateOutput(false);
        }
        if (project.child(0) instanceof PhysicalFilter) {
            if (project.child(0).child(0) instanceof AbstractPhysicalJoin) {
                ((AbstractPhysicalJoin<?, ?>) project.child(0).child(0)).setShouldTranslateOutput(false);
            }
        }

        PlanFragment inputFragment = project.child(0).accept(this, context);
        PlanNode inputPlanNode = inputFragment.getPlanRoot();
        // this means already have project on this node, filter need execute after project, so need a new node
        if (CollectionUtils.isNotEmpty(inputPlanNode.getProjectList())) {
            SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), inputPlanNode);
            selectNode.setNereidsId(project.getId());
            context.getNereidsIdToPlanNodeIdMap().put(project.getId(), selectNode.getId());
            addPlanRoot(inputFragment, selectNode, project);
            inputPlanNode = selectNode;
        }

        List<Expr> projectionExprs = null;
        List<Expr> allProjectionExprs = Lists.newArrayList();
        List<Slot> slots = null;
        // TODO FE/BE do not support multi-layer-project on MultiDataSink now.
        if (project.hasMultiLayerProjection()
                && !(inputFragment instanceof MultiCastPlanFragment)
                // TODO support for two phase read with project, remove it after refactor
                && !(project.child() instanceof PhysicalDeferMaterializeTopN)
                && !(project.child() instanceof PhysicalDeferMaterializeOlapScan
                || (project.child() instanceof PhysicalFilter
                && ((PhysicalFilter<?>) project.child()).child() instanceof PhysicalDeferMaterializeOlapScan))) {
            int layerCount = project.getMultiLayerProjects().size();
            for (int i = 0; i < layerCount; i++) {
                List<NamedExpression> layer = project.getMultiLayerProjects().get(i);
                projectionExprs = layer.stream()
                        .map(e -> ExpressionTranslator.translate(e, context))
                        .collect(Collectors.toList());
                slots = layer.stream()
                        .map(NamedExpression::toSlot)
                        .collect(Collectors.toList());
                if (i < layerCount - 1) {
                    inputPlanNode.addIntermediateProjectList(projectionExprs);
                    TupleDescriptor projectionTuple = generateTupleDesc(slots, null, context);
                    inputPlanNode.addIntermediateOutputTupleDescList(projectionTuple);
                }
                allProjectionExprs.addAll(projectionExprs);
            }
        } else {
            projectionExprs = project.getProjects()
                    .stream()
                    .map(e -> ExpressionTranslator.translate(e, context))
                    .collect(Collectors.toList());
            slots = project.getProjects()
                    .stream()
                    .map(NamedExpression::toSlot)
                    .collect(Collectors.toList());
            allProjectionExprs.addAll(projectionExprs);
        }
        // process multicast sink
        if (inputFragment instanceof MultiCastPlanFragment) {
            MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
            DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
                    multiCastDataSink.getDataStreamSinks().size() - 1);
            if (CollectionUtils.isNotEmpty(dataStreamSink.getProjections())) {
                String errMsg = "generate invalid plan \n" + project.treeString();
                LOG.warn(errMsg);
                throw new AnalysisException(errMsg);
            }
            TupleDescriptor projectionTuple = generateTupleDesc(slots, null, context);
            dataStreamSink.setProjections(projectionExprs);
            dataStreamSink.setOutputTupleDesc(projectionTuple);
            return inputFragment;
        }

        List<Expr> conjuncts = inputPlanNode.getConjuncts();
        Set<SlotId> requiredSlotIdSet = Sets.newHashSet();
        for (Expr expr : allProjectionExprs) {
            Expr.extractSlots(expr, requiredSlotIdSet);
        }
        Set<SlotId> requiredByProjectSlotIdSet = Sets.newHashSet(requiredSlotIdSet);
        for (Expr expr : conjuncts) {
            Expr.extractSlots(expr, requiredSlotIdSet);
        }
        // For hash join node, use vSrcToOutputSMap to describe the expression calculation, use
        // vIntermediateTupleDescList as input, and set vOutputTupleDesc as the final output.
        // TODO: HashJoinNode's be implementation is not support projection yet, remove this after when supported.
        if (inputPlanNode instanceof JoinNodeBase) {
            TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context);
            JoinNodeBase joinNode = (JoinNodeBase) inputPlanNode;
            joinNode.setOutputTupleDesc(tupleDescriptor);
            joinNode.setProjectList(projectionExprs);
            // prune the hashOutputSlotIds
            if (joinNode instanceof HashJoinNode) {
                Set<SlotId> oldHashOutputSlotIds = Sets.newHashSet(((HashJoinNode) joinNode).getHashOutputSlotIds());
                ((HashJoinNode) joinNode).getHashOutputSlotIds().clear();
                Set<ExprId> requiredExprIds = Sets.newHashSet();
                Set<SlotId> requiredOtherConjunctsSlotIdSet = Sets.newHashSet();
                List<Expr> otherConjuncts = ((HashJoinNode) joinNode).getOtherJoinConjuncts();
                for (Expr expr : otherConjuncts) {
                    Expr.extractSlots(expr, requiredOtherConjunctsSlotIdSet);
                }
                if (!((HashJoinNode) joinNode).getEqJoinConjuncts().isEmpty()
                        && !((HashJoinNode) joinNode).getMarkJoinConjuncts().isEmpty()) {
                    List<Expr> markConjuncts = ((HashJoinNode) joinNode).getMarkJoinConjuncts();
                    for (Expr expr : markConjuncts) {
                        Expr.extractSlots(expr, requiredOtherConjunctsSlotIdSet);
                    }
                }
                requiredOtherConjunctsSlotIdSet.forEach(e -> requiredExprIds.add(context.findExprId(e)));
                requiredSlotIdSet.forEach(e -> requiredExprIds.add(context.findExprId(e)));
                for (ExprId exprId : requiredExprIds) {
                    SlotId slotId = ((HashJoinNode) joinNode).getHashOutputExprSlotIdMap().get(exprId);
                    // Preconditions.checkState(slotId != null);
                    if (slotId != null) {
                        ((HashJoinNode) joinNode).addSlotIdToHashOutputSlotIds(slotId);
                    }
                }
                if (((HashJoinNode) joinNode).getHashOutputSlotIds().isEmpty()) {
                    // In FE, if all columns are pruned, hash output slots are empty.
                    // On the contrary, BE will keep all columns if hash output slots are empty.
                    // Currently BE will keep this behavior in order to be compatible with older planner.
                    // So we have to workaround this in FE by keeping at least one slot in oldHashOutputSlotIds.
                    // TODO: Remove this code when old planner is deleted and BE changes to be consistent with FE.
                    for (SlotId slotId : oldHashOutputSlotIds) {
                        ((HashJoinNode) joinNode).addSlotIdToHashOutputSlotIds(slotId);
                        break;
                    }
                }
            }
            return inputFragment;
        }

        if (inputPlanNode instanceof TableFunctionNode) {
            TableFunctionNode tableFunctionNode = (TableFunctionNode) inputPlanNode;
            tableFunctionNode.setOutputSlotIds(Lists.newArrayList(requiredSlotIdSet));
        }

        if (inputPlanNode instanceof ScanNode) {
            // TODO support for two phase read with project, remove this if after refactor
            if (!(project.child() instanceof PhysicalDeferMaterializeOlapScan
                    || (project.child() instanceof PhysicalFilter
                    && ((PhysicalFilter<?>) project.child()).child() instanceof PhysicalDeferMaterializeOlapScan))) {
                TupleDescriptor projectionTuple = generateTupleDesc(slots,
                        ((ScanNode) inputPlanNode).getTupleDesc().getTable(), context);
                inputPlanNode.setProjectList(projectionExprs);
                inputPlanNode.setOutputTupleDesc(projectionTuple);
            }
            if (inputPlanNode instanceof OlapScanNode) {
                ((OlapScanNode) inputPlanNode).updateRequiredSlots(context, requiredByProjectSlotIdSet);
            }
            updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet,
                    requiredByProjectSlotIdSet, context);
        } else {
            if (project.child() instanceof PhysicalDeferMaterializeTopN) {
                inputFragment.setOutputExprs(allProjectionExprs);
            } else {
                TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context);
                inputPlanNode.setProjectList(projectionExprs);
                inputPlanNode.setOutputTupleDesc(tupleDescriptor);
            }
        }
        return inputFragment;
    }

    /**
     * Returns a new fragment with a UnionNode as its root. The data partition of the
     * returned fragment and how the data of the child fragments is consumed depends on the
     * data partitions of the child fragments:
     * - All child fragments are unpartitioned or partitioned: The returned fragment has an
     *   UNPARTITIONED or RANDOM data partition, respectively. The UnionNode absorbs the
     *   plan trees of all child fragments.
     * - Mixed partitioned/unpartitioned child fragments: The returned fragment is
     *   RANDOM partitioned. The plan trees of all partitioned child fragments are absorbed
     *   into the UnionNode. All unpartitioned child fragments are connected to the
     *   UnionNode via a RANDOM exchange, and remain unchanged otherwise.
     */
    @Override
    public PlanFragment visitPhysicalSetOperation(
            PhysicalSetOperation setOperation, PlanTranslatorContext context) {
        List<PlanFragment> childrenFragments = new ArrayList<>();
        for (Plan plan : setOperation.children()) {
            childrenFragments.add(plan.accept(this, context));
        }

        TupleDescriptor setTuple = generateTupleDesc(setOperation.getOutput(), null, context);
        List<SlotDescriptor> outputSlotDescs = new ArrayList<>(setTuple.getSlots());

        SetOperationNode setOperationNode;
        // create setOperationNode
        if (setOperation instanceof PhysicalUnion) {
            setOperationNode = new UnionNode(context.nextPlanNodeId(), setTuple.getId());
        } else if (setOperation instanceof PhysicalExcept) {
            setOperationNode = new ExceptNode(context.nextPlanNodeId(), setTuple.getId());
        } else if (setOperation instanceof PhysicalIntersect) {
            setOperationNode = new IntersectNode(context.nextPlanNodeId(), setTuple.getId());
        } else {
            throw new RuntimeException("not support set operation type " + setOperation);
        }
        setOperationNode.setNereidsId(setOperation.getId());
        context.getNereidsIdToPlanNodeIdMap().put(setOperation.getId(), setOperationNode.getId());
        for (List<SlotReference> regularChildrenOutput : setOperation.getRegularChildrenOutputs()) {
            Builder<Expr> translateOutputs = ImmutableList.builderWithExpectedSize(regularChildrenOutput.size());
            for (SlotReference childOutput : regularChildrenOutput) {
                translateOutputs.add(ExpressionTranslator.translate(childOutput, context));
            }
            setOperationNode.addResultExprLists(translateOutputs.build());
        }

        if (setOperation instanceof PhysicalUnion) {
            for (List<NamedExpression> unionConsts : ((PhysicalUnion) setOperation).getConstantExprsList()) {
                Builder<Expr> translateConsts = ImmutableList.builderWithExpectedSize(unionConsts.size());
                for (NamedExpression unionConst : unionConsts) {
                    translateConsts.add(ExpressionTranslator.translate(unionConst, context));
                }
                setOperationNode.addConstExprList(translateConsts.build());
            }
        }

        for (PlanFragment childFragment : childrenFragments) {
            setOperationNode.addChild(childFragment.getPlanRoot());
        }
        setOperationNode.finalizeForNereids(outputSlotDescs, outputSlotDescs);

        PlanFragment setOperationFragment;
        if (childrenFragments.isEmpty()) {
            setOperationFragment = createPlanFragment(setOperationNode,
                    DataPartition.UNPARTITIONED, setOperation);
            context.addPlanFragment(setOperationFragment);
        } else {
            int childrenSize = childrenFragments.size();
            setOperationFragment = childrenFragments.get(childrenSize - 1);
            for (int i = childrenSize - 2; i >= 0; i--) {
                context.mergePlanFragment(childrenFragments.get(i), setOperationFragment);
                for (PlanFragment child : childrenFragments.get(i).getChildren()) {
                    setOperationFragment.addChild(child);
                }
            }
            setPlanRoot(setOperationFragment, setOperationNode, setOperation);
        }

        // in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution
        // we need turn of parallel scan to ensure to get correct result.
        // TODO: nereids forbid all parallel scan under PhysicalSetOperation temporary
        if (!setOperation.getPhysicalProperties().equals(PhysicalProperties.ANY)
                && findOlapScanNodesByPassExchangeAndJoinNode(setOperationFragment.getPlanRoot())) {
            setOperationFragment.setHasColocatePlanNode(true);
            setOperationNode.setColocate(true);
        }

        return setOperationFragment;
    }

    /*-
     * Physical sort:
     * 1. Build sortInfo
     *    There are two types of slotRef:
     *    one is generated by the previous node, collectively called old.
     *    the other is newly generated by the sort node, collectively called new.
     *    Filling of sortInfo related data structures,
     *    a. ordering use newSlotRef.
     *    b. sortTupleSlotExprs use oldSlotRef.
     * 2. Create sortNode
     * 3. Create mergeFragment
     * TODO: When the slotRef of sort is currently generated,
     *       it will be based on the expression in select and orderBy expression in to ensure the uniqueness of slotRef.
     *       But eg:
     *       select a+1 from table order by a+1;
     *       the expressions of the two are inconsistent.
     *       The former will perform an additional Alias.
     *       Currently we cannot test whether this will have any effect.
     *       After a+1 can be parsed , reprocessing.
     */
    @Override
    public PlanFragment visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sort,
            PlanTranslatorContext context) {
        PlanFragment inputFragment = sort.child(0).accept(this, context);
        List<List<Expr>> distributeExprLists = getDistributeExprs(sort.child(0));

        // 2. According to the type of sort, generate physical plan
        if (!sort.getSortPhase().isMerge()) {
            // For localSort or Gather->Sort, we just need to add sortNode
            SortNode sortNode = translateSortNode(sort, inputFragment.getPlanRoot(), context);
            sortNode.setChildrenDistributeExprLists(distributeExprLists);
            addPlanRoot(inputFragment, sortNode, sort);
        } else {
            // For mergeSort, we need to push sortInfo to exchangeNode
            if (!(inputFragment.getPlanRoot() instanceof ExchangeNode)) {
                // if there is no exchange node for mergeSort
                //   e.g., localSort -> mergeSort
                // It means the local has satisfied the Gather property. We can just ignore mergeSort
                return inputFragment;
            }
            SortNode sortNode = (SortNode) inputFragment.getPlanRoot().getChild(0);
            ((ExchangeNode) inputFragment.getPlanRoot()).setMergeInfo(sortNode.getSortInfo());
            if (inputFragment.hasChild(0) && inputFragment.getChild(0).getSink() != null) {
                inputFragment.getChild(0).getSink().setMerge(true);
            }
            sortNode.setMergeByExchange();
            sortNode.setChildrenDistributeExprLists(distributeExprLists);
        }
        return inputFragment;
    }

    @Override
    public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanTranslatorContext context) {
        PlanFragment inputFragment = topN.child(0).accept(this, context);
        List<List<Expr>> distributeExprLists = getDistributeExprs(topN.child(0));
        // 2. According to the type of sort, generate physical plan
        if (!topN.getSortPhase().isMerge()) {
            // For localSort or Gather->Sort, we just need to add TopNNode
            SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
            sortNode.setOffset(topN.getOffset());
            sortNode.setLimit(topN.getLimit());
            if (context.getTopnFilterContext().isTopnFilterSource(topN)) {
                context.getTopnFilterContext().translateSource(topN, sortNode);
                TopnFilter filter = context.getTopnFilterContext().getTopnFilter(topN);
                List<Pair<Integer, Integer>> targets = new ArrayList<>();
                for (Map.Entry<ScanNode, Expr> entry : filter.legacyTargets.entrySet()) {
                    Set<SlotRef> inputSlots = entry.getValue().getInputSlotRef();
                    if (inputSlots.size() != 1) {
                        LOG.warn("topn filter targets error: " + inputSlots);
                    } else {
                        SlotRef slot = inputSlots.iterator().next();
                        targets.add(Pair.of(entry.getKey().getId().asInt(),
                                (slot.getDesc().getId().asInt())));
                    }
                }
                sortNode.setTopnFilterTargets(targets);
            }
            // push sort to scan opt
            if (sortNode.getChild(0) instanceof OlapScanNode) {
                OlapScanNode scanNode = ((OlapScanNode) sortNode.getChild(0));
                if (checkPushSort(sortNode, scanNode.getOlapTable())) {
                    SortInfo sortInfo = sortNode.getSortInfo();
                    scanNode.setSortInfo(sortInfo);
                    scanNode.getSortInfo().setSortTupleSlotExprs(sortNode.getResolvedTupleExprs());
                    for (Expr expr : sortInfo.getOrderingExprs()) {
                        scanNode.getSortInfo().addMaterializedOrderingExpr(expr);
                    }
                    if (sortNode.getOffset() > 0) {
                        scanNode.setSortLimit(sortNode.getLimit() + sortNode.getOffset());
                    } else {
                        scanNode.setSortLimit(sortNode.getLimit());
                    }
                }
            }
            sortNode.setChildrenDistributeExprLists(distributeExprLists);
            addPlanRoot(inputFragment, sortNode, topN);
        } else {
            // For mergeSort, we need to push sortInfo to exchangeNode
            if (!(inputFragment.getPlanRoot() instanceof ExchangeNode)) {
                // if there is no exchange node for mergeSort
                //   e.g., mergeTopN -> localTopN
                // It means the local has satisfied the Gather property. We can just ignore mergeSort
                inputFragment.getPlanRoot().setOffset(topN.getOffset());
                inputFragment.getPlanRoot().setLimit(topN.getLimit());
                return inputFragment;
            }
            ExchangeNode exchangeNode = (ExchangeNode) inputFragment.getPlanRoot();
            exchangeNode.setChildrenDistributeExprLists(distributeExprLists);
            exchangeNode.setMergeInfo(((SortNode) exchangeNode.getChild(0)).getSortInfo());
            if (inputFragment.hasChild(0) && inputFragment.getChild(0).getSink() != null) {
                inputFragment.getChild(0).getSink().setMerge(true);
            }
            exchangeNode.setLimit(topN.getLimit());
            exchangeNode.setOffset(topN.getOffset());
            ((SortNode) exchangeNode.getChild(0)).setMergeByExchange();
        }
        updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), topN);
        return inputFragment;
    }

    @Override
    public PlanFragment visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> topN,
            PlanTranslatorContext context) {
        PlanFragment planFragment = visitPhysicalTopN(topN.getPhysicalTopN(), context);
        if (planFragment.getPlanRoot() instanceof SortNode) {
            SortNode sortNode = (SortNode) planFragment.getPlanRoot();
            sortNode.setUseTwoPhaseReadOpt(true);
            sortNode.getSortInfo().setUseTwoPhaseRead();
            if (context.getTopnFilterContext().isTopnFilterSource(topN)) {
                context.getTopnFilterContext().translateSource(topN, sortNode);
            }
            TupleDescriptor tupleDescriptor = sortNode.getSortInfo().getSortTupleDescriptor();
            for (SlotDescriptor slotDescriptor : tupleDescriptor.getSlots()) {
                if (topN.getDeferMaterializeSlotIds()
                        .contains(context.findExprId(slotDescriptor.getId()))) {
                    slotDescriptor.setNeedMaterialize(false);
                }
            }
        }
        return planFragment;
    }

    @Override
    public PlanFragment visitPhysicalRepeat(PhysicalRepeat<? extends Plan> repeat, PlanTranslatorContext context) {
        PlanFragment inputPlanFragment = repeat.child(0).accept(this, context);
        List<List<Expr>> distributeExprLists = getDistributeExprs(repeat.child(0));

        Set<VirtualSlotReference> sortedVirtualSlots = repeat.getSortedVirtualSlots();
        TupleDescriptor virtualSlotsTuple =
                generateTupleDesc(ImmutableList.copyOf(sortedVirtualSlots), null, context);

        ImmutableSet<Expression> flattenGroupingSetExprs = ImmutableSet.copyOf(
                ExpressionUtils.flatExpressions(repeat.getGroupingSets()));

        List<Slot> aggregateFunctionUsedSlots = repeat.getOutputExpressions()
                .stream()
                .filter(output -> !(output instanceof VirtualSlotReference))
                .filter(output -> !flattenGroupingSetExprs.contains(output))
                .distinct()
                .map(NamedExpression::toSlot)
                .collect(ImmutableList.toImmutableList());

        // keep flattenGroupingSetExprs comes first
        List<Expr> preRepeatExprs = Stream.concat(flattenGroupingSetExprs.stream(), aggregateFunctionUsedSlots.stream())
                .map(expr -> ExpressionTranslator.translate(expr, context)).collect(ImmutableList.toImmutableList());

        // outputSlots's order need same with preRepeatExprs
        List<Slot> outputSlots = Stream
                .concat(repeat.getOutputExpressions().stream()
                        .filter(output -> flattenGroupingSetExprs.contains(output)),
                        repeat.getOutputExpressions().stream()
                                .filter(output -> !flattenGroupingSetExprs.contains(output)).distinct())
                .map(NamedExpression::toSlot).collect(ImmutableList.toImmutableList());

        // NOTE: we should first translate preRepeatExprs, then generate output tuple,
        //       or else the preRepeatExprs can not find the bottom slotRef and throw
        //       exception: invalid slot id
        TupleDescriptor outputTuple = generateTupleDesc(outputSlots, null, context);

        // cube and rollup already convert to grouping sets in LogicalPlanBuilder.withAggregate()
        GroupingInfo groupingInfo = new GroupingInfo(
                GroupingType.GROUPING_SETS, virtualSlotsTuple, outputTuple, preRepeatExprs);

        List<Set<Integer>> repeatSlotIdList = repeat.computeRepeatSlotIdList(getSlotIds(outputTuple));
        Set<Integer> allSlotId = repeatSlotIdList.stream()
                .flatMap(Set::stream)
                .collect(ImmutableSet.toImmutableSet());

        RepeatNode repeatNode = new RepeatNode(context.nextPlanNodeId(),
                inputPlanFragment.getPlanRoot(), groupingInfo, repeatSlotIdList,
                allSlotId, repeat.computeVirtualSlotValues(sortedVirtualSlots));
        repeatNode.setNereidsId(repeat.getId());
        context.getNereidsIdToPlanNodeIdMap().put(repeat.getId(), repeatNode.getId());
        repeatNode.setChildrenDistributeExprLists(distributeExprLists);
        addPlanRoot(inputPlanFragment, repeatNode, repeat);
        updateLegacyPlanIdToPhysicalPlan(inputPlanFragment.getPlanRoot(), repeat);
        return inputPlanFragment;
    }

    @Override
    public PlanFragment visitPhysicalWindow(PhysicalWindow<? extends Plan> physicalWindow,
            PlanTranslatorContext context) {
        PlanFragment inputPlanFragment = physicalWindow.child(0).accept(this, context);
        List<List<Expr>> distributeExprLists = getDistributeExprs(physicalWindow.child(0));

        // 1. translate to old optimizer variable
        // variable in Nereids
        WindowFrameGroup windowFrameGroup = physicalWindow.getWindowFrameGroup();
        List<Expression> partitionKeyList = Lists.newArrayList(windowFrameGroup.getPartitionKeys());
        List<OrderExpression> orderKeyList = windowFrameGroup.getOrderKeys();
        List<NamedExpression> windowFunctionList = windowFrameGroup.getGroups();
        WindowFrame windowFrame = windowFrameGroup.getWindowFrame();

        // partition by clause
        List<Expr> partitionExprs = partitionKeyList.stream()
                .map(e -> ExpressionTranslator.translate(e, context))
                .collect(Collectors.toList());

        // order by clause
        List<OrderByElement> orderByElements = orderKeyList.stream()
                .map(orderKey -> new OrderByElement(
                        ExpressionTranslator.translate(orderKey.child(), context),
                        orderKey.isAsc(), orderKey.isNullFirst()))
                .collect(Collectors.toList());

        // function calls
        List<Expr> analyticFnCalls = windowFunctionList.stream()
                .map(e -> {
                    Expression function = e.child(0).child(0);
                    if (function instanceof AggregateFunction) {
                        AggregateParam param = AggregateParam.LOCAL_RESULT;
                        function = new AggregateExpression((AggregateFunction) function, param);
                    }
                    return ExpressionTranslator.translate(function, context);
                })
                .map(FunctionCallExpr.class::cast)
                .peek(fnCall -> {
                    fnCall.setIsAnalyticFnCall(true);
                    ((org.apache.doris.catalog.AggregateFunction) fnCall.getFn()).setIsAnalyticFn(true);
                })
                .collect(Collectors.toList());

        // analytic window
        AnalyticWindow analyticWindow = physicalWindow.translateWindowFrame(windowFrame, context);

        // 2. get bufferedTupleDesc from SortNode and compute isNullableMatched
        Map<ExprId, SlotRef> bufferedSlotRefForWindow = getBufferedSlotRefForWindow(windowFrameGroup, context);
        TupleDescriptor bufferedTupleDesc = context.getBufferedTupleForWindow();

        // generate predicates to check if the exprs of partitionKeys and orderKeys have matched isNullable between
        // sortNode and analyticNode
        Expr partitionExprsIsNullableMatched = partitionExprs.isEmpty() ? null : windowExprsHaveMatchedNullable(
                partitionKeyList, partitionExprs, bufferedSlotRefForWindow);

        Expr orderElementsIsNullableMatched = orderByElements.isEmpty() ? null : windowExprsHaveMatchedNullable(
                orderKeyList.stream().map(UnaryNode::child).collect(Collectors.toList()),
                orderByElements.stream().map(OrderByElement::getExpr).collect(Collectors.toList()),
                bufferedSlotRefForWindow);

        // 3. generate tupleDesc
        List<Slot> windowSlotList = windowFunctionList.stream()
                .map(NamedExpression::toSlot)
                .collect(Collectors.toList());
        TupleDescriptor outputTupleDesc = generateTupleDesc(windowSlotList, null, context);

        // 4. generate AnalyticEvalNode
        AnalyticEvalNode analyticEvalNode = new AnalyticEvalNode(
                context.nextPlanNodeId(),
                inputPlanFragment.getPlanRoot(),
                analyticFnCalls,
                partitionExprs,
                orderByElements,
                analyticWindow,
                outputTupleDesc,
                outputTupleDesc,
                partitionExprsIsNullableMatched,
                orderElementsIsNullableMatched,
                bufferedTupleDesc
        );
        analyticEvalNode.setNereidsId(physicalWindow.getId());
        context.getNereidsIdToPlanNodeIdMap().put(physicalWindow.getId(), analyticEvalNode.getId());
        analyticEvalNode.setChildrenDistributeExprLists(distributeExprLists);
        PlanNode root = inputPlanFragment.getPlanRoot();
        if (root instanceof SortNode) {
            ((SortNode) root).setIsAnalyticSort(true);
        }
        inputPlanFragment.addPlanRoot(analyticEvalNode);

        // in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution
        // we need turn of parallel scan to ensure to get correct result.
        // TODO: nereids forbid all parallel scan under PhysicalSetOperation temporary
        if (findOlapScanNodesByPassExchangeAndJoinNode(inputPlanFragment.getPlanRoot())) {
            inputPlanFragment.setHasColocatePlanNode(true);
            analyticEvalNode.setColocate(true);
            if (root instanceof SortNode) {
                ((SortNode) root).setColocate(true);
            }
        }
        return inputPlanFragment;
    }

    @Override
    public PlanFragment visitPhysicalLazyMaterialize(PhysicalLazyMaterialize<? extends Plan> materialize,
            PlanTranslatorContext context) {
        PlanFragment inputPlanFragment = materialize.child(0).accept(this, context);
        TupleDescriptor materializeTupleDesc = generateTupleDesc(materialize.getOutput(), null, context);

        MaterializationNode materializeNode = new MaterializationNode(context.nextPlanNodeId(), materializeTupleDesc,
                inputPlanFragment.getPlanRoot());

        List<Expr> rowIds = materialize.getRowIds().stream()
                .map(e -> ExpressionTranslator.translate(e, context))
                .collect(Collectors.toList());
        materializeNode.setRowIds(rowIds);

        materializeNode.setLazyColumns(materialize.getLazyColumns());
        materializeNode.setLocations(materialize.getLazySlotLocations());
        materializeNode.setIdxs(materialize.getlazyTableIdxs());

        List<Boolean> rowStoreFlags = new ArrayList<>();
        for (CatalogRelation relation : materialize.getRelations()) {
            rowStoreFlags.add(shouldUseRowStore(relation));
        }
        materializeNode.setRowStoreFlags(rowStoreFlags);

        materializeNode.setTopMaterializeNode(context.isTopMaterializeNode());
        if (context.isTopMaterializeNode()) {
            context.setTopMaterializeNode(false);
        }

        inputPlanFragment.addPlanRoot(materializeNode);
        return inputPlanFragment;
    }

    private boolean shouldUseRowStore(CatalogRelation rel) {
        boolean useRowStore = false;
        if (rel instanceof PhysicalOlapScan) {
            OlapTable olapTable = ((PhysicalOlapScan) rel).getTable();
            useRowStore = olapTable.storeRowColumn()
                    && CollectionUtils.isEmpty(olapTable.getTableProperty().getCopiedRowStoreColumns());
        }
        return useRowStore;
    }

    @Override
    public PlanFragment visitPhysicalLazyMaterializeOlapScan(PhysicalLazyMaterializeOlapScan lazyScan,
            PlanTranslatorContext context) {
        PlanFragment planFragment = computePhysicalOlapScan(lazyScan.getScan(), true, context);
        TupleDescriptor outputTuple = generateTupleDesc(lazyScan.getOutput(), lazyScan.getScan().getTable(), context);
        OlapScanNode olapScanNode = (OlapScanNode) planFragment.getPlanRoot();
        olapScanNode.setDesc(outputTuple);
        olapScanNode.setIsTopnLazyMaterialize(true);
        olapScanNode.setGlobalRowIdColumn(lazyScan.getRowId().getOriginalColumn().get());
        for (Slot slot : lazyScan.getOutput()) {
            if (((SlotReference) slot).getOriginalColumn().isPresent()) {
                olapScanNode.addTopnLazyMaterializeOutputColumns(((SlotReference) slot).getOriginalColumn().get());
            }
        }
        planFragment.getPlanRoot().resetTupleIds(Lists.newArrayList(outputTuple.getId()));
        // translate rf on outputTuple
        context.getRuntimeTranslator().ifPresent(
                runtimeFilterTranslator -> runtimeFilterTranslator.getContext().getTargetListByScan(lazyScan)
                        .forEach(expr -> runtimeFilterTranslator.translateRuntimeFilterTarget(
                                expr, olapScanNode, context)
                        )
        );
        context.getTopnFilterContext().translateTarget(lazyScan, olapScanNode, context);

        return planFragment;
    }

    /* ********************************************************************************************
     * private functions
     * ******************************************************************************************** */

    private PartitionSortNode translatePartitionSortNode(PhysicalPartitionTopN<? extends Plan> partitionTopN,
            PlanNode childNode, PlanTranslatorContext context) {
        List<Expr> partitionExprs = partitionTopN.getPartitionKeys().stream()
                .map(e -> ExpressionTranslator.translate(e, context))
                .collect(Collectors.toList());
        // partition key should on child tuple, sort key should on partition top's tuple
        TupleDescriptor sortTuple = generateTupleDesc(partitionTopN.child().getOutput(), null, context);
        List<Expr> orderingExprs = Lists.newArrayList();
        List<Boolean> ascOrders = Lists.newArrayList();
        List<Boolean> nullsFirstParams = Lists.newArrayList();
        List<OrderKey> orderKeys = partitionTopN.getOrderKeys();
        orderKeys.forEach(k -> {
            orderingExprs.add(ExpressionTranslator.translate(k.getExpr(), context));
            ascOrders.add(k.isAsc());
            nullsFirstParams.add(k.isNullFirst());
        });
        SortInfo sortInfo = new SortInfo(orderingExprs, ascOrders, nullsFirstParams, sortTuple);
        PartitionSortNode partitionSortNode = new PartitionSortNode(context.nextPlanNodeId(), childNode,
                partitionTopN.getFunction(), partitionExprs, sortInfo, partitionTopN.hasGlobalLimit(),
                partitionTopN.getPartitionLimit(), partitionTopN.getPhase());
        partitionSortNode.setNereidsId(partitionTopN.getId());
        context.getNereidsIdToPlanNodeIdMap().put(partitionTopN.getId(), partitionSortNode.getId());
        if (partitionTopN.getStats() != null) {
            partitionSortNode.setCardinality((long) partitionTopN.getStats().getRowCount());
        }
        updateLegacyPlanIdToPhysicalPlan(partitionSortNode, partitionTopN);
        return partitionSortNode;
    }

    private SortNode translateSortNode(AbstractPhysicalSort<? extends Plan> sort, PlanNode childNode,
            PlanTranslatorContext context) {
        TupleDescriptor sortTuple = generateTupleDesc(sort.child().getOutput(), null, context);
        List<Expr> orderingExprs = Lists.newArrayList();
        List<Boolean> ascOrders = Lists.newArrayList();
        List<Boolean> nullsFirstParams = Lists.newArrayList();
        List<OrderKey> orderKeys = sort.getOrderKeys();
        orderKeys.forEach(k -> {
            orderingExprs.add(ExpressionTranslator.translate(k.getExpr(), context));
            ascOrders.add(k.isAsc());
            nullsFirstParams.add(k.isNullFirst());
        });
        SortInfo sortInfo = new SortInfo(orderingExprs, ascOrders, nullsFirstParams, sortTuple);
        SortNode sortNode = new SortNode(context.nextPlanNodeId(), childNode, sortInfo, sort instanceof PhysicalTopN);
        sortNode.setNereidsId(sort.getId());
        context.getNereidsIdToPlanNodeIdMap().put(sort.getId(), sortNode.getId());
        if (sort.getStats() != null) {
            sortNode.setCardinality((long) sort.getStats().getRowCount());
        }
        updateLegacyPlanIdToPhysicalPlan(sortNode, sort);
        return sortNode;
    }

    private void updateScanSlotsMaterialization(ScanNode scanNode,
            Set<SlotId> requiredSlotIdSet, Set<SlotId> requiredByProjectSlotIdSet,
            PlanTranslatorContext context) {
        // TODO: use smallest slot if do not need any slot in upper node
        SlotDescriptor smallest = scanNode.getTupleDesc().getSlots().get(0);
        scanNode.getTupleDesc().getSlots().removeIf(s -> !requiredSlotIdSet.contains(s.getId()));
        if (scanNode.getTupleDesc().getSlots().isEmpty()) {
            scanNode.getTupleDesc().getSlots().add(smallest);
        }
        if (context.getSessionVariable() != null
                && context.getSessionVariable().forbidUnknownColStats
                && !StatisticConstants.isSystemTable(scanNode.getTupleDesc().getTable())) {
            for (SlotId slotId : requiredByProjectSlotIdSet) {
                if (context.isColumnStatsUnknown(scanNode, slotId)) {
                    String colName = scanNode.getTupleDesc().getSlot(slotId.asInt()).getColumn().getName();
                    throw new AnalysisException("meet unknown column stats: " + colName);
                }
            }
            context.removeScanFromStatsUnknownColumnsMap(scanNode);
        }
    }

    private void addConjunctsToPlanNode(PhysicalFilter<? extends Plan> filter,
            PlanNode planNode,
            PlanTranslatorContext context) {
        filter.getConjuncts().stream()
                .map(e -> ExpressionTranslator.translate(e, context))
                .forEach(planNode::addConjunct);
        updateLegacyPlanIdToPhysicalPlan(planNode, filter);
    }

    private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf table, PlanTranslatorContext context) {
        TupleDescriptor tupleDescriptor = context.generateTupleDesc();
        tupleDescriptor.setTable(table);
        for (Slot slot : slotList) {
            context.createSlotDesc(tupleDescriptor, (SlotReference) slot, table);
        }
        return tupleDescriptor;
    }

    private PlanFragment connectJoinNode(HashJoinNode hashJoinNode, PlanFragment leftFragment,
            PlanFragment rightFragment, PlanTranslatorContext context, AbstractPlan join) {
        hashJoinNode.setChild(0, leftFragment.getPlanRoot());
        hashJoinNode.setChild(1, rightFragment.getPlanRoot());
        setPlanRoot(leftFragment, hashJoinNode, join);
        context.mergePlanFragment(rightFragment, leftFragment);
        for (PlanFragment rightChild : rightFragment.getChildren()) {
            leftFragment.addChild(rightChild);
        }
        return leftFragment;
    }

    private List<SlotReference> collectGroupBySlots(List<Expression> groupByExpressions,
            List<NamedExpression> outputExpressions) {
        List<SlotReference> groupSlots = Lists.newArrayList();
        Set<VirtualSlotReference> virtualSlotReferences = groupByExpressions.stream()
                .filter(VirtualSlotReference.class::isInstance)
                .map(VirtualSlotReference.class::cast)
                .collect(Collectors.toSet());
        for (Expression e : groupByExpressions) {
            if (e instanceof SlotReference && outputExpressions.stream().anyMatch(o -> o.anyMatch(e::equals))) {
                groupSlots.add((SlotReference) e);
            } else if (e instanceof SlotReference && !virtualSlotReferences.isEmpty()) {
                // When there is a virtualSlot, it is a groupingSets scenario,
                // and the original exprId should be retained at this time.
                groupSlots.add((SlotReference) e);
            } else {
                groupSlots.add(new SlotReference(e.toSql(), e.getDataType(), e.nullable(), ImmutableList.of()));
            }
        }
        return groupSlots;
    }

    private List<Integer> getSlotIds(TupleDescriptor tupleDescriptor) {
        return tupleDescriptor.getSlots()
                .stream()
                .map(slot -> slot.getId().asInt())
                .collect(ImmutableList.toImmutableList());
    }

    private Map<ExprId, SlotRef> getBufferedSlotRefForWindow(WindowFrameGroup windowFrameGroup,
                                                             PlanTranslatorContext context) {
        Map<ExprId, SlotRef> bufferedSlotRefForWindow = context.getBufferedSlotRefForWindow();

        // set if absent
        windowFrameGroup.getPartitionKeys().stream()
                .map(NamedExpression.class::cast)
                .forEach(expression -> {
                    ExprId exprId = expression.getExprId();
                    bufferedSlotRefForWindow.putIfAbsent(exprId, context.findSlotRef(exprId));
                });
        windowFrameGroup.getOrderKeys().stream()
                .map(UnaryNode::child)
                .map(NamedExpression.class::cast)
                .forEach(expression -> {
                    ExprId exprId = expression.getExprId();
                    bufferedSlotRefForWindow.putIfAbsent(exprId, context.findSlotRef(exprId));
                });
        return bufferedSlotRefForWindow;
    }

    private Expr windowExprsHaveMatchedNullable(List<Expression> expressions, List<Expr> exprs,
                                                Map<ExprId, SlotRef> bufferedSlotRef) {
        Map<ExprId, Expr> exprIdToExpr = Maps.newHashMap();
        for (int i = 0; i < expressions.size(); i++) {
            NamedExpression expression = (NamedExpression) expressions.get(i);
            exprIdToExpr.put(expression.getExprId(), exprs.get(i));
        }
        return windowExprsHaveMatchedNullable(exprIdToExpr, bufferedSlotRef, expressions, 0, expressions.size());
    }

    private Expr windowExprsHaveMatchedNullable(Map<ExprId, Expr> exprIdToExpr, Map<ExprId, SlotRef> exprIdToSlotRef,
                                                List<Expression> expressions, int i, int size) {
        if (i > size - 1) {
            return new BoolLiteral(true);
        }

        ExprId exprId = ((NamedExpression) expressions.get(i)).getExprId();
        Expr lhs = exprIdToExpr.get(exprId);
        Expr rhs = exprIdToSlotRef.get(exprId);

        Expr bothNull = new CompoundPredicate(CompoundPredicate.Operator.AND,
                new IsNullPredicate(lhs, false, true), new IsNullPredicate(rhs, false, true));
        Expr lhsEqRhsNotNull = new CompoundPredicate(CompoundPredicate.Operator.AND,
                new CompoundPredicate(CompoundPredicate.Operator.AND,
                        new IsNullPredicate(lhs, true, true), new IsNullPredicate(rhs, true, true)),
                new BinaryPredicate(BinaryPredicate.Operator.EQ, lhs, rhs,
                        Type.BOOLEAN, NullableMode.DEPEND_ON_ARGUMENT));

        Expr remainder = windowExprsHaveMatchedNullable(exprIdToExpr, exprIdToSlotRef, expressions, i + 1, size);
        return new CompoundPredicate(CompoundPredicate.Operator.AND,
                new CompoundPredicate(CompoundPredicate.Operator.OR, bothNull, lhsEqRhsNotNull), remainder);
    }

    private PlanFragment createPlanFragment(PlanNode planNode, DataPartition dataPartition, AbstractPlan physicalPlan) {
        PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), planNode, dataPartition);
        updateLegacyPlanIdToPhysicalPlan(planNode, physicalPlan);
        return planFragment;
    }

    // TODO: refactor this, call it every where is not a good way
    private void setPlanRoot(PlanFragment fragment, PlanNode planNode, AbstractPlan physicalPlan) {
        fragment.setPlanRoot(planNode);
        updateLegacyPlanIdToPhysicalPlan(planNode, physicalPlan);
    }

    // TODO: refactor this, call it every where is not a good way
    private void addPlanRoot(PlanFragment fragment, PlanNode planNode, AbstractPlan physicalPlan) {
        fragment.addPlanRoot(planNode);
        updateLegacyPlanIdToPhysicalPlan(planNode, physicalPlan);
    }

    private DataPartition toDataPartition(DistributionSpec distributionSpec/* target distribution */,
                    List<ExprId> childOutputIds, PlanTranslatorContext context) {
        if (distributionSpec instanceof DistributionSpecAny
                || distributionSpec instanceof DistributionSpecStorageAny
                || distributionSpec instanceof DistributionSpecExecutionAny) {
            return DataPartition.RANDOM;
        } else if (distributionSpec instanceof DistributionSpecGather // gather to one. will set instance later
                // gather to one which has its storage. not useful now.
                || distributionSpec instanceof DistributionSpecStorageGather
                || distributionSpec instanceof DistributionSpecReplicated // broadcast to all
                || distributionSpec instanceof DistributionSpecAllSingleton // broadcast to all. one BE one instance
        ) {
            // broadcast to all (if only one, one equals all)
            return DataPartition.UNPARTITIONED;
        } else if (distributionSpec instanceof DistributionSpecHash) {
            DistributionSpecHash distributionSpecHash = (DistributionSpecHash) distributionSpec;
            List<Expr> partitionExprs = Lists.newArrayList();
            for (int i = 0; i < distributionSpecHash.getEquivalenceExprIds().size(); i++) {
                Set<ExprId> equivalenceExprId = distributionSpecHash.getEquivalenceExprIds().get(i);
                for (ExprId exprId : equivalenceExprId) {
                    if (childOutputIds.contains(exprId)) {
                        partitionExprs.add(context.findSlotRef(exprId));
                        break;
                    }
                }
                if (partitionExprs.size() != i + 1) {
                    throw new RuntimeException("Cannot translate DistributionSpec to DataPartition,"
                            + " DistributionSpec: " + distributionSpec
                            + ", child output: " + childOutputIds);
                }
            }
            TPartitionType partitionType;
            switch (distributionSpecHash.getShuffleType()) {
                case STORAGE_BUCKETED:
                    partitionType = TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED;
                    break;
                case EXECUTION_BUCKETED:
                    partitionType = TPartitionType.HASH_PARTITIONED;
                    break;
                case NATURAL:
                default:
                    throw new RuntimeException("Do not support shuffle type: "
                            + distributionSpecHash.getShuffleType());
            }
            return new DataPartition(partitionType, partitionExprs);
        } else if (distributionSpec instanceof DistributionSpecOlapTableSinkHashPartitioned) {
            return DataPartition.TABLET_ID;
        } else if (distributionSpec instanceof DistributionSpecHiveTableSinkHashPartitioned) {
            DistributionSpecHiveTableSinkHashPartitioned partitionSpecHash =
                    (DistributionSpecHiveTableSinkHashPartitioned) distributionSpec;
            List<Expr> partitionExprs = Lists.newArrayList();
            List<ExprId> partitionExprIds = partitionSpecHash.getOutputColExprIds();
            for (ExprId partitionExprId : partitionExprIds) {
                if (childOutputIds.contains(partitionExprId)) {
                    partitionExprs.add(context.findSlotRef(partitionExprId));
                }
            }
            return new DataPartition(TPartitionType.HIVE_TABLE_SINK_HASH_PARTITIONED, partitionExprs);
        } else if (distributionSpec instanceof DistributionSpecHiveTableSinkUnPartitioned) {
            return new DataPartition(TPartitionType.HIVE_TABLE_SINK_UNPARTITIONED);
        } else {
            throw new RuntimeException("Unknown DistributionSpec: " + distributionSpec);
        }
    }

    // TODO: refactor this, call it every where is not a good way
    private void updateLegacyPlanIdToPhysicalPlan(PlanNode planNode, AbstractPlan physicalPlan) {
        if (statsErrorEstimator != null) {
            statsErrorEstimator.updateLegacyPlanIdToPhysicalPlan(planNode, physicalPlan);
        }
    }

    private void injectRowIdColumnSlot(TupleDescriptor tupleDesc) {
        SlotDescriptor slotDesc = context.addSlotDesc(tupleDesc);
        if (LOG.isDebugEnabled()) {
            LOG.debug("inject slot {}", slotDesc);
        }
        String name = Column.ROWID_COL;
        Column col = new Column(name, Type.STRING, false, null, false, "", "rowid column");
        slotDesc.setType(Type.STRING);
        slotDesc.setColumn(col);
        slotDesc.setIsNullable(false);
        slotDesc.setIsMaterialized(true);
    }

    /**
     * topN opt: using storage data ordering to accelerate topn operation.
     * refer pr: optimize topn query if order by columns is prefix of sort keys of table (#10694)
     */
    private boolean checkPushSort(SortNode sortNode, OlapTable olapTable) {
        // Ensure limit is less than threshold
        if (sortNode.getLimit() <= 0
                || sortNode.getLimit() > context.getSessionVariable().topnOptLimitThreshold) {
            return false;
        }

        // Ensure all isAscOrder is same, ande length != 0. Can't be z-order.
        if (sortNode.getSortInfo().getIsAscOrder().stream().distinct().count() != 1 || olapTable.isZOrderSort()) {
            return false;
        }

        // Tablet's order by key only can be the front part of schema.
        // Like: schema: a.b.c.d.e.f.g order by key: a.b.c (no a,b,d)
        // Do **prefix match** to check if order by key can be pushed down.
        // olap order by key: a.b.c.d
        // sort key: (a) (a,b) (a,b,c) (a,b,c,d) is ok
        //           (a,c) (a,c,d), (a,c,b) (a,c,f) (a,b,c,d,e)is NOT ok
        List<Expr> sortExprs = sortNode.getSortInfo().getOrderingExprs();
        List<Boolean> nullsFirsts = sortNode.getSortInfo().getNullsFirst();
        List<Boolean> isAscOrders = sortNode.getSortInfo().getIsAscOrder();
        if (sortExprs.size() > olapTable.getDataSortInfo().getColNum()) {
            return false;
        }
        List<Column> sortKeyColumns = new ArrayList<>(olapTable.getFullSchema());
        if (olapTable.getEnableUniqueKeyMergeOnWrite()) {
            Map<Integer, Column> clusterKeyMap = new TreeMap<>();
            for (Column column : olapTable.getFullSchema()) {
                if (column.getClusterKeyId() != -1) {
                    clusterKeyMap.put(column.getClusterKeyId(), column);
                }
            }
            if (!clusterKeyMap.isEmpty()) {
                sortKeyColumns.clear();
                sortKeyColumns.addAll(clusterKeyMap.values());
            }
        }
        for (int i = 0; i < sortExprs.size(); i++) {
            // sort key.
            Column sortColumn = sortKeyColumns.get(i);
            // sort slot.
            Expr sortExpr = sortExprs.get(i);
            if (sortExpr instanceof SlotRef) {
                SlotRef slotRef = (SlotRef) sortExpr;
                if (sortColumn.equals(slotRef.getColumn())) {
                    // [ORDER BY DESC NULLS FIRST] or [ORDER BY ASC NULLS LAST] can not be optimized
                    // to only read file tail, since NULLS is at file head but data is at tail
                    if (sortColumn.isAllowNull() && nullsFirsts.get(i) != isAscOrders.get(i)) {
                        return false;
                    }
                } else {
                    return false;
                }
            } else {
                return false;
            }
        }

        return true;
    }

    private List<Expr> translateToLegacyConjuncts(Set<Expression> conjuncts) {
        List<Expr> outputExprs = Lists.newArrayList();
        if (conjuncts != null) {
            conjuncts.stream()
                    .map(e -> ExpressionTranslator.translate(e, context))
                    .forEach(outputExprs::add);
        }
        return outputExprs;
    }

    private boolean isComplexDataType(DataType dataType) {
        return dataType instanceof ArrayType || dataType instanceof MapType || dataType instanceof JsonType
                || dataType instanceof StructType;
    }

    private PhysicalCTEConsumer getCTEConsumerChild(PhysicalPlan root) {
        if (root == null) {
            return null;
        } else if (root instanceof PhysicalCTEConsumer) {
            return (PhysicalCTEConsumer) root;
        } else if (root.children().size() != 1) {
            return null;
        } else {
            return getCTEConsumerChild((PhysicalPlan) root.child(0));
        }
    }

    private boolean findOlapScanNodesByPassExchangeAndJoinNode(PlanNode root) {
        if (root instanceof OlapScanNode) {
            return true;
        } else if (!(root instanceof JoinNodeBase || root instanceof ExchangeNode)) {
            return root.getChildren().stream().anyMatch(child -> findOlapScanNodesByPassExchangeAndJoinNode(child));
        }
        return false;
    }

    private List<List<Expr>> getDistributeExprs(Plan ... children) {
        List<List<Expr>> distributeExprLists = Lists.newArrayList();
        for (Plan child : children) {
            DistributionSpec spec = ((PhysicalPlan) child).getPhysicalProperties().getDistributionSpec();
            distributeExprLists.add(getDistributeExpr(child.getOutputExprIds(), spec));
        }
        return distributeExprLists;
    }

    private List<Expr> getDistributeExpr(List<ExprId> childOutputIds, DistributionSpec spec) {
        if (spec instanceof DistributionSpecHash) {
            DistributionSpecHash distributionSpecHash = (DistributionSpecHash) spec;
            List<Expr> partitionExprs = Lists.newArrayList();
            for (int i = 0; i < distributionSpecHash.getEquivalenceExprIds().size(); i++) {
                Set<ExprId> equivalenceExprId = distributionSpecHash.getEquivalenceExprIds().get(i);
                for (ExprId exprId : equivalenceExprId) {
                    if (childOutputIds.contains(exprId)) {
                        partitionExprs.add(context.findSlotRef(exprId));
                        break;
                    }
                }
            }
            return partitionExprs;
        }
        return Lists.newArrayList();
    }
}