CreateTableInfo.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.info;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DistributionDesc;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.IndexDef;
import org.apache.doris.analysis.KeysDesc;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.AutoBucketUtils;
import org.apache.doris.common.util.GeneratedColumnUtil;
import org.apache.doris.common.util.InternalDatabaseUtil;
import org.apache.doris.common.util.ParseUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.es.EsUtil;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.analyzer.Scope;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.translator.ExpressionTranslator;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.analysis.ExpressionAnalyzer;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
import org.apache.doris.nereids.trees.expressions.Variable;
import org.apache.doris.nereids.trees.expressions.functions.BoundFunction;
import org.apache.doris.nereids.trees.expressions.functions.Udf;
import org.apache.doris.nereids.trees.expressions.functions.scalar.GroupingScalarFunction;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Lambda;
import org.apache.doris.nereids.trees.expressions.functions.scalar.ScalarFunction;
import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.util.TypeCoercionUtils;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TInvertedIndexFileStorageFormat;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
/**
* table info in creating table.
*/
public class CreateTableInfo {
public static final String ENGINE_OLAP = "olap";
public static final String ENGINE_JDBC = "jdbc";
public static final String ENGINE_ELASTICSEARCH = "elasticsearch";
public static final String ENGINE_ODBC = "odbc";
public static final String ENGINE_MYSQL = "mysql";
public static final String ENGINE_BROKER = "broker";
public static final String ENGINE_HIVE = "hive";
public static final String ENGINE_ICEBERG = "iceberg";
private static final ImmutableSet<AggregateType> GENERATED_COLUMN_ALLOW_AGG_TYPE =
ImmutableSet.of(AggregateType.REPLACE, AggregateType.REPLACE_IF_NOT_NULL);
private static final Logger LOG = LogManager.getLogger(CreateTableInfo.class);
private final boolean ifNotExists;
private String ctlName;
private String dbName;
private final String tableName;
private List<ColumnDefinition> columns;
private final List<IndexDefinition> indexes;
private final List<String> ctasColumns;
private String engineName;
private KeysType keysType;
private List<String> keys;
private final String comment;
private DistributionDescriptor distribution;
private final List<RollupDefinition> rollups;
private Map<String, String> properties;
private Map<String, String> extProperties;
private boolean isEnableMergeOnWrite = false;
private boolean isEnableSkipBitmapColumn = false;
private boolean isExternal = false;
private boolean isTemp = false;
private String clusterName = null;
private List<String> clusterKeysColumnNames = null;
private PartitionTableInfo partitionTableInfo; // get when validate
/**
* constructor for create table
*/
public CreateTableInfo(boolean ifNotExists, boolean isExternal, boolean isTemp, String ctlName, String dbName,
String tableName, List<ColumnDefinition> columns, List<IndexDefinition> indexes,
String engineName, KeysType keysType, List<String> keys, String comment,
PartitionTableInfo partitionTableInfo,
DistributionDescriptor distribution, List<RollupDefinition> rollups,
Map<String, String> properties, Map<String, String> extProperties,
List<String> clusterKeyColumnNames) {
this.ifNotExists = ifNotExists;
this.isExternal = isExternal;
this.isTemp = isTemp;
this.ctlName = ctlName;
this.dbName = dbName;
this.tableName = tableName;
this.ctasColumns = null;
this.columns = Utils.copyRequiredMutableList(columns);
this.indexes = Utils.copyRequiredMutableList(indexes);
this.engineName = engineName;
this.keysType = keysType;
this.keys = Utils.copyRequiredList(keys);
this.comment = comment;
this.partitionTableInfo = partitionTableInfo;
this.distribution = distribution;
this.rollups = Utils.copyRequiredList(rollups);
this.properties = properties;
PropertyAnalyzer.getInstance().rewriteForceProperties(this.properties);
this.extProperties = extProperties;
this.clusterKeysColumnNames = Utils.copyRequiredList(clusterKeyColumnNames);
}
/**
* constructor for create table as select
*/
public CreateTableInfo(boolean ifNotExists, boolean isExternal, boolean isTemp, String ctlName, String dbName,
String tableName, List<String> cols, String engineName, KeysType keysType,
List<String> keys, String comment,
PartitionTableInfo partitionTableInfo,
DistributionDescriptor distribution, List<RollupDefinition> rollups,
Map<String, String> properties, Map<String, String> extProperties,
List<String> clusterKeyColumnNames) {
this.ifNotExists = ifNotExists;
this.isExternal = isExternal;
this.isTemp = isTemp;
this.ctlName = ctlName;
this.dbName = dbName;
this.tableName = tableName;
this.ctasColumns = cols;
this.columns = null;
this.indexes = Lists.newArrayList();
this.engineName = engineName;
this.keysType = keysType;
this.keys = Utils.copyRequiredList(keys);
this.comment = comment;
this.partitionTableInfo = partitionTableInfo;
this.distribution = distribution;
this.rollups = Utils.copyRequiredList(rollups);
this.properties = properties;
PropertyAnalyzer.getInstance().rewriteForceProperties(this.properties);
this.extProperties = extProperties;
this.clusterKeysColumnNames = Utils.copyRequiredList(clusterKeyColumnNames);
}
/**
* withTableNameAndIfNotExists
*/
public CreateTableInfo withTableNameAndIfNotExists(String tableName, boolean ifNotExists) {
if (ctasColumns != null) {
return new CreateTableInfo(ifNotExists, isExternal, isTemp, ctlName, dbName, tableName, ctasColumns,
engineName, keysType, keys, comment, partitionTableInfo, distribution, rollups, properties,
extProperties, clusterKeysColumnNames);
} else {
return new CreateTableInfo(ifNotExists, isExternal, isTemp, ctlName, dbName, tableName, columns, indexes,
engineName, keysType, keys, comment, partitionTableInfo, distribution, rollups, properties,
extProperties, clusterKeysColumnNames);
}
}
public List<String> getCtasColumns() {
return ctasColumns;
}
public String getCtlName() {
return ctlName;
}
public String getDbName() {
return dbName;
}
public String getTableName() {
return tableName;
}
public String getEngineName() {
return engineName;
}
public Map<String, String> getProperties() {
return properties;
}
/**
* full qualifier table name.
*/
public List<String> getTableNameParts() {
if (ctlName != null && dbName != null) {
return ImmutableList.of(ctlName, dbName, tableName);
}
if (dbName != null) {
return ImmutableList.of(dbName, tableName);
}
return ImmutableList.of(tableName);
}
private void checkEngineWithCatalog() {
if (engineName.equals(ENGINE_OLAP)) {
if (!ctlName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
throw new AnalysisException("Cannot create olap table out of internal catalog."
+ " Make sure 'engine' type is specified when use the catalog: " + ctlName);
}
}
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(ctlName);
if (catalog instanceof HMSExternalCatalog && !engineName.equals(ENGINE_HIVE)) {
throw new AnalysisException("Hms type catalog can only use `hive` engine.");
} else if (catalog instanceof IcebergExternalCatalog && !engineName.equals(ENGINE_ICEBERG)) {
throw new AnalysisException("Iceberg type catalog can only use `iceberg` engine.");
}
}
/**
* analyze create table info
*/
public void validate(ConnectContext ctx) {
// pre-block in some cases.
if (columns.isEmpty()) {
throw new AnalysisException("table should contain at least one column");
}
// analyze catalog name
if (Strings.isNullOrEmpty(ctlName)) {
if (ctx.getCurrentCatalog() != null) {
ctlName = ctx.getCurrentCatalog().getName();
} else {
ctlName = InternalCatalog.INTERNAL_CATALOG_NAME;
}
}
paddingEngineName(ctlName, ctx);
checkEngineName();
if (properties == null) {
properties = Maps.newHashMap();
}
if (engineName.equalsIgnoreCase(ENGINE_OLAP)) {
if (distribution == null) {
distribution = new DistributionDescriptor(false, true, FeConstants.default_bucket_num, null);
}
properties = maybeRewriteByAutoBucket(distribution, properties);
}
try {
// check display name for temporary table, its inner name cannot pass validation
FeNameFormat.checkTableName(Util.getTempTableDisplayName(tableName));
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
}
checkEngineWithCatalog();
// analyze table name
if (Strings.isNullOrEmpty(dbName)) {
dbName = ctx.getDatabase();
}
try {
InternalDatabaseUtil.checkDatabase(dbName, ConnectContext.get());
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), ctlName, dbName,
tableName, PrivPredicate.CREATE)) {
try {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"CREATE");
} catch (Exception ex) {
throw new AnalysisException(ex.getMessage(), ex.getCause());
}
}
Preconditions.checkState(!Strings.isNullOrEmpty(ctlName), "catalog name is null or empty");
Preconditions.checkState(!Strings.isNullOrEmpty(dbName), "database name is null or empty");
//check datev1 and decimalv2
for (ColumnDefinition columnDef : columns) {
String columnNameUpperCase = columnDef.getName().toUpperCase();
if (columnNameUpperCase.startsWith("__DORIS_")) {
throw new AnalysisException(
"Disable to create table column with name start with __DORIS_: " + columnNameUpperCase);
}
if (columnDef.getType().isVariantType() && columnNameUpperCase.indexOf('.') != -1) {
throw new AnalysisException(
"Disable to create table of `VARIANT` type column named with a `.` character: "
+ columnNameUpperCase);
}
if (columnDef.getType().isDateType() && Config.disable_datev1) {
throw new AnalysisException(
"Disable to create table with `DATE` type columns, please use `DATEV2`.");
}
if (columnDef.getType().isDecimalV2Type() && Config.disable_decimalv2) {
throw new AnalysisException("Disable to create table with `DECIMAL` type columns,"
+ "please use `DECIMALV3`.");
}
}
// check duplicated columns
Map<String, ColumnDefinition> columnMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
columns.forEach(c -> {
if (columnMap.put(c.getName(), c) != null) {
try {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DUP_FIELDNAME,
c.getName());
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
}
});
if (engineName.equalsIgnoreCase(ENGINE_OLAP)) {
boolean enableDuplicateWithoutKeysByDefault = false;
properties = PropertyAnalyzer.getInstance().rewriteOlapProperties(ctlName, dbName, properties);
try {
if (properties != null) {
enableDuplicateWithoutKeysByDefault =
PropertyAnalyzer.analyzeEnableDuplicateWithoutKeysByDefault(properties);
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
if (keys.isEmpty()) {
boolean hasAggColumn = false;
for (ColumnDefinition column : columns) {
if (column.getAggType() != null) {
hasAggColumn = true;
break;
}
}
keys = Lists.newArrayList();
if (hasAggColumn) {
for (ColumnDefinition column : columns) {
if (column.getAggType() != null) {
break;
}
keys.add(column.getName());
}
keysType = KeysType.AGG_KEYS;
} else {
if (!enableDuplicateWithoutKeysByDefault) {
int keyLength = 0;
for (ColumnDefinition column : columns) {
DataType type = column.getType();
Type catalogType = column.getType().toCatalogDataType();
keyLength += catalogType.getIndexSize();
if (keys.size() >= FeConstants.shortkey_max_column_count
|| keyLength > FeConstants.shortkey_maxsize_bytes) {
if (keys.isEmpty() && type.isStringLikeType()) {
keys.add(column.getName());
}
break;
}
if (!catalogType.couldBeShortKey()) {
break;
}
keys.add(column.getName());
if (type.isVarcharType()) {
break;
}
}
}
keysType = KeysType.DUP_KEYS;
}
// The OLAP table must have at least one short key,
// and the float and double should not be short key,
// so the float and double could not be the first column in OLAP table.
if (keys.isEmpty() && (keysType != KeysType.DUP_KEYS
|| !enableDuplicateWithoutKeysByDefault)) {
throw new AnalysisException(
"The olap table first column could not be float, double, string"
+ " or array, struct, map, please use decimal or varchar instead.");
}
} else if (enableDuplicateWithoutKeysByDefault) {
throw new AnalysisException(
"table property 'enable_duplicate_without_keys_by_default' only can"
+ " set 'true' when create olap table by default.");
}
if (properties != null
&& properties.containsKey(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE)) {
if (!keysType.equals(KeysType.UNIQUE_KEYS)) {
throw new AnalysisException(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE
+ " property only support unique key table");
}
}
if (keysType == KeysType.UNIQUE_KEYS) {
isEnableMergeOnWrite = false;
if (properties != null) {
properties = PropertyAnalyzer.enableUniqueKeyMergeOnWriteIfNotExists(properties);
// `analyzeXXX` would modify `properties`, which will be used later,
// so we just clone a properties map here.
try {
isEnableMergeOnWrite = PropertyAnalyzer.analyzeUniqueKeyMergeOnWrite(
new HashMap<>(properties));
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
}
}
try {
if (Config.random_add_cluster_keys_for_mow && isEnableMergeOnWrite && clusterKeysColumnNames.isEmpty()
&& PropertyAnalyzer.analyzeUseLightSchemaChange(new HashMap<>(properties))) {
// exclude columns whose data type can not be cluster key, see {@link ColumnDefinition#validate}
List<ColumnDefinition> clusterKeysCandidates = columns.stream().filter(c -> {
DataType type = c.getType();
return !(type.isFloatLikeType() || type.isStringType() || type.isArrayType()
|| type.isBitmapType() || type.isHllType() || type.isQuantileStateType()
|| type.isJsonType()
|| type.isVariantType()
|| type.isMapType()
|| type.isStructType());
}).collect(Collectors.toList());
if (clusterKeysCandidates.size() > 0) {
clusterKeysColumnNames = new ArrayList<>();
Random random = new Random();
int randomClusterKeysCount = random.nextInt(clusterKeysCandidates.size()) + 1;
Collections.shuffle(clusterKeysCandidates);
for (int i = 0; i < randomClusterKeysCount; i++) {
clusterKeysColumnNames.add(clusterKeysCandidates.get(i).getName());
}
LOG.info("Randomly add cluster keys for table {}.{}: {}",
dbName, tableName, clusterKeysColumnNames);
}
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
validateKeyColumns();
if (!clusterKeysColumnNames.isEmpty()) {
if (!isEnableMergeOnWrite) {
throw new AnalysisException(
"Cluster keys only support unique keys table which enabled "
+ PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE);
}
}
for (int i = 0; i < keys.size(); ++i) {
columns.get(i).setIsKey(true);
}
if (keysType != KeysType.AGG_KEYS) {
AggregateType type = AggregateType.REPLACE;
if (keysType == KeysType.DUP_KEYS) {
type = AggregateType.NONE;
}
if (keysType == KeysType.UNIQUE_KEYS && isEnableMergeOnWrite) {
type = AggregateType.NONE;
}
for (int i = keys.size(); i < columns.size(); ++i) {
columns.get(i).setAggType(type);
}
}
// add hidden column
if (keysType.equals(KeysType.UNIQUE_KEYS)) {
if (isEnableMergeOnWrite) {
columns.add(ColumnDefinition.newDeleteSignColumnDefinition(AggregateType.NONE));
} else {
columns.add(
ColumnDefinition.newDeleteSignColumnDefinition(AggregateType.REPLACE));
}
}
// add a hidden column as row store
boolean storeRowColumn = false;
List<String> rowStoreColumns = null;
if (properties != null) {
try {
storeRowColumn =
PropertyAnalyzer.analyzeStoreRowColumn(Maps.newHashMap(properties));
rowStoreColumns = PropertyAnalyzer.analyzeRowStoreColumns(Maps.newHashMap(properties),
columns.stream()
.map(ColumnDefinition::getName)
.collect(Collectors.toList()));
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
}
if (storeRowColumn || (rowStoreColumns != null && !rowStoreColumns.isEmpty())) {
if (keysType.equals(KeysType.AGG_KEYS)) {
throw new AnalysisException("Aggregate table can't support row column now");
}
if (keysType.equals(KeysType.UNIQUE_KEYS)) {
if (isEnableMergeOnWrite) {
columns.add(
ColumnDefinition.newRowStoreColumnDefinition(AggregateType.NONE));
} else {
columns.add(ColumnDefinition
.newRowStoreColumnDefinition(AggregateType.REPLACE));
}
} else {
columns.add(ColumnDefinition.newRowStoreColumnDefinition(null));
}
}
if (Config.enable_hidden_version_column_by_default
&& keysType.equals(KeysType.UNIQUE_KEYS)) {
if (isEnableMergeOnWrite) {
columns.add(ColumnDefinition.newVersionColumnDefinition(AggregateType.NONE));
} else {
columns.add(ColumnDefinition.newVersionColumnDefinition(AggregateType.REPLACE));
}
}
if (properties != null) {
if (properties.containsKey(PropertyAnalyzer.ENABLE_UNIQUE_KEY_SKIP_BITMAP_COLUMN)
&& !(keysType.equals(KeysType.UNIQUE_KEYS) && isEnableMergeOnWrite)) {
throw new AnalysisException("table property enable_unique_key_skip_bitmap_column can"
+ "only be set in merge-on-write unique table.");
}
// the merge-on-write table must have enable_unique_key_skip_bitmap_column table property
// and its value should be consistent with whether the table's full schema contains
// the skip bitmap hidden column
if (keysType.equals(KeysType.UNIQUE_KEYS) && isEnableMergeOnWrite) {
properties = PropertyAnalyzer.addEnableUniqueKeySkipBitmapPropertyIfNotExists(properties);
// `analyzeXXX` would modify `properties`, which will be used later,
// so we just clone a properties map here.
try {
isEnableSkipBitmapColumn = PropertyAnalyzer.analyzeUniqueKeySkipBitmapColumn(
new HashMap<>(properties));
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
}
}
if (isEnableSkipBitmapColumn && keysType.equals(KeysType.UNIQUE_KEYS)) {
if (isEnableMergeOnWrite) {
columns.add(ColumnDefinition.newSkipBitmapColumnDef(AggregateType.NONE));
}
// TODO(bobhan1): add support for mor table
}
// validate partition
partitionTableInfo.extractPartitionColumns();
partitionTableInfo.validatePartitionInfo(
engineName, columns, columnMap, properties, ctx, isEnableMergeOnWrite, isExternal);
// validate distribution descriptor
distribution.updateCols(columns.get(0).getName());
distribution.validate(columnMap, keysType);
// validate key set.
if (!distribution.isHash()) {
if (keysType.equals(KeysType.UNIQUE_KEYS)) {
throw new AnalysisException(
"Should not be distributed by random when keys type is unique");
} else if (keysType.equals(KeysType.AGG_KEYS)) {
for (ColumnDefinition c : columns) {
if (AggregateType.REPLACE.equals(c.getAggType())
|| AggregateType.REPLACE_IF_NOT_NULL.equals(c.getAggType())) {
throw new AnalysisException(
"Should not be distributed by random when keys type is agg"
+ "and column is in replace, [" + c.getName()
+ "] is invalid");
}
}
}
}
for (RollupDefinition rollup : rollups) {
rollup.validate();
}
} else {
// mysql, broker and hive do not need key desc
if (keysType != null) {
throw new AnalysisException(
"Create " + engineName + " table should not contain keys desc");
}
if (!rollups.isEmpty()) {
throw new AnalysisException(engineName + " catalog doesn't support rollup tables.");
}
if (engineName.equalsIgnoreCase(ENGINE_ICEBERG) && distribution != null) {
throw new AnalysisException(
"Iceberg doesn't support 'DISTRIBUTE BY', "
+ "and you can use 'bucket(num, column)' in 'PARTITIONED BY'.");
}
for (ColumnDefinition columnDef : columns) {
if (!columnDef.isNullable()
&& engineName.equalsIgnoreCase(ENGINE_HIVE)) {
throw new AnalysisException(engineName + " catalog doesn't support column with 'NOT NULL'.");
}
columnDef.setIsKey(true);
}
// TODO: support iceberg partition check
if (engineName.equalsIgnoreCase(ENGINE_HIVE)) {
partitionTableInfo.validatePartitionInfo(
engineName, columns, columnMap, properties, ctx, false, true);
}
}
// validate column
try {
if (!engineName.equals(ENGINE_ELASTICSEARCH) && columns.isEmpty()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLE_MUST_HAVE_COLUMNS);
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
final boolean finalEnableMergeOnWrite = isEnableMergeOnWrite;
Set<String> keysSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
keysSet.addAll(keys);
Set<String> clusterKeySet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
clusterKeySet.addAll(clusterKeysColumnNames);
columns.forEach(c -> c.validate(engineName.equals(ENGINE_OLAP), keysSet, clusterKeySet, finalEnableMergeOnWrite,
keysType));
// validate index
if (!indexes.isEmpty()) {
Set<String> distinct = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
Set<Pair<IndexDef.IndexType, List<String>>> distinctCol = new HashSet<>();
TInvertedIndexFileStorageFormat invertedIndexFileStorageFormat;
try {
invertedIndexFileStorageFormat = PropertyAnalyzer.analyzeInvertedIndexFileStorageFormat(
new HashMap<>(properties));
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
for (IndexDefinition indexDef : indexes) {
indexDef.validate();
if (!engineName.equalsIgnoreCase(ENGINE_OLAP)) {
throw new AnalysisException(
"index only support in olap engine at current version.");
}
for (String indexColName : indexDef.getColumnNames()) {
boolean found = false;
for (ColumnDefinition column : columns) {
if (column.getName().equalsIgnoreCase(indexColName)) {
indexDef.checkColumn(column, keysType, isEnableMergeOnWrite,
invertedIndexFileStorageFormat);
found = true;
break;
}
}
if (!found) {
throw new AnalysisException(
"Column does not exist in table. invalid column: " + indexColName);
}
}
distinct.add(indexDef.getIndexName());
distinctCol.add(Pair.of(indexDef.getIndexType(), indexDef.getColumnNames().stream()
.map(String::toUpperCase).collect(Collectors.toList())));
}
if (distinct.size() != indexes.size()) {
throw new AnalysisException("index name must be unique.");
}
if (distinctCol.size() != indexes.size()) {
throw new AnalysisException(
"same index columns have multiple same type index is not allowed.");
}
}
generatedColumnCheck(ctx);
}
private void paddingEngineName(String ctlName, ConnectContext ctx) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(ctlName));
if (Strings.isNullOrEmpty(engineName)) {
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(ctlName);
if (catalog == null) {
throw new AnalysisException("Unknown catalog: " + ctlName);
}
if (catalog instanceof InternalCatalog) {
engineName = ENGINE_OLAP;
} else if (catalog instanceof HMSExternalCatalog) {
engineName = ENGINE_HIVE;
} else if (catalog instanceof IcebergExternalCatalog) {
engineName = ENGINE_ICEBERG;
} else {
throw new AnalysisException("Current catalog does not support create table: " + ctlName);
}
}
}
/**
* validate ctas definition
*/
public void validateCreateTableAsSelect(List<String> qualifierTableName, List<ColumnDefinition> columns,
ConnectContext ctx) {
String catalogName = qualifierTableName.get(0);
paddingEngineName(catalogName, ctx);
this.columns = Utils.copyRequiredMutableList(columns);
// bucket num is hard coded 10 to be consistent with legacy planner
if (engineName.equals(ENGINE_OLAP) && this.distribution == null) {
if (!catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
throw new AnalysisException("Cannot create olap table out of internal catalog."
+ " Make sure 'engine' type is specified when use the catalog: " + catalogName);
}
this.distribution = new DistributionDescriptor(true, false, 10,
Lists.newArrayList(columns.get(0).getName()));
}
validate(ctx);
}
private void checkEngineName() {
if (engineName.equals(ENGINE_MYSQL) || engineName.equals(ENGINE_ODBC) || engineName.equals(ENGINE_BROKER)
|| engineName.equals(ENGINE_ELASTICSEARCH) || engineName.equals(ENGINE_HIVE)
|| engineName.equals(ENGINE_ICEBERG) || engineName.equals(ENGINE_JDBC)) {
if (!isExternal) {
// this is for compatibility
isExternal = true;
}
} else {
if (isExternal) {
throw new AnalysisException(
"Do not support external table with engine name = olap");
} else if (!engineName.equals(ENGINE_OLAP)) {
throw new AnalysisException(
"Do not support table with engine name = " + engineName);
}
}
if (isTemp && !engineName.equals(ENGINE_OLAP)) {
throw new AnalysisException("Do not support temporary table with engine name = " + engineName);
}
if (isTemp && !rollups.isEmpty()) {
throw new AnalysisException("Do not support temporary table with rollup ");
}
if ((engineName.equals(ENGINE_ODBC)
|| engineName.equals(ENGINE_MYSQL) || engineName.equals(ENGINE_BROKER))) {
throw new AnalysisException("odbc, mysql and broker table is no longer supported."
+ " For odbc and mysql external table, use jdbc table or jdbc catalog instead."
+ " For broker table, use table valued function instead.");
}
}
/**
* if auto bucket auto bucket enable, rewrite distribution bucket num &&
* set properties[PropertyAnalyzer.PROPERTIES_AUTO_BUCKET] = "true"
*
* @param distributionDesc distributionDesc
* @param properties properties
* @return new properties
*/
public static Map<String, String> maybeRewriteByAutoBucket(
DistributionDescriptor distributionDesc, Map<String, String> properties) {
if (distributionDesc == null || !distributionDesc.isAutoBucket()) {
return properties;
}
// auto bucket is enable
Map<String, String> newProperties = properties;
if (newProperties == null) {
newProperties = new HashMap<String, String>();
}
newProperties.put(PropertyAnalyzer.PROPERTIES_AUTO_BUCKET, "true");
try {
if (!newProperties.containsKey(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)) {
distributionDesc.updateBucketNum(FeConstants.default_bucket_num);
} else {
long partitionSize = ParseUtil.analyzeDataVolume(
newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE));
distributionDesc.updateBucketNum(AutoBucketUtils.getBucketsNum(partitionSize,
Config.autobucket_min_buckets));
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
return newProperties;
}
private void validateKeyColumns() {
if (keysType == null) {
throw new AnalysisException("Keys type is null.");
}
if (keys.isEmpty() && keysType != KeysType.DUP_KEYS) {
throw new AnalysisException("The number of key columns is 0.");
}
if (keys.size() > columns.size()) {
throw new AnalysisException(
"The number of key columns should be less than the number of columns.");
}
for (int i = 0; i < keys.size(); ++i) {
String name = columns.get(i).getName();
if (!keys.get(i).equalsIgnoreCase(name)) {
String keyName = keys.get(i);
if (columns.stream().noneMatch(col -> col.getName().equalsIgnoreCase(keyName))) {
throw new AnalysisException("Key column[" + keyName + "] doesn't exist.");
}
throw new AnalysisException("Key columns should be a ordered prefix of the schema."
+ " KeyColumns[" + i + "] (starts from zero) is " + keyName + ", "
+ "but corresponding column is " + name + " in the previous "
+ "columns declaration.");
}
if (columns.get(i).getAggType() != null) {
throw new AnalysisException(
"Key column[" + name + "] should not specify aggregate type.");
}
}
// for olap table
for (int i = keys.size(); i < columns.size(); ++i) {
if (keysType == KeysType.AGG_KEYS) {
if (columns.get(i).getAggType() == null) {
throw new AnalysisException(
keysType.name() + " table should specify aggregate type for "
+ "non-key column[" + columns.get(i).getName() + "]");
}
} else {
if (columns.get(i).getAggType() != null) {
throw new AnalysisException(
keysType.name() + " table should not specify aggregate type for "
+ "non-key column[" + columns.get(i).getName() + "]");
}
}
}
if (!clusterKeysColumnNames.isEmpty()) {
// the same code as KeysDesc#analyzeClusterKeys
if (keysType != KeysType.UNIQUE_KEYS) {
throw new AnalysisException("Cluster keys only support unique keys table");
}
// check that cluster keys is not duplicated
for (int i = 0; i < clusterKeysColumnNames.size(); i++) {
String name = clusterKeysColumnNames.get(i);
for (int j = 0; j < i; j++) {
if (clusterKeysColumnNames.get(j).equalsIgnoreCase(name)) {
throw new AnalysisException("Duplicate cluster key column[" + name + "].");
}
}
}
// check that cluster keys is not equal to primary keys
int minKeySize = Math.min(keys.size(), clusterKeysColumnNames.size());
boolean sameKey = true;
for (int i = 0; i < minKeySize; ++i) {
if (!keys.get(i).equalsIgnoreCase(clusterKeysColumnNames.get(i))) {
sameKey = false;
break;
}
}
if (sameKey && !Config.random_add_cluster_keys_for_mow) {
throw new AnalysisException("Unique keys and cluster keys should be different.");
}
// check that cluster key column exists
for (int i = 0; i < clusterKeysColumnNames.size(); ++i) {
String name = clusterKeysColumnNames.get(i);
for (int j = 0; j < columns.size(); j++) {
if (columns.get(j).getName().equalsIgnoreCase(name)) {
columns.get(j).setClusterKeyId(i);
break;
}
if (j == columns.size() - 1) {
throw new AnalysisException("Cluster key column[" + name + "] doesn't exist.");
}
}
}
}
}
/**
* translate to catalog create table stmt
*/
public CreateTableStmt translateToLegacyStmt() {
PartitionDesc partitionDesc = partitionTableInfo.convertToPartitionDesc(isExternal);
List<AlterClause> addRollups = Lists.newArrayList();
if (!rollups.isEmpty()) {
addRollups.addAll(rollups.stream().map(RollupDefinition::translateToCatalogStyle)
.collect(Collectors.toList()));
}
List<Column> catalogColumns = columns.stream()
.map(ColumnDefinition::translateToCatalogStyle).collect(Collectors.toList());
List<Index> catalogIndexes = indexes.stream().map(IndexDefinition::translateToCatalogStyle)
.collect(Collectors.toList());
DistributionDesc distributionDesc =
distribution != null ? distribution.translateToCatalogStyle() : null;
// TODO should move this code to validate function
// EsUtil.analyzePartitionAndDistributionDesc only accept DistributionDesc and PartitionDesc
if (engineName.equals(ENGINE_ELASTICSEARCH)) {
try {
EsUtil.analyzePartitionAndDistributionDesc(partitionDesc, distributionDesc);
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
} else if (!engineName.equals(ENGINE_OLAP)) {
if (!engineName.equals(ENGINE_HIVE) && distributionDesc != null) {
throw new AnalysisException("Create " + engineName
+ " table should not contain distribution desc");
}
if (!engineName.equals(ENGINE_HIVE) && !engineName.equals(ENGINE_ICEBERG) && partitionDesc != null) {
throw new AnalysisException("Create " + engineName
+ " table should not contain partition desc");
}
}
return new CreateTableStmt(ifNotExists, isExternal, isTemp,
new TableName(ctlName, dbName, tableName),
catalogColumns, catalogIndexes, engineName,
new KeysDesc(keysType, keys, clusterKeysColumnNames),
partitionDesc, distributionDesc, Maps.newHashMap(properties), extProperties,
comment, addRollups, null);
}
public void setIsExternal(boolean isExternal) {
this.isExternal = isExternal;
}
private void generatedColumnCommonCheck() {
for (ColumnDefinition column : columns) {
if (keysType == KeysType.AGG_KEYS && column.getGeneratedColumnDesc().isPresent()
&& (!column.isKey() && !GENERATED_COLUMN_ALLOW_AGG_TYPE.contains(column.getAggType()))) {
throw new AnalysisException("The generated columns can be key columns, "
+ "or value columns of replace and replace_if_not_null aggregation type.");
}
if (column.getGeneratedColumnDesc().isPresent() && !engineName.equalsIgnoreCase("olap")) {
throw new AnalysisException("Tables can only have generated columns if the olap engine is used");
}
}
}
private void generatedColumnCheck(ConnectContext ctx) {
generatedColumnCommonCheck();
LogicalEmptyRelation plan = new LogicalEmptyRelation(
ConnectContext.get().getStatementContext().getNextRelationId(),
new ArrayList<>());
CascadesContext cascadesContext = CascadesContext.initContext(ctx.getStatementContext(), plan,
PhysicalProperties.ANY);
Map<String, Slot> columnToSlotReference = Maps.newHashMap();
Map<String, ColumnDefinition> nameToColumnDefinition = Maps.newHashMap();
Map<Slot, SlotRefAndIdx> translateMap = Maps.newHashMap();
for (int i = 0; i < columns.size(); i++) {
ColumnDefinition column = columns.get(i);
Slot slot = new SlotReference(column.getName(), column.getType());
columnToSlotReference.put(column.getName(), slot);
nameToColumnDefinition.put(column.getName(), column);
SlotRef slotRef = new SlotRef(null, column.getName());
slotRef.setType(column.getType().toCatalogDataType());
translateMap.put(slot, new SlotRefAndIdx(slotRef, i, column.getGeneratedColumnDesc().isPresent()));
}
PlanTranslatorContext planTranslatorContext = new PlanTranslatorContext(cascadesContext);
List<Slot> slots = Lists.newArrayList(columnToSlotReference.values());
List<GeneratedColumnUtil.ExprAndname> exprAndnames = Lists.newArrayList();
for (int i = 0; i < columns.size(); i++) {
ColumnDefinition column = columns.get(i);
Optional<GeneratedColumnDesc> info = column.getGeneratedColumnDesc();
if (!info.isPresent()) {
continue;
}
Expression parsedExpression = info.get().getExpression();
checkParsedExpressionInGeneratedColumn(parsedExpression);
Expression boundSlotExpression = SlotReplacer.INSTANCE.replace(parsedExpression, columnToSlotReference);
Scope scope = new Scope(slots);
ExpressionAnalyzer analyzer = new ExpressionAnalyzer(null, scope, cascadesContext, false, false);
Expression expr;
try {
expr = analyzer.analyze(boundSlotExpression, new ExpressionRewriteContext(cascadesContext));
} catch (AnalysisException e) {
throw new AnalysisException("In generated column '" + column.getName() + "', "
+ Utils.convertFirstChar(e.getMessage()));
}
checkExpressionInGeneratedColumn(expr, column, nameToColumnDefinition);
TypeCoercionUtils.checkCanCastTo(expr.getDataType(), column.getType());
ExpressionToExpr translator = new ExpressionToExpr(i, translateMap);
Expr e = expr.accept(translator, planTranslatorContext);
info.get().setExpr(e);
exprAndnames.add(new GeneratedColumnUtil.ExprAndname(e.clone(), column.getName()));
}
// for alter drop column
Map<String, List<String>> columnToListOfGeneratedColumnsThatReferToThis = new HashMap<>();
for (ColumnDefinition column : columns) {
Optional<GeneratedColumnDesc> info = column.getGeneratedColumnDesc();
if (!info.isPresent()) {
continue;
}
Expr generatedExpr = info.get().getExpr();
Set<Expr> slotRefsInGeneratedExpr = new HashSet<>();
generatedExpr.collect(e -> e instanceof SlotRef, slotRefsInGeneratedExpr);
for (Expr slotRefExpr : slotRefsInGeneratedExpr) {
String name = ((SlotRef) slotRefExpr).getColumnName();
columnToListOfGeneratedColumnsThatReferToThis
.computeIfAbsent(name, k -> new ArrayList<>())
.add(column.getName());
}
}
for (Map.Entry<String, List<String>> entry : columnToListOfGeneratedColumnsThatReferToThis.entrySet()) {
ColumnDefinition col = nameToColumnDefinition.get(entry.getKey());
col.addGeneratedColumnsThatReferToThis(entry.getValue());
}
// expand expr
GeneratedColumnUtil.rewriteColumns(exprAndnames);
for (GeneratedColumnUtil.ExprAndname exprAndname : exprAndnames) {
if (nameToColumnDefinition.containsKey(exprAndname.getName())) {
ColumnDefinition columnDefinition = nameToColumnDefinition.get(exprAndname.getName());
Optional<GeneratedColumnDesc> info = columnDefinition.getGeneratedColumnDesc();
info.ifPresent(genCol -> genCol.setExpandExprForLoad(exprAndname.getExpr()));
}
}
}
private static class SlotReplacer extends DefaultExpressionRewriter<Map<String, Slot>> {
public static final SlotReplacer INSTANCE = new SlotReplacer();
public Expression replace(Expression e, Map<String, Slot> replaceMap) {
return e.accept(this, replaceMap);
}
@Override
public Expression visitUnboundSlot(UnboundSlot unboundSlot, Map<String, Slot> replaceMap) {
if (!replaceMap.containsKey(unboundSlot.getName())) {
throw new AnalysisException("Unknown column '" + unboundSlot.getName()
+ "' in 'generated column function'");
}
return replaceMap.get(unboundSlot.getName());
}
}
void checkParsedExpressionInGeneratedColumn(Expression expr) {
expr.foreach(e -> {
if (e instanceof SubqueryExpr) {
throw new AnalysisException("Generated column does not support subquery.");
} else if (e instanceof Lambda) {
throw new AnalysisException("Generated column does not support lambda.");
}
});
}
void checkExpressionInGeneratedColumn(Expression expr, ColumnDefinition column,
Map<String, ColumnDefinition> nameToColumnDefinition) {
expr.foreach(e -> {
if (e instanceof Variable) {
throw new AnalysisException("Generated column expression cannot contain variable.");
} else if (e instanceof Slot && nameToColumnDefinition.containsKey(((Slot) e).getName())) {
ColumnDefinition columnDefinition = nameToColumnDefinition.get(((Slot) e).getName());
if (columnDefinition.getAutoIncInitValue() != -1) {
throw new AnalysisException(
"Generated column '" + column.getName()
+ "' cannot refer to auto-increment column.");
}
} else if (e instanceof BoundFunction && !checkFunctionInGeneratedColumn((BoundFunction) e)) {
throw new AnalysisException("Expression of generated column '"
+ column.getName() + "' contains a disallowed function:'"
+ ((BoundFunction) e).getName() + "'");
}
});
}
boolean checkFunctionInGeneratedColumn(Expression expr) {
if (!(expr instanceof ScalarFunction)) {
return false;
}
if (expr instanceof Udf) {
return false;
}
if (expr instanceof GroupingScalarFunction) {
return false;
}
return true;
}
private static class ExpressionToExpr extends ExpressionTranslator {
private final Map<Slot, SlotRefAndIdx> slotRefMap;
private final int index;
public ExpressionToExpr(int index, Map<Slot, SlotRefAndIdx> slotRefMap) {
this.index = index;
this.slotRefMap = slotRefMap;
}
@Override
public Expr visitSlotReference(SlotReference slotReference, PlanTranslatorContext context) {
if (!slotRefMap.containsKey(slotReference)) {
throw new AnalysisException("Unknown column '" + slotReference.getName() + "'");
}
int idx = slotRefMap.get(slotReference).getIdx();
boolean isGenCol = slotRefMap.get(slotReference).isGeneratedColumn();
if (isGenCol && idx >= index) {
throw new AnalysisException(
"Generated column can refer only to generated columns defined prior to it.");
}
return slotRefMap.get(slotReference).getSlotRef();
}
}
/** SlotRefAndIdx */
public static class SlotRefAndIdx {
private final SlotRef slotRef;
private final int idx;
private final boolean isGeneratedColumn;
public SlotRefAndIdx(SlotRef slotRef, int idx, boolean isGeneratedColumn) {
this.slotRef = slotRef;
this.idx = idx;
this.isGeneratedColumn = isGeneratedColumn;
}
public int getIdx() {
return idx;
}
public SlotRef getSlotRef() {
return slotRef;
}
public boolean isGeneratedColumn() {
return isGeneratedColumn;
}
}
public PartitionTableInfo getPartitionTableInfo() {
return partitionTableInfo;
}
public DistributionDescriptor getDistribution() {
return distribution;
}
public boolean isTemp() {
return isTemp;
}
}