// PartitionSortNode.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SortInfo;
import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
import org.apache.doris.nereids.trees.plans.WindowFuncType;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TPartTopNPhase;
import org.apache.doris.thrift.TPartitionSortNode;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TSortInfo;
import org.apache.doris.thrift.TopNAlgorithm;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.Iterator;
import java.util.List;
/**
 * PartitionSortNode.
 * Plan node carrying a window function type, partition expressions, sort info and
 * limit settings, which it serializes into a {@code TPartitionSortNode}.
 * PartitionSortNode is only used in the Nereids.
 */
public class PartitionSortNode extends PlanNode {
    private final WindowFuncType function;
    private final List<Expr> partitionExprs;
    private final SortInfo info;
    private final boolean hasGlobalLimit;
    private final long partitionLimit;
    private final PartitionTopnPhase phase;

    /**
     * Constructor.
     *
     * @param id             id of this plan node
     * @param input          the single child of this node
     * @param function       window function type this node is built for
     * @param partitionExprs partition-by expressions
     * @param info           sort info; its ordering exprs and asc flags must have the same size
     * @param hasGlobalLimit whether a global limit is present
     * @param partitionLimit limit applied inside each partition
     * @param phase          partition topn phase of this node
     */
    public PartitionSortNode(PlanNodeId id, PlanNode input, WindowFuncType function, List<Expr> partitionExprs,
            SortInfo info, boolean hasGlobalLimit, long partitionLimit, PartitionTopnPhase phase) {
        super(id, "PartitionTopN", StatisticalType.PARTITION_TOPN_NODE);
        Preconditions.checkArgument(info.getOrderingExprs().size() == info.getIsAscOrder().size());

        this.function = function;
        this.partitionExprs = partitionExprs;
        this.info = info;
        this.hasGlobalLimit = hasGlobalLimit;
        this.partitionLimit = partitionLimit;
        this.phase = phase;

        // The sort tuple is both the only output tuple and the only table-ref id of this node.
        this.tupleIds.addAll(Lists.newArrayList(info.getSortTupleDescriptor().getId()));
        this.tblRefIds.addAll(Lists.newArrayList(info.getSortTupleDescriptor().getId()));
        this.nullableTupleIds.addAll(input.getNullableTupleIds());
        this.children.add(input);
    }

    public SortInfo getSortInfo() {
        return info;
    }

    /**
     * Renders the explain text of this node, one attribute per line, each line
     * starting with {@code prefix}. Returns an empty string for the BRIEF level.
     */
    @Override
    public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
        if (detailLevel == TExplainLevel.BRIEF) {
            return "";
        }
        StringBuilder explain = new StringBuilder();

        // Window function name.
        explain.append(prefix).append("functions: ").append(windowFuncName()).append("\n");

        // Partition-by expressions; the line is omitted when there are none.
        if (!partitionExprs.isEmpty()) {
            explain.append(prefix).append("partition by: ");
            List<String> partitionSqls = Lists.newArrayList();
            for (Expr partitionExpr : partitionExprs) {
                partitionSqls.add(partitionExpr.toSql());
            }
            explain.append(Joiner.on(", ").join(partitionSqls)).append("\n");
        }

        // Order-by expressions, each followed by its direction.
        explain.append(prefix).append("order by: ");
        Iterator<Boolean> ascFlags = info.getIsAscOrder().iterator();
        String separator = "";
        for (Expr orderingExpr : info.getOrderingExprs()) {
            explain.append(separator);
            explain.append(orderingExpr.toSql()).append(" ");
            explain.append(ascFlags.next() ? "ASC" : "DESC");
            separator = ", ";
        }
        explain.append("\n");

        if (!conjuncts.isEmpty()) {
            explain.append(prefix).append("predicates: ").append(getExplainString(conjuncts)).append("\n");
        }

        // Limit information.
        explain.append(prefix).append("has global limit: ").append(hasGlobalLimit).append("\n");
        explain.append(prefix).append("partition limit: ").append(partitionLimit).append("\n");

        // Partition topn phase.
        explain.append(prefix).append("partition topn phase: ").append(phase).append("\n");
        return explain.toString();
    }

    /**
     * Fills {@code msg} with the thrift representation of this node.
     */
    @Override
    protected void toThrift(TPlanNode msg) {
        msg.node_type = TPlanNodeType.PARTITION_SORT_NODE;
        TSortInfo thriftSortInfo = info.toThrift();
        Preconditions.checkState(tupleIds.size() == 1, "Incorrect size for tupleIds in PartitionSortNode");

        TopNAlgorithm algorithm = resolveTopNAlgorithm();
        TPartTopNPhase thriftPhase = resolveThriftPhase();

        TPartitionSortNode thriftNode = new TPartitionSortNode();
        thriftNode.setTopNAlgorithm(algorithm);
        thriftNode.setPartitionExprs(Expr.treesToThrift(partitionExprs));
        thriftNode.setSortInfo(thriftSortInfo);
        thriftNode.setHasGlobalLimit(hasGlobalLimit);
        thriftNode.setPartitionInnerLimit(partitionLimit);
        thriftNode.setPtopnPhase(thriftPhase);
        msg.partition_sort_node = thriftNode;
    }

    /** Returns the explain name of the window function; anything but ROW_NUMBER/RANK maps to dense_rank. */
    private String windowFuncName() {
        if (function == WindowFuncType.ROW_NUMBER) {
            return "row_number";
        }
        if (function == WindowFuncType.RANK) {
            return "rank";
        }
        return "dense_rank";
    }

    /** Picks the thrift top-N algorithm from the global-limit flag and the window function type. */
    private TopNAlgorithm resolveTopNAlgorithm() {
        // only need row number if has global limit, so we change algorithm directly
        if (hasGlobalLimit || function == WindowFuncType.ROW_NUMBER) {
            return TopNAlgorithm.ROW_NUMBER;
        }
        if (function == WindowFuncType.RANK) {
            return TopNAlgorithm.RANK;
        }
        return TopNAlgorithm.DENSE_RANK;
    }

    /** Maps the planner phase to its thrift counterpart; unrecognized phases map to UNKNOWN. */
    private TPartTopNPhase resolveThriftPhase() {
        if (phase == PartitionTopnPhase.ONE_PHASE_GLOBAL_PTOPN) {
            return TPartTopNPhase.ONE_PHASE_GLOBAL;
        }
        if (phase == PartitionTopnPhase.TWO_PHASE_LOCAL_PTOPN) {
            return TPartTopNPhase.TWO_PHASE_LOCAL;
        }
        if (phase == PartitionTopnPhase.TWO_PHASE_GLOBAL_PTOPN) {
            return TPartTopNPhase.TWO_PHASE_GLOBAL;
        }
        return TPartTopNPhase.UNKNOWN;
    }
}