AbstractMaterializedViewRule.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.exploration.mv;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.constraint.TableIdentifier;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Id;
import org.apache.doris.common.Pair;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.jobs.executor.Rewriter;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.rules.exploration.ExplorationRuleFactory;
import org.apache.doris.nereids.rules.exploration.mv.Predicates.ExpressionInfo;
import org.apache.doris.nereids.rules.exploration.mv.Predicates.SplitPredicate;
import org.apache.doris.nereids.rules.exploration.mv.StructInfo.PartitionRemover;
import org.apache.doris.nereids.rules.exploration.mv.mapping.ExpressionMapping;
import org.apache.doris.nereids.rules.exploration.mv.mapping.RelationMapping;
import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
import org.apache.doris.nereids.rules.expression.rules.FoldConstantRuleOnFE;
import org.apache.doris.nereids.rules.rewrite.MergeProjects;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.scalar.DateTrunc;
import org.apache.doris.nereids.trees.expressions.functions.scalar.ElementAt;
import org.apache.doris.nereids.trees.expressions.functions.scalar.NonNullable;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Nullable;
import org.apache.doris.nereids.trees.expressions.literal.DateLiteral;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.nereids.types.VariantType;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.TypeUtils;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.Statistics;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* The abstract class for all materialized view rules
*/
public abstract class AbstractMaterializedViewRule implements ExplorationRuleFactory {
public static final Logger LOG = LogManager.getLogger(AbstractMaterializedViewRule.class);
public static final Set<JoinType> SUPPORTED_JOIN_TYPE_SET = ImmutableSet.of(
JoinType.INNER_JOIN,
JoinType.LEFT_OUTER_JOIN,
JoinType.RIGHT_OUTER_JOIN,
JoinType.FULL_OUTER_JOIN,
JoinType.LEFT_SEMI_JOIN,
JoinType.RIGHT_SEMI_JOIN,
JoinType.LEFT_ANTI_JOIN,
JoinType.RIGHT_ANTI_JOIN,
JoinType.NULL_AWARE_LEFT_ANTI_JOIN);
/**
* The abstract template method for query rewrite, it contains the main logic, try to rewrite query by
* multi materialization every time. if exception it will catch the exception and record it to
* materialization context.
*/
public List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
List<Plan> rewrittenPlans = new ArrayList<>();
SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable();
// if available materialization list is empty, bail out
StatementContext statementContext = cascadesContext.getStatementContext();
if (cascadesContext.getMaterializationContexts().isEmpty()) {
return rewrittenPlans;
}
if (statementContext.getMaterializedViewRewriteDuration()
> sessionVariable.materializedViewRewriteDurationThresholdMs) {
LOG.warn("materialized view rewrite duration is exceeded, the query queryId is {}",
cascadesContext.getConnectContext().getQueryIdentifier());
MaterializationContext.makeFailWithDurationExceeded(queryPlan, cascadesContext.getMaterializationContexts(),
statementContext.getMaterializedViewRewriteDuration());
return rewrittenPlans;
}
for (MaterializationContext context : cascadesContext.getMaterializationContexts()) {
statementContext.getMaterializedViewStopwatch().reset().start();
if (checkIfRewritten(queryPlan, context)) {
continue;
}
// check mv plan is valid or not
if (!isMaterializationValid(queryPlan, cascadesContext, context)) {
continue;
}
// get query struct infos according to the view strut info, if valid query struct infos is empty, bail out
List<StructInfo> queryStructInfos = getValidQueryStructInfos(queryPlan, cascadesContext,
context.getStructInfo().getTableBitSet());
if (queryStructInfos.isEmpty()) {
continue;
}
statementContext.addMaterializedViewRewriteDuration(
statementContext.getMaterializedViewStopwatch().elapsed(TimeUnit.MILLISECONDS));
for (StructInfo queryStructInfo : queryStructInfos) {
statementContext.getMaterializedViewStopwatch().reset().start();
if (statementContext.getMaterializedViewRewriteDuration()
> sessionVariable.materializedViewRewriteDurationThresholdMs) {
statementContext.getMaterializedViewStopwatch().stop();
LOG.warn("materialized view rewrite duration is exceeded, the queryId is {}",
cascadesContext.getConnectContext().getQueryIdentifier());
MaterializationContext.makeFailWithDurationExceeded(queryStructInfo.getOriginalPlan(),
cascadesContext.getMaterializationContexts(),
statementContext.getMaterializedViewRewriteDuration());
return rewrittenPlans;
}
try {
if (rewrittenPlans.size() < sessionVariable.getMaterializedViewRewriteSuccessCandidateNum()) {
rewrittenPlans.addAll(doRewrite(queryStructInfo, cascadesContext, context));
}
} catch (Exception exception) {
LOG.warn("Materialized view rule exec fail", exception);
context.recordFailReason(queryStructInfo,
"Materialized view rule exec fail", exception::toString);
} finally {
statementContext.addMaterializedViewRewriteDuration(
statementContext.getMaterializedViewStopwatch().elapsed(TimeUnit.MILLISECONDS));
}
}
}
return rewrittenPlans;
}
/**
* Get valid query struct infos, if invalid record the invalid reason
*/
protected List<StructInfo> getValidQueryStructInfos(Plan queryPlan, CascadesContext cascadesContext,
BitSet materializedViewTableSet) {
List<StructInfo> validStructInfos = new ArrayList<>();
// For every materialized view we should trigger refreshing struct info map
List<StructInfo> uncheckedStructInfos = MaterializedViewUtils.extractStructInfo(queryPlan, queryPlan,
cascadesContext, materializedViewTableSet);
uncheckedStructInfos.forEach(queryStructInfo -> {
boolean valid = checkQueryPattern(queryStructInfo, cascadesContext) && queryStructInfo.isValid();
if (!valid) {
cascadesContext.getMaterializationContexts().forEach(ctx ->
ctx.recordFailReason(queryStructInfo, "Query struct info is invalid",
() -> String.format("query table bitmap is %s, plan is %s",
queryStructInfo.getTableBitSet(), queryPlan.treeString())
));
} else {
validStructInfos.add(queryStructInfo);
}
});
return validStructInfos;
}
/**
* The abstract template method for query rewrite, it contains the main logic, try to rewrite query by
* only one materialization every time. Different query pattern should override the sub logic.
*/
protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext cascadesContext,
MaterializationContext materializationContext) {
List<Plan> rewriteResults = new ArrayList<>();
StructInfo viewStructInfo = materializationContext.getStructInfo();
MatchMode matchMode = decideMatchMode(queryStructInfo.getRelations(), viewStructInfo.getRelations());
if (MatchMode.COMPLETE != matchMode) {
materializationContext.recordFailReason(queryStructInfo, "Match mode is invalid",
() -> String.format("matchMode is %s", matchMode));
return rewriteResults;
}
SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable();
int materializedViewRelationMappingMaxCount = sessionVariable.getMaterializedViewRelationMappingMaxCount();
List<RelationMapping> queryToViewTableMappings = RelationMapping.generate(queryStructInfo.getRelations(),
viewStructInfo.getRelations(), materializedViewRelationMappingMaxCount);
// if any relation in query and view can not map, bail out.
if (queryToViewTableMappings == null) {
materializationContext.recordFailReason(queryStructInfo,
"Query to view table mapping is null", () -> "");
return rewriteResults;
}
for (RelationMapping queryToViewTableMapping : queryToViewTableMappings) {
SlotMapping queryToViewSlotMapping =
materializationContext.getSlotMappingFromCache(queryToViewTableMapping);
if (queryToViewSlotMapping == null) {
queryToViewSlotMapping = SlotMapping.generate(queryToViewTableMapping);
materializationContext.addSlotMappingToCache(queryToViewTableMapping, queryToViewSlotMapping);
}
if (queryToViewSlotMapping == null) {
materializationContext.recordFailReason(queryStructInfo,
"Query to view slot mapping is null", () ->
String.format("queryToViewTableMapping relation mapping is %s",
queryToViewTableMapping));
continue;
}
SlotMapping viewToQuerySlotMapping = queryToViewSlotMapping.inverse();
LogicalCompatibilityContext compatibilityContext = LogicalCompatibilityContext.from(
queryToViewTableMapping, viewToQuerySlotMapping, queryStructInfo, viewStructInfo);
ComparisonResult comparisonResult = StructInfo.isGraphLogicalEquals(queryStructInfo, viewStructInfo,
compatibilityContext);
if (comparisonResult.isInvalid()) {
materializationContext.recordFailReason(queryStructInfo,
"The graph logic between query and view is not consistent",
comparisonResult::getErrorMessage);
continue;
}
SplitPredicate compensatePredicates = predicatesCompensate(queryStructInfo, viewStructInfo,
viewToQuerySlotMapping, comparisonResult, cascadesContext);
// Can not compensate, bail out
if (compensatePredicates.isInvalid()) {
materializationContext.recordFailReason(queryStructInfo,
"Predicate compensate fail",
() -> String.format("query predicates = %s,\n query equivalenceClass = %s, \n"
+ "view predicates = %s,\n query equivalenceClass = %s\n"
+ "comparisonResult = %s ", queryStructInfo.getPredicates(),
queryStructInfo.getEquivalenceClass(), viewStructInfo.getPredicates(),
viewStructInfo.getEquivalenceClass(), comparisonResult));
continue;
}
Plan rewrittenPlan;
Plan mvScan = materializationContext.getScanPlan(queryStructInfo, cascadesContext);
Plan queryPlan = queryStructInfo.getTopPlan();
if (compensatePredicates.isAlwaysTrue()) {
rewrittenPlan = mvScan;
} else {
// Try to rewrite compensate predicates by using mv scan
List<Expression> rewriteCompensatePredicates = rewriteExpression(compensatePredicates.toList(),
queryPlan, materializationContext.getShuttledExprToScanExprMapping(),
viewToQuerySlotMapping, queryStructInfo.getTableBitSet(),
compensatePredicates.getRangePredicateMap(),
cascadesContext);
if (rewriteCompensatePredicates.isEmpty()) {
materializationContext.recordFailReason(queryStructInfo,
"Rewrite compensate predicate by view fail",
() -> String.format("compensatePredicates = %s,\n mvExprToMvScanExprMapping = %s,\n"
+ "viewToQuerySlotMapping = %s",
compensatePredicates, materializationContext.getShuttledExprToScanExprMapping(),
viewToQuerySlotMapping));
continue;
}
rewrittenPlan = new LogicalFilter<>(Sets.newLinkedHashSet(rewriteCompensatePredicates), mvScan);
}
boolean checkResult = rewriteQueryByViewPreCheck(matchMode, queryStructInfo,
viewStructInfo, viewToQuerySlotMapping, rewrittenPlan, materializationContext);
if (!checkResult) {
continue;
}
// Rewrite query by view
rewrittenPlan = rewriteQueryByView(matchMode, queryStructInfo, viewStructInfo, viewToQuerySlotMapping,
rewrittenPlan, materializationContext, cascadesContext);
rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
childContext -> {
Rewriter.getWholeTreeRewriter(childContext).execute();
return childContext.getRewritePlan();
}, rewrittenPlan, queryPlan);
if (rewrittenPlan == null) {
continue;
}
Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>> invalidPartitions;
if (PartitionCompensator.needUnionRewrite(materializationContext)
&& sessionVariable.isEnableMaterializedViewUnionRewrite()) {
MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv();
BaseTableInfo relatedTableInfo = mtmv.getMvPartitionInfo().getRelatedTableInfo();
Map<List<String>, Set<String>> queryUsedPartitions = PartitionCompensator.getQueryUsedPartitions(
cascadesContext.getConnectContext().getStatementContext());
Set<String> relateTableUsedPartitions = queryUsedPartitions.get(relatedTableInfo.toList());
if (relateTableUsedPartitions == null) {
materializationContext.recordFailReason(queryStructInfo,
String.format("queryUsedPartition is null but needUnionRewrite, table is %s, queryId is %s",
relatedTableInfo.toList(),
cascadesContext.getConnectContext().getQueryIdentifier()),
() -> String.format(
"queryUsedPartition is null but needUnionRewrite, table is %s, queryId is %s",
relatedTableInfo.toList(),
cascadesContext.getConnectContext().getQueryIdentifier()));
LOG.warn(String.format(
"queryUsedPartition is null but needUnionRewrite, table is %s, queryId is %s",
relatedTableInfo.toList(), cascadesContext.getConnectContext().getQueryIdentifier()));
return rewriteResults;
}
if (relateTableUsedPartitions.isEmpty()) {
materializationContext.recordFailReason(queryStructInfo,
String.format("queryUsedPartition is empty, table is %s, queryId is %s",
relatedTableInfo.toList(),
cascadesContext.getConnectContext().getQueryIdentifier()),
() -> String.format("queryUsedPartition is empty, table is %s, queryId is %s",
relatedTableInfo.toList(),
cascadesContext.getConnectContext().getQueryIdentifier()));
LOG.debug(String.format("queryUsedPartition is empty, table is %s, queryId is %s",
relatedTableInfo.toList(), cascadesContext.getConnectContext().getQueryIdentifier()));
// no need to rewrite by current mv, becaus
return rewriteResults;
}
try {
invalidPartitions = calcInvalidPartitions(relateTableUsedPartitions, rewrittenPlan,
cascadesContext, (AsyncMaterializationContext) materializationContext);
} catch (AnalysisException e) {
materializationContext.recordFailReason(queryStructInfo,
"Calc invalid partitions fail",
() -> String.format("Calc invalid partitions fail, mv partition names are %s",
mtmv.getPartitions()));
LOG.warn("Calc invalid partitions fail", e);
continue;
}
if (invalidPartitions == null) {
materializationContext.recordFailReason(queryStructInfo,
"mv can not offer any partition for query",
() -> String.format("mv partition info %s", mtmv.getMvPartitionInfo()));
// if mv can not offer any partition for query, query rewrite bail out to avoid cycle run
return rewriteResults;
}
boolean partitionNeedUnion = PartitionCompensator.needUnionRewrite(invalidPartitions, cascadesContext);
boolean canUnionRewrite = canUnionRewrite(queryPlan, mtmv, cascadesContext);
if (partitionNeedUnion && !canUnionRewrite) {
materializationContext.recordFailReason(queryStructInfo,
"need compensate union all, but can not, because the query structInfo",
() -> String.format("mv partition info is %s, and the query plan is %s",
mtmv.getMvPartitionInfo(), queryPlan.treeString()));
return rewriteResults;
}
final Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>> finalInvalidPartitions =
invalidPartitions;
if (partitionNeedUnion) {
Pair<Plan, Boolean> planAndNeedAddFilterPair =
StructInfo.addFilterOnTableScan(queryPlan, invalidPartitions.value(),
mtmv.getMvPartitionInfo().getRelatedCol(), cascadesContext);
if (planAndNeedAddFilterPair == null) {
materializationContext.recordFailReason(queryStructInfo,
"Add filter to base table fail when union rewrite",
() -> String.format("invalidPartitions are %s, queryPlan is %s, partition column is %s",
invalidPartitions, queryPlan.treeString(),
mtmv.getMvPartitionInfo().getPartitionCol()));
continue;
}
if (finalInvalidPartitions.value().isEmpty() || !planAndNeedAddFilterPair.value()) {
// if invalid base table filter is empty or doesn't need to add filter on base table,
// only need remove mv invalid partition
rewrittenPlan = rewrittenPlan.accept(new PartitionRemover(), invalidPartitions.key());
} else {
// For rewrittenPlan which contains materialized view should remove invalid partition ids
List<Plan> children = Lists.newArrayList(
rewrittenPlan.accept(new PartitionRemover(), invalidPartitions.key()),
planAndNeedAddFilterPair.key());
// Union query materialized view and source table
rewrittenPlan = new LogicalUnion(Qualifier.ALL,
queryPlan.getOutput().stream().map(NamedExpression.class::cast)
.collect(Collectors.toList()),
children.stream()
.map(plan -> plan.getOutput().stream()
.map(slot -> (SlotReference) slot.toSlot())
.collect(Collectors.toList()))
.collect(Collectors.toList()),
ImmutableList.of(),
false,
children);
}
}
}
List<Slot> rewrittenPlanOutput = rewrittenPlan.getOutput();
rewrittenPlan = normalizeExpressions(rewrittenPlan, queryPlan);
if (rewrittenPlan == null) {
// maybe virtual slot reference added automatically
materializationContext.recordFailReason(queryStructInfo,
"RewrittenPlan output logical properties is different with target group",
() -> String.format("materialized view rule normalizeExpressions, output size between "
+ "origin and rewritten plan is different, rewritten output is %s, "
+ "origin output is %s",
rewrittenPlanOutput, queryPlan.getOutput()));
continue;
}
// Merge project
rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
childContext -> {
Rewriter.getCteChildrenRewriter(childContext,
ImmutableList.of(Rewriter.bottomUp(new MergeProjects()))).execute();
return childContext.getRewritePlan();
}, rewrittenPlan, queryPlan);
if (!isOutputValid(queryPlan, rewrittenPlan)) {
LogicalProperties logicalProperties = rewrittenPlan.getLogicalProperties();
materializationContext.recordFailReason(queryStructInfo,
"RewrittenPlan output logical properties is different with target group",
() -> String.format("planOutput logical"
+ " properties = %s,\n groupOutput logical properties = %s",
logicalProperties, queryPlan.getLogicalProperties()));
continue;
}
trySetStatistics(materializationContext, cascadesContext);
rewriteResults.add(rewrittenPlan);
recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext, cascadesContext);
// If rewrite successfully, try to clear mv scan currently because it maybe used again
materializationContext.clearScanPlan(cascadesContext);
}
return rewriteResults;
}
// Set materialization context statistics to statementContext for cost estimate later
private static void trySetStatistics(MaterializationContext context, CascadesContext cascadesContext) {
Optional<Pair<Id, Statistics>> materializationPlanStatistics = context.getPlanStatistics(cascadesContext);
if (materializationPlanStatistics.isPresent() && materializationPlanStatistics.get().key() != null) {
cascadesContext.getStatementContext().addStatistics(materializationPlanStatistics.get().key(),
materializationPlanStatistics.get().value());
}
}
/**
* Not all query after rewritten successfully can compensate union all
* Such as:
* mv def sql is as following, partition column is a
* select a, b, count(*) from t1 group by a, b
* Query is as following:
* select b, count(*) from t1 group by b, after rewritten by materialized view successfully
* If mv part partition is invalid, can not compensate union all, because result is wrong after
* compensate union all.
*/
protected boolean canUnionRewrite(Plan queryPlan, MTMV mtmv, CascadesContext cascadesContext) {
return true;
}
// Normalize expression such as nullable property and output slot id
protected Plan normalizeExpressions(Plan rewrittenPlan, Plan originPlan) {
if (rewrittenPlan.getOutput().size() != originPlan.getOutput().size()) {
return null;
}
// normalize nullable
List<NamedExpression> normalizeProjects = new ArrayList<>();
for (int i = 0; i < originPlan.getOutput().size(); i++) {
normalizeProjects.add(normalizeExpression(originPlan.getOutput().get(i), rewrittenPlan.getOutput().get(i)));
}
return new LogicalProject<>(normalizeProjects, rewrittenPlan);
}
/**
* Check the logical properties of rewritten plan by mv is the same with source plan
* if same return true, if different return false
*/
protected boolean isOutputValid(Plan sourcePlan, Plan rewrittenPlan) {
if (sourcePlan.getGroupExpression().isPresent() && !rewrittenPlan.getLogicalProperties()
.equals(sourcePlan.getGroupExpression().get().getOwnerGroup().getLogicalProperties())) {
return false;
}
return sourcePlan.getLogicalProperties().equals(rewrittenPlan.getLogicalProperties());
}
/**
* Partition will be pruned in query then add the pruned partitions to select partitions field of
* catalog relation.
* Maybe only some partitions is invalid in materialized view, or base table maybe add new partition
* So we should calc the invalid partition used in query
* @return the key in pair is mvNeedRemovePartitionNameSet, the value in pair is baseTableNeedUnionPartitionNameSet
*/
protected Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>> calcInvalidPartitions(
Set<String> queryUsedPartition,
Plan rewrittenPlan,
CascadesContext cascadesContext,
AsyncMaterializationContext materializationContext)
throws AnalysisException {
return PartitionCompensator.calcInvalidPartitions(queryUsedPartition, rewrittenPlan,
materializationContext, cascadesContext);
}
/**
* Query rewrite result may output origin plan , this will cause loop.
* if return origin plan, need add check hear.
*/
protected boolean rewriteQueryByViewPreCheck(MatchMode matchMode, StructInfo queryStructInfo,
StructInfo viewStructInfo, SlotMapping viewToQuerySlotMapping, Plan tempRewritedPlan,
MaterializationContext materializationContext) {
if (materializationContext instanceof SyncMaterializationContext
&& queryStructInfo.getBottomPlan() instanceof LogicalOlapScan) {
LogicalOlapScan olapScan = (LogicalOlapScan) queryStructInfo.getBottomPlan();
if (olapScan.getSelectedIndexId() != olapScan.getTable().getBaseIndexId()) {
return false;
}
}
return true;
}
/**
* Rewrite query by view, for aggregate or join rewriting should be different inherit class implementation
*/
protected Plan rewriteQueryByView(MatchMode matchMode, StructInfo queryStructInfo, StructInfo viewStructInfo,
SlotMapping viewToQuerySlotMapping, Plan tempRewritedPlan, MaterializationContext materializationContext,
CascadesContext cascadesContext) {
return tempRewritedPlan;
}
/**
* Use target expression to represent the source expression. Visit the source expression,
* try to replace the source expression with target expression in targetExpressionMapping, if found then
* replace the source expression by target expression mapping value.
*
* @param sourceExpressionsToWrite the source expression to write by target expression
* @param sourcePlan the source plan witch the source expression belong to
* @param targetExpressionMapping target expression mapping, if finding the expression in key set of the mapping
* then use the corresponding value of mapping to replace it
*/
protected List<Expression> rewriteExpression(List<? extends Expression> sourceExpressionsToWrite, Plan sourcePlan,
ExpressionMapping targetExpressionMapping, SlotMapping targetToSourceMapping, BitSet sourcePlanBitSet,
Map<Expression, ExpressionInfo> queryExprToInfoMap, CascadesContext cascadesContext) {
// Firstly, rewrite the target expression using source with inverse mapping
// then try to use the target expression to represent the query. if any of source expressions
// can not be represented by target expressions, return null.
// generate target to target replacement expression mapping, and change target expression to source based
List<? extends Expression> sourceShuttledExpressions = ExpressionUtils.shuttleExpressionWithLineage(
sourceExpressionsToWrite, sourcePlan, sourcePlanBitSet);
ExpressionMapping expressionMappingKeySourceBased = targetExpressionMapping.keyPermute(targetToSourceMapping);
// target to target replacement expression mapping, because mv is 1:1 so get first element
List<Map<Expression, Expression>> flattenExpressionMap = expressionMappingKeySourceBased.flattenMap();
Map<Expression, Expression> targetToTargetReplacementMappingQueryBased =
flattenExpressionMap.get(0);
// viewExprParamToDateTruncMap is {slot#0 : date_trunc(slot#0, 'day')}
Map<Expression, DateTrunc> viewExprParamToDateTruncMap = new HashMap<>();
targetToTargetReplacementMappingQueryBased.keySet().forEach(expr -> {
if (expr instanceof DateTrunc) {
viewExprParamToDateTruncMap.put(expr.child(0), (DateTrunc) expr);
}
});
List<Expression> rewrittenExpressions = new ArrayList<>();
for (int exprIndex = 0; exprIndex < sourceShuttledExpressions.size(); exprIndex++) {
Expression expressionShuttledToRewrite = sourceShuttledExpressions.get(exprIndex);
if (expressionShuttledToRewrite instanceof Literal) {
rewrittenExpressions.add(expressionShuttledToRewrite);
continue;
}
final Set<Expression> slotsToRewrite =
expressionShuttledToRewrite.collectToSet(expression -> expression instanceof Slot);
final Set<SlotReference> variants =
expressionShuttledToRewrite.collectToSet(expression -> expression instanceof SlotReference
&& ((SlotReference) expression).getDataType() instanceof VariantType);
extendMappingByVariant(variants, targetToTargetReplacementMappingQueryBased);
Expression replacedExpression = ExpressionUtils.replace(expressionShuttledToRewrite,
targetToTargetReplacementMappingQueryBased);
Set<Expression> replacedExpressionSlotQueryUsed = replacedExpression.collect(slotsToRewrite::contains);
if (!replacedExpressionSlotQueryUsed.isEmpty()) {
// if contains any slot to rewrite, which means can not be rewritten by target,
// expressionShuttledToRewrite is slot#0 > '2024-01-01' but mv plan output is date_trunc(slot#0, 'day')
// which would try to rewrite
if (viewExprParamToDateTruncMap.isEmpty()
|| expressionShuttledToRewrite.children().isEmpty()
|| !(expressionShuttledToRewrite instanceof ComparisonPredicate)) {
// view doesn't have date_trunc, or
// expressionShuttledToRewrite is not ComparisonPredicate, bail out
return ImmutableList.of();
}
Expression queryShuttledExprParam = expressionShuttledToRewrite.child(0);
Expression queryOriginalExpr = sourceExpressionsToWrite.get(exprIndex);
if (!queryExprToInfoMap.containsKey(queryOriginalExpr)
|| !viewExprParamToDateTruncMap.containsKey(queryShuttledExprParam)) {
// query expr contains expression info or mv out contains date_trunc expression,
// if not, can not try to rewritten by view date_trunc, bail out
return ImmutableList.of();
}
Map<Expression, Expression> datetruncMap = new HashMap<>();
Literal queryUsedLiteral = queryExprToInfoMap.get(queryOriginalExpr).literal;
if (!(queryUsedLiteral instanceof DateLiteral)) {
return ImmutableList.of();
}
datetruncMap.put(queryShuttledExprParam, queryUsedLiteral);
Expression dateTruncWithLiteral = ExpressionUtils.replace(
viewExprParamToDateTruncMap.get(queryShuttledExprParam), datetruncMap);
Expression foldedExpressionWithLiteral = FoldConstantRuleOnFE.evaluate(dateTruncWithLiteral,
new ExpressionRewriteContext(cascadesContext));
if (!(foldedExpressionWithLiteral instanceof DateLiteral)) {
return ImmutableList.of();
}
if (((DateLiteral) foldedExpressionWithLiteral).getDouble() == queryUsedLiteral.getDouble()) {
// after date_trunc simplify if equals to original expression, expr could be rewritten by mv
replacedExpression = ExpressionUtils.replace(expressionShuttledToRewrite,
targetToTargetReplacementMappingQueryBased,
viewExprParamToDateTruncMap);
}
if (replacedExpression.anyMatch(slotsToRewrite::contains)) {
// has expression not rewritten successfully, bail out
return ImmutableList.of();
}
}
rewrittenExpressions.add(replacedExpression);
}
return rewrittenExpressions;
}
/**
* if query contains variant slot reference, extend the expression mapping for rewrte
* such as targetToTargetReplacementMappingQueryBased is
* id#0 -> id#8
* type#1 -> type#9
* payload#4 -> payload#10
* query variants is payload['issue']['number']#20
* then we can add payload['issue']['number']#20 -> element_at(element_at(payload#10, 'issue'), 'number')
* to targetToTargetReplacementMappingQueryBased
* */
private void extendMappingByVariant(Set<SlotReference> queryVariants,
Map<Expression, Expression> targetToTargetReplacementMappingQueryBased) {
if (queryVariants.isEmpty()) {
return;
}
Map<List<String>, Expression> viewNameToExprMap = new HashMap<>();
for (Map.Entry<Expression, Expression> targetExpressionEntry :
targetToTargetReplacementMappingQueryBased.entrySet()) {
if (targetExpressionEntry.getKey() instanceof SlotReference
&& ((SlotReference) targetExpressionEntry.getKey()).getDataType() instanceof VariantType) {
SlotReference targetSlotReference = (SlotReference) targetExpressionEntry.getKey();
List<String> nameIdentifier = new ArrayList<>(targetSlotReference.getQualifier());
nameIdentifier.add(targetSlotReference.getName());
nameIdentifier.addAll(targetSlotReference.getSubPath());
viewNameToExprMap.put(nameIdentifier, targetExpressionEntry.getValue());
}
}
if (viewNameToExprMap.isEmpty()) {
return;
}
Map<List<String>, SlotReference> queryNameAndExpressionMap = new HashMap<>();
for (SlotReference slotReference : queryVariants) {
List<String> nameIdentifier = new ArrayList<>(slotReference.getQualifier());
nameIdentifier.add(slotReference.getName());
nameIdentifier.addAll(slotReference.getSubPath());
queryNameAndExpressionMap.put(nameIdentifier, slotReference);
}
for (Map.Entry<List<String>, ? extends Expression> queryNameEntry : queryNameAndExpressionMap.entrySet()) {
Expression minExpr = null;
List<String> minCompensateName = null;
for (Map.Entry<List<String>, Expression> entry : viewNameToExprMap.entrySet()) {
if (!containsAllWithOrder(queryNameEntry.getKey(), entry.getKey())) {
continue;
}
List<String> removedQueryName = new ArrayList<>(queryNameEntry.getKey());
removedQueryName.removeAll(entry.getKey());
if (minCompensateName == null) {
minCompensateName = removedQueryName;
minExpr = entry.getValue();
}
if (removedQueryName.size() < minCompensateName.size()) {
minCompensateName = removedQueryName;
minExpr = entry.getValue();
}
}
if (minExpr != null) {
targetToTargetReplacementMappingQueryBased.put(queryNameEntry.getValue(),
constructElementAt(minExpr, minCompensateName));
}
}
}
private static Expression constructElementAt(Expression target, List<String> atList) {
Expression elementAt = target;
for (String at : atList) {
elementAt = new ElementAt(elementAt, new VarcharLiteral(at));
}
return elementAt;
}
// source names is contain all target with order or not
private static boolean containsAllWithOrder(List<String> sourceNames, List<String> targetNames) {
if (sourceNames.size() < targetNames.size()) {
return false;
}
for (int index = 0; index < targetNames.size(); index++) {
String sourceName = sourceNames.get(index);
String targetName = targetNames.get(index);
if (sourceName == null || targetName == null) {
return false;
}
if (!sourceName.equals(targetName)) {
return false;
}
}
return true;
}
/**
* Normalize expression with query, keep the consistency of exprId and nullable props with
* query
* Keep the replacedExpression slot property is the same as the sourceExpression
*/
public static NamedExpression normalizeExpression(
NamedExpression sourceExpression, NamedExpression replacedExpression) {
Expression innerExpression = replacedExpression;
if (replacedExpression.nullable() != sourceExpression.nullable()) {
// if enable join eliminate, query maybe inner join and mv maybe outer join.
// If the slot is at null generate side, the nullable maybe different between query and view
// So need to force to consistent.
innerExpression = sourceExpression.nullable()
? new Nullable(replacedExpression) : new NonNullable(replacedExpression);
}
return new Alias(sourceExpression.getExprId(), innerExpression, sourceExpression.getName());
}
/**
* Compensate mv predicates by query predicates, compensate predicate result is query based.
* Such as a > 5 in mv, and a > 10 in query, the compensatory predicate is a > 10.
* For another example as following:
* predicate a = b in mv, and a = b and c = d in query, the compensatory predicate is c = d
*/
protected SplitPredicate predicatesCompensate(
StructInfo queryStructInfo,
StructInfo viewStructInfo,
SlotMapping viewToQuerySlotMapping,
ComparisonResult comparisonResult,
CascadesContext cascadesContext
) {
// TODO: Use set of list? And consider view expr
List<Expression> queryPulledUpExpressions = ImmutableList.copyOf(comparisonResult.getQueryExpressions());
// set pulled up expression to queryStructInfo predicates and update related predicates
if (!queryPulledUpExpressions.isEmpty()) {
queryStructInfo = queryStructInfo.withPredicates(
queryStructInfo.getPredicates().merge(queryPulledUpExpressions));
}
List<Expression> viewPulledUpExpressions = ImmutableList.copyOf(comparisonResult.getViewExpressions());
// set pulled up expression to viewStructInfo predicates and update related predicates
if (!viewPulledUpExpressions.isEmpty()) {
viewStructInfo = viewStructInfo.withPredicates(
viewStructInfo.getPredicates().merge(viewPulledUpExpressions));
}
// if the join type in query and mv plan is different, we should check query is have the
// filters which rejects null
Set<Set<Slot>> requireNoNullableViewSlot = comparisonResult.getViewNoNullableSlot();
// check query is use the null reject slot which view comparison need
if (!requireNoNullableViewSlot.isEmpty()) {
SlotMapping queryToViewMapping = viewToQuerySlotMapping.inverse();
// try to use
boolean valid = containsNullRejectSlot(requireNoNullableViewSlot,
queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping, queryStructInfo,
viewStructInfo, cascadesContext);
if (!valid) {
queryStructInfo = queryStructInfo.withPredicates(
queryStructInfo.getPredicates().merge(comparisonResult.getQueryAllPulledUpExpressions()));
valid = containsNullRejectSlot(requireNoNullableViewSlot,
queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping,
queryStructInfo, viewStructInfo, cascadesContext);
}
if (!valid) {
return SplitPredicate.INVALID_INSTANCE;
}
}
// viewEquivalenceClass to query based
// equal predicate compensate
final Map<Expression, ExpressionInfo> equalCompensateConjunctions = Predicates.compensateEquivalence(
queryStructInfo, viewStructInfo, viewToQuerySlotMapping, comparisonResult);
// range compensate
final Map<Expression, ExpressionInfo> rangeCompensatePredicates =
Predicates.compensateRangePredicate(queryStructInfo, viewStructInfo, viewToQuerySlotMapping,
comparisonResult, cascadesContext);
// residual compensate
final Map<Expression, ExpressionInfo> residualCompensatePredicates = Predicates.compensateResidualPredicate(
queryStructInfo, viewStructInfo, viewToQuerySlotMapping, comparisonResult);
if (equalCompensateConjunctions == null || rangeCompensatePredicates == null
|| residualCompensatePredicates == null) {
return SplitPredicate.INVALID_INSTANCE;
}
return SplitPredicate.of(equalCompensateConjunctions, rangeCompensatePredicates, residualCompensatePredicates);
}
/**
* Check the queryPredicates contains the required nullable slot
*/
private boolean containsNullRejectSlot(Set<Set<Slot>> requireNoNullableViewSlot,
Set<Expression> queryPredicates,
SlotMapping queryToViewMapping,
StructInfo queryStructInfo,
StructInfo viewStructInfo,
CascadesContext cascadesContext) {
Set<Expression> queryPulledUpPredicates = queryPredicates.stream()
.flatMap(expr -> ExpressionUtils.extractConjunction(expr).stream())
.map(expr -> {
// NOTICE inferNotNull generate Not with isGeneratedIsNotNull = false,
// so, we need set this flag to false before comparison.
if (expr instanceof Not) {
return ((Not) expr).withGeneratedIsNotNull(false);
}
return expr;
})
.collect(Collectors.toSet());
Set<Expression> queryNullRejectPredicates =
ExpressionUtils.inferNotNull(queryPulledUpPredicates, cascadesContext);
if (queryPulledUpPredicates.containsAll(queryNullRejectPredicates)) {
// Query has no null reject predicates, return
return false;
}
// Get query null reject predicate slots
Set<Expression> queryNullRejectSlotSet = new HashSet<>();
for (Expression queryNullRejectPredicate : queryNullRejectPredicates) {
Optional<Slot> notNullSlot = TypeUtils.isNotNull(queryNullRejectPredicate);
if (!notNullSlot.isPresent()) {
continue;
}
queryNullRejectSlotSet.add(notNullSlot.get());
}
// query slot need shuttle to use table slot, avoid alias influence
Set<Expression> queryUsedNeedRejectNullSlotsViewBased = ExpressionUtils.shuttleExpressionWithLineage(
new ArrayList<>(queryNullRejectSlotSet), queryStructInfo.getTopPlan(), new BitSet()).stream()
.map(expr -> ExpressionUtils.replace(expr, queryToViewMapping.toSlotReferenceMap()))
.collect(Collectors.toSet());
// view slot need shuttle to use table slot, avoid alias influence
Set<Set<Slot>> shuttledRequireNoNullableViewSlot = new HashSet<>();
for (Set<Slot> requireNullableSlots : requireNoNullableViewSlot) {
shuttledRequireNoNullableViewSlot.add(
ExpressionUtils.shuttleExpressionWithLineage(new ArrayList<>(requireNullableSlots),
viewStructInfo.getTopPlan(), new BitSet()).stream().map(Slot.class::cast)
.collect(Collectors.toSet()));
}
// query pulledUp predicates should have null reject predicates and contains any require noNullable slot
return shuttledRequireNoNullableViewSlot.stream().noneMatch(viewRequiredNullSlotSet ->
Sets.intersection(viewRequiredNullSlotSet, queryUsedNeedRejectNullSlotsViewBased).isEmpty());
}
/**
* Decide the match mode
*
* @see MatchMode
*/
private MatchMode decideMatchMode(List<CatalogRelation> queryRelations, List<CatalogRelation> viewRelations) {
Set<TableIdentifier> queryTables = new HashSet<>();
for (CatalogRelation catalogRelation : queryRelations) {
queryTables.add(new TableIdentifier(catalogRelation.getTable()));
}
Set<TableIdentifier> viewTables = new HashSet<>();
for (CatalogRelation catalogRelation : viewRelations) {
viewTables.add(new TableIdentifier(catalogRelation.getTable()));
}
if (queryTables.equals(viewTables)) {
return MatchMode.COMPLETE;
}
if (queryTables.containsAll(viewTables)) {
return MatchMode.VIEW_PARTIAL;
}
if (viewTables.containsAll(queryTables)) {
return MatchMode.QUERY_PARTIAL;
}
return MatchMode.NOT_MATCH;
}
/**
* Check the pattern of query or materializedView is supported or not.
*/
protected boolean checkQueryPattern(StructInfo structInfo, CascadesContext cascadesContext) {
if (structInfo.getRelations().isEmpty()) {
return false;
}
return true;
}
protected boolean checkMaterializationPattern(StructInfo structInfo, CascadesContext cascadesContext) {
return checkQueryPattern(structInfo, cascadesContext);
}
protected void recordIfRewritten(Plan plan, MaterializationContext context, CascadesContext cascadesContext) {
context.setSuccess(true);
cascadesContext.addMaterializationRewrittenSuccess(context.generateMaterializationIdentifier());
if (plan.getGroupExpression().isPresent()) {
context.addMatchedGroup(plan.getGroupExpression().get().getOwnerGroup().getGroupId(), true);
}
}
protected boolean checkIfRewritten(Plan plan, MaterializationContext context) {
return plan.getGroupExpression().isPresent()
&& context.alreadyRewrite(plan.getGroupExpression().get().getOwnerGroup().getGroupId());
}
// check mv plan is valid or not, this can use cache for performance
private boolean isMaterializationValid(Plan queryPlan, CascadesContext cascadesContext,
MaterializationContext context) {
if (!context.getStructInfo().isValid()) {
context.recordFailReason(context.getStructInfo(),
"View original struct info is invalid", () -> String.format("view plan is %s",
context.getStructInfo().getOriginalPlan().treeString()));
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("View struct info is invalid, mv identifier is %s, query plan is %s,"
+ "view plan is %s",
context.generateMaterializationIdentifier(), queryPlan.treeString(),
context.getStructInfo().getTopPlan().treeString()));
}
return false;
}
long materializationId = context.generateMaterializationIdentifier().hashCode();
Boolean cachedCheckResult = cascadesContext.getMemo().materializationHasChecked(this.getClass(),
materializationId);
if (cachedCheckResult == null) {
// need check in real time
boolean checkResult = checkMaterializationPattern(context.getStructInfo(), cascadesContext);
if (!checkResult) {
context.recordFailReason(context.getStructInfo(),
"View struct info is invalid", () -> String.format("view plan is %s",
context.getStructInfo().getOriginalPlan().treeString()));
// tmp to location question
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("View struct info is invalid, mv identifier is %s, query plan is %s,"
+ "view plan is %s",
context.generateMaterializationIdentifier(), queryPlan.treeString(),
context.getStructInfo().getTopPlan().treeString()));
}
cascadesContext.getMemo().recordMaterializationCheckResult(this.getClass(), materializationId,
false);
return false;
} else {
cascadesContext.getMemo().recordMaterializationCheckResult(this.getClass(),
materializationId, true);
}
} else if (!cachedCheckResult) {
context.recordFailReason(context.getStructInfo(),
"View struct info is invalid", () -> String.format("view plan is %s",
context.getStructInfo().getOriginalPlan().treeString()));
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("View struct info is invalid, mv identifier is %s, query plan is %s,"
+ "view plan is %s",
context.generateMaterializationIdentifier(), queryPlan.treeString(),
context.getStructInfo().getTopPlan().treeString()));
}
return false;
}
return true;
}
/**
* Query and mv match node
*/
protected enum MatchMode {
/**
* The tables in query are same to the tables in view
*/
COMPLETE,
/**
* The tables in query contains all the tables in view
*/
VIEW_PARTIAL,
/**
* The tables in view contains all the tables in query
*/
QUERY_PARTIAL,
/**
* Except for COMPLETE and VIEW_PARTIAL and QUERY_PARTIAL
*/
NOT_MATCH
}
}