MTMVPlanUtil.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.ParseException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.SimpleColumnDefinition;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.types.AggStateType;
import org.apache.doris.nereids.types.CharType;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.DecimalV2Type;
import org.apache.doris.nereids.types.NullType;
import org.apache.doris.nereids.types.StringType;
import org.apache.doris.nereids.types.TinyIntType;
import org.apache.doris.nereids.types.VarcharType;
import org.apache.doris.nereids.types.coercion.CharacterType;
import org.apache.doris.nereids.util.TypeCoercionUtils;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
/**
 * Helpers for planning materialized views (MTMV).
 *
 * <p>Covers three tasks: building the {@link ConnectContext} used for MTMV tasks, collecting the tables
 * referenced by an MTMV query, and deriving the MTMV column definitions from the query's analyzed plan.
 */
public class MTMVPlanUtil {

    /**
     * Create the {@link ConnectContext} used to run tasks for the given MTMV.
     *
     * <p>Starts from {@link #createBasicMvContext(ConnectContext)} with no parent context. It then applies
     * the MTMV's workload group when one is configured. Finally it switches to the catalog/database recorded
     * in the MTMV's env info, when they still exist.
     *
     * @param mtmv the materialized view the context is created for
     * @return a new context
     */
    public static ConnectContext createMTMVContext(MTMV mtmv) {
        ConnectContext ctx = createBasicMvContext(null);
        Optional<String> workloadGroup = mtmv.getWorkloadGroup();
        if (workloadGroup.isPresent()) {
            ctx.getSessionVariable().setWorkloadGroup(workloadGroup.get());
        }
        // Set the db & catalog that were in use when the materialized view was created, so that
        // SQL statements which do not write the full path still resolve.
        // After https://github.com/apache/doris/pull/36543 this logic should no longer be needed;
        // it is kept for compatibility with materialized views created by older versions.
        setCatalogAndDb(ctx, mtmv);
        return ctx;
    }

    /**
     * Create a context for materialized-view work that runs as the admin user.
     *
     * <p>The context has the following settings:
     * <ul>
     *   <li>Debug-only session variables are reset.</li>
     *   <li>{@code allowModifyMaterializedViewData} is enabled.</li>
     *   <li>A fixed set of Nereids rules is disabled.</li>
     * </ul>
     * The new context is installed through {@code setThreadLocalInfo()}.
     *
     * @param parentContext if non-null, its default catalog and current database are copied to the new context
     * @return a new context
     */
    public static ConnectContext createBasicMvContext(@Nullable ConnectContext parentContext) {
        ConnectContext ctx = new ConnectContext();
        ctx.setEnv(Env.getCurrentEnv());
        ctx.setQualifiedUser(Auth.ADMIN_USER);
        ctx.setCurrentUserIdentity(UserIdentity.ADMIN);
        ctx.getState().reset();
        ctx.setThreadLocalInfo();
        // Debug session variables must be disabled when refreshing
        ctx.getSessionVariable().skipDeletePredicate = false;
        ctx.getSessionVariable().skipDeleteBitmap = false;
        ctx.getSessionVariable().skipDeleteSign = false;
        ctx.getSessionVariable().skipStorageEngineMerge = false;
        ctx.getSessionVariable().showHiddenColumns = false;
        ctx.getSessionVariable().allowModifyMaterializedViewData = true;
        // Disable the ADD_DEFAULT_LIMIT rule so that refresh does not produce wrong data.
        // NOTE(review): the reason for also disabling the other rules listed here is not documented in this file.
        ctx.getSessionVariable().setDisableNereidsRules(
                String.join(",", ImmutableSet.of(
                        "COMPRESSED_MATERIALIZE_AGG", "COMPRESSED_MATERIALIZE_SORT",
                        "ELIMINATE_CONST_JOIN_CONDITION",
                        RuleType.ADD_DEFAULT_LIMIT.name())));
        ctx.setStartTime();
        if (parentContext != null) {
            ctx.changeDefaultCatalog(parentContext.getDefaultCatalog());
            ctx.setDatabase(parentContext.getDatabase());
        }
        return ctx;
    }

    /**
     * Switch {@code ctx} to the catalog and database recorded in the MTMV's env info.
     *
     * <p>This is best effort. If the env info, the catalog, or the database is missing, the method returns
     * without error.
     */
    private static void setCatalogAndDb(ConnectContext ctx, MTMV mtmv) {
        EnvInfo envInfo = mtmv.getEnvInfo();
        if (envInfo == null) {
            return;
        }
        // switch catalog
        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(envInfo.getCtlId());
        // if the catalog does not exist, it may not have any impact, so return without error
        if (catalog == null) {
            return;
        }
        ctx.changeDefaultCatalog(catalog.getName());
        // use db
        Optional<? extends DatabaseIf<? extends TableIf>> databaseIf = catalog.getDb(envInfo.getDbId());
        // if the db does not exist, it may not have any impact, so return without error
        if (!databaseIf.isPresent()) {
            return;
        }
        ctx.setDatabase(databaseIf.get().getFullName());
    }

    /**
     * Build the {@link MTMVRelation} for the tables referenced by an MTMV query.
     *
     * <p>Views are currently skipped. Every other table is recorded as a one-level table and as an
     * all-level table. For a nested MTMV, its own base tables are also added to the all-level set.
     *
     * @param tablesInPlan tables referenced by the MTMV query
     * @param ctx currently unused
     * @return the relation (all-level tables, one-level tables, one-level views)
     */
    public static MTMVRelation generateMTMVRelation(Set<TableIf> tablesInPlan, ConnectContext ctx) {
        Set<BaseTableInfo> oneLevelTables = Sets.newHashSet();
        Set<BaseTableInfo> allLevelTables = Sets.newHashSet();
        Set<BaseTableInfo> oneLevelViews = Sets.newHashSet();
        for (TableIf table : tablesInPlan) {
            BaseTableInfo baseTableInfo = new BaseTableInfo(table);
            if (table.getType() == TableType.VIEW) {
                // TODO reopen it after we support mv on view
                // oneLevelViews.add(baseTableInfo);
            } else {
                oneLevelTables.add(baseTableInfo);
                allLevelTables.add(baseTableInfo);
                if (table instanceof MTMV) {
                    allLevelTables.addAll(((MTMV) table).getRelation().getBaseTables());
                }
            }
        }
        return new MTMVRelation(allLevelTables, oneLevelTables, oneLevelViews);
    }

    /**
     * Collect the tables referenced by {@code querySql}.
     *
     * <p>The query is analyzed (up to {@link ExplainLevel#ANALYZED_PLAN}) in a temporary
     * {@link StatementContext}. The original statement context of {@code ctx} is restored afterwards.
     *
     * @param querySql the MTMV query; only its first statement is used
     * @param ctx the context to plan in
     * @return the tables recorded in the temporary statement context
     * @throws ParseException if the SQL cannot be parsed or contains no statement
     */
    public static Set<TableIf> getBaseTableFromQuery(String querySql, ConnectContext ctx) {
        LogicalPlan logicalPlan = parseFirstLogicalPlan(querySql);
        StatementContext original = ctx.getStatementContext();
        try (StatementContext tempCtx = new StatementContext()) {
            ctx.setStatementContext(tempCtx);
            try {
                NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
                planner.planWithLock(logicalPlan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN);
                // read the tables before the temporary statement context is closed
                return Sets.newHashSet(ctx.getStatementContext().getTables().values());
            } finally {
                ctx.setStatementContext(original);
            }
        }
    }

    /**
     * Derive the MTMV columns from the query statement.
     *
     * <p>The query is analyzed (up to {@link ExplainLevel#ANALYZED_PLAN}) in a temporary
     * {@link StatementContext}. The result is passed to
     * {@link #generateColumns(Plan, ConnectContext, String, Set, List, Map)}. The original statement
     * context of {@code ctx} is restored afterwards.
     *
     * @param querySql the MTMV query; only its first statement is used
     * @param ctx the context to plan in; its session variables influence the derived types
     * @param partitionCol partition column name of MTMV
     * @param distributionColumnNames distribution column names of MTMV
     * @param simpleColumnDefinitions custom column names/comments if provided (non-empty);
     *         otherwise the column names are taken from the query output
     * @param properties properties of MTMV; they determine whether a row store column is generated
     * @return ColumnDefinitions of MTMV
     * @throws ParseException if the SQL cannot be parsed or contains no statement
     */
    public static List<ColumnDefinition> generateColumnsBySql(String querySql, ConnectContext ctx, String partitionCol,
            Set<String> distributionColumnNames, List<SimpleColumnDefinition> simpleColumnDefinitions,
            Map<String, String> properties) {
        LogicalPlan logicalPlan = parseFirstLogicalPlan(querySql);
        StatementContext original = ctx.getStatementContext();
        try (StatementContext tempCtx = new StatementContext()) {
            ctx.setStatementContext(tempCtx);
            try {
                NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
                Plan plan = planner.planWithLock(logicalPlan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN);
                return generateColumns(plan, ctx, partitionCol, distributionColumnNames, simpleColumnDefinitions,
                        properties);
            } finally {
                ctx.setStatementContext(original);
            }
        }
    }

    /**
     * Parse {@code querySql} with the Nereids parser and return the logical plan of its first statement.
     *
     * @throws ParseException if parsing fails or no statement is produced
     */
    private static LogicalPlan parseFirstLogicalPlan(String querySql) {
        List<StatementBase> statements;
        try {
            statements = new NereidsParser().parseSQL(querySql);
        } catch (Exception e) {
            throw new ParseException("Nereids parse failed. " + e.getMessage());
        }
        // guard against get(0) on an empty result
        if (statements == null || statements.isEmpty()) {
            throw new ParseException("Nereids parse failed. No statement found.");
        }
        StatementBase parsedStmt = statements.get(0);
        return ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
    }

    /**
     * Derive the MTMV columns from the analyzed plan.
     *
     * @param plan should be an analyzed plan; one column is produced per output slot
     * @param ctx context whose session variables influence the derived types
     * @param partitionCol partition column name of MTMV
     * @param distributionColumnNames distribution column names of MTMV
     * @param simpleColumnDefinitions custom column names/comments if provided (non-empty); its size must then
     *         equal the number of output slots. Otherwise the column names are taken from the plan output.
     * @param properties properties of MTMV; they determine whether a row store column is generated.
     *         May be null.
     * @return ColumnDefinitions of MTMV, followed by a hidden row store column when requested
     * @throws org.apache.doris.nereids.exceptions.AnalysisException in these cases: the plan has no output;
     *         the custom definitions have the wrong size; a column name is invalid or repeated; or the
     *         properties cannot be analyzed
     */
    public static List<ColumnDefinition> generateColumns(Plan plan, ConnectContext ctx, String partitionCol,
            Set<String> distributionColumnNames, List<SimpleColumnDefinition> simpleColumnDefinitions,
            Map<String, String> properties) {
        List<ColumnDefinition> columns = Lists.newArrayList();
        List<Slot> slots = plan.getOutput();
        if (slots.isEmpty()) {
            throw new org.apache.doris.nereids.exceptions.AnalysisException(
                    "table should contain at least one column");
        }
        boolean useCustomDefinitions = !CollectionUtils.isEmpty(simpleColumnDefinitions);
        if (useCustomDefinitions && simpleColumnDefinitions.size() != slots.size()) {
            throw new org.apache.doris.nereids.exceptions.AnalysisException(
                    "simpleColumnDefinitions size is not equal to the query's");
        }
        Set<String> colNames = Sets.newHashSet();
        for (int i = 0; i < slots.size(); i++) {
            Slot slot = slots.get(i);
            String colName = useCustomDefinitions ? simpleColumnDefinitions.get(i).getName() : slot.getName();
            try {
                FeNameFormat.checkColumnName(colName);
            } catch (org.apache.doris.common.AnalysisException e) {
                throw new org.apache.doris.nereids.exceptions.AnalysisException(e.getMessage(), e);
            }
            // Set.add returns false when the name was already present
            if (!colNames.add(colName)) {
                throw new org.apache.doris.nereids.exceptions.AnalysisException("repeat cols:" + colName);
            }
            DataType dataType = getDataType(slot, i, ctx, partitionCol, distributionColumnNames);
            // If datatype is AggStateType, AggregateType should be generic, or column definition check will fail
            columns.add(new ColumnDefinition(
                    colName,
                    dataType,
                    false,
                    slot.getDataType() instanceof AggStateType ? AggregateType.GENERIC : null,
                    slot.nullable(),
                    Optional.empty(),
                    useCustomDefinitions ? simpleColumnDefinitions.get(i).getComment() : null));
        }
        // add a hidden column as row store
        if (properties != null) {
            try {
                // analyze a copy so the caller's map is not handed to the analyzer
                boolean storeRowColumn =
                        PropertyAnalyzer.analyzeStoreRowColumn(Maps.newHashMap(properties));
                if (storeRowColumn) {
                    columns.add(ColumnDefinition.newRowStoreColumnDefinition(null));
                }
            } catch (Exception e) {
                // chain the exception itself (not e.getCause(), which may be null) so the failure is traceable
                throw new org.apache.doris.nereids.exceptions.AnalysisException(e.getMessage(), e);
            }
        }
        return columns;
    }

    /**
     * Compute the MTMV column type for an output slot.
     *
     * <p>The slot's type is converted first, then adjusted as follows:
     * <ul>
     *   <li>If the first column (index 0) is a string type, it becomes
     *       {@code VARCHAR(ScalarType.MAX_VARCHAR_LENGTH)}, because the first column can not be TEXT.</li>
     *   <li>Otherwise, null types become TINYINT and DecimalV2 types become the system-default DecimalV2.</li>
     *   <li>Character types of columns from an external (non-managed) table become max-length VARCHAR when
     *       the column is a partition/distribution column, and STRING otherwise.</li>
     *   <li>When {@code useMaxLengthOfVarcharInCtas} is set, VARCHAR/CHAR types of columns not coming from a
     *       table become max-length VARCHAR.</li>
     * </ul>
     *
     * @param s the output slot
     * @param i index of the slot in the plan output
     * @param ctx context whose session variables are consulted
     * @param partitionCol partition column name of MTMV
     * @param distributionColumnNames distribution column names of MTMV
     * @return the data type to use for the column
     */
    public static DataType getDataType(Slot s, int i, ConnectContext ctx, String partitionCol,
            Set<String> distributionColumnNames) {
        DataType dataType = s.getDataType().conversion();
        // first column can not be TEXT, should transfer to varchar
        if (i == 0 && dataType.isStringType()) {
            return VarcharType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH);
        }
        dataType = TypeCoercionUtils.replaceSpecifiedType(dataType,
                NullType.class, TinyIntType.INSTANCE);
        dataType = TypeCoercionUtils.replaceSpecifiedType(dataType,
                DecimalV2Type.class, DecimalV2Type.SYSTEM_DEFAULT);
        if (s.isColumnFromTable()) {
            SlotReference slotReference = (SlotReference) s;
            // check if external table: the original table is unknown or not managed by Doris
            boolean notManagedTable = !slotReference.getOriginalTable().isPresent()
                    || !slotReference.getOriginalTable().get().isManagedTable();
            if (notManagedTable) {
                if (s.getName().equals(partitionCol) || distributionColumnNames.contains(s.getName())) {
                    // String type can not be used in partition/distributed column
                    // so we replace it to varchar
                    dataType = TypeCoercionUtils.replaceSpecifiedType(dataType,
                            CharacterType.class, VarcharType.MAX_VARCHAR_TYPE);
                } else {
                    dataType = TypeCoercionUtils.replaceSpecifiedType(dataType,
                            CharacterType.class, StringType.INSTANCE);
                }
            }
        } else if (ctx.getSessionVariable().useMaxLengthOfVarcharInCtas) {
            // The calculation of columns that are not from the original table will become VARCHAR(65533)
            dataType = TypeCoercionUtils.replaceSpecifiedType(dataType,
                    VarcharType.class, VarcharType.MAX_VARCHAR_TYPE);
            dataType = TypeCoercionUtils.replaceSpecifiedType(dataType,
                    CharType.class, VarcharType.MAX_VARCHAR_TYPE);
        }
        return dataType;
    }
}