// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe.runtime;

import org.apache.doris.common.Config;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleBucketJoinAssignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanRanges;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedScanBucketOlapTableJob;
import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.MultiCastDataSink;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SortNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.CoordinatorContext;
import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TPipelineFragmentParamsList;
import org.apache.doris.thrift.TPipelineInstanceParams;
import org.apache.doris.thrift.TPlanFragment;
import org.apache.doris.thrift.TPlanFragmentDestination;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TRuntimeFilterParams;
import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TTopnFilterDesc;

import com.google.common.base.Suppliers;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.LinkedHashMultiset;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class ThriftPlansBuilder {
    private static final Logger LOG = LogManager.getLogger(ThriftPlansBuilder.class);

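    // Convert all distributed pipeline plans into thrift TPipelineFragmentParamsList objects,
    // grouped by the worker (backend) that will execute them, so that every backend can
    // receive all of its fragments in a single rpc.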
    public static Map<DistributedPlanWorker, TPipelineFragmentParamsList> plansToThrift(
            CoordinatorContext coordinatorContext) {

        List<PipelineDistributedPlan> distributedPlans = coordinatorContext.distributedPlans;

        // we should set the runtime predicates first, then we can use heap sort and convert to thrift
        setRuntimePredicateIfNeed(coordinatorContext.scanNodes);

        RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder = RuntimeFiltersThriftBuilder.compute(
                coordinatorContext.runtimeFilters, distributedPlans);
        Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier
                = topNFilterToThrift(coordinatorContext.topnFilters);

        Multiset<DistributedPlanWorker> workerProcessInstanceNum = computeInstanceNumPerWorker(distributedPlans);
        Map<DistributedPlanWorker, TPipelineFragmentParamsList> fragmentsGroupByWorker = Maps.newLinkedHashMap();
        int currentInstanceIndex = 0;
        Map<Integer, TFileScanRangeParams> sharedFileScanRangeParams = Maps.newLinkedHashMap();
        for (PipelineDistributedPlan currentFragmentPlan : distributedPlans) {
            sharedFileScanRangeParams.putAll(computeFileScanRangeParams(currentFragmentPlan));

            Map<Integer, Integer> exchangeSenderNum = computeExchangeSenderNum(currentFragmentPlan);
            ListMultimap<DistributedPlanWorker, AssignedJob> instancesPerWorker
                    = groupInstancePerWorker(currentFragmentPlan);
            Map<DistributedPlanWorker, TPipelineFragmentParams> workerToCurrentFragment = Maps.newLinkedHashMap();

            for (AssignedJob instanceJob : currentFragmentPlan.getInstanceJobs()) {
                TPipelineFragmentParams currentFragmentParam = fragmentToThriftIfAbsent(
                        currentFragmentPlan, instanceJob, workerToCurrentFragment,
                        instancesPerWorker, exchangeSenderNum, sharedFileScanRangeParams,
                        workerProcessInstanceNum, coordinatorContext);

                TPipelineInstanceParams instanceParam = instanceToThrift(
                        currentFragmentParam, instanceJob, runtimeFiltersThriftBuilder,
                        topNFilterThriftSupplier, currentInstanceIndex++
                );
                currentFragmentParam.getLocalParams().add(instanceParam);
            }

            // arrange fragments by the same worker,
            // so we can merge multiple fragments and send them to a backend in one rpc
            for (Entry<DistributedPlanWorker, TPipelineFragmentParams> kv : workerToCurrentFragment.entrySet()) {
                TPipelineFragmentParamsList fragments = fragmentsGroupByWorker.computeIfAbsent(
                        kv.getKey(), w -> new TPipelineFragmentParamsList());
                fragments.addToParamsList(kv.getValue());
            }
        }

        // the backend should initialize fragments from target to source, so that it can
        // bind the receiver fragment to the sender fragment, but the frontend computes
        // the thrift messages from source to target, so we need to reverse the fragments.
        for (DistributedPlanWorker worker : fragmentsGroupByWorker.keySet()) {
            Collections.reverse(fragmentsGroupByWorker.get(worker).getParamsList());
        }

        setParamsForOlapTableSink(distributedPlans, fragmentsGroupByWorker, coordinatorContext);

        // remove redundant params to reduce rpc message size
        for (Entry<DistributedPlanWorker, TPipelineFragmentParamsList> kv : fragmentsGroupByWorker.entrySet()) {
            boolean isFirstFragmentInCurrentBackend = true;
            for (TPipelineFragmentParams fragmentParams : kv.getValue().getParamsList()) {
                if (!isFirstFragmentInCurrentBackend) {
                    fragmentParams.unsetDescTbl();
                    fragmentParams.unsetFileScanParams();
                    fragmentParams.unsetCoord();
                    fragmentParams.unsetQueryGlobals();
                    fragmentParams.unsetResourceInfo();
                    fragmentParams.setIsSimplifiedParam(true);
                }
                isFirstFragmentInCurrentBackend = false;
            }
        }
        return fragmentsGroupByWorker;
    }

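    // group the instance jobs of one fragment by the worker they are assigned to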
    private static ListMultimap<DistributedPlanWorker, AssignedJob>
            groupInstancePerWorker(PipelineDistributedPlan fragmentPlan) {
        ListMultimap<DistributedPlanWorker, AssignedJob> workerToInstances = ArrayListMultimap.create();
        for (AssignedJob instanceJob : fragmentPlan.getInstanceJobs()) {
            workerToInstances.put(instanceJob.getAssignedWorker(), instanceJob);
        }
        return workerToInstances;
    }

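    // for every olap scan node, mark its topn-filter sort nodes as having a runtime predicate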
    private static void setRuntimePredicateIfNeed(Collection<ScanNode> scanNodes) {
        for (ScanNode scanNode : scanNodes) {
            if (scanNode instanceof OlapScanNode) {
                for (SortNode topnFilterSortNode : scanNode.getTopnFilterSortNodes()) {
                    topnFilterSortNode.setHasRuntimePredicate();
                }
            }
        }
    }

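    // lazily convert the topn filters to thrift; the supplier returns null when there is no topn filter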
    private static Supplier<List<TTopnFilterDesc>> topNFilterToThrift(List<TopnFilter> topnFilters) {
        return Suppliers.memoize(() -> {
            if (CollectionUtils.isEmpty(topnFilters)) {
                return null;
            }

            List<TTopnFilterDesc> filterDescs = new ArrayList<>(topnFilters.size());
            for (TopnFilter topnFilter : topnFilters) {
                filterDescs.add(topnFilter.toThrift());
            }
            return filterDescs;
        });
    }

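    // set the load stream params (load_stream_per_node, total_load_streams, num_local_sink)
    // for every fragment whose sink is an OlapTableSink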
    private static void setParamsForOlapTableSink(List<PipelineDistributedPlan> distributedPlans,
            Map<DistributedPlanWorker, TPipelineFragmentParamsList> fragmentsGroupByWorker,
            CoordinatorContext coordinatorContext) {
        int numBackendsWithSink = 0;
        for (PipelineDistributedPlan distributedPlan : distributedPlans) {
            PlanFragment fragment = distributedPlan.getFragmentJob().getFragment();
            if (fragment.getSink() instanceof OlapTableSink) {
                numBackendsWithSink += (int) distributedPlan.getInstanceJobs()
                        .stream()
                        .map(AssignedJob::getAssignedWorker)
                        .distinct()
                        .count();
            }
        }

        ConnectContext connectContext = coordinatorContext.connectContext;
        for (Entry<DistributedPlanWorker, TPipelineFragmentParamsList> kv : fragmentsGroupByWorker.entrySet()) {
            TPipelineFragmentParamsList fragments = kv.getValue();
            for (TPipelineFragmentParams fragmentParams : fragments.getParamsList()) {
                if (fragmentParams.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) {
                    int loadStreamPerNode = 1;
                    if (connectContext != null && connectContext.getSessionVariable() != null) {
                        loadStreamPerNode = connectContext.getSessionVariable().getLoadStreamPerNode();
                    }
                    fragmentParams.setLoadStreamPerNode(loadStreamPerNode);
                    fragmentParams.setTotalLoadStreams(numBackendsWithSink * loadStreamPerNode);
                    fragmentParams.setNumLocalSink(fragmentParams.getLocalParams().size());
                    LOG.info("num local sink for backend {} is {}", fragmentParams.getBackendId(),
                            fragmentParams.getNumLocalSink());
                }
            }
        }
    }

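    // count how many instances of all fragments will run on each worker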
    private static Multiset<DistributedPlanWorker> computeInstanceNumPerWorker(
            List<PipelineDistributedPlan> distributedPlans) {
        Multiset<DistributedPlanWorker> workerCounter = LinkedHashMultiset.create();
        for (PipelineDistributedPlan distributedPlan : distributedPlans) {
            for (AssignedJob instanceJob : distributedPlan.getInstanceJobs()) {
                workerCounter.add(instanceJob.getAssignedWorker());
            }
        }
        return workerCounter;
    }

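    // for every exchange node of this fragment, sum the instance numbers of the child plans that send to it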
    private static Map<Integer, Integer> computeExchangeSenderNum(PipelineDistributedPlan distributedPlan) {
        Map<Integer, Integer> senderNum = Maps.newLinkedHashMap();
        for (Entry<ExchangeNode, DistributedPlan> kv : distributedPlan.getInputs().entries()) {
            ExchangeNode exchangeNode = kv.getKey();
            PipelineDistributedPlan childPlan = (PipelineDistributedPlan) kv.getValue();
            senderNum.merge(exchangeNode.getId().asInt(), childPlan.getInstanceJobs().size(), Integer::sum);
        }
        return senderNum;
    }

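    // fill the destinations of a MultiCastDataSink; because the sink object is shared by all backends,
    // the destinations are only filled the first time we reach here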
    private static void setMultiCastDestinationThriftIfNotSet(PipelineDistributedPlan fragmentPlan) {
        MultiCastDataSink multiCastDataSink = (MultiCastDataSink) fragmentPlan.getFragmentJob().getFragment().getSink();
        List<List<TPlanFragmentDestination>> destinationList = multiCastDataSink.getDestinations();

        List<DataStreamSink> dataStreamSinks = multiCastDataSink.getDataStreamSinks();
        for (int i = 0; i < dataStreamSinks.size(); i++) {
            List<TPlanFragmentDestination> destinations = destinationList.get(i);
            if (!destinations.isEmpty()) {
                // we should set the destinations only once,
                // because all backends share the same MultiCastDataSink object
                continue;
            }
            DataStreamSink realSink = dataStreamSinks.get(i);
            for (Entry<DataSink, List<AssignedJob>> kv : fragmentPlan.getDestinations().entrySet()) {
                DataSink sink = kv.getKey();
                if (sink == realSink) {
                    List<AssignedJob> destInstances = kv.getValue();
                    for (AssignedJob destInstance : destInstances) {
                        destinations.add(instanceToDestination(destInstance));
                    }
                    break;
                }
            }
        }
    }

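    // build the destination list for a fragment whose sink is not a MultiCastDataSink;
    // such a fragment has at most one downstream sink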
    private static List<TPlanFragmentDestination> nonMultiCastDestinationToThrift(PipelineDistributedPlan plan) {
        Map<DataSink, List<AssignedJob>> destinationsMapping = plan.getDestinations();
        List<TPlanFragmentDestination> destinations = Lists.newArrayList();
        if (!destinationsMapping.isEmpty()) {
            List<AssignedJob> destinationJobs = destinationsMapping.entrySet().iterator().next().getValue();
            for (AssignedJob destinationJob : destinationJobs) {
                destinations.add(instanceToDestination(destinationJob));
            }
        }
        return destinations;
    }

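    // convert an assigned instance into a TPlanFragmentDestination (server address, brpc address, instance id)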
    private static TPlanFragmentDestination instanceToDestination(AssignedJob instance) {
        DistributedPlanWorker worker = instance.getAssignedWorker();
        String host = worker.host();
        int port = worker.port();
        int brpcPort = worker.brpcPort();

        TPlanFragmentDestination destination = new TPlanFragmentDestination();
        destination.setServer(new TNetworkAddress(host, port));
        destination.setBrpcServer(new TNetworkAddress(host, brpcPort));
        destination.setFragmentInstanceId(instance.instanceId());
        return destination;
    }

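    // build the fragment level TPipelineFragmentParams for the worker of the given instance,
    // or reuse the one already created for that worker: one params object per (fragment, worker) pair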
    private static TPipelineFragmentParams fragmentToThriftIfAbsent(
            PipelineDistributedPlan fragmentPlan, AssignedJob assignedJob,
            Map<DistributedPlanWorker, TPipelineFragmentParams> workerToFragmentParams,
            ListMultimap<DistributedPlanWorker, AssignedJob> instancesPerWorker,
            Map<Integer, Integer> exchangeSenderNum,
            Map<Integer, TFileScanRangeParams> fileScanRangeParamsMap,
            Multiset<DistributedPlanWorker> workerProcessInstanceNum,
            CoordinatorContext coordinatorContext) {
        DistributedPlanWorker worker = assignedJob.getAssignedWorker();
        return workerToFragmentParams.computeIfAbsent(worker, w -> {
            PlanFragment fragment = fragmentPlan.getFragmentJob().getFragment();
            ConnectContext connectContext = coordinatorContext.connectContext;

            TPipelineFragmentParams params = new TPipelineFragmentParams();
            params.setIsNereids(true);
            params.setBackendId(worker.id());
            params.setProtocolVersion(PaloInternalServiceVersion.V1);
            params.setDescTbl(coordinatorContext.descriptorTable);
            params.setQueryId(coordinatorContext.queryId);
            params.setFragmentId(fragment.getFragmentId().asInt());

            // Each tParam sets the total number of fragments that need to be executed on the same BE,
            // and the BE determines whether all fragments have been executed based on this information.
            // Note: for load fragments there is a small probability that FragmentNumOnHost is 0, for unknown reasons.
            params.setFragmentNumOnHost(workerProcessInstanceNum.count(worker));

            params.setNeedWaitExecutionTrigger(coordinatorContext.twoPhaseExecution());
            params.setPerExchNumSenders(exchangeSenderNum);

            List<TPlanFragmentDestination> nonMultiCastDestinations;
            if (fragment.getSink() instanceof MultiCastDataSink) {
                nonMultiCastDestinations = Lists.newArrayList();
                setMultiCastDestinationThriftIfNotSet(fragmentPlan);
            } else {
                nonMultiCastDestinations = nonMultiCastDestinationToThrift(fragmentPlan);
            }
            params.setDestinations(nonMultiCastDestinations);

            int instanceNumInThisFragment = fragmentPlan.getInstanceJobs().size();
            params.setNumSenders(instanceNumInThisFragment);
            params.setTotalInstances(instanceNumInThisFragment);

            params.setCoord(coordinatorContext.coordinatorAddress);
            params.setCurrentConnectFe(coordinatorContext.directConnectFrontendAddress.get());
            params.setQueryGlobals(coordinatorContext.queryGlobals);
            params.setQueryOptions(new TQueryOptions(coordinatorContext.queryOptions));
            long memLimit = coordinatorContext.queryOptions.getMemLimit();
            // update memory limit for colocate join
            if (connectContext != null
                    && !connectContext.getSessionVariable().isDisableColocatePlan()
                    && fragment.hasColocatePlanNode()) {
                int rate = Math.min(Config.query_colocate_join_memory_limit_penalty_factor, instanceNumInThisFragment);
                memLimit = coordinatorContext.queryOptions.getMemLimit() / rate;
            }
            params.getQueryOptions().setMemLimit(memLimit);

            params.setSendQueryStatisticsWithEveryBatch(fragment.isTransferQueryStatisticsWithEveryBatch());

            TPlanFragment planThrift = fragment.toThrift();
            planThrift.query_cache_param = fragment.queryCacheParam;
            params.setFragment(planThrift);
            params.setLocalParams(Lists.newArrayList());
            params.setWorkloadGroups(coordinatorContext.getWorkloadGroups());

            params.setFileScanParams(fileScanRangeParamsMap);

            if (fragmentPlan.getFragmentJob() instanceof UnassignedScanBucketOlapTableJob) {
                int bucketNum = ((UnassignedScanBucketOlapTableJob) fragmentPlan.getFragmentJob())
                        .getOlapScanNodes()
                        .get(0)
                        .getBucketNum();
                params.setNumBuckets(bucketNum);
            }

            List<AssignedJob> instances = instancesPerWorker.get(worker);
            Map<AssignedJob, Integer> instanceToIndex = instanceToIndex(instances);

            // local shuffle params: bucket_seq_to_instance_idx and shuffle_idx_to_instance_idx
            params.setBucketSeqToInstanceIdx(computeBucketIdToInstanceId(fragmentPlan, w, instanceToIndex));
            params.setShuffleIdxToInstanceIdx(computeDestIdToInstanceId(fragmentPlan, w, instanceToIndex));
            return params;
        });
    }

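    // map every instance to its index within the instance list of one worker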
    private static Map<AssignedJob, Integer> instanceToIndex(List<AssignedJob> instances) {
        Map<AssignedJob, Integer> instanceToIndex = Maps.newLinkedHashMap();
        for (int instanceIndex = 0; instanceIndex < instances.size(); instanceIndex++) {
            instanceToIndex.put(instances.get(instanceIndex), instanceIndex);
        }

        return instanceToIndex;
    }

    private static Map<Integer, TFileScanRangeParams> computeFileScanRangeParams(
            PipelineDistributedPlan distributedPlan) {
        // scan node id -> TFileScanRangeParams
        Map<Integer, TFileScanRangeParams> fileScanRangeParamsMap = Maps.newLinkedHashMap();
        for (ScanNode scanNode : distributedPlan.getFragmentJob().getScanNodes()) {
            if (scanNode instanceof FileQueryScanNode) {
                TFileScanRangeParams fileScanRangeParams = ((FileQueryScanNode) scanNode).getFileScanRangeParams();
                fileScanRangeParamsMap.put(scanNode.getId().asInt(), fileScanRangeParams);
            }
        }

        return fileScanRangeParamsMap;
    }

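    // build the instance level TPipelineInstanceParams: scan ranges, sender id,
    // runtime filter params and topn filter descriptors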
    private static TPipelineInstanceParams instanceToThrift(
            TPipelineFragmentParams currentFragmentParam, AssignedJob instance,
            RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder,
            Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier, int currentInstanceNum) {
        TPipelineInstanceParams instanceParam = new TPipelineInstanceParams();
        instanceParam.setFragmentInstanceId(instance.instanceId());
        setScanSourceParam(currentFragmentParam, instance, instanceParam);

        instanceParam.setSenderId(instance.indexInUnassignedJob());
        instanceParam.setBackendNum(currentInstanceNum);

        instanceParam.setTopnFilterDescs(topNFilterThriftSupplier.get());

        // set params for the runtime filters; every instance needs the merge address
        TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams();
        runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress);
        instanceParam.setRuntimeFilterParams(runtimeFilterParams);
        if (runtimeFiltersThriftBuilder.isMergeRuntimeFilterInstance(instance)) {
            runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
        }
        boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob;
        if (isLocalShuffle) {
            // a fragment on a backend only enables local shuffle once, for the first local shuffle instance,
            // because we simply skip setting scan params when LocalShuffleAssignedJob.receiveDataFromLocal == true
            ignoreDataDistribution(currentFragmentParam);
        }
        return instanceParam;
    }

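    // translate the instance's scan source (bucket or default) into per scan node scan ranges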
    private static void setScanSourceParam(
            TPipelineFragmentParams currentFragmentParam, AssignedJob instance,
            TPipelineInstanceParams instanceParams) {
        ScanSource scanSource = instance.getScanSource();
        PerNodeScanParams scanParams;
        if (scanSource instanceof BucketScanSource) {
            scanParams = computeBucketScanSourceParam((BucketScanSource) scanSource);
        } else {
            scanParams = computeDefaultScanSourceParam((DefaultScanSource) scanSource);
        }
        // perNodeScanRanges is required
        instanceParams.setPerNodeScanRanges(scanParams.perNodeScanRanges);
    }

    // local shuffle has two uses:
    // 1. 10 scan instances -> local shuffle -> 10 agg instances: this balances the data among the agg instances
    // 2. 1 scan instance -> local shuffle -> 10 agg instances: this is ignore_data_distribution,
    //    which increases the parallelism of the agg
    private static void ignoreDataDistribution(TPipelineFragmentParams currentFragmentParam) {
        // `parallel_instances == 1` is the switch for ignore_data_distribution:
        // the backend will use 1 instance to scan a small amount of data, then local shuffle to
        // SessionVariable.parallel_pipeline_task_num instances to increase parallelism
        currentFragmentParam.setParallelInstances(1);
    }

    private static PerNodeScanParams computeDefaultScanSourceParam(DefaultScanSource defaultScanSource) {
        Map<Integer, List<TScanRangeParams>> perNodeScanRanges = Maps.newLinkedHashMap();
        for (Entry<ScanNode, ScanRanges> kv : defaultScanSource.scanNodeToScanRanges.entrySet()) {
            int scanNodeId = kv.getKey().getId().asInt();
            perNodeScanRanges.put(scanNodeId, kv.getValue().params);
        }

        return new PerNodeScanParams(perNodeScanRanges);
    }

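    // merge the scan ranges of all buckets assigned to this instance, keyed by scan node id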
    private static PerNodeScanParams computeBucketScanSourceParam(BucketScanSource bucketScanSource) {
        Map<Integer, List<TScanRangeParams>> perNodeScanRanges = Maps.newLinkedHashMap();
        for (Entry<Integer, Map<ScanNode, ScanRanges>> kv :
                bucketScanSource.bucketIndexToScanNodeToTablets.entrySet()) {
            Map<ScanNode, ScanRanges> scanNodeToRanges = kv.getValue();
            for (Entry<ScanNode, ScanRanges> kv2 : scanNodeToRanges.entrySet()) {
                int scanNodeId = kv2.getKey().getId().asInt();
                // avoid ArrayList::new here: it would treat the scan node id as an initial capacity
                List<TScanRangeParams> scanRanges = perNodeScanRanges
                        .computeIfAbsent(scanNodeId, id -> Lists.newArrayList());
                scanRanges.addAll(kv2.getValue().params);
            }
        }
        return new PerNodeScanParams(perNodeScanRanges);
    }

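    // build bucket_seq_to_instance_idx for the instances of this worker;
    // returns null when the fragment does not read from a BucketScanSource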
    private static Map<Integer, Integer> computeBucketIdToInstanceId(
            PipelineDistributedPlan receivePlan, DistributedPlanWorker worker,
            Map<AssignedJob, Integer> instanceToIndex) {
        List<AssignedJob> instanceJobs = receivePlan.getInstanceJobs();
        if (instanceJobs.isEmpty() || !(instanceJobs.get(0).getScanSource() instanceof BucketScanSource)) {
            // bucket_seq_to_instance_idx is optional, so we can return null to save memory
            return null;
        }

        Map<Integer, Integer> bucketIdToInstanceId = Maps.newLinkedHashMap();
        for (AssignedJob instanceJob : instanceJobs) {
            if (instanceJob.getAssignedWorker().id() != worker.id()) {
                continue;
            }

            Integer instanceIndex = instanceToIndex.get(instanceJob);
            if (instanceJob instanceof LocalShuffleBucketJoinAssignedJob) {
                LocalShuffleBucketJoinAssignedJob assignedJob = (LocalShuffleBucketJoinAssignedJob) instanceJob;
                for (Integer bucketIndex : assignedJob.getAssignedJoinBucketIndexes()) {
                    bucketIdToInstanceId.put(bucketIndex, instanceIndex);
                }
            } else {
                BucketScanSource bucketScanSource = (BucketScanSource) instanceJob.getScanSource();
                for (Integer bucketIndex : bucketScanSource.bucketIndexToScanNodeToTablets.keySet()) {
                    bucketIdToInstanceId.put(bucketIndex, instanceIndex);
                }
            }
        }
        return bucketIdToInstanceId;
    }

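    // build shuffle_idx_to_instance_idx: for every destination index of the first input plan
    // that is assigned to this worker, record the index of the local instance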
    private static Map<Integer, Integer> computeDestIdToInstanceId(
            PipelineDistributedPlan receivePlan, DistributedPlanWorker worker,
            Map<AssignedJob, Integer> instanceToIndex) {
        if (receivePlan.getInputs().isEmpty()) {
            // shuffle_idx_to_instance_idx is required
            return Maps.newLinkedHashMap();
        }

        Map<Integer, Integer> destIdToInstanceId = Maps.newLinkedHashMap();
        filterInstancesWhichReceiveDataFromRemote(
                receivePlan, worker,
                (instanceJob, destId) -> destIdToInstanceId.put(destId, instanceToIndex.get(instanceJob))
        );
        return destIdToInstanceId;
    }

    private static void filterInstancesWhichReceiveDataFromRemote(
            PipelineDistributedPlan receivePlan, DistributedPlanWorker filterWorker,
            BiConsumer<AssignedJob, Integer> computeFn) {

        // currently we only support the case where all input plans have the same destinations in the same order,
        // so we can use the first input plan to compute the shuffle index to instance id mapping
        Set<Entry<ExchangeNode, DistributedPlan>> exchangeToChildPlanSet = receivePlan.getInputs().entries();
        if (exchangeToChildPlanSet.isEmpty()) {
            return;
        }
        Entry<ExchangeNode, DistributedPlan> exchangeToChildPlan = exchangeToChildPlanSet.iterator().next();
        ExchangeNode linkNode = exchangeToChildPlan.getKey();
        PipelineDistributedPlan firstInputPlan = (PipelineDistributedPlan) exchangeToChildPlan.getValue();
        Map<DataSink, List<AssignedJob>> sinkToDestInstances = firstInputPlan.getDestinations();
        for (Entry<DataSink, List<AssignedJob>> kv : sinkToDestInstances.entrySet()) {
            DataSink senderSink = kv.getKey();
            if (senderSink.getExchNodeId().asInt() == linkNode.getId().asInt()) {
                for (int destId = 0; destId < kv.getValue().size(); destId++) {
                    AssignedJob assignedJob = kv.getValue().get(destId);
                    if (assignedJob.getAssignedWorker().id() == filterWorker.id()) {
                        computeFn.accept(assignedJob, destId);
                    }
                }
                break;
            }
        }
    }

    private static class PerNodeScanParams {
        Map<Integer, List<TScanRangeParams>> perNodeScanRanges;

        public PerNodeScanParams(Map<Integer, List<TScanRangeParams>> perNodeScanRanges) {
            this.perNodeScanRanges = perNodeScanRanges;
        }
    }
}