UnassignedJobBuilder.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.distribute.worker.job;

import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.DictionarySink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.MultiCastDataSink;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SchemaScanNode;
import org.apache.doris.thrift.TExplainLevel;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
 * Builds one {@link UnassignedJob} per {@link PlanFragment}.
 *
 * <p>The job type of a fragment is chosen from its specified instances, its sink, the scan nodes it
 * contains and whether it is the top fragment. The jobs of child fragments are attached to the
 * parent's job as input jobs, keyed by the {@link ExchangeNode} in the parent that receives them.
 */
public class UnassignedJobBuilder {
    private final ScanWorkerSelector scanWorkerSelector;

    public UnassignedJobBuilder(ScanWorkerSelector scanWorkerSelector) {
        this.scanWorkerSelector = scanWorkerSelector;
    }

    /**
     * Build the unassigned job of every fragment.
     *
     * <p>Fragments are visited in the iteration order of {@code fragments}. The input jobs of a
     * fragment are looked up among the jobs already built, so this assumes children are listed
     * before their parents (leaf to root); the last fragment visited is treated as the top fragment.
     *
     * @param scanWorkerSelector worker selector handed to the scan jobs
     * @param statementContext context of the current statement
     * @param fragments all fragments of the plan, keyed by fragment id
     * @return the unassigned job of each fragment, keyed by fragment id
     */
    public static FragmentIdMapping<UnassignedJob> buildJobs(
            ScanWorkerSelector scanWorkerSelector,
            StatementContext statementContext, FragmentIdMapping<PlanFragment> fragments) {
        UnassignedJobBuilder builder = new UnassignedJobBuilder(scanWorkerSelector);

        FragmentLineage fragmentLineage = buildFragmentLineage(fragments);
        FragmentIdMapping<UnassignedJob> unassignedJobs = new FragmentIdMapping<>();

        // build from leaf to parent: the input jobs of a fragment must already be present in
        // unassignedJobs when that fragment is visited
        Iterator<Entry<PlanFragmentId, PlanFragment>> iterator = fragments.entrySet().iterator();
        while (iterator.hasNext()) {
            Entry<PlanFragmentId, PlanFragment> kv = iterator.next();
            // the last fragment in iteration order is the root of the plan
            boolean isTopFragment = !iterator.hasNext();

            PlanFragmentId fragmentId = kv.getKey();
            PlanFragment fragment = kv.getValue();

            ListMultimap<ExchangeNode, UnassignedJob> inputJobs = findInputJobs(
                    fragmentLineage, fragmentId, unassignedJobs);
            UnassignedJob unassignedJob = builder.buildJob(statementContext, fragment, inputJobs, isTopFragment);
            unassignedJobs.put(fragmentId, unassignedJob);
        }

        return unassignedJobs;
    }

    /**
     * Choose and build the job type for a single fragment.
     * The checks are ordered: specified instances, dictionary sink, group commit top fragment,
     * scan/leaf fragment, and finally a shuffle or gather job.
     */
    private UnassignedJob buildJob(
            StatementContext statementContext, PlanFragment planFragment,
            ListMultimap<ExchangeNode, UnassignedJob> inputJobs, boolean isTopFragment) {
        List<ScanNode> scanNodes = collectScanNodesInThisFragment(planFragment);
        if (planFragment.specifyInstances.isPresent()) {
            return buildSpecifyInstancesJob(statementContext, planFragment, scanNodes, inputJobs);
        } else if (planFragment.getSink() instanceof DictionarySink) {
            // this fragment already set its instances in `visitPhysicalDistribute`.
            // now assign one instance per backend.
            return new UnassignedAllBEJob(statementContext, planFragment, inputJobs);
        } else if (scanNodes.isEmpty() && isTopFragment
                && statementContext.getGroupCommitMergeBackend() != null) {
            return new UnassignedGroupCommitJob(statementContext, planFragment, scanNodes, inputJobs);
        } else if (!scanNodes.isEmpty() || isLeafFragment(planFragment)) {
            return buildLeafOrScanJob(statementContext, planFragment, scanNodes, inputJobs);
        } else {
            return buildShuffleJob(statementContext, planFragment, inputJobs);
        }
    }

    /**
     * Build the job of a fragment that scans tables or has no child fragment.
     *
     * @throws IllegalStateException if the fragment mixes OlapScanNode with other scan nodes, or
     *         contains several remote scan nodes that can not be gathered into one instance
     */
    private UnassignedJob buildLeafOrScanJob(
            StatementContext statementContext, PlanFragment planFragment, List<ScanNode> scanNodes,
            ListMultimap<ExchangeNode, UnassignedJob> inputJobs) {
        if (scanNodes.isEmpty()) {
            // select constant without table,
            // e.g. select 100 union select 200
            return buildQueryConstantJob(statementContext, planFragment);
        }

        int olapScanNodeNum = olapScanNodeNum(scanNodes);
        if (olapScanNodeNum == scanNodes.size()) {
            // we need assign a backend which contains the data,
            // so that the OlapScanNode can find the data in the backend
            // e.g. select * from olap_table
            return buildScanOlapTableJob(
                    statementContext, planFragment, asOlapScanNodes(scanNodes), inputJobs, scanWorkerSelector
            );
        }

        if (olapScanNodeNum == 0) {
            ScanNode scanNode = scanNodes.get(0);
            if (scanNode instanceof SchemaScanNode) {
                // select * from information_schema.tables
                // NOTE(review): only the first scan node is passed to the job; confirm a fragment
                // whose first scan node is a SchemaScanNode never contains other scan nodes
                return buildScanMetadataJob(
                        statementContext, planFragment, (SchemaScanNode) scanNode, scanWorkerSelector
                );
            }
            // only scan external tables or cloud tables or table valued functions
            // e,g. select * from numbers('number'='100')
            UnassignedJob remoteScanJob = buildScanRemoteTableJob(
                    statementContext, planFragment, scanNodes, inputJobs, scanWorkerSelector
            );
            if (remoteScanJob != null) {
                return remoteScanJob;
            }
            throw new IllegalStateException("Cannot generate unassignedJob for fragment"
                    + " has multiple remote ScanNodes which can not be gathered into one instance: "
                    + planFragment.getExplainString(TExplainLevel.VERBOSE));
        }

        throw new IllegalStateException("Cannot generate unassignedJob for fragment"
                + " has both OlapScanNode and Other ScanNode: "
                + planFragment.getExplainString(TExplainLevel.VERBOSE));
    }

    private UnassignedJob buildSpecifyInstancesJob(
            StatementContext statementContext, PlanFragment planFragment,
            List<ScanNode> scanNodes, ListMultimap<ExchangeNode, UnassignedJob> inputJobs) {
        return new UnassignedSpecifyInstancesJob(statementContext, planFragment, scanNodes, inputJobs);
    }

    /**
     * Build the job of a fragment whose scan nodes are all OlapScanNode.
     * Multiple OLAP scans are only supported when the fragment is assigned by bucket.
     *
     * @throws IllegalStateException if there are several OlapScanNode and the fragment has
     *         neither a colocate plan node nor a bucket shuffle join
     */
    private UnassignedJob buildScanOlapTableJob(
            StatementContext statementContext, PlanFragment planFragment, List<OlapScanNode> olapScanNodes,
            ListMultimap<ExchangeNode, UnassignedJob> inputJobs,
            ScanWorkerSelector scanWorkerSelector) {
        if (shouldAssignByBucket(planFragment)) {
            return new UnassignedScanBucketOlapTableJob(
                    statementContext, planFragment, olapScanNodes, inputJobs, scanWorkerSelector);
        } else if (olapScanNodes.size() == 1) {
            return new UnassignedScanSingleOlapTableJob(
                    statementContext, planFragment, olapScanNodes.get(0), inputJobs, scanWorkerSelector);
        } else {
            throw new IllegalStateException("Not supported multiple scan multiple "
                    + "OlapTable but not contains colocate join or bucket shuffle join: "
                    + planFragment.getExplainString(TExplainLevel.VERBOSE));
        }
    }

    private List<ScanNode> collectScanNodesInThisFragment(PlanFragment planFragment) {
        return planFragment.getPlanRoot().collectInCurrentFragment(ScanNode.class::isInstance);
    }

    private int olapScanNodeNum(List<ScanNode> scanNodes) {
        int olapScanNodeNum = 0;
        for (ScanNode scanNode : scanNodes) {
            if (scanNode instanceof OlapScanNode) {
                olapScanNodeNum++;
            }
        }
        return olapScanNodeNum;
    }

    /**
     * View {@code scanNodes} as a list of {@link OlapScanNode}; the same list instance is returned.
     * The caller must already have checked that every element is an {@link OlapScanNode}.
     */
    @SuppressWarnings("unchecked")
    private static List<OlapScanNode> asOlapScanNodes(List<ScanNode> scanNodes) {
        return (List<OlapScanNode>) (List<?>) scanNodes;
    }

    private boolean isLeafFragment(PlanFragment planFragment) {
        return planFragment.getChildren().isEmpty();
    }

    private UnassignedQueryConstantJob buildQueryConstantJob(
            StatementContext statementContext, PlanFragment planFragment) {
        return new UnassignedQueryConstantJob(statementContext, planFragment);
    }

    private UnassignedJob buildScanMetadataJob(
            StatementContext statementContext, PlanFragment fragment,
            SchemaScanNode schemaScanNode, ScanWorkerSelector scanWorkerSelector) {
        return new UnassignedScanMetadataJob(statementContext, fragment, schemaScanNode, scanWorkerSelector);
    }

    /**
     * Build the job of a fragment that only scans non-OLAP, non-schema tables.
     *
     * @return the job, or {@code null} when there are several scan nodes that
     *         {@link UnassignedGatherScanMultiRemoteTablesJob#canApply} rejects
     */
    private UnassignedJob buildScanRemoteTableJob(
            StatementContext statementContext, PlanFragment planFragment, List<ScanNode> scanNodes,
            ListMultimap<ExchangeNode, UnassignedJob> inputJobs,
            ScanWorkerSelector scanWorkerSelector) {
        if (scanNodes.size() == 1) {
            return new UnassignedScanSingleRemoteTableJob(
                    statementContext, planFragment, scanNodes.get(0), inputJobs, scanWorkerSelector);
        } else if (UnassignedGatherScanMultiRemoteTablesJob.canApply(scanNodes)) {
            // select * from numbers("number" = "10") a union all select * from numbers("number" = "20") b;
            // use an instance to scan table a and table b
            return new UnassignedGatherScanMultiRemoteTablesJob(statementContext, planFragment, scanNodes, inputJobs);
        } else {
            return null;
        }
    }

    private UnassignedJob buildShuffleJob(
            StatementContext statementContext, PlanFragment planFragment,
            ListMultimap<ExchangeNode, UnassignedJob> inputJobs) {
        if (planFragment.isPartitioned()) {
            return new UnassignedShuffleJob(statementContext, planFragment, inputJobs);
        } else {
            return new UnassignedGatherJob(statementContext, planFragment, inputJobs);
        }
    }

    /**
     * Collect the already-built jobs of the child fragments of {@code fragmentId}, keyed by the
     * exchange node in this fragment that receives each child's data.
     */
    private static ListMultimap<ExchangeNode, UnassignedJob> findInputJobs(
            FragmentLineage lineage, PlanFragmentId fragmentId, FragmentIdMapping<UnassignedJob> unassignedJobs) {
        ListMultimap<ExchangeNode, UnassignedJob> inputJobs = ArrayListMultimap.create();
        Map<PlanNodeId, ExchangeNode> exchangeNodes = lineage.parentFragmentToExchangeNode.get(fragmentId);
        if (exchangeNodes != null) {
            for (Entry<PlanNodeId, ExchangeNode> idToExchange : exchangeNodes.entrySet()) {
                PlanNodeId exchangeId = idToExchange.getKey();
                ExchangeNode exchangeNode = idToExchange.getValue();
                List<PlanFragmentId> childFragmentIds = lineage.exchangeToChildFragment.get(exchangeId);
                for (PlanFragmentId childFragmentId : childFragmentIds) {
                    inputJobs.put(exchangeNode, unassignedJobs.get(childFragmentId));
                }
            }
        }
        return inputJobs;
    }

    private static List<ExchangeNode> collectExchangeNodesInThisFragment(PlanFragment planFragment) {
        return planFragment
                .getPlanRoot()
                .collectInCurrentFragment(ExchangeNode.class::isInstance);
    }

    /**
     * Record, for every fragment, which exchange nodes it contains and which child fragments
     * send data to each exchange node (through a DataStreamSink or a MultiCastDataSink).
     */
    private static FragmentLineage buildFragmentLineage(
            FragmentIdMapping<PlanFragment> fragments) {
        ListMultimap<PlanNodeId, PlanFragmentId> exchangeToChildFragment = ArrayListMultimap.create();
        FragmentIdMapping<Map<PlanNodeId, ExchangeNode>> parentFragmentToExchangeNode = new FragmentIdMapping<>();

        for (PlanFragment fragment : fragments.values()) {
            PlanFragmentId fragmentId = fragment.getFragmentId();

            // 1. link child fragment to exchange node
            DataSink sink = fragment.getSink();
            if (sink instanceof DataStreamSink) {
                PlanNodeId exchangeNodeId = sink.getExchNodeId();
                exchangeToChildFragment.put(exchangeNodeId, fragmentId);
            } else if (sink instanceof MultiCastDataSink) {
                MultiCastDataSink multiCastDataSink = (MultiCastDataSink) sink;
                for (DataStreamSink dataStreamSink : multiCastDataSink.getDataStreamSinks()) {
                    PlanNodeId exchangeNodeId = dataStreamSink.getExchNodeId();
                    exchangeToChildFragment.put(exchangeNodeId, fragmentId);
                }
            }

            // 2. link parent fragment to exchange node
            List<ExchangeNode> exchangeNodes = collectExchangeNodesInThisFragment(fragment);
            Map<PlanNodeId, ExchangeNode> exchangeNodesInFragment = Maps.newLinkedHashMap();
            for (ExchangeNode exchangeNode : exchangeNodes) {
                exchangeNodesInFragment.put(exchangeNode.getId(), exchangeNode);
            }
            parentFragmentToExchangeNode.put(fragmentId, exchangeNodesInFragment);
        }

        return new FragmentLineage(parentFragmentToExchangeNode, exchangeToChildFragment);
    }

    /** A fragment is assigned by bucket when it has a colocate plan node or a bucket shuffle join. */
    private static boolean shouldAssignByBucket(PlanFragment fragment) {
        return fragment.hasColocatePlanNode() || fragment.hasBucketShuffleJoin();
    }

    // finds the exchange nodes inside a fragment, and the child fragments that feed an exchange node
    private static class FragmentLineage {
        // fragment id -> exchange nodes (by node id) that act as sources in that fragment
        private final FragmentIdMapping<Map<PlanNodeId, ExchangeNode>> parentFragmentToExchangeNode;
        // exchange node id -> child fragments whose DataStreamSink targets that exchange node
        private final ListMultimap<PlanNodeId, PlanFragmentId> exchangeToChildFragment;

        public FragmentLineage(
                FragmentIdMapping<Map<PlanNodeId, ExchangeNode>> parentFragmentToExchangeNode,
                ListMultimap<PlanNodeId, PlanFragmentId> exchangeToChildFragment) {
            this.parentFragmentToExchangeNode = parentFragmentToExchangeNode;
            this.exchangeToChildFragment = exchangeToChildFragment;
        }
    }
}