// JoinNodeBase.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.JoinOperator;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.analysis.TupleIsNullPredicate;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.StatsRecursiveDerive;
import org.apache.doris.thrift.TNullSide;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public abstract class JoinNodeBase extends PlanNode {
// Class-wide logger.
private static final Logger LOG = LogManager.getLogger(JoinNodeBase.class);
// Table reference passed to the analyzer-based constructor, which reads the join operator and the
// mark flag from it. The Nereids constructor leaves it null.
protected final TableRef innerRef;
// Join operator of this node: taken from innerRef, or passed directly to the Nereids constructor.
protected final JoinOperator joinOp;
// Mark-join flag, fixed at construction; exposed through isMarkJoin().
protected final boolean isMark;
// Maps source expressions (lhs) to SlotRefs of the output tuple (rhs). Built by computeOutputTuple()
// and later rewritten by computeIntermediateTuple(), projectOutputTuple() and setOutputSmap().
protected ExprSubstitutionMap vSrcToOutputSMap;
// Intermediate tuples of the join: left, right, and optionally the mark tuple. Filled by
// computeIntermediateTuple() or injected through setvIntermediateTupleDescList().
protected List<TupleDescriptor> vIntermediateTupleDescList;
/**
 * Creates a join node over {@code outer} (child 0) and {@code inner} (child 1).
 *
 * <p>The node takes over the table-reference ids and the nullable tuple ids of both children,
 * reads the join operator and the mark flag from {@code innerRef}, and additionally marks the
 * output tuples of the null-producing side(s) of an outer join as nullable.
 *
 * @param id              id of this plan node
 * @param planNodeName    display name forwarded to the super class
 * @param statisticalType statistical type forwarded to the super class
 * @param outer           left input, registered as child 0
 * @param inner           right input, registered as child 1
 * @param innerRef        table reference providing the join operator and the mark flag
 */
public JoinNodeBase(PlanNodeId id, String planNodeName, StatisticalType statisticalType,
        PlanNode outer, PlanNode inner, TableRef innerRef) {
    super(id, planNodeName, statisticalType);
    this.innerRef = innerRef;

    tblRefIds.addAll(outer.getTblRefIds());
    tblRefIds.addAll(inner.getTblRefIds());
    children.add(outer);
    children.add(inner);

    // Everything that is already nullable below this node stays nullable.
    nullableTupleIds.addAll(outer.getNullableTupleIds());
    nullableTupleIds.addAll(inner.getNullableTupleIds());

    joinOp = innerRef.getJoinOp();
    // The side(s) an outer join may pad with NULLs become nullable as well.
    switch (joinOp) {
        case FULL_OUTER_JOIN:
            nullableTupleIds.addAll(outer.getOutputTupleIds());
            nullableTupleIds.addAll(inner.getOutputTupleIds());
            break;
        case LEFT_OUTER_JOIN:
            nullableTupleIds.addAll(inner.getOutputTupleIds());
            break;
        case RIGHT_OUTER_JOIN:
            nullableTupleIds.addAll(outer.getOutputTupleIds());
            break;
        default:
            break;
    }

    this.isMark = this.innerRef != null && innerRef.isMark();
}
/**
 * @return the mark-join flag that was fixed when this node was constructed
 */
public boolean isMarkJoin() {
    return this.isMark;
}
/**
 * @return the join operator that was fixed when this node was constructed
 */
public JoinOperator getJoinOp() {
    return this.joinOp;
}
/**
 * Decides whether {@code slotDesc} should be treated as materialized by a child.
 *
 * <p>A slot qualifies when it is materialized itself, or when {@code smap} maps it to an
 * expression in which every referenced slot that has a descriptor is materialized. A slot
 * that is neither materialized nor present in {@code smap} does not qualify.
 *
 * @param slotDesc slot to examine
 * @param smap     substitution map consulted for the slot (the callers pass the child's output smap)
 * @return {@code true} if the slot qualifies as described above
 */
protected boolean isMaterializedByChild(SlotDescriptor slotDesc, ExprSubstitutionMap smap) {
    if (slotDesc.isMaterialized()) {
        return true;
    }
    Expr mappedExpr = smap.get(new SlotRef(slotDesc));
    if (mappedExpr == null) {
        return false;
    }
    List<SlotRef> referencedSlots = new ArrayList<>();
    mappedExpr.collect(SlotRef.class, referencedSlots);
    // A single referenced slot with a non-materialized descriptor disqualifies the expression.
    return referencedSlots.stream()
            .noneMatch(ref -> ref.getDesc() != null && !ref.getDesc().isMaterialized());
}
/**
 * Builds the output tuple of this join node together with the substitution maps describing it.
 *
 * <p>Steps, in the order they run below:
 * <ol>
 *   <li>Create a fresh output tuple and copy into it the slots of the left and/or right child,
 *       depending on the join operator. Slots of a null-producing side are made nullable. A mark
 *       slot is appended when this is a mark join and the analyzer asks for it.</li>
 *   <li>Derive {@code vSrcToOutputSMap} from the current {@code outputSmap} and the
 *       (source slot -> output slot) map, then set the materialized flag of every output slot.</li>
 *   <li>If a null-producing side is a single inline view, wrap the corresponding source
 *       expressions with {@link TupleIsNullPredicate}.</li>
 *   <li>Replace {@code outputSmap} so that it targets the new output tuple.</li>
 * </ol>
 *
 * @param analyzer analyzer that owns the descriptor table and the outer-join / inline-view metadata
 * @throws UserException declared by the signature; NOTE(review): which callee raises it is not
 *                       visible in this method
 */
protected void computeOutputTuple(Analyzer analyzer) throws UserException {
// 1. create new tuple
outputTupleDesc = analyzer.getDescTbl().createTupleDescriptor();
// Which children contribute slots, and which of them may be padded with NULLs.
boolean copyLeft = false;
boolean copyRight = false;
boolean leftNullable = false;
boolean rightNullable = false;
switch (joinOp) {
case INNER_JOIN:
case CROSS_JOIN:
copyLeft = true;
copyRight = true;
break;
case LEFT_OUTER_JOIN:
copyLeft = true;
copyRight = true;
rightNullable = true;
break;
case RIGHT_OUTER_JOIN:
copyLeft = true;
copyRight = true;
leftNullable = true;
break;
case FULL_OUTER_JOIN:
copyLeft = true;
copyRight = true;
leftNullable = true;
rightNullable = true;
break;
case LEFT_ANTI_JOIN:
case LEFT_SEMI_JOIN:
case NULL_AWARE_LEFT_ANTI_JOIN:
// semi/anti joins only expose the preserved side
copyLeft = true;
break;
case RIGHT_ANTI_JOIN:
case RIGHT_SEMI_JOIN:
copyRight = true;
break;
default:
break;
}
// Maps every copied source slot to its counterpart in the new output tuple.
// Insertion order is: left slots, right slots, mark slot.
ExprSubstitutionMap srcTblRefToOutputTupleSmap = new ExprSubstitutionMap();
// Number of copied slots on each side that were made nullable because of the join operator;
// used in step 3 as sub-list boundaries.
int leftNullableNumber = 0;
int rightNullableNumber = 0;
if (copyLeft) {
// A join child is addressed through its output tuple ids; any other child through its
// output table-ref ids (original note: cross join does not have OutputTblRefIds).
List<TupleId> srcTupleIds = getChild(0) instanceof JoinNodeBase ? getChild(0).getOutputTupleIds()
: getChild(0).getOutputTblRefIds();
for (TupleDescriptor leftTupleDesc : analyzer.getDescTbl().getTupleDesc(srcTupleIds)) {
// if the child is cross join node, the only way to get the correct nullable info of its output slots
// is to check if the output tuple ids are outer joined or not.
// then pass this nullable info to hash join node will be correct.
boolean needSetToNullable =
getChild(0) instanceof JoinNodeBase && analyzer.isOuterJoined(leftTupleDesc.getId());
for (SlotDescriptor leftSlotDesc : leftTupleDesc.getSlots()) {
// skip slots the child does not materialize
if (!isMaterializedByChild(leftSlotDesc, getChild(0).getOutputSmap())) {
continue;
}
SlotDescriptor outputSlotDesc =
analyzer.getDescTbl().copySlotDescriptor(outputTupleDesc, leftSlotDesc);
if (leftNullable) {
outputSlotDesc.setIsNullable(true);
leftNullableNumber++;
}
if (needSetToNullable) {
outputSlotDesc.setIsNullable(true);
}
srcTblRefToOutputTupleSmap.put(new SlotRef(leftSlotDesc), new SlotRef(outputSlotDesc));
}
}
}
if (copyRight) {
// Same procedure as for the left child, applied to child 1.
List<TupleId> srcTupleIds = getChild(1) instanceof JoinNodeBase ? getChild(1).getOutputTupleIds()
: getChild(1).getOutputTblRefIds();
for (TupleDescriptor rightTupleDesc : analyzer.getDescTbl().getTupleDesc(srcTupleIds)) {
boolean needSetToNullable =
getChild(1) instanceof JoinNodeBase && analyzer.isOuterJoined(rightTupleDesc.getId());
for (SlotDescriptor rightSlotDesc : rightTupleDesc.getSlots()) {
if (!isMaterializedByChild(rightSlotDesc, getChild(1).getOutputSmap())) {
continue;
}
SlotDescriptor outputSlotDesc =
analyzer.getDescTbl().copySlotDescriptor(outputTupleDesc, rightSlotDesc);
if (rightNullable) {
outputSlotDesc.setIsNullable(true);
rightNullableNumber++;
}
if (needSetToNullable) {
outputSlotDesc.setIsNullable(true);
}
srcTblRefToOutputTupleSmap.put(new SlotRef(rightSlotDesc), new SlotRef(outputSlotDesc));
}
}
}
// add mark slot if needed: the first slot of the mark tuple is copied to the end of the output tuple
if (isMarkJoin() && analyzer.needPopUpMarkTuple(innerRef)) {
SlotDescriptor markSlot = analyzer.getMarkTuple(innerRef).getSlots().get(0);
SlotDescriptor outputSlotDesc =
analyzer.getDescTbl().copySlotDescriptor(outputTupleDesc, markSlot);
srcTblRefToOutputTupleSmap.put(new SlotRef(markSlot), new SlotRef(outputSlotDesc));
}
// 2. compute srcToOutputMap
// NOTE(review): the exact semantics of ExprSubstitutionMap.subtraction are not visible here; the
// checks below rely on it returning one entry per entry of srcTblRefToOutputTupleSmap, with a
// SlotRef on every rhs.
vSrcToOutputSMap = ExprSubstitutionMap.subtraction(outputSmap, srcTblRefToOutputTupleSmap, analyzer);
for (int i = 0; i < vSrcToOutputSMap.size(); i++) {
Preconditions.checkState(vSrcToOutputSMap.getRhs().get(i) instanceof SlotRef);
SlotRef rSlotRef = (SlotRef) vSrcToOutputSMap.getRhs().get(i);
if (vSrcToOutputSMap.getLhs().get(i) instanceof SlotRef) {
// plain slot-to-slot mapping: the output slot inherits the source slot's materialized flag
SlotRef lSlotRef = (SlotRef) vSrcToOutputSMap.getLhs().get(i);
rSlotRef.getDesc().setIsMaterialized(lSlotRef.getDesc().isMaterialized());
} else {
// the source is a non-slot expression: the output slot is always materialized
rSlotRef.getDesc().setIsMaterialized(true);
rSlotRef.materializeSrcExpr();
}
}
outputTupleDesc.computeStatAndMemLayout();
// 3. add tupleisnull in null-side
Preconditions.checkState(srcTblRefToOutputTupleSmap.getLhs().size() == vSrcToOutputSMap.getLhs().size());
// Condition1: the left child is null-side
// Condition2: the left child is a inline view
// Then: add tuple is null in left child columns
// The left nullable slots occupy the first leftNullableNumber entries of the map.
if (leftNullable && getChild(0).getTblRefIds().size() == 1
&& analyzer.isInlineView(getChild(0).getTblRefIds().get(0))) {
List<Expr> tupleIsNullLhs = TupleIsNullPredicate.wrapExprs(
vSrcToOutputSMap.getLhs().subList(0, leftNullableNumber), new ArrayList<>(), TNullSide.LEFT,
analyzer);
// keep the remaining (non-left) entries unchanged behind the wrapped ones
tupleIsNullLhs
.addAll(vSrcToOutputSMap.getLhs().subList(leftNullableNumber, vSrcToOutputSMap.getLhs().size()));
vSrcToOutputSMap.updateLhsExprs(tupleIsNullLhs);
}
// Condition1: the right child is null-side
// Condition2: the right child is a inline view
// Then: add tuple is null in right child columns
// The right nullable slots are taken to be the last rightNullableNumber entries of the map.
if (rightNullable && getChild(1).getTblRefIds().size() == 1
&& analyzer.isInlineView(getChild(1).getTblRefIds().get(0))) {
if (rightNullableNumber != 0) {
int rightBeginIndex = vSrcToOutputSMap.size() - rightNullableNumber;
List<Expr> tupleIsNullLhs = TupleIsNullPredicate.wrapExprs(
vSrcToOutputSMap.getLhs().subList(rightBeginIndex, vSrcToOutputSMap.size()), new ArrayList<>(),
TNullSide.RIGHT, analyzer);
List<Expr> newLhsList = Lists.newArrayList();
if (rightBeginIndex > 0) {
newLhsList.addAll(vSrcToOutputSMap.getLhs().subList(0, rightBeginIndex));
}
newLhsList.addAll(tupleIsNullLhs);
vSrcToOutputSMap.updateLhsExprs(newLhsList);
}
}
// 4. change the outputSmap
outputSmap = ExprSubstitutionMap.composeAndReplace(outputSmap, srcTblRefToOutputTupleSmap, analyzer);
}
/**
 * Recomputes {@code outputSlotIds} from the materialized slots of this node's output.
 *
 * <p>The slots are taken from {@code outputTupleDesc} when it exists, otherwise from every tuple
 * in {@code tupleIds}. A materialized slot is selected when {@code requiredSlotIdSet} is
 * {@code null} or contains its id. If nothing is selected, the first materialized slot is kept
 * so that the node still outputs at least one column.
 *
 * @param requiredSlotIdSet ids requested by the parent, or {@code null} to accept every slot
 * @param analyzer          analyzer used to resolve tuple ids to descriptors
 */
@Override
public void initOutputSlotIds(Set<SlotId> requiredSlotIdSet, Analyzer analyzer) {
    outputSlotIds = Lists.newArrayList();

    List<TupleDescriptor> candidateTuples = Lists.newArrayList();
    if (outputTupleDesc == null) {
        for (TupleId tid : tupleIds) {
            candidateTuples.add(analyzer.getTupleDesc(tid));
        }
    } else {
        candidateTuples.add(outputTupleDesc);
    }

    SlotId fallbackSlotId = null;
    for (TupleDescriptor candidate : candidateTuples) {
        for (SlotDescriptor slot : candidate.getSlots()) {
            if (!slot.isMaterialized()) {
                continue;
            }
            if (requiredSlotIdSet == null || requiredSlotIdSet.contains(slot.getId())) {
                outputSlotIds.add(slot.getId());
            }
            if (fallbackSlotId == null) {
                fallbackSlotId = slot.getId();
            }
        }
    }

    // BE may be able to output a correct row count without any column data in the future, but
    // for now at least one slot must be kept to get the row count right: fall back to the
    // first materialized slot when nothing was selected.
    if (outputSlotIds.isEmpty() && fallbackSlotId != null) {
        outputSlotIds.add(fallbackSlotId);
    }
}
/**
 * Prunes the output tuple down to the slots listed in {@code outputSlotIds}.
 *
 * <p>Nothing happens when there is no output tuple, or when the output tuple already has as many
 * slots as {@code outputSlotIds} has entries. Otherwise every slot whose id is not listed is
 * removed from the tuple, its entry is dropped from {@code vSrcToOutputSMap}, and the tuple's
 * stats and memory layout are recomputed.
 */
@Override
public void projectOutputTuple() {
    if (outputTupleDesc == null) {
        return;
    }
    if (outputTupleDesc.getSlots().size() == outputSlotIds.size()) {
        return;
    }
    // Hash lookup instead of scanning outputSlotIds once per slot.
    Set<SlotId> keptSlotIds = Sets.newHashSet(outputSlotIds);
    Iterator<SlotDescriptor> iterator = outputTupleDesc.getSlots().iterator();
    while (iterator.hasNext()) {
        SlotDescriptor slotDescriptor = iterator.next();
        if (keptSlotIds.contains(slotDescriptor.getId())) {
            continue;
        }
        // Remove through the iterator so the slot list can be modified while it is traversed.
        iterator.remove();
        vSrcToOutputSMap.removeByRhsExpr(new SlotRef(slotDescriptor));
    }
    outputTupleDesc.computeStatAndMemLayout();
}
/**
 * Tells computeIntermediateTuple() which children contribute slots to the intermediate tuples.
 *
 * <p>Despite the method name, the caller in this class reads {@code first} as "copy the left
 * child" and {@code second} as "copy the right child".
 *
 * @return pair of flags read as (copy left, copy right)
 */
protected abstract Pair<Boolean, Boolean> needToCopyRightAndLeft();
/**
 * Hook invoked by computeIntermediateTuple() before the node's own conjuncts are substituted.
 * NOTE(review): presumably implementations rewrite their join-specific conjuncts with the given
 * map; confirm against the subclasses.
 *
 * @param analyzer                 analyzer of the current statement
 * @param originToIntermediateSmap map from the children's original slots to intermediate-tuple slots
 */
protected abstract void computeOtherConjuncts(Analyzer analyzer, ExprSubstitutionMap originToIntermediateSmap);
/**
 * Builds the intermediate tuples of the join and rewrites this node's expressions onto them.
 *
 * <p>Steps, in the order they run below:
 * <ol>
 *   <li>Create one intermediate tuple per side and, for a mark join, append the mark tuple if the
 *       analyzer has one.</li>
 *   <li>Copy the materialized slots of each child selected by needToCopyRightAndLeft() into the
 *       matching intermediate tuple, recording the (original slot -> intermediate slot) mapping.
 *       Slots of a null-producing side are made nullable.</li>
 *   <li>Substitute the lhs of {@code vSrcToOutputSMap} with the intermediate slots.</li>
 *   <li>Substitute the other conjuncts (via computeOtherConjuncts) and {@code conjuncts}.</li>
 *   <li>Remap tuple ids inside TupleIsNullPredicate expressions, then refresh the nullability of
 *       the output slots from the rewritten source expressions.</li>
 * </ol>
 *
 * <p>Requires {@code vSrcToOutputSMap} to be set and to have exactly one lhs entry per slot of
 * {@code outputTupleDesc}; both are enforced with state checks.
 *
 * @param analyzer analyzer that owns the descriptor table
 * @throws AnalysisException declared by the signature; NOTE(review): which callee raises it is not
 *                           visible in this method
 */
protected void computeIntermediateTuple(Analyzer analyzer) throws AnalysisException {
// 1. create new tuple
TupleDescriptor vIntermediateLeftTupleDesc = analyzer.getDescTbl().createTupleDescriptor();
TupleDescriptor vIntermediateRightTupleDesc = analyzer.getDescTbl().createTupleDescriptor();
vIntermediateTupleDescList = new ArrayList<>();
vIntermediateTupleDescList.add(vIntermediateLeftTupleDesc);
vIntermediateTupleDescList.add(vIntermediateRightTupleDesc);
// if join type is MARK, add mark tuple to intermediate tuple. mark slot will be generated after join.
if (isMarkJoin()) {
TupleDescriptor markTuple = analyzer.getMarkTuple(innerRef);
if (markTuple != null) {
vIntermediateTupleDescList.add(markTuple);
}
}
// Sides that an outer join may pad with NULLs.
boolean leftNullable = false;
boolean rightNullable = false;
switch (joinOp) {
case LEFT_OUTER_JOIN:
rightNullable = true;
break;
case RIGHT_OUTER_JOIN:
leftNullable = true;
break;
case FULL_OUTER_JOIN:
leftNullable = true;
rightNullable = true;
break;
default:
break;
}
// first -> copy left child, second -> copy right child (see needToCopyRightAndLeft)
Pair<Boolean, Boolean> tmpPair = needToCopyRightAndLeft();
boolean copyleft = tmpPair.first;
boolean copyRight = tmpPair.second;
// 2. exprsmap: <originslot, intermediateslot>
ExprSubstitutionMap originToIntermediateSmap = new ExprSubstitutionMap();
// Maps the output tuple ids of a child to the id of the intermediate tuple that replaces them;
// consumed in step 5.
Map<List<TupleId>, TupleId> originTidsToIntermediateTidMap = Maps.newHashMap();
// left
if (copyleft) {
originTidsToIntermediateTidMap.put(getChild(0).getOutputTupleIds(), vIntermediateLeftTupleDesc.getId());
for (TupleDescriptor tupleDescriptor : analyzer.getDescTbl()
.getTupleDesc(getChild(0).getOutputTupleIds())) {
for (SlotDescriptor slotDescriptor : tupleDescriptor.getMaterializedSlots()) {
SlotDescriptor intermediateSlotDesc =
analyzer.getDescTbl().copySlotDescriptor(vIntermediateLeftTupleDesc, slotDescriptor);
if (leftNullable) {
intermediateSlotDesc.setIsNullable(true);
}
originToIntermediateSmap.put(new SlotRef(slotDescriptor), new SlotRef(intermediateSlotDesc));
}
}
}
// the layout is computed even when no left slot was copied
vIntermediateLeftTupleDesc.computeMemLayout();
// right
if (copyRight) {
originTidsToIntermediateTidMap.put(getChild(1).getOutputTupleIds(), vIntermediateRightTupleDesc.getId());
for (TupleDescriptor tupleDescriptor : analyzer.getDescTbl()
.getTupleDesc(getChild(1).getOutputTupleIds())) {
for (SlotDescriptor slotDescriptor : tupleDescriptor.getMaterializedSlots()) {
SlotDescriptor intermediateSlotDesc =
analyzer.getDescTbl().copySlotDescriptor(vIntermediateRightTupleDesc, slotDescriptor);
if (rightNullable) {
intermediateSlotDesc.setIsNullable(true);
}
originToIntermediateSmap.put(new SlotRef(slotDescriptor), new SlotRef(intermediateSlotDesc));
}
}
}
// the layout is computed even when no right slot was copied
vIntermediateRightTupleDesc.computeMemLayout();
// 3. replace srcExpr by intermediate tuple
Preconditions.checkState(vSrcToOutputSMap != null);
// Set `preserveRootTypes` to true to keep the expression types consistent. See Issue-11314.
vSrcToOutputSMap.substituteLhs(originToIntermediateSmap, analyzer, true);
// 4. replace other conjuncts and conjuncts
computeOtherConjuncts(analyzer, originToIntermediateSmap);
conjuncts = Expr.substituteList(conjuncts, originToIntermediateSmap, analyzer, false);
// 5. replace tuple is null expr
TupleIsNullPredicate.substitueListForTupleIsNull(vSrcToOutputSMap.getLhs(), originTidsToIntermediateTidMap);
Preconditions.checkState(vSrcToOutputSMap.getLhs().size() == outputTupleDesc.getSlots().size());
// The i-th output slot takes its nullability from the i-th rewritten source expression.
List<Expr> exprs = vSrcToOutputSMap.getLhs();
ArrayList<SlotDescriptor> slots = outputTupleDesc.getSlots();
for (int i = 0; i < slots.size(); i++) {
slots.get(i).setIsNullable(exprs.get(i).isNullable());
}
vSrcToOutputSMap.reCalculateNullableInfoForSlotInRhs();
outputTupleDesc.computeMemLayout();
}
/**
 * Supplies slot ids that computeInputSlotIds() adds to its result, in addition to the ids
 * derived from the output slots and from {@code conjuncts}.
 * NOTE(review): presumably the slots referenced by the subclass's join conjuncts; confirm
 * against the implementations.
 *
 * @param analyzer analyzer of the current statement
 * @return slot ids to include in the node's input slot set
 */
protected abstract List<SlotId> computeSlotIdsForJoinConjuncts(Analyzer analyzer);
/**
 * Collects the ids of all slots this node needs from its inputs.
 *
 * <p>The result is the union of:
 * <ul>
 *   <li>when {@code vSrcToOutputSMap} exists: for every output slot, the slots referenced by its
 *       source expression, or the output slot id itself if the map has no source for it;</li>
 *   <li>the ids returned by computeSlotIdsForJoinConjuncts();</li>
 *   <li>the ids referenced by {@code conjuncts}.</li>
 * </ul>
 *
 * @param analyzer analyzer used to resolve slot ids to descriptors
 * @return the set of required input slot ids
 * @throws NotImplementedException declared by the overridden signature
 */
@Override
public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws NotImplementedException {
    Preconditions.checkState(outputSlotIds != null);
    Set<SlotId> inputSlotIds = Sets.newHashSet();

    // Translate every output slot back to the source slots it is computed from.
    if (vSrcToOutputSMap != null) {
        for (SlotId outputSlotId : outputSlotIds) {
            SlotRef outputRef = new SlotRef(analyzer.getDescTbl().getSlotDesc(outputSlotId));
            Expr sourceExpr = vSrcToOutputSMap.mappingForRhsExpr(outputRef);
            if (sourceExpr == null) {
                inputSlotIds.add(outputSlotId);
                continue;
            }
            List<SlotRef> sourceRefs = Lists.newArrayList();
            sourceExpr.collect(SlotRef.class, sourceRefs);
            for (SlotRef sourceRef : sourceRefs) {
                inputSlotIds.add(sourceRef.getSlotId());
            }
        }
    }

    // Slots contributed by the join conjuncts of the concrete join node.
    inputSlotIds.addAll(computeSlotIdsForJoinConjuncts(analyzer));

    // Slots referenced by the node's own conjuncts.
    List<SlotId> conjunctSlotIds = Lists.newArrayList();
    Expr.getIds(conjuncts, null, conjunctSlotIds);
    inputSlotIds.addAll(conjunctSlotIds);

    return inputSlotIds;
}
/**
 * Runs the super class finalization and then builds the intermediate tuples of the join.
 *
 * @param analyzer analyzer of the current statement
 * @throws UserException if the super class finalization or computeIntermediateTuple() fails
 */
@Override
public void finalize(Analyzer analyzer) throws UserException {
    super.finalize(analyzer);
    this.computeIntermediateTuple(analyzer);
}
/**
 * Constructor used by Nereids only.
 *
 * <p>No table reference is attached ({@code innerRef} stays {@code null}); the join operator
 * and the mark flag are supplied directly, and no children are registered here.
 *
 * @param id              id of this plan node
 * @param planNodeName    display name forwarded to the super class
 * @param statisticalType statistical type forwarded to the super class
 * @param joinOp          join operator of this node
 * @param isMark          whether this node is a mark join
 */
public JoinNodeBase(PlanNodeId id, String planNodeName,
        StatisticalType statisticalType, JoinOperator joinOp, boolean isMark) {
    super(id, planNodeName, statisticalType);
    this.joinOp = joinOp;
    this.isMark = isMark;
    this.innerRef = null;
}
/**
 * @return the table reference given at construction, or {@code null} for nodes built through
 *         the Nereids constructor
 */
public TableRef getInnerRef() {
    return this.innerRef;
}
/**
 * Initializes the node: runs the super class initialization, records the conjuncts the analyzer
 * has assigned so far, computes stats, and finally rejects mark joins on operators that do not
 * support them.
 *
 * <p>NOTE(review): the previous comment in this method mentioned replacing the output smap for
 * nullable tuples of outer joins; no such rewrite is performed here.
 *
 * @param analyzer analyzer of the current statement
 * @throws UserException if a callee fails, or if this is a mark join whose operator does not
 *                       support mark joins
 */
@Override
public void init(Analyzer analyzer) throws UserException {
    super.init(analyzer);
    assignedConjuncts = analyzer.getAssignedConjuncts();
    computeStats(analyzer);

    if (!isMarkJoin()) {
        return;
    }
    if (!joinOp.supportMarkJoin()) {
        throw new UserException("Mark join is supported only for LEFT SEMI JOIN/LEFT ANTI JOIN/CROSS JOIN");
    }
}
/**
 * Returns the tuple ids a parent should see for this join node.
 *
 * <p>Parents must call this method rather than read the {@code tupleIds} field: once the join
 * has an output tuple, that tuple's id is the node's real output tuple id, while the
 * {@code tupleIds} field still holds the input ids of the node.
 *
 * @return a new single-element list with the output tuple id when an output tuple exists,
 *         otherwise the {@code tupleIds} field itself
 */
@Override
public ArrayList<TupleId> getTupleIds() {
    Preconditions.checkState(tupleIds != null);
    if (outputTupleDesc == null) {
        return tupleIds;
    }
    ArrayList<TupleId> outputOnly = Lists.newArrayList(outputTupleDesc.getId());
    return outputOnly;
}
/**
 * Returns the table-reference ids this node exposes to its parent.
 *
 * <p>With an output tuple, that tuple's id is the only entry. Without one, semi and anti joins
 * delegate to the child they preserve (left variants to child 0, right variants to child 1);
 * every other operator returns getTblRefIds().
 *
 * @return the output table-reference ids
 */
@Override
public ArrayList<TupleId> getOutputTblRefIds() {
    if (outputTupleDesc != null) {
        return Lists.newArrayList(outputTupleDesc.getId());
    }
    int preservedChild;
    switch (joinOp) {
        case LEFT_SEMI_JOIN:
        case LEFT_ANTI_JOIN:
        case NULL_AWARE_LEFT_ANTI_JOIN:
            preservedChild = 0;
            break;
        case RIGHT_SEMI_JOIN:
        case RIGHT_ANTI_JOIN:
            preservedChild = 1;
            break;
        default:
            return getTblRefIds();
    }
    return getChild(preservedChild).getOutputTblRefIds();
}
/**
 * Returns the tuple ids this node outputs.
 *
 * <p>With an output tuple, that tuple's id is the only entry. Without one, semi and anti joins
 * delegate to the child they preserve (left variants to child 0, right variants to child 1);
 * every other operator returns the {@code tupleIds} field.
 *
 * @return the output tuple ids
 */
@Override
public List<TupleId> getOutputTupleIds() {
    if (outputTupleDesc != null) {
        return Lists.newArrayList(outputTupleDesc.getId());
    }
    int preservedChild;
    switch (joinOp) {
        case LEFT_SEMI_JOIN:
        case LEFT_ANTI_JOIN:
        case NULL_AWARE_LEFT_ANTI_JOIN:
            preservedChild = 0;
            break;
        case RIGHT_SEMI_JOIN:
        case RIGHT_ANTI_JOIN:
            preservedChild = 1;
            break;
        default:
            return tupleIds;
    }
    return getChild(preservedChild).getOutputTupleIds();
}
/**
 * Computes the statistics of this node.
 *
 * <p>After the super class computation, and only when the analyzer reports cost-based join
 * reorder as enabled, the stats are derived recursively and {@code cardinality} is set to the
 * derived row count truncated to {@code long}.
 *
 * @param analyzer analyzer of the current statement
 * @throws UserException if the super class computation or the recursive derivation fails
 */
@Override
public void computeStats(Analyzer analyzer) throws UserException {
    super.computeStats(analyzer);
    if (analyzer.safeIsEnableJoinReorderBasedCost()) {
        StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this);
        cardinality = (long) statsDeriveResult.getRowCount();
    }
}
/**
 * @return the larger of the instance counts reported by child 0 and child 1
 */
@Override
public int getNumInstances() {
    int firstChildInstances = children.get(0).getNumInstances();
    int secondChildInstances = children.get(1).getNumInstances();
    return Math.max(firstChildInstances, secondChildInstances);
}
/**
 * Replaces the list of intermediate tuples. Used by Nereids.
 *
 * @param vIntermediateTupleDescList the intermediate tuple descriptors to store; the list is
 *                                   kept by reference, not copied
 */
public void setvIntermediateTupleDescList(List<TupleDescriptor> vIntermediateTupleDescList) {
    this.vIntermediateTupleDescList = vIntermediateTupleDescList;
}
/**
 * Installs {@code smap} as the output substitution map and pushes projections into the join.
 *
 * <p>Every rhs expression of {@code smap} that is not a plain {@link SlotRef} but is bound to the
 * output tuple is evaluated by the join node itself: a new materialized slot is added to the
 * output tuple, the expression (rewritten from output slots back to source expressions) is
 * appended to {@code vSrcToOutputSMap}, and the rhs entry is replaced by a reference to the new
 * slot. All other rhs expressions are kept as they are. When at least one projection was added,
 * the rhs of the installed map is updated and the output tuple layout is recomputed.
 *
 * <p>Note that {@code smap} is stored by reference, so the rhs update is visible to the caller.
 *
 * @param smap     the new output substitution map
 * @param analyzer analyzer used to allocate new slot descriptors
 */
public void setOutputSmap(ExprSubstitutionMap smap, Analyzer analyzer) {
    outputSmap = smap;

    // Reverse view of vSrcToOutputSMap (output slot -> source expression), snapshotted up front.
    ExprSubstitutionMap outputToSrcSmap = new ExprSubstitutionMap(
            Lists.newArrayList(vSrcToOutputSMap.getRhs()),
            Lists.newArrayList(vSrcToOutputSMap.getLhs()));

    List<Expr> rewrittenRhs = Lists.newArrayList();
    boolean projectionAdded = false;
    for (Expr candidate : smap.getRhs()) {
        if (candidate instanceof SlotRef || !candidate.isBound(outputTupleDesc.getId())) {
            // nothing to project: keep the expression untouched
            rewrittenRhs.add(candidate);
            continue;
        }

        // The join node has to do the projection: allocate a slot for the result.
        SlotDescriptor projectedSlot = analyzer.addSlotDescriptor(outputTupleDesc);
        projectedSlot.initFromExpr(candidate);
        projectedSlot.setIsMaterialized(true);

        // The expression comes from smap and therefore uses slots of the output tuple;
        // substitute it so that it uses the source (intermediate tuple) expressions instead.
        Expr sourceExpr = candidate.substitute(outputToSrcSmap);
        vSrcToOutputSMap.getLhs().add(sourceExpr);

        SlotRef projectedRef = new SlotRef(projectedSlot);
        projectedRef.materializeSrcExpr();
        vSrcToOutputSMap.getRhs().add(projectedRef);

        rewrittenRhs.add(projectedRef);
        projectionAdded = true;
    }

    if (projectionAdded) {
        outputSmap.updateRhsExprs(rewrittenRhs);
        outputTupleDesc.computeStatAndMemLayout();
    }
}
}