// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.distribute.worker.job;

import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Function;

/**
 * UnassignedShuffleJob: assigns instances for a fragment that has no scan nodes and
 * consumes its input only through exchange nodes, so its instances can be placed on
 * any workers. The parallelism follows the child fragment with the most instances,
 * shrunk to the expected instance number from the session when that number is
 * positive and smaller.
 */
public class UnassignedShuffleJob extends AbstractUnassignedJob {
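    // whether this fragment reads its sources serially; when true, instances placed on the
    // same worker are built as LocalShuffleAssignedJobs sharing one scan source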
private boolean useSerialSource;
public UnassignedShuffleJob(
StatementContext statementContext, PlanFragment fragment,
ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob) {
super(statementContext, fragment, ImmutableList.of(), exchangeToChildJob);
}
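
    // Decide the parallelism and the placement of this fragment's instances: follow the
    // child fragment with the most instances, unless the expected instance number from the
    // session is positive and smaller, in which case shrink to it and shuffle the workers.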
@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
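        // the serial-source flag decides whether instances on one worker share a local shuffle;
        // for load jobs there is no meaningful connect context, so pass null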
useSerialSource = fragment.useSerialSource(
distributeContext.isLoadJob ? null : statementContext.getConnectContext());
int expectInstanceNum = degreeOfParallelism();
List<AssignedJob> biggestParallelChildFragment = getInstancesOfBiggestParallelChildFragment(inputJobs);
if (expectInstanceNum > 0 && expectInstanceNum < biggestParallelChildFragment.size()) {
            // When the group-by cardinality is smaller than the number of backends, only some
            // backends stay busy while the others have no data to process.
            // So we shuffle the candidate workers to let different backends serve different queries.
List<DistributedPlanWorker> shuffleWorkersInBiggestParallelChildFragment
= distinctShuffleWorkers(biggestParallelChildFragment);
Function<Integer, DistributedPlanWorker> workerSelector = instanceIndex -> {
int selectIndex = instanceIndex % shuffleWorkersInBiggestParallelChildFragment.size();
return shuffleWorkersInBiggestParallelChildFragment.get(selectIndex);
};
return buildInstances(expectInstanceNum, workerSelector);
} else {
            // keep the same instance number as the child fragment
Function<Integer, DistributedPlanWorker> workerSelector = instanceIndex -> {
int selectIndex = instanceIndex % biggestParallelChildFragment.size();
return biggestParallelChildFragment.get(selectIndex).getAssignedWorker();
};
return buildInstances(biggestParallelChildFragment.size(), workerSelector);
}
}
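
    // The expected instance number for this fragment, read from
    // SessionVariable.getExchangeInstanceParallel(); -1 when no connect context is available.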
protected int degreeOfParallelism() {
        // TODO: when a nested loop join is used for a right outer / semi / anti join,
        //       we should add an exchange node with gather distribution under the nested loop join
int expectInstanceNum = -1;
ConnectContext connectContext = statementContext.getConnectContext();
if (connectContext != null && connectContext.getSessionVariable() != null) {
expectInstanceNum = connectContext.getSessionVariable().getExchangeInstanceParallel();
}
return expectInstanceNum;
}
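
    // Among all child fragments feeding this fragment through exchange nodes, return the
    // instance list of the one with the largest parallelism.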
private List<AssignedJob> getInstancesOfBiggestParallelChildFragment(
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
int maxInstanceNum = -1;
List<AssignedJob> biggestParallelChildFragment = ImmutableList.of();
        // find the child fragment that provides the most instances
        for (Entry<ExchangeNode, Collection<AssignedJob>> exchangeToChildInstances : inputJobs.asMap().entrySet()) {
            List<AssignedJob> instances = (List<AssignedJob>) exchangeToChildInstances.getValue();
if (instances.size() > maxInstanceNum) {
biggestParallelChildFragment = instances;
maxInstanceNum = instances.size();
}
}
return biggestParallelChildFragment;
}
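
    // Build the assigned instances: group them per worker for local shuffle when the
    // fragment uses a serial source, otherwise create one independent instance per index.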
private List<AssignedJob> buildInstances(
int instanceNum, Function<Integer, DistributedPlanWorker> workerSelector) {
ConnectContext connectContext = statementContext.getConnectContext();
if (useSerialSource) {
return buildInstancesWithLocalShuffle(instanceNum, workerSelector, connectContext);
} else {
return buildInstancesWithoutLocalShuffle(instanceNum, workerSelector, connectContext);
}
}
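
    // One instance per index, each with its own empty DefaultScanSource: this fragment has
    // no scan nodes, so every instance receives its data solely through exchange nodes.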
private List<AssignedJob> buildInstancesWithoutLocalShuffle(
int instanceNum, Function<Integer, DistributedPlanWorker> workerSelector, ConnectContext connectContext) {
ImmutableList.Builder<AssignedJob> instances = ImmutableList.builderWithExpectedSize(instanceNum);
for (int i = 0; i < instanceNum; i++) {
DistributedPlanWorker selectedWorker = workerSelector.apply(i);
AssignedJob assignedJob = assignWorkerAndDataSources(
i, connectContext.nextInstanceId(),
selectedWorker, new DefaultScanSource(ImmutableMap.of())
);
instances.add(assignedJob);
}
return instances.build();
}
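
    // Group the instance indexes by their selected worker; all instances on one worker share
    // a single shareScanId and one empty scan source, so they can local-shuffle among
    // themselves. For example (illustrative only), with instanceNum = 4 and two workers
    // picked round-robin, worker A could get indexes {0, 2} under one shareScanId and
    // worker B indexes {1, 3} under another.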
private List<AssignedJob> buildInstancesWithLocalShuffle(
int instanceNum, Function<Integer, DistributedPlanWorker> workerSelector, ConnectContext connectContext) {
ImmutableList.Builder<AssignedJob> instances = ImmutableList.builderWithExpectedSize(instanceNum);
Multimap<DistributedPlanWorker, Integer> workerToInstanceIds = ArrayListMultimap.create();
for (int i = 0; i < instanceNum; i++) {
DistributedPlanWorker selectedWorker = workerSelector.apply(i);
workerToInstanceIds.put(selectedWorker, i);
}
int shareScanId = 0;
for (Entry<DistributedPlanWorker, Collection<Integer>> kv : workerToInstanceIds.asMap().entrySet()) {
DistributedPlanWorker worker = kv.getKey();
Collection<Integer> indexesInFragment = kv.getValue();
DefaultScanSource shareScanSource = new DefaultScanSource(ImmutableMap.of());
for (Integer indexInFragment : indexesInFragment) {
LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob(
indexInFragment, shareScanId, connectContext.nextInstanceId(),
this, worker, shareScanSource
);
instances.add(instance);
}
shareScanId++;
}
return instances.build();
}
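
    // Deduplicate the workers of the given instances (keeping first-seen order) and shuffle
    // them randomly, so repeated queries do not always land on the same backends.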
private List<DistributedPlanWorker> distinctShuffleWorkers(List<AssignedJob> instances) {
Set<DistributedPlanWorker> candidateWorkerSet = Sets.newLinkedHashSet();
for (AssignedJob instance : instances) {
candidateWorkerSet.add(instance.getAssignedWorker());
}
List<DistributedPlanWorker> candidateWorkers = Lists.newArrayList(candidateWorkerSet);
Collections.shuffle(candidateWorkers);
return candidateWorkers;
}
}