// LocalExchangeNode.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/ExchangeNode.java
// and modified by Doris
package org.apache.doris.planner;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprToThriftVisitor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TLocalExchangeNode;
import org.apache.doris.thrift.TLocalPartitionType;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
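/**
 * Plan node serialized to thrift as {@code LOCAL_EXCHANGE_NODE}.
 * It sits on top of a single input node, carries the {@link LocalExchangeType} to apply and,
 * for hash-shuffle types, the distribute expressions that are sent along with it.
 */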
public class LocalExchangeNode extends PlanNode {
/** Name handed to the {@link PlanNode} super constructor for this node. */
public static final String EXCHANGE_NODE = "LOCAL-EXCHANGE";
/** Exchange flavour of this node; assigned once by the constructor and never changed afterwards. */
private final LocalExchangeType exchangeType;
/**
 * Creates a local exchange on top of {@code inputNode} without distribute expressions.
 * Use for Nereids only.
 */
public LocalExchangeNode(PlanNodeId id, PlanNode inputNode, LocalExchangeType exchangeType) {
    this(id, inputNode, exchangeType, null);
}

/**
 * Creates a local exchange on top of {@code inputNode}.
 *
 * @param id id of the new plan node
 * @param inputNode the node that becomes the only child; its fragment is reused and its
 *     output tuple descriptor (if any) determines this node's tuple ids
 * @param exchangeType exchange flavour to apply
 * @param distributeExprs distribute expressions; they are recorded only when the exchange
 *     type is one of the hash shuffles and the list is neither null nor empty
 */
public LocalExchangeNode(PlanNodeId id, PlanNode inputNode, LocalExchangeType exchangeType,
        List<Expr> distributeExprs) {
    super(id, inputNode, EXCHANGE_NODE);
    this.offset = 0;
    this.limit = -1;
    this.conjuncts = Collections.emptyList();
    this.children.add(inputNode);
    this.exchangeType = exchangeType;
    this.fragment = inputNode.getFragment();

    // Non-hash exchanges never carry distribute expressions, even when some are supplied.
    boolean hasDistributeExprs = distributeExprs != null && !distributeExprs.isEmpty();
    if (hasDistributeExprs && exchangeType != null && exchangeType.isHashShuffle()) {
        setDistributeExprLists(distributeExprs);
    }

    updateTupleIds(inputNode.getOutputTupleDesc());
}
/**
 * Replaces this node's tuple ids.
 *
 * @param outputTupleDesc when non-null, its id becomes the only tuple id of this node;
 *     when null, the output tuple ids of the first child are used instead
 */
public void updateTupleIds(TupleDescriptor outputTupleDesc) {
    // Either way the previous ids are discarded first.
    clearTupleIds();
    if (outputTupleDesc == null) {
        tupleIds.addAll(getChild(0).getOutputTupleIds());
        return;
    }
    tupleIds.add(outputTupleDesc.getId());
}
/**
 * Fills {@code msg} with the thrift form of this node: the node type, the local exchange
 * type and, for hash shuffles, the distribute expressions.
 */
@Override
protected void toThrift(TPlanNode msg) {
    // An FE-planned LocalExchangeNode must never be flagged serial. In the BE-planned path the
    // serial semantics are a property of the upstream scan/exchange pipeline, not of the
    // downstream LocalExchangeSource pipeline; flagging this node as serial would wrongly
    // shrink the downstream pipeline to a single task.
    msg.setIsSerialOperator(false);
    msg.node_type = TPlanNodeType.LOCAL_EXCHANGE_NODE;

    TLocalExchangeNode thriftNode = new TLocalExchangeNode(exchangeType.toThrift());
    msg.local_exchange_node = thriftNode;
    if (!exchangeType.isHashShuffle()) {
        // Only hash shuffles ship distribute expressions.
        return;
    }

    List<Expr> exprs = distributeExprLists();
    List<TExpr> thriftExprs = new ArrayList<>(exprs.size());
    for (Expr distributeExpr : exprs) {
        thriftExprs.add(ExprToThriftVisitor.treeToThrift(distributeExpr));
    }
    thriftNode.setDistributeExprLists(thriftExprs);
}
/** Returns the distribute expressions of this node, or an empty list when none are set. */
private List<Expr> distributeExprLists() {
    return distributeExprLists != null ? distributeExprLists : Collections.<Expr>emptyList();
}
/**
 * Builds the explain line of this node: {@code prefix}, the text {@code "type: "}, the name
 * of the exchange type and a trailing newline. {@code detailLevel} is not consulted.
 */
@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
    StringBuilder explain = new StringBuilder();
    explain.append(prefix).append("type: ").append(exchangeType.name()).append("\n");
    return explain.toString();
}
/** Returns the exchange type this node was constructed with. */
public LocalExchangeType getExchangeType() {
    return this.exchangeType;
}
/**
 * Always answers {@code true}, whatever {@code childIndex} is.
 * NOTE(review): presumably this goes hand in hand with this node always being emitted as
 * non-serial in {@code toThrift} -- confirm against the contract in PlanNode.
 */
@Override
protected boolean shouldResetSerialFlagForChild(int childIndex) {
    return true;
}
/**
 * The output distribution a parent operator demands from one of its children.
 *
 * <p>A parent returns such a requirement from {@code enforceAndDeriveLocalExchange};
 * {@link PlanNode#enforceRequire} consumes it and decides whether a LocalExchangeNode has to
 * be inserted.
 *
 * <p>Which requirement to return when overriding {@code enforceAndDeriveLocalExchange}:
 * <ul>
 *   <li>{@link NoRequire}: the child's distribution does not matter. Meant for operators
 *       whose correctness is independent of partitioning (e.g. the base default, limit,
 *       select). The framework still upgrades it to {@code requirePassthrough}
 *       automatically when the child turns out to be serial, see
 *       {@link PlanNode#enforceRequire} step 3.</li>
 *
 *   <li>{@link RequireHash} (via {@code requireHash()}): hash-partitioned input is needed
 *       and any hash flavour is fine. {@code GLOBAL_EXECUTION_HASH_SHUFFLE},
 *       {@code LOCAL_EXECUTION_HASH_SHUFFLE} and {@code BUCKET_HASH_SHUFFLE} are all
 *       accepted. Use it for shuffled correctness consumers (finalize AggSink with keys,
 *       partitioned HashJoin, Intersect, Except): the upstream may already deliver a
 *       compatible flavour, and a redundant exchange should not be inserted.</li>
 *
 *   <li>{@link RequireSpecific} (via {@code requirePassthrough()},
 *       {@code requireBroadcast()}, {@code requireBucketHash()},
 *       {@code requireGlobalExecutionHash()}, etc.): exactly this exchange type is needed.
 *       Use it only when correctness or efficiency of the operator depends on that exact
 *       type (e.g. the NLJ probe wants ADAPTIVE_PASSTHROUGH; the BucketShuffle join build
 *       wants BUCKET_HASH_SHUFFLE). Note that PASSTHROUGH is also satisfied by
 *       ADAPTIVE_PASSTHROUGH (a superset); every other specific type needs an exact
 *       match.</li>
 * </ul>
 *
 * <p>Rule of thumb: favour {@code requireHash()} over
 * {@code requireSpecific(GLOBAL_EXECUTION_HASH_SHUFFLE)} unless other hash flavours really
 * have to be rejected. RequireSpecific is fragile, because the upstream may legitimately
 * produce a different hash type that is still correct.
 */
public interface LocalExchangeTypeRequire {
    /** Tells whether the distribution {@code provide} fulfils this requirement. */
    boolean satisfy(LocalExchangeType provide);

    /** Returns the exchange type this requirement prefers. */
    LocalExchangeType preferType();

    /**
     * Returns the hash-oriented form of this requirement. Unless an implementation overrides
     * it, that is the shared {@link RequireHash} instance.
     */
    default LocalExchangeTypeRequire autoRequireHash() {
        return RequireHash.INSTANCE;
    }

    /**
     * Returns {@link #preferType()}, except that {@code NOOP} is replaced by
     * {@code defaultType}.
     */
    default LocalExchangeType noopTo(LocalExchangeType defaultType) {
        LocalExchangeType preferred = preferType();
        if (preferred == LocalExchangeType.NOOP) {
            return defaultType;
        }
        return preferred;
    }

    // ---- factories for the shared requirements ----

    static NoRequire noRequire() {
        return NoRequire.INSTANCE;
    }

    static RequireHash requireHash() {
        return RequireHash.INSTANCE;
    }

    // ---- factories for exact-type requirements; each call creates a new RequireSpecific ----

    static RequireSpecific requireSpecific(LocalExchangeType require) {
        return new RequireSpecific(require);
    }

    static RequireSpecific requirePassthrough() {
        return requireSpecific(LocalExchangeType.PASSTHROUGH);
    }

    static RequireSpecific requireAdaptivePassthrough() {
        return requireSpecific(LocalExchangeType.ADAPTIVE_PASSTHROUGH);
    }

    static RequireSpecific requirePassToOne() {
        return requireSpecific(LocalExchangeType.PASS_TO_ONE);
    }

    static RequireSpecific requireBroadcast() {
        return requireSpecific(LocalExchangeType.BROADCAST);
    }

    static RequireSpecific requireBucketHash() {
        return requireSpecific(LocalExchangeType.BUCKET_HASH_SHUFFLE);
    }

    static RequireSpecific requireGlobalExecutionHash() {
        return requireSpecific(LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE);
    }
}
/** Requirement that every provided distribution satisfies; its preferred type is {@code NOOP}. */
public static class NoRequire implements LocalExchangeTypeRequire {
    /** Shared instance handed out by {@code LocalExchangeTypeRequire.noRequire()}. */
    public static final NoRequire INSTANCE = new NoRequire();

    @Override
    public LocalExchangeType preferType() {
        return LocalExchangeType.NOOP;
    }

    @Override
    public boolean satisfy(LocalExchangeType provide) {
        // Nothing is demanded, so anything fits.
        return true;
    }
}
/**
 * Requirement satisfied by any hash shuffle (global execution, local execution or bucket);
 * its preferred type is {@code GLOBAL_EXECUTION_HASH_SHUFFLE}.
 */
public static class RequireHash implements LocalExchangeTypeRequire {
    /** Shared instance handed out by {@code LocalExchangeTypeRequire.requireHash()}. */
    public static final RequireHash INSTANCE = new RequireHash();

    @Override
    public LocalExchangeType preferType() {
        return LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE;
    }

    @Override
    public boolean satisfy(LocalExchangeType provide) {
        // The three accepted flavours are exactly the ones isHashShuffle() reports.
        return provide.isHashShuffle();
    }

    @Override
    public LocalExchangeTypeRequire autoRequireHash() {
        // Already a hash requirement, nothing to convert.
        return this;
    }
}
/**
 * Requirement for one exact exchange type. The only tolerated deviation is that
 * {@code ADAPTIVE_PASSTHROUGH} also satisfies a {@code PASSTHROUGH} requirement.
 */
public static class RequireSpecific implements LocalExchangeTypeRequire {
    /** The exchange type that is demanded; also reported as the preferred type. */
    LocalExchangeType requireType;

    public RequireSpecific(LocalExchangeType requireType) {
        this.requireType = requireType;
    }

    @Override
    public LocalExchangeType preferType() {
        return requireType;
    }

    @Override
    public boolean satisfy(LocalExchangeType provide) {
        // ADAPTIVE_PASSTHROUGH is a superset of PASSTHROUGH: both fan data out from fewer to
        // more tasks, and BE's need_to_local_exchange treats the two as compatible. Hence a
        // PASSTHROUGH requirement is also met by ADAPTIVE_PASSTHROUGH.
        boolean adaptiveCoversPassthrough = requireType == LocalExchangeType.PASSTHROUGH
                && provide == LocalExchangeType.ADAPTIVE_PASSTHROUGH;
        return requireType == provide || adaptiveCoversPassthrough;
    }

    /**
     * Keeps this requirement when it demands {@code GLOBAL_EXECUTION_HASH_SHUFFLE} or
     * {@code BUCKET_HASH_SHUFFLE}; every other type is replaced by the shared
     * {@link RequireHash} instance.
     * NOTE(review): {@code LOCAL_EXECUTION_HASH_SHUFFLE} is not kept either -- confirm that
     * relaxing it to "any hash" is intended.
     */
    @Override
    public LocalExchangeTypeRequire autoRequireHash() {
        boolean keepsExactHashType = requireType == LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE
                || requireType == LocalExchangeType.BUCKET_HASH_SHUFFLE;
        return keepsExactHashType ? this : RequireHash.INSTANCE;
    }
}
/**
 * Exchange flavours a {@link LocalExchangeNode} can carry. Every constant except
 * {@code NOOP} maps to the equally named {@link TLocalPartitionType} value.
 */
public enum LocalExchangeType {
    NOOP,
    GLOBAL_EXECUTION_HASH_SHUFFLE,
    LOCAL_EXECUTION_HASH_SHUFFLE,
    BUCKET_HASH_SHUFFLE,
    PASSTHROUGH,
    ADAPTIVE_PASSTHROUGH,
    BROADCAST,
    PASS_TO_ONE,
    LOCAL_MERGE_SORT;

    /** Returns true for the three hash shuffles (global execution, local execution, bucket). */
    public boolean isHashShuffle() {
        return this == GLOBAL_EXECUTION_HASH_SHUFFLE
                || this == LOCAL_EXECUTION_HASH_SHUFFLE
                || this == BUCKET_HASH_SHUFFLE;
    }

    /**
     * Returns true for exchanges that do heavy work on the sink side: the hash shuffles and
     * {@code ADAPTIVE_PASSTHROUGH}.
     *
     * <p>Counterpart of BE Pipeline::heavy_operations_on_the_sink(). If the upstream pipeline
     * runs with a single task (serial/pooling scan), a PASSTHROUGH fan-out has to be placed in
     * front of such an exchange so that one task does not become the bottleneck.
     */
    public boolean isHeavyOperation() {
        return isHashShuffle() || this == ADAPTIVE_PASSTHROUGH;
    }

    /**
     * Converts this constant to its thrift counterpart.
     *
     * @throws IllegalStateException for constants without a mapping (currently {@code NOOP})
     */
    public TLocalPartitionType toThrift() {
        final TLocalPartitionType thriftType;
        switch (this) {
            case GLOBAL_EXECUTION_HASH_SHUFFLE:
                thriftType = TLocalPartitionType.GLOBAL_EXECUTION_HASH_SHUFFLE;
                break;
            case LOCAL_EXECUTION_HASH_SHUFFLE:
                thriftType = TLocalPartitionType.LOCAL_EXECUTION_HASH_SHUFFLE;
                break;
            case BUCKET_HASH_SHUFFLE:
                thriftType = TLocalPartitionType.BUCKET_HASH_SHUFFLE;
                break;
            case PASSTHROUGH:
                thriftType = TLocalPartitionType.PASSTHROUGH;
                break;
            case ADAPTIVE_PASSTHROUGH:
                thriftType = TLocalPartitionType.ADAPTIVE_PASSTHROUGH;
                break;
            case BROADCAST:
                thriftType = TLocalPartitionType.BROADCAST;
                break;
            case PASS_TO_ONE:
                thriftType = TLocalPartitionType.PASS_TO_ONE;
                break;
            case LOCAL_MERGE_SORT:
                thriftType = TLocalPartitionType.LOCAL_MERGE_SORT;
                break;
            default:
                throw new IllegalStateException("Unsupported LocalExchangeType: " + this);
        }
        return thriftType;
    }
}
}