Coordinator.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.Config;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Reference;
import org.apache.doris.common.Status;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.ExecutionProfile;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ListUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.hive.HMSTransaction;
import org.apache.doris.datasource.iceberg.IcebergTransaction;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.DictionarySink;
import org.apache.doris.planner.ExceptNode;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.HashJoinNode;
import org.apache.doris.planner.IntersectNode;
import org.apache.doris.planner.MultiCastDataSink;
import org.apache.doris.planner.MultiCastPlanFragment;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ResultFileSink;
import org.apache.doris.planner.ResultSink;
import org.apache.doris.planner.RuntimeFilter;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SchemaScanNode;
import org.apache.doris.planner.SetOperationNode;
import org.apache.doris.planner.SortNode;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
import org.apache.doris.proto.Types;
import org.apache.doris.proto.Types.PUniqueId;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
import org.apache.doris.resource.workloadgroup.QueryQueue;
import org.apache.doris.resource.workloadgroup.QueueToken;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TBrokerScanRange;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TDescriptorTable;
import org.apache.doris.thrift.TErrorTabletInfo;
import org.apache.doris.thrift.TEsScanRange;
import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFragmentInstanceReport;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloScanRange;
import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TPipelineFragmentParamsList;
import org.apache.doris.thrift.TPipelineInstanceParams;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TPlanFragment;
import org.apache.doris.thrift.TPlanFragmentDestination;
import org.apache.doris.thrift.TQueryGlobals;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TResourceLimit;
import org.apache.doris.thrift.TRuntimeFilterParams;
import org.apache.doris.thrift.TRuntimeFilterTargetParamsV2;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTabletCommitInfo;
import org.apache.doris.thrift.TTopnFilterDesc;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.jetbrains.annotations.NotNull;
import org.joda.time.DateTime;
import java.security.SecureRandom;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
public class Coordinator implements CoordInterface {
private static final Logger LOG = LogManager.getLogger(Coordinator.class);
public static final String localIP = FrontendOptions.getLocalHostAddress();
// Random source used to shuffle instances of partitioned fragments.
private static final Random instanceRandom = new SecureRandom();
public static ExecutorService backendRpcCallbackExecutor = ThreadPoolManager.newDaemonProfileThreadPool(
32, 100, "backend-rpc-callback", true);
// Overall status of the entire query; set to the first reported fragment error
// status or to CANCELLED, if Cancel() is called.
private Status queryStatus = new Status();
// Backends related to this query (address -> backend id).
private Map<TNetworkAddress, Long> addressToBackendID = Maps.newHashMap();
protected ImmutableMap<Long, Backend> idToBackend = ImmutableMap.of();
private final TDescriptorTable descTable;
private FragmentIdMapping<DistributedPlan> distributedPlans;
// scan node id -> TFileScanRangeParams
private Map<Integer, TFileScanRangeParams> fileScanRangeParamsMap = Maps.newHashMap();
// Why do we use query globals?
// When the `NOW()` function appears in a SQL statement, every distributed fragment
// must see the same value, so we compute a single query-global `now()` here and
// share it across the whole query.
private final TQueryGlobals queryGlobals = new TQueryGlobals();
private TQueryOptions queryOptions;
private TNetworkAddress coordAddress;
// The FE audit log is written on the connected FE. If a query is forwarded,
// we should send the connected FE's address to the BEs,
// so that the BEs report query statistics back to the connected FE.
private TNetworkAddress currentConnectFE;
// protects all fields below
private final Lock lock = new ReentrantLock();
// If true, the query is done returning all results. It is possible that the
// coordinator still needs to wait for cleanup on remote fragments (e.g. queries
// with limit)
// Once this is set to true, errors from remote fragments are ignored.
private boolean returnedAllResults = false;
// populated in computeFragmentExecParams()
private final Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap = Maps.newHashMap();
private final List<PlanFragment> fragments;
private Map<Long, PipelineExecContexts> beToPipelineExecCtxs = Maps.newHashMap();
private final Map<Pair<Integer, Long>, PipelineExecContext> pipelineExecContexts = new HashMap<>();
private final List<PipelineExecContext> needCheckPipelineExecContexts = Lists.newArrayList();
private List<ResultReceiver> receivers = Lists.newArrayList();
private ResultReceiverConsumer receiverConsumer;
private final List<ScanNode> scanNodes;
private int scanRangeNum = 0;
// number of instances of this query, equal to the
// number of backends executing plan fragments on behalf of this query;
// set in computeFragmentExecParams();
// same as backend_exec_states_.size() after Exec()
private final Set<TUniqueId> instanceIds = Sets.newHashSet();
private int numReceivedRows = 0;
private List<String> deltaUrls;
private Map<String, String> loadCounters;
private String trackingUrl;
// related txnId and label of group commit
private long txnId;
private String label;
// for export
private List<String> exportFiles;
private final List<TTabletCommitInfo> commitInfos = Lists.newArrayList();
private final List<TErrorTabletInfo> errorTabletInfos = Lists.newArrayList();
// Input parameter
private long jobId = -1; // job which this task belongs to
private TUniqueId queryId;
// a timestamp representing the absolute timeout deadline,
// e.g. System.currentTimeMillis() + executeTimeoutS * 1000
private long timeoutDeadline;
private boolean enableShareHashTableForBroadcastJoin = false;
private boolean useNereids = false;
private Backend groupCommitBackend;
// Runtime filter merge instance address and ID
private TNetworkAddress runtimeFilterMergeAddr;
private TUniqueId runtimeFilterMergeInstanceId;
// Runtime filter ID -> target instance addresses of the fragments that are expected
// to consume this runtime filter; each instance address appears only once.
private Map<RuntimeFilterId, List<FRuntimeFilterTargetParam>> ridToTargetParam = Maps.newHashMap();
// The runtime filters assigned by the planner to this query.
private List<RuntimeFilter> assignedRuntimeFilters = new ArrayList<>();
private List<TopnFilter> topnFilters = new ArrayList<>();
// Runtime filter ID -> number of builder instances.
private Map<RuntimeFilterId, Integer> ridToBuilderNum = Maps.newHashMap();
private ConnectContext context;
private StatsErrorEstimator statsErrorEstimator;
// A countdown latch to mark the completion of each instance.
// Used by the old pipeline engine.
// instance id -> dummy value
private MarkedCountDownLatch<TUniqueId, Long> instancesDoneLatch = null;
// A countdown latch to mark the completion of each fragment. Used by pipelineX.
// fragment id -> backend id
private MarkedCountDownLatch<Integer, Long> fragmentsDoneLatch = null;
public void setGroupCommitBe(Backend backend) {
this.groupCommitBackend = backend;
}
public void setTWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) {
this.tWorkloadGroups = tWorkloadGroups;
}
public long getNumReceivedRows() {
return numReceivedRows;
}
public List<TPipelineWorkloadGroup> getTWorkloadGroups() {
return tWorkloadGroups;
}
private List<TPipelineWorkloadGroup> tWorkloadGroups = Lists.newArrayList();
private final ExecutionProfile executionProfile;
private volatile QueueToken queueToken = null;
private QueryQueue queryQueue = null;
public ExecutionProfile getExecutionProfile() {
return executionProfile;
}
// True if all scan nodes are ExternalScanNodes.
private boolean isAllExternalScan = true;
// Used for query/insert
public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner,
StatsErrorEstimator statsErrorEstimator) {
this(context, analyzer, planner);
this.statsErrorEstimator = statsErrorEstimator;
}
// Used for query/insert/test
public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
this.context = context;
this.queryId = context.queryId();
this.fragments = planner.getFragments();
this.scanNodes = planner.getScanNodes();
this.descTable = planner.getDescTable().toThrift();
this.returnedAllResults = false;
this.enableShareHashTableForBroadcastJoin = context.getSessionVariable().enableShareHashTableForBroadcastJoin;
initQueryOptions(context);
useNereids = planner instanceof NereidsPlanner;
if (!useNereids) {
// Local shuffle on the pipelineX engine is only supported with the Nereids planner, so disable it here.
queryOptions.setEnableLocalShuffle(false);
} else {
distributedPlans = ((NereidsPlanner) planner).getDistributedPlans();
}
setFromUserProperty(context);
this.queryGlobals.setNowString(TimeUtils.getDatetimeFormatWithTimeZone().format(LocalDateTime.now()));
this.queryGlobals.setTimestampMs(System.currentTimeMillis());
this.queryGlobals.setNanoSeconds(LocalDateTime.now().getNano());
this.queryGlobals.setLoadZeroTolerance(false);
if (context.getSessionVariable().getTimeZone().equals("CST")) {
this.queryGlobals.setTimeZone(TimeUtils.DEFAULT_TIME_ZONE);
} else {
this.queryGlobals.setTimeZone(context.getSessionVariable().getTimeZone());
}
this.assignedRuntimeFilters = planner.getRuntimeFilters();
this.topnFilters = planner.getTopnFilters();
List<Integer> fragmentIds = new ArrayList<>();
for (PlanFragment fragment : fragments) {
fragmentIds.add(fragment.getFragmentId().asInt());
}
this.executionProfile = new ExecutionProfile(queryId, fragmentIds);
}
// Used for broker load task/export task/update coordinator
// Constructor of Coordinator is too complicated.
public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, List<PlanFragment> fragments,
List<ScanNode> scanNodes, String timezone, boolean loadZeroTolerance, boolean enableProfile) {
this.jobId = jobId;
this.queryId = queryId;
this.descTable = descTable.toThrift();
this.fragments = fragments;
this.scanNodes = scanNodes;
this.queryOptions = new TQueryOptions();
this.queryOptions.setEnableProfile(enableProfile);
this.queryOptions.setProfileLevel(2);
this.queryGlobals.setNowString(TimeUtils.getDatetimeFormatWithTimeZone().format(LocalDateTime.now()));
this.queryGlobals.setTimestampMs(System.currentTimeMillis());
this.queryGlobals.setTimeZone(timezone);
this.queryGlobals.setLoadZeroTolerance(loadZeroTolerance);
this.queryOptions.setBeExecVersion(Config.be_exec_version);
List<Integer> fragmentIds = new ArrayList<>();
for (PlanFragment fragment : fragments) {
fragmentIds.add(fragment.getFragmentId().asInt());
}
this.executionProfile = new ExecutionProfile(queryId, fragmentIds);
}
private void setFromUserProperty(ConnectContext connectContext) {
String qualifiedUser = connectContext.getQualifiedUser();
// set cpu resource limit
int cpuLimit = Env.getCurrentEnv().getAuth().getCpuResourceLimit(qualifiedUser);
if (cpuLimit > 0) {
// overwrite the cpu resource limit from session variable;
TResourceLimit resourceLimit = new TResourceLimit();
resourceLimit.setCpuLimit(cpuLimit);
this.queryOptions.setResourceLimit(resourceLimit);
}
// set exec mem limit
long memLimit = connectContext.getMaxExecMemByte();
if (memLimit > 0) {
// overwrite the exec_mem_limit from session variable;
this.queryOptions.setMemLimit(memLimit);
}
}
private void initQueryOptions(ConnectContext context) {
this.queryOptions = context.getSessionVariable().toThrift();
this.queryOptions.setQueryTimeout(context.getExecTimeoutS());
this.queryOptions.setExecutionTimeout(context.getExecTimeoutS());
if (this.queryOptions.getExecutionTimeout() < 1) {
LOG.info("try set timeout less than 1", new RuntimeException(""));
}
this.queryOptions.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
this.queryOptions.setMysqlRowBinaryFormat(
context.getCommand() == MysqlCommand.COM_STMT_EXECUTE);
}
public ConnectContext getConnectContext() {
return context;
}
public long getJobId() {
return jobId;
}
public TUniqueId getQueryId() {
return queryId;
}
public int getScanRangeNum() {
return scanRangeNum;
}
public TQueryOptions getQueryOptions() {
return this.queryOptions;
}
public void setQueryId(TUniqueId queryId) {
this.queryId = queryId;
}
public void setQueryType(TQueryType type) {
this.queryOptions.setQueryType(type);
}
public Status getExecStatus() {
return queryStatus;
}
public List<String> getDeltaUrls() {
return deltaUrls;
}
public Map<String, String> getLoadCounters() {
return loadCounters;
}
public String getTrackingUrl() {
return trackingUrl;
}
public long getTxnId() {
return txnId;
}
public void setTxnId(long txnId) {
this.txnId = txnId;
}
public String getLabel() {
return label;
}
public void setExecMemoryLimit(long execMemoryLimit) {
this.queryOptions.setMemLimit(execMemoryLimit);
}
public void setLoadMemLimit(long loadMemLimit) {
this.queryOptions.setLoadMemLimit(loadMemLimit);
}
public void setTimeout(int timeout) {
this.queryOptions.setQueryTimeout(timeout);
this.queryOptions.setExecutionTimeout(timeout);
if (this.queryOptions.getExecutionTimeout() < 1) {
LOG.info("try set timeout less than 1", new RuntimeException(""));
}
}
public void setLoadZeroTolerance(boolean loadZeroTolerance) {
this.queryGlobals.setLoadZeroTolerance(loadZeroTolerance);
}
public void clearExportStatus() {
lock.lock();
try {
this.pipelineExecContexts.clear();
this.queryStatus.updateStatus(TStatusCode.OK, "");
if (this.exportFiles == null) {
this.exportFiles = Lists.newArrayList();
}
this.exportFiles.clear();
this.needCheckPipelineExecContexts.clear();
} finally {
lock.unlock();
}
}
public List<TTabletCommitInfo> getCommitInfos() {
return commitInfos;
}
public List<TErrorTabletInfo> getErrorTabletInfos() {
return errorTabletInfos;
}
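// Returns a map from "host:port" of each backend to the number of fragment instances scheduled on it.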
public Map<String, Integer> getBeToInstancesNum() {
Map<String, Integer> result = Maps.newTreeMap();
for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
result.put(ctxs.brpcAddr.hostname.concat(":").concat("" + ctxs.brpcAddr.port),
ctxs.getInstanceNumber());
}
return result;
}
// Initialize fragment exec params, wire up input fragments, and resolve coordinator / backend addresses.
protected void prepare() throws UserException {
for (PlanFragment fragment : fragments) {
fragmentExecParamsMap.put(fragment.getFragmentId(), new FragmentExecParams(fragment));
}
// set inputFragments
for (PlanFragment fragment : fragments) {
if (!(fragment.getSink() instanceof DataStreamSink)) {
continue;
}
FragmentExecParams params = fragmentExecParamsMap.get(fragment.getDestFragment().getFragmentId());
params.inputFragments.add(fragment.getFragmentId());
}
coordAddress = new TNetworkAddress(localIP, Config.rpc_port);
if (ConnectContext.get() != null && ConnectContext.get().isProxy() && !StringUtils.isEmpty(
ConnectContext.get().getCurrentConnectedFEIp())) {
currentConnectFE = new TNetworkAddress(ConnectContext.get().getCurrentConnectedFEIp(),
Config.rpc_port);
} else {
currentConnectFE = coordAddress;
}
this.idToBackend = Env.getCurrentSystemInfo().getBackendsByCurrentCluster();
if (LOG.isDebugEnabled()) {
int backendNum = idToBackend.size();
StringBuilder backendInfos = new StringBuilder("backends info:");
for (Map.Entry<Long, Backend> entry : idToBackend.entrySet()) {
Long backendID = entry.getKey();
Backend backend = entry.getValue();
backendInfos.append(' ').append(backendID).append("-")
.append(backend.getHost()).append("-")
.append(backend.getBePort()).append("-")
.append(backend.getProcessEpoch());
}
if (LOG.isDebugEnabled()) {
LOG.debug("query {}, backend size: {}, {}",
DebugUtil.printId(queryId), backendNum, backendInfos.toString());
}
}
}
private void lock() {
lock.lock();
}
private void unlock() {
lock.unlock();
}
private void traceInstance() {
if (LOG.isDebugEnabled()) {
// TODO(zc): add a switch to disable this tracing
StringBuilder sb = new StringBuilder();
int idx = 0;
sb.append("query id=").append(DebugUtil.printId(queryId)).append(",");
sb.append("fragment=[");
for (Map.Entry<PlanFragmentId, FragmentExecParams> entry : fragmentExecParamsMap.entrySet()) {
if (idx++ != 0) {
sb.append(",");
}
sb.append(entry.getKey());
entry.getValue().appendTo(sb);
}
sb.append("]");
if (LOG.isDebugEnabled()) {
LOG.debug(sb.toString());
}
}
}
protected void processFragmentAssignmentAndParams() throws Exception {
// prepare information
prepare();
// compute Fragment Instance
computeScanRangeAssignment();
computeFragmentExecParams();
}
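// Builds the pipeline fragment params used by a stream load plan: computes the scan range
// assignment and fragment exec params, registers job progress, and returns the thrift params
// of the first (presumably the only) fragment.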
public TPipelineFragmentParams getStreamLoadPlan() throws Exception {
processFragmentAssignmentAndParams();
// This is a load process.
List<Long> relatedBackendIds = Lists.newArrayList(addressToBackendID.values());
Env.getCurrentEnv().getLoadManager().initJobProgress(jobId, queryId, instanceIds,
relatedBackendIds);
Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId), scanRangeNum);
LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet());
Map<TNetworkAddress, TPipelineFragmentParams> tExecPlanFragmentParams
= ((FragmentExecParams) this.fragmentExecParamsMap.values().toArray()[0]).toThrift(0);
TPipelineFragmentParams fragmentParams = tExecPlanFragmentParams.values().stream().findFirst().get();
return fragmentParams;
}
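// Decides whether this query should pass through the workload group query queue.
// Queueing is skipped when the query queue is disabled, the session bypasses workload groups,
// the query is already cancelled, or every scan node is a SchemaScanNode.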
private boolean shouldQueue() {
boolean ret = Config.enable_query_queue && !context.getSessionVariable()
.getBypassWorkloadGroup() && !isQueryCancelled();
if (!ret) {
return false;
}
// A query with scan nodes can skip the queue only when all of its scan nodes are SchemaScanNodes.
for (ScanNode scanNode : this.scanNodes) {
boolean isSchemaScanNode = scanNode instanceof SchemaScanNode;
if (!isSchemaScanNode) {
return true;
}
}
return false;
}
// Initiate asynchronous execution of query. Returns as soon as all plan fragments
// have started executing at their respective backends.
// 'Request' must contain at least a coordinator plan fragment (ie, can't
// be for a query like 'SELECT 1').
// A call to Exec() must precede all other member function calls.
@Override
public void exec() throws Exception {
// LoadTask does not have a ConnectContext, so it is not controlled by the query queue for now.
if (context != null) {
if (Config.enable_workload_group) {
List<TPipelineWorkloadGroup> wgList = context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context);
this.setTWorkloadGroups(wgList);
boolean shouldQueue = this.shouldQueue();
if (shouldQueue) {
Set<Long> wgIdSet = Sets.newHashSet();
for (TPipelineWorkloadGroup twg : wgList) {
wgIdSet.add(twg.getId());
}
queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(wgIdSet);
if (queryQueue == null) {
// This check is effectively unreachable: when the query queue cannot be found,
// the workload group manager already throws an exception.
throw new UserException("could not find query queue");
}
queueToken = queryQueue.getToken(context.getSessionVariable().wgQuerySlotCount);
queueToken.get(DebugUtil.printId(queryId),
this.queryOptions.getExecutionTimeout() * 1000);
}
} else {
context.setWorkloadGroupName("");
}
}
execInternal();
}
@Override
public void close() {
// NOTE: close() must never throw an exception.
if (queryQueue != null && queueToken != null) {
try {
queryQueue.releaseAndNotify(queueToken);
} catch (Throwable t) {
LOG.error("error happens when coordinator close ", t);
}
}
try {
for (ScanNode scanNode : scanNodes) {
scanNode.stop();
}
} catch (Throwable t) {
LOG.error("error happens when scannode stop ", t);
}
}
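// Computes the fragment assignment, sets up result receivers (for queries) or load job progress
// (for loads), and then dispatches the pipeline contexts to the backends.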
protected void execInternal() throws Exception {
if (LOG.isDebugEnabled() && !scanNodes.isEmpty()) {
LOG.debug("debug: in Coordinator::exec. query id: {}, planNode: {}",
DebugUtil.printId(queryId), scanNodes.get(0).treeToThrift());
}
if (LOG.isDebugEnabled() && !fragments.isEmpty()) {
LOG.debug("debug: in Coordinator::exec. query id: {}, fragment: {}",
DebugUtil.printId(queryId), fragments.get(0).toThrift());
}
processFragmentAssignmentAndParams();
traceInstance();
QeProcessorImpl.INSTANCE.registerInstances(queryId, instanceIds.size());
// create result receiver
PlanFragmentId topId = fragments.get(0).getFragmentId();
FragmentExecParams topParams = fragmentExecParamsMap.get(topId);
DataSink topDataSink = topParams.fragment.getSink();
this.timeoutDeadline = System.currentTimeMillis() + queryOptions.getExecutionTimeout() * 1000L;
if (topDataSink instanceof ResultSink || topDataSink instanceof ResultFileSink) {
Boolean enableParallelResultSink = false;
if (topDataSink instanceof ResultSink) {
enableParallelResultSink = queryOptions.isEnableParallelResultSink();
} else {
enableParallelResultSink = queryOptions.isEnableParallelOutfile();
}
Set<TNetworkAddress> addrs = new HashSet<>();
for (FInstanceExecParam param : topParams.instanceExecParams) {
if (addrs.contains(param.host)) {
continue;
}
addrs.add(param.host);
if (context.isReturnResultFromLocal()) {
receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host),
toBrpcHost(param.host), this.timeoutDeadline,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink));
} else {
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
TUniqueId finstId;
if (enableParallelResultSink) {
finstId = queryId;
} else {
finstId = topParams.instanceExecParams.get(0).instanceId;
}
context.addFlightSqlEndpointsLocation(new FlightSqlEndpointsLocation(finstId,
toArrowFlightHost(param.host), toBrpcHost(param.host), fragments.get(0).getOutputExprs()));
}
}
receiverConsumer = new ResultReceiverConsumer(receivers, timeoutDeadline);
LOG.info("dispatch result sink of query {} to {}", DebugUtil.printId(queryId),
topParams.instanceExecParams.get(0).host);
if (topDataSink instanceof ResultFileSink
&& ((ResultFileSink) topDataSink).getStorageType() == StorageBackend.StorageType.BROKER) {
// set the broker address for OUTFILE sink
ResultFileSink topResultFileSink = (ResultFileSink) topDataSink;
FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
.getBroker(topResultFileSink.getBrokerName(),
topParams.instanceExecParams.get(0).host.getHostname());
topResultFileSink.setBrokerAddr(broker.host, broker.port);
}
} else {
// This is a load process.
this.queryOptions.setIsReportSuccess(true);
deltaUrls = Lists.newArrayList();
loadCounters = Maps.newHashMap();
List<Long> relatedBackendIds = Lists.newArrayList(addressToBackendID.values());
Env.getCurrentEnv().getLoadManager().initJobProgress(jobId, queryId, instanceIds, relatedBackendIds);
Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId), scanRangeNum);
LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet());
}
updateProfileIfPresent(profile -> profile.setAssignFragmentTime());
sendPipelineCtx();
}
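// Groups the fragment instances of each backend into one PipelineExecContexts, serializes them,
// and sends the exec_plan_fragments RPCs; when more than one fragment exists, a two-phase
// prepare/start protocol is used so that receivers are ready before senders start.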
protected void sendPipelineCtx() throws TException, RpcException, UserException {
lock();
try {
Multiset<TNetworkAddress> hostCounter = HashMultiset.create();
for (FragmentExecParams params : fragmentExecParamsMap.values()) {
for (FInstanceExecParam fi : params.instanceExecParams) {
hostCounter.add(fi.host);
}
}
int backendIdx = 0;
int profileFragmentId = 0;
beToPipelineExecCtxs.clear();
// fragment:backend
List<Pair<PlanFragmentId, Long>> backendFragments = Lists.newArrayList();
// If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start,
// else use exec_plan_fragments directly.
// We choose #fragments > 1 because in some cases we need to ensure that one fragment
// is already prepared to receive data before another fragment sends data.
// For example: select * from numbers("number"="10") will generate ExchangeNode and
// TableValuedFunctionScanNode, we should ensure TableValuedFunctionScanNode does not
// send data until ExchangeNode is ready to receive.
boolean twoPhaseExecution = fragments.size() > 1;
for (PlanFragment fragment : fragments) {
FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());
// 1. set up exec states
int instanceNum = params.instanceExecParams.size();
Preconditions.checkState(instanceNum > 0);
Map<TNetworkAddress, TPipelineFragmentParams> tParams = params.toThrift(backendIdx);
boolean needCheckBackendState = false;
if (queryOptions.getQueryType() == TQueryType.LOAD && profileFragmentId == 0) {
// this is a load process, and it is the first fragment.
// we should add all BackendExecState of this fragment to needCheckBackendExecStates,
// so that we can check these backends' state when joining this Coordinator
needCheckBackendState = true;
}
int numBackendsWithSink = 0;
// 3. group PipelineExecContext by BE.
// So that we can use one RPC to send all fragment instances of a BE.
for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) {
Long backendId = this.addressToBackendID.get(entry.getKey());
backendFragments.add(Pair.of(fragment.getFragmentId(), backendId));
PipelineExecContext pipelineExecContext = new PipelineExecContext(fragment.getFragmentId(),
entry.getValue(), idToBackend.get(backendId), executionProfile, jobId);
// Each tParam will set the total number of Fragments that need to be executed on the same BE,
// and the BE will determine whether all Fragments have been executed based on this information.
// Note: for load fragments there is a small probability that FragmentNumOnHost is 0, for unknown reasons.
entry.getValue().setFragmentNumOnHost(hostCounter.count(pipelineExecContext.address));
entry.getValue().setBackendId(pipelineExecContext.backend.getId());
entry.getValue().setNeedWaitExecutionTrigger(twoPhaseExecution);
entry.getValue().setFragmentId(fragment.getFragmentId().asInt());
pipelineExecContexts.put(Pair.of(fragment.getFragmentId().asInt(), backendId), pipelineExecContext);
if (needCheckBackendState) {
needCheckPipelineExecContexts.add(pipelineExecContext);
if (LOG.isDebugEnabled()) {
LOG.debug("add need check backend {} for fragment, {} job: {}",
pipelineExecContext.backend.getId(), fragment.getFragmentId().asInt(), jobId);
}
}
PipelineExecContexts ctxs = beToPipelineExecCtxs.get(pipelineExecContext.backend.getId());
if (ctxs == null) {
ctxs = new PipelineExecContexts(queryId, pipelineExecContext.backend,
pipelineExecContext.brpcAddress, twoPhaseExecution,
entry.getValue().getFragmentNumOnHost());
beToPipelineExecCtxs.putIfAbsent(pipelineExecContext.backend.getId(), ctxs);
}
ctxs.addContext(pipelineExecContext);
if (entry.getValue().getFragment().getOutputSink() != null
&& entry.getValue().getFragment().getOutputSink().getType()
== TDataSinkType.OLAP_TABLE_SINK) {
numBackendsWithSink++;
}
++backendIdx;
}
for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) {
if (entry.getValue().getFragment().getOutputSink() != null
&& entry.getValue().getFragment().getOutputSink().getType()
== TDataSinkType.OLAP_TABLE_SINK) {
int loadStreamPerNode = 1;
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
loadStreamPerNode = ConnectContext.get().getSessionVariable().getLoadStreamPerNode();
}
entry.getValue().setLoadStreamPerNode(loadStreamPerNode);
entry.getValue().setTotalLoadStreams(numBackendsWithSink * loadStreamPerNode);
entry.getValue().setNumLocalSink(entry.getValue().getLocalParams().size());
LOG.info("num local sink for backend {} is {}", entry.getValue().getBackendId(),
entry.getValue().getNumLocalSink());
}
}
profileFragmentId += 1;
} // end for fragments
// Initialize the marked latch in order to track the finished state of the query.
fragmentsDoneLatch = new MarkedCountDownLatch<>(backendFragments.size());
for (Pair<PlanFragmentId, Long> pair : backendFragments) {
fragmentsDoneLatch.addMark(pair.first.asInt(), pair.second);
}
// 4. send and wait fragments rpc
// 4.1 serialize fragment
// unsetFields() must be called serially.
for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
ctxs.unsetFields();
}
// serializeFragments() can be called in parallel.
final AtomicLong compressedSize = new AtomicLong(0);
beToPipelineExecCtxs.values().parallelStream().forEach(ctxs -> {
try {
compressedSize.addAndGet(ctxs.serializeFragments());
} catch (TException e) {
throw new RuntimeException(e);
}
});
updateProfileIfPresent(profile -> profile.updateFragmentCompressedSize(compressedSize.get()));
updateProfileIfPresent(profile -> profile.setFragmentSerializeTime());
// 4.2 send fragments rpc
List<Pair<Long, Triple<PipelineExecContexts, BackendServiceProxy,
Future<InternalService.PExecPlanFragmentResult>>>> futures = Lists.newArrayList();
BackendServiceProxy proxy = BackendServiceProxy.getInstance();
for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
if (LOG.isDebugEnabled()) {
LOG.debug(ctxs.debugInfo());
}
futures.add(Pair.of(DateTime.now().getMillis(),
ImmutableTriple.of(ctxs, proxy, ctxs.execRemoteFragmentsAsync(proxy))));
}
Map<TNetworkAddress, List<Long>> rpcPhase1Latency =
waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments");
updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(futures.size()));
updateProfileIfPresent(profile -> profile.setFragmentSendPhase1Time());
updateProfileIfPresent(profile -> profile.setRpcPhase1Latency(rpcPhase1Latency));
if (twoPhaseExecution) {
// 5. send and wait execution start rpc
futures.clear();
for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
futures.add(Pair.of(DateTime.now().getMillis(),
ImmutableTriple.of(ctxs, proxy, ctxs.execPlanFragmentStartAsync(proxy))));
}
Map<TNetworkAddress, List<Long>> rpcPhase2Latency =
waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(),
"send execution start");
updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(futures.size()));
updateProfileIfPresent(profile -> profile.setFragmentSendPhase2Time());
updateProfileIfPresent(profile -> profile.setRpcPhase2Latency(rpcPhase2Latency));
}
} finally {
unlock();
}
}
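// Waits for the given fragment RPC futures, recording a per-backend latency breakdown
// (FE->BE rpc, bthread scheduling, execution, BE->FE rpc). Any failure updates the query
// status, cancels the query, and is rethrown as an RpcException or UserException.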
protected Map<TNetworkAddress, List<Long>> waitPipelineRpc(List<Pair<Long, Triple<PipelineExecContexts,
BackendServiceProxy, Future<InternalService.PExecPlanFragmentResult>>>> futures, long leftTimeMs,
String operation) throws RpcException, UserException {
if (leftTimeMs <= 0) {
long currentTimeMillis = System.currentTimeMillis();
long elapsed = (currentTimeMillis - timeoutDeadline) / 1000 + queryOptions.getExecutionTimeout();
String msg = String.format(
"timeout before waiting %s rpc, query timeout:%d sec, already elapsed:%d sec, left for this:%d ms",
operation, queryOptions.getExecutionTimeout(), elapsed, leftTimeMs);
LOG.warn("Query {} {}", DebugUtil.printId(queryId), msg);
if (!queryOptions.isSetExecutionTimeout() || !queryOptions.isSetQueryTimeout()) {
LOG.warn("Query {} does not set timeout info, execution timeout: is_set:{}, value:{}"
+ ", query timeout: is_set:{}, value: {}, "
+ "coordinator timeout deadline {}, cur time millis: {}",
DebugUtil.printId(queryId),
queryOptions.isSetExecutionTimeout(), queryOptions.getExecutionTimeout(),
queryOptions.isSetQueryTimeout(), queryOptions.getQueryTimeout(),
timeoutDeadline, currentTimeMillis);
}
throw new UserException(msg);
}
// BE -> (RPC latency from FE to BE, Execution latency on bthread, Duration of doing work, RPC latency from BE
// to FE)
Map<TNetworkAddress, List<Long>> beToPrepareLatency = new HashMap<>();
long timeoutMs = Math.min(leftTimeMs, Config.remote_fragment_exec_timeout_ms);
for (Pair<Long, Triple<PipelineExecContexts, BackendServiceProxy,
Future<InternalService.PExecPlanFragmentResult>>> pair : futures) {
Triple<PipelineExecContexts, BackendServiceProxy,
Future<InternalService.PExecPlanFragmentResult>> triple = pair.second;
TStatusCode code;
String errMsg = null;
Exception exception = null;
try {
PExecPlanFragmentResult result = triple.getRight().get(timeoutMs, TimeUnit.MILLISECONDS);
long rpcDone = DateTime.now().getMillis();
beToPrepareLatency.put(triple.getLeft().brpcAddr,
Lists.newArrayList(result.getReceivedTime() - pair.first,
result.getExecutionTime() - result.getReceivedTime(),
result.getExecutionDoneTime() - result.getExecutionTime(),
rpcDone - result.getExecutionDoneTime()));
code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code == null) {
code = TStatusCode.INTERNAL_ERROR;
}
if (code != TStatusCode.OK) {
if (!result.getStatus().getErrorMsgsList().isEmpty()) {
errMsg = result.getStatus().getErrorMsgsList().get(0);
} else {
errMsg = operation + " failed. backend id: " + triple.getLeft().beId;
}
}
} catch (ExecutionException e) {
exception = e;
code = TStatusCode.THRIFT_RPC_ERROR;
triple.getMiddle().removeProxy(triple.getLeft().brpcAddr);
} catch (InterruptedException e) {
exception = e;
code = TStatusCode.INTERNAL_ERROR;
triple.getMiddle().removeProxy(triple.getLeft().brpcAddr);
} catch (TimeoutException e) {
exception = e;
errMsg = String.format(
"timeout when waiting for %s rpc, query timeout:%d sec, timeout for this operation:%d sec",
operation, queryOptions.getExecutionTimeout(), timeoutMs / 1000);
LOG.warn("Query {} {}", DebugUtil.printId(queryId), errMsg);
code = TStatusCode.TIMEOUT;
triple.getMiddle().removeProxy(triple.getLeft().brpcAddr);
}
if (code != TStatusCode.OK) {
if (exception != null && errMsg == null) {
errMsg = operation + " failed. " + exception.getMessage();
}
queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
cancelInternal(queryStatus);
switch (code) {
case TIMEOUT:
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
.increase(1L);
throw new RpcException(triple.getLeft().brpcAddr.hostname, errMsg, exception);
case THRIFT_RPC_ERROR:
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
.increase(1L);
SimpleScheduler.addToBlacklist(triple.getLeft().beId, errMsg);
throw new RpcException(triple.getLeft().brpcAddr.hostname, errMsg, exception);
default:
throw new UserException(errMsg, exception);
}
}
}
return beToPrepareLatency;
}
public List<String> getExportFiles() {
return exportFiles;
}
void updateExportFiles(List<String> files) {
lock.lock();
try {
if (exportFiles == null) {
exportFiles = Lists.newArrayList();
}
exportFiles.addAll(files);
} finally {
lock.unlock();
}
}
void updateDeltas(List<String> urls) {
lock.lock();
try {
deltaUrls.addAll(urls);
} finally {
lock.unlock();
}
}
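// Accumulates the normal / abnormal / unselected row counts reported by a fragment into the
// coordinator-level load counters.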
private void updateLoadCounters(Map<String, String> newLoadCounters) {
lock.lock();
try {
long numRowsNormal = 0L;
String value = this.loadCounters.get(LoadEtlTask.DPP_NORMAL_ALL);
if (value != null) {
numRowsNormal = Long.parseLong(value);
}
long numRowsAbnormal = 0L;
value = this.loadCounters.get(LoadEtlTask.DPP_ABNORMAL_ALL);
if (value != null) {
numRowsAbnormal = Long.parseLong(value);
}
long numRowsUnselected = 0L;
value = this.loadCounters.get(LoadJob.UNSELECTED_ROWS);
if (value != null) {
numRowsUnselected = Long.parseLong(value);
}
// new load counters
value = newLoadCounters.get(LoadEtlTask.DPP_NORMAL_ALL);
if (value != null) {
numRowsNormal += Long.parseLong(value);
}
value = newLoadCounters.get(LoadEtlTask.DPP_ABNORMAL_ALL);
if (value != null) {
numRowsAbnormal += Long.parseLong(value);
}
value = newLoadCounters.get(LoadJob.UNSELECTED_ROWS);
if (value != null) {
numRowsUnselected += Long.parseLong(value);
}
this.loadCounters.put(LoadEtlTask.DPP_NORMAL_ALL, "" + numRowsNormal);
this.loadCounters.put(LoadEtlTask.DPP_ABNORMAL_ALL, "" + numRowsAbnormal);
this.loadCounters.put(LoadJob.UNSELECTED_ROWS, "" + numRowsUnselected);
} finally {
lock.unlock();
}
}
private void updateCommitInfos(List<TTabletCommitInfo> commitInfos) {
lock.lock();
try {
// in pipelineX, the commit infos may contain duplicates, so we remove the duplicate ones
Map<Pair<Long, Long>, TTabletCommitInfo> commitInfoMap = Maps.newHashMap();
commitInfos.forEach(info -> commitInfoMap.put(Pair.of(info.getBackendId(), info.getTabletId()), info));
this.commitInfos.addAll(commitInfoMap.values());
} finally {
lock.unlock();
}
}
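// Collects error tablet infos reported by backends, capped at Config.max_error_tablet_of_broker_load.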
private void updateErrorTabletInfos(List<TErrorTabletInfo> errorTabletInfos) {
lock.lock();
try {
if (this.errorTabletInfos.size() <= Config.max_error_tablet_of_broker_load) {
this.errorTabletInfos.addAll(errorTabletInfos.stream().limit(Config.max_error_tablet_of_broker_load
- this.errorTabletInfos.size()).collect(Collectors.toList()));
}
} finally {
lock.unlock();
}
}
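// Merges a status reported by a fragment into the overall query status; the first error wins
// and triggers cancellation, while cancelled updates after all results are returned are ignored.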
private void updateStatus(Status status) {
lock.lock();
try {
// The query is done and we are just waiting for remote fragments to clean up.
// Ignore their cancelled updates.
if (returnedAllResults && status.isCancelled()) {
return;
}
// nothing to update
if (status.ok()) {
return;
}
// don't override an error status; also, cancellation has already started
if (!queryStatus.ok()) {
return;
}
queryStatus.updateStatus(status.getErrorCode(), status.getErrorMsg());
cancelInternal(queryStatus);
} finally {
lock.unlock();
}
}
@Override
public RowBatch getNext() throws Exception {
if (receivers.isEmpty()) {
throw new UserException("There is no receiver.");
}
Status status = new Status();
RowBatch resultBatch = receiverConsumer.getNext(status);
if (!status.ok()) {
LOG.warn("Query {} coordinator get next fail, {}, need cancel.",
DebugUtil.printId(queryId), status.getErrorMsg());
}
updateStatus(status);
Status copyStatus = null;
lock();
try {
copyStatus = new Status(queryStatus);
} finally {
unlock();
}
if (!copyStatus.ok()) {
if (Strings.isNullOrEmpty(copyStatus.getErrorMsg())) {
copyStatus.rewriteErrorMsg();
}
if (copyStatus.isRpcError()) {
throw new RpcException(null, copyStatus.getErrorMsg());
} else {
String errMsg = copyStatus.getErrorMsg();
LOG.warn("Query {} failed: {}", DebugUtil.printId(queryId), errMsg);
throw new UserException(errMsg);
}
}
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) {
if (resultBatch.isEos()) {
numReceivedRows += resultBatch.getQueryStatistics().getReturnedRows();
}
} else if (resultBatch.getBatch() != null) {
numReceivedRows += resultBatch.getBatch().getRowsSize();
}
// If the row limit has been reached, cancel this query immediately
// to prevent the BEs from reading more data.
// ATTN: if this changes, also change the same logic in QueryProcessor.getNext().
Long limitRows = fragments.get(0).getPlanRoot().getLimit();
boolean reachedLimit = LimitUtils.cancelIfReachLimit(
resultBatch, limitRows, numReceivedRows, this::cancelInternal);
if (reachedLimit) {
resultBatch.setEos(true);
}
return resultBatch;
}
// We use a very conservative cancel strategy.
// 0. If a backend has a zero process epoch, do not cancel. A zero process epoch usually arises during a cluster upgrade.
// 1. If the process epoch is unchanged, do not cancel; it means the backend has not restarted or died.
public Status shouldCancel(List<Backend> currentBackends) {
Map<Long, Backend> curBeMap = Maps.newHashMap();
for (Backend be : currentBackends) {
curBeMap.put(be.getId(), be);
}
try {
lock();
for (PipelineExecContext pipelineExecContext : pipelineExecContexts.values()) {
Backend be = curBeMap.get(pipelineExecContext.backend.getId());
if (be == null || !be.isAlive()) {
Status errorStatus = new Status(TStatusCode.CANCELLED,
"Backend {} not exists or dead, query {} should be cancelled",
pipelineExecContext.backend.toString(), DebugUtil.printId(queryId));
LOG.warn(errorStatus.getErrorMsg());
return errorStatus;
}
// The backend process epoch changed, which indicates that this BE has restarted, so the query should be cancelled.
// Check for zero because during an upgrade the older-version oplog does not persist the BE start time,
// so a newer-version follower will get a zero epoch when replaying the oplog or a snapshot.
if (pipelineExecContext.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) {
Status errorStatus = new Status(TStatusCode.CANCELLED,
"Backend process epoch changed, previous {} now {}, "
+ "means this be has already restarted, should cancel this coordinator,"
+ "query id {}", pipelineExecContext.beProcessEpoch, be.getProcessEpoch(),
DebugUtil.printId(queryId));
LOG.warn(errorStatus.getErrorMsg());
return errorStatus;
} else if (be.getProcessEpoch() == 0) {
LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?",
be.toString());
}
}
return Status.OK;
} finally {
unlock();
}
}
@Override
public void cancel(Status cancelReason) {
if (queueToken != null) {
queueToken.cancel();
}
for (ScanNode scanNode : scanNodes) {
scanNode.stop();
}
if (cancelReason.ok()) {
throw new RuntimeException("Should use correct cancel reason, but it is "
+ cancelReason.toString());
}
lock();
try {
if (!queryStatus.ok()) {
if (LOG.isDebugEnabled()) {
// Print an error stack here to understand why cancel was sent again.
LOG.debug("Query {} already in abnormal status {}, but received cancel again,"
+ " so send cancel to BE again",
DebugUtil.printId(queryId), queryStatus.toString(),
new Exception("cancel failed"));
}
} else {
queryStatus.updateStatus(cancelReason.getErrorCode(), cancelReason.getErrorMsg());
LOG.warn("Cancel execution of query {}, this is a outside invoke, cancelReason {}",
DebugUtil.printId(queryId), cancelReason.toString());
}
cancelInternal(cancelReason);
} finally {
unlock();
}
}
public boolean isQueryCancelled() {
lock();
try {
return queryStatus.isCancelled();
} finally {
unlock();
}
}
private void cancelLatch() {
if (instancesDoneLatch != null) {
instancesDoneLatch.countDownToZero(new Status());
}
if (fragmentsDoneLatch != null) {
fragmentsDoneLatch.countDownToZero(new Status());
}
}
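// Cancels the result receivers, asynchronously cancels the remote fragments on every backend,
// and releases the completion latches.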
protected void cancelInternal(Status cancelReason) {
for (ResultReceiver receiver : receivers) {
receiver.cancel(cancelReason);
}
cancelRemoteFragmentsAsync(cancelReason);
cancelLatch();
}
private void cancelRemoteFragmentsAsync(Status cancelReason) {
for (PipelineExecContexts ctx : beToPipelineExecCtxs.values()) {
LOG.debug("Cancel query {} on BE {}. Reason: {}", DebugUtil.printId(queryId), ctx.brpcAddr,
cancelReason.toString());
ctx.cancelQuery(cancelReason);
}
}
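// Computes the execution parameters of every fragment: assigns hosts and instance ids,
// initializes the completion latch, handles multi-cast fragments, assigns runtime filter
// merge/target addresses, and fills in exchange destinations and per-exchange sender counts.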
protected void computeFragmentExecParams() throws Exception {
// fill hosts field in fragmentExecParams
computeFragmentHosts();
// assign instance ids
instanceIds.clear();
for (FragmentExecParams params : fragmentExecParamsMap.values()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Query {} fragment {} has {} instances.",
DebugUtil.printId(queryId), params.fragment.getFragmentId(),
params.instanceExecParams.size());
}
for (int j = 0; j < params.instanceExecParams.size(); ++j) {
// we add instance_num to query_id.lo to create a
// globally-unique instance id
FInstanceExecParam instanceExecParam = params.instanceExecParams.get(j);
// already set by nereids coordinator?
if (instanceExecParam.instanceId == null) {
TUniqueId instanceId = new TUniqueId();
instanceId.setHi(queryId.hi);
instanceId.setLo(queryId.lo + instanceIds.size() + 1);
instanceExecParam.instanceId = instanceId;
}
instanceIds.add(instanceExecParam.instanceId);
}
}
// Initialize instancesDoneLatch; it is used to track whether the instances have finished, for insert statements.
instancesDoneLatch = new MarkedCountDownLatch<>(instanceIds.size());
for (TUniqueId instanceId : instanceIds) {
instancesDoneLatch.addMark(instanceId, -1L /* value is meaningless */);
}
// compute multi cast fragment params
computeMultiCastFragmentParams();
// assign runtime filter merge addr and target addr
assignRuntimeFilterAddr();
// compute destinations and # senders per exchange node
// (the root fragment doesn't have a destination)
for (FragmentExecParams params : fragmentExecParamsMap.values()) {
if (params.fragment instanceof MultiCastPlanFragment) {
continue;
}
PlanFragment destFragment = params.fragment.getDestFragment();
if (destFragment == null) {
// root plan fragment
continue;
}
FragmentExecParams destParams = fragmentExecParamsMap.get(destFragment.getFragmentId());
// set # of senders
DataSink sink = params.fragment.getSink();
// we can only handle unpartitioned (= broadcast) and
// hash-partitioned
// output at the moment
PlanNodeId exchId = sink.getExchNodeId();
PlanNode exchNode = PlanNode.findPlanNodeFromPlanNodeId(destFragment.getPlanRoot(), exchId);
Preconditions.checkState(exchNode != null, "exchNode is null");
Preconditions.checkState(exchNode instanceof ExchangeNode,
"exchNode is not ExchangeNode, id: " + exchNode.getId().toString());
// we might have multiple fragments sending to this exchange node
// (distributed MERGE), which is why we need to add up the #senders
if (destParams.perExchNumSenders.get(exchId.asInt()) == null) {
destParams.perExchNumSenders.put(exchId.asInt(), params.instanceExecParams.size());
} else {
destParams.perExchNumSenders.put(exchId.asInt(),
params.instanceExecParams.size() + destParams.perExchNumSenders.get(exchId.asInt()));
}
if (sink.getOutputPartition() != null
&& sink.getOutputPartition().isBucketShuffleHashPartition()) {
// the destFragment must be bucket shuffle
Preconditions.checkState(bucketShuffleJoinController
.isBucketShuffleJoin(destFragment.getFragmentId().asInt()), "Sink is "
+ "Bucket Shuffle Partition, the destFragment must have a bucket shuffle join node ");
int bucketSeq = 0;
int bucketNum = bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId());
// when the left table is empty, its bucket set is empty;
// set the right table's destination address to the address of the left table
if (destParams.instanceExecParams.size() == 1 && (bucketNum == 0
|| destParams.instanceExecParams.get(0).bucketSeqSet.isEmpty())) {
bucketNum = 1;
destParams.instanceExecParams.get(0).bucketSeqSet.add(0);
}
// process bucket shuffle join on fragment without scan node
while (bucketSeq < bucketNum) {
TPlanFragmentDestination dest = setDestination(destParams, params.destinations.size(), bucketSeq);
bucketSeq++;
params.destinations.add(dest);
}
} else {
if (enableShareHashTableForBroadcastJoin
&& ((ExchangeNode) exchNode).isRightChildOfBroadcastHashJoin()) {
// here we choose the first instance on each host to build the hash table; the others share it.
Map<TNetworkAddress, FInstanceExecParam> destHosts = new HashMap<>();
destParams.instanceExecParams.forEach(param -> {
if (destHosts.containsKey(param.host)) {
destHosts.get(param.host).instancesSharingHashTable.add(param.instanceId);
} else {
destHosts.put(param.host, param);
TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = param.instanceId;
try {
dest.server = toRpcHost(param.host);
dest.setBrpcServer(toBrpcHost(param.host));
} catch (Exception e) {
throw new RuntimeException(e);
}
params.destinations.add(dest);
}
});
} else {
Set<TNetworkAddress> hostSet = new HashSet<>();
// add destination host to this fragment's destination
for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
if (destParams.ignoreDataDistribution
&& hostSet.contains(destParams.instanceExecParams.get(j).host)) {
continue;
}
hostSet.add(destParams.instanceExecParams.get(j).host);
TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
dest.setBrpcServer(toBrpcHost(destParams.instanceExecParams.get(j).host));
destParams.instanceExecParams.get(j).recvrId = params.destinations.size();
params.destinations.add(dest);
}
}
}
}
}
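// Builds the TPlanFragmentDestination for a given bucket sequence; falls back to a dummy
// 0.0.0.0 address when no destination instance owns that bucket.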
private TPlanFragmentDestination setDestination(FragmentExecParams destParams, int recvrId, int bucketSeq)
throws Exception {
TPlanFragmentDestination dest = new TPlanFragmentDestination();
TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);
dest.fragment_instance_id = new TUniqueId(-1, -1);
dest.server = dummyServer;
dest.setBrpcServer(dummyServer);
if (destParams.ignoreDataDistribution) {
Map<TNetworkAddress, Pair<TUniqueId, Set<Integer>>> hostToInstanceIdAndBucketSeq
= new HashMap<>();
for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) {
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
hostToInstanceIdAndBucketSeq.putIfAbsent(instanceExecParams.host,
Pair.of(instanceExecParams.instanceId, new HashSet<>()));
hostToInstanceIdAndBucketSeq.get(instanceExecParams.host).second.addAll(
instanceExecParams.bucketSeqSet);
}
for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) {
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
if (hostToInstanceIdAndBucketSeq.get(instanceExecParams.host).second.contains(bucketSeq)) {
dest.fragment_instance_id = hostToInstanceIdAndBucketSeq.get(instanceExecParams.host)
.first;
dest.server = toRpcHost(instanceExecParams.host);
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
instanceExecParams.recvrId = recvrId;
break;
}
}
} else {
for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) {
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
instanceExecParams.recvrId = recvrId;
break;
}
}
}
return dest;
}
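// Computes destinations and sender counts for MultiCastPlanFragments, mirroring the logic in
// computeFragmentExecParams() but writing into each MultiCastDataSink's destination list.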
private void computeMultiCastFragmentParams() throws Exception {
for (FragmentExecParams params : fragmentExecParamsMap.values()) {
if (!(params.fragment instanceof MultiCastPlanFragment)) {
continue;
}
MultiCastPlanFragment multi = (MultiCastPlanFragment) params.fragment;
Preconditions.checkState(multi.getSink() instanceof MultiCastDataSink);
MultiCastDataSink multiSink = (MultiCastDataSink) multi.getSink();
for (int i = 0; i < multi.getDestFragmentList().size(); i++) {
PlanFragment destFragment = multi.getDestFragmentList().get(i);
DataStreamSink sink = multiSink.getDataStreamSinks().get(i);
if (destFragment == null) {
continue;
}
FragmentExecParams destParams = fragmentExecParamsMap.get(destFragment.getFragmentId());
multi.getDestFragmentList().get(i).setOutputPartition(params.fragment.getOutputPartition());
PlanNodeId exchId = sink.getExchNodeId();
PlanNode exchNode = PlanNode.findPlanNodeFromPlanNodeId(destFragment.getPlanRoot(), exchId);
Preconditions.checkState(!destParams.perExchNumSenders.containsKey(exchId.asInt()));
Preconditions.checkState(exchNode != null, "exchNode is null");
Preconditions.checkState(exchNode instanceof ExchangeNode,
"exchNode is not ExchangeNode, id: " + exchNode.getId().toString());
if (destParams.perExchNumSenders.get(exchId.asInt()) == null) {
destParams.perExchNumSenders.put(exchId.asInt(), params.instanceExecParams.size());
} else {
destParams.perExchNumSenders.put(exchId.asInt(),
params.instanceExecParams.size() + destParams.perExchNumSenders.get(exchId.asInt()));
}
List<TPlanFragmentDestination> destinations = multiSink.getDestinations().get(i);
if (sink.getOutputPartition() != null
&& sink.getOutputPartition().isBucketShuffleHashPartition()) {
// the destFragment must be bucket shuffle
Preconditions.checkState(bucketShuffleJoinController
.isBucketShuffleJoin(destFragment.getFragmentId().asInt()), "Sink is "
+ "Bucket Shuffle Partition, the destFragment must have a bucket shuffle join node ");
int bucketSeq = 0;
int bucketNum = bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId());
// when the left table is empty, its bucket set is empty;
// set the right table's destination address to the address of the left table
if (destParams.instanceExecParams.size() == 1 && (bucketNum == 0
|| destParams.instanceExecParams.get(0).bucketSeqSet.isEmpty())) {
bucketNum = 1;
destParams.instanceExecParams.get(0).bucketSeqSet.add(0);
}
// process bucket shuffle join on fragment without scan node
while (bucketSeq < bucketNum) {
TPlanFragmentDestination dest = setDestination(destParams, params.destinations.size(),
bucketSeq);
bucketSeq++;
destinations.add(dest);
}
} else if (enableShareHashTableForBroadcastJoin
&& ((ExchangeNode) exchNode).isRightChildOfBroadcastHashJoin()) {
// here choose the first instance to build hash table.
Map<TNetworkAddress, FInstanceExecParam> destHosts = new HashMap<>();
destParams.instanceExecParams.forEach(param -> {
if (destHosts.containsKey(param.host)) {
destHosts.get(param.host).instancesSharingHashTable.add(param.instanceId);
} else {
destHosts.put(param.host, param);
TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = param.instanceId;
try {
dest.server = toRpcHost(param.host);
dest.setBrpcServer(toBrpcHost(param.host));
} catch (Exception e) {
throw new RuntimeException(e);
}
destinations.add(dest);
}
});
} else {
Set<TNetworkAddress> hostSet = new HashSet<>();
for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
if (destParams.ignoreDataDistribution
&& hostSet.contains(destParams.instanceExecParams.get(j).host)) {
continue;
}
hostSet.add(destParams.instanceExecParams.get(j).host);
TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
dest.brpc_server = toBrpcHost(destParams.instanceExecParams.get(j).host);
destParams.instanceExecParams.get(j).recvrId = params.destinations.size();
destinations.add(dest);
}
}
}
}
}
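// The helpers below look up a Backend by its be_port address and translate it into the
// address of a specific service port on that backend: the BE RPC port, the brpc port,
// or the Arrow Flight SQL port.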
private TNetworkAddress toRpcHost(TNetworkAddress host) throws Exception {
Backend backend = Env.getCurrentSystemInfo().getBackendWithBePort(
host.getHostname(), host.getPort());
if (backend == null) {
throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
}
TNetworkAddress dest = new TNetworkAddress(backend.getHost(), backend.getBeRpcPort());
return dest;
}
private TNetworkAddress toBrpcHost(TNetworkAddress host) throws Exception {
Backend backend = Env.getCurrentSystemInfo().getBackendWithBePort(
host.getHostname(), host.getPort());
if (backend == null) {
throw new UserException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
}
if (backend.getBrpcPort() < 0) {
return null;
}
return new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
}
private TNetworkAddress toArrowFlightHost(TNetworkAddress host) throws Exception {
Backend backend = Env.getCurrentSystemInfo().getBackendWithBePort(
host.getHostname(), host.getPort());
if (backend == null) {
throw new UserException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
}
if (backend.getArrowFlightSqlPort() < 0) {
throw new UserException("be arrow_flight_sql_port cannot be empty.");
}
return backend.getArrowFlightAddress();
}
// determine whether this fragment contains a UnionNode
private boolean containsUnionNode(PlanNode node) {
if (node instanceof UnionNode) {
return true;
}
for (PlanNode child : node.getChildren()) {
if (child instanceof ExchangeNode) {
// Ignore nodes that belong to other fragments
continue;
}
// recurse into every child, not only the first non-exchange one
if (containsUnionNode(child)) {
return true;
}
}
return false;
}
// determine whether this fragment contains an IntersectNode
private boolean containsIntersectNode(PlanNode node) {
if (node instanceof IntersectNode) {
return true;
}
for (PlanNode child : node.getChildren()) {
if (child instanceof ExchangeNode) {
// Ignore nodes that belong to other fragments
continue;
}
// recurse into every child, not only the first non-exchange one
if (containsIntersectNode(child)) {
return true;
}
}
return false;
}
// determine whether this fragment contains an ExceptNode
private boolean containsExceptNode(PlanNode node) {
if (node instanceof ExceptNode) {
return true;
}
for (PlanNode child : node.getChildren()) {
if (child instanceof ExchangeNode) {
// Ignore nodes that belong to other fragments
continue;
}
// recurse into every child, not only the first non-exchange one
if (containsExceptNode(child)) {
return true;
}
}
return false;
}
// determine whether this fragment contains a SetOperationNode
private boolean containsSetOperationNode(PlanNode node) {
if (node instanceof SetOperationNode) {
return true;
}
for (PlanNode child : node.getChildren()) {
if (child instanceof ExchangeNode) {
// Ignore nodes that belong to other fragments
continue;
}
// recurse into every child, not only the first non-exchange one
if (containsSetOperationNode(child)) {
return true;
}
}
return false;
}
// For each fragment in fragments, compute the hosts on which to run its instances
// and store the result in the corresponding FragmentExecParams.instanceExecParams.
protected void computeFragmentHosts() throws Exception {
// compute hosts of producer fragment before those of consumer fragment(s),
// the latter might inherit the set of hosts from the former
// compute hosts *bottom up*.
for (int i = fragments.size() - 1; i >= 0; --i) {
PlanFragment fragment = fragments.get(i);
FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());
// if needed, we could abstract this into a helper like `toAllBackends()`.
if (fragment.getSink() instanceof DictionarySink) {
// set when assign all BE job
int expectedInstanceNum = fragment.getParallelExecNum();
int count = 0;
DictionarySink sink = (DictionarySink) fragment.getSink();
List<Backend> aliveBackends = sink.getPartialLoadBEs() == null
// The Coordinator normally only uses the current cluster's backends, but dictionary loading needs backends of all clusters.
? Env.getCurrentSystemInfo().getAllClusterBackends(true)
// only load part of BEs
: sink.getPartialLoadBEs();
for (Backend backend : aliveBackends) {
TNetworkAddress execHostport = new TNetworkAddress(backend.getHost(), backend.getBePort());
Reference<Long> backendIdRef = new Reference<Long>(backend.getId());
this.addressToBackendID.put(execHostport, backendIdRef.getRef());
FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, params);
params.instanceExecParams.add(instanceParam);
count++;
}
if (count != expectedInstanceNum) {
throw new UserException("Expected " + expectedInstanceNum + " backends, but got " + count
+ ". partial load: " + context.getStatementContext().isPartialLoadDictionary());
}
// TODO: rethink the whole function logic. Could the all-BE sink be naturally merged into the other branches?
return;
}
if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) {
Reference<Long> backendIdRef = new Reference<Long>();
TNetworkAddress execHostport;
if (groupCommitBackend != null) {
execHostport = getGroupCommitBackend(addressToBackendID);
} else if (((ConnectContext.get() != null && ConnectContext.get().isSetComputeGroup()) || (
isAllExternalScan
&& Config.prefer_compute_node_for_external_table)) && !addressToBackendID.isEmpty()) {
// 2 cases:
// case 1: the user set a resource tag, so we need to use the BEs with the specified resource tags.
// case 2: all scan nodes are external scan nodes
// and prefer_compute_node_for_external_table is true, so we should only select the BEs
// that the scan nodes already use.
// Otherwise, except for the scan node, the rest of the execution nodes of the query
// can be executed on any BE. addressToBackendID can be empty when this is a constant
// select stmt like:
// SELECT @@session.auto_increment_increment AS auto_increment_increment;
execHostport = SimpleScheduler.getHostByCurrentBackend(addressToBackendID);
} else {
execHostport = SimpleScheduler.getHost(this.idToBackend, backendIdRef);
}
if (execHostport == null) {
LOG.warn("DataPartition UNPARTITIONED, no scanNode Backend available");
throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
}
if (backendIdRef.getRef() != null) {
// backendIdRef can be null if we called getHostByCurrentBackend() above
this.addressToBackendID.put(execHostport, backendIdRef.getRef());
}
FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, params);
params.instanceExecParams.add(instanceParam);
// Using serial source means a serial source operator will be used in this fragment (e.g. data will be
// shuffled to only 1 exchange operator) and then split by a subsequent local exchanger
int expectedInstanceNum = fragment.getParallelExecNum();
boolean useSerialSource = fragment.useSerialSource(context);
if (useSerialSource) {
for (int j = 1; j < expectedInstanceNum; j++) {
params.instanceExecParams.add(new FInstanceExecParam(
null, execHostport, params));
}
params.ignoreDataDistribution = true;
params.parallelTasksNum = 1;
}
continue;
}
Pair<PlanNode, PlanNode> pairNodes = findLeftmostNode(fragment.getPlanRoot());
PlanNode leftMostNode = pairNodes.second;
/*
* Case A:
* if the left most is ScanNode, which means there is no child fragment,
* we should assign fragment instances on every scan node hosts.
* Case B:
* if not, there should be exchange nodes to collect all data from child fragments(input fragments),
* so we should assign fragment instances corresponding to the child fragments' host
*/
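// Roughly: a fragment whose leftmost node is an ExchangeNode falls into Case B and inherits
// its hosts from its widest input fragment, while a fragment whose leftmost node is a
// ScanNode falls into Case A and gets instances on the hosts that received scan ranges in
// computeScanRangeAssignment().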
if (!(leftMostNode instanceof ScanNode)) {
// (Case B)
// there is no leftmost scan; we assign the same hosts as those of our
// input fragment that has the highest instance number
int maxParallelFragmentIndex = findMaxParallelFragmentIndex(fragment);
PlanFragmentId inputFragmentId = fragment.getChild(maxParallelFragmentIndex).getFragmentId();
// AddAll() soft copy()
int exchangeInstances = -1;
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
exchangeInstances = ConnectContext.get().getSessionVariable().getExchangeInstanceParallel();
}
// when we use nested loop join to do a right outer / semi / anti join, the instance number must be 1.
boolean isNereids = context != null && context.getState().isNereids();
if (!isNereids && leftMostNode.getNumInstances() == 1) {
exchangeInstances = 1;
}
// Using serial source means a serial source operator will be used in this fragment (e.g. data will be
// shuffled to only 1 exchange operator) and then split by a subsequent local exchanger
boolean useSerialSource = fragment.useSerialSource(context);
if (exchangeInstances > 0 && fragmentExecParamsMap.get(inputFragmentId)
.instanceExecParams.size() > exchangeInstances) {
// randomly select some instances
// get distinct hosts; when parallel_fragment_exec_instance_num > 1,
// a single host may execute several instances
Set<TNetworkAddress> hostSet = Sets.newHashSet();
for (FInstanceExecParam execParams :
fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
hostSet.add(execParams.host);
}
List<TNetworkAddress> hosts = Lists.newArrayList(hostSet);
Collections.shuffle(hosts, instanceRandom);
for (int index = 0; index < exchangeInstances; index++) {
FInstanceExecParam instanceParam = new FInstanceExecParam(null,
hosts.get(index % hosts.size()), params);
params.instanceExecParams.add(instanceParam);
}
params.ignoreDataDistribution = useSerialSource;
params.parallelTasksNum = useSerialSource ? 1 : params.instanceExecParams.size();
} else {
for (FInstanceExecParam execParams
: fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
FInstanceExecParam instanceParam = new FInstanceExecParam(null, execParams.host, params);
params.instanceExecParams.add(instanceParam);
}
params.ignoreDataDistribution = useSerialSource;
params.parallelTasksNum = useSerialSource ? 1 : params.instanceExecParams.size();
}
// When the group-by cardinality is smaller than the number of backends, only some backends
// would always process data while the others have nothing to do.
// So we shuffle instances to make different backends handle different queries.
Collections.shuffle(params.instanceExecParams, instanceRandom);
// TODO: switch to unpartitioned/coord execution if our input fragment
// is executed that way (could have been downgraded from distributed)
continue;
}
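// Case A: the leftmost node is a ScanNode. Three sub-cases follow: colocate join fragments
// reuse the bucket-to-host mapping, bucket shuffle join fragments delegate to
// bucketShuffleJoinController, and ordinary fragments derive their instances from the
// scan range assignment computed by the scheduler.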
int parallelExecInstanceNum = fragment.getParallelExecNum();
//for ColocateJoin fragment
if ((isColocateFragment(fragment, fragment.getPlanRoot())
&& fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId())
&& fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0)) {
computeColocateJoinInstanceParam(fragment.getFragmentId(), parallelExecInstanceNum, params,
fragment.hasNullAwareLeftAntiJoin());
} else if (bucketShuffleJoinController.isBucketShuffleJoin(fragment.getFragmentId().asInt())) {
bucketShuffleJoinController.computeInstanceParam(fragment.getFragmentId(),
parallelExecInstanceNum, params, fragment.hasNullAwareLeftAntiJoin());
} else {
// case A
for (Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> entry : fragmentExecParamsMap.get(
fragment.getFragmentId()).scanRangeAssignment.entrySet()) {
TNetworkAddress key = entry.getKey();
Map<Integer, List<TScanRangeParams>> value = entry.getValue();
for (Integer planNodeId : value.keySet()) {
List<TScanRangeParams> perNodeScanRanges = value.get(planNodeId);
List<List<TScanRangeParams>> perInstanceScanRanges = Lists.newArrayList();
Optional<ScanNode> node = scanNodes.stream().filter(scanNode -> {
return scanNode.getId().asInt() == planNodeId;
}).findFirst();
boolean sharedScan = true;
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
boolean ignoreStorageDataDistribution = fragment.useSerialSource(context);
if (ignoreStorageDataDistribution) {
expectedInstanceNum = Math.max(expectedInstanceNum, 1);
// if there is a limit and no conjuncts, only 1 instance is needed to save cpu and
// memory resources
if (node.get().shouldUseOneInstance(context)) {
expectedInstanceNum = 1;
}
perInstanceScanRanges = Collections.nCopies(expectedInstanceNum, perNodeScanRanges);
} else {
expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
// the scan instance num should not be larger than the tablet num
expectedInstanceNum = Math.min(perNodeScanRanges.size(), parallelExecInstanceNum);
}
// if there is a limit and no conjuncts, only 1 instance is needed to save cpu and
// memory resources
if (node.get().shouldUseOneInstance(context)) {
expectedInstanceNum = 1;
}
if (fragment.queryCacheParam != null) {
expectedInstanceNum = perNodeScanRanges.size();
}
perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges,
expectedInstanceNum);
sharedScan = false;
}
if (LOG.isDebugEnabled()) {
LOG.debug("scan range number per instance is: {}", perInstanceScanRanges.size());
}
for (int j = 0; j < perInstanceScanRanges.size(); j++) {
List<TScanRangeParams> scanRangeParams = perInstanceScanRanges.get(j);
FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, params);
instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams);
params.instanceExecParams.add(instanceParam);
}
params.ignoreDataDistribution = sharedScan;
params.parallelTasksNum = params.ignoreDataDistribution ? 1 : params.instanceExecParams.size();
}
}
}
if (params.instanceExecParams.isEmpty()) {
Reference<Long> backendIdRef = new Reference<Long>();
TNetworkAddress execHostport;
if (groupCommitBackend != null) {
execHostport = getGroupCommitBackend(addressToBackendID);
} else if (ConnectContext.get() != null && ConnectContext.get().isSetComputeGroup()
&& !addressToBackendID.isEmpty()) {
// In this case, we only use the BE where the replica selected by the tag is located to
// execute this query. Otherwise, except for the scan node, the rest of the execution nodes
// of the query can be executed on any BE. addressToBackendID can be empty when this is a constant
// select stmt like:
// SELECT @@session.auto_increment_increment AS auto_increment_increment;
execHostport = SimpleScheduler.getHostByCurrentBackend(addressToBackendID);
} else {
execHostport = SimpleScheduler.getHost(this.idToBackend, backendIdRef);
}
if (execHostport == null) {
throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
}
if (backendIdRef.getRef() != null) {
// backendIdRef can be null if we called getHostByCurrentBackend() above
this.addressToBackendID.put(execHostport, backendIdRef.getRef());
}
FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, params);
params.instanceExecParams.add(instanceParam);
}
}
}
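// Among the child fragments whose output partition is not UNPARTITIONED (i.e. excluding
// broadcast inputs), choose the one with the largest number of instances and return its
// index. Example (illustrative values): candidate parallelisms [4, 8, 1] yield index 1.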
private int findMaxParallelFragmentIndex(PlanFragment fragment) {
Preconditions.checkState(!fragment.getChildren().isEmpty(), "fragment has no children");
// exclude broadcast join right side's child fragments
List<PlanFragment> childFragmentCandidates = fragment.getChildren().stream()
.filter(e -> e.getOutputPartition() != DataPartition.UNPARTITIONED)
.collect(Collectors.toList());
int maxParallelism = 0;
int maxParaIndex = 0;
for (int i = 0; i < childFragmentCandidates.size(); i++) {
PlanFragmentId childFragmentId = childFragmentCandidates.get(i).getFragmentId();
int currentChildFragmentParallelism = fragmentExecParamsMap.get(childFragmentId).instanceExecParams.size();
if (currentChildFragmentParallelism > maxParallelism) {
maxParallelism = currentChildFragmentParallelism;
maxParaIndex = i;
}
}
return maxParaIndex;
}
private TNetworkAddress getGroupCommitBackend(Map<TNetworkAddress, Long> addressToBackendID) {
// Used for Nereids planner Group commit insert BE select.
TNetworkAddress execHostport = new TNetworkAddress(groupCommitBackend.getHost(),
groupCommitBackend.getBePort());
addressToBackendID.put(execHostport, groupCommitBackend.getId());
return execHostport;
}
// Traverse the expected runtime filter IDs in each fragment, establish the mapping between
// runtime filter ID and fragment instance address, and select the merge instance for each runtime filter
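// Resulting structures (sketch): ridToTargetParam maps a runtime filter id to the list of
// (target fragment id, instance brpc address) pairs, ridToBuilderNum maps a runtime filter
// id to the number of distinct builder hosts, and the single instance of the topmost
// fragment is chosen as the runtime filter merge node.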
private void assignRuntimeFilterAddr() throws Exception {
for (PlanFragment fragment : fragments) {
FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());
// Transform <fragment, runtimeFilterId> to <runtimeFilterId, fragment>
for (RuntimeFilterId rid : fragment.getTargetRuntimeFilterIds()) {
List<FRuntimeFilterTargetParam> targetFragments = ridToTargetParam.computeIfAbsent(rid,
k -> new ArrayList<>());
for (final FInstanceExecParam instance : params.instanceExecParams) {
targetFragments.add(new FRuntimeFilterTargetParam(instance.fragment().getFragmentId().asInt(),
toBrpcHost(instance.host)));
}
}
for (RuntimeFilterId rid : fragment.getBuilderRuntimeFilterIds()) {
ridToBuilderNum.merge(rid,
(int) params.instanceExecParams.stream().map(ins -> ins.host).distinct().count(), Integer::sum);
}
}
// Use the uppermost fragment as the merge node; the uppermost fragment has one and only one instance
FragmentExecParams uppermostParams = fragmentExecParamsMap.get(fragments.get(0).getFragmentId());
runtimeFilterMergeAddr = toBrpcHost(uppermostParams.instanceExecParams.get(0).host);
runtimeFilterMergeInstanceId = uppermostParams.instanceExecParams.get(0).instanceId;
}
// Returns true if the fragment has a colocate plan node.
private boolean isColocateFragment(PlanFragment planFragment, PlanNode node) {
// TODO(cmy): some internal process, such as broker load task, do not have ConnectContext.
// Any configurations needed by the Coordinator should be passed in Coordinator initialization.
// Refine this later.
// Currently, just ignore the session variables if ConnectContext does not exist
if (ConnectContext.get() != null) {
if (ConnectContext.get().getSessionVariable().isDisableColocatePlan()) {
return false;
}
}
//cache the colocateFragmentIds
if (colocateFragmentIds.contains(node.getFragmentId().asInt())) {
return true;
}
if (planFragment.hasColocatePlanNode()) {
colocateFragmentIds.add(planFragment.getId().asInt());
return true;
}
return false;
}
// Returns the leftmost node of 'plan' (descending through first children and stopping at the
// first ExchangeNode), together with its parent, as Pair.of(parent, leftmostNode).
private Pair<PlanNode, PlanNode> findLeftmostNode(PlanNode plan) {
PlanNode newPlan = plan;
PlanNode fatherPlan = null;
while (newPlan.getChildren().size() != 0 && !(newPlan instanceof ExchangeNode)) {
fatherPlan = newPlan;
newPlan = newPlan.getChild(0);
}
return Pair.of(fatherPlan, newPlan);
}
private <K, V> V findOrInsert(Map<K, V> m, final K key, final V defaultVal) {
V value = m.get(key);
if (value == null) {
m.put(key, defaultVal);
value = defaultVal;
}
return value;
}
// whether we can overwrite the first parameter or not?
private List<TScanRangeParams> findOrInsert(Map<Integer, List<TScanRangeParams>> m, Integer key,
ArrayList<TScanRangeParams> defaultVal) {
List<TScanRangeParams> value = m.get(key);
if (value == null) {
m.put(key, defaultVal);
value = defaultVal;
}
return value;
}
private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId,
int parallelExecInstanceNum, FragmentExecParams params, boolean hasNullAwareLeftAntiJoin) {
assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdTobucketSeqToScanRangeMap,
fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds, hasNullAwareLeftAntiJoin);
}
private Map<TNetworkAddress, Long> getReplicaNumPerHostForOlapTable() {
Map<TNetworkAddress, Long> replicaNumPerHost = Maps.newHashMap();
for (ScanNode scanNode : scanNodes) {
List<TScanRangeLocations> locationsList = scanNode.getScanRangeLocations(0);
for (TScanRangeLocations locations : locationsList) {
for (TScanRangeLocation location : locations.locations) {
if (replicaNumPerHost.containsKey(location.server)) {
replicaNumPerHost.put(location.server, replicaNumPerHost.get(location.server) + 1L);
} else {
replicaNumPerHost.put(location.server, 1L);
}
}
}
}
return replicaNumPerHost;
}
// Populates the scanRangeAssignment of each fragment.
// fragment -> <server, <scan node id, scan ranges>>
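// Rough flow: for every scan node, fetch its scan range locations, then hand them to one of
// three assignment strategies: colocate (bucket -> fixed host), bucket shuffle (bucket ->
// host balanced per backend), or the default scheduler-based round robin over replicas.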
protected void computeScanRangeAssignment() throws Exception {
Map<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
Map<TNetworkAddress, Long> replicaNumPerHost = getReplicaNumPerHostForOlapTable();
boolean isAllOlapTables = scanNodes.stream().allMatch(e -> e instanceof OlapScanNode);
boolean isEnableOrderedLocations = ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().enableOrderedScanRangeLocations
&& isAllOlapTables;
if (isEnableOrderedLocations) {
sortScanNodes();
}
// set scan ranges/locations for scan nodes
for (ScanNode scanNode : scanNodes) {
if (!(scanNode instanceof ExternalScanNode)) {
isAllExternalScan = false;
}
List<TScanRangeLocations> locations;
// the parameter of getScanRangeLocations is ignored here; it does not take effect
locations = scanNode.getScanRangeLocations(0);
if (locations == null) {
// only analysis olap scan node
continue;
}
if (isEnableOrderedLocations) {
sortScanRangeLocations(locations);
}
Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.computeIfAbsent(scanNode.getFragmentId(),
k -> Sets.newHashSet());
scanNodeIds.add(scanNode.getId().asInt());
if (scanNode instanceof FileQueryScanNode) {
fileScanRangeParamsMap.put(
scanNode.getId().asInt(), ((FileQueryScanNode) scanNode).getFileScanRangeParams());
}
if (fragmentExecParamsMap.get(scanNode.getFragmentId()) == null) {
StringBuilder sb = new StringBuilder();
int idx = 0;
sb.append("query id=").append(DebugUtil.printId(queryId)).append(",");
sb.append("fragment=[");
for (Map.Entry<PlanFragmentId, FragmentExecParams> entry : fragmentExecParamsMap.entrySet()) {
if (idx++ != 0) {
sb.append(",");
}
sb.append(entry.getKey());
entry.getValue().appendTo(sb);
}
sb.append("]");
LOG.info("log when get npe, query_id: {}, scanNode: {}, scanNode fid: {}, map: {}",
DebugUtil.printId(queryId),
scanNode.toString(),
scanNode.getFragmentId(),
sb);
}
FragmentScanRangeAssignment assignment
= fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment;
boolean fragmentContainsColocateJoin = isColocateFragment(scanNode.getFragment(),
scanNode.getFragment().getPlanRoot()) && (scanNode instanceof OlapScanNode);
boolean fragmentContainsBucketShuffleJoin = bucketShuffleJoinController
.isBucketShuffleJoin(scanNode.getFragmentId().asInt(), scanNode.getFragment().getPlanRoot())
&& (scanNode instanceof OlapScanNode);
// A fragment may contain both a colocate join and a bucket shuffle join,
// so we need to compute both scan range assignments to init basic data for the query coordinator
if (fragmentContainsColocateJoin) {
computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost,
replicaNumPerHost, isEnableOrderedLocations);
}
if (fragmentContainsBucketShuffleJoin) {
bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode,
idToBackend, addressToBackendID, replicaNumPerHost);
}
if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) {
computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost,
replicaNumPerHost, isEnableOrderedLocations);
}
}
}
// To ensure tablets with the same bucketSeq are assigned to the same execHostPort
private void computeScanRangeAssignmentByColocate(
final OlapScanNode scanNode, Map<TNetworkAddress, Long> assignedBytesPerHost,
Map<TNetworkAddress, Long> replicaNumPerHost, boolean isEnableOrderedLocations) throws Exception {
if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>());
fragmentIdTobucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange());
// Same as bucket shuffle.
int bucketNum = scanNode.getBucketNum();
scanNode.getFragment().setBucketNum(bucketNum);
}
Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
BucketSeqToScanRange bucketSeqToScanRange = fragmentIdTobucketSeqToScanRangeMap.get(scanNode.getFragmentId());
for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) {
//fill scanRangeParamsList
List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
if (!bucketSeqToAddress.containsKey(bucketSeq)) {
getExecHostPortForFragmentIDAndBucketSeq(locations.get(0),
scanNode.getFragmentId(), bucketSeq, assignedBytesPerHost,
replicaNumPerHost, isEnableOrderedLocations);
}
for (TScanRangeLocations location : locations) {
Map<Integer, List<TScanRangeParams>> scanRanges =
findOrInsert(bucketSeqToScanRange, bucketSeq, new HashMap<>());
List<TScanRangeParams> scanRangeParamsList =
findOrInsert(scanRanges, scanNode.getId().asInt(), new ArrayList<>());
// add scan range
TScanRangeParams scanRangeParams = new TScanRangeParams();
scanRangeParams.scan_range = location.scan_range;
scanRangeParamsList.add(scanRangeParams);
updateScanRangeNumByScanRange(scanRangeParams);
}
}
}
// ensure bucket sequences are distributed to every host evenly
private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation,
PlanFragmentId fragmentId, Integer bucketSeq, Map<TNetworkAddress, Long> assignedBytesPerHost,
Map<TNetworkAddress, Long> replicaNumPerHost, boolean isEnableOrderedLocations)
throws Exception {
Reference<Long> backendIdRef = new Reference<Long>();
selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, replicaNumPerHost,
backendIdRef, isEnableOrderedLocations);
Backend backend = this.idToBackend.get(backendIdRef.getRef());
TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort());
this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
}
private void sortScanNodes() {
Collections.sort(scanNodes, new Comparator<ScanNode>() {
@Override
public int compare(ScanNode s1, ScanNode s2) {
return java.lang.Integer.compare(s1.getId().asInt(), s2.getId().asInt());
}
});
}
private void sortScanRangeLocations(List<TScanRangeLocations> locations) {
Collections.sort(locations, new Comparator<TScanRangeLocations>() {
@Override
public int compare(TScanRangeLocations o1, TScanRangeLocations o2) {
return org.apache.thrift.TBaseHelper.compareTo(
o1.getScanRange().getPaloScanRange().tablet_id,
o2.getScanRange().getPaloScanRange().tablet_id);
}
});
}
public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations seqLocation,
Map<TNetworkAddress, Long> assignedBytesPerHost,
Map<TNetworkAddress, Long> replicaNumPerHost,
Reference<Long> backendIdRef,
boolean isEnableOrderedLocations) throws UserException {
List<TScanRangeLocation> locations = seqLocation.getLocations();
if (isEnableOrderedLocations) {
Collections.sort(locations);
}
if (!Config.enable_local_replica_selection) {
return selectBackendsByRoundRobin(locations, assignedBytesPerHost, replicaNumPerHost,
backendIdRef);
}
List<TScanRangeLocation> localLocations = new ArrayList<>();
List<TScanRangeLocation> nonlocalLocations = new ArrayList<>();
long localBeId = Env.getCurrentSystemInfo().getBackendIdByHost(FrontendOptions.getLocalHostAddress());
for (final TScanRangeLocation location : locations) {
if (location.backend_id == localBeId) {
localLocations.add(location);
} else {
nonlocalLocations.add(location);
}
}
try {
return selectBackendsByRoundRobin(localLocations, assignedBytesPerHost, replicaNumPerHost, backendIdRef);
} catch (UserException ue) {
if (!Config.enable_local_replica_selection_fallback) {
throw ue;
}
return selectBackendsByRoundRobin(nonlocalLocations, assignedBytesPerHost, replicaNumPerHost, backendIdRef);
}
}
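// Greedy selection over the candidate replicas: pick the location whose host currently has
// the fewest assigned bytes, breaking ties by the smallest remaining replica count; then
// decrement the replica counters of all candidates and charge one unit of "assigned bytes"
// to the chosen host. SimpleScheduler.getLocation may substitute another candidate if the
// preferred backend is unavailable.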
public TScanRangeLocation selectBackendsByRoundRobin(List<TScanRangeLocation> sortedLocations,
Map<TNetworkAddress, Long> assignedBytesPerHost, Map<TNetworkAddress, Long> replicaNumPerHost,
Reference<Long> backendIdRef) throws UserException {
Long minAssignedBytes = Long.MAX_VALUE;
Long minReplicaNum = Long.MAX_VALUE;
TScanRangeLocation minLocation = null;
Long step = 1L;
for (final TScanRangeLocation location : sortedLocations) {
Long assignedBytes = findOrInsert(assignedBytesPerHost, location.server, 0L);
if (assignedBytes < minAssignedBytes || (assignedBytes.equals(minAssignedBytes)
&& replicaNumPerHost.get(location.server) < minReplicaNum)) {
minAssignedBytes = assignedBytes;
minReplicaNum = replicaNumPerHost.get(location.server);
minLocation = location;
}
}
for (TScanRangeLocation location : sortedLocations) {
replicaNumPerHost.put(location.server, replicaNumPerHost.get(location.server) - 1);
}
TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, sortedLocations,
this.idToBackend, backendIdRef);
assignedBytesPerHost.put(location.server, assignedBytesPerHost.get(location.server) + step);
return location;
}
private void computeScanRangeAssignmentByScheduler(
final ScanNode scanNode,
final List<TScanRangeLocations> locations,
FragmentScanRangeAssignment assignment,
Map<TNetworkAddress, Long> assignedBytesPerHost,
Map<TNetworkAddress, Long> replicaNumPerHost,
boolean isEnableOrderedLocations) throws Exception {
// The type of locations is List, so it may contain elements with the same "location",
// and this does happen for some scan nodes.
// The duplicate "location" will NOT be filtered by FragmentScanRangeAssignment,
// since FragmentScanRangeAssignment uses List<TScanRangeParams> as its value type;
// duplicate "locations" are simply appended to that list.
for (TScanRangeLocations scanRangeLocations : locations) {
Reference<Long> backendIdRef = new Reference<Long>();
TScanRangeLocation minLocation = selectBackendsByRoundRobin(scanRangeLocations,
assignedBytesPerHost, replicaNumPerHost, backendIdRef, isEnableOrderedLocations);
Backend backend = this.idToBackend.get(backendIdRef.getRef());
TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort());
this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
Map<Integer, List<TScanRangeParams>> scanRanges = findOrInsert(assignment, execHostPort,
new HashMap<Integer, List<TScanRangeParams>>());
List<TScanRangeParams> scanRangeParamsList = findOrInsert(scanRanges, scanNode.getId().asInt(),
new ArrayList<TScanRangeParams>());
// add scan range
TScanRangeParams scanRangeParams = new TScanRangeParams();
scanRangeParams.scan_range = scanRangeLocations.scan_range;
// Volume is optional, so we need to set the value and the is-set bit
scanRangeParams.setVolumeId(minLocation.volume_id);
scanRangeParamsList.add(scanRangeParams);
updateScanRangeNumByScanRange(scanRangeParams);
}
}
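// scanRangeNum accumulates the number of ranges this query will scan: broker and file scan
// ranges contribute their range (or split) counts, and each palo (olap) scan range counts as 1.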
private void updateScanRangeNumByScanRange(TScanRangeParams param) {
TScanRange scanRange = param.getScanRange();
if (scanRange == null) {
return;
}
TBrokerScanRange brokerScanRange = scanRange.getBrokerScanRange();
if (brokerScanRange != null) {
scanRangeNum += brokerScanRange.getRanges().size();
}
TExternalScanRange externalScanRange = scanRange.getExtScanRange();
if (externalScanRange != null) {
TFileScanRange fileScanRange = externalScanRange.getFileScanRange();
if (fileScanRange != null) {
if (fileScanRange.isSetRanges()) {
scanRangeNum += fileScanRange.getRanges().size();
} else if (fileScanRange.isSetSplitSource()) {
scanRangeNum += fileScanRange.getSplitSource().getNumSplits();
}
}
}
TPaloScanRange paloScanRange = scanRange.getPaloScanRange();
if (paloScanRange != null) {
scanRangeNum += 1;
}
// TODO: more ranges?
}
// update job progress from BE
public void updateFragmentExecStatus(TReportExecStatusParams params) {
PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
if (ctx == null || !ctx.updatePipelineStatus(params)) {
return;
}
Status status = new Status(params.status);
// for now, abort the query if we see any error except if the error is cancelled
// and returned_all_results_ is true.
// (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
if (!status.ok()) {
if (returnedAllResults && status.isCancelled()) {
LOG.warn("Query {} has returned all results, fragment_id={} instance_id={}, be={}"
+ " is reporting failed status {}",
DebugUtil.printId(queryId), params.getFragmentId(),
DebugUtil.printId(params.getFragmentInstanceId()),
params.getBackendId(),
status.toString());
} else {
LOG.warn("one instance report fail, query_id={} fragment_id={} instance_id={}, be={},"
+ " error message: {}",
DebugUtil.printId(queryId), params.getFragmentId(),
DebugUtil.printId(params.getFragmentInstanceId()),
params.getBackendId(), status.toString());
updateStatus(status);
}
}
if (params.isSetDeltaUrls()) {
updateDeltas(params.getDeltaUrls());
}
if (params.isSetLoadCounters()) {
updateLoadCounters(params.getLoadCounters());
}
if (params.isSetTrackingUrl()) {
LOG.info("query_id={} tracking_url: {}", DebugUtil.printId(queryId), params.getTrackingUrl());
trackingUrl = params.getTrackingUrl();
}
if (params.isSetTxnId()) {
txnId = params.getTxnId();
}
if (params.isSetLabel()) {
label = params.getLabel();
}
if (params.isSetExportFiles()) {
updateExportFiles(params.getExportFiles());
}
if (params.isSetCommitInfos()) {
updateCommitInfos(params.getCommitInfos());
}
if (params.isSetErrorTabletInfos()) {
updateErrorTabletInfos(params.getErrorTabletInfos());
}
if (params.isSetHivePartitionUpdates()) {
((HMSTransaction) Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId))
.updateHivePartitionUpdates(params.getHivePartitionUpdates());
}
if (params.isSetIcebergCommitDatas()) {
((IcebergTransaction) Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId))
.updateIcebergCommitData(params.getIcebergCommitDatas());
}
if (ctx.done) {
if (LOG.isDebugEnabled()) {
LOG.debug("Query {} fragment {} is marked done",
DebugUtil.printId(queryId), ctx.fragmentId);
}
fragmentsDoneLatch.markedCountDown(params.getFragmentId(), params.getBackendId());
}
if (params.isSetLoadedRows() && jobId != -1) {
if (params.isSetFragmentInstanceReports()) {
for (TFragmentInstanceReport report : params.getFragmentInstanceReports()) {
Env.getCurrentEnv().getLoadManager().updateJobProgress(
jobId, params.getBackendId(), params.getQueryId(), report.getFragmentInstanceId(),
report.getLoadedRows(), report.getLoadedBytes(), params.isDone());
Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
params.getQueryId(), report.getFragmentInstanceId(), report.getNumFinishedRange());
}
} else {
Env.getCurrentEnv().getLoadManager().updateJobProgress(
jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(),
params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
params.getQueryId(), params.getFragmentInstanceId(), params.getFinishedScanRanges());
}
}
}
/*
* Wait for the coordinator to finish executing.
* Return false if the wait times out.
* Return true otherwise.
* NOTICE: returning true does not mean that the coordinator executed successfully;
* the caller should check queryStatus for the result.
*
* We divide the entire waiting process into multiple rounds,
* with a maximum of 30 seconds per round. After each round of waiting,
* check the status of the BEs. If a BE's status is abnormal, the wait is ended
* and the result is returned. Otherwise, continue to the next round of waiting.
* This mainly avoids the problem of the Coordinator waiting for a long time
* after some BE can no longer return the result due to some exception, such as the BE being down.
*/
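// For example, join(100) waits in rounds of at most 30 seconds (30 + 30 + 30 + 10),
// calling checkBackendState() between rounds; if a backend turns out to be down the method
// returns true early and queryStatus carries the error.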
public boolean join(int timeoutS) {
final long fixedMaxWaitTime = 30;
long leftTimeoutS = timeoutS;
while (leftTimeoutS > 0) {
long waitTime = Math.min(leftTimeoutS, fixedMaxWaitTime);
boolean awaitRes = false;
try {
if (fragmentsDoneLatch != null) {
awaitRes = fragmentsDoneLatch.await(waitTime, TimeUnit.SECONDS);
} else {
awaitRes = instancesDoneLatch.await(waitTime, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
// Do nothing
}
if (awaitRes) {
return true;
}
if (!checkBackendState()) {
return true;
}
leftTimeoutS -= waitTime;
}
return false;
}
/*
* Check the state of backends in needCheckBackendExecStates.
* return true if all of them are OK. Otherwise, return false.
*/
private boolean checkBackendState() {
for (PipelineExecContext ctx : needCheckPipelineExecContexts) {
if (!ctx.isBackendStateHealthy()) {
queryStatus = new Status(TStatusCode.INTERNAL_ERROR, "backend "
+ ctx.backend.getId() + " is down");
return false;
}
}
return true;
}
public boolean isDone() {
if (fragmentsDoneLatch != null) {
return fragmentsDoneLatch.getCount() == 0;
} else {
return instancesDoneLatch.getCount() == 0;
}
}
public boolean isTimeout() {
return System.currentTimeMillis() > this.timeoutDeadline;
}
public void setMemTableOnSinkNode(boolean enableMemTableOnSinkNode) {
this.queryOptions.setEnableMemtableOnSinkNode(enableMemTableOnSinkNode);
}
public void setBatchSize(int batchSize) {
this.queryOptions.setBatchSize(batchSize);
}
// Currently this method is for BrokerLoad.
public void setProfileLevel(int profileLevel) {
this.queryOptions.setProfileLevel(profileLevel);
}
// map from a BE host address to the per-node assigned scan ranges;
// records scan range assignment for a single fragment
static class FragmentScanRangeAssignment
extends HashMap<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> {
}
// Bucket sequence -> (scan node id -> list of TScanRangeParams)
static class BucketSeqToScanRange extends HashMap<Integer, Map<Integer, List<TScanRangeParams>>> {
}
class BucketShuffleJoinController {
// fragment_id -> < bucket_seq -> < scannode_id -> scan_range_params >>
protected final Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap = Maps.newHashMap();
// fragment_id -> < bucket_seq -> be_address >
private final Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap
= Maps.newHashMap();
// fragment_id -> < be_id -> bucket_count >
private final Map<PlanFragmentId, Map<Long, Integer>> fragmentIdToBuckendIdBucketCountMap = Maps.newHashMap();
// fragment_id -> bucket_num
protected final Map<PlanFragmentId, Integer> fragmentIdToBucketNumMap = Maps.newHashMap();
// cache the bucketShuffleFragmentIds
private final Set<Integer> bucketShuffleFragmentIds = new HashSet<>();
private final Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds;
// TODO(cmy): Should refactor this Controller to unify bucket shuffle join and colocate join
public BucketShuffleJoinController(Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds) {
this.fragmentIdToScanNodeIds = fragmentIdToScanNodeIds;
}
// check whether the node's fragment is a bucket shuffle join fragment
protected boolean isBucketShuffleJoin(int fragmentId, PlanNode node) {
// check whether the node is part of the fragment
if (fragmentId != node.getFragmentId().asInt()) {
return false;
}
if (bucketShuffleFragmentIds.contains(fragmentId)) {
return true;
}
if (node instanceof HashJoinNode) {
HashJoinNode joinNode = (HashJoinNode) node;
if (joinNode.isBucketShuffle()) {
bucketShuffleFragmentIds.add(joinNode.getFragmentId().asInt());
return true;
}
}
for (PlanNode childNode : node.getChildren()) {
if (isBucketShuffleJoin(fragmentId, childNode)) {
return true;
}
}
return false;
}
private boolean isBucketShuffleJoin(int fragmentId) {
return bucketShuffleFragmentIds.contains(fragmentId);
}
private int getFragmentBucketNum(PlanFragmentId fragmentId) {
return fragmentIdToBucketNumMap.get(fragmentId);
}
// make sure each host has a roughly even number of buckets to scan
private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation,
PlanFragmentId fragmentId, Integer bucketSeq, ImmutableMap<Long, Backend> idToBackend,
Map<TNetworkAddress, Long> addressToBackendID,
Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
Map<Long, Integer> buckendIdToBucketCountMap = fragmentIdToBuckendIdBucketCountMap.get(fragmentId);
int maxBucketNum = Integer.MAX_VALUE;
long buckendId = Long.MAX_VALUE;
Long minReplicaNum = Long.MAX_VALUE;
for (TScanRangeLocation location : seqLocation.locations) {
if (buckendIdToBucketCountMap.getOrDefault(location.backend_id, 0) < maxBucketNum) {
maxBucketNum = buckendIdToBucketCountMap.getOrDefault(location.backend_id, 0);
buckendId = location.backend_id;
minReplicaNum = replicaNumPerHost.get(location.server);
} else if (buckendIdToBucketCountMap.getOrDefault(location.backend_id, 0) == maxBucketNum
&& replicaNumPerHost.get(location.server) < minReplicaNum) {
buckendId = location.backend_id;
minReplicaNum = replicaNumPerHost.get(location.server);
}
}
Reference<Long> backendIdRef = new Reference<>();
TNetworkAddress execHostPort = SimpleScheduler.getHost(buckendId,
seqLocation.locations, idToBackend, backendIdRef);
// the backend with buckendId is not alive, so choose another backend
if (backendIdRef.getRef() != buckendId) {
buckendIdToBucketCountMap.put(backendIdRef.getRef(),
buckendIdToBucketCountMap.getOrDefault(backendIdRef.getRef(), 0) + 1);
} else { //the backend with buckendId is alive, update buckendIdToBucketCountMap directly
buckendIdToBucketCountMap.put(buckendId, buckendIdToBucketCountMap.getOrDefault(buckendId, 0) + 1);
}
for (TScanRangeLocation location : seqLocation.locations) {
replicaNumPerHost.put(location.server, replicaNumPerHost.get(location.server) - 1);
}
addressToBackendID.put(execHostPort, backendIdRef.getRef());
this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
}
// to ensure tablets with the same bucketSeq are assigned to the same execHostPort
private void computeScanRangeAssignmentByBucket(
final OlapScanNode scanNode, ImmutableMap<Long, Backend> idToBackend,
Map<TNetworkAddress, Long> addressToBackendID,
Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
int bucketNum = scanNode.getBucketNum();
fragmentIdToBucketNumMap.put(scanNode.getFragmentId(), bucketNum);
fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>());
fragmentIdBucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange());
fragmentIdToBuckendIdBucketCountMap.put(scanNode.getFragmentId(), new HashMap<>());
scanNode.getFragment().setBucketNum(bucketNum);
}
Map<Integer, TNetworkAddress> bucketSeqToAddress
= fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(scanNode.getFragmentId());
for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) {
//fill scanRangeParamsList
List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
if (!bucketSeqToAddress.containsKey(bucketSeq)) {
getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(),
bucketSeq, idToBackend, addressToBackendID, replicaNumPerHost);
}
for (TScanRangeLocations location : locations) {
Map<Integer, List<TScanRangeParams>> scanRanges =
findOrInsert(bucketSeqToScanRange, bucketSeq, new HashMap<>());
List<TScanRangeParams> scanRangeParamsList =
findOrInsert(scanRanges, scanNode.getId().asInt(), new ArrayList<>());
// add scan range
TScanRangeParams scanRangeParams = new TScanRangeParams();
scanRangeParams.scan_range = location.scan_range;
scanRangeParamsList.add(scanRangeParams);
updateScanRangeNumByScanRange(scanRangeParams);
}
}
}
private void computeInstanceParam(PlanFragmentId fragmentId,
int parallelExecInstanceNum, FragmentExecParams params, boolean hasNullAwareLeftAntiJoin) {
assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdBucketSeqToScanRangeMap,
fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds, hasNullAwareLeftAntiJoin);
}
}
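// Shared by the colocate and bucket shuffle paths: group bucket sequences by the host they
// were assigned to, then split each host's buckets into fragment instances. When
// ignoreStorageDataDistribution is true, the buckets are spread over exactly
// parallelExecInstanceNum instances (padding with empty instances if needed) and the scan
// ranges of serial scan operators are concentrated on the first instance of each host.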
private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params,
Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap,
Map<PlanFragmentId, Map<Integer, TNetworkAddress>> curFragmentIdToSeqToAddressMap,
Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds, boolean hasNullAwareLeftAntiJoin) {
Map<Integer, TNetworkAddress> bucketSeqToAddress = curFragmentIdToSeqToAddressMap.get(fragmentId);
BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId);
Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
// 1. count how many tablets each node in this fragment should scan, and gather them into one list
Map<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges
= Maps.newHashMap();
for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges
: bucketSeqToScanRange.entrySet()) {
TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey());
Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue();
// We only care about the node scan ranges of scan nodes which belong to this fragment
Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges = Maps.newHashMap();
for (Integer scanNodeId : nodeScanRanges.keySet()) {
if (scanNodeIds.contains(scanNodeId)) {
filteredNodeScanRanges.put(scanNodeId, nodeScanRanges.get(scanNodeId));
}
}
Pair<Integer, Map<Integer, List<TScanRangeParams>>> filteredScanRanges
= Pair.of(scanRanges.getKey(), filteredNodeScanRanges);
if (!addressToScanRanges.containsKey(address)) {
addressToScanRanges.put(address, Lists.newArrayList());
}
addressToScanRanges.get(address).add(filteredScanRanges);
}
/**
* Ignore storage data distribution iff:
* 1. `parallelExecInstanceNum * numBackends` is larger than the number of scan ranges.
* 2. The Nereids planner is used.
*/
boolean ignoreStorageDataDistribution = scanNodes != null && !scanNodes.isEmpty()
&& params.fragment != null && params.fragment.useSerialSource(context);
FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange
: addressToScanRanges.entrySet()) {
List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue();
Map<Integer, List<TScanRangeParams>> range
= findOrInsert(assignment, addressScanRange.getKey(), new HashMap<>());
if (ignoreStorageDataDistribution) {
List<List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges
= ListUtil.splitBySize(scanRange, parallelExecInstanceNum);
/**
* Split scan ranges evenly into `parallelExecInstanceNum` instances.
*
*
* For a fragment contains co-located join,
*
* scan (id = 0) -> join build (id = 2)
* |
* scan (id = 1) -> join probe (id = 2)
*
* If both of `scan (id = 0)` and `scan (id = 1)` are serial operators, we will plan local exchanger
* after them:
*
* scan (id = 0) -> local exchange -> join build (id = 2)
* |
* scan (id = 1) -> local exchange -> join probe (id = 2)
*
*
* And there is another more complicated scenario, for example, `scan (id = 0)` has 10 partitions and
* 3 buckets which means 3 * 10 tablets and `scan (id = 1)` has 3 buckets and no partition which means
* 3 tablets in total. If the expected parallelism is 8, we will get a serial scan (id = 0) and a
* non-serial scan (id = 1). In this case, we plan a local exchange only after the serial scan:
*
* scan (id = 0) -> local exchange -> join build (id = 2)
* |
* scan (id = 1) -> join probe (id = 2)
*/
FInstanceExecParam firstInstanceParam = null;
for (List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> perInstanceScanRange
: perInstanceScanRanges) {
FInstanceExecParam instanceParam = new FInstanceExecParam(
null, addressScanRange.getKey(), params);
if (firstInstanceParam == null) {
firstInstanceParam = instanceParam;
}
for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) {
instanceParam.addBucketSeq(nodeScanRangeMap.first);
for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange
: nodeScanRangeMap.second.entrySet()) {
int scanId = nodeScanRange.getKey();
Optional<ScanNode> node = scanNodes.stream().filter(
scanNode -> scanNode.getId().asInt() == scanId).findFirst();
Preconditions.checkArgument(node.isPresent());
FInstanceExecParam instanceParamToScan = node.get().isSerialOperator()
? firstInstanceParam : instanceParam;
if (!instanceParamToScan.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
range.put(nodeScanRange.getKey(), Lists.newArrayList());
instanceParamToScan.perNodeScanRanges
.put(nodeScanRange.getKey(), Lists.newArrayList());
}
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
instanceParamToScan.perNodeScanRanges.get(nodeScanRange.getKey())
.addAll(nodeScanRange.getValue());
}
}
params.instanceExecParams.add(instanceParam);
}
for (int i = perInstanceScanRanges.size(); i < parallelExecInstanceNum; i++) {
params.instanceExecParams.add(new FInstanceExecParam(null, addressScanRange.getKey(), params));
}
} else {
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
// the scan instance num should not be larger than the tablet num
expectedInstanceNum = Math.min(scanRange.size(), parallelExecInstanceNum);
}
if (params.fragment != null && params.fragment.queryCacheParam != null) {
expectedInstanceNum = scanRange.size();
}
// 2. decide how many scan ranges each instance should scan and split them accordingly
List<List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges
= ListUtil.splitBySize(scanRange, expectedInstanceNum);
// 3. construct an instanceExecParam and add the scan ranges that should be scanned by that instance
for (List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> perInstanceScanRange
: perInstanceScanRanges) {
FInstanceExecParam instanceParam = new FInstanceExecParam(
null, addressScanRange.getKey(), params);
for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) {
instanceParam.addBucketSeq(nodeScanRangeMap.first);
for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange
: nodeScanRangeMap.second.entrySet()) {
if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
range.put(nodeScanRange.getKey(), Lists.newArrayList());
instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList());
}
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
instanceParam.perNodeScanRanges.get(nodeScanRange.getKey())
.addAll(nodeScanRange.getValue());
}
}
params.instanceExecParams.add(instanceParam);
}
}
}
params.ignoreDataDistribution = ignoreStorageDataDistribution;
params.parallelTasksNum = params.ignoreDataDistribution ? 1 : params.instanceExecParams.size();
}
private final Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdTobucketSeqToScanRangeMap = Maps.newHashMap();
protected final Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
// cache the fragment id to its scan node ids. Used for colocate join.
private final Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = Maps.newHashMap();
private final Set<Integer> colocateFragmentIds = new HashSet<>();
protected final BucketShuffleJoinController bucketShuffleJoinController
= new BucketShuffleJoinController(fragmentIdToScanNodeIds);
public static class PipelineExecContext {
TPipelineFragmentParams rpcParams;
PlanFragmentId fragmentId;
boolean initiated;
boolean done;
TNetworkAddress brpcAddress;
TNetworkAddress address;
Backend backend;
long lastMissingHeartbeatTime = -1;
long beProcessEpoch = 0;
private long jobId;
public PipelineExecContext(PlanFragmentId fragmentId,
TPipelineFragmentParams rpcParams, Backend backend,
ExecutionProfile executionProfile, long jobId) {
this.fragmentId = fragmentId;
this.rpcParams = rpcParams;
this.initiated = false;
this.done = false;
this.backend = backend;
this.address = new TNetworkAddress(backend.getHost(), backend.getBePort());
this.brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
this.beProcessEpoch = backend.getProcessEpoch();
this.jobId = jobId;
this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime();
executionProfile.addFragmentBackend(fragmentId, backend.getId());
}
/**
* Some information common to all Fragments does not need to be sent repeatedly.
* Therefore, when we confirm that a certain BE has accepted the information,
* we will delete the information in the subsequent Fragment to avoid repeated
* sending.
* This information can be obtained from the cache of BE.
*/
public void unsetFields() {
this.rpcParams.unsetDescTbl();
this.rpcParams.unsetFileScanParams();
this.rpcParams.unsetCoord();
this.rpcParams.unsetQueryGlobals();
this.rpcParams.unsetResourceInfo();
this.rpcParams.setIsSimplifiedParam(true);
}
// update profile.
// return true if profile is updated. Otherwise, return false.
// Has to be synchronized to ensure there are no concurrent update threads. Otherwise the done
// state may be updated incorrectly and data will be lost. See https://github.com/apache/doris/pull/29802/files.
public synchronized boolean updatePipelineStatus(TReportExecStatusParams params) {
// The fragment or instance is not finished, no need to update
if (!params.done) {
return false;
}
if (this.done) {
// duplicate packet
return false;
}
this.done = true;
return true;
}
public boolean isBackendStateHealthy() {
if (backend.getLastMissingHeartbeatTime() > lastMissingHeartbeatTime && !backend.isAlive()) {
LOG.warn("backend {} is down while joining the coordinator. job id: {}",
backend.getId(), jobId);
return false;
}
return true;
}
public List<QueryStatisticsItem.FragmentInstanceInfo> buildFragmentInstanceInfo() {
return this.rpcParams.local_params.stream().map(it -> new FragmentInstanceInfo.Builder()
.instanceId(it.fragment_instance_id).fragmentId(String.valueOf(fragmentId))
.address(this.address).build()).collect(Collectors.toList());
}
}
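// Groups all PipelineExecContexts that target the same BE (one brpc address). The grouped
// rpcParams are serialized into a single TPipelineFragmentParamsList and sent in one rpc,
// optionally using two-phase execution (exec + start); it also handles per-BE cancellation.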
public static class PipelineExecContexts {
TUniqueId queryId;
long beId;
Backend backend;
TNetworkAddress brpcAddr;
List<PipelineExecContext> ctxs = Lists.newArrayList();
boolean twoPhaseExecution = false;
int instanceNumber;
ByteString serializedFragments = null;
boolean hasCancelled = false;
boolean cancelInProcess = false;
public PipelineExecContexts(TUniqueId queryId,
Backend backend, TNetworkAddress brpcAddr, boolean twoPhaseExecution,
int instanceNumber) {
this.queryId = queryId;
this.backend = backend;
this.beId = backend.getId();
this.brpcAddr = brpcAddr;
this.twoPhaseExecution = twoPhaseExecution;
this.instanceNumber = instanceNumber;
}
public void addContext(PipelineExecContext ctx) {
this.ctxs.add(ctx);
}
public int getInstanceNumber() {
return instanceNumber;
}
public List<PipelineExecContext> getCtxs() {
return ctxs;
}
public Backend getBackend() {
return backend;
}
/**
* The BackendExecStates in ctxs are all sent to the same BE.
* So only the first BackendExecState needs to carry some common fields, such as DescriptorTbl;
* the other BackendExecStates do not need those fields. Unset them to reduce the message size.
*/
public void unsetFields() {
boolean first = true;
for (PipelineExecContext ctx : ctxs) {
if (first) {
first = false;
continue;
}
ctx.unsetFields();
}
}
public Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentsAsync(BackendServiceProxy proxy)
throws TException {
Preconditions.checkNotNull(serializedFragments);
try {
return proxy.execPlanFragmentsAsync(brpcAddr, serializedFragments, twoPhaseExecution);
} catch (RpcException e) {
// DO NOT throw exception here, return a complete future with error code,
// so that the following logic will cancel the fragment.
return futureWithException(e);
}
}
public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentStartAsync(BackendServiceProxy proxy)
throws TException {
try {
PExecPlanFragmentStartRequest.Builder builder = PExecPlanFragmentStartRequest.newBuilder();
PUniqueId qid = PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
builder.setQueryId(qid);
return proxy.execPlanFragmentStartAsync(brpcAddr, builder.build());
} catch (RpcException e) {
// DO NOT throw exception here, return a complete future with error code,
// so that the following logic will cancel the fragment.
return futureWithException(e);
}
}
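// Wrap an RpcException into an already-completed future whose result carries a
// THRIFT_RPC_ERROR status, so callers can handle RPC failures through the normal
// result-checking path.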
@NotNull
private Future<PExecPlanFragmentResult> futureWithException(RpcException e) {
return new Future<PExecPlanFragmentResult>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
@Override
public PExecPlanFragmentResult get() {
PExecPlanFragmentResult result = PExecPlanFragmentResult.newBuilder().setStatus(
Types.PStatus.newBuilder().addErrorMsgs(e.getMessage())
.setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue()).build()).build();
return result;
}
@Override
public PExecPlanFragmentResult get(long timeout, TimeUnit unit) {
return get();
}
};
}
public void setSerializeFragments(ByteString serializedFragments) {
this.serializedFragments = serializedFragments;
}
public ByteString getSerializedFragments() {
return serializedFragments;
}
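// Serialize the rpcParams of all contexts into a single TPipelineFragmentParamsList
// using the thrift compact protocol, mark the contexts as initiated, and return the
// serialized size in bytes.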
public long serializeFragments() throws TException {
TPipelineFragmentParamsList paramsList = new TPipelineFragmentParamsList();
for (PipelineExecContext cts : ctxs) {
cts.initiated = true;
paramsList.addToParamsList(cts.rpcParams);
}
serializedFragments = ByteString.copyFrom(
new TSerializer(new TCompactProtocol.Factory()).serialize(paramsList));
return serializedFragments.size();
}
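// Build a one-line description of the fragments being sent to this backend, for logging.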
public String debugInfo() {
StringBuilder infos = new StringBuilder();
for (PipelineExecContext pec : ctxs) {
infos.append(pec.fragmentId).append(" ");
}
return String.format("query %s, sending pipeline fragments: %s to BE %s, brpc address %s",
DebugUtil.printId(queryId), infos, beId, brpcAddr.toString());
}
// Just send the cancel message to the BE and do not care about the result, because there
// is no retry logic in the upper layer.
private synchronized void cancelQuery(Status cancelReason) {
if (LOG.isDebugEnabled()) {
LOG.debug("cancelRemoteFragments backend: {}, query={}, reason: {}",
backend, DebugUtil.printId(queryId), cancelReason.toString());
}
if (this.hasCancelled || this.cancelInProcess) {
return;
}
try {
try {
ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelResult =
BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddr, queryId,
cancelReason);
Futures.addCallback(cancelResult, new FutureCallback<InternalService.PCancelPlanFragmentResult>() {
public void onSuccess(InternalService.PCancelPlanFragmentResult result) {
cancelInProcess = false;
if (result.hasStatus()) {
Status status = new Status(result.getStatus());
if (status.getErrorCode() == TStatusCode.OK) {
hasCancelled = true;
} else {
LOG.warn("Failed to cancel query {} backend: {}, reason: {}",
DebugUtil.printId(queryId), backend, status.toString());
}
}
}
public void onFailure(Throwable t) {
cancelInProcess = false;
LOG.warn("Failed to cancel query {} backend: {}, reason: {}",
DebugUtil.printId(queryId), backend, cancelReason.toString(), t);
}
}, backendRpcCallbackExecutor);
cancelInProcess = true;
} catch (RpcException e) {
LOG.warn("cancel plan fragment get a exception, address={}:{}", brpcAddr.getHostname(),
brpcAddr.getPort());
SimpleScheduler.addToBlacklist(beId, e.getMessage());
}
} catch (Exception e) {
LOG.warn("catch a exception", e);
return;
}
}
}
// Execution parameters for a single fragment.
// Each fragment can have multiple FInstanceExecParams,
// used to assemble the per-backend TPipelineFragmentParams.
protected class FragmentExecParams {
public PlanFragment fragment;
public int parallelTasksNum = 0;
public boolean ignoreDataDistribution = false;
public List<TPlanFragmentDestination> destinations = Lists.newArrayList();
public Map<Integer, Integer> perExchNumSenders = Maps.newHashMap();
public List<PlanFragmentId> inputFragments = Lists.newArrayList();
public List<FInstanceExecParam> instanceExecParams = Lists.newArrayList();
public FragmentScanRangeAssignment scanRangeAssignment = new FragmentScanRangeAssignment();
public FragmentExecParams(PlanFragment fragment) {
this.fragment = fragment;
}
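// Convert this fragment's exec params into one TPipelineFragmentParams per backend host.
// The first instance on a host fills in the shared, fragment-level fields; every instance
// then contributes its own TPipelineInstanceParams (scan ranges, sender id, runtime
// filter params, and so on).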
Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) {
Set<SortNode> topnSortNodes = scanNodes.stream()
.filter(scanNode -> scanNode instanceof OlapScanNode)
.flatMap(scanNode -> scanNode.getTopnFilterSortNodes().stream()).collect(Collectors.toSet());
topnSortNodes.forEach(SortNode::setHasRuntimePredicate);
long memLimit = queryOptions.getMemLimit();
// Update the memory limit for colocate join.
if (colocateFragmentIds.contains(fragment.getFragmentId().asInt())) {
int rate = Math.min(Config.query_colocate_join_memory_limit_penalty_factor, instanceExecParams.size());
memLimit = queryOptions.getMemLimit() / rate;
}
Map<TNetworkAddress, TPipelineFragmentParams> res = new HashMap();
Map<TNetworkAddress, Integer> instanceIdx = new HashMap();
TPlanFragment fragmentThrift = fragment.toThrift();
fragmentThrift.query_cache_param = fragment.queryCacheParam;
for (int i = 0; i < instanceExecParams.size(); ++i) {
final FInstanceExecParam instanceExecParam = instanceExecParams.get(i);
Map<Integer, List<TScanRangeParams>> scanRanges = instanceExecParam.perNodeScanRanges;
if (scanRanges == null) {
scanRanges = Maps.newHashMap();
}
if (!res.containsKey(instanceExecParam.host)) {
TPipelineFragmentParams params = new TPipelineFragmentParams();
// Set global param
params.setIsNereids(context != null && context.getState().isNereids());
params.setProtocolVersion(PaloInternalServiceVersion.V1);
params.setDescTbl(descTable);
params.setQueryId(queryId);
params.setPerExchNumSenders(perExchNumSenders);
params.setDestinations(destinations);
params.setNumSenders(instanceExecParams.size());
params.setCoord(coordAddress);
params.setCurrentConnectFe(currentConnectFE);
params.setQueryGlobals(queryGlobals);
params.setQueryOptions(queryOptions);
params.query_options.setMemLimit(memLimit);
params.setSendQueryStatisticsWithEveryBatch(
fragment.isTransferQueryStatisticsWithEveryBatch());
params.setFragment(fragmentThrift);
params.setLocalParams(Lists.newArrayList());
if (tWorkloadGroups != null) {
params.setWorkloadGroups(tWorkloadGroups);
}
params.setFileScanParams(fileScanRangeParamsMap);
params.setNumBuckets(fragment.getBucketNum());
params.setTotalInstances(instanceExecParams.size());
if (ignoreDataDistribution) {
params.setParallelInstances(parallelTasksNum);
}
res.put(instanceExecParam.host, params);
res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, Integer>());
res.get(instanceExecParam.host).setShuffleIdxToInstanceIdx(new HashMap<Integer, Integer>());
instanceIdx.put(instanceExecParam.host, 0);
}
// Record which instance on this BE each bucket belongs to.
// This is used for LocalExchange(BUCKET_HASH_SHUFFLE).
int instanceId = instanceIdx.get(instanceExecParam.host);
for (int bucket : instanceExecParam.bucketSeqSet) {
res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, instanceId);
}
instanceIdx.replace(instanceExecParam.host, ++instanceId);
TPipelineFragmentParams params = res.get(instanceExecParam.host);
res.get(instanceExecParam.host).getShuffleIdxToInstanceIdx().put(instanceExecParam.recvrId,
params.getLocalParams().size());
TPipelineInstanceParams localParams = new TPipelineInstanceParams();
localParams.setFragmentInstanceId(instanceExecParam.instanceId);
localParams.setPerNodeScanRanges(scanRanges);
localParams.setSenderId(i);
localParams.setBackendNum(backendNum++);
localParams.setRuntimeFilterParams(new TRuntimeFilterParams());
localParams.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
if (!topnFilters.isEmpty()) {
List<TTopnFilterDesc> filterDescs = new ArrayList<>();
for (TopnFilter filter : topnFilters) {
filterDescs.add(filter.toThrift());
}
localParams.setTopnFilterDescs(filterDescs);
}
if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
Set<Integer> broadCastRf = assignedRuntimeFilters.stream().filter(RuntimeFilter::isBroadcast)
.map(r -> r.getFilterId().asInt()).collect(Collectors.toSet());
for (RuntimeFilter rf : assignedRuntimeFilters) {
if (!ridToTargetParam.containsKey(rf.getFilterId())) {
continue;
}
List<FRuntimeFilterTargetParam> fParams = ridToTargetParam.get(rf.getFilterId());
if (rf.hasRemoteTargets()) {
Map<TNetworkAddress, TRuntimeFilterTargetParamsV2> targetParamsV2 = new HashMap<>();
for (FRuntimeFilterTargetParam targetParam : fParams) {
if (targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) {
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
.target_fragment_ids
.add(targetParam.targetFragmentId);
} else {
targetParamsV2.put(targetParam.targetFragmentInstanceAddr,
new TRuntimeFilterTargetParamsV2());
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
.target_fragment_instance_addr
= targetParam.targetFragmentInstanceAddr;
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
.target_fragment_ids
= new ArrayList<>();
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
.target_fragment_ids
.add(targetParam.targetFragmentId);
// `target_fragment_instance_ids` is a required field
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
.target_fragment_instance_ids
= new ArrayList<>();
}
}
localParams.runtime_filter_params.putToRidToTargetParamv2(rf.getFilterId().asInt(),
new ArrayList<TRuntimeFilterTargetParamsV2>(targetParamsV2.values()));
}
}
for (Map.Entry<RuntimeFilterId, Integer> entry : ridToBuilderNum.entrySet()) {
localParams.runtime_filter_params.putToRuntimeFilterBuilderNum(
entry.getKey().asInt(), broadCastRf.contains(entry.getKey().asInt()) ? 1 :
entry.getValue());
}
for (RuntimeFilter rf : assignedRuntimeFilters) {
localParams.runtime_filter_params.putToRidToRuntimeFilter(
rf.getFilterId().asInt(), rf.toThrift());
}
}
params.getLocalParams().add(localParams);
}
return res;
}
// Append range information
// [tablet_id(version),tablet_id(version)]
public void appendScanRange(StringBuilder sb, List<TScanRangeParams> params) {
sb.append("range=[");
int idx = 0;
for (TScanRangeParams range : params) {
TPaloScanRange paloScanRange = range.getScanRange().getPaloScanRange();
if (paloScanRange != null) {
if (idx++ != 0) {
sb.append(",");
}
sb.append("{tid=").append(paloScanRange.getTabletId())
.append(",ver=").append(paloScanRange.getVersion()).append("}");
}
TEsScanRange esScanRange = range.getScanRange().getEsScanRange();
if (esScanRange != null) {
sb.append("{ index=").append(esScanRange.getIndex())
.append(", shardid=").append(esScanRange.getShardId())
.append("}");
}
}
sb.append("]");
}
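// Append a debug description of this fragment: the plan trace followed by each
// instance's id, host and assigned scan ranges.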
public void appendTo(StringBuilder sb) {
// append fragment
sb.append("{plan=");
fragment.getPlanRoot().appendTrace(sb);
sb.append(",instance=[");
// append instance
for (int i = 0; i < instanceExecParams.size(); ++i) {
if (i != 0) {
sb.append(",");
}
TNetworkAddress address = instanceExecParams.get(i).host;
Map<Integer, List<TScanRangeParams>> scanRanges =
scanRangeAssignment.get(address);
sb.append("{");
sb.append("id=").append(DebugUtil.printId(instanceExecParams.get(i).instanceId));
sb.append(",host=").append(instanceExecParams.get(i).host);
if (scanRanges == null) {
sb.append("}");
continue;
}
sb.append(",range=[");
int eIdx = 0;
for (Map.Entry<Integer, List<TScanRangeParams>> entry : scanRanges.entrySet()) {
if (eIdx++ != 0) {
sb.append(",");
}
sb.append("id").append(entry.getKey()).append(",");
appendScanRange(sb, entry.getValue());
}
sb.append("]");
sb.append("}");
}
sb.append("]"); // end of instances
sb.append("}");
}
}
public QueueToken getQueueToken() {
return queueToken;
}
// Fragment instance exec param, used to assemble the per-instance
// TPipelineInstanceParams, as a member of FragmentExecParams.
static class FInstanceExecParam {
TUniqueId instanceId;
TNetworkAddress host;
Map<Integer, List<TScanRangeParams>> perNodeScanRanges = Maps.newHashMap();
Set<Integer> bucketSeqSet = Sets.newHashSet();
FragmentExecParams fragmentExecParams;
int recvrId = -1;
List<TUniqueId> instancesSharingHashTable = Lists.newArrayList();
public void addBucketSeq(int bucketSeq) {
this.bucketSeqSet.add(bucketSeq);
}
public FInstanceExecParam(TUniqueId id, TNetworkAddress host, FragmentExecParams fragmentExecParams) {
this.instanceId = id;
this.host = host;
this.fragmentExecParams = fragmentExecParams;
}
public PlanFragment fragment() {
return fragmentExecParams.fragment;
}
}
// consistent with EXPLAIN's fragment index
public List<QueryStatisticsItem.FragmentInstanceInfo> getFragmentInstanceInfos() {
final List<QueryStatisticsItem.FragmentInstanceInfo> result =
Lists.newArrayList();
lock();
try {
for (int index = 0; index < fragments.size(); index++) {
for (PipelineExecContext ctx : pipelineExecContexts.values()) {
if (fragments.get(index).getFragmentId() != ctx.fragmentId) {
continue;
}
final List<QueryStatisticsItem.FragmentInstanceInfo> info = ctx.buildFragmentInstanceInfo();
result.addAll(info);
}
}
} finally {
unlock();
}
return result;
}
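// Collect the addresses (host and BE port) of all backends involved in this query.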
@Override
public List<TNetworkAddress> getInvolvedBackends() {
List<TNetworkAddress> backendAddresses = Lists.newArrayList();
for (Long backendId : this.beToPipelineExecCtxs.keySet()) {
Backend backend = idToBackend.get(backendId);
backendAddresses.add(new TNetworkAddress(backend.getHost(), backend.getBePort()));
}
return backendAddresses;
}
public List<PlanFragment> getFragments() {
return fragments;
}
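// Run the given action on the summary profile if the connect context, its executor
// and the summary profile are all present.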
protected void updateProfileIfPresent(Consumer<SummaryProfile> profileAction) {
Optional.ofNullable(context).map(ConnectContext::getExecutor).map(StmtExecutor::getSummaryProfile)
.ifPresent(profileAction);
}
// Runtime filter target fragment instance param
static class FRuntimeFilterTargetParam {
public int targetFragmentId;
public TNetworkAddress targetFragmentInstanceAddr;
public FRuntimeFilterTargetParam(int id, TNetworkAddress host) {
this.targetFragmentId = id;
this.targetFragmentInstanceAddr = host;
}
}
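// Keep profiling enabled only if the statement is profile-safe and profiling was
// already requested in the query options.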
@Override
public void setIsProfileSafeStmt(boolean isSafe) {
this.queryOptions.setEnableProfile(isSafe && queryOptions.isEnableProfile());
}
}