IcebergMetadataOps.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg;

import org.apache.doris.analysis.ColumnPosition;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.DorisTypeVisitor;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
import org.apache.doris.nereids.trees.plans.commands.info.BranchOptions;
import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo;
import org.apache.doris.nereids.trees.plans.commands.info.DropBranchInfo;
import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TagOptions;

import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.view.View;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;

public class IcebergMetadataOps implements ExternalMetadataOps {

    private static final Logger LOG = LogManager.getLogger(IcebergMetadataOps.class);
    protected Catalog catalog;
    protected ExternalCatalog dorisCatalog;
    protected SupportsNamespaces nsCatalog;
    private PreExecutionAuthenticator preExecutionAuthenticator;
    // Generally, there should be only two levels under the catalog, namely <database>.<table>,
    // but the REST type catalog is obtained from an external server,
    // and the level provided by the external server may be three levels, <catalog>.<database>.<table>.
    // Therefore, if the external server provides a catalog,
    // the catalog needs to be recorded here to ensure semantic consistency.
    private Optional<String> externalCatalogName = Optional.empty();

    public IcebergMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) {
        this.dorisCatalog = dorisCatalog;
        this.catalog = catalog;
        nsCatalog = (SupportsNamespaces) catalog;
        this.preExecutionAuthenticator = dorisCatalog.getPreExecutionAuthenticator();

        if (dorisCatalog.getProperties().containsKey(IcebergExternalCatalog.EXTERNAL_CATALOG_NAME)) {
            externalCatalogName =
                Optional.of(dorisCatalog.getProperties().get(IcebergExternalCatalog.EXTERNAL_CATALOG_NAME));
        }
    }

    public Catalog getCatalog() {
        return catalog;
    }

    public ExternalCatalog getExternalCatalog() {
        return dorisCatalog;
    }

    @Override
    public void close() {
        if (catalog != null) {
            catalog = null;
        }
    }

    @Override
    public boolean tableExist(String dbName, String tblName) {
        try {
            return preExecutionAuthenticator.execute(() -> catalog.tableExists(getTableIdentifier(dbName, tblName)));
        } catch (Exception e) {
            throw new RuntimeException("Failed to check table exist, error message is:" + e.getMessage(), e);
        }
    }

    public boolean databaseExist(String dbName) {
        try {
            return preExecutionAuthenticator.execute(() -> nsCatalog.namespaceExists(getNamespace(dbName)));
        } catch (Exception e) {
            throw new RuntimeException("Failed to check database exist, error message is:" + e.getMessage(), e);
        }
    }

    public List<String> listDatabaseNames() {
        try {
            return preExecutionAuthenticator.execute(() -> nsCatalog.listNamespaces(getNamespace())
                   .stream()
                   .map(n -> n.level(n.length() - 1))
                   .collect(Collectors.toList()));
        } catch (Exception e) {
            throw new RuntimeException("Failed to list database names, error message is:" + e.getMessage(), e);
        }
    }


    @Override
    public List<String> listTableNames(String dbName) {
        try {
            return preExecutionAuthenticator.execute(() -> {
                List<TableIdentifier> tableIdentifiers = catalog.listTables(getNamespace(dbName));
                List<String> views;
                // Our original intention was simply to clearly define the responsibilities of ViewCatalog and Catalog.
                // IcebergMetadataOps handles listTableNames and listViewNames separately.
                // listTableNames should only focus on the table type,
                // but in reality, Iceberg's return includes views. Therefore, we added a filter to exclude views.
                if (catalog instanceof ViewCatalog) {
                    views = ((ViewCatalog) catalog).listViews(getNamespace(dbName))
                            .stream().map(TableIdentifier::name).collect(Collectors.toList());
                } else {
                    views = Collections.emptyList();
                }
                if (views.isEmpty()) {
                    return tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList());
                } else {
                    return tableIdentifiers.stream()
                            .map(TableIdentifier::name)
                            .filter(name -> !views.contains(name)).collect(Collectors.toList());
                }
            });
        } catch (Exception e) {
            throw new RuntimeException("Failed to list table names, error message is:" + e.getMessage(), e);
        }
    }

    @Override
    public boolean createDbImpl(String dbName, boolean ifNotExists, Map<String, String> properties)
            throws DdlException {
        try {
            return preExecutionAuthenticator.execute(() -> performCreateDb(dbName, ifNotExists, properties));
        } catch (Exception e) {
            throw new DdlException("Failed to create database: "
                    + dbName + ": " + Util.getRootCauseMessage(e), e);
        }
    }

    @Override
    public void afterCreateDb() {
        dorisCatalog.resetMetaCacheNames();
    }

    private boolean performCreateDb(String dbName, boolean ifNotExists, Map<String, String> properties)
            throws DdlException {
        SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
        if (databaseExist(dbName)) {
            if (ifNotExists) {
                LOG.info("create database[{}] which already exists", dbName);
                return true;
            } else {
                ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, dbName);
            }
        }
        if (!properties.isEmpty() && dorisCatalog instanceof IcebergExternalCatalog) {
            String icebergCatalogType = ((IcebergExternalCatalog) dorisCatalog).getIcebergCatalogType();
            if (!IcebergExternalCatalog.ICEBERG_HMS.equals(icebergCatalogType)) {
                throw new DdlException(
                        "Not supported: create database with properties for iceberg catalog type: "
                                + icebergCatalogType);
            }
        }
        nsCatalog.createNamespace(getNamespace(dbName), properties);
        return false;
    }

    @Override
    public void dropDbImpl(String dbName, boolean ifExists, boolean force) throws DdlException {
        try {
            preExecutionAuthenticator.execute(() -> {
                preformDropDb(dbName, ifExists, force);
                return null;
            });
        } catch (Exception e) {
            throw new DdlException(
                    "Failed to drop database: " + dbName + ", error message is:" + e.getMessage(), e);
        }
    }

    private void preformDropDb(String dbName, boolean ifExists, boolean force) throws DdlException {
        ExternalDatabase dorisDb = dorisCatalog.getDbNullable(dbName);
        if (dorisDb == null) {
            if (ifExists) {
                LOG.info("drop database[{}] which does not exist", dbName);
                return;
            } else {
                ErrorReport.reportDdlException(ErrorCode.ERR_DB_DROP_EXISTS, dbName);
            }
        }
        if (force) {
            // try to drop all tables in the database
            List<String> remoteTableNames = listTableNames(dorisDb.getRemoteName());
            for (String remoteTableName : remoteTableNames) {
                performDropTable(dorisDb.getRemoteName(), remoteTableName, true);
            }
            if (!remoteTableNames.isEmpty()) {
                LOG.info("drop database[{}] with force, drop all tables, num: {}", dbName, remoteTableNames.size());
            }
            // try to drop all views in the database
            List<String> remoteViewNames = listViewNames(dorisDb.getRemoteName());
            for (String remoteViewName : remoteViewNames) {
                performDropView(dorisDb.getRemoteName(), remoteViewName);
            }
            if (!remoteViewNames.isEmpty()) {
                LOG.info("drop database[{}] with force, drop all views, num: {}", dbName, remoteViewNames.size());
            }
        }
        nsCatalog.dropNamespace(getNamespace(dorisDb.getRemoteName()));
    }

    @Override
    public void afterDropDb(String dbName) {
        dorisCatalog.unregisterDatabase(dbName);
    }

    @Override
    public boolean createTableImpl(CreateTableStmt stmt) throws UserException {
        try {
            return preExecutionAuthenticator.execute(() -> performCreateTable(stmt));
        } catch (Exception e) {
            throw new DdlException(
                    "Failed to create table: " + stmt.getTableName() + ", error message is:" + e.getMessage(), e);
        }
    }

    private boolean performCreateTable(CreateTableStmt stmt) throws UserException {
        String dbName = stmt.getDbName();
        ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
        if (db == null) {
            throw new UserException("Failed to get database: '" + dbName + "' in catalog: " + dorisCatalog.getName());
        }
        String tableName = stmt.getTableName();
        // 1. first, check if table exist in remote
        if (tableExist(db.getRemoteName(), tableName)) {
            if (stmt.isSetIfNotExists()) {
                LOG.info("create table[{}] which already exists", tableName);
                return true;
            } else {
                ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
            }
        }
        // 2. second, check fi table exist in local.
        // This is because case sensibility issue, eg:
        // 1. lower_case_table_name = 1
        // 2. create table tbl1;
        // 3. create table TBL1;  TBL1 does not exist in remote because the remote system is case-sensitive.
        //    but because lower_case_table_name = 1, the table can not be created in Doris because it is conflict with
        //    tbl1
        ExternalTable dorisTable = db.getTableNullable(tableName);
        if (dorisTable != null) {
            if (stmt.isSetIfNotExists()) {
                LOG.info("create table[{}] which already exists", tableName);
                return true;
            } else {
                ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
            }
        }
        List<Column> columns = stmt.getColumns();
        List<StructField> collect = columns.stream()
                .map(col -> new StructField(col.getName(), col.getType(), col.getComment(), col.isAllowNull()))
                .collect(Collectors.toList());
        StructType structType = new StructType(new ArrayList<>(collect));
        Type visit =
                DorisTypeVisitor.visit(structType, new DorisTypeToIcebergType(structType));
        Schema schema = new Schema(visit.asNestedType().asStructType().fields());
        Map<String, String> properties = stmt.getProperties();
        properties.put(ExternalCatalog.DORIS_VERSION, ExternalCatalog.DORIS_VERSION_VALUE);
        PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema);
        catalog.createTable(getTableIdentifier(dbName, tableName), schema, partitionSpec, properties);
        return false;
    }

    @Override
    public void afterCreateTable(String dbName, String tblName) {
        Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName);
        if (db.isPresent()) {
            db.get().resetMetaCacheNames();
        }
        LOG.info("after create table {}.{}.{}, is db exists: {}",
                dorisCatalog.getName(), dbName, tblName, db.isPresent());
    }

    @Override
    public void dropTableImpl(ExternalTable dorisTable, boolean ifExists) throws DdlException {
        try {
            preExecutionAuthenticator.execute(() -> {
                if (getExternalCatalog().getMetadataOps()
                        .viewExists(dorisTable.getRemoteDbName(), dorisTable.getRemoteName())) {
                    performDropView(dorisTable.getRemoteDbName(), dorisTable.getRemoteName());
                } else {
                    performDropTable(dorisTable.getRemoteDbName(), dorisTable.getRemoteName(), ifExists);
                }
                return null;
            });
        } catch (Exception e) {
            throw new DdlException(
                    "Failed to drop table: " + dorisTable.getName() + ", error message is:" + e.getMessage(), e);
        }
    }

    @Override
    public void afterDropTable(String dbName, String tblName) {
        Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName);
        if (db.isPresent()) {
            db.get().unregisterTable(tblName);
        }
        LOG.info("after drop table {}.{}.{}. is db exists: {}",
                dorisCatalog.getName(), dbName, tblName, db.isPresent());
    }

    private void performDropTable(String remoteDbName, String remoteTblName, boolean ifExists) throws DdlException {
        if (!tableExist(remoteDbName, remoteTblName)) {
            if (ifExists) {
                LOG.info("drop table[{}] which does not exist", remoteTblName);
                return;
            } else {
                ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, remoteTblName, remoteDbName);
            }
        }
        catalog.dropTable(getTableIdentifier(remoteDbName, remoteTblName), true);
    }

    public void renameTableImpl(String dbName, String tblName, String newTblName) throws DdlException {
        try {
            preExecutionAuthenticator.execute(() -> {
                catalog.renameTable(getTableIdentifier(dbName, tblName), getTableIdentifier(dbName, newTblName));
                return null;
            });
        } catch (Exception e) {
            throw new DdlException(
                    "Failed to rename table: " + tblName + " to " + newTblName + ", error message is:" + e.getMessage(),
                    e);
        }
    }

    @Override
    public void afterRenameTable(String dbName, String oldName, String newName) {
        Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName);
        if (db.isPresent()) {
            db.get().unregisterTable(oldName);
            db.get().resetMetaCacheNames();
        }
        LOG.info("after rename table {}.{}.{} to {}, is db exists: {}",
                dorisCatalog.getName(), dbName, oldName, newName, db.isPresent());
    }

    @Override
    public void truncateTableImpl(ExternalTable dorisTable, List<String> partitions) {
        throw new UnsupportedOperationException("Truncate Iceberg table is not supported.");
    }

    @Override
    public void createOrReplaceBranchImpl(ExternalTable dorisTable, CreateOrReplaceBranchInfo branchInfo)
            throws UserException {
        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
        BranchOptions branchOptions = branchInfo.getBranchOptions();

        Long snapshotId = branchOptions.getSnapshotId()
                .orElse(
                        // use current snapshot
                        Optional.ofNullable(icebergTable.currentSnapshot()).map(Snapshot::snapshotId).orElse(null));

        ManageSnapshots manageSnapshots = icebergTable.manageSnapshots();
        String branchName = branchInfo.getBranchName();
        boolean refExists = null != icebergTable.refs().get(branchName);
        boolean create = branchInfo.getCreate();
        boolean replace = branchInfo.getReplace();
        boolean ifNotExists = branchInfo.getIfNotExists();

        Runnable safeCreateBranch = () -> {
            if (snapshotId == null) {
                manageSnapshots.createBranch(branchName);
            } else {
                manageSnapshots.createBranch(branchName, snapshotId);
            }
        };

        if (create && replace && !refExists) {
            safeCreateBranch.run();
        } else if (replace) {
            if (snapshotId == null) {
                // Cannot perform a replace operation on an empty table
                throw new UserException(
                        "Cannot complete replace branch operation on " + icebergTable.name()
                                + " , main has no snapshot");
            }
            manageSnapshots.replaceBranch(branchName, snapshotId);
        } else {
            if (refExists && ifNotExists) {
                return;
            }
            safeCreateBranch.run();
        }

        branchOptions.getRetain().ifPresent(n -> manageSnapshots.setMaxSnapshotAgeMs(branchName, n));
        branchOptions.getNumSnapshots().ifPresent(n -> manageSnapshots.setMinSnapshotsToKeep(branchName, n));
        branchOptions.getRetention().ifPresent(n -> manageSnapshots.setMaxRefAgeMs(branchName, n));

        try {
            preExecutionAuthenticator.execute(() -> manageSnapshots.commit());
        } catch (Exception e) {
            throw new RuntimeException(
                    "Failed to create or replace branch: " + branchName + " in table: " + icebergTable.name()
                            + ", error message is: " + e.getMessage(), e);
        }
    }

    @Override
    public void afterOperateOnBranchOrTag(String dbName, String tblName) {
        Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName);
        if (db.isPresent()) {
            Optional tbl = db.get().getTableForReplay(tblName);
            if (tbl.isPresent()) {
                Env.getCurrentEnv().getRefreshManager()
                        .refreshTableInternal(db.get(), (ExternalTable) tbl.get(),
                                System.currentTimeMillis());
            }
        }
    }

    @Override
    public void createOrReplaceTagImpl(ExternalTable dorisTable, CreateOrReplaceTagInfo tagInfo)
            throws UserException {
        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
        TagOptions tagOptions = tagInfo.getTagOptions();
        Long snapshotId = tagOptions.getSnapshotId()
                .orElse(
                        // use current snapshot
                        Optional.ofNullable(icebergTable.currentSnapshot()).map(Snapshot::snapshotId).orElse(null));

        if (snapshotId == null) {
            // Creating tag for empty tables is not allowed
            throw new UserException(
                    "Cannot complete replace branch operation on " + icebergTable.name() + " , main has no snapshot");
        }

        String tagName = tagInfo.getTagName();
        boolean create = tagInfo.getCreate();
        boolean replace = tagInfo.getReplace();
        boolean ifNotExists = tagInfo.getIfNotExists();
        boolean refExists = null != icebergTable.refs().get(tagName);

        ManageSnapshots manageSnapshots = icebergTable.manageSnapshots();
        if (create && replace && !refExists) {
            manageSnapshots.createTag(tagName, snapshotId);
        } else if (replace) {
            manageSnapshots.replaceTag(tagName, snapshotId);
        } else {
            if (refExists && ifNotExists) {
                return;
            }
            manageSnapshots.createTag(tagName, snapshotId);
        }

        tagOptions.getRetain().ifPresent(n -> manageSnapshots.setMaxRefAgeMs(tagName, n));
        try {
            preExecutionAuthenticator.execute(() -> manageSnapshots.commit());
        } catch (Exception e) {
            throw new RuntimeException(
                    "Failed to create or replace tag: " + tagName + " in table: " + icebergTable.name()
                            + ", error message is: " + e.getMessage(), e);
        }
    }

    @Override
    public void dropTagImpl(ExternalTable dorisTable, DropTagInfo tagInfo) throws UserException {
        String tagName = tagInfo.getTagName();
        boolean ifExists = tagInfo.getIfExists();
        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
        SnapshotRef snapshotRef = icebergTable.refs().get(tagName);

        if (snapshotRef != null || !ifExists) {
            ManageSnapshots manageSnapshots = icebergTable.manageSnapshots();
            try {
                preExecutionAuthenticator.execute(() -> manageSnapshots.removeTag(tagName).commit());
            } catch (Exception e) {
                throw new RuntimeException(
                        "Failed to drop tag: " + tagName + " in table: " + icebergTable.name()
                        + ", error message is: " + e.getMessage(), e);
            }
        }
    }

    @Override
    public void dropBranchImpl(ExternalTable dorisTable, DropBranchInfo branchInfo) throws UserException {
        String branchName = branchInfo.getBranchName();
        boolean ifExists = branchInfo.getIfExists();
        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
        SnapshotRef snapshotRef = icebergTable.refs().get(branchName);

        if (snapshotRef != null || !ifExists) {
            ManageSnapshots manageSnapshots = icebergTable.manageSnapshots();
            try {
                preExecutionAuthenticator.execute(() -> manageSnapshots.removeBranch(branchName).commit());
            } catch (Exception e) {
                throw new RuntimeException(
                        "Failed to drop branch: " + branchName + " in table: " + icebergTable.name()
                        + ", error message is: " + e.getMessage(), e);
            }
        }
    }

    private void addOneColumn(UpdateSchema updateSchema, Column column) throws UserException {
        if (!column.isAllowNull()) {
            throw new UserException("can't add a non-nullable column to an Iceberg table");
        }
        org.apache.iceberg.types.Type dorisType = IcebergUtils.dorisTypeToIcebergType(column.getType());
        Literal<?> defaultValue = IcebergUtils.parseIcebergLiteral(column.getDefaultValue(), dorisType);
        updateSchema.addColumn(column.getName(), dorisType, column.getComment(), defaultValue);
    }

    private void applyPosition(UpdateSchema updateSchema, ColumnPosition position, String columnName) {
        if (position.isFirst()) {
            updateSchema.moveFirst(columnName);
        } else {
            updateSchema.moveAfter(columnName, position.getLastCol());
        }
    }

    private void refreshTable(ExternalTable dorisTable) {
        Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dorisTable.getRemoteDbName());
        if (db.isPresent()) {
            Optional<?> tbl = db.get().getTableForReplay(dorisTable.getRemoteName());
            if (tbl.isPresent()) {
                Env.getCurrentEnv().getRefreshManager()
                        .refreshTableInternal(db.get(), (ExternalTable) tbl.get(), System.currentTimeMillis());
            }
        }
    }

    @Override
    public void addColumn(ExternalTable dorisTable, Column column, ColumnPosition position)
            throws UserException {
        validateCommonColumnInfo(column);
        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
        UpdateSchema updateSchema = icebergTable.updateSchema();
        addOneColumn(updateSchema, column);
        if (position != null) {
            applyPosition(updateSchema, position, column.getName());
        }
        try {
            preExecutionAuthenticator.execute(() -> updateSchema.commit());
        } catch (Exception e) {
            throw new UserException("Failed to add column: " + column.getName() + " to table: "
                    + icebergTable.name() + ", error message is: " + e.getMessage(), e);
        }
        refreshTable(dorisTable);
    }

    @Override
    public void addColumns(ExternalTable dorisTable, List<Column> columns) throws UserException {
        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
        UpdateSchema updateSchema = icebergTable.updateSchema();
        for (Column column : columns) {
            validateCommonColumnInfo(column);
            addOneColumn(updateSchema, column);
        }
        try {
            preExecutionAuthenticator.execute(() -> updateSchema.commit());
        } catch (Exception e) {
            throw new UserException("Failed to add columns to table: " + icebergTable.name()
                    + ", error message is: " + e.getMessage(), e);
        }
        refreshTable(dorisTable);
    }

    @Override
    public void dropColumn(ExternalTable dorisTable, String columnName) throws UserException {
        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
        UpdateSchema updateSchema = icebergTable.updateSchema();
        updateSchema.deleteColumn(columnName);
        try {
            preExecutionAuthenticator.execute(() -> updateSchema.commit());
        } catch (Exception e) {
            throw new UserException("Failed to drop column: " + columnName + " from table: "
                    + icebergTable.name() + ", error message is: " + e.getMessage(), e);
        }
        refreshTable(dorisTable);
    }

    @Override
    public void renameColumn(ExternalTable dorisTable, String oldName, String newName) throws UserException {
        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
        UpdateSchema updateSchema = icebergTable.updateSchema();
        updateSchema.renameColumn(oldName, newName);
        try {
            preExecutionAuthenticator.execute(() -> updateSchema.commit());
        } catch (Exception e) {
            throw new UserException("Failed to rename column: " + oldName + " to " + newName
                    + " in table: " + icebergTable.name() + ", error message is: " + e.getMessage(), e);
        }
        refreshTable(dorisTable);
    }

    @Override
    public void modifyColumn(ExternalTable dorisTable, Column column, ColumnPosition position)
            throws UserException {
        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
        validateForModifyColumn(column, icebergTable);
        Type icebergType = IcebergUtils.dorisTypeToIcebergType(column.getType());
        UpdateSchema updateSchema = icebergTable.updateSchema();
        updateSchema.updateColumn(column.getName(), icebergType.asPrimitiveType(), column.getComment());
        if (column.isAllowNull()) {
            // we can change a required column to optional, but not the other way around
            // because we don't know whether there is existing data with null values.
            updateSchema.makeColumnOptional(column.getName());
        }
        if (position != null) {
            applyPosition(updateSchema, position, column.getName());
        }
        try {
            preExecutionAuthenticator.execute(() -> updateSchema.commit());
        } catch (Exception e) {
            throw new UserException("Failed to modify column: " + column.getName() + " in table: "
                    + icebergTable.name() + ", error message is: " + e.getMessage(), e);
        }
        refreshTable(dorisTable);
    }

    private void validateForModifyColumn(Column column, Table icebergTable) throws UserException {
        validateCommonColumnInfo(column);
        // check complex type
        if (column.getType().isComplexType()) {
            throw new UserException("Modify column type to non-primitive type is not supported: " + column.getType());
        }
        // check exist
        NestedField currentCol = icebergTable.schema().findField(column.getName());
        if (currentCol == null) {
            throw new UserException("Column " + column.getName() + " does not exist");
        }
        // check nullable
        if (currentCol.isOptional() && !column.isAllowNull()) {
            throw new UserException("Can not change nullable column " + column.getName() + " to not null");
        }
    }

    private void validateCommonColumnInfo(Column column) throws UserException {
        // check aggregation method
        if (column.isAggregated()) {
            throw new UserException("Can not specify aggregation method for iceberg table column");
        }
        // check auto inc
        if (column.isAutoInc()) {
            throw new UserException("Can not specify auto incremental iceberg table column");
        }
    }

    @Override
    public void reorderColumns(ExternalTable dorisTable, List<String> newOrder) throws UserException {
        if (newOrder == null || newOrder.isEmpty()) {
            throw new UserException("Reorder column failed, new order is empty.");
        }
        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
        UpdateSchema updateSchema = icebergTable.updateSchema();
        updateSchema.moveFirst(newOrder.get(0));
        for (int i = 1; i < newOrder.size(); i++) {
            updateSchema.moveAfter(newOrder.get(i), newOrder.get(i - 1));
        }
        try {
            preExecutionAuthenticator.execute(() -> updateSchema.commit());
        } catch (Exception e) {
            throw new UserException("Failed to reorder columns in table: " + icebergTable.name()
                    + ", error message is: " + e.getMessage(), e);
        }
        refreshTable(dorisTable);
    }

    public PreExecutionAuthenticator getPreExecutionAuthenticator() {
        return preExecutionAuthenticator;
    }

    @Override
    public Table loadTable(String dbName, String tblName) {
        try {
            return preExecutionAuthenticator.execute(() -> catalog.loadTable(getTableIdentifier(dbName, tblName)));
        } catch (Exception e) {
            throw new RuntimeException("Failed to load table, error message is:" + e.getMessage(), e);
        }
    }

    @Override
    public boolean viewExists(String remoteDbName, String remoteViewName) {
        if (!(catalog instanceof ViewCatalog)) {
            return false;
        }
        try {
            return preExecutionAuthenticator.execute(() ->
                    ((ViewCatalog) catalog).viewExists(getTableIdentifier(remoteDbName, remoteViewName)));
        } catch (Exception e) {
            throw new RuntimeException("Failed to check view exist, error message is:" + e.getMessage(), e);

        }
    }

    @Override
    public View loadView(String dbName, String tblName) {
        if (!(catalog instanceof ViewCatalog)) {
            return null;
        }
        try {
            ViewCatalog viewCatalog = (ViewCatalog) catalog;
            return preExecutionAuthenticator.execute(() -> viewCatalog.loadView(TableIdentifier.of(dbName, tblName)));
        } catch (Exception e) {
            throw new RuntimeException("Failed to load view, error message is:" + e.getMessage(), e);
        }
    }

    @Override
    public List<String> listViewNames(String db) {
        if (!(catalog instanceof ViewCatalog)) {
            return Collections.emptyList();
        }
        try {
            return preExecutionAuthenticator.execute(() ->
                    ((ViewCatalog) catalog).listViews(Namespace.of(db))
                            .stream().map(TableIdentifier::name).collect(Collectors.toList()));
        } catch (Exception e) {
            throw new RuntimeException("Failed to list view names, error message is:" + e.getMessage(), e);
        }
    }

    private TableIdentifier getTableIdentifier(String dbName, String tblName) {
        return externalCatalogName
            .map(s -> TableIdentifier.of(s, dbName, tblName))
            .orElseGet(() -> TableIdentifier.of(dbName, tblName));
    }

    private Namespace getNamespace(String dbName) {
        return externalCatalogName
            .map(s -> Namespace.of(s, dbName))
            .orElseGet(() -> Namespace.of(dbName));
    }

    private Namespace getNamespace() {
        return externalCatalogName.map(Namespace::of).orElseGet(() -> Namespace.empty());
    }

    public ThreadPoolExecutor getThreadPoolWithPreAuth() {
        return dorisCatalog.getThreadPoolExecutor();
    }

    private void performDropView(String remoteDbName, String remoteViewName) throws DdlException {
        if (!(catalog instanceof ViewCatalog)) {
            throw new DdlException("Drop Iceberg view is not supported with not view catalog.");
        }
        ViewCatalog viewCatalog = (ViewCatalog) catalog;
        viewCatalog.dropView(getTableIdentifier(remoteDbName, remoteViewName));
    }
}