// ScanNode.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/ScanNode.java
// and modified by Doris
package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.IsNullPredicate;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.PlaceHolderExpr;
import org.apache.doris.analysis.PredicateUtils;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cloud.catalog.CloudPartition;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.FederationBackendPolicy;
import org.apache.doris.datasource.SplitAssignment;
import org.apache.doris.datasource.SplitGenerator;
import org.apache.doris.datasource.SplitSource;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.query.StatsDelta;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeRangeSet;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* Representation of the common elements of all scan nodes.
*/
public abstract class ScanNode extends PlanNode implements SplitGenerator {
    private static final Logger LOG = LogManager.getLogger(ScanNode.class);
    // Default number of splits generated per partition for external tables.
    protected static final int NUM_SPLITS_PER_PARTITION = 10;
    // Upper bound on concurrently running split generators, tied to the
    // external cache loader thread pool size in Config.
    protected static final int NUM_SPLITTERS_ON_FLIGHT = Config.max_external_cache_loader_thread_pool_size;
    // Descriptor of the tuple this scan node materializes.
    protected TupleDescriptor desc;
    // for distribution prunner
    // Column name -> filter; case-insensitive map because column names are.
    protected Map<String, PartitionColumnFilter> columnFilters = new CaseInsensitiveMap();
    // Use this if partition_prune_algorithm_version is 2.
    protected Map<String, ColumnRange> columnNameToRange = Maps.newHashMap();
    // Optional column the scan should be sorted on; null when unused.
    protected String sortColumn = null;
    protected Analyzer analyzer;
    // Scan ranges plus replica locations, filled by createScanRangeLocations().
    protected List<TScanRangeLocations> scanRangeLocations = Lists.newArrayList();
    // Lazily consumed split sources for external file scans.
    protected List<SplitSource> splitSources = Lists.newArrayList();
    protected PartitionInfo partitionsInfo = null;
    protected SplitAssignment splitAssignment = null;
    // Planning statistics, exposed via getSelectedPartitionNum()/getSelectedSplitNum().
    protected long selectedPartitionNum = 0;
    protected int selectedSplitNum = 0;
    // create a mapping between output slot's id and project expr
    Map<SlotId, Expr> outputSlotToProjectExpr = new HashMap<>();
    // support multi topn filter
    protected final List<SortNode> topnFilterSortNodes = Lists.newArrayList();
    protected TableSnapshot tableSnapshot;
    // Cached base schema of the scanned table; populated lazily by getColumns().
    protected List<Column> columns;
    // Save the id of backends which this scan node will be executed on.
    // This is also important for local shuffle logic.
    // Now only OlapScanNode and FileQueryScanNode implement this.
    protected HashSet<Long> scanBackendIds = new HashSet<>();
    /**
     * Creates a scan node producing the tuple described by {@code desc}.
     */
    public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType) {
        super(id, desc.getId().asList(), planNodeName, statisticalType);
        this.desc = desc;
    }

    /**
     * Initializes the node: runs the PlanNode init, then marks all slots
     * referenced by the WHERE conjuncts as materialized.
     */
    @Override
    public void init(Analyzer analyzer) throws UserException {
        super.init(analyzer);
        this.analyzer = analyzer;
        // materialize conjuncts in where
        analyzer.materializeSlots(conjuncts);
    }
/**
* Helper function to parse a "host:port" address string into TNetworkAddress
* This is called with ipaddress:port when doing scan range assigment.
*/
protected static TNetworkAddress addressToTNetworkAddress(String address) {
TNetworkAddress result = new TNetworkAddress();
String[] hostPort = address.split(":");
result.hostname = hostPort[0];
result.port = Integer.parseInt(hostPort[1]);
return result;
}
    /**
     * Returns the base schema of the scanned table, caching it on first use.
     * May return null when the tuple has no table (e.g. load scan nodes).
     */
    protected List<Column> getColumns() {
        if (columns == null && desc.getTable() != null) {
            columns = desc.getTable().getBaseSchema();
        }
        return columns;
    }

    /** Returns the tuple descriptor this scan materializes. */
    public TupleDescriptor getTupleDesc() {
        return desc;
    }

    /** Records the column the scan output should be sorted on. */
    public void setSortColumn(String column) {
        sortColumn = column;
    }
/**
* cast expr to SlotDescriptor type
*/
    /**
     * cast expr to SlotDescriptor type
     *
     * Casts when (a) both types carry precision/scale and the concrete types
     * differ, or (b) the primitive types differ, or (c) both are agg-state
     * types whose concrete types differ; otherwise returns the expr unchanged.
     *
     * NOTE(review): in the second branch `&&` binds tighter than `||`, so the
     * condition reads as `dstType != srcType || (bothAggState && typesDiffer)`
     * — confirm this grouping is intended before restructuring.
     */
    protected Expr castToSlot(SlotDescriptor slotDesc, Expr expr) throws UserException {
        PrimitiveType dstType = slotDesc.getType().getPrimitiveType();
        PrimitiveType srcType = expr.getType().getPrimitiveType();
        if (PrimitiveType.typeWithPrecision.contains(dstType) && PrimitiveType.typeWithPrecision.contains(srcType)
                && !slotDesc.getType().equals(expr.getType())) {
            // Same primitive family but different precision/scale: cast explicitly.
            return expr.castTo(slotDesc.getType());
        } else if (dstType != srcType || slotDesc.getType().isAggStateType() && expr.getType().isAggStateType()
                && !slotDesc.getType().equals(expr.getType())) {
            return expr.castTo(slotDesc.getType());
        } else {
            return expr;
        }
    }
    /** Computes {@code scanRangeLocations}; implemented by each concrete scan node. */
    protected abstract void createScanRangeLocations() throws UserException;

    /**
     * Returns all scan ranges plus their locations. Needs to be preceded by a call to
     * finalize().
     *
     * @param maxScanRangeLength The maximum number of bytes each scan range should scan;
     *                           only applicable to HDFS; less than or equal to zero means no
     *                           maximum.
     */
    public abstract List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength);

    // If scan is key search, should not enable the shared scan opt to prevent the performance problem
    // 1. where contain the eq or in expr of key column slot
    // 2. key column slot is distribution column and first column
    protected boolean isKeySearch() {
        return false;
    }
private void computeColumnFilter(Column column, SlotDescriptor slotDesc, PartitionInfo partitionsInfo) {
// Set `columnFilters` all the time because `DistributionPruner` also use this.
// Maybe we could use `columnNameToRange` for `DistributionPruner` and
// only create `columnFilters` when `partition_prune_algorithm_version` is 1.
PartitionColumnFilter keyFilter = createPartitionFilter(slotDesc, conjuncts, partitionsInfo);
if (null != keyFilter) {
columnFilters.put(column.getName(), keyFilter);
}
ColumnRange columnRange = createColumnRange(slotDesc, conjuncts, partitionsInfo);
if (columnRange != null) {
columnNameToRange.put(column.getName(), columnRange);
}
}
// TODO(ML): move it into PrunerOptimizer
public void computeColumnsFilter(List<Column> columns, PartitionInfo partitionsInfo) {
if (columns.size() > conjuncts.size()) {
Set<SlotRef> slotRefs = Sets.newHashSet();
for (Expr conjunct : conjuncts) {
conjunct.collect(SlotRef.class, slotRefs);
}
for (SlotRef slotRef : slotRefs) {
SlotDescriptor slotDesc = slotRef.getDesc();
if (null == slotDesc) {
continue;
}
Column column = slotDesc.getColumn();
if (column == null) {
continue;
}
computeColumnFilter(column, slotDesc, partitionsInfo);
}
} else {
for (Column column : columns) {
SlotDescriptor slotDesc = desc.getColumnSlot(column.getName());
if (null == slotDesc) {
continue;
}
computeColumnFilter(column, slotDesc, partitionsInfo);
}
}
}
    /**
     * Convenience overload: computes column filters from the cached schema and
     * this node's partition info.
     */
    public void computeColumnsFilter() {
        // for load scan node, table is null
        // partitionsInfo maybe null for other scan node, eg: ExternalScanNode...
        if (desc.getTable() != null) {
            computeColumnsFilter(getColumns(), partitionsInfo);
        }
    }

    /** Returns the table being scanned; may be null for load scan nodes. */
    public TableIf getTableIf() {
        return desc.getTable();
    }
    /**
     * Builds a {@link ColumnRange} for slot {@code desc} by converting every
     * conjunct bound to the slot into value ranges and intersecting them.
     * An OR-ed (disjunctive) predicate contributes the union of its children's
     * ranges, but only if every child converts; `is null` children are tracked
     * via the disjunctive/conjunctive is-null flags on the result.
     */
    public static ColumnRange createColumnRange(SlotDescriptor desc,
            List<Expr> conjuncts, PartitionInfo partitionsInfo) {
        ColumnRange result = ColumnRange.create();
        for (Expr expr : conjuncts) {
            // Skip conjuncts that do not reference this slot at all.
            if (!expr.isBound(desc.getId())) {
                continue;
            }
            if (expr instanceof CompoundPredicate
                    && ((CompoundPredicate) expr).getOp() == CompoundPredicate.Operator.OR) {
                // Try to get column filter from disjunctive predicates.
                List<Expr> disjunctivePredicates = PredicateUtils.splitDisjunctivePredicates(expr);
                if (disjunctivePredicates.isEmpty()) {
                    continue;
                }
                List<Range<ColumnBound>> disjunctiveRanges = Lists.newArrayList();
                Set<Boolean> hasIsNull = Sets.newHashSet();
                // Every disjunct must convert, otherwise the whole OR tells us
                // nothing usable about this column.
                boolean allMatch = disjunctivePredicates.stream().allMatch(e -> {
                    ColumnRanges ranges = expressionToRanges(e, desc, partitionsInfo);
                    switch (ranges.type) {
                        case IS_NULL:
                            hasIsNull.add(true);
                            return true;
                        case CONVERT_SUCCESS:
                            disjunctiveRanges.addAll(ranges.ranges);
                            return true;
                        case CONVERT_FAILURE:
                        default:
                            return false;
                    }
                });
                if (allMatch && !(disjunctiveRanges.isEmpty() && hasIsNull.isEmpty())) {
                    result.intersect(disjunctiveRanges);
                    result.setHasDisjunctiveIsNull(!hasIsNull.isEmpty());
                }
            } else {
                // Try to get column filter from conjunctive predicates.
                ColumnRanges ranges = expressionToRanges(expr, desc, partitionsInfo);
                switch (ranges.type) {
                    case IS_NULL:
                        result.setHasConjunctiveIsNull(true);
                        break;
                    case CONVERT_SUCCESS:
                        result.intersect(ranges.ranges);
                        break;
                    case CONVERT_FAILURE:
                    default:
                        break;
                }
            }
        }
        return result;
    }
    /**
     * Converts a single predicate bound to slot {@code desc} into value ranges.
     *
     * Returns IS_NULL for a bare `col IS NULL`; CONVERT_SUCCESS plus ranges
     * for binary / IN / compound predicates that can be expressed as ranges;
     * CONVERT_FAILURE for everything else.
     */
    public static ColumnRanges expressionToRanges(Expr expr,
            SlotDescriptor desc, PartitionInfo partitionsInfo) {
        if (expr instanceof IsNullPredicate) {
            IsNullPredicate isNullPredicate = (IsNullPredicate) expr;
            // Only a plain `slot IS NULL` maps to the IS_NULL result;
            // `IS NOT NULL` falls through and ends up as a failure.
            if (isNullPredicate.isSlotRefChildren() && !isNullPredicate.isNotNull()) {
                return ColumnRanges.createIsNull();
            }
        }
        List<Range<ColumnBound>> result = Lists.newArrayList();
        if (expr instanceof BinaryPredicate) {
            BinaryPredicate binPred = (BinaryPredicate) expr;
            // For auto-partition tables the slot may be wrapped in a partition
            // expression; pass those exprs so the binding can be unwrapped.
            ArrayList<Expr> partitionExprs = (partitionsInfo != null && partitionsInfo.enableAutomaticPartition())
                    ? partitionsInfo.getPartitionExprs()
                    : null;
            Expr slotBinding = binPred.getSlotBinding(desc.getId(), partitionExprs);
            // Only constant literal bindings can become ranges.
            if (slotBinding == null || !slotBinding.isConstant() || !(slotBinding instanceof LiteralExpr)) {
                return ColumnRanges.createFailure();
            }
            LiteralExpr value = (LiteralExpr) slotBinding;
            switch (binPred.getOp()) {
                case EQ:
                    ColumnBound bound = ColumnBound.of(value);
                    result.add(Range.closed(bound, bound));
                    break;
                case LE:
                    result.add(Range.atMost(ColumnBound.of(value)));
                    break;
                case LT:
                    result.add(Range.lessThan(ColumnBound.of(value)));
                    break;
                case GE:
                    result.add(Range.atLeast(ColumnBound.of(value)));
                    break;
                case GT:
                    result.add(Range.greaterThan(ColumnBound.of(value)));
                    break;
                case NE:
                    // `!= v` becomes the two open ranges on either side of v.
                    ColumnBound b = ColumnBound.of(value);
                    result.add(Range.greaterThan(b));
                    result.add(Range.lessThan(b));
                    break;
                default:
                    break;
            }
        } else if (expr instanceof InPredicate) {
            InPredicate inPredicate = (InPredicate) expr;
            // NOT IN and non-literal lists cannot be expressed as ranges.
            if (!inPredicate.isLiteralChildren() || inPredicate.isNotIn()) {
                return ColumnRanges.createFailure();
            }
            if (!(inPredicate.getChild(0).unwrapExpr(false) instanceof SlotRef)) {
                // If child(0) of the in predicate is not a SlotRef,
                // then other children of in predicate should not be used as a condition for partition prune.
                return ColumnRanges.createFailure();
            }
            // One singleton range per IN-list literal.
            for (int i = 1; i < inPredicate.getChildren().size(); ++i) {
                ColumnBound bound = ColumnBound.of((LiteralExpr) inPredicate.getChild(i));
                result.add(Range.closed(bound, bound));
            }
        } else if (expr instanceof CompoundPredicate) {
            CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
            ColumnRanges leftChildRange = null;
            ColumnRanges rightChildRange = null;
            // Recurse into compound predicates and combine the child results.
            switch (compoundPredicate.getOp()) {
                case AND:
                    leftChildRange = expressionToRanges(compoundPredicate.getChild(0), desc, partitionsInfo);
                    rightChildRange = expressionToRanges(compoundPredicate.getChild(1), desc, partitionsInfo);
                    return leftChildRange.intersectRanges(rightChildRange);
                case OR:
                    leftChildRange = expressionToRanges(compoundPredicate.getChild(0), desc, partitionsInfo);
                    rightChildRange = expressionToRanges(compoundPredicate.getChild(1), desc, partitionsInfo);
                    return leftChildRange.unionRanges(rightChildRange);
                case NOT:
                    leftChildRange = expressionToRanges(compoundPredicate.getChild(0), desc, partitionsInfo);
                    return leftChildRange.complementOfRanges();
                default:
                    throw new RuntimeException("unknown OP in compound predicate: "
                            + compoundPredicate.getOp().toString());
            }
        }
        if (result.isEmpty()) {
            return ColumnRanges.createFailure();
        } else {
            return ColumnRanges.create(result);
        }
    }
    /**
     * Derives a {@code PartitionColumnFilter} for slot {@code desc} from the
     * conjuncts: binary comparisons contribute bounds, literal IN lists are
     * stored verbatim, and a plain `IS NULL` replaces everything with a
     * null-only filter. Returns null when no conjunct constrains the slot.
     */
    private PartitionColumnFilter createPartitionFilter(SlotDescriptor desc, List<Expr> conjuncts,
            PartitionInfo partitionsInfo) {
        PartitionColumnFilter partitionColumnFilter = null;
        for (Expr expr : conjuncts) {
            if (!expr.isBound(desc.getId())) {
                continue;
            }
            if (expr instanceof BinaryPredicate) {
                BinaryPredicate binPredicate = (BinaryPredicate) expr;
                // `!=` cannot narrow a min/max style filter; skip it.
                if (binPredicate.getOp() == BinaryPredicate.Operator.NE) {
                    continue;
                }
                ArrayList<Expr> partitionExprs = (partitionsInfo != null && partitionsInfo.enableAutomaticPartition())
                        ? partitionsInfo.getPartitionExprs()
                        : null;
                Expr slotBinding = binPredicate.getSlotBinding(desc.getId(), partitionExprs);
                // Only constant literal bindings are usable for pruning.
                if (slotBinding == null || !slotBinding.isConstant() || !(slotBinding instanceof LiteralExpr)) {
                    continue;
                }
                if (null == partitionColumnFilter) {
                    partitionColumnFilter = new PartitionColumnFilter();
                }
                // Prepared-statement placeholders carry their literal inside.
                LiteralExpr literal = slotBinding instanceof PlaceHolderExpr
                        ? ((PlaceHolderExpr) slotBinding).getLiteral() : (LiteralExpr) slotBinding;
                BinaryPredicate.Operator op = binPredicate.getOp();
                if (!binPredicate.slotIsLeft()) {
                    // `literal op slot` -> flip to `slot op' literal`.
                    op = op.commutative();
                }
                switch (op) {
                    case EQ:
                        partitionColumnFilter.setLowerBound(literal, true);
                        partitionColumnFilter.setUpperBound(literal, true);
                        break;
                    case LE:
                        partitionColumnFilter.setUpperBound(literal, true);
                        // NOTE(review): setting lowerBoundInclusive while handling an
                        // upper bound looks odd — confirm against PartitionColumnFilter
                        // semantics before changing.
                        partitionColumnFilter.lowerBoundInclusive = true;
                        break;
                    case LT:
                        partitionColumnFilter.setUpperBound(literal, false);
                        partitionColumnFilter.lowerBoundInclusive = true;
                        break;
                    case GE:
                        partitionColumnFilter.setLowerBound(literal, true);
                        break;
                    case GT:
                        partitionColumnFilter.setLowerBound(literal, false);
                        break;
                    default:
                        break;
                }
            } else if (expr instanceof InPredicate) {
                InPredicate inPredicate = (InPredicate) expr;
                // NOT IN and non-literal lists never prune.
                if (!inPredicate.isLiteralChildren() || inPredicate.isNotIn()) {
                    continue;
                }
                if (!(inPredicate.getChild(0).unwrapExpr(false) instanceof SlotRef)) {
                    // If child(0) of the in predicate is not a SlotRef,
                    // then other children of in predicate should not be used as a condition for partition prune.
                    continue;
                }
                if (null == partitionColumnFilter) {
                    partitionColumnFilter = new PartitionColumnFilter();
                }
                partitionColumnFilter.setInPredicate(inPredicate);
            } else if (expr instanceof IsNullPredicate) {
                IsNullPredicate isNullPredicate = (IsNullPredicate) expr;
                if (!isNullPredicate.isSlotRefChildren() || isNullPredicate.isNotNull()) {
                    continue;
                }
                // If we meet a IsNull predicate on partition column, then other predicates are useless
                // eg: (xxxx) and (col is null), only the IsNull predicate has an effect on partition pruning.
                partitionColumnFilter = new PartitionColumnFilter();
                NullLiteral nullLiteral = new NullLiteral();
                partitionColumnFilter.setLowerBound(nullLiteral, true);
                partitionColumnFilter.setUpperBound(nullLiteral, true);
                break;
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("partitionColumnFilter: {}", partitionColumnFilter);
        }
        return partitionColumnFilter;
    }
public static class ColumnRanges {
public enum Type {
// Expression is `is null` predicate.
IS_NULL,
// Succeed to convert expression to ranges.
CONVERT_SUCCESS,
// Failed to convert expression to ranges.
CONVERT_FAILURE
}
public final Type type;
public final List<Range<ColumnBound>> ranges;
private ColumnRanges(Type type, List<Range<ColumnBound>> ranges) {
this.type = type;
this.ranges = ranges;
}
private static final ColumnRanges IS_NULL = new ColumnRanges(Type.IS_NULL, null);
private static final ColumnRanges CONVERT_FAILURE = new ColumnRanges(Type.CONVERT_FAILURE, null);
public static ColumnRanges createIsNull() {
return IS_NULL;
}
public ColumnRanges complementOfRanges() {
if (type == Type.CONVERT_SUCCESS) {
RangeSet<ColumnBound> rangeSet = TreeRangeSet.create();
rangeSet.addAll(ranges);
return create(Lists.newArrayList(rangeSet.complement().asRanges()));
}
return CONVERT_FAILURE;
}
public ColumnRanges intersectRanges(ColumnRanges other) {
// intersect ranges can handle isnull
switch (this.type) {
case IS_NULL:
return createIsNull();
case CONVERT_FAILURE:
return createFailure();
case CONVERT_SUCCESS:
switch (other.type) {
case IS_NULL:
return createIsNull();
case CONVERT_FAILURE:
return createFailure();
case CONVERT_SUCCESS:
RangeSet<ColumnBound> rangeSet = TreeRangeSet.create();
rangeSet.addAll(this.ranges);
RangeSet<ColumnBound> intersectSet = TreeRangeSet.create();
other.ranges.forEach(range -> intersectSet.addAll(rangeSet.subRangeSet(range)));
return create(Lists.newArrayList(intersectSet.asRanges()));
default:
return createFailure();
}
default:
return createFailure();
}
}
public ColumnRanges unionRanges(ColumnRanges other) {
switch (this.type) {
case IS_NULL:
case CONVERT_FAILURE:
return createFailure();
case CONVERT_SUCCESS:
switch (other.type) {
case IS_NULL:
case CONVERT_FAILURE:
return createFailure();
case CONVERT_SUCCESS:
RangeSet<ColumnBound> rangeSet = TreeRangeSet.create();
rangeSet.addAll(this.ranges);
rangeSet.addAll(other.ranges);
List<Range<ColumnBound>> unionRangeList = Lists.newArrayList(rangeSet.asRanges());
return create(unionRangeList);
default:
return createFailure();
}
default:
return createFailure();
}
}
public static ColumnRanges createFailure() {
return CONVERT_FAILURE;
}
public static ColumnRanges create(List<Range<ColumnBound>> ranges) {
return new ColumnRanges(Type.CONVERT_SUCCESS, ranges);
}
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("id", getId().asInt())
.add("tid", desc.getId().asInt())
.add("tblName", desc.getTable().getName())
.add("keyRanges", "")
.addValue(super.debugString()).toString();
}
    // Some scan nodes (eg, DataGenScanNode) do not need to check column privileges
    // (because they have no corresponding catalog/db/table info).
    // Subclass may override this method.
    public boolean needToCheckColumnPriv() {
        return true;
    }
    /**
     * Installs the output substitution map. When some RHS expression is not a
     * bare SlotRef (i.e. real computation is projected out of the scan), an
     * output tuple and project list are created/extended so each such
     * expression gets its own output slot, and the smap RHS is rewritten to
     * reference the new slots.
     */
    public void setOutputSmap(ExprSubstitutionMap smap, Analyzer analyzer) {
        outputSmap = smap;
        if (smap.getRhs().stream().anyMatch(expr -> !(expr instanceof SlotRef))) {
            if (outputTupleDesc == null) {
                outputTupleDesc = analyzer.getDescTbl().createTupleDescriptor("OlapScanNode");
            }
            if (projectList == null) {
                projectList = Lists.newArrayList();
            }
            // setOutputSmap may be called multiple times
            // this happens if the olap table is in the most inner sub-query block in the cascades sub-queries
            // create a tmpSmap for the later setOutputSmap call
            ExprSubstitutionMap tmpSmap = new ExprSubstitutionMap(
                    Lists.newArrayList(outputTupleDesc.getSlots().stream()
                            .filter(slot -> slot.isMaterialized())
                            .map(slot -> new SlotRef(slot))
                            .collect(Collectors.toList())),
                    Lists.newArrayList(projectList));
            Set<SlotId> allOutputSlotIds = outputTupleDesc.getSlots().stream().map(slot -> slot.getId())
                    .collect(Collectors.toSet());
            List<Expr> newRhs = Lists.newArrayList();
            List<Expr> rhs = smap.getRhs();
            for (int i = 0; i < smap.size(); ++i) {
                Expr rhsExpr = rhs.get(i);
                if (!(rhsExpr instanceof SlotRef) || !(allOutputSlotIds.contains(((SlotRef) rhsExpr).getSlotId()))) {
                    // Not yet backed by an output slot: rewrite through the
                    // previous projection, then create a slot for it if it is
                    // bound to this scan's tuple.
                    rhsExpr = rhsExpr.substitute(tmpSmap);
                    if (rhsExpr.isBound(desc.getId())) {
                        SlotDescriptor slotDesc = analyzer.addSlotDescriptor(outputTupleDesc);
                        slotDesc.initFromExpr(rhsExpr);
                        if (rhsExpr instanceof SlotRef) {
                            // Inherit source column and materialization from
                            // the underlying slot.
                            slotDesc.setSrcColumn(((SlotRef) rhsExpr).getColumn());
                            slotDesc.setIsMaterialized(((SlotRef) rhsExpr).getDesc().isMaterialized());
                        } else {
                            slotDesc.setIsMaterialized(true);
                        }
                        if (slotDesc.isMaterialized()) {
                            slotDesc.materializeSrcExpr();
                            projectList.add(rhsExpr);
                        }
                        newRhs.add(new SlotRef(slotDesc));
                        allOutputSlotIds.add(slotDesc.getId());
                        outputSlotToProjectExpr.put(slotDesc.getId(), rhsExpr);
                    } else {
                        // Not bound to this scan; keep the original expression.
                        newRhs.add(rhs.get(i));
                    }
                } else {
                    newRhs.add(rhsExpr);
                }
            }
            outputSmap.updateRhsExprs(newRhs);
        }
    }
@Override
public void initOutputSlotIds(Set<SlotId> requiredSlotIdSet, Analyzer analyzer) {
if (outputTupleDesc != null && requiredSlotIdSet != null) {
Preconditions.checkNotNull(outputSmap);
ArrayList<SlotId> materializedSlotIds = outputTupleDesc.getMaterializedSlotIds();
Preconditions.checkState(projectList != null && projectList.size() <= materializedSlotIds.size(),
"projectList's size should be less than materializedSlotIds's size");
boolean hasNewSlot = false;
if (projectList.size() < materializedSlotIds.size()) {
// need recreate projectList based on materializedSlotIds
hasNewSlot = true;
}
// find new project expr from outputSmap based on requiredSlotIdSet
ArrayList<SlotId> allSlots = outputTupleDesc.getAllSlotIds();
for (SlotId slotId : requiredSlotIdSet) {
if (!materializedSlotIds.contains(slotId) && allSlots.contains(slotId)) {
SlotDescriptor slot = outputTupleDesc.getSlot(slotId.asInt());
for (Expr expr : outputSmap.getRhs()) {
if (expr instanceof SlotRef && ((SlotRef) expr).getSlotId() == slotId) {
slot.setIsMaterialized(true);
outputSlotToProjectExpr.put(slotId, expr.getSrcSlotRef());
hasNewSlot = true;
}
}
}
}
if (hasNewSlot) {
// recreate the project list
projectList.clear();
materializedSlotIds = outputTupleDesc.getMaterializedSlotIds();
for (SlotId slotId : materializedSlotIds) {
projectList.add(outputSlotToProjectExpr.get(slotId));
}
}
}
}
    /**
     * Returns the ids of the tuples this node outputs: the projected output
     * tuple when one exists, otherwise the scanned tuple ids.
     */
    public List<TupleId> getOutputTupleIds() {
        if (outputTupleDesc != null) {
            return Lists.newArrayList(outputTupleDesc.getId());
        }
        return tupleIds;
    }

    /**
     * Subclasses return a delta object for recording per-column query stats;
     * the default null disables stats recording (see genQueryStats).
     */
    public StatsDelta genStatsDelta() throws AnalysisException {
        return null;
    }
    /**
     * Fills the stats delta from genStatsDelta() with the queried columns
     * (materialized scan slots) and the filter columns (slots referenced by
     * conjuncts). Returns null when stats recording is disabled.
     */
    public StatsDelta genQueryStats() throws UserException {
        StatsDelta delta = genStatsDelta();
        if (delta == null) {
            return null;
        }
        // Every materialized scan slot counts as a queried column.
        for (SlotDescriptor slot : desc.getMaterializedSlots()) {
            if (slot.isScanSlot() && slot.getColumn() != null) {
                delta.addQueryStats(slot.getColumn().getName());
            }
        }
        // Columns referenced by WHERE conjuncts count as filter columns.
        for (Expr expr : conjuncts) {
            List<SlotId> slotIds = Lists.newArrayList();
            expr.getIds(null, slotIds);
            for (SlotId slotId : slotIds) {
                SlotDescriptor slot = desc.getSlot(slotId.asInt());
                if (slot.getColumn() != null) {
                    delta.addFilterStats(slot.getColumn().getName());
                }
            }
        }
        return delta;
    }
// Create a single scan range locations for the given backend policy.
// Used for those scan nodes which do not require data location.
public static TScanRangeLocations createSingleScanRangeLocations(FederationBackendPolicy backendPolicy) {
TScanRangeLocations scanRangeLocation = new TScanRangeLocations();
scanRangeLocation.setScanRange(new TScanRange());
TScanRangeLocation location = new TScanRangeLocation();
Backend be = backendPolicy.getNextBe();
location.setServer(new TNetworkAddress(be.getHost(), be.getBePort()));
location.setBackendId(be.getId());
scanRangeLocation.addToLocations(location);
return scanRangeLocation;
}
    /** Number of distinct backends this scan is planned to run on. */
    public int numScanBackends() {
        return scanBackendIds.size();
    }

    /** Scan range count; subclasses override with the real number. */
    public int getScanRangeNum() {
        return Integer.MAX_VALUE;
    }
public boolean shouldUseOneInstance(ConnectContext ctx) {
int adaptivePipelineTaskSerialReadOnLimit = 10000;
if (ctx != null) {
if (ctx.getSessionVariable().enableAdaptivePipelineTaskSerialReadOnLimit) {
adaptivePipelineTaskSerialReadOnLimit = ctx.getSessionVariable().adaptivePipelineTaskSerialReadOnLimit;
} else {
return false;
}
} else {
// No connection context, typically for broker load.
}
// For UniqueKey table, we will use multiple instance.
return hasLimit() && getLimit() <= adaptivePipelineTaskSerialReadOnLimit && conjuncts.isEmpty();
}
// In cloud mode, meta read lock is not enough to keep a snapshot of the partition versions.
// After all scan node are collected, it is possible to gain a snapshot of the partition version.
public static void setVisibleVersionForOlapScanNodes(List<ScanNode> scanNodes) throws UserException {
if (Config.isNotCloudMode()) {
return;
}
List<CloudPartition> partitions = new ArrayList<>();
Set<Long> partitionSet = new HashSet<>();
for (ScanNode node : scanNodes) {
if (!(node instanceof OlapScanNode)) {
continue;
}
OlapScanNode scanNode = (OlapScanNode) node;
OlapTable table = scanNode.getOlapTable();
for (Long id : scanNode.getSelectedPartitionIds()) {
if (!partitionSet.contains(id)) {
partitionSet.add(id);
partitions.add((CloudPartition) table.getPartition(id));
}
}
}
if (partitions.isEmpty()) {
return;
}
List<Long> versions;
try {
versions = CloudPartition.getSnapshotVisibleVersion(partitions);
} catch (RpcException e) {
throw new UserException("get visible version for OlapScanNode failed", e);
}
assert versions.size() == partitions.size() : "the got num versions is not equals to acquired num versions";
if (versions.stream().anyMatch(x -> x <= 0)) {
int size = versions.size();
for (int i = 0; i < size; ++i) {
if (versions.get(i) <= 0) {
LOG.warn("partition {} getVisibleVersion error, the visibleVersion is {}",
partitions.get(i).getId(), versions.get(i));
throw new UserException("partition " + partitions.get(i).getId()
+ " getVisibleVersion error, the visibleVersion is " + versions.get(i));
}
}
}
// ATTN: the table ids are ignored here because the both id are allocated from a same id generator.
Map<Long, Long> visibleVersionMap = IntStream.range(0, versions.size())
.boxed()
.collect(Collectors.toMap(i -> partitions.get(i).getId(), versions::get));
for (ScanNode node : scanNodes) {
if (!(node instanceof OlapScanNode)) {
continue;
}
OlapScanNode scanNode = (OlapScanNode) node;
scanNode.updateScanRangeVersions(visibleVersionMap);
}
}
protected void toThrift(TPlanNode msg) {
// topn filter
if (useTopnFilter()) {
List<Integer> topnFilterSourceNodeIds = getTopnFilterSortNodes()
.stream()
.map(sortNode -> sortNode.getId().asInt())
.collect(Collectors.toList());
msg.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds);
}
}
    /** Registers a sort node whose topn filter should be applied by this scan. */
    public void addTopnFilterSortNode(SortNode sortNode) {
        topnFilterSortNodes.add(sortNode);
    }

    /** Sort nodes registered via addTopnFilterSortNode(). */
    public List<SortNode> getTopnFilterSortNodes() {
        return topnFilterSortNodes;
    }

    /** True when at least one topn filter source has been registered. */
    public boolean useTopnFilter() {
        return !topnFilterSortNodes.isEmpty();
    }

    /** Number of partitions selected during planning. */
    public long getSelectedPartitionNum() {
        return selectedPartitionNum;
    }

    /** Number of splits selected during planning. */
    public long getSelectedSplitNum() {
        return selectedSplitNum;
    }
@Override
public boolean isSerialOperator() {
return numScanBackends() <= 0 || getScanRangeNum()
< ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numScanBackends()
|| (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isForceToLocalShuffle());
}
    /** A scan node has no children; it is "serial" iff it is a serial operator itself. */
    @Override
    public boolean hasSerialScanChildren() {
        return isSerialOperator();
    }

    /** Replaces the tuple descriptor this scan materializes. */
    public void setDesc(TupleDescriptor desc) {
        this.desc = desc;
    }
}