MaxComputeMetadataOps.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.maxcompute;
import org.apache.doris.analysis.DistributionDesc;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.HashDistributionDesc;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
import org.apache.doris.nereids.trees.plans.commands.info.DropBranchInfo;
import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.Tables;
import com.aliyun.odps.type.TypeInfo;
import com.aliyun.odps.type.TypeInfoFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* MaxCompute metadata operations for DDL support (CREATE TABLE, etc.)
*/
public class MaxComputeMetadataOps implements ExternalMetadataOps {
private static final Logger LOG = LogManager.getLogger(MaxComputeMetadataOps.class);
private static final long MAX_LIFECYCLE_DAYS = 37231;
private static final int MAX_BUCKET_NUM = 1024;
private final MaxComputeExternalCatalog dorisCatalog;
private final Odps odps;
public MaxComputeMetadataOps(MaxComputeExternalCatalog dorisCatalog, Odps odps) {
this.dorisCatalog = dorisCatalog;
this.odps = odps;
}
@Override
public void close() {
}
@Override
public boolean tableExist(String dbName, String tblName) {
return dorisCatalog.tableExist(null, dbName, tblName);
}
@Override
public boolean databaseExist(String dbName) {
return dorisCatalog.getMcStructureHelper().databaseExist(dorisCatalog.getClient(), dbName);
}
@Override
public List<String> listDatabaseNames() {
return dorisCatalog.listDatabaseNames();
}
@Override
public List<String> listTableNames(String dbName) {
return dorisCatalog.listTableNames(null, dbName);
}
// ==================== Create Database (not supported yet) ====================
@Override
public boolean createDbImpl(String dbName, boolean ifNotExists, Map<String, String> properties)
throws DdlException {
throw new DdlException("Create database is not supported for MaxCompute catalog.");
}
@Override
public void dropDbImpl(String dbName, boolean ifExists, boolean force) throws DdlException {
throw new DdlException("Drop database is not supported for MaxCompute catalog.");
}
@Override
public void afterDropDb(String dbName) {
}
// ==================== Create Table ====================
@Override
public boolean createTableImpl(CreateTableInfo createTableInfo) throws UserException {
String dbName = createTableInfo.getDbName();
String tableName = createTableInfo.getTableName();
// 1. Validate database existence
ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
if (db == null) {
throw new UserException(
"Failed to get database: '" + dbName + "' in catalog: " + dorisCatalog.getName());
}
// 2. Check if table exists in remote
if (tableExist(db.getRemoteName(), tableName)) {
if (createTableInfo.isIfNotExists()) {
LOG.info("create table[{}] which already exists", tableName);
return true;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
}
// 3. Check if table exists in local (case sensitivity issue)
ExternalTable dorisTable = db.getTableNullable(tableName);
if (dorisTable != null) {
if (createTableInfo.isIfNotExists()) {
LOG.info("create table[{}] which already exists", tableName);
return true;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
}
// 4. Validate columns
List<Column> columns = createTableInfo.getColumns();
validateColumns(columns);
// 5. Validate partition description
PartitionDesc partitionDesc = createTableInfo.getPartitionDesc();
validatePartitionDesc(partitionDesc);
// 6. Build MaxCompute TableSchema
TableSchema schema = buildMaxComputeTableSchema(columns, partitionDesc);
// 7. Extract properties
Map<String, String> properties = createTableInfo.getProperties();
Long lifecycle = extractLifecycle(properties);
Map<String, String> mcProperties = extractMaxComputeProperties(properties);
Integer bucketNum = extractBucketNum(createTableInfo);
// 8. Create table via MaxCompute SDK
McStructureHelper structureHelper = dorisCatalog.getMcStructureHelper();
Tables.TableCreator creator = structureHelper.createTableCreator(
odps, db.getRemoteName(), tableName, schema);
if (createTableInfo.isIfNotExists()) {
creator.ifNotExists();
}
String comment = createTableInfo.getComment();
if (comment != null && !comment.isEmpty()) {
creator.withComment(comment);
}
if (lifecycle != null) {
creator.withLifeCycle(lifecycle);
}
if (!mcProperties.isEmpty()) {
creator.withTblProperties(mcProperties);
}
if (bucketNum != null) {
creator.withBucketNum(bucketNum);
}
try {
creator.create();
} catch (OdpsException e) {
throw new DdlException("Failed to create MaxCompute table '" + tableName + "': " + e.getMessage(), e);
}
return false;
}
@Override
public void afterCreateTable(String dbName, String tblName) {
Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName);
if (db.isPresent()) {
db.get().resetMetaCacheNames();
}
LOG.info("after create table {}.{}.{}, is db exists: {}",
dorisCatalog.getName(), dbName, tblName, db.isPresent());
}
// ==================== Drop Table (not supported yet) ====================
@Override
public void dropTableImpl(ExternalTable dorisTable, boolean ifExists) throws DdlException {
// Get remote names (handles case-sensitivity)
String remoteDbName = dorisTable.getRemoteDbName();
String remoteTblName = dorisTable.getRemoteName();
// Check table existence
if (!tableExist(remoteDbName, remoteTblName)) {
if (ifExists) {
LOG.info("drop table[{}.{}] which does not exist", remoteDbName, remoteTblName);
return;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE,
remoteTblName, remoteDbName);
}
}
// Drop table via McStructureHelper
try {
McStructureHelper structureHelper = dorisCatalog.getMcStructureHelper();
structureHelper.dropTable(odps, remoteDbName, remoteTblName, ifExists);
LOG.info("Successfully dropped MaxCompute table: {}.{}", remoteDbName, remoteTblName);
} catch (OdpsException e) {
throw new DdlException("Failed to drop MaxCompute table '"
+ remoteTblName + "': " + e.getMessage(), e);
}
}
@Override
public void afterDropTable(String dbName, String tblName) {
Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName);
if (db.isPresent()) {
db.get().unregisterTable(tblName);
}
LOG.info("after drop table {}.{}.{}, is db exists: {}",
dorisCatalog.getName(), dbName, tblName, db.isPresent());
}
@Override
public void truncateTableImpl(ExternalTable dorisTable, List<String> partitions) throws DdlException {
throw new DdlException("Truncate table is not supported for MaxCompute catalog.");
}
// ==================== Branch/Tag (not supported) ====================
@Override
public void createOrReplaceBranchImpl(ExternalTable dorisTable, CreateOrReplaceBranchInfo branchInfo)
throws UserException {
throw new UserException("Branch operations are not supported for MaxCompute catalog.");
}
@Override
public void createOrReplaceTagImpl(ExternalTable dorisTable, CreateOrReplaceTagInfo tagInfo)
throws UserException {
throw new UserException("Tag operations are not supported for MaxCompute catalog.");
}
@Override
public void dropTagImpl(ExternalTable dorisTable, DropTagInfo tagInfo) throws UserException {
throw new UserException("Tag operations are not supported for MaxCompute catalog.");
}
@Override
public void dropBranchImpl(ExternalTable dorisTable, DropBranchInfo branchInfo) throws UserException {
throw new UserException("Branch operations are not supported for MaxCompute catalog.");
}
// ==================== Type Conversion ====================
/**
* Convert Doris type to MaxCompute TypeInfo.
*/
public static TypeInfo dorisTypeToMcType(Type dorisType) throws UserException {
if (dorisType.isScalarType()) {
return dorisScalarTypeToMcType(dorisType);
} else if (dorisType.isArrayType()) {
ArrayType arrayType = (ArrayType) dorisType;
TypeInfo elementType = dorisTypeToMcType(arrayType.getItemType());
return TypeInfoFactory.getArrayTypeInfo(elementType);
} else if (dorisType.isMapType()) {
MapType mapType = (MapType) dorisType;
TypeInfo keyType = dorisTypeToMcType(mapType.getKeyType());
TypeInfo valueType = dorisTypeToMcType(mapType.getValueType());
return TypeInfoFactory.getMapTypeInfo(keyType, valueType);
} else if (dorisType.isStructType()) {
StructType structType = (StructType) dorisType;
List<StructField> fields = structType.getFields();
List<String> fieldNames = new ArrayList<>(fields.size());
List<TypeInfo> fieldTypes = new ArrayList<>(fields.size());
for (StructField field : fields) {
fieldNames.add(field.getName());
fieldTypes.add(dorisTypeToMcType(field.getType()));
}
return TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypes);
} else {
throw new UserException("Unsupported Doris type for MaxCompute: " + dorisType);
}
}
private static TypeInfo dorisScalarTypeToMcType(Type dorisType) throws UserException {
PrimitiveType primitiveType = dorisType.getPrimitiveType();
switch (primitiveType) {
case BOOLEAN:
return TypeInfoFactory.BOOLEAN;
case TINYINT:
return TypeInfoFactory.TINYINT;
case SMALLINT:
return TypeInfoFactory.SMALLINT;
case INT:
return TypeInfoFactory.INT;
case BIGINT:
return TypeInfoFactory.BIGINT;
case FLOAT:
return TypeInfoFactory.FLOAT;
case DOUBLE:
return TypeInfoFactory.DOUBLE;
case CHAR:
return TypeInfoFactory.getCharTypeInfo(((ScalarType) dorisType).getLength());
case VARCHAR:
return TypeInfoFactory.getVarcharTypeInfo(((ScalarType) dorisType).getLength());
case STRING:
return TypeInfoFactory.STRING;
case DECIMALV2:
case DECIMAL32:
case DECIMAL64:
case DECIMAL128:
case DECIMAL256:
return TypeInfoFactory.getDecimalTypeInfo(
((ScalarType) dorisType).getScalarPrecision(),
((ScalarType) dorisType).getScalarScale());
case DATE:
case DATEV2:
return TypeInfoFactory.DATE;
case DATETIME:
case DATETIMEV2:
return TypeInfoFactory.DATETIME;
case LARGEINT:
case HLL:
case BITMAP:
case QUANTILE_STATE:
case AGG_STATE:
case JSONB:
case VARIANT:
case IPV4:
case IPV6:
default:
throw new UserException(
"Unsupported Doris type for MaxCompute: " + primitiveType);
}
}
// ==================== Validation ====================
private void validateColumns(List<Column> columns) throws UserException {
if (columns == null || columns.isEmpty()) {
throw new UserException("Table must have at least one column.");
}
Set<String> columnNames = new HashSet<>();
for (Column col : columns) {
if (col.isAutoInc()) {
throw new UserException(
"Auto-increment columns are not supported for MaxCompute tables: " + col.getName());
}
if (col.isAggregated()) {
throw new UserException(
"Aggregation columns are not supported for MaxCompute tables: " + col.getName());
}
String lowerName = col.getName().toLowerCase();
if (!columnNames.add(lowerName)) {
throw new UserException("Duplicate column name: " + col.getName());
}
// Validate that the type is convertible
dorisTypeToMcType(col.getType());
}
}
private void validatePartitionDesc(PartitionDesc partitionDesc) throws UserException {
if (partitionDesc == null) {
return;
}
ArrayList<Expr> exprs = partitionDesc.getPartitionExprs();
if (exprs == null || exprs.isEmpty()) {
return;
}
for (Expr expr : exprs) {
if (expr instanceof SlotRef) {
// Identity partition - OK
} else if (expr instanceof FunctionCallExpr) {
String funcName = ((FunctionCallExpr) expr).getFnName().getFunction();
throw new UserException(
"MaxCompute does not support partition transform '" + funcName
+ "'. Only identity partitions are supported.");
} else {
throw new UserException("Invalid partition expression: " + expr.toSql());
}
}
}
// ==================== Schema Building ====================
private TableSchema buildMaxComputeTableSchema(List<Column> columns, PartitionDesc partitionDesc)
throws UserException {
Set<String> partitionColNames = new HashSet<>();
if (partitionDesc != null && partitionDesc.getPartitionColNames() != null) {
for (String name : partitionDesc.getPartitionColNames()) {
partitionColNames.add(name.toLowerCase());
}
}
TableSchema schema = new TableSchema();
// Add regular columns (non-partition)
for (Column col : columns) {
if (!partitionColNames.contains(col.getName().toLowerCase())) {
TypeInfo mcType = dorisTypeToMcType(col.getType());
com.aliyun.odps.Column mcCol = new com.aliyun.odps.Column(
col.getName(), mcType, col.getComment());
schema.addColumn(mcCol);
}
}
// Add partition columns in the order specified by partitionDesc
if (partitionDesc != null && partitionDesc.getPartitionColNames() != null) {
for (String partColName : partitionDesc.getPartitionColNames()) {
Column col = findColumnByName(columns, partColName);
if (col == null) {
throw new UserException("Partition column '" + partColName + "' not found in column definitions.");
}
TypeInfo mcType = dorisTypeToMcType(col.getType());
com.aliyun.odps.Column mcCol = new com.aliyun.odps.Column(
col.getName(), mcType, col.getComment());
schema.addPartitionColumn(mcCol);
}
}
return schema;
}
private Column findColumnByName(List<Column> columns, String name) {
for (Column col : columns) {
if (col.getName().equalsIgnoreCase(name)) {
return col;
}
}
return null;
}
// ==================== Property Extraction ====================
private Long extractLifecycle(Map<String, String> properties) throws UserException {
String lifecycleStr = properties.get("mc.lifecycle");
if (lifecycleStr == null) {
lifecycleStr = properties.get("lifecycle");
}
if (lifecycleStr != null) {
try {
long lifecycle = Long.parseLong(lifecycleStr);
if (lifecycle <= 0 || lifecycle > MAX_LIFECYCLE_DAYS) {
throw new UserException(
"Invalid lifecycle value: " + lifecycle
+ ". Must be between 1 and " + MAX_LIFECYCLE_DAYS + ".");
}
return lifecycle;
} catch (NumberFormatException e) {
throw new UserException("Invalid lifecycle value: '" + lifecycleStr + "'. Must be a positive integer.");
}
}
return null;
}
private Map<String, String> extractMaxComputeProperties(Map<String, String> properties) {
Map<String, String> mcProperties = new HashMap<>();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith("mc.tblproperty.")) {
String mcKey = entry.getKey().substring("mc.tblproperty.".length());
mcProperties.put(mcKey, entry.getValue());
}
}
return mcProperties;
}
private Integer extractBucketNum(CreateTableInfo createTableInfo) throws UserException {
DistributionDesc distributionDesc = createTableInfo.getDistributionDesc();
if (distributionDesc == null) {
return null;
}
if (!(distributionDesc instanceof HashDistributionDesc)) {
throw new UserException(
"MaxCompute only supports hash distribution. Got: " + distributionDesc.getClass().getSimpleName());
}
HashDistributionDesc hashDist = (HashDistributionDesc) distributionDesc;
int bucketNum = hashDist.getBuckets();
if (bucketNum <= 0 || bucketNum > MAX_BUCKET_NUM) {
throw new UserException(
"Invalid bucket number: " + bucketNum + ". Must be between 1 and " + MAX_BUCKET_NUM + ".");
}
return bucketNum;
}
}