UnassignedAllBEJob.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.distribute.worker.job;

import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.dictionary.Dictionary;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
import org.apache.doris.planner.DictionarySink;
import org.apache.doris.planner.EmptySetNode;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

/**
 * Unassigned job for a fragment whose sink is a {@link DictionarySink}.
 *
 * <p>It does not split scan ranges. It creates exactly one instance per target backend, each with an empty
 * {@link DefaultScanSource}. The target backends are one of two sets:
 * <ul>
 *   <li>a full load: every backend from {@code workerManager.getAllBackends(true)};</li>
 *   <li>a partial load: only the backends the dictionary reports as outdated.</li>
 * </ul>
 * A partial load is chosen only when adaptive load is allowed, the source is an {@link OlapScanNode}, and
 * that source's max version is not greater than the dictionary's recorded source version.
 */
public class UnassignedAllBEJob extends AbstractUnassignedJob {
    private static final Logger LOG = LogManager.getLogger(UnassignedAllBEJob.class);

    /**
     * Creates the job.
     *
     * <p>An empty list is passed as the third super-constructor argument (presumably the scan-node list;
     * confirm in AbstractUnassignedJob). Data arrives only through the upstream exchanges.
     *
     * @param statementContext context of the statement being planned
     * @param fragment the fragment whose sink is a {@link DictionarySink}
     * @param exchangeToUpstreamJob upstream jobs feeding each exchange node
     */
    public UnassignedAllBEJob(StatementContext statementContext, PlanFragment fragment,
            ListMultimap<ExchangeNode, UnassignedJob> exchangeToUpstreamJob) {
        super(statementContext, fragment, ImmutableList.of(), exchangeToUpstreamJob);
    }

    /**
     * Assigns one instance of this fragment to each backend that must (re)load the dictionary.
     *
     * <p>Side effects on {@code statementContext}:
     * <ul>
     *   <li>records the source version used by this load;</li>
     *   <li>records the list of backends used;</li>
     *   <li>for partial loads, sets the partial-load flag.</li>
     * </ul>
     *
     * @param distributeContext provides the worker manager used to resolve backends to workers
     * @param inputJobs ExchangeNode -> assigned instances of the upstream fragment feeding that exchange
     * @return one assigned job per selected backend, in backend-list order
     * @throws IllegalArgumentException in any of these cases:
     *         a selected backend has no worker;
     *         the backend count changed while planning;
     *         there is no upstream exchange;
     *         the external table snapshot cannot be read
     */
    @Override
    public List<AssignedJob> computeAssignedJobs(DistributeContext distributeContext,
            ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
        ConnectContext connectContext = statementContext.getConnectContext();
        DistributedPlanWorkerManager workerManager = distributeContext.workerManager;

        DictionarySink sink = (DictionarySink) fragment.getSink();
        // The source may be a scan node, or it may have been optimized into an EmptySetNode.
        // Use the universal helper to find the deepest linear source.
        PlanNode rootNode = fragment.getDeepestLinearSource();
        List<Backend> bes;
        if (sink.allowAdaptiveLoad() && rootNode instanceof OlapScanNode) {
            Dictionary dictionary = sink.getDictionary();
            long lastVersion = dictionary.getSrcVersion();
            long usingVersion = ((OlapScanNode) rootNode).getMaxVersion();
            if (usingVersion > lastVersion) {
                // the source table has newer data than the dictionary: load it on every BE
                bes = computeFullLoad(workerManager, inputJobs);
            } else {
                // source version unchanged: load only on the BEs that are outdated
                bes = computePartialLoad(workerManager, inputJobs, dictionary, sink);
                statementContext.setPartialLoadDictionary(true);
            }
        } else {
            // Full load on every BE in these cases:
            // - adaptive load is not allowed;
            // - the source is an external table;
            // - the source is an EmptySetNode (not expected).
            bes = computeFullLoad(workerManager, inputJobs);
        }

        List<AssignedJob> assignedJobs = Lists.newArrayListWithCapacity(bes.size());
        for (int i = 0; i < bes.size(); ++i) {
            // exactly one instance per selected BE, in list order
            Backend be = bes.get(i);
            DistributedPlanWorker worker = workerManager.getWorker(be);
            if (worker == null) {
                throw new IllegalArgumentException(
                        "worker " + be.getAddress() + " not found");
            }
            assignedJobs.add(assignWorkerAndDataSources(i, connectContext.nextInstanceId(), worker,
                    new DefaultScanSource(ImmutableMap.of())));
        }

        recordDictionaryUsedSrcVersion(rootNode);
        statementContext.setUsedBackendsDistributing(bes);
        return assignedJobs;
    }

    /**
     * Records on {@code statementContext} the source-table version read by this load.
     *
     * @param rootNode the deepest linear source of the fragment
     * @throws IllegalArgumentException if the external table snapshot cannot be obtained
     */
    private void recordDictionaryUsedSrcVersion(PlanNode rootNode) {
        if (rootNode instanceof OlapScanNode) {
            statementContext.setDictionaryUsedSrcVersion(((OlapScanNode) rootNode).getMaxVersion());
        } else if (rootNode instanceof EmptySetNode) {
            // Recording version 0 makes the dictionary always reload. This path is not expected.
            LOG.warn("EmptySetNode should not be used in DictionarySink");
            statementContext.setDictionaryUsedSrcVersion(0);
        } else {
            // external table: the source type was checked before this load was constructed
            ExternalScanNode node = (ExternalScanNode) rootNode;
            MTMVRelatedTableIf table = (MTMVRelatedTableIf) node.getTableIf();
            try {
                MTMVSnapshotIf snapshot = table.getTableSnapshot(MvccUtil.getSnapshotFromContext(table));
                statementContext.setDictionaryUsedSrcVersion(snapshot.getSnapshotVersion());
            } catch (AnalysisException e) {
                // keep the original exception as the cause so its stack trace is not lost
                throw new IllegalArgumentException("getTableSnapshot failed: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Selects every backend for a full load and sets the exchange fragment's parallelism accordingly.
     *
     * @param workerManager source of the backend list
     * @param inputJobs upstream jobs keyed by exchange node; must not be empty
     * @return all backends from {@code workerManager.getAllBackends(true)}
     * @throws IllegalArgumentException in either of these cases:
     *         the backend count differs from the instance number the exchange was planned with;
     *         there is no exchange
     */
    private List<Backend> computeFullLoad(DistributedPlanWorkerManager workerManager,
            ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
        // Any exchange works: all exchanges are expected to carry the same instance number.
        ExchangeNode exchange = anyExchange(inputJobs);
        int expectInstanceNum = exchange.getNumInstances();

        // lets the Coordinator know the right parallelism of DictionarySink
        exchange.getFragment().setParallelExecNum(expectInstanceNum);

        List<Backend> bes = workerManager.getAllBackends(true);
        if (bes.size() != expectInstanceNum) {
            // the BE count changed while planning
            throw new IllegalArgumentException("BE number should be " + expectInstanceNum + ", but is " + bes.size());
        }
        return bes;
    }

    /**
     * Selects only the outdated backends for a partial load.
     *
     * <p>It resizes every exchange to that many instances and tells the sink which backends to target.
     *
     * @param workerManager source of the backend list
     * @param inputJobs upstream jobs keyed by exchange node; must not be empty
     * @param dictionary the dictionary being loaded; filters the outdated backends
     * @param sink the dictionary sink, updated with the partial-load backend list
     * @return the outdated backends (possibly empty)
     * @throws IllegalArgumentException if there is no exchange
     */
    private List<Backend> computePartialLoad(DistributedPlanWorkerManager workerManager,
            ListMultimap<ExchangeNode, AssignedJob> inputJobs, Dictionary dictionary, DictionarySink sink) {
        // The dictionary's src version (bundled with its version) equals the version being read.
        // Otherwise this would be a full load, so the src version alone identifies the outdated backends.
        List<Backend> outdatedBEs = dictionary.filterOutdatedBEs(workerManager.getAllBackends(true));

        // any exchange works; named to avoid shadowing the inherited `fragment` field
        PlanFragment exchangeFragment = anyExchange(inputJobs).getFragment();
        // every exchange now produces one instance per outdated backend
        for (ExchangeNode exchange : inputJobs.keySet()) {
            exchange.setNumInstances(outdatedBEs.size());
        }
        // lets the Coordinator know the right parallelism and BEs of DictionarySink
        sink.setPartialLoadBEs(outdatedBEs);
        exchangeFragment.setParallelExecNum(outdatedBEs.size());

        return outdatedBEs;
    }

    /**
     * Returns an arbitrary exchange node from {@code inputJobs}.
     *
     * @throws IllegalArgumentException if {@code inputJobs} is empty
     */
    private static ExchangeNode anyExchange(ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
        if (inputJobs.isEmpty()) {
            throw new IllegalArgumentException("no upstream exchange found for DictionarySink fragment");
        }
        return inputJobs.keySet().iterator().next();
    }
}