CatalogMgr.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.analysis.AlterCatalogCommentStmt;
import org.apache.doris.analysis.AlterCatalogNameStmt;
import org.apache.doris.analysis.AlterCatalogPropertyStmt;
import org.apache.doris.analysis.CreateCatalogStmt;
import org.apache.doris.analysis.DropCatalogStmt;
import org.apache.doris.analysis.ShowCatalogStmt;
import org.apache.doris.analysis.ShowCreateCatalogStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.Resource.ReferenceType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.commands.CreateCatalogCommand;
import org.apache.doris.persist.OperationType;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSet;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * CatalogMgr will load all catalogs at FE startup,
 * and save them in map with name.
 * Note: Catalog in sql syntax will be treated as catalog interface in code level.
 * TODO: Change the package name into catalog.
 */
public class CatalogMgr implements Writable, GsonPostProcessable {
    private static final Logger LOG = LogManager.getLogger(CatalogMgr.class);

    // Keys looked up in a catalog's property map.
    public static final String ACCESS_CONTROLLER_CLASS_PROP = "access_controller.class";
    public static final String ACCESS_CONTROLLER_PROPERTY_PREFIX_PROP = "access_controller.properties.";
    public static final String METADATA_REFRESH_INTERVAL_SEC = "metadata_refresh_interval_sec";
    public static final String CATALOG_TYPE_PROP = "type";

    // Read/write lock taken by the DDL, SHOW and replay methods of this class.
    // NOTE(review): the `true` argument is presumably the fairness flag - confirm against the lock class.
    private final MonitoredReentrantReadWriteLock lock = new MonitoredReentrantReadWriteLock(true);

    // All catalogs keyed by catalog id. This is the only map that is serialized.
    @SerializedName(value = "idToCatalog")
    private Map<Long, CatalogIf<? extends DatabaseIf<? extends TableIf>>> idToCatalog = Maps.newConcurrentMap();
    // All catalogs keyed by catalog name. It is regenerated from idToCatalog, so it does not need to be persisted.
    private Map<String, CatalogIf> nameToCatalog = Maps.newConcurrentMap();

    // Kept as a separate reference for convenient access.
    // The internal catalog is still registered in idToCatalog and nameToCatalog as well.
    private InternalCatalog internalCatalog;

    /**
     * Creates the manager and registers the internal catalog in both lookup maps.
     */
    public CatalogMgr() {
        initInternalCatalog();
    }

    /**
     * Reads a JSON string from {@code in} and deserializes it into a {@code CatalogMgr} with Gson.
     *
     * @param in the input to read the serialized JSON text from
     * @return the deserialized manager
     * @throws IOException if reading the text from {@code in} fails
     */
    public static CatalogMgr read(DataInput in) throws IOException {
        final String serialized = Text.readString(in);
        if (LOG.isDebugEnabled()) {
            LOG.debug("debug: read json: {}", serialized);
        }
        CatalogMgr mgr = GsonUtils.GSON.fromJson(serialized, CatalogMgr.class);
        return mgr;
    }

    /**
     * Creates the internal catalog through {@link EnvFactory} and registers it via {@link #addCatalog}.
     */
    private void initInternalCatalog() {
        internalCatalog = EnvFactory.getInstance().createInternalCatalog();
        addCatalog(internalCatalog);
    }

    /**
     * Registers {@code catalog} in both lookup maps.
     * Every catalog whose name is not the internal catalog name is treated as an
     * {@link ExternalCatalog} and reset to the uninitialized state. If the catalog names
     * a resource that exists in the resource manager, a CATALOG reference is added to it.
     *
     * @param catalog the catalog to register
     */
    private void addCatalog(CatalogIf catalog) {
        final String name = catalog.getName();
        nameToCatalog.put(name, catalog);
        idToCatalog.put(catalog.getId(), catalog);

        boolean isInternal = name.equals(InternalCatalog.INTERNAL_CATALOG_NAME);
        if (!isInternal) {
            ((ExternalCatalog) catalog).resetToUninitialized(false);
        }

        if (Strings.isNullOrEmpty(catalog.getResource())) {
            return;
        }
        Resource referenced = Env.getCurrentEnv().getResourceMgr().getResource(catalog.getResource());
        if (referenced != null) {
            referenced.addReference(catalog.getName(), ReferenceType.CATALOG);
        }
    }

    /**
     * Unregisters the catalog with the given id and releases what is attached to it:
     * the catalog is closed, removed from the name map, its last-used DB entry is dropped
     * from the current {@link ConnectContext} (if any), its external meta cache is removed,
     * its resource reference is released and its query stats are cleared.
     *
     * @param catalogId id of the catalog to remove
     * @return the removed catalog, or {@code null} if no catalog has that id
     */
    private CatalogIf removeCatalog(long catalogId) {
        CatalogIf removed = idToCatalog.remove(catalogId);
        LOG.info("Removed catalog with id {}, name {}", catalogId, removed == null ? "N/A" : removed.getName());
        if (removed == null) {
            return null;
        }

        removed.onClose();
        nameToCatalog.remove(removed.getName());

        ConnectContext ctx = ConnectContext.get();
        if (ctx != null) {
            ctx.removeLastDBOfCatalog(removed.getName());
        }

        Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(removed.getId());

        if (!Strings.isNullOrEmpty(removed.getResource())) {
            Resource referenced = Env.getCurrentEnv().getResourceMgr().getResource(removed.getResource());
            if (referenced != null) {
                referenced.removeReference(removed.getName(), ReferenceType.CATALOG);
            }
        }

        Env.getCurrentEnv().getQueryStats().clear(removed.getId());
        return removed;
    }

    /**
     * @return the internal catalog held by this manager
     */
    public InternalCatalog getInternalCatalog() {
        return internalCatalog;
    }

    /**
     * Looks up a catalog by name.
     *
     * @param name the catalog name
     * @return the catalog, or {@code null} if no catalog is registered under {@code name}
     */
    public CatalogIf getCatalog(String name) {
        return nameToCatalog.get(name);
    }

    /**
     * Looks up a catalog by id.
     *
     * @param id the catalog id
     * @return the catalog, or {@code null} if no catalog is registered under {@code id}
     */
    public CatalogIf<? extends DatabaseIf<? extends TableIf>> getCatalog(long id) {
        return idToCatalog.get(id);
    }

    /**
     * Looks up a catalog by id, failing with {@code ERR_UNKNOWN_CATALOG} when it is absent.
     *
     * @param id the catalog id
     * @return the catalog registered under {@code id}
     * @throws AnalysisException if no catalog has that id
     */
    public CatalogIf getCatalogOrAnalysisException(long id) throws AnalysisException {
        return getCatalogOrException(id, missingId -> new AnalysisException(
                ErrorCode.ERR_UNKNOWN_CATALOG.formatErrorMsg(missingId), ErrorCode.ERR_UNKNOWN_CATALOG));
    }

    /**
     * Looks up a catalog by id and throws a caller-supplied exception when it is absent.
     *
     * @param id the catalog id
     * @param e builds the exception to throw from the missing id
     * @return the catalog registered under {@code id}
     * @throws E if no catalog has that id
     */
    public <E extends Exception> CatalogIf<? extends DatabaseIf<? extends TableIf>>
            getCatalogOrException(long id, Function<Long, E> e) throws E {
        CatalogIf found = idToCatalog.get(id);
        if (found != null) {
            return found;
        }
        throw e.apply(id);
    }

    /**
     * Looks up a catalog by name and throws a caller-supplied exception when it is absent.
     *
     * @param name the catalog name
     * @param e builds the exception to throw from the missing name
     * @return the catalog registered under {@code name}
     * @throws E if no catalog has that name
     */
    public <E extends Exception> CatalogIf getCatalogOrException(String name, Function<String, E> e) throws E {
        CatalogIf found = nameToCatalog.get(name);
        if (found != null) {
            return found;
        }
        throw e.apply(name);
    }

    /**
     * Looks up a catalog by name, failing with {@code ERR_UNKNOWN_CATALOG} when it is absent.
     *
     * @param name the catalog name
     * @return the catalog registered under {@code name}
     * @throws AnalysisException if no catalog has that name
     */
    public CatalogIf getCatalogOrAnalysisException(String name) throws AnalysisException {
        return getCatalogOrException(name, missingName -> new AnalysisException(
                ErrorCode.ERR_UNKNOWN_CATALOG.formatErrorMsg(missingName), ErrorCode.ERR_UNKNOWN_CATALOG));
    }

    /**
     * @return a new list holding the ids of all registered catalogs (a copy of the key set)
     */
    public List<Long> getCatalogIds() {
        return Lists.newArrayList(idToCatalog.keySet());
    }

    /**
     * Finds a database by id across all catalogs. The internal catalog is asked first,
     * then every other registered catalog.
     *
     * @param dbId the database id
     * @return the database, or {@code null} if no catalog knows {@code dbId}
     **/
    public DatabaseIf getDbNullable(long dbId) {
        DatabaseIf internalDb = internalCatalog.getDbNullable(dbId);
        if (internalDb != null) {
            return internalDb;
        }
        for (CatalogIf candidate : nameToCatalog.values()) {
            // the internal catalog has already been checked above
            if (candidate != internalCatalog) {
                DatabaseIf found = candidate.getDbNullable(dbId);
                if (found != null) {
                    return found;
                }
            }
        }
        return null;
    }

    /** Acquires the write side of {@link #lock}. */
    private void writeLock() {
        lock.writeLock().lock();
    }

    /** Releases the write side of {@link #lock}. */
    private void writeUnlock() {
        lock.writeLock().unlock();
    }

    /** Acquires the read side of {@link #lock}. */
    private void readLock() {
        lock.readLock().lock();
    }

    /** Releases the read side of {@link #lock}. */
    private void readUnlock() {
        lock.readLock().unlock();
    }

    /**
     * Registers a freshly built catalog under the write lock and writes the create edit log.
     *
     * @param catalog the catalog instance to register
     * @param catalogName the name used in log and error messages
     * @param ifNotExists when {@code true}, an already existing name is only logged instead of raising an error
     * @throws UserException if the name is taken and {@code ifNotExists} is {@code false},
     *         or if registering the catalog fails
     */
    private void createCatalogImpl(CatalogIf catalog, String catalogName,
            boolean ifNotExists) throws UserException {
        writeLock();
        try {
            boolean nameTaken = nameToCatalog.containsKey(catalog.getName());
            if (nameTaken && ifNotExists) {
                LOG.warn("Catalog {} is already exist.", catalogName);
                return;
            }
            if (nameTaken) {
                throw new DdlException("Catalog had already exist with name: " + catalogName);
            }
            createCatalogInternal(catalog, false);
            Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_CREATE_CATALOG, catalog.constructEditLog());
        } finally {
            writeUnlock();
        }
    }

    /**
     * Builds a catalog from a {@link CreateCatalogCommand} with a newly allocated id,
     * registers it and writes the meta log.
     *
     * @param cmd the create-catalog command
     * @throws UserException if the catalog cannot be created
     */
    public void createCatalog(CreateCatalogCommand cmd) throws UserException {
        CatalogIf created = CatalogFactory.createFromCommand(Env.getCurrentEnv().getNextId(), cmd);
        createCatalogImpl(created, cmd.getCatalogName(), cmd.isSetIfNotExists());
    }

    /**
     * Builds a catalog from a {@link CreateCatalogStmt} with a newly allocated id,
     * registers it and writes the meta log.
     *
     * @param stmt the create-catalog statement
     * @throws UserException if the catalog cannot be created
     */
    public void createCatalog(CreateCatalogStmt stmt) throws UserException {
        CatalogIf created = CatalogFactory.createFromStmt(Env.getCurrentEnv().getNextId(), stmt);
        createCatalogImpl(created, stmt.getCatalogName(), stmt.isSetIfNotExists());
    }

    /**
     * Removes the catalog instance by name and writes the meta log.
     *
     * @param catalogName name of the catalog to drop
     * @param ifExists when {@code true}, a missing catalog is only logged instead of raising an error
     * @throws UserException if the catalog does not exist and {@code ifExists} is {@code false}
     */
    public void dropCatalog(String catalogName, boolean ifExists) throws UserException {
        writeLock();
        try {
            // A single lookup decides both the "if exists" shortcut and the error case.
            CatalogIf<DatabaseIf<TableIf>> catalog = nameToCatalog.get(catalogName);
            if (catalog == null) {
                if (ifExists) {
                    LOG.warn("Non catalog {} is found.", catalogName);
                    return;
                }
                throw new DdlException("No catalog found with name: " + catalogName);
            }
            CatalogLog log = new CatalogLog();
            log.setCatalogId(catalog.getId());
            // replayDropCatalog() goes through removeCatalog(), which already drops the session's
            // last-used DB entry for this catalog and clears its query stats, so that cleanup
            // is not repeated here.
            replayDropCatalog(log);
            Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_DROP_CATALOG, log);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Removes the catalog named by {@code stmt} and writes the meta log.
     *
     * @param stmt the drop-catalog statement
     * @throws UserException if the catalog does not exist and the statement has no IF EXISTS
     */
    public void dropCatalog(DropCatalogStmt stmt) throws UserException {
        String name = stmt.getCatalogName();
        boolean ifExists = stmt.isSetIfExists();
        dropCatalog(name, ifExists);
    }

    /**
     * Renames a catalog and writes the meta log. If the current session remembers a
     * last-used database for the old name, that entry is moved to the new name.
     *
     * @param catalogName current name of the catalog
     * @param newCatalogName name to rename the catalog to
     * @throws UserException if the catalog does not exist or the new name is already taken
     */
    public void alterCatalogName(String catalogName, String newCatalogName) throws UserException {
        writeLock();
        try {
            CatalogIf catalog = nameToCatalog.get(catalogName);
            if (catalog == null) {
                throw new DdlException("No catalog found with name: " + catalogName);
            }
            if (nameToCatalog.get(newCatalogName) != null) {
                throw new DdlException("Catalog with name " + newCatalogName + " already exist");
            }

            // Capture the session's last-used DB BEFORE renaming: replayAlterCatalogName() goes
            // through removeCatalog(), which removes the entry of the old name from the current
            // ConnectContext. Reading it afterwards would find nothing to carry over.
            ConnectContext ctx = ConnectContext.get();
            String lastDb = ctx == null ? null : ctx.getLastDBOfCatalog(catalogName);

            CatalogLog log = new CatalogLog();
            log.setCatalogId(catalog.getId());
            log.setNewCatalogName(newCatalogName);
            replayAlterCatalogName(log);
            Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_ALTER_CATALOG_NAME, log);

            if (lastDb != null) {
                // lastDb != null implies ctx != null
                ctx.removeLastDBOfCatalog(catalogName);
                ctx.addLastDBOfCatalog(log.getNewCatalogName(), lastDb);
            }
        } finally {
            writeUnlock();
        }
    }

    /**
     * Renames the catalog as described by {@code stmt} and writes the meta log.
     *
     * @param stmt the alter-catalog-name statement
     * @throws UserException if the catalog does not exist or the new name is already taken
     */
    public void alterCatalogName(AlterCatalogNameStmt stmt) throws UserException {
        String oldName = stmt.getCatalogName();
        String newName = stmt.getNewCatalogName();
        alterCatalogName(oldName, newName);
    }

    /**
     * Replaces the comment of a catalog and writes the meta log.
     *
     * @param catalogName name of the catalog to modify
     * @param comment the new comment
     * @throws UserException if the catalog does not exist
     */
    public void alterCatalogComment(String catalogName, String comment) throws UserException {
        writeLock();
        try {
            CatalogIf target = nameToCatalog.get(catalogName);
            if (target == null) {
                throw new DdlException("No catalog found with name: " + catalogName);
            }
            CatalogLog commentLog = new CatalogLog();
            commentLog.setCatalogId(target.getId());
            commentLog.setComment(comment);
            // apply in memory first, then persist the same log entry
            replayAlterCatalogComment(commentLog);
            Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_ALTER_CATALOG_COMMENT, commentLog);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Replaces the comment of the catalog named by {@code stmt} and writes the meta log.
     *
     * @param stmt the alter-catalog-comment statement
     * @throws UserException if the catalog does not exist
     */
    public void alterCatalogComment(AlterCatalogCommentStmt stmt) throws UserException {
        String name = stmt.getCatalogName();
        String newComment = stmt.getComment();
        alterCatalogComment(name, newComment);
    }

    /**
     * Applies new properties to a catalog and writes the meta log.
     * Changing the catalog type through the "type" property is rejected.
     *
     * @param catalogName name of the catalog to modify
     * @param newProperties the properties to apply
     * @throws UserException if the catalog does not exist, the type would change,
     *         or applying the properties fails
     */
    public void alterCatalogProps(String catalogName, Map<String, String> newProperties) throws UserException {
        writeLock();
        try {
            CatalogIf target = nameToCatalog.get(catalogName);
            if (target == null) {
                throw new DdlException("No catalog found with name: " + catalogName);
            }
            // handed to the replay method so it can roll back if the property check fails
            Map<String, String> previousProps = target.getProperties();

            boolean typeGiven = newProperties.containsKey(CATALOG_TYPE_PROP);
            if (typeGiven && !target.getType().equalsIgnoreCase(newProperties.get(CATALOG_TYPE_PROP))) {
                throw new DdlException("Can't modify the type of catalog property with name: " + catalogName);
            }

            CatalogLog propsLog = new CatalogLog();
            propsLog.setCatalogId(target.getId());
            propsLog.setNewProps(newProperties);
            replayAlterCatalogProps(propsLog, previousProps, false);
            Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_ALTER_CATALOG_PROPS, propsLog);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Applies the properties carried by {@code stmt} to its catalog and writes the meta log.
     *
     * @param stmt the alter-catalog-property statement
     * @throws UserException if the catalog does not exist or the properties are rejected
     */
    public void alterCatalogProps(AlterCatalogPropertyStmt stmt) throws UserException {
        String name = stmt.getCatalogName();
        Map<String, String> props = stmt.getNewProperties();
        alterCatalogProps(name, props);
    }

    /**
     * Lists all catalogs, or shows one catalog by name, treating the internal catalog
     * as the current one.
     *
     * @param showStmt the show-catalog statement
     * @return the result set for the statement
     * @throws AnalysisException if the named catalog does not exist or access is denied
     */
    public ShowResultSet showCatalogs(ShowCatalogStmt showStmt) throws AnalysisException {
        final String currentCatalog = InternalCatalog.INTERNAL_CATALOG_NAME;
        return showCatalogs(showStmt, currentCatalog);
    }

    /**
     * Lists all catalogs, or shows one catalog by name.
     *
     * @param showStmt the show-catalog statement supplying name, pattern and result metadata
     * @param currentCtlg name of the catalog to mark as current in the listing
     * @return the result set for the statement
     * @throws AnalysisException if the named catalog does not exist or access is denied
     */
    public ShowResultSet showCatalogs(ShowCatalogStmt showStmt, String currentCtlg) throws AnalysisException {
        return new ShowResultSet(showStmt.getMetaData(),
                showCatalogs(showStmt.getCatalogName(), showStmt.getPattern(), currentCtlg));
    }

    /**
     * Builds the rows for a catalog listing or for a single catalog's properties.
     *
     * <p>When {@code catalogName} is {@code null}, one row is produced per catalog the current
     * user may SHOW, optionally filtered by {@code pattern}, sorted by catalog name. Each row holds:
     * id, name, type, "Yes"/"No" (whether it is {@code currentCatalogName}), create time,
     * last update time, comment.
     *
     * <p>Otherwise the rows are key/value pairs of the named catalog: its resource (if set)
     * followed by its printable properties.
     *
     * @param catalogName the catalog to show, or {@code null} to list catalogs
     * @param pattern optional MySQL-style name pattern used when listing; may be {@code null}
     * @param currentCatalogName the catalog name to flag as current in the listing
     * @return the result rows
     * @throws AnalysisException if the named catalog does not exist or the user lacks SHOW privilege on it
     */
    public List<List<String>> showCatalogs(
            String catalogName, String pattern, String currentCatalogName) throws AnalysisException {
        List<List<String>> rows = Lists.newArrayList();
        readLock();
        try {
            if (catalogName == null) {
                PatternMatcher matcher = null;
                if (pattern != null) {
                    matcher = PatternMatcherWrapper.createMysqlPattern(pattern,
                        CaseSensibility.CATALOG.getCaseSensibility());
                }
                for (CatalogIf catalog : listCatalogsWithCheckPriv(ConnectContext.get().getCurrentUserIdentity())) {
                    String name = catalog.getName();
                    // Filter catalog name
                    if (matcher != null && !matcher.match(name)) {
                        continue;
                    }
                    List<String> row = Lists.newArrayList();
                    row.add(String.valueOf(catalog.getId()));
                    row.add(name);
                    row.add(catalog.getType());
                    row.add(name.equals(currentCatalogName) ? "Yes" : "No");
                    Map<String, String> props = catalog.getProperties();
                    String createTime = props.getOrDefault(ExternalCatalog.CREATE_TIME, FeConstants.null_string);
                    row.add(createTime);
                    row.add(TimeUtils.longToTimeString(catalog.getLastUpdateTime()));
                    row.add(catalog.getComment());
                    rows.add(row);
                }
                // Sort by catalog name once, after all rows are collected
                // (sorting inside the loop re-sorted the whole list for every row).
                rows.sort((x, y) -> x.get(1).compareTo(y.get(1)));
            } else {
                // a single lookup covers both the existence check and the retrieval
                CatalogIf<DatabaseIf> catalog = nameToCatalog.get(catalogName);
                if (catalog == null) {
                    throw new AnalysisException("No catalog found with name: " + catalogName);
                }
                if (!Env.getCurrentEnv().getAccessManager()
                        .checkCtlPriv(ConnectContext.get(), catalog.getName(), PrivPredicate.SHOW)) {
                    ErrorReport.reportAnalysisException(
                            ErrorCode.ERR_CATALOG_ACCESS_DENIED,
                            ConnectContext.get().getQualifiedUser(),
                            catalog.getName());
                }
                if (!Strings.isNullOrEmpty(catalog.getResource())) {
                    rows.add(Arrays.asList("resource", catalog.getResource()));
                }
                Map<String, String> sortedMap = getCatalogPropertiesWithPrintable(catalog);
                sortedMap.forEach((k, v) -> rows.add(Arrays.asList(k, v)));
            }
        } finally {
            readUnlock();
        }

        return rows;
    }

    /**
     * Returns the catalog's properties prepared for display: keys listed in
     * {@code PrintableMap.HIDDEN_KEY} are dropped, values of keys listed in
     * {@code PrintableMap.SENSITIVE_KEY} are replaced by {@code PrintableMap.PASSWORD_MASK},
     * and the result is ordered by key.
     *
     * @param catalog the catalog whose properties are read
     * @return a new sorted map of displayable properties
     */
    public static Map<String, String> getCatalogPropertiesWithPrintable(CatalogIf<?> catalog) {
        // a tree map keeps the keys ordered, which makes the properties easier to read
        Map<String, String> printable = new TreeMap<>();
        for (Map.Entry<String, String> entry : catalog.getProperties().entrySet()) {
            String key = entry.getKey();
            if (PrintableMap.HIDDEN_KEY.contains(key)) {
                continue;
            }
            boolean sensitive = PrintableMap.SENSITIVE_KEY.contains(key);
            printable.put(key, sensitive ? PrintableMap.PASSWORD_MASK : entry.getValue());
        }
        return printable;
    }

    /**
     * Builds the single result row of SHOW CREATE CATALOG: the catalog's display name and
     * a CREATE CATALOG statement assembled from its comment and properties.
     *
     * @param catalogName name of the catalog to describe
     * @return a list holding one row: (name, create statement)
     * @throws AnalysisException if the catalog does not exist
     */
    public List<List<String>> showCreateCatalog(String catalogName) throws AnalysisException {
        List<List<String>> rows = Lists.newArrayList();
        readLock();
        try {
            CatalogIf target = nameToCatalog.get(catalogName);
            if (target == null) {
                throw new AnalysisException("No catalog found with name " + catalogName);
            }

            StringBuilder ddl = new StringBuilder();
            ddl.append("\nCREATE CATALOG `");
            ddl.append(ClusterNamespace.getNameFromFullName(catalogName));
            ddl.append("`");

            if (!Strings.isNullOrEmpty(target.getComment())) {
                ddl.append("\nCOMMENT \"");
                ddl.append(target.getComment());
                ddl.append("\"\n");
            }

            if (!target.getProperties().isEmpty()) {
                ddl.append(" PROPERTIES (\n");
                PrintableMap<String, String> printableMap = new PrintableMap<>(target.getProperties(), "=", true, true,
                        true, true);
                printableMap.setAdditionalHiddenKeys(ExternalCatalog.HIDDEN_PROPERTIES);
                ddl.append(printableMap);
                ddl.append("\n);");
            }

            rows.add(Lists.newArrayList(ClusterNamespace.getNameFromFullName(catalogName), ddl.toString()));
        } finally {
            readUnlock();
        }

        return rows;
    }

    /**
     * Produces the SHOW CREATE CATALOG result set for the catalog named by {@code showStmt}.
     *
     * @param showStmt the show-create-catalog statement
     * @return the result set holding the create statement
     * @throws AnalysisException if the catalog does not exist
     */
    public ShowResultSet showCreateCatalog(ShowCreateCatalogStmt showStmt) throws AnalysisException {
        return new ShowResultSet(showStmt.getMetaData(), showCreateCatalog(showStmt.getCatalog()));
    }

    /**
     * Replay for the create catalog event: rebuilds the catalog from the log and registers it.
     *
     * @param log the persisted create-catalog log entry
     * @throws DdlException if registering the catalog fails
     */
    public void replayCreateCatalog(CatalogLog log) throws DdlException {
        createCatalogInternal(CatalogFactory.createFromLog(log), true);
    }

    /**
     * Registers {@code catalog} under the write lock. Outside of replay, the properties of an
     * {@link ExternalCatalog} are checked first. If the catalog defines
     * {@code metadata_refresh_interval_sec}, it is added to the refresh manager's refresh map.
     *
     * @param catalog the catalog to register
     * @param isReplay {@code true} when called while replaying an edit log; skips the property check
     * @throws DdlException if the property check fails
     */
    private void createCatalogInternal(CatalogIf catalog, boolean isReplay) throws DdlException {
        writeLock();
        try {
            boolean mustCheckProps = !isReplay && catalog instanceof ExternalCatalog;
            if (mustCheckProps) {
                ((ExternalCatalog) catalog).checkProperties();
            }

            Map<String, String> props = catalog.getProperties();
            if (props.containsKey(METADATA_REFRESH_INTERVAL_SEC)) {
                // need refresh: the interval is passed twice, as a two-element array
                Integer intervalSec = Integer.valueOf(props.get(METADATA_REFRESH_INTERVAL_SEC));
                Env.getCurrentEnv().getRefreshManager()
                        .addToRefreshMap(catalog.getId(), new Integer[] {intervalSec, intervalSec});
            }

            addCatalog(catalog);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Replay for the drop catalog event: removes the catalog whose id is stored in
     * {@code log}, under the write lock.
     *
     * @param log the drop-catalog log entry carrying the catalog id
     */
    public void replayDropCatalog(CatalogLog log) {
        writeLock();
        try {
            removeCatalog(log.getCatalogId());
        } finally {
            writeUnlock();
        }
    }

    /**
     * Replay for the alter catalog name event: the catalog is removed, renamed and
     * registered again under its new name. An unknown catalog id is logged and skipped.
     *
     * @param log the log entry carrying the catalog id and the new name
     */
    public void replayAlterCatalogName(CatalogLog log) {
        writeLock();
        try {
            CatalogIf catalog = removeCatalog(log.getCatalogId());
            if (catalog == null) {
                // removeCatalog() returns null for an unknown id; skip instead of failing
                // with a NullPointerException, as replayAlterCatalogComment() does.
                LOG.warn("Catalog with id {} does not exist, skip renaming it to {}",
                        log.getCatalogId(), log.getNewCatalogName());
                return;
            }
            catalog.modifyCatalogName(log.getNewCatalogName());
            addCatalog(catalog);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Replay for the alter catalog comment event: sets the comment on the catalog whose id
     * is stored in {@code log}. Nothing happens if that catalog does not exist.
     *
     * @param log the log entry carrying the catalog id and the new comment
     */
    public void replayAlterCatalogComment(CatalogLog log) {
        writeLock();
        try {
            CatalogIf target = idToCatalog.get(log.getCatalogId());
            if (target == null) {
                return;
            }
            target.setComment(log.getComment());
        } finally {
            writeUnlock();
        }
    }

    /**
     * @return a new list holding every registered catalog
     */
    public List<CatalogIf> listCatalogs() {
        List<CatalogIf> all = Lists.newArrayList(nameToCatalog.values());
        return all;
    }

    /**
     * Lists the catalogs on which {@code userIdentity} holds the SHOW privilege.
     *
     * @param userIdentity the user whose privileges are checked
     * @return a new list of the catalogs visible to that user
     */
    public List<CatalogIf> listCatalogsWithCheckPriv(UserIdentity userIdentity) {
        List<CatalogIf> visible = Lists.newArrayList();
        for (CatalogIf candidate : nameToCatalog.values()) {
            boolean allowed = Env.getCurrentEnv().getAccessManager()
                    .checkCtlPriv(userIdentity, candidate.getName(), PrivPredicate.SHOW);
            if (allowed) {
                visible.add(candidate);
            }
        }
        return visible;
    }


    /**
     * Replay for the alter catalog props event; also used by the live ALTER path.
     *
     * <p>For an {@link ExternalCatalog} the new properties are applied tentatively. Outside of
     * replay they are then checked, and on failure the old properties are restored (when given)
     * and the error is rethrown. If the new properties define
     * {@code metadata_refresh_interval_sec}, the catalog is added to the refresh map.
     * Finally the new properties are applied to the catalog. An unknown catalog id is
     * logged and skipped.
     *
     * @param log the log entry carrying the catalog id and the new properties
     * @param oldProperties properties to roll back to if the check fails; may be {@code null}
     * @param isReplay {@code true} when replaying an edit log; skips the property check
     * @throws DdlException if the property check fails outside of replay
     */
    public void replayAlterCatalogProps(CatalogLog log, Map<String, String> oldProperties, boolean isReplay)
            throws DdlException {
        writeLock();
        try {
            CatalogIf catalog = idToCatalog.get(log.getCatalogId());
            if (catalog == null) {
                // Without this guard the modifyCatalogProps() call below would throw a
                // NullPointerException; skip a missing catalog as replayAlterCatalogComment() does.
                LOG.warn("Catalog with id {} does not exist, skip altering its properties", log.getCatalogId());
                return;
            }
            if (catalog instanceof ExternalCatalog) {
                Map<String, String> newProps = log.getNewProps();
                ((ExternalCatalog) catalog).tryModifyCatalogProps(newProps);
                if (!isReplay) {
                    try {
                        ((ExternalCatalog) catalog).checkProperties();
                    } catch (DdlException ddlException) {
                        if (oldProperties != null) {
                            ((ExternalCatalog) catalog).rollBackCatalogProps(oldProperties);
                        }
                        throw ddlException;
                    }
                }
                if (newProps.containsKey(METADATA_REFRESH_INTERVAL_SEC)) {
                    long catalogId = catalog.getId();
                    Integer metadataRefreshIntervalSec = Integer.valueOf(newProps.get(METADATA_REFRESH_INTERVAL_SEC));
                    Integer[] sec = {metadataRefreshIntervalSec, metadataRefreshIntervalSec};
                    Env.getCurrentEnv().getRefreshManager().addToRefreshMap(catalogId, sec);
                }
            }
            catalog.modifyCatalogProps(log.getNewProps());
        } finally {
            writeUnlock();
        }
    }

    // Init-catalog and init-db events can arrive at any time, even after the catalog or db
    // has been dropped, because a caller may still hold the catalog or db object from before
    // the drop. So the edit log entry is simply skipped when the object no longer exists.
    public void replayInitCatalog(InitCatalogLog log) {
        ExternalCatalog target = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
        if (target != null) {
            target.replayInitCatalog(log);
        }
    }

    /**
     * Replay for the init external database event. The entry is skipped when the catalog
     * or the database it refers to no longer exists.
     *
     * @param log the log entry carrying the catalog id and database id
     */
    public void replayInitExternalDb(InitDatabaseLog log) {
        ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
        if (catalog == null) {
            return;
        }
        Optional<ExternalDatabase<? extends ExternalTable>> db = catalog.getDbForReplay(log.getDbId());
        db.ifPresent(existing -> existing.replayInitDb(log, catalog));
    }

    /**
     * Unregisters a table from an external catalog's database, under the database write lock.
     *
     * @param dbName name of the database holding the table
     * @param tableName name of the table to unregister
     * @param catalogName name of the external catalog
     * @param ignoreIfExists when {@code true}, a missing database or table is silently ignored
     * @throws DdlException if the catalog does not exist or is not external, or if the database
     *         or table is missing and {@code ignoreIfExists} is {@code false}
     */
    public void unregisterExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfExists)
            throws DdlException {
        CatalogIf<?> catalog = nameToCatalog.get(catalogName);
        if (catalog == null) {
            throw new DdlException("No catalog found with name: " + catalogName);
        }
        if (!(catalog instanceof ExternalCatalog)) {
            throw new DdlException("Only support drop ExternalCatalog Tables");
        }

        ExternalDatabase<?> db = ((ExternalCatalog) catalog).getDbNullable(dbName);
        if (db == null) {
            if (ignoreIfExists) {
                return;
            }
            throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
        }

        TableIf table = db.getTableNullable(tableName);
        if (table == null) {
            if (ignoreIfExists) {
                return;
            }
            throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
        }

        db.writeLock();
        try {
            db.unregisterTable(table.getName());
        } finally {
            db.writeUnlock();
        }
    }

    /**
     * Asks an external catalog whether it holds the given table locally.
     *
     * @param dbName name of the database
     * @param tableName name of the table
     * @param catalogName name of the external catalog
     * @return the result of {@code ExternalCatalog.tableExistInLocal} for the given names
     * @throws DdlException if the catalog does not exist or is not an external catalog
     */
    public boolean externalTableExistInLocal(String dbName, String tableName, String catalogName) throws DdlException {
        CatalogIf target = nameToCatalog.get(catalogName);
        if (target == null) {
            throw new DdlException("No catalog found with name: " + catalogName);
        }
        if (target instanceof ExternalCatalog) {
            return ((ExternalCatalog) target).tableExistInLocal(dbName, tableName);
        }
        throw new DdlException("Only support ExternalCatalog Tables");
    }

    /**
     * Registers a table in an HMS external catalog's database in response to an event.
     * The table id is generated from the names when the catalog uses the meta cache,
     * otherwise it is looked up in the external meta id manager.
     *
     * @param dbName name of the database to register the table in
     * @param tableName name of the table to register
     * @param catalogName name of the catalog
     * @param updateTime update time to set on the registered table
     * @param ignoreIfExists when {@code true}, a missing database is silently ignored
     * @throws DdlException if the catalog does not exist, is not an HMS external catalog,
     *         or the database is missing and {@code ignoreIfExists} is {@code false}
     */
    public void registerExternalTableFromEvent(String dbName, String tableName,
            String catalogName, long updateTime,
            boolean ignoreIfExists) throws DdlException {
        CatalogIf catalog = nameToCatalog.get(catalogName);
        if (catalog == null) {
            throw new DdlException("No catalog found with name: " + catalogName);
        }
        if (!(catalog instanceof ExternalCatalog)) {
            throw new DdlException("Only support create ExternalCatalog Tables");
        }
        // The code below casts to HMS types, so reject other external catalogs with a
        // DdlException instead of failing with a ClassCastException.
        if (!(catalog instanceof HMSExternalCatalog)) {
            throw new DdlException("Only support create tables from event for HMS catalog, but catalog "
                    + catalogName + " is of type " + catalog.getType());
        }
        DatabaseIf db = catalog.getDbNullable(dbName);
        if (db == null) {
            if (!ignoreIfExists) {
                throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
            }
            return;
        }

        long tblId;
        HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog;
        if (hmsCatalog.getUseMetaCache().get()) {
            tblId = Util.genIdByName(catalogName, dbName, tableName);
        } else {
            tblId = Env.getCurrentEnv().getExternalMetaIdMgr().getTblId(catalog.getId(), dbName, tableName);
        }
        // -1L means it will be dropped later, ignore
        if (tblId == ExternalMetaIdMgr.META_ID_FOR_NOT_EXISTS) {
            return;
        }

        db.writeLock();
        try {
            HMSExternalDatabase hmsDb = (HMSExternalDatabase) db;
            HMSExternalTable namedTable = hmsDb
                    .buildTableForInit(tableName, tableName, tblId, hmsCatalog, hmsDb, false);
            namedTable.setUpdateTime(updateTime);
            db.registerTable(namedTable);
        } finally {
            db.writeUnlock();
        }
    }

    /**
     * Unregisters a database from an HMS external catalog.
     *
     * @param dbName name of the database to unregister
     * @param catalogName name of the catalog
     * @throws DdlException if the catalog does not exist or is not an HMS external catalog
     */
    public void unregisterExternalDatabase(String dbName, String catalogName)
            throws DdlException {
        CatalogIf catalog = nameToCatalog.get(catalogName);
        if (catalog == null) {
            throw new DdlException("No catalog found with name: " + catalogName);
        }
        if (!(catalog instanceof ExternalCatalog)) {
            throw new DdlException("Only support drop ExternalCatalog databases");
        }
        // The call below needs an HMS catalog, so reject other external catalogs with a
        // DdlException instead of failing with a ClassCastException.
        if (!(catalog instanceof HMSExternalCatalog)) {
            throw new DdlException("Only support drop databases for HMS catalog, but catalog "
                    + catalogName + " is of type " + catalog.getType());
        }
        ((HMSExternalCatalog) catalog).unregisterDatabase(dbName);
    }

    /**
     * Registers a database in an HMS external catalog in response to an event.
     * The database id is generated from the names when the catalog uses the meta cache,
     * otherwise it is looked up in the external meta id manager.
     *
     * @param dbName name of the database to register
     * @param catalogName name of the catalog
     * @throws DdlException if the catalog does not exist or is not an HMS external catalog
     */
    public void registerExternalDatabaseFromEvent(String dbName, String catalogName)
            throws DdlException {
        CatalogIf catalog = nameToCatalog.get(catalogName);
        if (catalog == null) {
            throw new DdlException("No catalog found with name: " + catalogName);
        }
        if (!(catalog instanceof ExternalCatalog)) {
            throw new DdlException("Only support create ExternalCatalog databases");
        }
        // The code below casts to HMSExternalCatalog, so reject other external catalogs with a
        // DdlException instead of failing with a ClassCastException.
        if (!(catalog instanceof HMSExternalCatalog)) {
            throw new DdlException("Only support create databases from event for HMS catalog, but catalog "
                    + catalogName + " is of type " + catalog.getType());
        }

        HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog;
        long dbId;
        if (hmsCatalog.getUseMetaCache().get()) {
            dbId = Util.genIdByName(catalogName, dbName);
        } else {
            dbId = Env.getCurrentEnv().getExternalMetaIdMgr().getDbId(catalog.getId(), dbName);
        }
        // -1L means it will be dropped later, ignore
        if (dbId == ExternalMetaIdMgr.META_ID_FOR_NOT_EXISTS) {
            return;
        }

        hmsCatalog.registerDatabase(dbId, dbName);
    }

    /**
     * Adds partitions of an HMS table to the external meta cache and records the event
     * update time on the table. A table that is not an {@link HMSExternalTable} is only logged.
     *
     * @param catalogName name of the external catalog
     * @param dbName name of the database
     * @param tableName name of the table
     * @param partitionNames names of the partitions to add to the cache
     * @param updateTime event update time to set on the table
     * @param ignoreIfNotExists when {@code true}, a missing database or table is silently ignored
     * @throws DdlException if the catalog does not exist or is not external, or if the database
     *         or table is missing and {@code ignoreIfNotExists} is {@code false}
     */
    public void addExternalPartitions(String catalogName, String dbName, String tableName,
            List<String> partitionNames, long updateTime, boolean ignoreIfNotExists)
            throws DdlException {
        CatalogIf catalog = nameToCatalog.get(catalogName);
        if (catalog == null) {
            throw new DdlException("No catalog found with name: " + catalogName);
        }
        if (!(catalog instanceof ExternalCatalog)) {
            throw new DdlException("Only support ExternalCatalog");
        }

        DatabaseIf db = catalog.getDbNullable(dbName);
        if (db == null) {
            if (ignoreIfNotExists) {
                return;
            }
            throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
        }

        TableIf table = db.getTableNullable(tableName);
        if (table == null) {
            if (ignoreIfNotExists) {
                return;
            }
            throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
        }

        if (table instanceof HMSExternalTable) {
            HMSExternalTable hmsTable = (HMSExternalTable) table;
            Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(), hmsTable, partitionNames);
            hmsTable.setEventUpdateTime(updateTime);
        } else {
            LOG.warn("only support HMSTable");
        }
    }

    /**
     * Drops the given partitions from the partition cache of an HMS external table and records
     * {@code updateTime} as the table's event update time.
     *
     * <p>If the table exists but is not an {@code HMSExternalTable}, a warning is logged and
     * nothing else happens, matching {@code addExternalPartitions}.
     *
     * @param catalogName name of the external catalog
     * @param dbName name of the database inside the catalog
     * @param tableName name of the table inside the database
     * @param partitionNames names of the partitions to drop from the cache
     * @param updateTime value stored on the table via {@code setEventUpdateTime}
     * @param ignoreIfNotExists when {@code true}, a missing database or table makes the call
     *        return silently instead of throwing
     * @throws DdlException if the catalog is unknown or not an external catalog, or if the
     *         database or table is missing and {@code ignoreIfNotExists} is {@code false}
     */
    public void dropExternalPartitions(String catalogName, String dbName, String tableName,
            List<String> partitionNames, long updateTime, boolean ignoreIfNotExists)
            throws DdlException {
        CatalogIf catalog = nameToCatalog.get(catalogName);
        if (catalog == null) {
            throw new DdlException("No catalog found with name: " + catalogName);
        }
        if (!(catalog instanceof ExternalCatalog)) {
            throw new DdlException("Only support ExternalCatalog");
        }
        DatabaseIf db = catalog.getDbNullable(dbName);
        if (db == null) {
            if (!ignoreIfNotExists) {
                throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
            }
            return;
        }

        TableIf table = db.getTableNullable(tableName);
        if (table == null) {
            if (!ignoreIfNotExists) {
                throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
            }
            return;
        }
        // Guard the cast below: a non-HMS table would otherwise fail with a ClassCastException.
        if (!(table instanceof HMSExternalTable)) {
            LOG.warn("only support HMSTable");
            return;
        }

        HMSExternalTable hmsTable = (HMSExternalTable) table;
        Env.getCurrentEnv().getExtMetaCacheMgr().dropPartitionsCache(catalog.getId(), hmsTable, partitionNames);
        hmsTable.setEventUpdateTime(updateTime);
    }

    /**
     * Registers every catalog that declares {@code METADATA_REFRESH_INTERVAL_SEC} in its
     * properties with the refresh manager of {@code env}.
     *
     * <p>The parsed interval is passed as a two-element array holding the same value twice.
     * NOTE(review): presumably the configured interval and a remaining-time counter; confirm
     * against the refresh manager.
     *
     * @param env environment whose refresh manager receives the registrations
     */
    public void registerCatalogRefreshListener(Env env) {
        for (CatalogIf catalog : idToCatalog.values()) {
            Map<String, String> catalogProps = catalog.getProperties();
            if (!catalogProps.containsKey(METADATA_REFRESH_INTERVAL_SEC)) {
                // No refresh interval configured for this catalog.
                continue;
            }
            Integer intervalSec = Integer.valueOf(catalogProps.get(METADATA_REFRESH_INTERVAL_SEC));
            Integer[] intervalPair = new Integer[] {intervalSec, intervalSec};
            env.getRefreshManager().addToRefreshMap(catalog.getId(), intervalPair);
        }
    }

    /**
     * Serializes this object to JSON with Gson and writes the resulting string to {@code out}.
     *
     * @param out destination the JSON string is written to
     * @throws IOException if writing to {@code out} fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, GsonUtils.GSON.toJson(this));
    }

    /**
     * Rebuilds the id and name lookup maps as concurrent maps after Gson deserialization and
     * re-resolves the internal catalog reference from the rebuilt id map.
     *
     * @throws IOException declared by the overridden method
     */
    @Override
    public void gsonPostProcess() throws IOException {
        // Gson may hand back plain (non-concurrent) maps, so both lookup tables are rebuilt
        // as concurrent maps from the deserialized id-to-catalog entries.
        Map<String, CatalogIf> rebuiltByName = Maps.newConcurrentMap();
        Map<Long, CatalogIf<? extends DatabaseIf<? extends TableIf>>> rebuiltById = Maps.newConcurrentMap();
        for (CatalogIf each : idToCatalog.values()) {
            // ATTN: can not call each.getProperties() here, because ResourceMgr is not replayed yet.
            rebuiltById.put(each.getId(), each);
            rebuiltByName.put(each.getName(), each);
        }
        this.nameToCatalog = rebuiltByName;
        this.idToCatalog = rebuiltById;
        this.internalCatalog = (InternalCatalog) rebuiltById.get(InternalCatalog.INTERNAL_CATALOG_ID);
    }

    /**
     * Returns the id-to-catalog map held by this manager.
     *
     * <p>The map itself is returned, not a copy.
     *
     * @return the map from catalog id to catalog
     */
    public Map<Long, CatalogIf<? extends DatabaseIf<? extends TableIf>>> getIdToCatalog() {
        return this.idToCatalog;
    }

    /**
     * Returns a new set containing the catalogs currently stored in the id-to-catalog map.
     *
     * @return a freshly allocated {@link HashSet} of the registered catalogs
     */
    public Set<CatalogIf> getCopyOfCatalog() {
        Set<CatalogIf> snapshot = new HashSet<>(idToCatalog.values());
        return snapshot;
    }

    /**
     * Returns the number of entries in the id-to-catalog map.
     *
     * @return the current catalog count
     */
    public int getCatalogNum() {
        final int catalogCount = idToCatalog.size();
        return catalogCount;
    }
}