IcebergMetadataOps.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.iceberg;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.DorisTypeVisitor;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
import org.apache.doris.nereids.trees.plans.commands.CreateDatabaseCommand;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
public class IcebergMetadataOps implements ExternalMetadataOps {
private static final Logger LOG = LogManager.getLogger(IcebergMetadataOps.class);
protected Catalog catalog;
protected ExternalCatalog dorisCatalog;
protected SupportsNamespaces nsCatalog;
private PreExecutionAuthenticator preExecutionAuthenticator;
// Generally, there should be only two levels under the catalog, namely <database>.<table>,
// but the REST type catalog is obtained from an external server,
// and the level provided by the external server may be three levels, <catalog>.<database>.<table>.
// Therefore, if the external server provides a catalog,
// the catalog needs to be recorded here to ensure semantic consistency.
private Optional<String> externalCatalogName = Optional.empty();
public IcebergMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) {
this.dorisCatalog = dorisCatalog;
this.catalog = catalog;
nsCatalog = (SupportsNamespaces) catalog;
this.preExecutionAuthenticator = dorisCatalog.getPreExecutionAuthenticator();
if (dorisCatalog.getProperties().containsKey(IcebergExternalCatalog.EXTERNAL_CATALOG_NAME)) {
externalCatalogName =
Optional.of(dorisCatalog.getProperties().get(IcebergExternalCatalog.EXTERNAL_CATALOG_NAME));
}
}
public Catalog getCatalog() {
return catalog;
}
public ExternalCatalog getExternalCatalog() {
return dorisCatalog;
}
@Override
public void close() {
if (catalog != null) {
catalog = null;
}
}
@Override
public boolean tableExist(String dbName, String tblName) {
try {
return preExecutionAuthenticator.execute(() -> catalog.tableExists(getTableIdentifier(dbName, tblName)));
} catch (Exception e) {
throw new RuntimeException("Failed to check table exist, error message is:" + e.getMessage(), e);
}
}
public boolean databaseExist(String dbName) {
try {
return preExecutionAuthenticator.execute(() -> nsCatalog.namespaceExists(getNamespace(dbName)));
} catch (Exception e) {
throw new RuntimeException("Failed to check database exist, error message is:" + e.getMessage(), e);
}
}
public List<String> listDatabaseNames() {
try {
return preExecutionAuthenticator.execute(() -> nsCatalog.listNamespaces(getNamespace())
.stream()
.map(n -> n.level(n.length() - 1))
.collect(Collectors.toList()));
} catch (Exception e) {
throw new RuntimeException("Failed to list database names, error message is:" + e.getMessage(), e);
}
}
@Override
public List<String> listTableNames(String dbName) {
try {
return preExecutionAuthenticator.execute(() -> {
List<TableIdentifier> tableIdentifiers = catalog.listTables(getNamespace(dbName));
return tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList());
});
} catch (Exception e) {
throw new RuntimeException("Failed to list table names, error message is:" + e.getMessage(), e);
}
}
@Override
public void createDbImpl(CreateDbStmt stmt) throws DdlException {
try {
preExecutionAuthenticator.execute(() -> {
performCreateDb(stmt);
return null;
});
} catch (Exception e) {
throw new DdlException("Failed to create database: "
+ stmt.getFullDbName() + ": " + Util.getRootCauseMessage(e), e);
}
}
@Override
public void createDbImpl(CreateDatabaseCommand command) throws DdlException {
try {
preExecutionAuthenticator.execute(() -> {
performCreateDb(command);
return null;
});
} catch (Exception e) {
throw new DdlException("Failed to create database: "
+ command.getDbName() + ": " + Util.getRootCauseMessage(e), e);
}
}
@Override
public void afterCreateDb(String dbName) {
dorisCatalog.onRefreshCache(true);
}
private void performCreateDb(CreateDbStmt stmt) throws DdlException {
SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
String dbName = stmt.getFullDbName();
Map<String, String> properties = stmt.getProperties();
if (databaseExist(dbName)) {
if (stmt.isSetIfNotExists()) {
LOG.info("create database[{}] which already exists", dbName);
return;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, dbName);
}
}
if (!properties.isEmpty() && dorisCatalog instanceof IcebergExternalCatalog) {
String icebergCatalogType = ((IcebergExternalCatalog) dorisCatalog).getIcebergCatalogType();
if (!IcebergExternalCatalog.ICEBERG_HMS.equals(icebergCatalogType)) {
throw new DdlException(
"Not supported: create database with properties for iceberg catalog type: " + icebergCatalogType);
}
}
nsCatalog.createNamespace(getNamespace(dbName), properties);
}
private void performCreateDb(CreateDatabaseCommand command) throws DdlException {
SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
String dbName = command.getDbName();
Map<String, String> properties = command.getProperties();
if (databaseExist(dbName)) {
if (command.isIfNotExists()) {
LOG.info("create database[{}] which already exists", dbName);
return;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, dbName);
}
}
if (!properties.isEmpty() && dorisCatalog instanceof IcebergExternalCatalog) {
String icebergCatalogType = ((IcebergExternalCatalog) dorisCatalog).getIcebergCatalogType();
if (!IcebergExternalCatalog.ICEBERG_HMS.equals(icebergCatalogType)) {
throw new DdlException(
"Not supported: create database with properties for iceberg catalog type: " + icebergCatalogType);
}
}
nsCatalog.createNamespace(getNamespace(dbName), properties);
}
@Override
public void dropDbImpl(String dbName, boolean ifExists, boolean force) throws DdlException {
try {
preExecutionAuthenticator.execute(() -> {
preformDropDb(dbName, ifExists, force);
return null;
});
} catch (Exception e) {
throw new DdlException(
"Failed to drop database: " + dbName + ", error message is:" + e.getMessage(), e);
}
}
private void preformDropDb(String dbName, boolean ifExists, boolean force) throws DdlException {
if (!databaseExist(dbName)) {
if (ifExists) {
LOG.info("drop database[{}] which does not exist", dbName);
return;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_DB_DROP_EXISTS, dbName);
}
}
if (force) {
// try to drop all tables in the database
List<String> tables = listTableNames(dbName);
for (String table : tables) {
performDropTable(dbName, table, true);
}
if (!tables.isEmpty()) {
LOG.info("drop database[{}] with force, drop all tables, num: {}", dbName, tables.size());
}
}
nsCatalog.dropNamespace(getNamespace(dbName));
}
@Override
public void afterDropDb(String dbName) {
dorisCatalog.onRefreshCache(true);
}
@Override
public boolean createTableImpl(CreateTableStmt stmt) throws UserException {
try {
preExecutionAuthenticator.execute(() -> performCreateTable(stmt));
} catch (Exception e) {
throw new DdlException(
"Failed to create table: " + stmt.getTableName() + ", error message is:" + e.getMessage(), e);
}
return false;
}
public boolean performCreateTable(CreateTableStmt stmt) throws UserException {
String dbName = stmt.getDbName();
ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
if (db == null) {
throw new UserException("Failed to get database: '" + dbName + "' in catalog: " + dorisCatalog.getName());
}
String tableName = stmt.getTableName();
if (tableExist(dbName, tableName)) {
if (stmt.isSetIfNotExists()) {
LOG.info("create table[{}] which already exists", tableName);
return true;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
}
List<Column> columns = stmt.getColumns();
List<StructField> collect = columns.stream()
.map(col -> new StructField(col.getName(), col.getType(), col.getComment(), col.isAllowNull()))
.collect(Collectors.toList());
StructType structType = new StructType(new ArrayList<>(collect));
org.apache.iceberg.types.Type visit =
DorisTypeVisitor.visit(structType, new DorisTypeToIcebergType(structType));
Schema schema = new Schema(visit.asNestedType().asStructType().fields());
Map<String, String> properties = stmt.getProperties();
properties.put(ExternalCatalog.DORIS_VERSION, ExternalCatalog.DORIS_VERSION_VALUE);
PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema);
catalog.createTable(getTableIdentifier(dbName, tableName), schema, partitionSpec, properties);
return false;
}
@Override
public void afterCreateTable(String dbName, String tblName) {
ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
if (db != null) {
db.setUnInitialized(true);
}
}
@Override
public void dropTableImpl(DropTableStmt stmt) throws DdlException {
if (stmt == null) {
throw new DdlException("DropTableStmt is null");
}
dropTableImpl(stmt.getDbName(), stmt.getTableName(), stmt.isSetIfExists());
}
public void dropTableImpl(String dbName, String tableName, boolean ifExists) throws DdlException {
try {
preExecutionAuthenticator.execute(() -> {
performDropTable(dbName, tableName, ifExists);
return null;
});
} catch (Exception e) {
throw new DdlException(
"Failed to drop table: " + tableName + ", error message is:" + e.getMessage(), e);
}
}
@Override
public void afterDropTable(String dbName, String tblName) {
ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
if (db != null) {
db.setUnInitialized(true);
}
}
private void performDropTable(DropTableStmt stmt) throws DdlException {
if (stmt == null) {
throw new DdlException("DropTableStmt is null");
}
performDropTable(stmt.getDbName(), stmt.getTableName(), stmt.isSetIfExists());
}
private void performDropTable(String dbName, String tableName, boolean ifExists) throws DdlException {
ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
if (db == null) {
if (ifExists) {
LOG.info("database [{}] does not exist when drop table[{}]", dbName, tableName);
return;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
}
}
if (!tableExist(dbName, tableName)) {
if (ifExists) {
LOG.info("drop table[{}] which does not exist", tableName);
return;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dbName);
}
}
catalog.dropTable(getTableIdentifier(dbName, tableName), true);
}
@Override
public void truncateTableImpl(String dbName, String tblName, List<String> partitions) {
throw new UnsupportedOperationException("Truncate Iceberg table is not supported.");
}
public PreExecutionAuthenticator getPreExecutionAuthenticator() {
return preExecutionAuthenticator;
}
@Override
public Table loadTable(String dbName, String tblName) {
try {
return preExecutionAuthenticator.execute(() -> catalog.loadTable(getTableIdentifier(dbName, tblName)));
} catch (Exception e) {
throw new RuntimeException("Failed to load table, error message is:" + e.getMessage(), e);
}
}
private TableIdentifier getTableIdentifier(String dbName, String tblName) {
return externalCatalogName
.map(s -> TableIdentifier.of(s, dbName, tblName))
.orElseGet(() -> TableIdentifier.of(dbName, tblName));
}
private Namespace getNamespace(String dbName) {
return externalCatalogName
.map(s -> Namespace.of(s, dbName))
.orElseGet(() -> Namespace.of(dbName));
}
private Namespace getNamespace() {
return externalCatalogName.map(Namespace::of).orElseGet(() -> Namespace.empty());
}
}