// PaimonMetadataOps.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.paimon;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.DorisTypeVisitor;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
import org.apache.doris.nereids.trees.plans.commands.info.DropBranchInfo;
import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Catalog.DatabaseNotEmptyException;
import org.apache.paimon.catalog.Catalog.DatabaseNotExistException;
import org.apache.paimon.catalog.Catalog.TableAlreadyExistException;
import org.apache.paimon.catalog.Catalog.TableNotExistException;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
 * {@link ExternalMetadataOps} implementation backed by a Paimon {@link Catalog}.
 *
 * <p>Supports creating/dropping databases and tables, listing database and table names and
 * existence checks. Truncate, branch and tag operations are rejected with
 * {@link UnsupportedOperationException}.
 *
 * <p>The {@code *Impl}, listing and existence-check methods run their Paimon calls through the
 * {@link ExecutionAuthenticator} obtained from the Doris catalog at construction time.
 */
public class PaimonMetadataOps implements ExternalMetadataOps {
    private static final Logger LOG = LogManager.getLogger(PaimonMetadataOps.class);

    // Table property carrying the comma-separated primary key column names.
    private static final String PRIMARY_KEY_IDENTIFIER = "primary-key";
    // Table property carrying the table comment.
    private static final String PROP_COMMENT = "comment";
    // Table property carrying the table location; forwarded to Paimon as CoreOptions.PATH.
    private static final String PROP_LOCATION = "location";

    protected Catalog catalog;
    protected ExternalCatalog dorisCatalog;
    private final ExecutionAuthenticator executionAuthenticator;

    /**
     * @param dorisCatalog the Doris-side catalog; also the source of the execution authenticator
     * @param catalog the Paimon catalog that the operations are forwarded to
     */
    public PaimonMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) {
        this.dorisCatalog = dorisCatalog;
        this.catalog = catalog;
        this.executionAuthenticator = dorisCatalog.getExecutionAuthenticator();
    }

    /**
     * Creates a database in the Paimon catalog.
     *
     * @param dbName name of the database to create
     * @param ifNotExists when true, an already existing database is not an error
     * @param properties database properties passed to Paimon
     * @return true if the database already existed and nothing was created, false if it was created
     * @throws DdlException if the creation fails for any reason; the original exception is the cause
     */
    @Override
    public boolean createDbImpl(String dbName, boolean ifNotExists, Map<String, String> properties)
            throws DdlException {
        try {
            return executionAuthenticator.execute(() -> performCreateDb(dbName, ifNotExists, properties));
        } catch (Exception e) {
            throw new DdlException("Failed to create database: "
                    + dbName + ": " + Util.getRootCauseMessage(e), e);
        }
    }

    /**
     * Does the actual work of {@link #createDbImpl}: existence check, property validation and
     * the Paimon {@code createDatabase} call.
     */
    private boolean performCreateDb(String dbName, boolean ifNotExists, Map<String, String> properties)
            throws DdlException, Catalog.DatabaseAlreadyExistException {
        if (databaseExist(dbName)) {
            if (ifNotExists) {
                LOG.info("create database[{}] which already exists", dbName);
                return true;
            }
            ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, dbName);
        }
        // Database properties are only accepted when the Paimon catalog type is PAIMON_HMS.
        if (!properties.isEmpty() && dorisCatalog instanceof PaimonExternalCatalog) {
            String catalogType = ((PaimonExternalCatalog) dorisCatalog).getCatalogType();
            if (!PaimonExternalCatalog.PAIMON_HMS.equals(catalogType)) {
                throw new DdlException(
                        "Not supported: create database with properties for paimon catalog type: " + catalogType);
            }
        }
        catalog.createDatabase(dbName, ifNotExists, properties);
        return false;
    }

    /** Resets the cached names of the Doris catalog after a database was created. */
    @Override
    public void afterCreateDb() {
        dorisCatalog.resetMetaCacheNames();
    }

    /**
     * Drops a database from the Paimon catalog.
     *
     * @param dbName name of the database to drop
     * @param ifExists when true, a missing database is not an error
     * @param force when true, all tables of the database are dropped first
     * @throws DdlException if the drop fails for any reason; the original exception is the cause
     */
    @Override
    public void dropDbImpl(String dbName, boolean ifExists, boolean force) throws DdlException {
        try {
            executionAuthenticator.execute(() -> {
                performDropDb(dbName, ifExists, force);
                return null;
            });
        } catch (Exception e) {
            throw new DdlException(
                    "Failed to drop database: " + dbName + ", error message is:" + e.getMessage(), e);
        }
    }

    /**
     * Does the actual work of {@link #dropDbImpl}. Existence is checked against the Doris
     * catalog; with {@code force} every listed table is dropped before the database itself.
     */
    private void performDropDb(String dbName, boolean ifExists, boolean force) throws DdlException {
        ExternalDatabase<?> dorisDb = dorisCatalog.getDbNullable(dbName);
        if (dorisDb == null) {
            if (ifExists) {
                // Database does not exist and IF EXISTS is specified; treat as no-op.
                LOG.info("drop database[{}] which does not exist", dbName);
                return;
            }
            ErrorReport.reportDdlException(ErrorCode.ERR_DB_DROP_EXISTS, dbName);
            // ErrorReport.reportDdlException is expected to throw DdlException.
            return;
        }
        if (force) {
            List<String> tableNames = listTableNames(dbName);
            if (!tableNames.isEmpty()) {
                LOG.info("drop database[{}] with force, drop all tables, num: {}", dbName, tableNames.size());
            }
            for (String tableName : tableNames) {
                performDropTable(dbName, tableName, true);
            }
        }
        try {
            catalog.dropDatabase(dbName, ifExists, force);
        } catch (DatabaseNotExistException e) {
            // Keep the Paimon exception as the cause so the original stack trace is not lost.
            throw new RuntimeException("database " + dbName + " does not exist!", e);
        } catch (DatabaseNotEmptyException e) {
            throw new RuntimeException("database " + dbName + " is not empty! please check!", e);
        }
    }

    /** Unregisters the dropped database from the Doris catalog. */
    @Override
    public void afterDropDb(String dbName) {
        dorisCatalog.unregisterDatabase(dbName);
    }

    /**
     * Creates a table in the Paimon catalog.
     *
     * @param createTableInfo description of the table to create
     * @return true if the table already existed and nothing was created, false if it was created
     * @throws UserException (as {@link DdlException}) if the creation fails for any reason
     */
    @Override
    public boolean createTableImpl(CreateTableInfo createTableInfo) throws UserException {
        try {
            return executionAuthenticator.execute(() -> performCreateTable(createTableInfo));
        } catch (Exception e) {
            throw new DdlException(
                    "Failed to create table: " + createTableInfo.getTableName() + ", error message is:" + e.getMessage(),
                    e);
        }
    }

    /**
     * Does the actual work of {@link #createTableImpl}: checks for an existing table both in the
     * remote catalog and in Doris, converts the column definitions into a Paimon schema and
     * issues the Paimon {@code createTable} call.
     *
     * @param createTableInfo description of the table to create
     * @return true if the table already existed and IF NOT EXISTS was given, false if it was created
     * @throws UserException if the database cannot be found in the Doris catalog, or the table
     *         already exists and IF NOT EXISTS was not given
     */
    public boolean performCreateTable(CreateTableInfo createTableInfo) throws UserException {
        String dbName = createTableInfo.getDbName();
        ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
        if (db == null) {
            throw new UserException("Failed to get database: '" + dbName + "' in catalog: " + dorisCatalog.getName());
        }
        String tableName = createTableInfo.getTableName();
        // 1. first, check if table exist in remote
        if (tableExist(db.getRemoteName(), tableName)) {
            if (createTableInfo.isIfNotExists()) {
                LOG.info("create table[{}] which already exists", tableName);
                return true;
            }
            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
        }
        // 2. second, check if table exist in local.
        // This is because of a case sensitivity issue, eg:
        // 1. lower_case_table_name = 1
        // 2. create table tbl1;
        // 3. create table TBL1; TBL1 does not exist in remote because the remote system is case-sensitive.
        // but because lower_case_table_name = 1, the table can not be created in Doris because it conflicts
        // with tbl1
        ExternalTable dorisTable = db.getTableNullable(tableName);
        if (dorisTable != null) {
            if (createTableInfo.isIfNotExists()) {
                LOG.info("create table[{}] which already exists", tableName);
                return true;
            }
            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
        }
        List<ColumnDefinition> columns = createTableInfo.getColumnDefinitions();
        List<StructField> fields = columns.stream()
                .map(col -> new StructField(col.getName(), col.getType().toCatalogDataType(),
                        col.getComment(), col.isNullable()))
                .collect(Collectors.toList());
        StructType structType = new StructType(new ArrayList<>(fields));
        Schema schema = toPaimonSchema(structType, createTableInfo.getPartitionDesc(),
                createTableInfo.getProperties());
        try {
            // NOTE(review): the existence check above uses db.getRemoteName() while the identifier
            // here uses createTableInfo.getDbName() - confirm both always name the same database.
            catalog.createTable(new Identifier(createTableInfo.getDbName(), createTableInfo.getTableName()),
                    schema, createTableInfo.isIfNotExists());
        } catch (TableAlreadyExistException | DatabaseNotExistException e) {
            throw new RuntimeException(e);
        }
        return false;
    }

    /**
     * Builds a Paimon {@link Schema} from Doris column types, partition description and table
     * properties.
     *
     * <p>The {@code primary-key} and {@code comment} properties are consumed here and not passed
     * on as Paimon options; {@code location} is passed on under the {@link CoreOptions#PATH} key.
     * Blank entries in the comma-separated {@code primary-key} value are ignored.
     */
    private Schema toPaimonSchema(StructType structType, PartitionDesc partitionDesc, Map<String, String> properties) {
        Map<String, String> normalizedProperties = new HashMap<>(properties);
        normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
        normalizedProperties.remove(PROP_COMMENT);
        if (normalizedProperties.containsKey(PROP_LOCATION)) {
            String path = normalizedProperties.remove(PROP_LOCATION);
            normalizedProperties.put(CoreOptions.PATH.key(), path);
        }
        String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
        // Skip blank entries so values such as "" or "id, " do not yield an empty key column name.
        List<String> primaryKeys = pkAsString == null
                ? Collections.emptyList()
                : Arrays.stream(pkAsString.split(","))
                        .map(String::trim)
                        .filter(key -> !key.isEmpty())
                        .collect(Collectors.toList());
        List<String> partitionKeys = partitionDesc == null ? new ArrayList<>() : partitionDesc.getPartitionColNames();
        Schema.Builder schemaBuilder = Schema.newBuilder()
                .options(normalizedProperties)
                .primaryKey(primaryKeys)
                .partitionKeys(partitionKeys)
                .comment(properties.get(PROP_COMMENT));
        for (StructField field : structType.getFields()) {
            schemaBuilder.column(field.getName(),
                    toPaimontype(field.getType()).copy(field.getContainsNull()),
                    field.getComment());
        }
        return schemaBuilder.build();
    }

    /** Converts a Doris {@link Type} into a Paimon {@link DataType} via {@link DorisToPaimonTypeVisitor}. */
    private DataType toPaimontype(Type type) {
        return DorisTypeVisitor.visit(type, new DorisToPaimonTypeVisitor());
    }

    /** Resets the cached names of the owning Doris database, if it can be found, after a table was created. */
    @Override
    public void afterCreateTable(String dbName, String tblName) {
        Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName);
        db.ifPresent(externalDatabase -> externalDatabase.resetMetaCacheNames());
        LOG.info("after create table {}.{}.{}, is db exists: {}",
                dorisCatalog.getName(), dbName, tblName, db.isPresent());
    }

    /**
     * Drops a table from the Paimon catalog, addressed by its remote database and table name.
     *
     * @param dorisTable the Doris-side table to drop
     * @param ifExists when true, a missing table is not an error
     * @throws DdlException if the drop fails for any reason; the original exception is the cause
     */
    @Override
    public void dropTableImpl(ExternalTable dorisTable, boolean ifExists) throws DdlException {
        try {
            executionAuthenticator.execute(() -> {
                performDropTable(dorisTable.getRemoteDbName(), dorisTable.getRemoteName(), ifExists);
                return null;
            });
        } catch (Exception e) {
            throw new DdlException(
                    "Failed to drop table: " + dorisTable.getName() + ", error message is:" + e.getMessage(), e);
        }
    }

    /** Does the actual work of {@link #dropTableImpl}: existence check followed by the Paimon {@code dropTable}. */
    private void performDropTable(String dBName, String tableName, boolean ifExists) throws DdlException {
        if (!tableExist(dBName, tableName)) {
            if (ifExists) {
                LOG.info("drop table[{}] which does not exist", tableName);
                return;
            }
            ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dBName);
        }
        try {
            catalog.dropTable(Identifier.create(dBName, tableName), ifExists);
        } catch (TableNotExistException e) {
            // Keep the Paimon exception as the cause so the original stack trace is not lost.
            throw new RuntimeException("table " + tableName + " does not exist", e);
        }
    }

    /** Unregisters the dropped table from the owning Doris database, if that database can be found. */
    @Override
    public void afterDropTable(String dbName, String tblName) {
        Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName);
        db.ifPresent(externalDatabase -> externalDatabase.unregisterTable(tblName));
        LOG.info("after drop table {}.{}.{}. is db exists: {}",
                dorisCatalog.getName(), dbName, tblName, db.isPresent());
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void truncateTableImpl(ExternalTable dorisTable, List<String> partitions) throws DdlException {
        throw new UnsupportedOperationException("truncate table is not a supported operation!");
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void createOrReplaceBranchImpl(ExternalTable dorisTable, CreateOrReplaceBranchInfo branchInfo)
            throws UserException {
        throw new UnsupportedOperationException("create or replace branch is not a supported operation!");
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void createOrReplaceTagImpl(ExternalTable dorisTable, CreateOrReplaceTagInfo tagInfo) throws UserException {
        throw new UnsupportedOperationException("create or replace tag is not a supported operation!");
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void dropTagImpl(ExternalTable dorisTable, DropTagInfo tagInfo) throws UserException {
        throw new UnsupportedOperationException("drop tag is not a supported operation!");
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void dropBranchImpl(ExternalTable dorisTable, DropBranchInfo branchInfo) throws UserException {
        throw new UnsupportedOperationException("drop branch is not a supported operation!");
    }

    /**
     * @return a new list with the database names reported by the Paimon catalog
     * @throws RuntimeException if the listing fails
     */
    @Override
    public List<String> listDatabaseNames() {
        try {
            return executionAuthenticator.execute(() -> new ArrayList<>(catalog.listDatabases()));
        } catch (Exception e) {
            throw new RuntimeException("Failed to list databases names, catalog name: " + dorisCatalog.getName(), e);
        }
    }

    /**
     * @param db name of the database whose tables are listed
     * @return a new list with the table names of {@code db}; empty if Paimon reports that the
     *         database does not exist (this case is only logged)
     * @throws RuntimeException if the listing fails for any other reason
     */
    @Override
    public List<String> listTableNames(String db) {
        try {
            return executionAuthenticator.execute(() -> {
                List<String> tableNames = new ArrayList<>();
                try {
                    tableNames.addAll(catalog.listTables(db));
                } catch (DatabaseNotExistException e) {
                    // Best effort: a missing database yields an empty list instead of a failure.
                    LOG.warn("DatabaseNotExistException", e);
                }
                return tableNames;
            });
        } catch (Exception e) {
            throw new RuntimeException("Failed to list table names, catalog name: " + dorisCatalog.getName(), e);
        }
    }

    /**
     * @return true if the Paimon catalog can load the table, false if it reports
     *         {@link TableNotExistException}
     * @throws RuntimeException if the lookup fails for any other reason
     */
    @Override
    public boolean tableExist(String dbName, String tblName) {
        try {
            return executionAuthenticator.execute(() -> {
                try {
                    catalog.getTable(Identifier.create(dbName, tblName));
                    return true;
                } catch (TableNotExistException e) {
                    return false;
                }
            });
        } catch (Exception e) {
            throw new RuntimeException("Failed to check table existence, catalog name: " + dorisCatalog.getName()
                    + ", error message is:" + ExceptionUtils.getRootCauseMessage(e), e);
        }
    }

    /**
     * @return true if the Paimon catalog can load the database, false if it reports
     *         {@link DatabaseNotExistException}
     * @throws RuntimeException if the lookup fails for any other reason
     */
    @Override
    public boolean databaseExist(String dbName) {
        try {
            return executionAuthenticator.execute(() -> {
                try {
                    catalog.getDatabase(dbName);
                    return true;
                } catch (DatabaseNotExistException e) {
                    return false;
                }
            });
        } catch (Exception e) {
            throw new RuntimeException("Failed to check database exist, error message is:" + e.getMessage(), e);
        }
    }

    /** @return the underlying Paimon catalog, or null after {@link #close()} */
    public Catalog getCatalog() {
        return catalog;
    }

    /** Drops the reference to the Paimon catalog; the catalog itself is not closed here. */
    @Override
    public void close() {
        catalog = null;
    }
}