// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BaseTableRef;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SortInfo;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Partition.PartitionState;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.normalize.Normalizer;
import org.apache.doris.planner.normalize.PartitionRangePredicateNormalizer;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.computegroup.ComputeGroup;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.StatsDeriveResult;
import org.apache.doris.statistics.StatsRecursiveDerive;
import org.apache.doris.statistics.query.StatsDelta;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TNormalizedOlapScanNode;
import org.apache.doris.thrift.TNormalizedPlanNode;
import org.apache.doris.thrift.TOlapScanNode;
import org.apache.doris.thrift.TOlapTableIndex;
import org.apache.doris.thrift.TPaloScanRange;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TPrimitiveType;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TSortInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
// Full scan of an Olap table.
public class OlapScanNode extends ScanNode {
private static final Logger LOG = LogManager.getLogger(OlapScanNode.class);
// average compression ratio in doris storage engine
private static final int COMPRESSION_RATIO = 5;
/*
* When the field value is ON, the storage engine can return the data directly
* without pre-aggregation.
* When the field value is OFF, the storage engine needs to aggregate the data
* before returning it to the scan node. And if the table is an aggregate table,
* all key columns need to be read and participate in aggregation.
* For example:
* Aggregate table: k1, k2, v1 sum
* Field value is ON
* Query1: select k1, sum(v1) from table group by k1
* The aggregation function in the query is the same as in the schema,
* so the field value is ON and the query can scan data directly.
*
* Field value is OFF
* Query1: select k1, k2 from table
* The aggregation info is null.
* Query2: select k1, min(v1) from table group by k1
* The aggregation function in the query is min, which differs from the schema.
* So the data stored in the storage engine needs to be merged first before
* returning to the scan node. Although we only query key column k1, key column
* k2 still needs to be read and participate in aggregation to ensure the
* results are correct.
*
* There are currently two places that modify this variable:
* 1. The turnOffPreAgg() method of SingleNodePlanner.
* This method is only called on the left-deepest OlapScanNode of the plan tree,
* while other nodes stay false by default (because the aggregation operation is
* executed after the join, we cannot judge whether other OlapScanNodes can
* close pre-aggregation).
* So even for a duplicate key table, if it is not the left-deepest node, it
* remains false too.
*
* 2. After MaterializedViewSelector selects the materialized view, the
* updateScanRangeInfoByNewMVSelector() method of OlapScanNode may be called
* to update this variable.
* This call is executed on all ScanNodes in the plan tree. In this step,
* for a duplicate key table, the variable will be set to true.
* See the comment of the "isPreAggregation" variable in
* MaterializedViewSelector for details.
*/
private boolean isPreAggregation = false;
private String reasonOfPreAggregation = null;
private boolean canTurnOnPreAggr = true;
private boolean forceOpenPreAgg = false;
private OlapTable olapTable = null;
private long totalTabletsNum = 0;
private long selectedIndexId = -1;
private Collection<Long> selectedPartitionIds = Lists.newArrayList();
private long totalBytes = 0;
// tablet id to single replica bytes
private Map<Long, Long> tabletBytes = Maps.newLinkedHashMap();
private SortInfo sortInfo = null;
private Set<Integer> outputColumnUniqueIds = new HashSet<>();
// When the scan matches sort_info, we can push the limit down into OlapScanNode.
// It is a limit for the scanner rather than the scan node, so we add a new limit field.
private long sortLimit = -1;
// List of tablets that will be scanned by the current olap_scan_node
private ArrayList<Long> scanTabletIds = Lists.newArrayList();
private ArrayList<Long> scanReplicaIds = Lists.newArrayList();
private Set<Long> sampleTabletIds = Sets.newHashSet();
private TableSample tableSample;
private Map<Long, Integer> tabletId2BucketSeq = Maps.newHashMap();
// a bucket seq may map to many tablets, and each tablet has a
// TScanRangeLocations.
public ArrayListMultimap<Integer, TScanRangeLocations> bucketSeq2locations = ArrayListMultimap.create();
public Map<Integer, Long> bucketSeq2Bytes = Maps.newLinkedHashMap();
// For point query
private Map<SlotRef, Expr> pointQueryEqualPredicats;
private DescriptorTable descTable;
private Set<Integer> distributionColumnIds;
private boolean shouldColoScan = false;
protected List<Expr> rewrittenProjectList;
private long maxVersion = -1L;
// cached for prepared statements to quickly prune partitions;
// only used in short-circuit plans at present
private final PartitionPruneV2ForShortCircuitPlan cachedPartitionPruner =
new PartitionPruneV2ForShortCircuitPlan();
private boolean isTopnLazyMaterialize = false;
private List<Column> topnLazyMaterializeOutputColumns = new ArrayList<>();
private Column globalRowIdColumn;
// Constructs a node to scan tablets of the table referenced by 'desc'.
public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
super(id, desc, planNodeName, StatisticalType.OLAP_SCAN_NODE);
olapTable = (OlapTable) desc.getTable();
distributionColumnIds = Sets.newTreeSet();
Set<String> distColumnName = getDistributionColumnNames();
// used by Nereids to generate the uniqueId set for inverted indexes, to avoid scanning unnecessarily large columns
int columnId = 0;
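// columnId is the ordinal of slots that are bound to a real column; ordinals of slots whose
// column is a distribution key are collected here and later sent to BEs via
// setDistributeColumnIds() in toThrift().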
for (SlotDescriptor slotDescriptor : desc.getSlots()) {
if (slotDescriptor.getColumn() != null) {
outputColumnUniqueIds.add(slotDescriptor.getColumn().getUniqueId());
if (distColumnName.contains(slotDescriptor.getColumn().getName().toLowerCase())) {
distributionColumnIds.add(columnId);
}
columnId++;
}
}
}
public void setIsPreAggregation(boolean isPreAggregation, String reason) {
this.isPreAggregation = isPreAggregation;
this.reasonOfPreAggregation = this.reasonOfPreAggregation == null ? reason :
this.reasonOfPreAggregation + " " + reason;
}
public boolean isPreAggregation() {
return isPreAggregation;
}
public boolean getCanTurnOnPreAggr() {
return canTurnOnPreAggr;
}
public Set<Long> getSampleTabletIds() {
return sampleTabletIds;
}
public HashSet<Long> getScanBackendIds() {
return scanBackendIds;
}
public void setSampleTabletIds(List<Long> sampleTablets) {
if (sampleTablets != null) {
this.sampleTabletIds.addAll(sampleTablets);
}
}
public void setRewrittenProjectList(List<Expr> rewrittenProjectList) {
this.rewrittenProjectList = rewrittenProjectList;
}
public void setTableSample(TableSample tSample) {
this.tableSample = tSample;
}
public void setCanTurnOnPreAggr(boolean canChangePreAggr) {
this.canTurnOnPreAggr = canChangePreAggr;
}
public void closePreAggregation(String reason) {
setIsPreAggregation(false, reason);
setCanTurnOnPreAggr(false);
}
public long getTotalTabletsNum() {
return totalTabletsNum;
}
public boolean getForceOpenPreAgg() {
return forceOpenPreAgg;
}
public ArrayList<Long> getScanTabletIds() {
return scanTabletIds;
}
public void setForceOpenPreAgg(boolean forceOpenPreAgg) {
this.forceOpenPreAgg = forceOpenPreAgg;
}
public SortInfo getSortInfo() {
return sortInfo;
}
public void setSortInfo(SortInfo sortInfo) {
this.sortInfo = sortInfo;
}
public void setSortLimit(long sortLimit) {
this.sortLimit = sortLimit;
}
public Collection<Long> getSelectedPartitionIds() {
return selectedPartitionIds;
}
public void setTupleIds(ArrayList<TupleId> tupleIds) {
this.tupleIds = tupleIds;
}
// only used for UT and Nereids
public void setSelectedPartitionIds(Collection<Long> selectedPartitionIds) {
this.selectedPartitionIds = selectedPartitionIds;
}
/**
* Only used for Nereids to set rollup or materialized view selection result.
*/
public void setSelectedIndexInfo(
long selectedIndexId,
boolean isPreAggregation,
String reasonOfPreAggregation) {
this.selectedIndexId = selectedIndexId;
this.isPreAggregation = isPreAggregation;
this.reasonOfPreAggregation = reasonOfPreAggregation;
}
/**
* This function directly selects the index id of the base table as the
* selectedIndexId.
* It makes sure that the olap scan node scans the base data rather than
* the materialized view data.
* <p>
* This function is mainly used for update statements.
* An update statement also needs to scan data like normal queries,
* but its syntax is different from ordinary queries,
* so the planner cannot use the query logic to automatically match the best
* index id.
* So here we need to manually specify the index id to scan the base table
* directly.
*/
public void useBaseIndexId() {
this.selectedIndexId = olapTable.getBaseIndexId();
}
public long getSelectedIndexId() {
return selectedIndexId;
}
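// Fold the current conjuncts into a single AND compound predicate, strip the sub-predicates
// that match the given where-expression, then split the result back into a conjunct list.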
public void ignoreConjuncts(Expr whereExpr) {
if (whereExpr == null) {
return;
}
Expr vconjunct = convertConjunctsToAndCompoundPredicate(conjuncts).replaceSubPredicate(whereExpr);
conjuncts = splitAndCompoundPredicateToConjuncts(vconjunct).stream().collect(Collectors.toList());
}
/**
* This method is mainly used to update the scan range info in OlapScanNode by the
* new materialized view selector.
* Situation 1:
* If the new scan range is the same as the old scan range determined by the
* old materialized view selector,
* the scan range will not be changed.
* <p>
* Situation 2: The scan range is different and the table type is duplicate.
* The new scan range is used directly.
* The reason is that the old selector does not support SPJ<->SPJG, so the
* result of the old one must be incorrect.
* <p>
* Situation 3: The scan range is different and the table type is aggregate.
* The new scan range is different from the old one.
* If test_materialized_view is set to true, an error will be reported and
* the query will be cancelled.
* <p>
* Situation 4: The scan range is different, the table type is aggregate, and
* `test_materialized_view` is set to false.
* The result of the old version selector will be selected, and a warning
* log will be printed.
*
* @param selectedIndexId
* @param isPreAggregation
* @param reasonOfDisable
* @throws UserException
*/
public void updateScanRangeInfoByNewMVSelector(long selectedIndexId,
boolean isPreAggregation, String reasonOfDisable)
throws UserException {
if (selectedIndexId == this.selectedIndexId && isPreAggregation == this.isPreAggregation) {
return;
}
StringBuilder stringBuilder = new StringBuilder("The new selected index id ")
.append(selectedIndexId)
.append(", pre aggregation tag ").append(isPreAggregation)
.append(", reason ").append(reasonOfDisable == null ? "null" : reasonOfDisable)
.append(". The old selected index id ").append(this.selectedIndexId)
.append(" pre aggregation tag ").append(this.isPreAggregation)
.append(" reason ").append(this.reasonOfPreAggregation == null ? "null" : this.reasonOfPreAggregation);
String scanRangeInfo = stringBuilder.toString();
String situation;
boolean update;
CHECK:
{ // CHECKSTYLE IGNORE THIS LINE
if (olapTable.getKeysType() == KeysType.DUP_KEYS || (olapTable.getKeysType() == KeysType.UNIQUE_KEYS
&& olapTable.getEnableUniqueKeyMergeOnWrite())) {
situation = "The key type of table is duplicate, or unique key with merge-on-write.";
update = true;
break CHECK;
}
if (ConnectContext.get() == null) {
situation = "Connection context is null";
update = true;
break CHECK;
}
situation = "The key type of table is aggregated.";
update = false;
} // CHECKSTYLE IGNORE THIS LINE
if (update) {
this.selectedIndexId = selectedIndexId;
updateSlotUniqueId();
setIsPreAggregation(isPreAggregation, reasonOfDisable);
updateColumnType();
if (LOG.isDebugEnabled()) {
LOG.debug("Using the new scan range info instead of the old one. {}, {}",
situation, scanRangeInfo);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Using the old scan range info instead of the new one. {}, {}",
situation, scanRangeInfo);
}
}
}
/**
* In some situations, the column type differs between the base table and the mv.
* If the mv selector selects the mv index, the column type should be changed to
* the type of the mv column.
* For example:
* base table: k1 int, k2 int
* mv table: k1 int, k2 bigint sum
* The type of the `k2` column differs between the base table and the mv.
* When the mv selector selects the mv table to scan, the column type should be
* changed to bigint here.
* Currently, only the `SUM` aggregate type can trigger this change.
*/
private void updateColumnType() throws UserException {
if (selectedIndexId == olapTable.getBaseIndexId()) {
return;
}
MaterializedIndexMeta meta = olapTable.getIndexMetaByIndexId(selectedIndexId);
for (SlotDescriptor slotDescriptor : desc.getSlots()) {
if (!slotDescriptor.isMaterialized()) {
continue;
}
Column baseColumn = slotDescriptor.getColumn();
Preconditions.checkNotNull(baseColumn);
Column mvColumn = meta.getColumnByName(baseColumn.getName());
if (mvColumn == null) {
mvColumn = meta.getColumnByName(CreateMaterializedViewStmt.mvColumnBuilder(baseColumn.getName()));
}
if (mvColumn == null) {
throw new UserException("updateColumnType: Do not found mvColumn=" + baseColumn.getName()
+ " from index=" + olapTable.getIndexNameById(selectedIndexId));
}
if (mvColumn.getType() != baseColumn.getType()) {
slotDescriptor.setColumn(mvColumn);
}
}
}
/**
* In some situations, we need to use the mv column unique id, because the mv
* column unique id differs from the base column unique id.
* For example: select count(*) from table (the table has an mv named mv1).
* If the optimizer decides to use mv1, we need to call updateSlotUniqueId().
*/
private void updateSlotUniqueId() throws UserException {
if (!olapTable.getEnableLightSchemaChange() || selectedIndexId == olapTable.getBaseIndexId()) {
return;
}
MaterializedIndexMeta meta = olapTable.getIndexMetaByIndexId(selectedIndexId);
for (SlotDescriptor slotDescriptor : desc.getSlots()) {
if (!slotDescriptor.isMaterialized()) {
continue;
}
Column baseColumn = slotDescriptor.getColumn();
Column mvColumn = meta.getColumnByName(baseColumn.getName());
if (mvColumn == null) {
boolean isBound = false;
for (Expr conjunct : conjuncts) {
List<TupleId> tids = Lists.newArrayList();
conjunct.getIds(tids, null);
if (!tids.isEmpty() && conjunct.isBound(slotDescriptor.getId())) {
isBound = true;
break;
}
}
if (isBound) {
slotDescriptor.setIsMaterialized(false);
} else {
throw new UserException("updateSlotUniqueId: Do not found mvColumn=" + baseColumn.getName()
+ " from index=" + olapTable.getIndexNameById(selectedIndexId));
}
} else {
slotDescriptor.setColumn(mvColumn);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("updateSlotUniqueId() slots: {}", desc.getSlots());
}
}
public OlapTable getOlapTable() {
return olapTable;
}
public boolean isDupKeysOrMergeOnWrite() {
return olapTable.isDupKeysOrMergeOnWrite();
}
@Override
protected String debugString() {
MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
helper.addValue(super.debugString());
helper.addValue("olapTable=" + olapTable.getName());
return helper.toString();
}
@Override
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);
filterDeletedRows(analyzer);
if (olapTable.getPartitionInfo().enableAutomaticPartition()) {
partitionsInfo = olapTable.getPartitionInfo();
analyzerPartitionExpr(analyzer, partitionsInfo);
}
computeColumnsFilter();
computePartitionInfo();
computeTupleState(analyzer);
/**
* Compute the inaccurate cardinality before mv selection and tablet pruning.
* - Accurate statistical information relies on materialized view selection
* and bucket pruning.
* - However, both of these processes occur after the join reorder algorithm
* is completed.
* - When join reorder is turned on, the cardinality must be calculated before
* the reorder algorithm.
* - So only an inaccurate cardinality can be calculated here.
*/
mockRowCountInStatistic();
if (analyzer.safeIsEnableJoinReorderBasedCost()) {
computeInaccurateCardinality();
}
}
/**
* Init OlapScanNode, ONLY used for Nereids. Should NOT be used anywhere else.
*/
public void init() throws UserException {
selectedPartitionNum = selectedPartitionIds.size();
try {
createScanRangeLocations();
} catch (AnalysisException e) {
throw new UserException(e.getMessage());
}
}
/**
* Remove this method once statistics collection is working properly.
*/
public void mockRowCountInStatistic() {
cardinality = 0;
for (long selectedPartitionId : selectedPartitionIds) {
final Partition partition = olapTable.getPartition(selectedPartitionId);
final MaterializedIndex baseIndex = partition.getBaseIndex();
cardinality += baseIndex.getRowCount();
}
}
@Override
public void finalize(Analyzer analyzer) throws UserException {
if (LOG.isDebugEnabled()) {
LOG.debug("OlapScanNode get scan range locations. Tuple: {}", desc);
}
/**
* If join reorder is turned on, cardinality has already been calculated in
* init(), and that value is not accurate.
* In the following logic, cardinality will be accurately calculated again,
* so here we need to reset the value of cardinality.
*/
if (analyzer.safeIsEnableJoinReorderBasedCost()) {
cardinality = 0;
}
try {
createScanRangeLocations();
} catch (AnalysisException e) {
throw new UserException(e.getMessage());
}
// Relatively accurate cardinality according to ScanRange in
// getScanRangeLocations
computeStats(analyzer);
computeNumNodes();
}
public void computeTupleState(Analyzer analyzer) {
for (TupleId id : tupleIds) {
analyzer.getDescTbl().getTupleDesc(id).computeStat();
}
}
@Override
public void computeStats(Analyzer analyzer) throws UserException {
super.computeStats(analyzer);
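// totalBytes is the single-replica on-disk size accumulated in addScanRangeLocations();
// scaling by the average compression ratio approximates the in-memory row size.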
if (cardinality > 0) {
avgRowSize = totalBytes / (float) cardinality * COMPRESSION_RATIO;
capCardinalityAtLimit();
}
// when the scan node has no data, cardinality should be 0 instead of an invalid
// value after computeStats()
cardinality = cardinality == -1 ? 0 : cardinality;
// update statsDeriveResult with real statistics
// after statistics collection is complete, remove this logic
if (analyzer.safeIsEnableJoinReorderBasedCost()) {
statsDeriveResult = new StatsDeriveResult(cardinality, statsDeriveResult.getSlotIdToColumnStats());
}
}
@Override
protected void computeNumNodes() {
if (cardinality > 0) {
numNodes = scanBackendIds.size();
}
// even if the current scan node has no data, at least one backend will be assigned when
// the fragment actually executes
numNodes = numNodes <= 0 ? 1 : numNodes;
}
private void computeInaccurateCardinality() throws UserException {
StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this);
cardinality = (long) statsDeriveResult.getRowCount();
}
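// Prune partitions using the column ranges derived from the conjuncts.
// Hypothetical example: for a table partitioned by RANGE(dt) with
// p1 = [2024-01-01, 2024-02-01) and p2 = [2024-02-01, 2024-03-01),
// the predicate `dt = '2024-01-15'` keeps only p1.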
private Collection<Long> partitionPrune(PartitionInfo partitionInfo,
PartitionNames partitionNames) throws AnalysisException {
PartitionPruner partitionPruner = null;
Map<Long, PartitionItem> keyItemMap;
if (partitionNames != null) {
keyItemMap = Maps.newHashMap();
for (String partName : partitionNames.getPartitionNames()) {
Partition partition = olapTable.getPartition(partName, partitionNames.isTemp());
if (partition == null) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_SUCH_PARTITION, partName);
}
keyItemMap.put(partition.getId(), partitionInfo.getItem(partition.getId()));
}
} else {
keyItemMap = partitionInfo.getIdToItem(false);
}
if (partitionInfo.getType() == PartitionType.RANGE) {
if (isPointQuery() && partitionInfo.getPartitionColumns().size() == 1) {
// short circuit, a quick path to find partition
ColumnRange filterRange = columnNameToRange.get(partitionInfo.getPartitionColumns().get(0).getName());
LiteralExpr lowerBound = filterRange.getRangeSet().get().asRanges().stream()
.findFirst().get().lowerEndpoint().getValue();
LiteralExpr upperBound = filterRange.getRangeSet().get().asRanges().stream()
.findFirst().get().upperEndpoint().getValue();
cachedPartitionPruner.update(keyItemMap);
return cachedPartitionPruner.prune(lowerBound, upperBound);
}
partitionPruner = new RangePartitionPrunerV2(keyItemMap,
partitionInfo.getPartitionColumns(), columnNameToRange);
} else if (partitionInfo.getType() == PartitionType.LIST) {
partitionPruner = new ListPartitionPrunerV2(keyItemMap, partitionInfo.getPartitionColumns(),
columnNameToRange);
}
return partitionPruner.prune();
}
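// Prune tablets by the distribution (bucketing) columns. Only hash distribution supports
// pruning; for random distribution (or any other type), null is returned, which means all
// tablets are scanned.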
private Collection<Long> distributionPrune(
MaterializedIndex table,
DistributionInfo distributionInfo) throws AnalysisException {
DistributionPruner distributionPruner = null;
switch (distributionInfo.getType()) {
case HASH: {
HashDistributionInfo info = (HashDistributionInfo) distributionInfo;
distributionPruner = new HashDistributionPruner(table.getTabletIdsInOrder(),
info.getDistributionColumns(),
columnFilters,
info.getBucketNum(),
getSelectedIndexId() == olapTable.getBaseIndexId());
return distributionPruner.prune();
}
case RANDOM: {
return null;
}
default: {
return null;
}
}
}
// Update the visible version of the scan range locations. For cloud mode; called at the end of
// NereidsPlanner.splitFragments.
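// Replaces the version recorded in each tablet's TPaloScanRange with its partition's
// visible version from the given map, and refreshes maxVersion.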
public void updateScanRangeVersions(Map<Long, Long> visibleVersionMap) {
if (LOG.isDebugEnabled() && ConnectContext.get() != null) {
LOG.debug("query id: {}, selectedPartitionIds: {}, visibleVersionMap: {}",
DebugUtil.printId(ConnectContext.get().queryId()), selectedPartitionIds, visibleVersionMap);
}
Map<Long, TScanRangeLocations> locationsMap = scanRangeLocations.stream()
.collect(Collectors.toMap(loc -> loc.getScanRange().getPaloScanRange().getTabletId(), loc -> loc));
for (Long partitionId : selectedPartitionIds) {
final Partition partition = olapTable.getPartition(partitionId);
final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
final List<Tablet> tablets = selectedTable.getTablets();
Long visibleVersion = visibleVersionMap.get(partitionId);
assert visibleVersion != null : "the acquired version does not exist in the visible version map";
String visibleVersionStr = String.valueOf(visibleVersion);
for (Tablet tablet : tablets) {
TScanRangeLocations locations = locationsMap.get(tablet.getId());
if (locations == null) {
continue;
}
TPaloScanRange scanRange = locations.getScanRange().getPaloScanRange();
scanRange.setVersion(visibleVersionStr);
}
}
this.maxVersion = visibleVersionMap.values().stream().max(Long::compareTo).orElse(0L);
}
public Long getTabletSingleReplicaSize(Long tabletId) {
return tabletBytes.get(tabletId);
}
public long getMaxVersion() {
return maxVersion;
}
// For non-cloud mode. For cloud mode, see `updateScanRangeVersions`.
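// Builds one TScanRangeLocations per tablet: picks queryable replicas, records their backend
// addresses, and accumulates per-tablet and per-bucket byte sizes used later for stats and
// bucket-shuffle planning.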
private void addScanRangeLocations(Partition partition,
List<Tablet> tablets, Map<Long, Set<Long>> backendAlivePathHashs) throws UserException {
long visibleVersion = Partition.PARTITION_INIT_VERSION;
// For cloud mode, set scan range visible version in Coordinator.exec so that we could
// assign a snapshot version of all partitions.
if (!(Config.isCloudMode() && Config.enable_cloud_snapshot_version)) {
visibleVersion = partition.getVisibleVersion();
}
maxVersion = Math.max(maxVersion, visibleVersion);
String visibleVersionStr = String.valueOf(visibleVersion);
int useFixReplica = -1;
boolean skipMissingVersion = false;
ConnectContext context = ConnectContext.get();
ComputeGroup computeGroup = null;
if (context != null) {
computeGroup = context.getComputeGroupSafely();
useFixReplica = context.getSessionVariable().useFixReplica;
if (useFixReplica == -1
&& context.getState().isNereids() && context.getSessionVariable().getEnableQueryCache()) {
useFixReplica = 0;
}
// if use_fix_replica is set, force skip_missing_version to false
skipMissingVersion = useFixReplica == -1 && context.getSessionVariable().skipMissingVersion;
if (LOG.isDebugEnabled()) {
LOG.debug("query id: {}, partition id:{} visibleVersion: {}",
DebugUtil.printId(context.queryId()), partition.getId(), visibleVersion);
}
}
for (Tablet tablet : tablets) {
long tabletId = tablet.getId();
if (skipMissingVersion) {
long tabletVersion = -1L;
for (Replica replica : tablet.getReplicas()) {
if (replica.getVersion() > tabletVersion) {
tabletVersion = replica.getVersion();
}
}
if (tabletVersion != visibleVersion) {
LOG.warn("tablet {} version {} is not equal to partition {} version {}",
tabletId, tabletVersion, partition.getId(), visibleVersion);
visibleVersion = tabletVersion;
visibleVersionStr = String.valueOf(visibleVersion);
maxVersion = Math.max(maxVersion, visibleVersion);
}
}
TScanRangeLocations locations = new TScanRangeLocations();
TPaloScanRange paloRange = new TPaloScanRange();
paloRange.setDbName("");
paloRange.setSchemaHash("0");
paloRange.setVersion(visibleVersionStr);
paloRange.setVersionHash("");
paloRange.setTabletId(tabletId);
// randomly shuffle the replica list and only collect one copy
//
// ATTN: visibleVersion is not used in cloud mode, see CloudReplica.checkVersionCatchup
// for details.
List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion,
backendAlivePathHashs, skipMissingVersion);
if (replicas.isEmpty()) {
if (context != null && context.getSessionVariable().skipBadTablet) {
continue;
}
LOG.warn("no queryable replica found in tablet {}. visible version {}", tabletId, visibleVersion);
StringBuilder sb = new StringBuilder(
"Failed to get scan range, no queryable replica found in tablet: " + tabletId);
if (Config.show_details_for_unaccessible_tablet) {
sb.append(". Reason: ").append(tablet.getDetailsStatusForQuery(visibleVersion));
}
if (LOG.isDebugEnabled()) {
LOG.debug(sb.toString());
}
throw new UserException(sb.toString());
}
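// useFixReplica == -1 means no fixed replica: shuffle the replicas (or prefer the highest
// last-success version when skipping missing versions); otherwise sort by replica id and
// pin the replica at the given index.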
if (useFixReplica <= -1) {
if (skipMissingVersion) {
// sort by replica's last success version, higher success version in the front.
replicas.sort(Replica.LAST_SUCCESS_VERSION_COMPARATOR);
} else {
Collections.shuffle(replicas);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("use fix replica, value: {}, replica count: {}", useFixReplica, replicas.size());
}
// sort by replica id
replicas.sort(Replica.ID_COMPARATOR);
Replica replica = replicas.get(useFixReplica >= replicas.size() ? replicas.size() - 1 : useFixReplica);
if (context.getSessionVariable().fallbackOtherReplicaWhenFixedCorrupt) {
long beId = replica.getBackendId();
Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
// If the fixed replica is bad, keep all replicas and fall back to random selection
if (backend == null || !backend.isAlive()) {
if (LOG.isDebugEnabled()) {
LOG.debug("backend {} not exists or is not alive for replica {}", beId,
replica.getId());
}
Collections.shuffle(replicas);
} else {
replicas.clear();
replicas.add(replica);
}
} else {
replicas.clear();
replicas.add(replica);
}
}
if (isEnableCooldownReplicaAffinity()) {
final long coolDownReplicaId = tablet.getCooldownReplicaId();
// we prefer to query the cooldown replica to make sure the cache is fully utilized.
// for example: consider 3 BEs (A, B, C), each holding one replica of tablet X, and X
// is now under cooldown.
// the first time we choose BE A, and A downloads data into its cache while the other two caches are empty;
// the second time we choose BE B, and B gets cached while C is still empty;
// the third time we choose BE C, after which all replicas are cached.
// but it means we do 3 S3 IOs to get the data, which brings 3 slow queries.
if (-1L != coolDownReplicaId) {
final Optional<Replica> replicaOptional = replicas.stream()
.filter(r -> r.getId() == coolDownReplicaId).findAny();
replicaOptional.ifPresent(
r -> {
Backend backend = Env.getCurrentSystemInfo()
.getBackend(r.getBackendIdWithoutException());
if (backend != null && backend.isAlive()) {
replicas.clear();
replicas.add(r);
}
}
);
}
}
boolean tabletIsNull = true;
boolean collectedStat = false;
boolean clusterException = false;
List<String> errs = Lists.newArrayList();
int replicaInTablet = 0;
long oneReplicaBytes = 0;
for (Replica replica : replicas) {
Backend backend = null;
long backendId = -1;
try {
backendId = replica.getBackendId();
backend = Env.getCurrentSystemInfo().getBackend(backendId);
} catch (ComputeGroupException e) {
LOG.warn("failed to get backend {} for replica {}", backendId, replica.getId(), e);
errs.add(e.toString());
clusterException = true;
continue;
}
if (backend == null || !backend.isAlive()) {
if (LOG.isDebugEnabled()) {
LOG.debug("backend {} not exists or is not alive for replica {}", backendId,
replica.getId());
}
String err = "replica " + replica.getId() + "'s backend " + backendId
+ (backend != null ? " with tag " + backend.getLocationTag() : "")
+ " does not exist or not alive";
errs.add(err);
continue;
}
if (!backend.isMixNode()) {
continue;
}
String beTagName = backend.getLocationTag().value;
if ((ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup)) || (computeGroup != null
&& !Config.isCloudMode() && !computeGroup.containsBackend(beTagName))) {
String err = String.format(
"Replica on backend %d with tag %s," + " which is not in user's resource tag: %s",
backend.getId(), beTagName, computeGroup.toString());
if (LOG.isDebugEnabled()) {
LOG.debug(err);
}
errs.add(err);
continue;
}
scanReplicaIds.add(replica.getId());
String ip = backend.getHost();
int port = backend.getBePort();
TScanRangeLocation scanRangeLocation = new TScanRangeLocation(new TNetworkAddress(ip, port));
scanRangeLocation.setBackendId(backendId);
locations.addToLocations(scanRangeLocation);
paloRange.addToHosts(new TNetworkAddress(ip, port));
tabletIsNull = false;
// for CBO
if (!collectedStat && replica.getRowCount() != -1) {
long dataSize = replica.getDataSize();
if (replicaInTablet == 0) {
oneReplicaBytes = dataSize;
tabletBytes.put(tabletId, dataSize);
}
replicaInTablet++;
totalBytes += dataSize;
collectedStat = true;
}
scanBackendIds.add(backend.getId());
// When skipping missing versions of a tablet, we only select the backend whose replica has
// the highest last success version, to preserve as much data as possible.
if (skipMissingVersion) {
break;
}
}
if (clusterException) {
throw new UserException("tablet " + tabletId + " err: " + Joiner.on(", ").join(errs));
}
if (tabletIsNull) {
throw new UserException("tablet " + tabletId + " has no queryable replicas. err: "
+ Joiner.on(", ").join(errs));
}
TScanRange scanRange = new TScanRange();
scanRange.setPaloScanRange(paloRange);
locations.setScanRange(scanRange);
Integer bucketSeq = tabletId2BucketSeq.get(tabletId);
bucketSeq2locations.put(bucketSeq, locations);
bucketSeq2Bytes.merge(bucketSeq, oneReplicaBytes, Long::sum);
scanRangeLocations.add(locations);
}
if (tablets.size() == 0) {
desc.setCardinality(0);
} else {
desc.setCardinality(cardinality);
}
}
private boolean isEnableCooldownReplicaAffinity() {
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null) {
return connectContext.getSessionVariable().isEnableCooldownReplicaAffinity();
}
return true;
}
private void computePartitionInfo() throws AnalysisException {
long start = System.currentTimeMillis();
// Step1: compute partition ids
PartitionNames partitionNames = ((BaseTableRef) desc.getRef()).getPartitionNames();
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
selectedPartitionIds = partitionPrune(partitionInfo, partitionNames);
} else {
selectedPartitionIds = olapTable.getPartitionIds();
}
selectedPartitionIds = olapTable.selectNonEmptyPartitionIds(selectedPartitionIds);
selectedPartitionNum = selectedPartitionIds.size();
for (long id : selectedPartitionIds) {
Partition partition = olapTable.getPartition(id);
if (partition.getState() == PartitionState.RESTORE) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_PARTITION_STATE,
partition.getName(), "RESTORING");
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("partition prune cost: {} ms, partitions: {}",
(System.currentTimeMillis() - start), selectedPartitionIds);
}
}
public void selectBestRollupByRollupSelector(Analyzer analyzer) throws UserException {
// Step2: select best rollup
long start = System.currentTimeMillis();
if (olapTable.getKeysType() == KeysType.DUP_KEYS || (olapTable.getKeysType() == KeysType.UNIQUE_KEYS
&& olapTable.getEnableUniqueKeyMergeOnWrite())) {
// This function is compatible with the index selection logic of ROLLUP,
// so a duplicate table directly returns the base index here,
// and materialized view selection is handled in
// "MaterializedViewSelector"
selectedIndexId = olapTable.getBaseIndexId();
if (LOG.isDebugEnabled()) {
LOG.debug("The best index will be selected later in mv selector");
}
return;
}
final RollupSelector rollupSelector = new RollupSelector(analyzer, desc, olapTable);
selectedIndexId = rollupSelector.selectBestRollup(selectedPartitionIds, conjuncts, isPreAggregation);
updateSlotUniqueId();
if (LOG.isDebugEnabled()) {
LOG.debug("select best roll up cost: {} ms, best index id: {}", (System.currentTimeMillis() - start),
selectedIndexId);
}
}
@Override
protected void createScanRangeLocations() throws UserException {
scanRangeLocations = Lists.newArrayList();
if (selectedPartitionIds.size() == 0) {
desc.setCardinality(0);
return;
}
Preconditions.checkState(selectedIndexId != -1);
// compute tablet info by selected index id and selected partition ids
long start = System.currentTimeMillis();
computeSampleTabletIds();
computeTabletInfo();
if (LOG.isDebugEnabled()) {
LOG.debug("distribution prune cost: {} ms", (System.currentTimeMillis() - start));
}
}
public void setOutputColumnUniqueIds(Set<Integer> outputColumnUniqueIds) {
this.outputColumnUniqueIds = outputColumnUniqueIds;
}
/**
* Sample some tablets in the selected partitions.
* If seek is specified, the tablets sampled each time are the same.
*/
public void computeSampleTabletIds() {
if (tableSample == null) {
return;
}
OlapTable olapTable = (OlapTable) desc.getTable();
// 1. Calculate the total number of rows in the selected partitions, and sort the partition list.
long selectedRows = 0;
long totalSampleRows = 0;
List<Long> selectedPartitionList = new ArrayList<>();
Preconditions.checkState(selectedIndexId != -1);
for (Long partitionId : selectedPartitionIds) {
final Partition partition = olapTable.getPartition(partitionId);
final MaterializedIndex selectedIndex = partition.getIndex(selectedIndexId);
// selectedIndex is not expected to be null, because the MaterializedIndex ids in one rollup's
// partitions are all the same. Skip the partition if it is null.
if (selectedIndex != null) {
selectedRows += selectedIndex.getRowCount();
selectedPartitionList.add(partitionId);
}
}
selectedPartitionList.sort(Comparator.naturalOrder());
// 2. Sampling is not required in some cases; return early so that sampling takes no effect.
if (tableSample.isPercent()) {
if (tableSample.getSampleValue() >= 100) {
return;
}
totalSampleRows = (long) Math.max(selectedRows * (tableSample.getSampleValue() / 100.0), 1);
} else {
if (tableSample.getSampleValue() > selectedRows) {
return;
}
totalSampleRows = tableSample.getSampleValue();
}
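// For example (hypothetical numbers): TABLESAMPLE(10 PERCENT) over 1,000,000 selected rows
// targets totalSampleRows = 100,000; TABLESAMPLE(5000 ROWS) targets 5,000 rows unless the
// value exceeds selectedRows, in which case sampling is skipped above.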
// 3. Sample partitions. If seek is specified, the partitions will be the same for each sampling.
long hitRows = 0; // The number of rows hit by sampled tablets
Set<Long> hitTabletIds = Sets.newHashSet();
long partitionSeek = tableSample.getSeek() != -1
? tableSample.getSeek() : (long) (new SecureRandom().nextDouble() * selectedPartitionList.size());
for (int i = 0; i < selectedPartitionList.size(); i++) {
int seekPid = (int) ((i + partitionSeek) % selectedPartitionList.size());
final Partition partition = olapTable.getPartition(selectedPartitionList.get(seekPid));
final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
List<Tablet> tablets = selectedTable.getTablets();
if (tablets.isEmpty()) {
continue;
}
// 4. Calculate the number of rows that need to be sampled in the current partition.
long sampleRows = 0; // The number of sample rows in partition
if (tableSample.isPercent()) {
sampleRows = (long) Math.max(selectedTable.getRowCount() * (tableSample.getSampleValue() / 100.0), 1);
} else {
// cast to double to avoid integer division truncating the per-partition ratio to 0
sampleRows = (long) Math.max(
tableSample.getSampleValue() * ((double) selectedTable.getRowCount() / selectedRows), 1);
}
// 5. Sample tablets. If seek is specified, the same tablets will be sampled each time.
long tabletSeek = tableSample.getSeek() != -1
? tableSample.getSeek() : (long) (new SecureRandom().nextDouble() * tablets.size());
for (int j = 0; j < tablets.size(); j++) {
int seekTid = (int) ((j + tabletSeek) % tablets.size());
Tablet tablet = tablets.get(seekTid);
if (sampleTabletIds.size() != 0 && !sampleTabletIds.contains(tablet.getId())) {
// After PruneOlapScanTablet, sampleTabletIds.size() != 0:
// continue sampling only within sampleTabletIds.
// For a percentage sample, the number of sampled rows is a percentage of the
// total number of rows and is not related to sampleTabletIds after PruneOlapScanTablet.
continue;
}
long tabletRowCount;
if (!FeConstants.runningUnitTest) {
tabletRowCount = tablet.getRowCount(true);
} else {
tabletRowCount = selectedTable.getRowCount() / tablets.size();
}
if (tabletRowCount == 0) {
continue;
}
hitTabletIds.add(tablet.getId());
sampleRows -= tabletRowCount;
hitRows += tabletRowCount;
if (sampleRows <= 0) {
break;
}
}
if (hitRows > totalSampleRows) {
break;
}
}
if (sampleTabletIds.size() != 0) {
sampleTabletIds.retainAll(hitTabletIds);
if (LOG.isDebugEnabled()) {
LOG.debug("after computeSampleTabletIds, hitRows {}, totalRows {}, selectedTablets {}, sampleRows {}",
hitRows, selectedRows, sampleTabletIds.size(), totalSampleRows);
}
} else {
sampleTabletIds = hitTabletIds;
if (LOG.isDebugEnabled()) {
LOG.debug("after computeSampleTabletIds, hitRows {}, selectedRows {}, sampleRows {}",
hitRows, selectedRows, totalSampleRows);
}
}
}
public boolean isPointQuery() {
return ConnectContext.get().getStatementContext().isShortCircuitQuery();
}
private void computeTabletInfo() throws UserException {
/**
* The tablet info can be computed only once,
* so scanBackendIds should be empty at the beginning.
*/
Preconditions.checkState(scanBackendIds.size() == 0);
Preconditions.checkState(scanTabletIds.size() == 0);
Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap();
for (Backend backend : Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) {
backendAlivePathHashs.put(backend.getId(), backend.getDisks().values().stream()
.filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet()));
}
for (Long partitionId : selectedPartitionIds) {
final Partition partition = olapTable.getPartition(partitionId);
final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
final List<Tablet> tablets = Lists.newArrayList();
Collection<Long> tabletIds = distributionPrune(selectedTable, partition.getDistributionInfo());
if (LOG.isDebugEnabled()) {
LOG.debug("distribution prune tablets: {}", tabletIds);
}
if (sampleTabletIds.size() != 0) {
if (tabletIds != null) {
tabletIds.retainAll(sampleTabletIds);
} else {
tabletIds = sampleTabletIds;
}
if (LOG.isDebugEnabled()) {
LOG.debug("after sample tablets: {}", tabletIds);
}
}
List<Long> allTabletIds = selectedTable.getTabletIdsInOrder();
if (tabletIds != null) {
for (Long id : tabletIds) {
if (selectedTable.getTablet(id) != null) {
tablets.add(selectedTable.getTablet(id));
scanTabletIds.add(id);
} else {
// The tablet id specified in the query does not exist in this partition; skip it.
Preconditions.checkState(sampleTabletIds.size() != 0);
}
}
} else {
tablets.addAll(selectedTable.getTablets());
scanTabletIds.addAll(allTabletIds);
}
for (int i = 0; i < allTabletIds.size(); i++) {
tabletId2BucketSeq.put(allTabletIds.get(i), i);
}
totalTabletsNum += selectedTable.getTablets().size();
selectedSplitNum += tablets.size();
addScanRangeLocations(partition, tablets, backendAlivePathHashs);
}
}
/**
* Check whether the parent sort node can be pushed down to the child olap scan node.
*/
public boolean checkPushSort(SortNode sortNode) {
// Ensure the limit is positive and does not exceed the threshold
if (sortNode.getLimit() <= 0
|| sortNode.getLimit() > ConnectContext.get().getSessionVariable().topnOptLimitThreshold) {
return false;
}
// Ensure all isAscOrder flags are the same, and the length != 0.
// Can't be z-order.
if (sortNode.getSortInfo().getIsAscOrder().stream().distinct().count() != 1
|| olapTable.isZOrderSort()) {
return false;
}
// The tablet's ORDER BY key can only be a prefix of the schema.
// Like: schema: a.b.c.d.e.f.g order by key: a.b.c (not a,b,d)
// Do a **prefix match** to check if the ORDER BY key can be pushed down.
// olap order by key: a.b.c.d
// sort key: (a) (a,b) (a,b,c) (a,b,c,d) is ok
// (a,c) (a,c,d) (a,c,b) (a,c,f) (a,b,c,d,e) is NOT ok
List<Expr> sortExprs = sortNode.getSortInfo().getOrigOrderingExprs();
List<Boolean> nullsFirsts = sortNode.getSortInfo().getNullsFirst();
List<Boolean> isAscOrders = sortNode.getSortInfo().getIsAscOrder();
if (sortExprs.size() > olapTable.getDataSortInfo().getColNum()) {
return false;
}
for (int i = 0; i < sortExprs.size(); i++) {
// table key.
Column tableKey = olapTable.getFullSchema().get(i);
// sort slot.
Expr sortExpr = sortExprs.get(i);
if (sortExpr instanceof SlotRef) {
SlotRef slotRef = (SlotRef) sortExpr;
if (tableKey.equals(slotRef.getColumn())) {
// [ORDER BY DESC NULLS FIRST] or [ORDER BY ASC NULLS LAST] can not be optimized
// to only read file tail, since NULLS is at file head but data is at tail
if (tableKey.isAllowNull() && (nullsFirsts.get(i) != isAscOrders.get(i))) {
return false;
}
} else {
return false;
}
} else {
return false;
}
}
return true;
}
/**
* We query Palo meta to get the request's data locations;
* extra result info will be passed to the backend ScanNode.
*/
@Override
public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
return scanRangeLocations;
}
// Only called when the Coordinator executes a high-performance point query
public List<TScanRangeLocations> lazyEvaluateRangeLocations() throws UserException {
// Lazy evaluation
selectedIndexId = olapTable.getBaseIndexId();
// Only key columns
computeColumnsFilter(olapTable.getBaseSchemaKeyColumns(), olapTable.getPartitionInfo());
computePartitionInfo();
scanBackendIds.clear();
scanTabletIds.clear();
bucketSeq2locations.clear();
scanReplicaIds.clear();
sampleTabletIds.clear();
try {
createScanRangeLocations();
} catch (AnalysisException e) {
throw new UserException(e.getMessage());
}
return scanRangeLocations;
}
public void setDescTable(DescriptorTable descTable) {
this.descTable = descTable;
}
public DescriptorTable getDescTable() {
return this.descTable;
}
@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
long selectedIndexIdForExplain = selectedIndexId;
if (selectedIndexIdForExplain == -1) {
// If there is no data in the table, selectedIndexId will be -1; set it to the base index id
// to avoid "null" in the explain result.
selectedIndexIdForExplain = olapTable.getBaseIndexId();
}
String indexName = olapTable.getIndexNameById(selectedIndexIdForExplain);
output.append(prefix).append("TABLE: ").append(olapTable.getQualifiedName())
.append("(").append(indexName).append(")");
if (detailLevel == TExplainLevel.BRIEF) {
output.append("\n").append(prefix).append(String.format("cardinality=%,d", cardinality));
if (cardinalityAfterFilter != -1) {
output.append("\n").append(prefix).append(String.format("afterFilter=%,d", cardinalityAfterFilter));
}
if (!runtimeFilters.isEmpty()) {
output.append("\n").append(prefix).append("Apply RFs: ");
output.append(getRuntimeFilterExplainString(false, true));
}
if (!conjuncts.isEmpty()) {
output.append("\n").append(prefix).append("PREDICATES: ").append(conjuncts.size()).append("\n");
}
return output.toString();
}
if (isPreAggregation) {
output.append(", PREAGGREGATION: ON");
} else {
output.append(", PREAGGREGATION: OFF. Reason: ").append(reasonOfPreAggregation);
}
output.append("\n");
if (sortColumn != null) {
output.append(prefix).append("SORT COLUMN: ").append(sortColumn).append("\n");
}
if (sortInfo != null) {
output.append(prefix).append("SORT INFO:\n");
sortInfo.getMaterializedOrderingExprs().forEach(expr -> {
output.append(prefix).append(prefix).append(expr.toSql()).append("\n");
});
}
if (sortLimit != -1) {
output.append(prefix).append("SORT LIMIT: ").append(sortLimit).append("\n");
}
if (useTopnFilter()) {
String topnFilterSources = String.join(",",
topnFilterSortNodes.stream()
.map(node -> node.getId().asInt() + "").collect(Collectors.toList()));
output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n");
}
if (!conjuncts.isEmpty()) {
Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
output.append(prefix).append("PREDICATES: ").append(expr.toSql()).append("\n");
}
if (!runtimeFilters.isEmpty()) {
output.append(prefix).append("runtime filters: ");
output.append(getRuntimeFilterExplainString(false));
}
String selectedPartitions = getSelectedPartitionIds().stream().sorted()
.map(id -> olapTable.getPartition(id).getName())
.collect(Collectors.joining(","));
output.append(prefix).append(String.format("partitions=%s/%s (%s)", selectedPartitionNum,
olapTable.getPartitions().size(), selectedPartitions)).append("\n");
output.append(prefix).append(String.format("tablets=%s/%s", selectedSplitNum, totalTabletsNum));
// We print up to 3 tablets, and print "..." if there are more than 3
if (scanTabletIds.size() > 3) {
List<Long> firstThreeTabletIds = scanTabletIds.subList(0, 3);
output.append(String.format(", tabletList=%s ...", Joiner.on(",").join(firstThreeTabletIds)));
} else {
output.append(String.format(", tabletList=%s", Joiner.on(",").join(scanTabletIds)));
}
output.append("\n");
output.append(prefix).append(String.format("cardinality=%s", cardinality))
.append(String.format(", avgRowSize=%s", avgRowSize)).append(String.format(", numNodes=%s", numNodes));
output.append("\n");
if (pushDownAggNoGroupingOp != null) {
output.append(prefix).append("pushAggOp=").append(pushDownAggNoGroupingOp).append("\n");
}
if (isPointQuery()) {
output.append(prefix).append("SHORT-CIRCUIT\n");
}
if (!CollectionUtils.isEmpty(rewrittenProjectList)) {
output.append(prefix).append("rewrittenProjectList: ").append(
getExplainString(rewrittenProjectList)).append("\n");
}
output.append(prefix).append("desc: ").append(desc.getId().asInt()).append("\n");
return output.toString();
}
@Override
public int getNumInstances() {
// In the pipeline exec engine, the instance num equals be_num * parallel instance num,
// so here we need to count distinct be_num to make sure we get the right instance num.
if (ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) {
return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
}
return scanRangeLocations.size();
}
@Override
public void setShouldColoScan() {
shouldColoScan = true;
}
@Override
public boolean getShouldColoScan() {
return shouldColoScan;
}
public int getBucketNum() {
// In bucket shuffle join, we have 2 situations.
// 1. Only one partition: in this case, we use scanNode.getTotalTabletsNum() to get the right
// bucket num, because when the table has dynamic partition turned on, the bucket number in
// the default distribution info may not be correct.
// 2. Table is colocated: in this case, the table can have more than one partition, but all
// partitions' bucket numbers must be the same, so using the default bucket num is ok.
if (olapTable.isColocateTable()) {
return olapTable.getDefaultDistributionInfo().getBucketNum();
} else {
return (int) totalTabletsNum;
}
}
@Override
// If the scan is a key search, the shared scan opt should not be enabled, to prevent performance problems:
// 1. the WHERE clause contains an eq or in expr on a key column slot
// 2. the key column slot is a distribution column and the first column
// FIXME: this is not a good check; we cannot guarantee that the checked predicate truly helps
// to prune the data, so we should check the predicate's actual effect on the data.
protected boolean isKeySearch() {
List<SlotRef> whereSlot = Lists.newArrayList();
for (Expr conjunct : conjuncts) {
if (conjunct instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) conjunct;
if (binaryPredicate.getOp().isEquivalence()) {
if (binaryPredicate.getChild(0) instanceof SlotRef) {
whereSlot.add((SlotRef) binaryPredicate.getChild(0));
}
if (binaryPredicate.getChild(1) instanceof SlotRef) {
whereSlot.add((SlotRef) binaryPredicate.getChild(1));
}
}
}
if (conjunct instanceof InPredicate) {
InPredicate inPredicate = (InPredicate) conjunct;
if (!inPredicate.isNotIn()) {
if (inPredicate.getChild(0) instanceof SlotRef) {
whereSlot.add((SlotRef) inPredicate.getChild(0));
}
if (inPredicate.getChild(1) instanceof SlotRef) {
whereSlot.add((SlotRef) inPredicate.getChild(1));
}
}
}
}
for (SlotRef slotRef : whereSlot) {
String columnName = slotRef.getDesc().getColumn().getName().toLowerCase();
if (olapTable != null) {
if (olapTable.getDistributionColumnNames().contains(columnName)
&& olapTable.getBaseSchema().get(0).getName().toLowerCase().equals(columnName)) {
return true;
}
}
}
return false;
}
@Override
protected void toThrift(TPlanNode msg) {
List<String> keyColumnNames = new ArrayList<String>();
List<TPrimitiveType> keyColumnTypes = new ArrayList<TPrimitiveType>();
List<TColumn> columnsDesc = new ArrayList<TColumn>();
List<TOlapTableIndex> indexDesc = Lists.newArrayList();
if (isTopnLazyMaterialize) {
Set<String> materializedColumnNames = topnLazyMaterializeOutputColumns.stream()
.map(Column::getName).collect(Collectors.toSet());
olapTable.getColumnDesc(selectedIndexId, columnsDesc, keyColumnNames, keyColumnTypes,
materializedColumnNames);
TColumn tColumn = globalRowIdColumn.toThrift();
tColumn.setColumnType(ScalarType.createStringType().toColumnTypeThrift());
tColumn.setAggregationType(AggregateType.REPLACE.toThrift());
tColumn.setIsKey(false);
tColumn.setIsAllowNull(false);
// keep compatibility
tColumn.setVisible(false);
tColumn.setColUniqueId(Integer.MAX_VALUE);
columnsDesc.add(tColumn);
} else {
olapTable.getColumnDesc(selectedIndexId, columnsDesc, keyColumnNames, keyColumnTypes);
// Add extra row id column
ArrayList<SlotDescriptor> slots = desc.getSlots();
Column lastColumn = slots.get(slots.size() - 1).getColumn();
if (lastColumn != null && lastColumn.getName().equalsIgnoreCase(Column.ROWID_COL)) {
TColumn tColumn = new TColumn();
tColumn.setColumnName(Column.ROWID_COL);
tColumn.setColumnType(ScalarType.createStringType().toColumnTypeThrift());
tColumn.setAggregationType(AggregateType.REPLACE.toThrift());
tColumn.setIsKey(false);
tColumn.setIsAllowNull(false);
// keep compatibility
tColumn.setVisible(false);
tColumn.setColUniqueId(Integer.MAX_VALUE);
columnsDesc.add(tColumn);
}
}
for (Index index : olapTable.getIndexes()) {
TOlapTableIndex tIndex = index.toThrift(index.getColumnUniqueIds(olapTable.getBaseSchema()));
indexDesc.add(tIndex);
}
msg.node_type = TPlanNodeType.OLAP_SCAN_NODE;
if (olapTable.getBaseSchema().stream().anyMatch(Column::isClusterKey)) {
keyColumnNames.clear();
keyColumnTypes.clear();
}
msg.olap_scan_node = new TOlapScanNode(desc.getId().asInt(), keyColumnNames, keyColumnTypes, isPreAggregation);
msg.olap_scan_node.setColumnsDesc(columnsDesc);
msg.olap_scan_node.setIndexesDesc(indexDesc);
if (selectedIndexId != -1) {
msg.olap_scan_node.setSchemaVersion(olapTable.getIndexSchemaVersion(selectedIndexId));
}
if (null != sortColumn) {
msg.olap_scan_node.setSortColumn(sortColumn);
}
if (sortInfo != null) {
TSortInfo tSortInfo = new TSortInfo(
Expr.treesToThrift(sortInfo.getOrderingExprs()),
sortInfo.getIsAscOrder(),
sortInfo.getNullsFirst());
if (sortInfo.getSortTupleSlotExprs() != null) {
tSortInfo.setSortTupleSlotExprs(Expr.treesToThrift(sortInfo.getSortTupleSlotExprs()));
}
msg.olap_scan_node.setSortInfo(tSortInfo);
}
if (sortLimit != -1) {
msg.olap_scan_node.setSortLimit(sortLimit);
}
msg.olap_scan_node.setKeyType(olapTable.getKeysType().toThrift());
String tableName = olapTable.getName();
if (selectedIndexId != -1) {
tableName = tableName + "(" + getSelectedIndexName() + ")";
}
msg.olap_scan_node.setTableName(tableName);
msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite());
msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
// In TOlapScanNode, the pushDownAggNoGroupingOp field is deprecated.
if (outputColumnUniqueIds != null) {
msg.olap_scan_node.setOutputColumnUniqueIds(outputColumnUniqueIds);
}
msg.olap_scan_node.setDistributeColumnIds(new ArrayList<>(distributionColumnIds));
super.toThrift(msg);
}
@Override
public void normalize(TNormalizedPlanNode normalizedPlan, Normalizer normalizer) {
TNormalizedOlapScanNode normalizedOlapScanNode = new TNormalizedOlapScanNode();
normalizedOlapScanNode.setTableId(olapTable.getId());
long selectIndexId = selectedIndexId == -1 ? olapTable.getBaseIndexId() : selectedIndexId;
normalizedOlapScanNode.setIndexId(selectIndexId);
normalizedOlapScanNode.setIsPreaggregation(isPreAggregation);
normalizedOlapScanNode.setSortColumn(sortColumn);
normalizedOlapScanNode.setRollupName(olapTable.getIndexNameById(selectIndexId));
normalizeSchema(normalizedOlapScanNode);
normalizeSelectColumns(normalizedOlapScanNode, normalizer);
normalizedPlan.setNodeType(TPlanNodeType.OLAP_SCAN_NODE);
normalizedPlan.setOlapScanNode(normalizedOlapScanNode);
}
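    // Collect the selected column names, add any partition columns that are
    // not already selected, sort the list by column name for a stable order,
    // and register each slot id with the normalizer.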
private void normalizeSelectColumns(TNormalizedOlapScanNode normalizedOlapScanNode, Normalizer normalizer) {
List<SlotDescriptor> slots = tupleIds
.stream()
.flatMap(tupleId -> normalizer.getDescriptorTable().getTupleDesc(tupleId).getSlots().stream())
.collect(Collectors.toList());
List<Pair<SlotId, String>> selectColumns = slots.stream()
.map(slot -> Pair.of(slot.getId(), slot.getColumn().getName()))
.collect(Collectors.toList());
for (Column partitionColumn : olapTable.getPartitionInfo().getPartitionColumns()) {
boolean selectPartitionColumn = false;
String partitionColumnName = partitionColumn.getName();
for (Pair<SlotId, String> selectColumn : selectColumns) {
if (selectColumn.second.equalsIgnoreCase(partitionColumnName)) {
selectPartitionColumn = true;
break;
}
}
if (!selectPartitionColumn) {
selectColumns.add(Pair.of(new SlotId(-1), partitionColumnName));
}
}
selectColumns.sort(Comparator.comparing(Pair::value));
for (Pair<SlotId, String> selectColumn : selectColumns) {
normalizer.normalizeSlotId(selectColumn.first.asInt());
}
normalizedOlapScanNode.setSelectColumns(
selectColumns.stream().map(Pair::value).collect(Collectors.toList())
);
}
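    // Record the key column names and types of the selected index
    // (or of the base schema when no index is selected).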
private void normalizeSchema(TNormalizedOlapScanNode normalizedOlapScanNode) {
List<Column> columns = selectedIndexId == -1
? olapTable.getBaseSchema() : olapTable.getSchemaByIndexId(selectedIndexId);
List<Column> keyColumns = columns.stream().filter(Column::isKey).collect(Collectors.toList());
normalizedOlapScanNode.setKeyColumnNames(
keyColumns.stream()
.map(Column::getName)
.collect(Collectors.toList())
);
normalizedOlapScanNode.setKeyColumnTypes(
keyColumns.stream()
.map(column -> column.getDataType().toThrift())
.collect(Collectors.toList())
);
}
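    // Normalize the partition range predicates first, then convert the
    // resulting conjuncts to thrift.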
@Override
protected void normalizeConjuncts(TNormalizedPlanNode normalizedPlan, Normalizer normalizer) {
List<Expr> normalizedPredicates = new PartitionRangePredicateNormalizer(normalizer, this)
.normalize();
List<TExpr> normalizedConjuncts = normalizeExprs(normalizedPredicates, normalizer);
normalizedPlan.setConjuncts(normalizedConjuncts);
}
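    // Normalize the projections on the output tuple; when no explicit project
    // list exists, the output slots themselves serve as the projections.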
@Override
protected void normalizeProjects(TNormalizedPlanNode normalizedPlanNode, Normalizer normalizer) {
List<SlotDescriptor> outputSlots =
getOutputTupleIds()
.stream()
.flatMap(tupleId -> normalizer.getDescriptorTable().getTupleDesc(tupleId).getSlots().stream())
.collect(Collectors.toList());
List<Expr> projectList = this.projectList;
if (projectList == null) {
projectList = outputSlots.stream().map(SlotRef::new).collect(Collectors.toList());
}
List<TExpr> projectThrift = normalizeProjects(outputSlots, projectList, normalizer);
normalizedPlanNode.setProjects(projectThrift);
}
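    /**
     * Collect the columns that predicates are bound to, for prefix-index
     * selection: columns under equivalence predicates (=, IN) and equal-join
     * conjuncts go into equivalenceColumns, the others into
     * unequivalenceColumns.
     */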
public void collectColumns(Analyzer analyzer, Set<String> equivalenceColumns, Set<String> unequivalenceColumns) {
        // 1. Get columns which have predicates on them.
for (Expr expr : conjuncts) {
if (!isPredicateUsedForPrefixIndex(expr, false)) {
continue;
}
for (SlotDescriptor slot : desc.getMaterializedSlots()) {
if (expr.isBound(slot.getId())) {
if (!isEquivalenceExpr(expr)) {
unequivalenceColumns.add(slot.getColumn().getName());
} else {
equivalenceColumns.add(slot.getColumn().getName());
}
break;
}
}
}
        // 2. Get columns from equal-join predicates when this scan is the inner child of a join.
List<Expr> eqJoinPredicate = analyzer.getEqJoinConjuncts(desc.getId());
for (Expr expr : eqJoinPredicate) {
if (!isPredicateUsedForPrefixIndex(expr, true)) {
continue;
}
for (SlotDescriptor slot : desc.getMaterializedSlots()) {
Preconditions.checkState(expr.getChildren().size() == 2);
for (Expr child : expr.getChildren()) {
if (child.isBound(slot.getId())) {
equivalenceColumns.add(slot.getColumn().getName());
break;
}
}
}
}
}
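    // Analyze each partition expression with the given analyzer.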
private void analyzerPartitionExpr(Analyzer analyzer, PartitionInfo partitionInfo) throws AnalysisException {
ArrayList<Expr> exprs = partitionInfo.getPartitionExprs();
for (Expr e : exprs) {
e.analyze(analyzer);
}
}
public TupleId getTupleId() {
Preconditions.checkNotNull(desc);
return desc.getId();
}
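    // IN predicates and binary predicates with an equivalence operator are
    // treated as equivalence expressions.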
private boolean isEquivalenceExpr(Expr expr) {
if (expr instanceof InPredicate) {
return true;
}
if (expr instanceof BinaryPredicate) {
final BinaryPredicate predicate = (BinaryPredicate) expr;
if (predicate.getOp().isEquivalence()) {
return true;
}
}
return false;
}
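    // Only IN predicates and binary predicates may use the prefix index;
    // equal-join conjuncts are checked differently from ordinary conjuncts.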
private boolean isPredicateUsedForPrefixIndex(Expr expr, boolean isJoinConjunct) {
if (!(expr instanceof InPredicate)
&& !(expr instanceof BinaryPredicate)) {
return false;
}
if (expr instanceof InPredicate) {
return isInPredicateUsedForPrefixIndex((InPredicate) expr);
} else if (expr instanceof BinaryPredicate) {
if (isJoinConjunct) {
return isEqualJoinConjunctUsedForPrefixIndex((BinaryPredicate) expr);
} else {
return isBinaryPredicateUsedForPrefixIndex((BinaryPredicate) expr);
}
}
return true;
}
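    // An equal-join conjunct can use the prefix index when it is not an
    // auxiliary expression and one of its children is a (possibly cast)
    // slot ref bound to a materialized slot.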
private boolean isEqualJoinConjunctUsedForPrefixIndex(BinaryPredicate expr) {
Preconditions.checkArgument(expr.getOp().isEquivalence());
if (expr.isAuxExpr()) {
return false;
}
for (Expr child : expr.getChildren()) {
for (SlotDescriptor slot : desc.getMaterializedSlots()) {
if (child.isBound(slot.getId()) && isSlotRefNested(child)) {
return true;
}
}
}
return false;
}
private boolean isBinaryPredicateUsedForPrefixIndex(BinaryPredicate expr) {
if (expr.isAuxExpr() || expr.getOp().isUnequivalence()) {
return false;
}
return (isSlotRefNested(expr.getChild(0)) && expr.getChild(1).isConstant())
|| (isSlotRefNested(expr.getChild(1)) && expr.getChild(0).isConstant());
}
private boolean isInPredicateUsedForPrefixIndex(InPredicate expr) {
if (expr.isNotIn()) {
return false;
}
return isSlotRefNested(expr.getChild(0)) && expr.isLiteralChildren();
}
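    // Strip off any casts and check whether the underlying expression is a slot ref.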
private boolean isSlotRefNested(Expr expr) {
while (expr instanceof CastExpr) {
expr = expr.getChild(0);
}
return expr instanceof SlotRef;
}
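    // Add a hidden 'delete sign = 0' conjunct so rows marked deleted are
    // filtered out; for non merge-on-write tables this also disables
    // pre-aggregation.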
private void filterDeletedRows(Analyzer analyzer) throws AnalysisException {
if (!Util.showHiddenColumns() && olapTable.hasDeleteSign() && !ConnectContext.get().getSessionVariable()
.skipDeleteSign()) {
SlotRef deleteSignSlot = new SlotRef(desc.getAliasAsName(), Column.DELETE_SIGN);
deleteSignSlot.analyze(analyzer);
deleteSignSlot.getDesc().setIsMaterialized(true);
Expr conjunct = new BinaryPredicate(BinaryPredicate.Operator.EQ, deleteSignSlot, new IntLiteral(0));
conjunct.analyze(analyzer);
conjuncts.add(conjunct);
if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
closePreAggregation(Column.DELETE_SIGN + " is used as conjuncts.");
}
}
}
    /*
     * Although sometimes the scan range only involves one instance,
     * the data distribution cannot be set to UNPARTITIONED here,
     * because the coordinator will not set the scan range for the fragment
     * when the fragment's data partition is UNPARTITIONED.
     */
public DataPartition constructInputPartitionByDistributionInfo() throws UserException {
ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
if ((colocateTableIndex.isColocateTable(olapTable.getId())
&& !colocateTableIndex.isGroupUnstable(colocateTableIndex.getGroup(olapTable.getId())))
|| olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED
|| olapTable.getPartitions().size() == 1) {
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
if (!(distributionInfo instanceof HashDistributionInfo)) {
return DataPartition.RANDOM;
}
List<Column> distributeColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns();
List<Expr> dataDistributeExprs = Lists.newArrayList();
for (Column column : distributeColumns) {
SlotRef slotRef = new SlotRef(desc.getRef().getName(), column.getName());
dataDistributeExprs.add(slotRef);
}
return DataPartition.hashPartitioned(dataDistributeExprs);
} else {
return DataPartition.RANDOM;
}
}
@VisibleForTesting
public String getReasonOfPreAggregation() {
return reasonOfPreAggregation;
}
@VisibleForTesting
public String getSelectedIndexName() {
return olapTable.getIndexNameById(selectedIndexId);
}
@Override
public void finalizeForNereids() {
computeNumNodes();
computeStatsForNereids();
}
private void computeStatsForNereids() {
if (cardinality > 0 && avgRowSize <= 0) {
avgRowSize = totalBytes / (float) cardinality * COMPRESSION_RATIO;
capCardinalityAtLimit();
}
        // When the scan node has no data, cardinality should be 0 instead of
        // the invalid value (-1) left after computeStats().
cardinality = cardinality == -1 ? 0 : cardinality;
}
Set<String> getDistributionColumnNames() {
return olapTable != null
? olapTable.getDistributionColumnNames()
: Sets.newTreeSet();
}
    /**
     * Update required_slots in scan node contexts. This is called after the Nereids planner does the projection.
     * In the projection process, some slots may be removed, so call this to update the slot info.
     * Currently it is only used by ExternalFileScanNode; the interface is added here to keep the Nereids code clean.
     */
public void updateRequiredSlots(PlanTranslatorContext context,
Set<SlotId> requiredByProjectSlotIdSet) {
outputColumnUniqueIds.clear();
for (SlotDescriptor slot : context.getTupleDesc(this.getTupleId()).getSlots()) {
if (requiredByProjectSlotIdSet.contains(slot.getId()) && slot.getColumn() != null) {
outputColumnUniqueIds.add(slot.getColumn().getUniqueId());
}
}
}
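    // Build a StatsDelta identifying the catalog, database, table, selected
    // index (base index when none is selected) and the scanned replicas.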
@Override
public StatsDelta genStatsDelta() throws AnalysisException {
return new StatsDelta(Env.getCurrentEnv().getCurrentCatalog().getId(),
Env.getCurrentEnv().getCurrentCatalog().getDbOrAnalysisException(
olapTable.getQualifiedDbName()).getId(),
olapTable.getId(), selectedIndexId == -1 ? olapTable.getBaseIndexId() : selectedIndexId,
scanReplicaIds);
}
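    // Aggregates without grouping cannot be pushed down to the storage layer
    // for unique-key models; COUNT additionally requires a duplicate-key table.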
@Override
public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
KeysType type = getOlapTable().getKeysType();
if (type == KeysType.UNIQUE_KEYS || type == KeysType.PRIMARY_KEYS) {
return false;
}
String aggFunctionName = aggExpr.getFnName().getFunction();
if (aggFunctionName.equalsIgnoreCase("COUNT") && type != KeysType.DUP_KEYS) {
return false;
}
return true;
}
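    // A column supports no-grouping aggregate push-down unless it is a value
    // column of an aggregate-key table or an aggregated materialized-view column.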
@Override
public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) {
KeysType type = getOlapTable().getKeysType();
        // Value columns of an aggregate-key table do not support the zone_map index.
if (type == KeysType.AGG_KEYS && !col.isKey()) {
return false;
}
if (aggExpr.getChild(0) instanceof SlotRef) {
SlotRef slot = (SlotRef) aggExpr.getChild(0);
if (CreateMaterializedViewStmt.isMVColumn(slot.getColumnName()) && slot.getColumn().isAggregated()) {
return false;
}
}
return true;
}
@Override
public int getScanRangeNum() {
return getScanTabletIds().size();
}
public boolean isTopnLazyMaterialize() {
return isTopnLazyMaterialize;
}
public void setIsTopnLazyMaterialize(boolean isTopnLazyMaterialize) {
this.isTopnLazyMaterialize = isTopnLazyMaterialize;
}
public void addTopnLazyMaterializeOutputColumns(Column column) {
this.topnLazyMaterializeOutputColumns.add(column);
}
public List<Column> getTopnLazyMaterializeOutputColumns() {
return topnLazyMaterializeOutputColumns;
}
public Column getGlobalRowIdColumn() {
return globalRowIdColumn;
}
public void setGlobalRowIdColumn(Column globalRowIdColumn) {
this.globalRowIdColumn = globalRowIdColumn;
}
}