// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/ExchangeNode.java
// and modified by Doris

package org.apache.doris.planner;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.SortInfo;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.StatsRecursiveDerive;
import org.apache.doris.thrift.TExchangeNode;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;

import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;

/**
* Receiver side of a 1:n data stream. Logically, an ExchangeNode consumes the data
* produced by its children. For each of the sending child nodes the actual data
* transmission is performed by the DataStreamSink of the PlanFragment housing
* that child node. Typically, an ExchangeNode only has a single sender child but,
* e.g., for distributed union queries an ExchangeNode may have one sender child per
* union operand.
*
 * If an (optional) SortInfo field is set, the ExchangeNode will merge its
 * inputs on the parameters specified in the SortInfo object. It is assumed that
 * each input is also sorted individually on the same SortInfo parameters.
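 *
 * A minimal lifecycle sketch (illustrative only: the id, child node, analyzer
 * and sortInfo are assumed to be supplied by the surrounding planner):
 * ```
 * ExchangeNode exchange = new ExchangeNode(id, childNode, false); // copyConjuncts = false
 * exchange.init(analyzer);         // derives cardinality from the single child
 * exchange.setMergeInfo(sortInfo); // optional: makes this a merging exchange
 * ```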
*/
public class ExchangeNode extends PlanNode {
private static final Logger LOG = LogManager.getLogger(ExchangeNode.class);
public static final String EXCHANGE_NODE = "EXCHANGE";
public static final String MERGING_EXCHANGE_NODE = "MERGING-EXCHANGE";
    // The parameters based on which sorted input streams are merged by this
    // exchange node. Null if this exchange does not merge sorted streams.
private SortInfo mergeInfo;
private boolean isRightChildOfBroadcastHashJoin = false;
private TPartitionType partitionType;
    /**
     * Used by the Nereids planner only.
     */
public ExchangeNode(PlanNodeId id, PlanNode inputNode) {
super(id, inputNode, EXCHANGE_NODE, StatisticalType.EXCHANGE_NODE);
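        // Defaults for a pass-through exchange: no offset, and limit == -1 means "no limit".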
offset = 0;
limit = -1;
this.conjuncts = Collections.emptyList();
children.add(inputNode);
computeTupleIds();
}
public TPartitionType getPartitionType() {
return partitionType;
}
public void setPartitionType(TPartitionType partitionType) {
this.partitionType = partitionType;
}
    /**
     * Creates an ExchangeNode that consumes the output of inputNode.
     * The limit is applied at this receiver only when the input fragment is
     * partitioned, i.e. when there are multiple senders; the offset is
     * inherited unless the input is itself an ExchangeNode.
     */
public ExchangeNode(PlanNodeId id, PlanNode inputNode, boolean copyConjuncts) {
super(id, inputNode, EXCHANGE_NODE, StatisticalType.EXCHANGE_NODE);
offset = 0;
children.add(inputNode);
if (!copyConjuncts) {
this.conjuncts = Lists.newArrayList();
}
// Only apply the limit at the receiver if there are multiple senders.
if (inputNode.getFragment().isPartitioned()) {
limit = inputNode.limit;
}
if (!(inputNode instanceof ExchangeNode)) {
offset = inputNode.offset;
}
computeTupleIds();
}
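
    // An exchange is "functional" when it does more than forward rows:
    // it merges sorted streams or enforces a limit/offset at the receiver.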
public boolean isFunctionalExchange() {
return mergeInfo != null || limit != -1 || offset != 0;
}
@Override
public final void computeTupleIds() {
PlanNode inputNode = getChild(0);
TupleDescriptor outputTupleDesc = inputNode.getOutputTupleDesc();
updateTupleIds(outputTupleDesc);
}
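
    // If the child exposes a dedicated output tuple (e.g. a projection result),
    // this exchange carries only that tuple; otherwise it inherits the child's
    // tuple ids, table-ref ids, and nullable tuple ids.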
public void updateTupleIds(TupleDescriptor outputTupleDesc) {
if (outputTupleDesc != null) {
tupleIds.clear();
tupleIds.add(outputTupleDesc.getId());
tblRefIds.add(outputTupleDesc.getId());
nullableTupleIds.add(outputTupleDesc.getId());
} else {
clearTupleIds();
tupleIds.addAll(getChild(0).getTupleIds());
tblRefIds.addAll(getChild(0).getTblRefIds());
nullableTupleIds.addAll(getChild(0).getNullableTupleIds());
}
}
@Override
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);
Preconditions.checkState(conjuncts.isEmpty());
if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
return;
}
computeStats(analyzer);
}
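
    // An exchange neither filters nor generates rows, so its cardinality is
    // derived recursively from its single child.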
@Override
protected void computeStats(Analyzer analyzer) throws UserException {
Preconditions.checkState(children.size() == 1);
StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this);
cardinality = (long) statsDeriveResult.getRowCount();
        if (LOG.isDebugEnabled()) {
            LOG.debug("stats Exchange: {}, cardinality: {}", id, cardinality);
        }
}
public SortInfo getMergeInfo() {
return mergeInfo;
}
/**
* Set the parameters used to merge sorted input streams. This can be called
* after init().
*/
public void setMergeInfo(SortInfo info) {
this.mergeInfo = info;
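        // The "V" prefix marks the vectorized version of the node in EXPLAIN output.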
this.planNodeName = "V" + MERGING_EXCHANGE_NODE;
}
@Override
protected void toThrift(TPlanNode msg) {
        // If this fragment also contains a scan node, whether this exchange node is
        // serial should be decided by that scan node.
msg.setIsSerialOperator((isSerialOperator() || fragment.hasSerialScanNode())
&& fragment.useSerialSource(ConnectContext.get()));
msg.node_type = TPlanNodeType.EXCHANGE_NODE;
msg.exchange_node = new TExchangeNode();
for (TupleId tid : tupleIds) {
msg.exchange_node.addToInputRowTuples(tid.asInt());
}
if (mergeInfo != null) {
msg.exchange_node.setSortInfo(mergeInfo.toThrift());
}
msg.exchange_node.setOffset(offset);
msg.exchange_node.setPartitionType(partitionType);
}
@Override
protected String debugString() {
ToStringHelper helper = MoreObjects.toStringHelper(this);
helper.addValue(super.debugString());
helper.add("offset", offset);
return helper.toString();
}
@Override
public int getNumInstances() {
return numInstances;
}
@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
return prefix + "offset: " + offset + "\n";
}
public boolean isRightChildOfBroadcastHashJoin() {
return isRightChildOfBroadcastHashJoin;
}
public void setRightChildOfBroadcastHashJoin(boolean value) {
isRightChildOfBroadcastHashJoin = value;
}
    /**
     * Suppose table `t1` has unique key `k1` and value column `v1`, and the plan
     * below is used to load data into `t1`:
     * ```
     * FRAGMENT 0:
     *   Merging Exchange (id = 1)
     *   NL Join (id = 2)
     *   DataStreamSender (id = 3, dst_id = 3) (OLAP_TABLE_SINK_HASH_PARTITIONED)
     *
     * FRAGMENT 1:
     *   Exchange (id = 3)
     *   OlapTableSink (id = 4)
     * ```
     *
     * In this plan, `Merging Exchange (id = 1)` must merge-sort its input on columns
     * `k1` and `v1`, so the parallelism of FRAGMENT 0 must be 1, and the data is
     * shuffled to FRAGMENT 1, which also has only one instance, because this load
     * job relies on the global ordering of columns `k1` and `v1`.
     *
     * So FRAGMENT 0 should not use a serial source.
     */
@Override
public boolean isSerialOperator() {
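        // Serial only when this exchange merges sorted streams AND either the
        // session variable forces serial exchanges or the input is unpartitioned.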
return (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isUseSerialExchange()
|| partitionType == TPartitionType.UNPARTITIONED) && mergeInfo != null;
}
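
    // Within its fragment an exchange is a leaf, so "has serial children"
    // reduces to whether the exchange itself is serial.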
@Override
public boolean hasSerialChildren() {
return isSerialOperator();
}
@Override
public boolean hasSerialScanChildren() {
return false;
}
}