// PlanTreeBuilder.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.profile;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.MultiCastDataSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.thrift.TExplainLevel;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Stitches the per-fragment plans of a query into one {@link PlanTreeNode} tree.
 *
 * <p>Each fragment contributes an optional sink node (with the fragment's plan nodes
 * hanging underneath it). The sink of the first fragment in the list becomes the tree
 * root; every other sink is attached as a child of the exchange node whose id matches
 * one of the sink's destination exchange ids.
 *
 * <p>Usage: construct with the fragment list, call {@link #build()}, then read the result
 * with {@link #getTreeRoot()}.
 */
public class PlanTreeBuilder {
    /** Fragments to render; the fragment at index 0 is the one whose sink becomes the root. */
    private final List<PlanFragment> fragments;
    /** Root of the assembled tree; stays {@code null} until {@link #build()} sets it. */
    private PlanTreeNode treeRoot;
    /** Sinks of every fragment except the first one, waiting to be attached to an exchange. */
    private final List<PlanTreeNode> sinkNodes = Lists.newArrayList();
    /** Every exchange node encountered while walking the fragment plans. */
    private final List<PlanTreeNode> exchangeNodes = Lists.newArrayList();

    /**
     * @param fragments the plan fragments to assemble; the list is kept by reference
     */
    public PlanTreeBuilder(List<PlanFragment> fragments) {
        this.fragments = fragments;
    }

    /**
     * @return the root built by the last {@link #build()} call, or {@code null} if
     *         {@code build()} has not run or the first fragment had no sink
     */
    public PlanTreeNode getTreeRoot() {
        return treeRoot;
    }

    /**
     * Builds the per-fragment sub-trees and then links them together through their
     * exchange nodes.
     *
     * @throws UserException if a sink targets an exchange id that no visited exchange node carries
     */
    public void build() throws UserException {
        // Drop anything left over from an earlier call. Without this, a second build()
        // would look up exchanges in a list that still holds the previous run's nodes
        // and attach the new sinks to the stale tree.
        treeRoot = null;
        sinkNodes.clear();
        exchangeNodes.clear();

        buildFragmentPlans();
        assembleFragmentPlans();
    }

    /**
     * Creates one sub-tree per fragment: the sink node (when the fragment has a sink)
     * with the fragment's plan nodes below it. Records the first fragment's sink as the
     * tree root and queues all other sinks for later attachment.
     */
    private void buildFragmentPlans() {
        boolean firstFragment = true;
        for (PlanFragment fragment : fragments) {
            DataSink sink = fragment.getSink();
            PlanTreeNode sinkNode = null;
            if (sink != null) {
                sinkNode = new PlanTreeNode(collectExchangeIds(sink), describeSink(fragment, sink));
                if (firstFragment) {
                    // Sink of the first fragment: it is the root of the whole tree.
                    treeRoot = sinkNode;
                } else {
                    sinkNodes.add(sinkNode);
                }
            }
            PlanNode planRoot = fragment.getPlanRoot();
            if (planRoot != null) {
                // sinkNode may be null here; the plan nodes are then built without a parent.
                buildForPlanNode(planRoot, sinkNode);
            }
            firstFragment = false;
        }
    }

    /**
     * Renders the multi-line label shown for a sink node: a bracketed header with the
     * sink's class name (prefixed by its exchange id when it has one), the fragment
     * sequence number, and the sink's BRIEF explain string.
     */
    private static String describeSink(PlanFragment fragment, DataSink sink) {
        StringBuilder label = new StringBuilder();
        PlanNodeId exchId = sink.getExchNodeId();
        label.append("[");
        if (exchId != null) {
            label.append(exchId.asInt()).append(": ");
        }
        label.append(sink.getClass().getSimpleName()).append("]");
        label.append("\n[Fragment: ").append(fragment.getFragmentSequenceNum()).append("]");
        label.append("\n").append(sink.getExplainString("", TExplainLevel.BRIEF));
        return label.toString();
    }

    /**
     * Returns the ids of the exchange nodes the given sink sends to: one id per inner
     * stream sink for a {@link MultiCastDataSink}, the sink's own exchange id otherwise,
     * or an empty list when the sink has no exchange id.
     */
    private static List<PlanNodeId> collectExchangeIds(DataSink sink) {
        if (sink instanceof MultiCastDataSink) {
            return ((MultiCastDataSink) sink).getDataStreamSinks().stream()
                    .map(s -> s.getExchNodeId()).collect(Collectors.toList());
        }
        PlanNodeId exchId = sink.getExchNodeId();
        if (exchId != null) {
            return ImmutableList.of(exchId);
        }
        return ImmutableList.of();
    }

    /**
     * Attaches every queued sink node under each exchange node it sends to.
     * The root sink is never placed in {@code sinkNodes}, so it needs no special handling here.
     *
     * @throws UserException if no recorded exchange node carries one of a sink's target ids
     */
    private void assembleFragmentPlans() throws UserException {
        for (PlanTreeNode sender : sinkNodes) {
            for (PlanNodeId senderId : sender.getIds()) {
                PlanTreeNode exchangeNode = findExchangeNode(senderId);
                if (exchangeNode == null) {
                    throw new UserException("Failed to find exchange node for sender id: " + senderId.asInt());
                }
                exchangeNode.addChild(sender);
            }
        }
    }

    /**
     * Finds the first recorded exchange node whose id list contains {@code senderId}.
     *
     * @return the matching node, or {@code null} when there is none
     */
    private PlanTreeNode findExchangeNode(PlanNodeId senderId) {
        for (PlanTreeNode exchangeNode : exchangeNodes) {
            if (exchangeNode.getIds().stream().anyMatch(senderId::equals)) {
                return exchangeNode;
            }
        }
        return null;
    }

    /**
     * Recursively mirrors {@code planNode} and its descendants as {@link PlanTreeNode}s
     * under {@code parent}. Recursion stops at exchange nodes, which are recorded so the
     * sending fragment's sink can be attached to them afterwards.
     *
     * @param planNode the plan node to mirror
     * @param parent   the tree node to attach to, or {@code null} to create a detached node
     */
    private void buildForPlanNode(PlanNode planNode, PlanTreeNode parent) {
        PlanTreeNode node = new PlanTreeNode(ImmutableList.of(planNode.getId()), planNode.getPlanTreeExplainStr());
        if (parent != null) {
            parent.addChild(node);
        }
        if (planNode.getPlanNodeName().contains(ExchangeNode.EXCHANGE_NODE)) {
            exchangeNodes.add(node);
            // Do not descend below an exchange node: its inputs belong to another
            // fragment and are visited when that fragment is processed.
            return;
        }
        for (PlanNode child : planNode.getChildren()) {
            buildForPlanNode(child, node);
        }
    }
}