SingleNodePlanner.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/SingleNodePlanner.java
// and modified by Doris
package org.apache.doris.planner;
import org.apache.doris.analysis.AggregateInfo;
import org.apache.doris.analysis.AnalyticExpr;
import org.apache.doris.analysis.AnalyticInfo;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.AssertNumRowsElement;
import org.apache.doris.analysis.BaseTableRef;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.CaseExpr;
import org.apache.doris.analysis.CaseWhenClause;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.GroupByClause;
import org.apache.doris.analysis.GroupingInfo;
import org.apache.doris.analysis.InlineViewRef;
import org.apache.doris.analysis.JoinOperator;
import org.apache.doris.analysis.LateralViewRef;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SetOperationStmt;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.TableValuedFunctionRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.AggregateFunction;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Reference;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.es.source.EsScanNode;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hudi.source.HudiScanNode;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
import org.apache.doris.datasource.lakesoul.source.LakeSoulScanNode;
import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode;
import org.apache.doris.datasource.odbc.source.OdbcScanNode;
import org.apache.doris.datasource.paimon.source.PaimonScanNode;
import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.thrift.TPushAggOp;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Constructs a non-executable single-node plan from an analyzed parse tree.
* The single-node plan does not contain data exchanges or data-reduction optimizations
* such as local aggregations that are important for distributed execution.
* The single-node plan needs to be wrapped in a plan fragment for it to be executable.
*/
public class SingleNodePlanner {
private static final Logger LOG = LogManager.getLogger(SingleNodePlanner.class);
private final PlannerContext ctx;
private final ArrayList<ScanNode> scanNodes = Lists.newArrayList();
private Map<Analyzer, List<ScanNode>> selectStmtToScanNodes = Maps.newHashMap();
public SingleNodePlanner(PlannerContext ctx) {
this.ctx = ctx;
}
public PlannerContext getPlannerContext() {
return ctx;
}
public ArrayList<ScanNode> getScanNodes() {
return scanNodes;
}
/**
* Generates and returns the root of the single-node plan for the analyzed parse tree
* in the planner context. The planning process recursively walks the parse tree and
* performs the following actions.
* In the top-down phase over query statements:
* - Materialize the slots required for evaluating expressions of that statement.
* - Migrate conjuncts from parent blocks into inline views and union operands.
* In the bottom-up phase generate the plan tree for every query statement:
* - Generate the plan for the FROM-clause of a select statement: The plan trees of
* absolute and uncorrelated table refs are connected via JoinNodes. The relative
* and correlated table refs are associated with one or more SubplanNodes.
* - A new SubplanNode is placed on top of an existing plan node whenever the tuples
* materialized by that plan node enable evaluation of one or more relative or
* correlated table refs, i.e., SubplanNodes are placed at the lowest possible point
* in the plan, often right after a ScanNode materializing the (single) parent tuple.
* - The right-hand side of each SubplanNode is a plan tree generated by joining a
* SingularRowSrcTableRef with those applicable relative and correlated refs.
* A SingularRowSrcTableRef represents the current row being processed by the
* SubplanNode from its input (first child).
* - Connecting table ref plans via JoinNodes is done in a cost-based fashion
* (join-order optimization). All materialized slots, including those of tuples
* materialized inside a SubplanNode, must be known for an accurate estimate of row
* sizes needed for cost-based join ordering.
* - The remaining aggregate/analytic/orderby portions of a select statement are added
* on top of the FROM-clause plan.
* - Whenever a new node is added to the plan tree, assign conjuncts that can be
* evaluated at that node and compute the stats of that node (cardinality, etc.).
* - Apply combined expression substitution map of child plan nodes; if a plan node
* re-maps its input, set a substitution map to be applied by parents.
*/
public PlanNode createSingleNodePlan() throws UserException, AnalysisException {
QueryStmt queryStmt = ctx.getQueryStmt();
// Use the stmt's analyzer which is not necessarily the root analyzer
// to detect empty result sets.
Analyzer analyzer = queryStmt.getAnalyzer();
// TODO(zc)
// analyzer.computeEquivClasses();
// ctx_.getAnalysisResult().getTimeline().markEvent("Equivalence classes computed");
// Mark slots referenced by output exprs as materialized, prior to generating the
// plan tree.
// We need to mark the result exprs of the topmost select block as materialized, so
// that PlanNode.init() can compute the final mem layout of materialized tuples
// (the byte size of tuples is needed for cost computations).
// TODO: instead of materializing everything produced by the plan root, derive
// referenced slots from destination fragment and add a materialization node
// if not all output is needed by destination fragment
// TODO 2: should the materialization decision be cost-based?
if (queryStmt.getBaseTblResultExprs() != null) {
analyzer.materializeSlots(queryStmt.getBaseTblResultExprs());
}
if (LOG.isTraceEnabled()) {
LOG.trace("desctbl: " + analyzer.getDescTbl().debugString());
}
long sqlSelectLimit = -1;
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
sqlSelectLimit = ConnectContext.get().getSessionVariable().getSqlSelectLimit();
}
PlanNode singleNodePlan = createQueryPlan(queryStmt, analyzer,
ctx.getQueryOptions().getDefaultOrderByLimit(), sqlSelectLimit);
Preconditions.checkNotNull(singleNodePlan);
analyzer.getDescTbl().materializeIntermediateSlots();
return singleNodePlan;
}
/**
* Creates an EmptyNode that 'materializes' the tuples of the given stmt.
* Marks all collection-typed slots referenced in stmt as non-materialized because
* they are never unnested, and therefore the corresponding parent scan should not
* materialize them.
*/
private PlanNode createEmptyNode(PlanNode inputPlan, QueryStmt stmt, Analyzer analyzer) throws UserException {
ArrayList<TupleId> tupleIds = Lists.newArrayList();
if (inputPlan != null) {
tupleIds.addAll(inputPlan.getOutputTupleIds());
}
if (tupleIds.isEmpty()) {
// Constant selects do not have materialized tuples at this stage.
Preconditions.checkState(stmt instanceof SelectStmt,
"Only constant selects should have no materialized tuples");
SelectStmt selectStmt = (SelectStmt) stmt;
Preconditions.checkState(selectStmt.getTableRefs().isEmpty());
tupleIds.add(createResultTupleDescriptor(selectStmt, "empty", analyzer).getId());
}
unmarkCollectionSlots(stmt);
EmptySetNode node = new EmptySetNode(ctx.getNextNodeId(), tupleIds);
node.init(analyzer);
// Set the output smap to resolve exprs referencing inline views within stmt.
// Not needed for a UnionStmt because it materializes its input operands.
if (stmt instanceof SelectStmt) {
node.setOutputSmap(((SelectStmt) stmt).getBaseTblSmap(), analyzer);
}
return node;
}
/**
* Mark all collection-typed slots in stmt as non-materialized.
*/
private void unmarkCollectionSlots(QueryStmt stmt) {
List<TableRef> tblRefs = Lists.newArrayList();
stmt.collectTableRefs(tblRefs);
for (TableRef ref : tblRefs) {
if (!ref.isRelative()) {
continue;
}
// Preconditions.checkState(ref instanceof CollectionTableRef);
// CollectionTableRef collTblRef = (CollectionTableRef) ref;
// Expr collExpr = collTblRef.getCollectionExpr();
// Preconditions.checkState(collExpr instanceof SlotRef);
// SlotRef collSlotRef = (SlotRef) collExpr;
// collSlotRef.getDesc().setIsMaterialized(false);
// collSlotRef.getDesc().getParent().recomputeMemLayout();
}
}
/**
* Create plan tree for single-node execution. Generates PlanNodes for the
* Select/Project/Join/Union [All]/Group by/Having/Order by clauses of the query stmt.
*/
private PlanNode createQueryPlan(QueryStmt stmt, Analyzer analyzer, long defaultOrderByLimit, long sqlSelectLimit)
throws UserException {
long newDefaultOrderByLimit = defaultOrderByLimit;
long defaultLimit = analyzer.getContext().getSessionVariable().getDefaultOrderByLimit();
if (newDefaultOrderByLimit == -1) {
if (defaultLimit <= -1) {
newDefaultOrderByLimit = Long.MAX_VALUE;
} else {
newDefaultOrderByLimit = defaultLimit;
}
}
PlanNode root;
if (stmt instanceof SelectStmt) {
SelectStmt selectStmt = (SelectStmt) stmt;
pushDownPredicates(analyzer, selectStmt);
root = createSelectPlan(selectStmt, analyzer, newDefaultOrderByLimit);
// TODO(zc)
// insert possible AnalyticEvalNode before SortNode
if (selectStmt.getAnalyticInfo() != null) {
AnalyticInfo analyticInfo = selectStmt.getAnalyticInfo();
AnalyticPlanner analyticPlanner = new AnalyticPlanner(analyticInfo, analyzer, ctx);
List<Expr> inputPartitionExprs = Lists.newArrayList();
AggregateInfo aggInfo = selectStmt.getAggInfo();
root = analyticPlanner.createSingleNodePlan(root,
aggInfo != null ? aggInfo.getGroupingExprs() : null, inputPartitionExprs);
List<Expr> predicates = getBoundPredicates(analyzer,
selectStmt.getAnalyticInfo().getOutputTupleDesc());
if (!predicates.isEmpty()) {
root = new SelectNode(ctx.getNextNodeId(), root, predicates);
root.init(analyzer);
Preconditions.checkState(root.hasValidStats());
}
if (aggInfo != null && !inputPartitionExprs.isEmpty()) {
// analytic computation will benefit from a partition on inputPartitionExprs
aggInfo.setPartitionExprs(inputPartitionExprs);
}
}
} else {
Preconditions.checkState(stmt instanceof SetOperationStmt);
root = createSetOperationPlan((SetOperationStmt) stmt, analyzer, newDefaultOrderByLimit, sqlSelectLimit);
}
// Avoid adding a sort node if the sort tuple has no materialized slots.
boolean sortHasMaterializedSlots = false;
if (stmt.evaluateOrderBy()) {
for (SlotDescriptor sortSlotDesc :
stmt.getSortInfo().getSortTupleDescriptor().getSlots()) {
if (sortSlotDesc.isMaterialized()) {
sortHasMaterializedSlots = true;
break;
}
}
}
if (stmt.evaluateOrderBy() && sortHasMaterializedSlots) {
long limit = stmt.getLimit();
// TODO: External sort could be used for very large limits
// not just unlimited order-by
boolean useTopN = true;
root = new SortNode(ctx.getNextNodeId(), root, stmt.getSortInfo(),
useTopN);
root.setOffset(stmt.getOffset());
if (useTopN) {
if (sqlSelectLimit >= 0) {
newDefaultOrderByLimit = Math.min(newDefaultOrderByLimit, sqlSelectLimit);
}
if (newDefaultOrderByLimit == Long.MAX_VALUE) {
root.setLimit(limit);
} else {
root.setLimit(limit != -1 ? limit : newDefaultOrderByLimit);
}
} else {
root.setLimit(limit);
}
Preconditions.checkState(root.hasValidStats());
root.init(analyzer);
// TODO chenhao, before merge ValueTransferGraph, force evaluate conjuncts
// from SelectStmt outside
root = addUnassignedConjuncts(analyzer, root);
} else {
if (!stmt.hasLimit() && sqlSelectLimit >= 0) {
root.setLimitAndOffset(sqlSelectLimit, stmt.getOffset());
} else {
root.setLimitAndOffset(stmt.getLimit(), stmt.getOffset());
}
root.computeStats(analyzer);
}
// adding assert node at the end of single node planner
if (stmt.getAssertNumRowsElement() != null) {
root = createAssertRowCountNode(root, stmt.getAssertNumRowsElement(), analyzer);
}
if (analyzer.hasEmptyResultSet() || root.getLimit() == 0) {
// Must clear the scanNodes, otherwise we will get NPE in Coordinator::computeScanRangeAssignment
Set<TupleId> scanTupleIds = new HashSet<>(root.getAllScanTupleIds());
scanNodes.removeIf(scanNode -> scanTupleIds.contains(scanNode.getTupleIds().get(0)));
PlanNode node = createEmptyNode(root, stmt, analyzer);
// Ensure result exprs will be substituted by right outputSmap
node.setOutputSmap(root.outputSmap, analyzer);
return node;
}
return root;
}
/**
* If there are unassigned conjuncts that are bound by tupleIds or if there are slot
* equivalences for tupleIds that have not yet been enforced, returns a SelectNode on
* top of root that evaluates those conjuncts; otherwise returns root unchanged.
* TODO: change this to assign the unassigned conjuncts to root itself, if that is
* semantically correct
*/
private PlanNode addUnassignedConjuncts(Analyzer analyzer, PlanNode root)
throws UserException {
Preconditions.checkNotNull(root);
// List<Expr> conjuncts = analyzer.getUnassignedConjuncts(root.getTupleIds());
List<Expr> conjuncts = analyzer.getUnassignedConjuncts(root);
if (conjuncts.isEmpty()) {
return root;
}
// evaluate conjuncts in SelectNode
SelectNode selectNode = new SelectNode(ctx.getNextNodeId(), root, conjuncts);
selectNode.init(analyzer);
Preconditions.checkState(selectNode.hasValidStats());
return selectNode;
}
private PlanNode addUnassignedConjuncts(
Analyzer analyzer, List<TupleId> tupleIds, PlanNode root) throws UserException {
// No point in adding SelectNode on top of an EmptyNode.
if (root instanceof EmptySetNode) {
return root;
}
Preconditions.checkNotNull(root);
// Gather unassigned conjuncts and generate predicates to enfore
// slot equivalences for each tuple id.
List<Expr> conjuncts = analyzer.getUnassignedConjuncts(root);
if (conjuncts.isEmpty()) {
return root;
}
// evaluate conjuncts in SelectNode
SelectNode selectNode = new SelectNode(ctx.getNextNodeId(), root, conjuncts);
// init() marks conjuncts as assigned
selectNode.init(analyzer);
Preconditions.checkState(selectNode.hasValidStats());
return selectNode;
}
private TPushAggOp freshTPushAggOpByName(String functionName, TPushAggOp originAggOp) {
TPushAggOp newPushAggOp = null;
if (functionName.equalsIgnoreCase("COUNT")) {
newPushAggOp = TPushAggOp.COUNT;
} else {
newPushAggOp = TPushAggOp.MINMAX;
}
if (originAggOp == null || newPushAggOp == originAggOp) {
return newPushAggOp;
}
return TPushAggOp.MIX;
}
private void pushDownAggNoGrouping(AggregateInfo aggInfo, SelectStmt selectStmt, Analyzer analyzer, PlanNode root) {
do {
if (CollectionUtils.isNotEmpty(root.getConjuncts())
|| CollectionUtils.isNotEmpty(root.getProjectList())) {
break;
}
// TODO: Support muti table in the future
if (selectStmt.getTableRefs().size() != 1) {
break;
}
// No not support group by and where clause
if (null == aggInfo || !aggInfo.getGroupingExprs().isEmpty()) {
break;
}
List<Expr> allConjuncts = analyzer.getAllConjuncts(selectStmt.getTableRefs().get(0).getId());
// planner will generate some conjuncts for analysis. These conjuncts are marked as aux expr.
// we skip these conjuncts when do pushAggOp
if (allConjuncts != null && allConjuncts.stream().noneMatch(Expr::isAuxExpr)) {
break;
}
List<FunctionCallExpr> aggExprs = aggInfo.getAggregateExprs();
boolean aggExprValidate = true;
TPushAggOp aggOp = null;
for (FunctionCallExpr aggExpr : aggExprs) {
String functionName = aggExpr.getFnName().getFunction();
if (!functionName.equalsIgnoreCase("MAX")
&& !functionName.equalsIgnoreCase("MIN")
&& !functionName.equalsIgnoreCase("COUNT")) {
aggExprValidate = false;
break;
}
if (!root.pushDownAggNoGrouping(aggExpr)) {
aggExprValidate = false;
break;
}
aggOp = freshTPushAggOpByName(functionName, aggOp);
if (aggExpr.getChildren().size() > 1) {
aggExprValidate = false;
break;
}
boolean returnColumnValidate = true;
if (aggExpr.getChildren().size() == 1) {
List<Column> returnColumns = Lists.newArrayList();
if (!(aggExpr.getChild(0) instanceof SlotRef)) {
Expr child = aggExpr.getChild(0);
if ((child instanceof CastExpr) && (child.getChild(0) instanceof SlotRef)) {
if (child.getType().isNumericType()
&& child.getChild(0).getType().isNumericType()) {
returnColumns.add(((SlotRef) child.getChild(0)).getDesc().getColumn());
} else {
aggExprValidate = false;
break;
}
} else {
// want to support the agg of count(const value) in dup table
aggExprValidate = (aggOp == TPushAggOp.COUNT && child.isConstant() && !child.isNullable());
if (!aggExprValidate) {
break;
}
}
} else {
returnColumns.add(((SlotRef) aggExpr.getChild(0)).getDesc().getColumn());
}
// check return columns
for (Column col : returnColumns) {
// TODO(zc): Here column is null is too bad
// Only column of Inline-view will be null
if (col == null) {
continue;
}
if (!root.pushDownAggNoGroupingCheckCol(aggExpr, col)) {
returnColumnValidate = false;
break;
}
// The zone map max length of CharFamily is 512, do not
// over the length: https://github.com/apache/doris/pull/6293
if (aggOp == TPushAggOp.MINMAX || aggOp == TPushAggOp.MIX) {
PrimitiveType colType = col.getDataType();
if (colType.isComplexType() || colType.isHllType() || colType.isBitmapType()
|| colType == PrimitiveType.STRING) {
returnColumnValidate = false;
break;
}
if (colType.isCharFamily() && col.getType().getLength() > 512) {
returnColumnValidate = false;
break;
}
}
if (aggOp == TPushAggOp.COUNT || aggOp == TPushAggOp.MIX) {
// NULL value behavior in `count` function is zero, so
// we should not use row_count to speed up query. the col
// must be not null
if (col.isAllowNull()) {
returnColumnValidate = false;
break;
}
}
}
}
if (!returnColumnValidate) {
aggExprValidate = false;
break;
}
}
if (!aggExprValidate) {
break;
}
root.setPushDownAggNoGrouping(aggOp);
} while (false);
}
private void turnOffPreAgg(AggregateInfo aggInfo, SelectStmt selectStmt, Analyzer analyzer, PlanNode root) {
String turnOffReason = null;
do {
if (!(root instanceof OlapScanNode)) {
turnOffReason = "left-deep Node is not OlapScanNode";
break;
}
if (((OlapScanNode) root).getForceOpenPreAgg()) {
((OlapScanNode) root).setIsPreAggregation(true, "");
return;
}
if (null == aggInfo) {
turnOffReason = "No AggregateInfo";
break;
}
ArrayList<FunctionCallExpr> aggExprs = aggInfo.getAggregateExprs();
// multi table join
boolean aggTableValidate = true;
if (selectStmt.getTableRefs().size() > 1) {
for (int i = 1; i < selectStmt.getTableRefs().size(); ++i) {
final JoinOperator joinOperator = selectStmt.getTableRefs().get(i).getJoinOp();
// TODO chenhao , right out join ?
if (joinOperator.isRightOuterJoin() || joinOperator.isFullOuterJoin()) {
turnOffReason = selectStmt.getTableRefs().get(i)
+ " joinOp is full outer join or right outer join.";
aggTableValidate = false;
break;
}
}
if (!aggTableValidate) {
break;
}
for (FunctionCallExpr aggExpr : aggExprs) {
TableRef olapTableRef = selectStmt.getTableRefs().get(0);
if (Expr.isBound(Lists.newArrayList(aggExpr), Lists.newArrayList(olapTableRef.getId()))) {
// do nothing
if (LOG.isDebugEnabled()) {
LOG.debug("All agg exprs is bound to olapTable: {}" + olapTableRef.getTable().getName());
}
} else {
List<TupleId> tupleIds = Lists.newArrayList();
List<SlotId> slotIds = Lists.newArrayList();
aggExpr.getIds(tupleIds, slotIds);
for (TupleId tupleId : tupleIds) {
// if tupleid is agg's result tuple, there is no tableref
// for only scanNode has the tableref
if (analyzer.getTupleDesc(tupleId).getRef() == null) {
aggTableValidate = false;
break;
}
if (analyzer.getTupleDesc(tupleId).getRef() != olapTableRef) {
if (analyzer.getTupleDesc(tupleId).getTable() != null
&& analyzer.getTupleDesc(tupleId).getTable().getType()
== Table.TableType.OLAP) {
turnOffReason = "agg expr [" + aggExpr.debugString() + "] is not bound ["
+ selectStmt.getTableRefs().get(0).toSql() + "]";
aggTableValidate = false;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("The table which agg expr [{}] is bound to, is not OLAP table [{}]",
aggExpr.debugString(),
analyzer.getTupleDesc(tupleId).getTable() == null ? "inline view" :
analyzer.getTupleDesc(tupleId).getTable().getName());
}
}
}
}
}
}
if (!aggTableValidate) {
break;
}
}
boolean valueColumnValidate = true;
List<Expr> allConjuncts = analyzer.getAllConjuncts(selectStmt.getTableRefs().get(0).getId());
List<SlotId> conjunctSlotIds = Lists.newArrayList();
if (allConjuncts != null) {
for (Expr conjunct : allConjuncts) {
if (!conjunct.isAuxExpr()) {
conjunct.getIds(null, conjunctSlotIds);
}
}
for (SlotDescriptor slot : selectStmt.getTableRefs().get(0).getDesc().getSlots()) {
if (!slot.getColumn().isKey()) {
if (conjunctSlotIds.contains(slot.getId())) {
turnOffReason = "conjunct on `" + slot.getColumn().getName()
+ "` which is StorageEngine value column";
valueColumnValidate = false;
break;
}
}
}
}
if (!valueColumnValidate) {
break;
}
boolean aggExprValidate = true;
for (FunctionCallExpr aggExpr : aggExprs) {
if (aggExpr.getChildren().size() != 1) {
turnOffReason = "aggExpr has more than one child";
aggExprValidate = false;
break;
}
List<Column> returnColumns = Lists.newArrayList();
List<Column> conditionColumns = Lists.newArrayList();
if (!(aggExpr.getChild(0) instanceof SlotRef)) {
Expr child = aggExpr.getChild(0);
// ignore cast
boolean castReturnExprValidate = true;
while (child instanceof CastExpr) {
if (child.getChild(0) instanceof SlotRef) {
if (child.getType().isNumericType() && child.getChild(0).getType().isNumericType()) {
returnColumns.add(((SlotRef) child.getChild(0)).getDesc().getColumn());
} else {
turnOffReason = "aggExpr.getChild(0)["
+ aggExpr.getChild(0).toSql()
+ "] is not Numeric CastExpr";
castReturnExprValidate = false;
break;
}
}
child = child.getChild(0);
}
if (!castReturnExprValidate) {
aggExprValidate = false;
break;
}
// convert IF to CASE WHEN.
// For example:
// IF(a > 1, 1, 0) -> CASE WHEN a > 1 THEN 1 ELSE 0 END
if (child instanceof FunctionCallExpr && ((FunctionCallExpr) child)
.getFnName().getFunction().equalsIgnoreCase("IF")) {
Preconditions.checkArgument(child.getChildren().size() == 3);
CaseWhenClause caseWhenClause = new CaseWhenClause(child.getChild(0), child.getChild(1));
child = new CaseExpr(ImmutableList.of(caseWhenClause), child.getChild(2));
}
if (child instanceof CaseExpr) {
CaseExpr caseExpr = (CaseExpr) child;
List<Expr> conditionExprs = caseExpr.getConditionExprs();
for (Expr conditionExpr : conditionExprs) {
List<TupleId> conditionTupleIds = Lists.newArrayList();
List<SlotId> conditionSlotIds = Lists.newArrayList();
conditionExpr.getIds(conditionTupleIds, conditionSlotIds);
for (SlotId conditionSlotId : conditionSlotIds) {
conditionColumns.add(analyzer.getDescTbl().getSlotDesc(conditionSlotId).getColumn());
}
}
boolean caseReturnExprValidate = true;
List<Expr> returnExprs = caseExpr.getReturnExprs();
for (Expr returnExpr : returnExprs) {
// ignore cast in return expr
while (returnExpr instanceof CastExpr) {
returnExpr = returnExpr.getChild(0);
}
if (returnExpr instanceof SlotRef) {
returnColumns.add(((SlotRef) returnExpr).getDesc().getColumn());
} else if (returnExpr.isNullLiteral() || returnExpr.isZeroLiteral()) {
// If then expr is NULL or Zero, open the preaggregation
} else {
turnOffReason = "aggExpr.getChild(0)[" + aggExpr.getChild(0).toSql()
+ "] is not SlotExpr";
caseReturnExprValidate = false;
break;
}
}
if (!caseReturnExprValidate) {
aggExprValidate = false;
break;
}
} else {
turnOffReason = "aggExpr.getChild(0)[" + aggExpr.getChild(0).debugString()
+ "] is not SlotRef or CastExpr|CaseExpr";
aggExprValidate = false;
break;
}
} else {
returnColumns.add(((SlotRef) aggExpr.getChild(0)).getDesc().getColumn());
}
// check condition columns
boolean conditionColumnValidate = true;
for (Column col : conditionColumns) {
// TODO(zc): Here column is null is too bad
// Only column of Inline-view will be null
if (col == null) {
continue;
}
if (!col.isKey()) {
turnOffReason = "the condition column [" + col.getName() + "] is not key type in aggr expr ["
+ aggExpr.toSql() + "].";
conditionColumnValidate = false;
break;
}
}
if (!conditionColumnValidate) {
aggExprValidate = false;
break;
}
// check return columns
boolean returnColumnValidate = true;
for (Column col : returnColumns) {
// TODO(zc): Here column is null is too bad
// Only column of Inline-view will be null
if (col == null) {
continue;
}
String functionName = aggExpr.getFnName().getFunction();
if (col.isKey()) {
if ((!functionName.equalsIgnoreCase("MAX"))
&& (!aggExpr.getFnName().getFunction().equalsIgnoreCase("MIN"))) {
returnColumnValidate = false;
turnOffReason = "the type of agg on StorageEngine's Key column should only be MAX or MIN."
+ "agg expr: " + aggExpr.toSql();
break;
}
}
if (functionName.equalsIgnoreCase("SUM")) {
if (col.getAggregationType() != AggregateType.SUM) {
turnOffReason = "Aggregate Operator not match: SUM <--> " + col.getAggregationType();
returnColumnValidate = false;
break;
}
} else if (functionName.equalsIgnoreCase("MAX")) {
if ((!col.isKey()) && col.getAggregationType() != AggregateType.MAX) {
turnOffReason = "Aggregate Operator not match: MAX <--> " + col.getAggregationType();
returnColumnValidate = false;
break;
}
} else if (functionName.equalsIgnoreCase("MIN")) {
if ((!col.isKey()) && col.getAggregationType() != AggregateType.MIN) {
turnOffReason = "Aggregate Operator not match: MIN <--> " + col.getAggregationType();
returnColumnValidate = false;
break;
}
} else if (functionName.equalsIgnoreCase(FunctionSet.HLL_UNION_AGG)
|| functionName.equalsIgnoreCase(FunctionSet.HLL_RAW_AGG)
|| functionName.equalsIgnoreCase(FunctionSet.HLL_UNION)) {
if (col.getAggregationType() != AggregateType.HLL_UNION) {
turnOffReason =
"Aggregate Operator not match: HLL_UNION <--> " + col.getAggregationType();
returnColumnValidate = false;
break;
}
} else if (functionName.equalsIgnoreCase("NDV")) {
if ((!col.isKey())) {
turnOffReason = "NDV function with non-key column: " + col.getName();
returnColumnValidate = false;
break;
}
} else if (functionName.equalsIgnoreCase(FunctionSet.BITMAP_UNION_INT)) {
if ((!col.isKey())) {
turnOffReason = "BITMAP_UNION_INT function with non-key column: " + col.getName();
returnColumnValidate = false;
break;
}
} else if (functionName.equalsIgnoreCase(FunctionSet.BITMAP_UNION)
|| functionName.equalsIgnoreCase(FunctionSet.BITMAP_UNION_COUNT)
|| functionName.equalsIgnoreCase(FunctionSet.ORTHOGONAL_BITMAP_UNION_COUNT)) {
if (col.getAggregationType() != AggregateType.BITMAP_UNION) {
turnOffReason =
"Aggregate Operator not match: BITMAP_UNION <--> " + col.getAggregationType();
returnColumnValidate = false;
break;
}
} else if (functionName.equalsIgnoreCase(FunctionSet.QUANTILE_UNION)) {
if (col.getAggregationType() != AggregateType.QUANTILE_UNION) {
turnOffReason =
"Aggregate Operator not match: QUANTILE_UNION <---> " + col.getAggregationType();
returnColumnValidate = false;
break;
}
} else if (functionName.equalsIgnoreCase("multi_distinct_count")) {
// count(distinct k1), count(distinct k2) / count(distinct k1,k2) can turn on pre aggregation
if ((!col.isKey())) {
turnOffReason = "Multi count or sum distinct with non-key column: " + col.getName();
returnColumnValidate = false;
break;
}
} else {
turnOffReason = "Invalid Aggregate Operator: " + functionName;
returnColumnValidate = false;
break;
}
}
if (!returnColumnValidate) {
aggExprValidate = false;
break;
}
}
if (!aggExprValidate) {
break;
}
boolean groupExprValidate = true;
ArrayList<Expr> groupExprs = aggInfo.getGroupingExprs();
for (Expr groupExpr : groupExprs) {
List<SlotId> groupSlotIds = Lists.newArrayList();
groupExpr.getIds(null, groupSlotIds);
for (SlotDescriptor slot : selectStmt.getTableRefs().get(0).getDesc().getSlots()) {
if (!slot.getColumn().isKey()) {
if (groupSlotIds.contains(slot.getId())) {
turnOffReason = "groupExpr contains StorageEngine's Value";
groupExprValidate = false;
break;
}
}
}
if (!groupExprValidate) {
break;
}
}
if (!groupExprValidate) {
break;
}
OlapScanNode olapNode = (OlapScanNode) root;
if (!olapNode.getCanTurnOnPreAggr()) {
turnOffReason = "this olap scan node[" + olapNode.debugString()
+ "] has already been turned off pre-aggregation.";
break;
}
olapNode.setIsPreAggregation(true, null);
} while (false);
if ((root instanceof OlapScanNode) && turnOffReason != null) {
((OlapScanNode) root).setIsPreAggregation(false, turnOffReason);
}
}
/**
* Return the cheapest plan that materializes the joins of all TableRefs in
* refPlans and the subplans of all applicable TableRefs in subplanRefs.
* Assumes that refPlans are in the order as they originally appeared in
* the query.
* For this plan:
* - the plan is executable, ie, all non-cross joins have equi-join predicates
* - the leftmost scan is over the largest of the inputs for which we can still
* construct an executable plan
* - from bottom to top, all rhs's are in increasing order of selectivity (percentage
* of surviving rows)
* - outer/cross/semi joins: rhs serialized size is < lhs serialized size;
* enforced via join inversion, if necessary
* - SubplanNodes are placed as low as possible in the plan tree - as soon as the
* required tuple ids of one or more TableRefs in subplanRefs are materialized
* Returns null if we can't create an executable plan.
*/
private PlanNode createCheapestJoinPlan(Analyzer analyzer,
List<Pair<TableRef, PlanNode>> refPlans) throws UserException {
if (refPlans.size() == 1) {
return refPlans.get(0).second;
}
// collect eligible candidates for the leftmost input; list contains
// (plan, materialized size)
List<Pair<TableRef, Long>> candidates = new ArrayList<>();
for (Pair<TableRef, PlanNode> entry : refPlans) {
TableRef ref = entry.first;
JoinOperator joinOp = ref.getJoinOp();
// Avoid reordering outer/semi joins which is generally incorrect.
// consideration of the joinOps that result from such a re-ordering (IMPALA-1281).
// TODO: Allow the rhs of any cross join as the leftmost table. This needs careful
if (joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) {
continue;
}
PlanNode plan = entry.second;
if (plan.getCardinality() == -1) {
// use 0 for the size to avoid it becoming the leftmost input
// TODO: Consider raw size of scanned partitions in the absence of stats.
candidates.add(Pair.of(ref, new Long(0)));
if (LOG.isDebugEnabled()) {
LOG.debug("The candidate of " + ref.getUniqueAlias() + ": -1. "
+ "Using 0 instead of -1 to avoid error");
}
continue;
}
Preconditions.checkState(ref.isAnalyzed());
long materializedSize = plan.getCardinality();
candidates.add(Pair.of(ref, new Long(materializedSize)));
if (LOG.isDebugEnabled()) {
LOG.debug("The candidate of " + ref.getUniqueAlias() + ": " + materializedSize);
}
}
if (candidates.isEmpty()) {
// This branch should not be reached, because the first one should be inner join.
LOG.warn("Something wrong happens, the code should not be runned");
return null;
}
// order candidates by descending materialized size; we want to minimize the memory
// consumption of the materialized hash tables required for the join sequence
Collections.sort(candidates,
new Comparator<Pair<TableRef, Long>>() {
@Override
public int compare(Pair<TableRef, Long> a, Pair<TableRef, Long> b) {
long diff = b.second - a.second;
return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
}
});
for (Pair<TableRef, Long> candidate : candidates) {
PlanNode result = createJoinPlan(analyzer, candidate.first, refPlans);
if (result != null) {
return result;
}
}
return null;
}
boolean candidateCardinalityIsSmaller(PlanNode candidate, long candidateInnerNodeCardinality,
PlanNode newRoot, long newRootInnerNodeCardinality) {
if (candidate.getCardinality() < newRoot.getCardinality()) {
return true;
} else if (candidate.getCardinality() == newRoot.getCardinality()) {
if (((candidate instanceof HashJoinNode) && ((HashJoinNode) candidate).getJoinOp().isInnerJoin())
&& ((newRoot instanceof HashJoinNode) && ((HashJoinNode) newRoot).getJoinOp().isInnerJoin())) {
if (candidateInnerNodeCardinality < newRootInnerNodeCardinality) {
return true;
}
}
}
return false;
}
/**
* Returns a plan with leftmostRef's plan as its leftmost input; the joins
* are in decreasing order of selectiveness (percentage of rows they eliminate).
* Creates and adds subplan nodes as soon as the tuple ids required by at least one
* subplan ref are materialized by a join node added during plan generation.
*/
// (ML): change the function name
private PlanNode createJoinPlan(Analyzer analyzer, TableRef leftmostRef, List<Pair<TableRef, PlanNode>> refPlans)
throws UserException {
if (LOG.isDebugEnabled()) {
LOG.debug("Try to create a query plan starting with " + leftmostRef.getUniqueAlias());
}
// the refs that have yet to be joined
List<Pair<TableRef, PlanNode>> remainingRefs = new ArrayList<>();
PlanNode root = null; // root of accumulated join plan
for (Pair<TableRef, PlanNode> entry : refPlans) {
if (entry.first == leftmostRef) {
root = entry.second;
} else {
remainingRefs.add(entry);
}
}
Preconditions.checkNotNull(root);
// Maps from a TableRef in refPlans with an outer/semi join op to the set of
// TableRefs that precede it refPlans (i.e., in FROM-clause order).
// The map is used to place outer/semi joins at a fixed position in the plan tree
// (IMPALA-860), s.t. all the tables appearing to the left/right of an outer/semi
// join in the original query still remain to the left/right after join ordering.
// This prevents join re-ordering across outer/semi joins which is generally wrong.
// Key of precedingRefs: the right table ref of outer or semi join
// Value of precedingRefs: the preceding refs of this key
// For example:
// select * from t1, t2, t3 left join t4, t5, t6 right semi join t7, t8, t9
// Map:
// { t4: [t1, t2, t3],
// t7: [t1, t2, t3, t4, t5, t6]
// }
Map<TableRef, Set<TableRef>> precedingRefs = new HashMap<>();
List<TableRef> tmpTblRefs = new ArrayList<>();
for (Pair<TableRef, PlanNode> entry : refPlans) {
TableRef tblRef = entry.first;
if (tblRef.getJoinOp().isOuterJoin() || tblRef.getJoinOp().isSemiJoin()) {
precedingRefs.put(tblRef, Sets.newHashSet(tmpTblRefs));
}
tmpTblRefs.add(tblRef);
}
// Refs that have been joined. The union of joinedRefs and the refs in remainingRefs
// are the set of all table refs.
Set<TableRef> joinedRefs = Sets.newHashSet(leftmostRef);
// two statistical value
long numOps = 0;
// A total of several rounds of successful selection
int successfulSelectionTimes = 0;
while (!remainingRefs.isEmpty()) {
// We minimize the resulting cardinality at each step in the join chain,
// which minimizes the total number of hash table lookups.
PlanNode newRoot = null;
Pair<TableRef, PlanNode> minEntry = null;
long newRootRightChildCardinality = 0;
for (Pair<TableRef, PlanNode> tblRefToPlanNodeOfCandidate : remainingRefs) {
TableRef tblRefOfCandidate = tblRefToPlanNodeOfCandidate.first;
long cardinalityOfCandidate = tblRefToPlanNodeOfCandidate.second.getCardinality();
PlanNode rootPlanNodeOfCandidate = tblRefToPlanNodeOfCandidate.second;
JoinOperator joinOp = tblRefOfCandidate.getJoinOp();
// Place outer/semi joins at a fixed position in the plan tree.
Set<TableRef> requiredRefs = precedingRefs.get(tblRefOfCandidate);
if (requiredRefs != null) {
Preconditions.checkState(joinOp.isOuterJoin()
|| joinOp.isSemiJoin());
// The semi and outer join nodes are similar to the stop nodes in each round of the algorithm.
// If the stop node is encountered during the current round of optimal selection,
// it means that the following nodes do not need to be referred to.
// This round has been completed.
// There are two situation in here.
// Situation 1: required table refs have not been placed yet
// t1, t2, t3 left join t4, t5
// Round 1: t3, t1(new root) meets t4(stop)
// stop this round and begin next round
// Situation 2: the remaining table refs to prevent incorrect re-ordering
// of tables across outer/semi joins
// Round 1: t5, t1, t2, t3(root) meets t4(stop)
// stop this round while the new root is null
// planning failed and return null
if (!requiredRefs.equals(joinedRefs)) {
break;
}
}
// reset assigned conjuncts of analyzer in every compare
analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
PlanNode candidate = createJoinNode(analyzer, root, rootPlanNodeOfCandidate, tblRefOfCandidate);
// it may not return null, but protect.
if (candidate == null) {
continue;
}
// Have the build side of a join copy data to a compact representation
// in the tuple buffer.
candidate.getChildren().get(1).setCompactData(true);
if (LOG.isDebugEnabled()) {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("The " + tblRefOfCandidate.getUniqueAlias() + " is right child of join node.");
stringBuilder.append("The join cardinality is " + candidate.getCardinality() + ".");
stringBuilder.append("In round " + successfulSelectionTimes);
if (LOG.isDebugEnabled()) {
LOG.debug(stringBuilder.toString());
}
}
// Use 'candidate' as the new root; don't consider any other table refs at this
// position in the plan.
if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
newRoot = candidate;
minEntry = tblRefToPlanNodeOfCandidate;
break;
}
// Always prefer Hash Join over Nested-Loop Join due to limited costing
// infrastructure.
//
// The following three conditions are met while the candidate is better.
// 1. The first candidate
// 2. The candidate is better than new root: [t3, t2] pk [t3, t1] => [t3, t1]
// 3. The hash join is better than cross join: [t3 cross t1] pk [t3 hash t2] => t3 hash t2
if (newRoot == null
|| ((candidate.getClass().equals(newRoot.getClass())
&& candidateCardinalityIsSmaller(
candidate, tblRefToPlanNodeOfCandidate.second.getCardinality(),
newRoot, newRootRightChildCardinality)))
|| (candidate instanceof HashJoinNode && newRoot instanceof NestedLoopJoinNode)) {
newRoot = candidate;
minEntry = tblRefToPlanNodeOfCandidate;
newRootRightChildCardinality = cardinalityOfCandidate;
}
}
// The table after the outer or semi join is wrongly planned to the front,
// causing the first tblRefToPlanNodeOfCandidate (outer or semi tbl ref)
// in this round of loop to fail and exit the loop.
// This means that the current leftmost node must be wrong, and the correct result cannot be planned.
//
// For example:
// Query: t1 left join t2 inner join t3
// Input params: t3(left most tbl ref), [t1,t2] (remaining refs)
// Round 1: t3, t1 (joined refs) t2 (remaining refs)
// Round 2: requiredRefs.equals(joinedRefs) is false and break, the newRoot is null
// Result: null
// The t3 should not appear before t2 so planning is fail
if (newRoot == null) {
// Could not generate a valid plan.
// for example: the biggest table is the last table
return null;
}
// we need to insert every rhs row into the hash table and then look up
// every lhs row
long lhsCardinality = root.getCardinality();
long rhsCardinality = minEntry.second.getCardinality();
numOps += lhsCardinality + rhsCardinality;
if (LOG.isDebugEnabled()) {
LOG.debug("Round " + successfulSelectionTimes + " chose " + minEntry.first.getUniqueAlias()
+ " #lhs=" + lhsCardinality + " #rhs=" + rhsCardinality + " #ops=" + numOps);
}
remainingRefs.remove(minEntry);
joinedRefs.add(minEntry.first);
root = newRoot;
analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
++successfulSelectionTimes;
}
if (LOG.isDebugEnabled()) {
LOG.debug("The final join sequence is "
+ joinedRefs.stream().map(TableRef::getUniqueAlias).collect(Collectors.joining(",")));
}
return root;
}
/**
* Create tree of PlanNodes that implements the Select/Project/Join/Group by/Having
* of the selectStmt query block.
*/
private PlanNode createSelectPlan(SelectStmt selectStmt, Analyzer analyzer, long defaultOrderByLimit)
throws UserException, AnalysisException {
// no from clause -> nothing to plan
if (selectStmt.getTableRefs().isEmpty()) {
return createConstantSelectPlan(selectStmt, analyzer);
}
if (analyzer.enableStarJoinReorder()) {
if (LOG.isDebugEnabled()) {
LOG.debug("use old reorder logical in select stmt");
}
selectStmt.reorderTable(analyzer);
}
// Slot materialization:
// We need to mark all slots as materialized that are needed during the execution
// of selectStmt, and we need to do that prior to creating plans for the TableRefs
// (because createTableRefNode() might end up calling computeMemLayout() on one or
// more TupleDescriptors, at which point all referenced slots need to be marked).
//
// For non-join predicates, slots are marked as follows:
// - for base table scan predicates, this is done directly by ScanNode.init(), which
// can do a better job because it doesn't need to materialize slots that are only
// referenced for partition pruning, for instance
// - for inline views, non-join predicates are pushed down, at which point the
// process repeats itself.
selectStmt.materializeRequiredSlots(analyzer);
// collect ids of tuples materialized by the subtree that includes all joins
// and scans
ArrayList<TupleId> rowTuples = Lists.newArrayList();
for (TableRef tblRef : selectStmt.getTableRefs()) {
rowTuples.addAll(tblRef.getMaterializedTupleIds());
}
if (analyzer.hasEmptySpjResultSet() && selectStmt.getAggInfo() != null) {
GroupByClause groupByClause = selectStmt.getGroupByClause();
if (Objects.nonNull(groupByClause) && groupByClause.isGroupByExtension()) {
rowTuples.add(selectStmt.getGroupingInfo().getVirtualTuple().getId());
}
final PlanNode emptySetNode = new EmptySetNode(ctx.getNextNodeId(), rowTuples);
emptySetNode.init(analyzer);
emptySetNode.setOutputSmap(selectStmt.getBaseTblSmap(), analyzer);
return createAggregationPlan(selectStmt, analyzer, emptySetNode);
}
PlanNode root = null;
AggregateInfo aggInfo = selectStmt.getAggInfo();
if (analyzer.safeIsEnableJoinReorderBasedCost()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Using new join reorder strategy when enable_join_reorder_based_cost is true");
}
// create plans for our table refs; use a list here instead of a map to
// maintain a deterministic order of traversing the TableRefs during join
// plan generation (helps with tests)
List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
for (TableRef ref : selectStmt.getTableRefs()) {
materializeTableResultForCrossJoinOrCountStar(ref, analyzer);
PlanNode plan = createTableRefNode(analyzer, ref, selectStmt);
turnOffPreAgg(aggInfo, selectStmt, analyzer, plan);
if (ConnectContext.get().getSessionVariable().enablePushDownNoGroupAgg) {
pushDownAggNoGrouping(aggInfo, selectStmt, analyzer, plan);
}
if (plan instanceof OlapScanNode) {
OlapScanNode olapNode = (OlapScanNode) plan;
// this olap scan node has been turn off pre-aggregation, should not be turned on again.
// e.g. select sum(v1) from (select v1 from test_table);
if (!olapNode.isPreAggregation()) {
olapNode.setCanTurnOnPreAggr(false);
}
}
Preconditions.checkState(plan != null);
refPlans.add(Pair.of(ref, plan));
}
// save state of conjunct assignment; needed for join plan generation
for (Pair<TableRef, PlanNode> entry : refPlans) {
entry.second.setAssignedConjuncts(analyzer.getAssignedConjuncts());
}
root = createCheapestJoinPlan(analyzer, refPlans);
Preconditions.checkState(root != null);
} else {
// create left-deep sequence of binary hash joins; assign node ids as we go along
TableRef tblRef = selectStmt.getTableRefs().get(0);
materializeTableResultForCrossJoinOrCountStar(tblRef, analyzer);
if (selectStmt.getResultExprs().size() == 1) {
final List<SlotId> slotIds = Lists.newArrayList();
final List<TupleId> tupleIds = Lists.newArrayList();
Expr resultExprSelected = selectStmt.getResultExprs().get(0);
if (resultExprSelected != null && resultExprSelected instanceof SlotRef) {
resultExprSelected.getIds(tupleIds, slotIds);
for (SlotId id : slotIds) {
final SlotDescriptor slot = analyzer.getDescTbl().getSlotDesc(id);
slot.setIsMaterialized(true);
slot.materializeSrcExpr();
}
for (TupleId id : tupleIds) {
final TupleDescriptor tuple = analyzer.getDescTbl().getTupleDesc(id);
tuple.setIsMaterialized(true);
}
}
}
root = createTableRefNode(analyzer, tblRef, selectStmt);
// to change the inner contains analytic function
// selectStmt.seondSubstituteInlineViewExprs(analyzer.getChangeResSmap());
turnOffPreAgg(aggInfo, selectStmt, analyzer, root);
if (ConnectContext.get().getSessionVariable().enablePushDownNoGroupAgg) {
pushDownAggNoGrouping(aggInfo, selectStmt, analyzer, root);
}
if (root instanceof OlapScanNode) {
OlapScanNode olapNode = (OlapScanNode) root;
// this olap scan node has been turn off pre-aggregation, should not be turned on again.
// e .g. select sum(v1) from (select v1 from test_table);
if (!olapNode.isPreAggregation()) {
olapNode.setCanTurnOnPreAggr(false);
}
}
for (int i = 1; i < selectStmt.getTableRefs().size(); ++i) {
TableRef innerRef = selectStmt.getTableRefs().get(i);
// Optimization of CountStar would change the materialization of some outputSlot,
// it would cause the materilization inconsistent of SlotRef between outputTupleDesc
// and agg/groupBy expr. So if some not materialized slots in outputTuple of aggInfo
// is set to materialized after the optimization of CountStar, we need to call
// aggregateInfo.materializeRequiredSlots again to make sure all required slots is
// materialized.
boolean aggOutputNotHaveMaterializedSlot = false;
AggregateInfo aggregateInfo = null;
TupleDescriptor output = null;
if (innerRef instanceof InlineViewRef) {
InlineViewRef inlineViewRef = (InlineViewRef) innerRef;
QueryStmt queryStmt = inlineViewRef.getViewStmt();
if (queryStmt instanceof SelectStmt) {
aggregateInfo = ((SelectStmt) queryStmt).getAggInfo();
if (aggregateInfo != null) {
output = aggregateInfo.getOutputTupleDesc();
aggOutputNotHaveMaterializedSlot =
output.getSlots().stream().noneMatch(SlotDescriptor::isMaterialized);
}
}
}
root = createJoinNode(analyzer, root, innerRef, selectStmt);
// Have the build side of a join copy data to a compact representation
// in the tuple buffer.
if (aggOutputNotHaveMaterializedSlot
&& aggregateInfo
.getOutputTupleDesc()
.getSlots()
.stream()
.anyMatch(SlotDescriptor::isMaterialized)) {
aggregateInfo.materializeRequiredSlots(analyzer, null);
}
root.getChildren().get(1).setCompactData(true);
root.assignConjuncts(analyzer);
}
}
if (selectStmt.getSortInfo() != null && selectStmt.getLimit() == -1
&& defaultOrderByLimit == -1) {
// TODO: only use topN if the memory footprint is expected to be low;
// how to account for strings?
throw new AnalysisException("ORDER BY without LIMIT currently not supported");
}
if (root != null) {
// add unassigned conjuncts before aggregation
// (scenario: agg input comes from an inline view which wasn't able to
// evaluate all Where clause conjuncts from this scope)
if (!selectStmt.hasOrderByClause()) {
root = addUnassignedConjuncts(analyzer, root);
}
}
// add aggregation, if required
if (aggInfo != null) {
// introduce repeatNode for group by extension
GroupByClause groupByClause = selectStmt.getGroupByClause();
if (groupByClause != null && groupByClause.isGroupByExtension()) {
root = createRepeatNodePlan(selectStmt, analyzer, root);
}
root = createAggregationPlan(selectStmt, analyzer, root);
}
return root;
}
/**
* Returns a new RepeatNode.
*/
private PlanNode createRepeatNodePlan(SelectStmt selectStmt, Analyzer analyzer,
PlanNode root) throws UserException {
GroupByClause groupByClause = selectStmt.getGroupByClause();
GroupingInfo groupingInfo = selectStmt.getGroupingInfo();
Preconditions.checkState(groupByClause != null && groupByClause.isGroupByExtension()
&& groupingInfo != null);
root = new RepeatNode(ctx.getNextNodeId(), root, groupingInfo, groupByClause);
root.init(analyzer);
return root;
}
public boolean selectMaterializedView(QueryStmt queryStmt, Analyzer analyzer) throws UserException {
boolean selectFailed = false;
boolean haveError = false;
StringBuilder errorMsg = new StringBuilder("select fail reason: ");
if (queryStmt instanceof SelectStmt) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
Set<TupleId> disableTuplesMVRewriter = Sets.newHashSet();
for (TableRef tableRef : selectStmt.getTableRefs()) {
if (tableRef instanceof InlineViewRef) {
selectFailed |= selectMaterializedView(((InlineViewRef) tableRef).getViewStmt(),
((InlineViewRef) tableRef).getAnalyzer());
}
}
List<ScanNode> scanNodeList = selectStmtToScanNodes.get(selectStmt.getAnalyzer());
if (scanNodeList == null) {
return selectFailed;
}
MaterializedViewSelector materializedViewSelector = new MaterializedViewSelector(selectStmt, analyzer);
for (ScanNode scanNode : scanNodeList) {
if (!(scanNode instanceof OlapScanNode)) {
continue;
}
OlapScanNode olapScanNode = (OlapScanNode) scanNode;
if (olapScanNode.getSelectedPartitionIds().size() == 0 && !FeConstants.runningUnitTest) {
continue;
}
boolean tupleSelectFailed = false;
try {
// select index by the old Rollup selector
olapScanNode.selectBestRollupByRollupSelector(analyzer);
} catch (UserException e) {
tupleSelectFailed = true;
}
// select index by the new Materialized selector
MaterializedViewSelector.BestIndexInfo bestIndexInfo = materializedViewSelector
.selectBestMV(olapScanNode);
if (bestIndexInfo == null) {
tupleSelectFailed = true;
} else {
try {
// if the new selected index id is different from the old one, scan node will be
// updated.
olapScanNode.updateScanRangeInfoByNewMVSelector(bestIndexInfo.getBestIndexId(),
bestIndexInfo.isPreAggregation(), bestIndexInfo.getReasonOfDisable());
// mv index have where clause, so where expr on scan node is unused.
olapScanNode.ignoreConjuncts(olapScanNode.getOlapTable()
.getIndexMetaByIndexId(bestIndexInfo.getBestIndexId())
.getWhereClause());
if (selectStmt.getAggInfo() != null) {
selectStmt.getAggInfo().updateTypeOfAggregateExprs();
}
} catch (Exception e) {
if (haveError) {
errorMsg.append(",");
}
errorMsg.append(e.getMessage());
haveError = true;
tupleSelectFailed = true;
}
}
if (tupleSelectFailed) {
selectFailed = true;
disableTuplesMVRewriter.add(olapScanNode.getTupleId());
}
}
selectStmt.updateDisableTuplesMVRewriter(disableTuplesMVRewriter);
} else {
Preconditions.checkState(queryStmt instanceof SetOperationStmt);
SetOperationStmt unionStmt = (SetOperationStmt) queryStmt;
for (SetOperationStmt.SetOperand unionOperand : unionStmt.getOperands()) {
selectFailed |= selectMaterializedView(unionOperand.getQueryStmt(), analyzer);
}
}
if (haveError) {
throw new MVSelectFailedException(errorMsg.toString());
}
return selectFailed;
}
/**
* Returns a new AggregationNode that materializes the aggregation of the given stmt.
* Assigns conjuncts from the Having clause to the returned node.
*/
private PlanNode createAggregationPlan(SelectStmt selectStmt, Analyzer analyzer,
PlanNode root) throws UserException {
Preconditions.checkState(selectStmt.getAggInfo() != null);
// add aggregation, if required
AggregateInfo aggInfo = selectStmt.getAggInfo();
// aggInfo.substitueGroupingExpr(analyzer);
PlanNode newRoot = new AggregationNode(ctx.getNextNodeId(), root, aggInfo);
newRoot.init(analyzer);
Preconditions.checkState(newRoot.hasValidStats());
// if we're computing DISTINCT agg fns, the analyzer already created the
// 2nd phase agginfo
if (aggInfo.isDistinctAgg()) {
((AggregationNode) newRoot).unsetNeedsFinalize();
// The output of the 1st phase agg is the 1st phase intermediate.
((AggregationNode) newRoot).setIntermediateTuple();
newRoot = new AggregationNode(ctx.getNextNodeId(), newRoot,
aggInfo.getSecondPhaseDistinctAggInfo());
newRoot.init(analyzer);
Preconditions.checkState(newRoot.hasValidStats());
}
// add Having clause
newRoot.assignConjuncts(analyzer);
return newRoot;
}
/**
* Returns a MergeNode that materializes the exprs of the constant selectStmt. Replaces the resultExprs of the
* selectStmt with SlotRefs into the materialized tuple.
*/
private PlanNode createConstantSelectPlan(SelectStmt selectStmt, Analyzer analyzer) throws UserException {
Preconditions.checkState(selectStmt.getTableRefs().isEmpty());
ArrayList<Expr> resultExprs = selectStmt.getResultExprs();
// Create tuple descriptor for materialized tuple.
TupleDescriptor tupleDesc = createResultTupleDescriptor(selectStmt, "union", analyzer);
UnionNode unionNode = new UnionNode(ctx.getNextNodeId(), tupleDesc.getId());
// Analysis guarantees that selects without a FROM clause only have constant exprs.
if (selectStmt.getValueList() != null) {
for (ArrayList<Expr> row : selectStmt.getValueList().getRows()) {
unionNode.addConstExprList(row);
}
} else {
unionNode.addConstExprList(Lists.newArrayList(resultExprs));
}
// Replace the select stmt's resultExprs with SlotRefs into tupleDesc.
for (int i = 0; i < resultExprs.size(); ++i) {
SlotRef slotRef = new SlotRef(tupleDesc.getSlots().get(i));
resultExprs.set(i, slotRef);
selectStmt.getBaseTblResultExprs().set(i, slotRef);
}
// UnionNode.init() needs tupleDesc to have been initialized
unionNode.init(analyzer);
return unionNode;
}
/**
* Create tuple descriptor that can hold the results of the given SelectStmt, with one
* slot per result expr.
*/
private TupleDescriptor createResultTupleDescriptor(SelectStmt selectStmt,
String debugName, Analyzer analyzer) {
TupleDescriptor tupleDesc = analyzer.getDescTbl().createTupleDescriptor(
debugName);
tupleDesc.setIsMaterialized(true);
List<Expr> resultExprs = selectStmt.getResultExprs();
List<String> colLabels = selectStmt.getColLabels();
for (int i = 0; i < resultExprs.size(); ++i) {
Expr resultExpr = resultExprs.get(i);
String colLabel = colLabels.get(i);
SlotDescriptor slotDesc = analyzer.addSlotDescriptor(tupleDesc);
slotDesc.setLabel(colLabel);
slotDesc.setSourceExpr(resultExpr);
slotDesc.setType(resultExpr.getType());
// slotDesc.setStats(ColumnStats.fromExpr(resultExpr));
slotDesc.setIsMaterialized(true);
}
tupleDesc.computeStatAndMemLayout();
return tupleDesc;
}
/**
* Returns plan tree for an inline view ref:
* - predicates from the enclosing scope that can be evaluated directly within
* the inline-view plan are pushed down
* - predicates that cannot be evaluated directly within the inline-view plan
* but only apply to the inline view are evaluated in a SelectNode placed
* on top of the inline view plan
* - all slots that are referenced by predicates from the enclosing scope that cannot
* be pushed down are marked as materialized (so that when computeMemLayout() is
* called on the base table descriptors materialized by the inline view it has a
* complete picture)
*/
private PlanNode createInlineViewPlan(Analyzer analyzer, InlineViewRef inlineViewRef)
throws UserException, AnalysisException {
// If possible, "push down" view predicates; this is needed in order to ensure
// that predicates such as "x + y = 10" are evaluated in the view's plan tree
// rather than a SelectNode grafted on top of that plan tree.
// This doesn't prevent predicate propagation, because predicates like
// "x = 10" that get pushed down are still connected to equivalent slots
// via the equality predicates created for the view's select list.
// Include outer join conjuncts here as well because predicates from the
// On-clause of an outer join may be pushed into the inline view as well.
migrateConjunctsToInlineView(analyzer, inlineViewRef);
// Turn a constant select into a MergeNode that materializes the exprs.
QueryStmt viewStmt = inlineViewRef.getViewStmt();
if (viewStmt instanceof SelectStmt) {
SelectStmt selectStmt = (SelectStmt) viewStmt;
if (selectStmt.getTableRefs().isEmpty()) {
if (inlineViewRef.getAnalyzer().hasEmptyResultSet()) {
PlanNode emptySetNode = createEmptyNode(null, viewStmt, inlineViewRef.getAnalyzer());
// Still substitute exprs in parent nodes with the inline-view's smap to make
// sure no exprs reference the non-materialized inline view slots. No wrapping
// with TupleIsNullPredicates is necessary here because we do not migrate
// conjuncts into outer-joined inline views, so hasEmptyResultSet() cannot be
// true for an outer-joined inline view that has no table refs.
Preconditions.checkState(!analyzer.isOuterJoined(inlineViewRef.getId()));
emptySetNode.setOutputSmap(inlineViewRef.getSmap(), analyzer);
return emptySetNode;
}
// Analysis should have generated a tuple id into which to materialize the exprs.
Preconditions.checkState(inlineViewRef.getMaterializedTupleIds().size() == 1);
// we need to materialize all slots of our inline view tuple
analyzer.getTupleDesc(inlineViewRef.getId()).materializeSlots();
UnionNode unionNode = new UnionNode(ctx.getNextNodeId(),
inlineViewRef.getMaterializedTupleIds().get(0));
if (analyzer.hasEmptyResultSet()) {
return unionNode;
}
unionNode.setTblRefIds(Lists.newArrayList(inlineViewRef.getId()));
if (selectStmt.getValueList() != null) {
for (List<Expr> row : selectStmt.getValueList().getRows()) {
unionNode.addConstExprList(row);
}
} else {
unionNode.addConstExprList(selectStmt.getBaseTblResultExprs());
}
unionNode.init(analyzer);
//set outputSmap to substitute literal in outputExpr
unionNode.setWithoutTupleIsNullOutputSmap(inlineViewRef.getSmap());
unionNode.setOutputSmap(inlineViewRef.getSmap(), analyzer);
return unionNode;
}
}
PlanNode rootNode = createQueryPlan(inlineViewRef.getViewStmt(), inlineViewRef.getAnalyzer(), -1, -1);
// TODO: we should compute the "physical layout" of the view's descriptor, so that
// the avg row size is available during optimization; however, that means we need to
// select references to its resultExprs from the enclosing scope(s)
rootNode.setTblRefIds(Lists.newArrayList(inlineViewRef.getId()));
// The output smap is the composition of the inline view's smap and the output smap
// of the inline view's plan root. This ensures that all downstream exprs referencing
// the inline view are replaced with exprs referencing the physical output of the
// inline view's plan.
ExprSubstitutionMap outputSmap = ExprSubstitutionMap.compose(
inlineViewRef.getSmap(), rootNode.getOutputSmap(), analyzer);
// Set output smap of rootNode *before* creating a SelectNode for proper resolution.
rootNode.setOutputSmap(outputSmap, analyzer);
if (rootNode instanceof UnionNode && ((UnionNode) rootNode).isConstantUnion()) {
rootNode.setWithoutTupleIsNullOutputSmap(outputSmap);
}
// rootNode.setOutputSmap(ExprSubstitutionMap.compose(inlineViewRef.getBaseTblSmap(),
// rootNode.getOutputSmap(), analyzer));
// Expr.substituteList(inlineViewRef.getViewStmt().getResultExprs(), analyzer.getChangeResSmap());
// analyzer.setChangeResSmap(inlineViewRef.getAnalyzer().getChangeResSmap());
// Expr.SubstitutionMap inlineViewSmap = inlineViewRef.getExprSMap();
// if (analyzer.isOuterJoined(inlineViewRef.getId())) {
// // Exprs against non-matched rows of an outer join should always return NULL.
// // Make the rhs exprs of the inline view's smap nullable, if necessary.
// List<Expr> nullableRhs = TupleIsNullPredicate.wrapExprs(
// inlineViewSmap.getRhs(), node.getTupleIds(), analyzer);
// inlineViewSmap = new Expr.SubstitutionMap(inlineViewSmap.getLhs(), nullableRhs);
// }
//
// // Set output smap of rootNode *before* creating a SelectNode for proper resolution.
// // The output smap is the composition of the inline view's smap and the output smap
// // of the inline view's plan root. This ensures that all downstream exprs referencing
// // the inline view are replaced with exprs referencing the physical output of
// // the inline view's plan.
// Expr.SubstitutionMap composedSmap = Expr.SubstitutionMap.compose(inlineViewSmap,
// node.getOutputSmap(), analyzer);
// node.setOutputSmap(composedSmap);
// If the inline view has a LIMIT/OFFSET or unassigned conjuncts due to analytic
// functions, we may have conjuncts that need to be assigned to a SELECT node on
// top of the current plan root node.
//
// TODO: This check is also repeated in migrateConjunctsToInlineView() because we
// need to make sure that equivalences are not enforced multiple times. Consolidate
// the assignment of conjuncts and the enforcement of equivalences into a single
// place.
if (!canMigrateConjuncts(inlineViewRef)) {
rootNode = addUnassignedConjuncts(
analyzer, inlineViewRef.getDesc().getId().asList(), rootNode);
}
return rootNode;
}
/**
* Migrates unassigned conjuncts into an inline view. Conjuncts are not
* migrated into the inline view if the view has a LIMIT/OFFSET clause or if the
* view's stmt computes analytic functions (see IMPALA-1243/IMPALA-1900).
* The reason is that analytic functions compute aggregates over their entire input,
* and applying filters from the enclosing scope *before* the aggregate computation
* would alter the results. This is unlike regular aggregate computation, which only
* makes the *output* of the computation visible to the enclosing scope, so that
* filters from the enclosing scope can be safely applied (to the grouping cols, say).
*/
public void migrateConjunctsToInlineView(Analyzer analyzer,
InlineViewRef inlineViewRef) throws AnalysisException {
// All conjuncts
final List<Expr> unassignedConjuncts =
analyzer.getUnassignedConjuncts(inlineViewRef.getId().asList(), true);
// Constant conjuncts
final List<Expr> unassignedConstantConjuncts = Lists.newArrayList();
for (Expr e : unassignedConjuncts) {
if (e.isConstant()) {
unassignedConstantConjuncts.add(e);
}
}
// Non constant conjuncts
unassignedConjuncts.removeAll(unassignedConstantConjuncts);
migrateNonconstantConjuncts(inlineViewRef, unassignedConjuncts, analyzer);
migrateConstantConjuncts(inlineViewRef, unassignedConstantConjuncts);
}
/**
* For handling non-constant conjuncts. This only substitute conjunct's tuple with InlineView's
* and register it in InlineView's Analyzer, whcih will be assigned by the next planning.
*
* @param inlineViewRef
* @param unassignedConjuncts
* @param analyzer
* @throws AnalysisException
*/
private void migrateNonconstantConjuncts(
InlineViewRef inlineViewRef, List<Expr> unassignedConjuncts, Analyzer analyzer) throws AnalysisException {
final List<Expr> preds = Lists.newArrayList();
for (Expr e : unassignedConjuncts) {
if (analyzer.canEvalPredicate(inlineViewRef.getId().asList(), e)) {
preds.add(e);
}
}
final List<Expr> pushDownFailedPredicates = Lists.newArrayList();
final List<Expr> viewPredicates = getPushDownPredicatesForInlineView(
inlineViewRef, preds, analyzer, pushDownFailedPredicates);
if (viewPredicates.size() <= 0) {
// mark (fully resolve) slots referenced by unassigned conjuncts as
// materialized
List<Expr> substUnassigned = Expr.substituteList(unassignedConjuncts,
inlineViewRef.getBaseTblSmap(), analyzer, false);
analyzer.materializeSlots(substUnassigned);
return;
}
preds.removeAll(pushDownFailedPredicates);
unassignedConjuncts.removeAll(preds);
unassignedConjuncts.addAll(pushDownFailedPredicates);
// Remove unregistered predicates that reference the same slot on
// both sides (e.g. a = a). Such predicates have been generated from slot
// equivalences and may incorrectly reject rows with nulls (IMPALA-1412/IMPALA-2643).
// TODO(zc)
final Predicate<Expr> isIdentityPredicate = new Predicate<Expr>() {
@Override
public boolean apply(Expr expr) {
return org.apache.doris.analysis.Predicate.isEquivalencePredicate(expr)
&& ((BinaryPredicate) expr).isInferred()
&& expr.getChild(0).equals(expr.getChild(1));
}
};
Iterables.removeIf(viewPredicates, isIdentityPredicate);
// Migrate the conjuncts by marking the original ones as assigned, and
// re-registering the substituted ones with new ids.
analyzer.markConjunctsAssigned(preds);
// Unset the On-clause flag of the migrated conjuncts because the migrated conjuncts
// apply to the post-join/agg/analytic result of the inline view.
for (Expr e : viewPredicates) {
e.setIsOnClauseConjunct(false);
}
inlineViewRef.getAnalyzer().registerConjuncts(viewPredicates, inlineViewRef.getAllTupleIds());
// mark (fully resolve) slots referenced by remaining unassigned conjuncts as
// materialized
List<Expr> substUnassigned = Expr.substituteList(unassignedConjuncts,
inlineViewRef.getBaseTblSmap(), analyzer, false);
analyzer.materializeSlots(substUnassigned);
}
/**
* For handling constant conjuncts when migrating conjuncts to InlineViews. Constant conjuncts
* should be assigned to query block from top to bottom, it will try to push down constant conjuncts.
*
* @param inlineViewRef
* @param conjuncts
* @throws AnalysisException
*/
private void migrateConstantConjuncts(InlineViewRef inlineViewRef, List<Expr> conjuncts) throws AnalysisException {
if (conjuncts.isEmpty()) {
return;
}
if (!canMigrateConjuncts(inlineViewRef)) {
return;
}
final QueryStmt stmt = inlineViewRef.getViewStmt();
final Analyzer viewAnalyzer = inlineViewRef.getAnalyzer();
viewAnalyzer.markConjunctsAssigned(conjuncts);
// even if the conjuncts are constant, they may contains slotRef
// for example: case when slotRef is null then 0 else 1
// we need substitute the conjuncts using inlineViewRef's analyzer
// otherwise, when analyzing the conjunct in the inline view
// the analyzer is not able to find the column because it comes from outside
List<Expr> newConjuncts =
Expr.substituteList(conjuncts, inlineViewRef.getSmap(), viewAnalyzer, false);
if (stmt instanceof SelectStmt) {
final SelectStmt select = (SelectStmt) stmt;
if (select.getAggInfo() != null) {
viewAnalyzer.registerConjuncts(newConjuncts, select.getAggInfo().getOutputTupleId().asList());
} else if (select.getTableRefs().size() > 0) {
for (int i = select.getTableRefs().size() - 1; i >= 0; i--) {
viewAnalyzer.registerConjuncts(newConjuncts,
select.getTableRefs().get(i).getDesc().getId().asList());
newConjuncts = cloneExprs(newConjuncts);
}
} else {
for (Expr e : conjuncts) {
viewAnalyzer.registerMigrateFailedConjuncts(inlineViewRef, e);
}
}
} else {
Preconditions.checkArgument(stmt instanceof SetOperationStmt);
final SetOperationStmt union = (SetOperationStmt) stmt;
viewAnalyzer.registerConjuncts(newConjuncts, union.getTupleId().asList());
}
}
private List<Expr> cloneExprs(List<Expr> candidates) {
final List<Expr> clones = Lists.newArrayList();
candidates.forEach(expr -> clones.add(expr.clone()));
return clones;
}
/**
* Get predicates can be migrated into an inline view.
*/
private List<Expr> getPushDownPredicatesForInlineView(
InlineViewRef inlineViewRef, List<Expr> viewPredicates,
Analyzer analyzer, List<Expr> pushDownFailedPredicates) {
// TODO chenhao, remove evaluateOrderBy when SubQuery's default limit is removed.
final List<Expr> pushDownPredicates = Lists.newArrayList();
if (inlineViewRef.getViewStmt().evaluateOrderBy()
|| inlineViewRef.getViewStmt().hasLimit()
|| inlineViewRef.getViewStmt().hasOffset()) {
return pushDownPredicates;
}
// UnionNode will handle predicates and assigns predicates to it's children.
final List<Expr> candicatePredicates =
Expr.substituteList(viewPredicates, inlineViewRef.getSmap(), analyzer, false);
if (inlineViewRef.getViewStmt() instanceof SetOperationStmt) {
final SetOperationStmt setOperationStmt = (SetOperationStmt) inlineViewRef.getViewStmt();
for (int i = 0; i < candicatePredicates.size(); i++) {
final Expr predicate = candicatePredicates.get(i);
if (predicate.isBound(setOperationStmt.getTupleId())) {
pushDownPredicates.add(predicate);
} else {
pushDownFailedPredicates.add(viewPredicates.get(i));
}
}
return pushDownPredicates;
}
final SelectStmt selectStmt = (SelectStmt) inlineViewRef.getViewStmt();
if (selectStmt.hasAnalyticInfo()) {
pushDownPredicates.addAll(getWindowsPushDownPredicates(candicatePredicates, viewPredicates,
selectStmt.getAnalyticInfo(), pushDownFailedPredicates));
} else {
pushDownPredicates.addAll(candicatePredicates);
}
return pushDownPredicates;
}
/**
* Get predicates which can be pushed down past Windows.
*
* @param predicates
* @param viewPredicates
* @param analyticInfo
* @param pushDownFailedPredicates
* @return
*/
private List<Expr> getWindowsPushDownPredicates(
List<Expr> predicates, List<Expr> viewPredicates,
AnalyticInfo analyticInfo, List<Expr> pushDownFailedPredicates) {
final List<Expr> pushDownPredicates = Lists.newArrayList();
final List<Expr> partitionExprs = analyticInfo.getCommonPartitionExprs();
final List<SlotId> partitionByIds = Lists.newArrayList();
for (Expr expr : partitionExprs) {
if (expr instanceof SlotRef) {
final SlotRef slotRef = (SlotRef) expr;
partitionByIds.add(slotRef.getSlotId());
}
}
if (partitionByIds.size() <= 0) {
return pushDownPredicates;
}
for (int i = 0; i < predicates.size(); i++) {
final Expr predicate = predicates.get(i);
if (predicate.isBound(partitionByIds)) {
pushDownPredicates.add(predicate);
} else {
pushDownFailedPredicates.add(viewPredicates.get(i));
}
}
return pushDownPredicates;
}
/**
* Checks if conjuncts can be migrated into an inline view.
*/
private boolean canMigrateConjuncts(InlineViewRef inlineViewRef) {
// TODO chenhao, remove evaluateOrderBy when SubQuery's default limit is removed.
return inlineViewRef.getViewStmt().evaluateOrderBy() ? false :
(!inlineViewRef.getViewStmt().hasLimit()
&& !inlineViewRef.getViewStmt().hasOffset()
&& (!(inlineViewRef.getViewStmt() instanceof SelectStmt)
|| !((SelectStmt) inlineViewRef.getViewStmt()).hasAnalyticInfo()));
}
/**
* Create node for scanning all data files of a particular table.
*/
private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt selectStmt)
throws UserException {
SessionVariable sv = ConnectContext.get().getSessionVariable();
ScanNode scanNode;
switch (tblRef.getTable().getType()) {
case OLAP:
case MATERIALIZED_VIEW:
OlapScanNode olapNode = new OlapScanNode(ctx.getNextNodeId(), tblRef.getDesc(),
"OlapScanNode");
olapNode.setForceOpenPreAgg(tblRef.isForcePreAggOpened());
olapNode.setSampleTabletIds(tblRef.getSampleTabletIds());
olapNode.setTableSample(tblRef.getTableSample());
scanNode = olapNode;
break;
case ODBC:
scanNode = new OdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), (OdbcTable) tblRef.getTable());
break;
case MYSQL:
scanNode = new MysqlScanNode(ctx.getNextNodeId(), tblRef.getDesc(), (MysqlTable) tblRef.getTable());
break;
case SCHEMA:
if (BackendPartitionedSchemaScanNode.isBackendPartitionedSchemaTable(
tblRef.getDesc().getTable().getName())) {
scanNode = new BackendPartitionedSchemaScanNode(ctx.getNextNodeId(), tblRef.getTable(),
tblRef.getDesc(), null, null, null);
} else {
scanNode = new SchemaScanNode(ctx.getNextNodeId(), tblRef.getDesc(), null, null, null);
}
break;
case BROKER:
throw new RuntimeException("Broker external table is not supported, try to use table function please");
case ELASTICSEARCH:
scanNode = new EsScanNode(ctx.getNextNodeId(), tblRef.getDesc());
break;
case HIVE:
throw new RuntimeException("Hive external table is not supported, try to use hive catalog please");
case ICEBERG:
throw new RuntimeException("Iceberg external table is not supported, use iceberg catalog please");
case JDBC:
scanNode = new JdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), false);
break;
case TABLE_VALUED_FUNCTION:
scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId(), sv);
break;
case HMS_EXTERNAL_TABLE:
TableIf table = tblRef.getDesc().getTable();
switch (((HMSExternalTable) table).getDlaType()) {
case HUDI:
// Old planner does not support hudi incremental read,
// so just pass Optional.empty() to HudiScanNode
if (tblRef.getScanParams() != null) {
throw new UserException("Hudi incremental read is not supported, "
+ "please set enable_nereids_planner = true to enable new optimizer");
}
scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true,
Optional.empty(), Optional.empty(), sv);
break;
case ICEBERG:
scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
break;
case HIVE:
scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
((HiveScanNode) scanNode).setTableSample(tblRef.getTableSample());
break;
default:
throw new UserException("Not supported table type" + table.getType());
}
break;
case ICEBERG_EXTERNAL_TABLE:
scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
break;
case PAIMON_EXTERNAL_TABLE:
scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
break;
case TRINO_CONNECTOR_EXTERNAL_TABLE:
scanNode = new TrinoConnectorScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
break;
case MAX_COMPUTE_EXTERNAL_TABLE:
scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
break;
case ES_EXTERNAL_TABLE:
scanNode = new EsScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
break;
case JDBC_EXTERNAL_TABLE:
scanNode = new JdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
break;
case LAKESOUl_EXTERNAL_TABLE:
scanNode = new LakeSoulScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
break;
case TEST_EXTERNAL_TABLE:
scanNode = new TestExternalTableScanNode(ctx.getNextNodeId(), tblRef.getDesc());
break;
default:
throw new UserException("Not supported table type: " + tblRef.getTable().getType());
}
if (scanNode instanceof OlapScanNode || scanNode instanceof EsScanNode
|| scanNode instanceof OdbcScanNode || scanNode instanceof JdbcScanNode
|| scanNode instanceof FileQueryScanNode || scanNode instanceof MysqlScanNode) {
if (analyzer.enableInferPredicate()) {
PredicatePushDown.visitScanNode(scanNode, tblRef.getJoinOp(), analyzer);
}
scanNode.setSortColumn(tblRef.getSortColumn());
}
scanNodes.add(scanNode);
// now we put the selectStmtToScanNodes's init before the scanNode.init
List<ScanNode> scanNodeList = selectStmtToScanNodes.computeIfAbsent(
selectStmt.getAnalyzer(), k -> Lists.newArrayList());
scanNodeList.add(scanNode);
scanNode.init(analyzer);
return scanNode;
}
/**
* Return join conjuncts that can be used for hash table lookups. - for inner joins, those are equi-join predicates
* in which one side is fully bound by lhsIds and the other by rhs' id; - for outer joins: same type of conjuncts as
* inner joins, but only from the JOIN clause Returns the original form in 'joinPredicates'.
*/
private void getHashLookupJoinConjuncts(Analyzer analyzer, PlanNode left, PlanNode right,
List<Expr> joinConjuncts,
Reference<String> errMsg, JoinOperator op) {
joinConjuncts.clear();
final List<TupleId> lhsIds = left.getTblRefIds();
final List<TupleId> rhsIds = right.getTblRefIds();
List<Expr> candidates;
candidates = analyzer.getEqJoinConjuncts(lhsIds, rhsIds);
if (candidates == null) {
if (op.isOuterJoin() || op.isSemiAntiJoin()) {
errMsg.setRef("non-equal " + op.toString() + " is not supported");
LOG.warn(errMsg);
}
if (LOG.isDebugEnabled()) {
LOG.debug("no candidates for join.");
}
return;
}
for (Expr e : candidates) {
// Ignore predicate if one of its children is a constant.
if (e.getChild(0).isLiteral() || e.getChild(1).isLiteral()) {
if (LOG.isDebugEnabled()) {
LOG.debug("double is constant.");
}
continue;
}
/**
* The left and right child of origin predicates need to be swap sometimes.
* Case A:
* select * from t1 join t2 on t2.id=t1.id
* The left plan node is t1 and the right plan node is t2.
* The left child of origin predicate is t2.id and the right child of origin predicate is t1.id.
* In this situation, the children of predicate need to be swap => t1.id=t2.id.
*/
Expr rhsExpr = null;
if (e.getChild(0).isBoundByTupleIds(rhsIds)) {
rhsExpr = e.getChild(0);
} else {
Preconditions.checkState(e.getChild(1).isBoundByTupleIds(rhsIds));
rhsExpr = e.getChild(1);
}
Expr lhsExpr = null;
if (e.getChild(1).isBoundByTupleIds(lhsIds)) {
lhsExpr = e.getChild(1);
} else if (e.getChild(0).isBoundByTupleIds(lhsIds)) {
lhsExpr = e.getChild(0);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("not an equi-join condition between lhsIds and rhsId");
}
continue;
}
Preconditions.checkState(lhsExpr != rhsExpr);
Preconditions.checkState(e instanceof BinaryPredicate);
// The new predicate id must same as the origin predicate.
// This expr id is used to mark as assigned in the future.
BinaryPredicate newEqJoinPredicate = (BinaryPredicate) e.clone();
newEqJoinPredicate.setChild(0, lhsExpr);
newEqJoinPredicate.setChild(1, rhsExpr);
joinConjuncts.add(newEqJoinPredicate);
}
}
private PlanNode createJoinNodeBase(Analyzer analyzer, PlanNode outer, PlanNode inner, TableRef innerRef)
throws UserException {
materializeTableResultForCrossJoinOrCountStar(innerRef, analyzer);
List<Expr> eqJoinConjuncts = Lists.newArrayList();
Reference<String> errMsg = new Reference<String>();
// get eq join predicates for the TableRefs' ids (not the PlanNodes' ids, which
// are materialized)
getHashLookupJoinConjuncts(analyzer, outer, inner,
eqJoinConjuncts, errMsg, innerRef.getJoinOp());
analyzer.markConjunctsAssigned(eqJoinConjuncts);
List<Expr> ojConjuncts = Lists.newArrayList();
if (innerRef.getJoinOp().isOuterJoin()) {
// Also assign conjuncts from On clause. All remaining unassigned conjuncts
// that can be evaluated by this join are assigned in createSelectPlan().
ojConjuncts = analyzer.getUnassignedOjConjuncts(innerRef);
} else if (innerRef.getJoinOp().isAntiJoinNullAware()) {
ojConjuncts = analyzer.getUnassignedAntiJoinNullAwareConjuncts(innerRef);
} else if (innerRef.getJoinOp().isSemiOrAntiJoinNoNullAware()) {
ojConjuncts = analyzer.getUnassignedSemiAntiJoinNoNullAwareConjuncts(innerRef);
}
analyzer.markConjunctsAssigned(ojConjuncts);
if (eqJoinConjuncts.isEmpty()) {
NestedLoopJoinNode result =
new NestedLoopJoinNode(ctx.getNextNodeId(), outer, inner, innerRef);
List<Expr> joinConjuncts = Lists.newArrayList(eqJoinConjuncts);
joinConjuncts.addAll(ojConjuncts);
result.setJoinConjuncts(joinConjuncts);
result.addConjuncts(analyzer.getMarkConjuncts(innerRef));
result.init(analyzer);
result.setOutputLeftSideOnly(innerRef.isInBitmap() && joinConjuncts.isEmpty());
return result;
}
HashJoinNode result = new HashJoinNode(ctx.getNextNodeId(), outer, inner,
innerRef, eqJoinConjuncts, ojConjuncts);
result.addConjuncts(analyzer.getMarkConjuncts(innerRef));
result.init(analyzer);
return result;
}
/*
for joinreorder
*/
public PlanNode createJoinNode(Analyzer analyzer, PlanNode outer, PlanNode inner, TableRef innerRef)
throws UserException {
return createJoinNodeBase(analyzer, outer, inner, innerRef);
}
/**
* Creates a new node to join outer with inner. Collects and assigns join conjunct
* as well as regular conjuncts. Calls init() on the new join node.
* Throws if the JoinNode.init() fails.
*/
private PlanNode createJoinNode(Analyzer analyzer, PlanNode outer, TableRef innerRef,
SelectStmt selectStmt) throws UserException {
// the rows coming from the build node only need to have space for the tuple
// materialized by that node
PlanNode inner = createTableRefNode(analyzer, innerRef, selectStmt);
return createJoinNodeBase(analyzer, outer, inner, innerRef);
}
/**
* Create a tree of PlanNodes for the given tblRef, which can be a BaseTableRef,
* CollectionTableRef or an InlineViewRef.
* <p>
* 'fastPartitionKeyScans' indicates whether to try to produce the slots with
* metadata instead of table scans. Only applicable to BaseTableRef which is also
* an HDFS table.
* <p>
* Throws if a PlanNode.init() failed or if planning of the given
* table ref is not implemented.
*/
private PlanNode createTableRefNode(Analyzer analyzer, TableRef tblRef, SelectStmt selectStmt)
throws UserException {
PlanNode scanNode = null;
if (tblRef instanceof BaseTableRef || tblRef instanceof TableValuedFunctionRef) {
scanNode = createScanNode(analyzer, tblRef, selectStmt);
}
if (tblRef instanceof InlineViewRef) {
InlineViewRef inlineViewRef = (InlineViewRef) tblRef;
scanNode = createInlineViewPlan(analyzer, inlineViewRef);
Analyzer viewAnalyzer = inlineViewRef.getAnalyzer();
Set<Expr> exprs = viewAnalyzer.findMigrateFailedConjuncts(inlineViewRef);
if (CollectionUtils.isNotEmpty(exprs)) {
for (Expr expr : exprs) {
scanNode.addConjunct(expr);
}
}
}
if (scanNode == null) {
throw new UserException("unknown TableRef node");
}
List<LateralViewRef> lateralViewRefs = tblRef.getLateralViewRefs();
if (lateralViewRefs == null || lateralViewRefs.size() == 0) {
return scanNode;
}
return createTableFunctionNode(analyzer, scanNode, lateralViewRefs, selectStmt);
}
private PlanNode createTableFunctionNode(Analyzer analyzer, PlanNode inputNode,
List<LateralViewRef> lateralViewRefs, SelectStmt selectStmt)
throws UserException {
Preconditions.checkNotNull(lateralViewRefs);
Preconditions.checkState(lateralViewRefs.size() > 0);
TableFunctionNode tableFunctionNode = new TableFunctionNode(ctx.getNextNodeId(), inputNode,
lateralViewRefs);
tableFunctionNode.init(analyzer);
tableFunctionNode.projectSlots(analyzer, selectStmt);
inputNode = tableFunctionNode;
return inputNode;
}
/**
* Create a plan tree corresponding to 'setOperands' for the given SetOperationStmt.
* The individual operands' plan trees are attached to a single SetOperationNode.
* If result is not null, it is expected to contain the plan for the
* distinct portion of the given SetOperationStmt. The result is then added
* as a child of the returned SetOperationNode.
*/
private SetOperationNode createSetOperationPlan(
Analyzer analyzer, SetOperationStmt setOperationStmt, List<SetOperationStmt.SetOperand> setOperands,
PlanNode result, long defaultOrderByLimit, long sqlSelectLimit)
throws UserException, AnalysisException {
SetOperationNode setOpNode;
SetOperationStmt.Operation operation = null;
for (SetOperationStmt.SetOperand setOperand : setOperands) {
if (setOperand.getOperation() != null) {
if (operation == null) {
operation = setOperand.getOperation();
}
Preconditions.checkState(operation == setOperand.getOperation(), "can not support mixed set "
+ "operations at here");
}
}
switch (operation) {
case UNION:
setOpNode = new UnionNode(ctx.getNextNodeId(), setOperationStmt.getTupleId(),
setOperationStmt.getSetOpsResultExprs(), false);
break;
case INTERSECT:
setOpNode = new IntersectNode(ctx.getNextNodeId(), setOperationStmt.getTupleId(),
setOperationStmt.getSetOpsResultExprs(), false);
break;
case EXCEPT:
setOpNode = new ExceptNode(ctx.getNextNodeId(), setOperationStmt.getTupleId(),
setOperationStmt.getSetOpsResultExprs(), false);
break;
default:
throw new AnalysisException("not supported set operations: " + operation);
}
// If it is a union or other same operation, there are only two possibilities,
// one is the root node, and the other is a distinct node in front, so the setOperationDistinctPlan will
// be aggregate node, if this is a mixed operation
// e.g. :
// a union b -> result == null
// a union b union all c -> result == null -> result == AggregationNode
// a union all b except c -> result == null -> result == UnionNode
// a union b except c -> result == null -> result == AggregationNode
if (result != null && result instanceof SetOperationNode) {
Preconditions.checkState(!result.getClass().equals(setOpNode.getClass()));
setOpNode.addChild(result, setOperationStmt.getResultExprs());
} else if (result != null) {
Preconditions.checkState(setOperationStmt.hasDistinctOps());
Preconditions.checkState(result instanceof AggregationNode);
setOpNode.addChild(result,
setOperationStmt.getDistinctAggInfo().getGroupingExprs());
}
for (SetOperationStmt.SetOperand op : setOperands) {
if (op.getAnalyzer().hasEmptyResultSet()) {
unmarkCollectionSlots(op.getQueryStmt());
continue;
}
QueryStmt queryStmt = op.getQueryStmt();
if (queryStmt instanceof SelectStmt) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
if (selectStmt.getTableRefs().isEmpty() && setOpNode instanceof UnionNode) {
setOpNode.addConstExprList(selectStmt.getResultExprs());
continue;
}
}
PlanNode opPlan = createQueryPlan(queryStmt, op.getAnalyzer(), defaultOrderByLimit, sqlSelectLimit);
// There may still be unassigned conjuncts if the operand has an order by + limit.
// Place them into a SelectNode on top of the operand's plan.
opPlan = addUnassignedConjuncts(analyzer, opPlan.getTupleIds(), opPlan);
if (opPlan instanceof EmptySetNode) {
continue;
}
setOpNode.addChild(opPlan, op.getQueryStmt().getResultExprs());
}
setOpNode.init(analyzer);
return setOpNode;
}
/**
* Returns plan tree for unionStmt:
* - distinctOperands' plan trees are collected in a single UnionNode
* and duplicates removed via distinct aggregation
* - the output of that plus the allOperands' plan trees are collected in
* another UnionNode which materializes the result of unionStmt
* - if any of the union operands contains analytic exprs, we avoid pushing
* predicates directly into the operands and instead evaluate them
* *after* the final UnionNode (see createInlineViewPlan() for the reasoning)
* TODO: optimize this by still pushing predicates into the union operands
* that don't contain analytic exprs and evaluating the conjuncts in Select
* directly above the AnalyticEvalNodes
* TODO: Simplify the plan of unions with empty operands using an empty set node.
* TODO: Simplify the plan of unions with only a single non-empty operand to not
* use a union node (this is tricky because a union materializes a new tuple).
*/
private PlanNode createSetOperationPlan(
SetOperationStmt setOperationStmt, Analyzer analyzer, long defaultOrderByLimit, long sqlSelectLimit)
throws UserException, AnalysisException {
// TODO(zc): get unassigned conjuncts
// List<Expr> conjuncts =
// analyzer.getUnassignedConjuncts(unionStmt.getTupleId().asList(), false);
List<Expr> conjuncts = analyzer.getUnassignedConjuncts(setOperationStmt.getTupleId().asList());
// TODO chenhao
// Because Conjuncts can't be assigned to UnionNode and Palo's fe can't evaluate conjuncts,
// it needs to add SelectNode as UnionNode's parent, when UnionStmt's Ops contains constant
// Select.
boolean hasConstantOp = false;
if (!setOperationStmt.hasAnalyticExprs()) {
// Turn unassigned predicates for unionStmt's tupleId_ into predicates for
// the individual operands.
// Do this prior to creating the operands' plan trees so they get a chance to
// pick up propagated predicates.
for (SetOperationStmt.SetOperand op : setOperationStmt.getOperands()) {
List<Expr> opConjuncts =
Expr.substituteList(conjuncts, op.getSmap(), analyzer, false);
boolean selectHasTableRef = true;
final QueryStmt queryStmt = op.getQueryStmt();
// Check whether UnionOperand is constant Select.
if (queryStmt instanceof SelectStmt) {
final SelectStmt selectStmt = (SelectStmt) queryStmt;
if (selectStmt.getTableRefs().isEmpty()) {
selectHasTableRef = false;
hasConstantOp = !selectHasTableRef;
}
}
// Forbid to register Conjuncts with SelectStmt' tuple when Select is constant
if ((queryStmt instanceof SelectStmt) && selectHasTableRef) {
final SelectStmt select = (SelectStmt) queryStmt;
// if there is an agg node, we need register the constant conjuncts on agg node's tuple
// this is consistent with migrateConstantConjuncts()
if (select.getAggInfo() != null) {
Map<Boolean, List<Expr>> splittedConjuncts = opConjuncts.stream()
.collect(Collectors.partitioningBy(expr -> expr.isConstant()));
op.getAnalyzer().registerConjuncts(splittedConjuncts.get(true),
select.getAggInfo().getOutputTupleId().asList());
op.getAnalyzer().registerConjuncts(splittedConjuncts.get(false),
select.getTableRefIds());
} else {
op.getAnalyzer().registerConjuncts(opConjuncts, select.getTableRefIds());
}
} else if (queryStmt instanceof SetOperationStmt) {
final SetOperationStmt subSetOp = (SetOperationStmt) queryStmt;
op.getAnalyzer().registerConjuncts(opConjuncts, subSetOp.getTupleId().asList());
} else {
if (selectHasTableRef) {
Preconditions.checkArgument(false);
}
}
}
if (!hasConstantOp) {
analyzer.markConjunctsAssigned(conjuncts);
}
} else {
// mark slots referenced by the yet-unassigned conjuncts
analyzer.materializeSlots(conjuncts);
}
// mark slots after predicate propagation but prior to plan tree generation
setOperationStmt.materializeRequiredSlots(analyzer);
PlanNode result = null;
SetOperationStmt.Operation operation = null;
List<SetOperationStmt.SetOperand> partialOperands = new ArrayList<>();
// create plan for a union b intersect c except b to three fragments
// 3:[2:[1:[a union b] intersect c] except c]
for (SetOperationStmt.SetOperand op : setOperationStmt.getOperands()) {
if (op.getOperation() == null) {
partialOperands.add(op);
} else if (operation == null && op.getOperation() != null) {
operation = op.getOperation();
partialOperands.add(op);
} else if (operation != null && op.getOperation() == operation) {
partialOperands.add(op);
} else if (operation != null && op.getOperation() != operation) {
if (partialOperands.size() > 0) {
if (operation == SetOperationStmt.Operation.INTERSECT
|| operation == SetOperationStmt.Operation.EXCEPT) {
result = createSetOperationPlan(analyzer, setOperationStmt, partialOperands, result,
defaultOrderByLimit, sqlSelectLimit);
} else {
result = createUnionPartialSetOperationPlan(analyzer, setOperationStmt, partialOperands, result,
defaultOrderByLimit, sqlSelectLimit);
}
partialOperands.clear();
}
operation = op.getOperation();
partialOperands.add(op);
} else {
throw new AnalysisException("invalid set operation statement.");
}
}
if (partialOperands.size() > 0) {
if (operation == SetOperationStmt.Operation.INTERSECT
|| operation == SetOperationStmt.Operation.EXCEPT) {
result = createSetOperationPlan(analyzer, setOperationStmt, partialOperands, result,
defaultOrderByLimit, sqlSelectLimit);
} else {
result = createUnionPartialSetOperationPlan(analyzer, setOperationStmt, partialOperands, result,
defaultOrderByLimit, sqlSelectLimit);
}
}
if (setOperationStmt.hasAnalyticExprs() || hasConstantOp) {
result = addUnassignedConjuncts(
analyzer, setOperationStmt.getTupleId().asList(), result);
}
return result;
}
// create the partial plan, or example: a union b intersect c
// the first partial plan is a union b as a result d,
// the second partial plan d intersect c
// notice that when query is a union b the union operation is in right-hand child(b),
// while the left-hand child(a)'s operation is null
private PlanNode createUnionPartialSetOperationPlan(Analyzer analyzer, SetOperationStmt setOperationStmt,
List<SetOperationStmt.SetOperand> setOperands,
PlanNode result, long defaultOrderByLimit, long sqlSelectLimit)
throws UserException {
boolean hasDistinctOps = false;
boolean hasAllOps = false;
List<SetOperationStmt.SetOperand> allOps = new ArrayList<>();
List<SetOperationStmt.SetOperand> distinctOps = new ArrayList<>();
for (SetOperationStmt.SetOperand op : setOperands) {
if (op.getQualifier() == SetOperationStmt.Qualifier.DISTINCT) {
hasDistinctOps = true;
distinctOps.add(op);
}
if (op.getQualifier() == SetOperationStmt.Qualifier.ALL) {
hasAllOps = true;
allOps.add(op);
}
}
// create DISTINCT tree
if (hasDistinctOps) {
result = createSetOperationPlan(
analyzer, setOperationStmt, distinctOps, result, defaultOrderByLimit, sqlSelectLimit);
result = new AggregationNode(ctx.getNextNodeId(), result,
setOperationStmt.getDistinctAggInfo());
result.init(analyzer);
}
// create ALL tree
if (hasAllOps) {
result = createSetOperationPlan(analyzer, setOperationStmt, allOps,
result, defaultOrderByLimit, sqlSelectLimit);
}
return result;
}
private PlanNode createAssertRowCountNode(PlanNode input, AssertNumRowsElement assertNumRowsElement,
Analyzer analyzer) throws UserException {
AssertNumRowsNode root = new AssertNumRowsNode(ctx.getNextNodeId(), input, assertNumRowsElement);
root.init(analyzer);
return root;
}
/**
* According to the way to materialize slots from top to bottom, Materialization will prune columns
* which are not referenced by Statement outside. However, in some cases, in order to ensure The
* correct execution, it is necessary to materialize the slots that are not needed by Statement
* outside.
*
* @param tblRef
* @param analyzer
*/
private void materializeTableResultForCrossJoinOrCountStar(TableRef tblRef, Analyzer analyzer) {
if (tblRef instanceof BaseTableRef || tblRef instanceof TableValuedFunctionRef) {
materializeSlotForEmptyMaterializedTableRef(tblRef, analyzer);
} else if (tblRef instanceof InlineViewRef) {
materializeInlineViewResultExprForCrossJoinOrCountStar((InlineViewRef) tblRef, analyzer);
} else {
Preconditions.checkArgument(false);
}
}
/**
* When materialized table ref is a empty tbl ref, the planner should add a mini column for this tuple.
* There are situations:
* 1. The tbl ref is empty, such as select a from (select 'c1' a from test) t;
* Inner tuple: tuple 0 without slot
* 2. The materialized slot in tbl ref is empty, such as select a from (select 'c1' a, k1 from test) t;
* Inner tuple: tuple 0 with a not materialized slot k1
* In the above two cases, it is necessary to add a mini column to the inner tuple
* to ensure that the number of rows in the inner query result is the number of rows in the table.
* 2. count star: select count(*) from t;
* <p>
* After this function, the inner tuple is following:
* case1. tuple 0: slot<k1> materialized true (new slot)
* case2. tuple 0: slot<k1> materialized true (changed)
*
* @param tblRef
* @param analyzer
*/
private void materializeSlotForEmptyMaterializedTableRef(TableRef tblRef, Analyzer analyzer) {
if (tblRef.getDesc().getMaterializedSlots().isEmpty()) {
Column minimuColumn = null;
for (Column col : tblRef.getTable().getBaseSchema()) {
if (minimuColumn == null || col.getDataType().getSlotSize() < minimuColumn
.getDataType().getSlotSize()) {
minimuColumn = col;
}
}
if (minimuColumn != null) {
SlotDescriptor slot = tblRef.getDesc().getColumnSlot(minimuColumn.getName());
if (slot != null) {
slot.setIsMaterialized(true);
} else {
slot = analyzer.getDescTbl().addSlotDescriptor(tblRef.getDesc());
slot.setColumn(minimuColumn);
slot.setIsMaterialized(true);
slot.setIsNullable(minimuColumn.isAllowNull());
}
}
}
}
/**
* Materialize InlineViewRef result'exprs for Cross Join or Count Star
* For example: select count(*) from (select k1+1 ,k2 ,k3 from base) tmp
* Columns: k1 tinyint, k2 bigint, k3 double
* Input: tmp, analyzer
* Output:
* Materialized slot: k1 true, k2 false, k3 false
* Materialized tuple: base
*
* @param inlineView
* @param analyzer
*/
private void materializeInlineViewResultExprForCrossJoinOrCountStar(InlineViewRef inlineView, Analyzer analyzer) {
final List<Expr> baseResultExprs = inlineView.getViewStmt().getBaseTblResultExprs();
if (baseResultExprs.size() <= 0) {
return;
}
Expr resultExprSelected = null;
int resultExprSelectedSize = 0;
// check whether inlineView contains materialized result expr
for (Expr e : baseResultExprs) {
final List<SlotId> slotIds = Lists.newArrayList();
e.getIds(null, slotIds);
boolean exprIsMaterialized = true;
int exprSize = 0;
for (SlotId id : slotIds) {
final SlotDescriptor slot = analyzer.getDescTbl().getSlotDesc(id);
if (!slot.isMaterialized()) {
exprIsMaterialized = false;
}
exprSize += slot.getType().getSlotSize();
}
// Result Expr contains materialized expr, return
if (exprIsMaterialized) {
return;
}
if (resultExprSelected == null || exprSize < resultExprSelectedSize) {
resultExprSelectedSize = exprSize;
resultExprSelected = e;
}
}
// materialize slots which expr refer and It's total size is smallest
final List<SlotId> slotIds = Lists.newArrayList();
final List<TupleId> tupleIds = Lists.newArrayList();
resultExprSelected.getIds(tupleIds, slotIds);
for (SlotId id : slotIds) {
final SlotDescriptor slot = analyzer.getDescTbl().getSlotDesc(id);
slot.setIsMaterialized(true);
slot.materializeSrcExpr();
}
for (TupleId id : tupleIds) {
final TupleDescriptor tuple = analyzer.getDescTbl().getTupleDesc(id);
tuple.setIsMaterialized(true);
}
}
/**
------------------------------------------------------------------------------
*/
/**
* Push down predicates rules
*/
/**
* Entrance for push-down rules, it will execute possible push-down rules from top to down
* and the planner will be responsible for assigning all predicates to PlanNode.
*/
private void pushDownPredicates(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
// Push down predicates according to the semantic requirements of SQL.
pushDownPredicatesPastSort(analyzer, stmt);
pushDownPredicatesPastWindows(analyzer, stmt);
pushDownPredicatesPastAggregation(analyzer, stmt);
}
private void pushDownPredicatesPastSort(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
// TODO chenhao, remove isEvaluateOrderBy when SubQuery's default limit is removed.
if (stmt.evaluateOrderBy() || stmt.getLimit() >= 0 || stmt.getOffset() > 0 || stmt.getSortInfo() == null) {
return;
}
final List<Expr> predicates = getBoundPredicates(analyzer, stmt.getSortInfo().getSortTupleDescriptor());
if (predicates.size() <= 0) {
return;
}
final List<Expr> pushDownPredicates = getPredicatesReplacedSlotWithSourceExpr(predicates, analyzer);
if (pushDownPredicates.size() <= 0) {
return;
}
// Push down predicates to sort's child until they are assigned successfully.
if (putPredicatesOnWindows(stmt, analyzer, pushDownPredicates)) {
return;
}
if (putPredicatesOnAggregation(stmt, analyzer, pushDownPredicates)) {
return;
}
putPredicatesOnTargetTupleIds(stmt.getTableRefIds(), analyzer, predicates);
}
private void pushDownPredicatesPastWindows(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
final AnalyticInfo analyticInfo = stmt.getAnalyticInfo();
if (analyticInfo == null || analyticInfo.getCommonPartitionExprs().size() == 0) {
return;
}
final List<Expr> predicates = getBoundPredicates(analyzer, analyticInfo.getOutputTupleDesc());
if (predicates.size() <= 0) {
return;
}
// Push down predicates to Windows' child until they are assigned successfully.
final List<Expr> pushDownPredicates = getPredicatesBoundedByGroupbysSourceExpr(predicates, analyzer, stmt);
if (pushDownPredicates.size() <= 0) {
return;
}
if (putPredicatesOnAggregation(stmt, analyzer, pushDownPredicates)) {
return;
}
putPredicatesOnTargetTupleIds(stmt.getTableRefIds(), analyzer, predicates);
}
/**
* Push down predicates past one phase aggregation.
*
* @param aggregateInfo one phase aggregate info. Either first phase or second phase
* @param analyzer current statement's analyzer
* @param stmt current stmt
* @param targetTupleIds target tuple to register.
* Table tuple ids when process first phase agg.
* First aggregate's tuple id when process second phase agg.
* @throws AnalysisException throw exception when register predicate to tuple failed
*/
private void pushDownPredicatesPastAggregationOnePhase(AggregateInfo aggregateInfo,
Analyzer analyzer, SelectStmt stmt, List<TupleId> targetTupleIds) throws AnalysisException {
if (aggregateInfo == null || aggregateInfo.getGroupingExprs().isEmpty()) {
return;
}
// The output of the 1st phase agg is the 1st phase intermediate.
// see createSecondPhaseAggInfo method
final List<Expr> predicates = getBoundPredicates(analyzer,
aggregateInfo.getSecondPhaseDistinctAggInfo() != null
? aggregateInfo.getIntermediateTupleDesc()
: aggregateInfo.getOutputTupleDesc());
if (predicates.isEmpty()) {
return;
}
// Push down predicates to aggregation's child until they are assigned successfully.
final List<Expr> pushDownPredicates = getPredicatesBoundedByGroupbysSourceExpr(predicates, analyzer, stmt);
if (CollectionUtils.isEmpty(pushDownPredicates)) {
return;
}
putPredicatesOnTargetTupleIds(targetTupleIds, analyzer, pushDownPredicates);
}
/**
* Push down predicates past whole aggregate stage. Include first phase and second phase.
*
* @param analyzer current statement's analyzer
* @param stmt current stmt
* @throws AnalysisException throw exception when register predicate to tuple failed
*/
private void pushDownPredicatesPastAggregation(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
final AggregateInfo firstPhaseAggInfo = stmt.getAggInfo();
if (firstPhaseAggInfo == null) {
return;
}
final AggregateInfo secondPhaseAggInfo = firstPhaseAggInfo.getSecondPhaseDistinctAggInfo();
// The output of the 1st phase agg is the 1st phase intermediate.
// see createSecondPhaseAggInfo method
final List<TupleId> firstPhaseTupleIds = Lists.newArrayList(
secondPhaseAggInfo != null ? firstPhaseAggInfo.getIntermediateTupleId()
: firstPhaseAggInfo.getOutputTupleId());
pushDownPredicatesPastAggregationOnePhase(secondPhaseAggInfo, analyzer, stmt, firstPhaseTupleIds);
pushDownPredicatesPastAggregationOnePhase(firstPhaseAggInfo, analyzer, stmt, stmt.getTableRefIds());
}
private List<Expr> getPredicatesBoundedByGroupbysSourceExpr(
List<Expr> predicates, Analyzer analyzer, SelectStmt stmt) {
final List<Expr> predicatesCanPushDown = Lists.newArrayList();
for (Expr predicate : predicates) {
if (predicate.isConstant()) {
// Constant predicates can't be pushed down past Groupby.
continue;
}
final List<TupleId> tupleIds = Lists.newArrayList();
final List<SlotId> slotIds = Lists.newArrayList();
predicate.getIds(tupleIds, slotIds);
boolean isAllSlotReferToGroupBys = true;
for (SlotId slotId : slotIds) {
Expr sourceExpr = new SlotRef(analyzer.getDescTbl().getSlotDesc(slotId));
// Every phase in aggregate will wrap expression with SlotRef.
// When we process one phase aggregate, we just need to unwrap once.
// But when we process 2 phase aggregate, we need to unwrap twice.
// So use loop here to adapt to different situations.
while (sourceExpr instanceof SlotRef) {
SlotRef slotRef = (SlotRef) sourceExpr;
SlotDescriptor slotDesc = slotRef.getDesc();
if (slotDesc.getSourceExprs().size() != 1) {
break;
}
sourceExpr = slotDesc.getSourceExprs().get(0);
}
if (sourceExpr instanceof AnalyticExpr) {
isAllSlotReferToGroupBys = false;
break;
}
// if grouping set is given and column is not in all grouping set list
// we cannot push the predicate since the column value can be null
if (stmt.getGroupByClause() == null) {
//group by clause may be null when distinct grouping.
//eg: select distinct c from ( select distinct c from table) t where c > 1;
continue;
}
if (sourceExpr.getFn() instanceof AggregateFunction) {
isAllSlotReferToGroupBys = false;
} else if (stmt.getGroupByClause().isGroupByExtension()) {
// if grouping type is CUBE or ROLLUP will definitely produce null
if (stmt.getGroupByClause().getGroupingType() == GroupByClause.GroupingType.CUBE
|| stmt.getGroupByClause().getGroupingType() == GroupByClause.GroupingType.ROLLUP) {
isAllSlotReferToGroupBys = false;
} else {
// if grouping type is GROUPING_SETS and the predicate not in all grouping list,
// the predicate cannot be push down
for (List<Expr> exprs : stmt.getGroupByClause().getGroupingSetList()) {
if (!exprs.contains(sourceExpr)) {
isAllSlotReferToGroupBys = false;
break;
}
}
}
}
GroupByClause groupByClause = stmt.getGroupByClause();
List<Expr> exprs = groupByClause.getGroupingExprs();
final Expr srcExpr = sourceExpr;
if (!exprs.contains(srcExpr) && !exprs.stream().anyMatch(expr -> expr.comeFrom(srcExpr))) {
// the sourceExpr doesn't come from any of the group by exprs
isAllSlotReferToGroupBys = false;
break;
}
}
if (isAllSlotReferToGroupBys) {
predicatesCanPushDown.add(predicate);
}
}
return getPredicatesReplacedSlotWithSourceExpr(predicatesCanPushDown, analyzer);
}
private List<Expr> getPredicatesReplacedSlotWithSourceExpr(List<Expr> predicates, Analyzer analyzer) {
final List<Expr> predicatesCanPushDown = Lists.newArrayList();
analyzer.markConjunctsAssigned(predicates);
for (Expr predicate : predicates) {
final Expr newPredicate = predicate.clone();
replacePredicateSlotRefWithSource(newPredicate, analyzer);
predicatesCanPushDown.add(newPredicate);
}
return predicatesCanPushDown;
}
private void replacePredicateSlotRefWithSource(Expr predicate, Analyzer analyzer) {
replacePredicateSlotRefWithSource(null, predicate, -1, analyzer);
}
private void replacePredicateSlotRefWithSource(Expr parent, Expr predicate, int childIndex, Analyzer analyzer) {
if (predicate instanceof SlotRef) {
final SlotRef slotRef = (SlotRef) predicate;
if (parent != null && childIndex >= 0) {
final Expr newReplacedExpr = slotRef.getDesc().getSourceExprs().get(0).clone();
parent.setChild(childIndex, newReplacedExpr);
}
}
for (int i = 0; i < predicate.getChildren().size(); i++) {
final Expr child = predicate.getChild(i);
replacePredicateSlotRefWithSource(predicate, child, i, analyzer);
}
}
// Register predicates with Aggregation's output tuple id.
private boolean putPredicatesOnAggregation(SelectStmt stmt, Analyzer analyzer,
List<Expr> predicates) throws AnalysisException {
final AggregateInfo aggregateInfo = stmt.getAggInfo();
if (aggregateInfo != null) {
analyzer.registerConjuncts(predicates, aggregateInfo.getOutputTupleId());
return true;
}
return false;
}
// Register predicates with Windows's tuple id.
private boolean putPredicatesOnWindows(SelectStmt stmt, Analyzer analyzer,
List<Expr> predicates) throws AnalysisException {
final AnalyticInfo analyticInfo = stmt.getAnalyticInfo();
if (analyticInfo != null) {
analyzer.registerConjuncts(predicates, analyticInfo.getOutputTupleId());
return true;
}
return false;
}
/**
* Register predicates on target tuple ids.
*
* @param analyzer current stmt analyzer
* @param predicates predicates try to register
* @param tupleIds target tupleIds
* @throws AnalysisException throw exception when register failed
*/
private void putPredicatesOnTargetTupleIds(List<TupleId> tupleIds,
Analyzer analyzer, List<Expr> predicates)
throws AnalysisException {
if (CollectionUtils.isEmpty(tupleIds)) {
return;
}
for (Expr predicate : predicates) {
Preconditions.checkArgument(predicate.isBoundByTupleIds(tupleIds),
"Predicate:" + predicate.toSql() + " can't be assigned to some PlanNode.");
final List<TupleId> predicateTupleIds = Lists.newArrayList();
predicate.getIds(predicateTupleIds, null);
analyzer.registerConjunct(predicate, predicateTupleIds);
}
}
/**
* ------------------------------------------------------------------------------
*/
private List<Expr> getBoundPredicates(Analyzer analyzer, TupleDescriptor tupleDesc) {
final List<TupleId> tupleIds = Lists.newArrayList();
if (tupleDesc != null) {
tupleIds.add(tupleDesc.getId());
}
return analyzer.getUnassignedConjuncts(tupleIds);
}
/**
* Returns a normalized version of a binary equality predicate 'expr' where the lhs
* child expr is bound by some tuple in 'lhsTids' and the rhs child expr is bound by
* some tuple in 'rhsTids'. Returns 'expr' if this predicate is already normalized.
* Returns null in any of the following cases:
* 1. It is not an equality predicate
* 2. One of the operands is a constant
* 3. Both children of this predicate are the same expr
* The so-called normalization is to ensure that the above conditions are met, and then
* to ensure that the order of expr is consistent with the order of node
*/
public static BinaryPredicate getNormalizedEqPred(Expr expr, List<TupleId> lhsTids,
List<TupleId> rhsTids, Analyzer analyzer) {
if (!(expr instanceof BinaryPredicate)) {
return null;
}
BinaryPredicate pred = (BinaryPredicate) expr;
if (!pred.getOp().isEquivalence()) {
return null;
}
if (pred.getChild(0).isConstant() || pred.getChild(1).isConstant()) {
return null;
}
// Use the child that contains lhsTids as lhsExpr, for example, A join B on B.k = A.k,
// where lhsExpr=A.k, rhsExpr=B.k, changed the order, A.k = B.k
Expr lhsExpr = Expr.getFirstBoundChild(pred, lhsTids);
Expr rhsExpr = Expr.getFirstBoundChild(pred, rhsTids);
if (lhsExpr == null || rhsExpr == null || lhsExpr == rhsExpr) {
return null;
}
BinaryPredicate result = new BinaryPredicate(pred.getOp(), lhsExpr, rhsExpr);
result.analyzeNoThrow(analyzer);
return result;
}
}