// CreateMTMVInfo.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.info;
import org.apache.doris.analysis.AllPartitionDesc;
import org.apache.doris.analysis.ListPartitionDesc;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.RangePartitionDesc;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.info.TableNameInfo;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mtmv.MTMVAnalyzeQueryInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVPropertyUtil;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
import org.apache.doris.mtmv.MTMVRefreshInfo;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mtmv.ivm.IvmException;
import org.apache.doris.mtmv.ivm.IvmFailureReason;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.info.BaseViewInfo.AnalyzerForCreateView;
import org.apache.doris.nereids.trees.plans.commands.info.BaseViewInfo.PlanSlotFinder;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;
/**
 * MTMV info in creating MTMV.
 *
 * <p>Carries the parsed pieces of a create-asynchronous-materialized-view statement (query plan and SQL text,
 * refresh info, partition definition, user-declared column names, session variables) and, in
 * {@link #analyze(ConnectContext)}, derives from them the table-level information stored in the
 * {@link CreateTableInfo} superclass.
 */
public class CreateMTMVInfo extends CreateTableInfo {
    public static final Logger LOG = LogManager.getLogger(CreateMTMVInfo.class);
    public static final String MTMV_PLANER_DISABLE_RULES = "OLAP_SCAN_PARTITION_PRUNE,PRUNE_EMPTY_PARTITION,"
            + "ELIMINATE_GROUP_BY_KEY_BY_UNIFORM";

    // Parsed plan of the MV query; re-parsed from querySql when AUTO refresh falls back to non-IVM.
    private LogicalPlan logicalQuery;
    // Column names (optionally) declared by the user for the MV.
    private List<SimpleColumnDefinition> simpleColumnDefinitions;
    private MTMVPartitionDefinition mvPartitionDefinition;
    // SQL text of the MV query; replaced by the rewritten text in rewriteQuerySql().
    private String querySql;
    // MV-specific properties, moved out of `properties` by analyzeProperties().
    private Map<String, String> mvProperties = Maps.newHashMap();
    private final MTMVRefreshInfo refreshInfo;
    // The three fields below are filled from the result of MTMVPlanUtil.analyzeQuery().
    private MTMVRelation relation;
    private MTMVPartitionInfo mvPartitionInfo;
    private final Map<String, String> sessionVariables;
    // Whether the query is analyzed, and the table created, as an incremental (IVM) materialized view.
    private boolean enableIvm;

    /**
     * constructor for create MTMV
     *
     * @param ifNotExists passed through to {@link CreateTableInfo}
     * @param mvName name of the materialized view
     * @param keys key columns specified by the user; copied via {@code Utils.copyRequiredList}
     * @param comment passed through to {@link CreateTableInfo}
     * @param distribution distribution descriptor; checked for null in {@link #analyze(ConnectContext)}
     * @param properties table properties as written by the user
     * @param logicalQuery parsed query plan, must not be null
     * @param querySql SQL text of the query, must not be null
     * @param refreshInfo refresh info, must not be null
     * @param simpleColumnDefinitions user-declared column names, must not be null (may be empty)
     * @param mvPartitionDefinition partition definition, must not be null
     * @param sessionVariables session variables to keep with the MV; stored as given
     */
    public CreateMTMVInfo(
            boolean ifNotExists,
            TableNameInfo mvName,
            List<String> keys,
            String comment,
            DistributionDescriptor distribution,
            Map<String, String> properties,
            LogicalPlan logicalQuery,
            String querySql,
            MTMVRefreshInfo refreshInfo,
            List<SimpleColumnDefinition> simpleColumnDefinitions,
            MTMVPartitionDefinition mvPartitionDefinition,
            Map<String, String> sessionVariables) {
        super(
                ifNotExists,
                mvName,
                Utils.copyRequiredList(keys),
                comment,
                distribution,
                properties);
        this.logicalQuery = Objects.requireNonNull(logicalQuery, "require logicalQuery object");
        this.querySql = Objects.requireNonNull(querySql, "require querySql object");
        this.refreshInfo = Objects.requireNonNull(refreshInfo, "require refreshInfo object");
        this.simpleColumnDefinitions = Objects
                .requireNonNull(simpleColumnDefinitions, "require simpleColumnDefinitions object");
        // The message names the argument actually being checked.
        this.mvPartitionDefinition = Objects
                .requireNonNull(mvPartitionDefinition, "require mvPartitionDefinition object");
        this.sessionVariables = sessionVariables;
        // refreshInfo is already assigned above, so the refresh method can be read here.
        this.enableIvm = isExplicitIncremental();
    }

    /**
     * analyze create table info
     *
     * <p>In order: resolves and checks the MV name (internal catalog only, not in debug mode, valid table
     * name, CREATE privilege), analyzes properties, analyzes the query (with an IVM probe and fallback when
     * the refresh method is AUTO), generates the partition description, validates distribution and refresh
     * info, rewrites the query SQL and finally fills the inherited {@link CreateTableInfo} fields.
     *
     * @param ctx connect context of the current statement
     * @throws Exception if any of the checks or analysis steps fails
     */
    public void analyze(ConnectContext ctx) throws Exception {
        // analyze table name
        tableNameInfo.analyze(ctx.getNameSpaceContext());
        if (!InternalCatalog.INTERNAL_CATALOG_NAME.equals(tableNameInfo.getCtl())) {
            throw new AnalysisException("Only support creating asynchronous materialized views in internal catalog");
        }
        if (ctx.getSessionVariable().isInDebugMode()) {
            throw new AnalysisException("Create materialized view fail, because is in debug mode");
        }
        try {
            FeNameFormat.checkTableName(tableNameInfo.getTbl());
        } catch (org.apache.doris.common.AnalysisException e) {
            throw new AnalysisException(e.getMessage(), e);
        }
        if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx, tableNameInfo.getCtl(), tableNameInfo.getDb(),
                tableNameInfo.getTbl(), PrivPredicate.CREATE)) {
            String message = ErrorCode.ERR_TABLEACCESS_DENIED_ERROR.formatErrorMsg("CREATE",
                    ctx.getQualifiedUser(), ctx.getRemoteIP(),
                    tableNameInfo.getDb() + ": " + tableNameInfo.getTbl());
            throw new AnalysisException(message);
        }
        analyzeProperties();
        if (isAutoRefresh()) {
            analyzeAutoRefreshQuery(ctx);
        } else {
            enableIvm = isExplicitIncremental();
            analyzeQuery(ctx);
        }
        this.partitionDesc = generatePartitionDesc(ctx);
        if (distribution == null) {
            throw new AnalysisException("Create async materialized view should contain distribution desc");
        }
        if (properties == null) {
            properties = Maps.newHashMap();
        }
        // IVM MVs are UNIQUE_KEYS (MOW) tables keyed on __DORIS_IVM_ROW_ID_COL__.
        // MOW dedup only works within the same tablet, so the distribution MUST be
        // HASH on the row-id column; RANDOM distribution would allow the same key
        // to land in different tablets across successive INSERTs, breaking dedup.
        if (isEnableIvm()) {
            int bucketNum = distribution.translateToCatalogStyle().getBuckets();
            distribution = new DistributionDescriptor(
                    true, distribution.isAutoBucket(), bucketNum,
                    Lists.newArrayList(Column.IVM_ROW_ID_COL));
        }
        CreateTableInfo.maybeRewriteByAutoBucket(distribution, properties);
        // analyze distribute
        Map<String, ColumnDefinition> columnMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        columns.forEach(c -> columnMap.put(c.getName(), c));
        distribution.updateCols(columns.get(0).getName());
        KeysType distributionKeysType = isEnableIvm() ? KeysType.UNIQUE_KEYS : KeysType.DUP_KEYS;
        distribution.validate(columnMap, distributionKeysType);
        refreshInfo.validate();
        // analyzeQuery() reassigned `properties`, so run the property analysis on the new map as well.
        analyzeProperties();
        rewriteQuerySql(ctx);
        // set CreateTableInfo information
        setTableInformation(ctx);
    }

    /**
     * Analyzes the query for an AUTO refresh MV: first tries with IVM enabled and, if that throws an
     * {@link AnalysisException}, restores the captured state and analyzes again with IVM disabled.
     */
    private void analyzeAutoRefreshQuery(ConnectContext ctx) throws UserException {
        AnalyzeQueryState origin = AnalyzeQueryState.capture(this);
        try {
            enableIvm = true;
            analyzeQuery(ctx);
        } catch (AnalysisException e) {
            // AUTO uses IVM analysis only as a capability probe. Most IVM unsupported cases are reported as
            // IvmException, but planner/PCT/nullable analysis can also throw a plain AnalysisException after
            // IVM rewrite changes the plan shape. Treat those as probe failures too: restore the original create
            // state and retry regular MTMV analysis. If the regular path also fails, that error is still returned
            // to the user. Explicit INCREMENTAL does not call this helper, so its strict failure semantics remain.
            LOG.info("AUTO refresh materialized view {} fallback to non-IVM after IVM probe failed: {}",
                    tableNameInfo.getTbl(), e.getMessage());
            origin.restore(this);
            // Use a freshly parsed plan and a new statement context for the retry instead of the ones
            // the failed probe has already worked on.
            logicalQuery = parseFreshLogicalQuery();
            resetStatementContext(ctx);
            enableIvm = false;
            analyzeQuery(ctx);
        }
    }

    /** Parses {@code querySql} again and returns the resulting plan. */
    private LogicalPlan parseFreshLogicalQuery() {
        return new NereidsParser().parseQuery(querySql);
    }

    /**
     * Replaces the statement context of {@code ctx} with a new one that keeps the origin statement of the
     * previous context (or null when there was no previous context).
     */
    private void resetStatementContext(ConnectContext ctx) {
        StatementContext oldStatementContext = ctx.getStatementContext();
        ctx.setStatementContext(new StatementContext(ctx,
                oldStatementContext == null ? null : oldStatementContext.getOriginStatement()));
    }

    /**
     * Rewrites {@code querySql} using the position-to-text map of the statement context. The map is emptied
     * for the duration of the rewrite and its previous content is put back afterwards, also on failure.
     */
    private void rewriteQuerySql(ConnectContext ctx) {
        TreeMap<Pair<Integer, Integer>, String> rewriteMap = ctx.getStatementContext().getIndexInSqlToString();
        TreeMap<Pair<Integer, Integer>, String> snapshot = new TreeMap<>(rewriteMap);
        rewriteMap.clear();
        try {
            analyzeAndFillRewriteSqlMap(querySql, ctx);
            querySql = BaseViewInfo.rewriteSql(rewriteMap, querySql);
            if (isEnableIvm() && !simpleColumnDefinitions.isEmpty()) {
                querySql = BaseViewInfo.rewriteProjectsToUserDefineAlias(querySql, collectSimpleColumnNames());
            }
        } finally {
            rewriteMap.clear();
            rewriteMap.putAll(snapshot);
        }
    }

    /**
     * Parses and analyzes {@code sql} the way a view definition is analyzed, then visits the analyzed plan
     * with {@code PlanSlotFinder} so that the rewrite entries are recorded in the statement context.
     */
    private void analyzeAndFillRewriteSqlMap(String sql, ConnectContext ctx) {
        StatementContext stmtCtx = ctx.getStatementContext();
        LogicalPlan parsedViewPlan = new NereidsParser().parseForCreateView(sql);
        if (parsedViewPlan instanceof UnboundResultSink) {
            // analyze the query below the sink, not the sink itself
            parsedViewPlan = (LogicalPlan) ((UnboundResultSink<?>) parsedViewPlan).child();
        }
        CascadesContext viewContextForStar = CascadesContext.initContext(
                stmtCtx, parsedViewPlan, PhysicalProperties.ANY);
        AnalyzerForCreateView analyzerForStar = new AnalyzerForCreateView(viewContextForStar);
        analyzerForStar.analyze();
        Plan analyzedPlan = viewContextForStar.getRewritePlan();
        // Traverse all slots in the plan, and add the slot's location information
        // and the fully qualified replacement string to the indexInSqlToString of the StatementContext.
        analyzedPlan.accept(PlanSlotFinder.INSTANCE, stmtCtx);
    }

    /**
     * Rewrites the OLAP properties, rejects dynamic partition properties, and moves every key listed in
     * {@code MTMVPropertyUtil.MV_PROPERTY_KEYS} from {@code properties} into {@code mvProperties} after
     * analyzing its value.
     */
    private void analyzeProperties() {
        properties = PropertyAnalyzer.getInstance().rewriteOlapProperties(
                tableNameInfo.getCtl(),
                tableNameInfo.getDb(),
                properties);
        if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) {
            throw new AnalysisException("Not support dynamic partition properties on async materialized view");
        }
        for (String key : MTMVPropertyUtil.MV_PROPERTY_KEYS) {
            if (properties.containsKey(key)) {
                String value = properties.get(key);
                // Analyze before moving, so a rejected value leaves both maps untouched for this key.
                MTMVPropertyUtil.analyzeProperty(key, value);
                mvProperties.put(key, value);
                properties.remove(key);
            }
        }
    }

    /**
     * analyzeQuery
     *
     * <p>Delegates to {@code MTMVPlanUtil.analyzeQuery} and stores the partition info, column definitions,
     * relation and properties it returns.
     *
     * @param ctx connect context of the current statement
     * @throws UserException if the query analysis fails
     */
    public void analyzeQuery(ConnectContext ctx) throws UserException {
        checkUserSpecifiedKeysForIvm();
        MTMVAnalyzeQueryInfo mtmvAnalyzeQueryInfo = MTMVPlanUtil.analyzeQuery(ctx, this.mvProperties,
                this.mvPartitionDefinition, this.distribution, this.simpleColumnDefinitions, this.properties, this.keys,
                this.logicalQuery, isEnableIvm());
        this.mvPartitionInfo = mtmvAnalyzeQueryInfo.getMvPartitionInfo();
        this.columns = mtmvAnalyzeQueryInfo.getColumnDefinitions();
        this.relation = mtmvAnalyzeQueryInfo.getRelation();
        this.properties = mtmvAnalyzeQueryInfo.getProperties();
    }

    /** Rejects user-specified key columns when IVM is enabled. */
    private void checkUserSpecifiedKeysForIvm() {
        if (isEnableIvm() && !keys.isEmpty()) {
            throw new IvmException(IvmFailureReason.PLAN_PATTERN_UNSUPPORTED,
                    "Incremental materialized view does not allow specifying key columns. "
                            + "The unique key is the hidden row-id column managed by IVM.");
        }
    }

    /**
     * Looks up the column whose name equals {@code partitionColumnName} (ignoring case) and returns it,
     * translated to catalog style, as a single-element list.
     *
     * @throws AnalysisException if no such column exists
     */
    private List<Column> getPartitionColumn(String partitionColumnName) {
        for (ColumnDefinition columnDefinition : columns) {
            if (columnDefinition.getName().equalsIgnoreCase(partitionColumnName)) {
                // current only support one partition col
                return Lists.newArrayList(columnDefinition.translateToCatalogStyle());
            }
        }
        throw new AnalysisException("can not find partition column");
    }

    /**
     * Builds the partition description of the MV from its related table.
     *
     * @return a range or list partition description matching the related table's partition type, or null
     *         when the MV is self-managed or the related table is neither range nor list partitioned
     * @throws AnalysisException if the partitions cannot be derived or their number exceeds the session
     *         variable {@code create_table_partition_max_num}
     */
    private PartitionDesc generatePartitionDesc(ConnectContext ctx) {
        if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
            return null;
        }
        // all pct table partition type is same
        MTMVRelatedTableIf relatedTable = MTMVUtil.getRelatedTable(
                mvPartitionInfo.getPctInfos().get(0).getTableInfo());
        List<AllPartitionDesc> allPartitionDescs;
        try {
            allPartitionDescs = MTMVPartitionUtil
                    .getPartitionDescsByRelatedTable(properties, mvPartitionInfo, mvProperties,
                            getPartitionColumn(mvPartitionInfo.getPartitionCol()));
        } catch (org.apache.doris.common.AnalysisException e) {
            throw new AnalysisException(e.getMessage(), e);
        }
        if (allPartitionDescs.size() > ctx.getSessionVariable().getCreateTablePartitionMaxNum()) {
            throw new AnalysisException(String.format(
                    "The number of partitions to be created is [%s], exceeding the maximum value of [%s]. "
                            + "Creating too many partitions can be time-consuming. If necessary, "
                            + "You can set the session variable 'create_table_partition_max_num' to a larger value.",
                    allPartitionDescs.size(), ctx.getSessionVariable().getCreateTablePartitionMaxNum()));
        }
        try {
            PartitionType type = relatedTable.getPartitionType(Optional.empty());
            if (type == PartitionType.RANGE) {
                return new RangePartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()),
                        allPartitionDescs);
            } else if (type == PartitionType.LIST) {
                return new ListPartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()),
                        allPartitionDescs);
            } else {
                return null;
            }
        } catch (org.apache.doris.common.AnalysisException e) {
            throw new AnalysisException(e.getMessage(), e);
        }
    }

    /**
     * set CreateTableInfo Information
     *
     * <p>IVM MVs become UNIQUE_KEYS tables with merge-on-write switched on; all others are DUP_KEYS tables.
     */
    private void setTableInformation(ConnectContext ctx) {
        List<String> ctasColumns = collectSimpleColumnNames();
        this.setCatalog(tableNameInfo.getCtl());
        this.setDbName(tableNameInfo.getDb());
        this.setTableName(tableNameInfo.getTbl());
        this.setCtasColumns(ctasColumns.isEmpty() ? null : ctasColumns);
        this.setEngineName(CreateTableInfo.ENGINE_OLAP);
        if (isEnableIvm()) {
            this.setKeysType(KeysType.UNIQUE_KEYS);
            if (properties == null) {
                properties = Maps.newHashMap();
            }
            properties.put(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE, "true");
        } else {
            this.setKeysType(KeysType.DUP_KEYS);
        }
        this.setPartitionTableInfo(partitionDesc == null
                ? PartitionTableInfo.EMPTY : partitionDesc.convertToPartitionTableInfo());
        this.setRollups(Lists.newArrayList());
        this.setSortOrderFields(Lists.newArrayList());
        this.setIndexes(Lists.newArrayList());
        this.analyzeEngine();
        validatePartitionInfo(ctx);
    }

    /** Returns the names of the user-declared columns, in declaration order. */
    private List<String> collectSimpleColumnNames() {
        return simpleColumnDefinitions.stream()
                .map(SimpleColumnDefinition::getName)
                .collect(Collectors.toList());
    }

    /**
     * Reports duplicate column names (compared case-insensitively) and then validates the partition info
     * against the columns and properties.
     *
     * @throws AnalysisException if two columns share a name
     */
    private void validatePartitionInfo(ConnectContext ctx) {
        Map<String, ColumnDefinition> columnMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (ColumnDefinition column : columns) {
            if (columnMap.put(column.getName(), column) != null) {
                try {
                    ErrorReport.reportAnalysisException(ErrorCode.ERR_DUP_FIELDNAME,
                            column.getName());
                } catch (Exception e) {
                    // Keep the reported exception itself as the cause so its stack trace is not lost.
                    throw new AnalysisException(e.getMessage(), e);
                }
            }
        }
        getPartitionTableInfo().validatePartitionInfo(
                getEngineName(),
                columns,
                columnMap,
                properties,
                ctx,
                isEnableMergeOnWrite(),
                isExternal());
    }

    /** Returns the query SQL; after {@link #analyze(ConnectContext)} this is the rewritten text. */
    public String getQuerySql() {
        return querySql;
    }

    /** Returns the MV-specific properties collected by the property analysis (the internal map itself). */
    public Map<String, String> getMvProperties() {
        return mvProperties;
    }

    public MTMVRefreshInfo getRefreshInfo() {
        return refreshInfo;
    }

    public MTMVRelation getRelation() {
        return relation;
    }

    /** Returns whether this MV is treated as an incremental (IVM) materialized view. */
    public boolean isEnableIvm() {
        return enableIvm;
    }

    private boolean isExplicitIncremental() {
        return refreshInfo.getRefreshMethod() == RefreshMethod.INCREMENTAL;
    }

    private boolean isAutoRefresh() {
        return refreshInfo.getRefreshMethod() == RefreshMethod.AUTO;
    }

    public MTMVPartitionInfo getMvPartitionInfo() {
        return mvPartitionInfo;
    }

    public Map<String, String> getSessionVariables() {
        return sessionVariables;
    }

    /**
     * Snapshot of the fields of a {@link CreateMTMVInfo} that are put back when the IVM probe of an AUTO
     * refresh MV fails. Only {@code properties} is copied (one level deep); every other field is kept by
     * reference.
     */
    private static class AnalyzeQueryState {
        private final Map<String, String> properties;
        private final List<ColumnDefinition> columns;
        private final DistributionDescriptor distribution;
        private final PartitionDesc partitionDesc;
        private final LogicalPlan logicalQuery;
        private final MTMVRelation relation;
        private final MTMVPartitionInfo mvPartitionInfo;
        // The partition definition object is shared, so its two settable values are saved individually.
        private final MTMVPartitionType mvPartitionType;
        private final Expression mvPartitionExpression;
        private final boolean enableIvm;

        private AnalyzeQueryState(CreateMTMVInfo info) {
            this.properties = info.properties == null ? null : Maps.newHashMap(info.properties);
            this.columns = info.columns;
            this.distribution = info.distribution;
            this.partitionDesc = info.partitionDesc;
            this.logicalQuery = info.logicalQuery;
            this.relation = info.relation;
            this.mvPartitionInfo = info.mvPartitionInfo;
            this.mvPartitionType = info.mvPartitionDefinition.getPartitionType();
            this.mvPartitionExpression = info.mvPartitionDefinition.getFunctionCallExpression();
            this.enableIvm = info.enableIvm;
        }

        /** Takes a snapshot of the current state of {@code info}. */
        private static AnalyzeQueryState capture(CreateMTMVInfo info) {
            return new AnalyzeQueryState(info);
        }

        /** Writes the snapshot back into {@code info}; {@code properties} is restored as a fresh copy. */
        private void restore(CreateMTMVInfo info) {
            info.properties = properties == null ? null : Maps.newHashMap(properties);
            info.columns = columns;
            info.distribution = distribution;
            info.partitionDesc = partitionDesc;
            info.logicalQuery = logicalQuery;
            info.relation = relation;
            info.mvPartitionInfo = mvPartitionInfo;
            info.mvPartitionDefinition.setPartitionType(mvPartitionType);
            info.mvPartitionDefinition.setFunctionCallExpression(mvPartitionExpression);
            info.enableIvm = enableIvm;
        }
    }
}