// ExternalMetaCacheMgr.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Type;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.ClassLoaderUtils;
import org.apache.doris.datasource.doris.DorisExternalMetaCache;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveExternalMetaCache;
import org.apache.doris.datasource.hudi.HudiExternalMetaCache;
import org.apache.doris.datasource.iceberg.IcebergExternalMetaCache;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalMetaCache;
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache;
import org.apache.doris.datasource.metacache.AbstractExternalMetaCache;
import org.apache.doris.datasource.metacache.CacheSpec;
import org.apache.doris.datasource.metacache.ExternalMetaCache;
import org.apache.doris.datasource.metacache.ExternalMetaCacheFactory;
import org.apache.doris.datasource.metacache.MetaCache;
import org.apache.doris.datasource.metacache.MetaCacheEntry;
import org.apache.doris.datasource.metacache.MetaCacheEntryDef;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.paimon.PaimonExternalMetaCache;
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.nereids.exceptions.NotSupportedException;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import javax.annotation.Nullable;

/**
 * Cache meta of external catalog
 * 1. Meta for hive meta store, mainly for partition.
 * 2. Table Schema cache.
 * 3. Row count cache.
 */
public class ExternalMetaCacheMgr {
    private static final Logger LOG = LogManager.getLogger(ExternalMetaCacheMgr.class);
    private static final String ENTRY_SCHEMA = "schema";
    private static final String ENGINE_DEFAULT = "default";
    private static final String ENGINE_HIVE = "hive";
    private static final String ENGINE_HUDI = "hudi";
    private static final String ENGINE_ICEBERG = "iceberg";
    private static final String ENGINE_PAIMON = "paimon";
    private static final String ENGINE_MAXCOMPUTE = "maxcompute";
    private static final String ENGINE_DORIS = "doris";

    /**
     * Executors for loading caches
     * 1. rowCountRefreshExecutor
     * For row count cache.
     * Row count cache is an async loading cache, and we can ignore the result
     * if cache missing or thread pool is full.
     * So use a separate executor for this cache.
     * <p>
     * 2.  commonRefreshExecutor
     * For other caches. Other caches are sync loading cache.
     * But commonRefreshExecutor will be used for async refresh.
     * That is, if cache entry is missing, the cache value will be loaded in caller thread, sychronously.
     * if cache entry need refresh, it will be reloaded in commonRefreshExecutor.
     * <p>
     * 3. fileListingExecutor
     * File listing is a heavy operation, so use a separate executor for it.
     * For fileCache, the refresh operation will still use commonRefreshExecutor to trigger refresh.
     * And fileListingExecutor will be used to list file.
     */
    private ExecutorService rowCountRefreshExecutor;
    private ExecutorService commonRefreshExecutor;
    private ExecutorService fileListingExecutor;
    // This executor is used to schedule the getting split tasks
    private ExecutorService scheduleExecutor;
    private final ExternalMetaCacheFactory.Context factoryContext;

    private final Map<String, ExternalMetaCache> engineCaches = Maps.newConcurrentMap();
    // all catalogs could share the same fsCache.
    private FileSystemCache fsCache;
    // all external table row count cache.
    private ExternalRowCountCache rowCountCache;

    /**
     * Creates the cache manager: builds the four executors, the shared file
     * system cache, the row count cache, the SPI factory context, and then
     * registers all builtin and SPI-provided engine caches.
     *
     * @param isCheckpointCatalog true when this instance serves the checkpoint
     *         catalog; executors then use elastic cached pools instead of
     *         fixed-size pools (see newThreadPool)
     */
    public ExternalMetaCacheMgr(boolean isCheckpointCatalog) {
        rowCountRefreshExecutor = newThreadPool(isCheckpointCatalog,
                Config.max_external_cache_loader_thread_pool_size,
                Config.max_external_cache_loader_thread_pool_size * 1000,
                "RowCountRefreshExecutor", 0, true);

        commonRefreshExecutor = newThreadPool(isCheckpointCatalog,
                Config.max_external_cache_loader_thread_pool_size,
                Config.max_external_cache_loader_thread_pool_size * 10000,
                "CommonRefreshExecutor", 10, true);

        // The queue size should be large enough,
        // because there may be thousands of partitions being queried at the same time.
        fileListingExecutor = newThreadPool(isCheckpointCatalog,
                Config.max_external_cache_loader_thread_pool_size,
                Config.max_external_cache_loader_thread_pool_size * 1000,
                "FileListingExecutor", 10, true);

        scheduleExecutor = newThreadPool(isCheckpointCatalog,
                Config.max_external_cache_loader_thread_pool_size,
                Config.max_external_cache_loader_thread_pool_size * 1000,
                "scheduleExecutor", 10, true);

        fsCache = new FileSystemCache();
        rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);
        factoryContext = new FactoryContext(commonRefreshExecutor, fileListingExecutor);

        initEngineCaches();
    }

    /**
     * Builds an executor for cache loading work, named with a
     * Checkpoint/NotCheckpoint prefix so metrics distinguish the two worlds.
     *
     * <p>Checkpoint catalogs get a cached pool so idle threads are released
     * promptly (they have almost no business work, and keeping threads alive
     * would leak them). Regular catalogs get a fixed-size pool with a bounded
     * queue so bursts of load tasks are buffered rather than spawning threads.
     */
    private ExecutorService newThreadPool(boolean isCheckpointCatalog, int numThread, int queueSize,
            String poolName, int timeoutSeconds,
            boolean needRegisterMetric) {
        String fullPoolName = (isCheckpointCatalog ? "Checkpoint" : "NotCheckpoint") + poolName;
        if (isCheckpointCatalog) {
            return ThreadPoolManager.newDaemonCacheThreadPool(numThread, fullPoolName, needRegisterMetric);
        }
        return ThreadPoolManager.newDaemonFixedThreadPool(numThread, queueSize, fullPoolName, timeoutSeconds,
                needRegisterMetric);
    }

    /** Returns the dedicated executor used for heavy file-listing work. */
    public ExecutorService getFileListingExecutor() {
        return fileListingExecutor;
    }

    /** Returns the executor used to schedule getting-split tasks. */
    public ExecutorService getScheduleExecutor() {
        return scheduleExecutor;
    }

    /**
     * Looks up the meta cache registered under the given engine name
     * (after alias normalization, see normalizeEngineName).
     *
     * @throws NullPointerException if {@code engine} is null
     * @throws IllegalArgumentException if no cache is registered for the engine
     */
    public ExternalMetaCache engine(String engine) {
        Objects.requireNonNull(engine, "engine is null");
        String canonical = normalizeEngineName(engine);
        return Optional.ofNullable(engineCaches.get(canonical))
                .orElseThrow(() -> new IllegalArgumentException(
                        String.format("unsupported external meta cache engine '%s'", canonical)));
    }

    /**
     * Initializes the catalog's cache entry group in every registered engine.
     *
     * <p>Each engine receives its own copy of the catalog properties, so one
     * engine mutating its map cannot leak into the others — consistent with
     * {@link #prepareCatalogByEngine(long, String, Map)}, which also copies.
     */
    public void prepareCatalog(long catalogId) {
        Map<String, String> catalogProperties = getCatalogProperties(catalogId);
        routeCatalogEngines(catalogId,
                cache -> cache.initCatalog(catalogId, Maps.newHashMap(catalogProperties)));
    }

    /** Initializes the catalog entry group for one engine, resolving properties from the catalog manager. */
    public void prepareCatalogByEngine(long catalogId, String engine) {
        prepareCatalogByEngine(catalogId, engine, getCatalogProperties(catalogId));
    }

    /**
     * Initializes the catalog entry group for one engine with the given
     * properties. A defensive copy is taken; a null map is treated as empty.
     */
    public void prepareCatalogByEngine(long catalogId, String engine, Map<String, String> catalogProperties) {
        Map<String, String> propsCopy = Maps.newHashMap();
        if (catalogProperties != null) {
            propsCopy.putAll(catalogProperties);
        }
        routeSpecifiedEngine(engine, cache -> cache.initCatalog(catalogId, propsCopy));
    }

    /** Clears the catalog's cached entries in every engine, keeping the per-engine entry groups alive. */
    public void invalidateCatalog(long catalogId) {
        routeCatalogEngines(catalogId, cache -> safeInvalidate(
                cache, catalogId, "invalidateCatalog",
                () -> cache.invalidateCatalogEntries(catalogId)));
    }

    /** Alias of {@link #invalidateCatalog(long)} kept for caller compatibility. */
    public void invalidateCatalogEntries(long catalogId) {
        invalidateCatalog(catalogId);
    }

    /** Clears the catalog's cached entries on a single engine only, keeping its entry group. */
    public void invalidateCatalogByEngine(long catalogId, String engine) {
        routeSpecifiedEngine(engine, cache -> safeInvalidate(
                cache, catalogId, "invalidateCatalogByEngine",
                () -> cache.invalidateCatalogEntries(catalogId)));
    }

    /** Drops the catalog entirely from every engine cache (entry groups included), e.g. on DROP CATALOG. */
    public void removeCatalog(long catalogId) {
        routeCatalogEngines(catalogId, cache -> safeInvalidate(
                cache, catalogId, "removeCatalog",
                () -> cache.invalidateCatalog(catalogId)));
    }

    /** Drops the catalog entirely from a single engine cache (entry group included). */
    public void removeCatalogByEngine(long catalogId, String engine) {
        routeSpecifiedEngine(engine, cache -> safeInvalidate(
                cache, catalogId, "removeCatalogByEngine",
                () -> cache.invalidateCatalog(catalogId)));
    }

    /** Invalidates one database across all engines; strips any cluster prefix from the db name first. */
    public void invalidateDb(long catalogId, String dbName) {
        String normalizedDbName = ClusterNamespace.getNameFromFullName(dbName);
        routeCatalogEngines(catalogId, cache -> safeInvalidate(
                cache, catalogId, "invalidateDb", () -> cache.invalidateDb(catalogId, normalizedDbName)));
    }

    /** Invalidates one table across all engines; strips any cluster prefix from the db name first. */
    public void invalidateTable(long catalogId, String dbName, String tableName) {
        String normalizedDbName = ClusterNamespace.getNameFromFullName(dbName);
        routeCatalogEngines(catalogId, cache -> safeInvalidate(
                cache, catalogId, "invalidateTable",
                () -> cache.invalidateTable(catalogId, normalizedDbName, tableName)));
    }

    /** Invalidates one table on a single engine only; strips any cluster prefix from the db name first. */
    public void invalidateTableByEngine(long catalogId, String engine, String dbName, String tableName) {
        String normalizedDbName = ClusterNamespace.getNameFromFullName(dbName);
        routeSpecifiedEngine(engine, cache -> safeInvalidate(
                cache, catalogId, "invalidateTableByEngine",
                () -> cache.invalidateTable(catalogId, normalizedDbName, tableName)));
    }

    /** Invalidates the given partitions of a table across all engines. */
    public void invalidatePartitions(long catalogId,
            String dbName, String tableName, List<String> partitions) {
        String normalizedDbName = ClusterNamespace.getNameFromFullName(dbName);
        routeCatalogEngines(catalogId, cache -> safeInvalidate(
                cache, catalogId, "invalidatePartitions",
                () -> cache.invalidatePartitions(catalogId, normalizedDbName, tableName, partitions)));
    }

    /**
     * Collects per-entry cache statistics for the catalog from every engine,
     * keyed as "&lt;engine&gt;.&lt;entry&gt;".
     */
    public Map<String, Map<String, String>> getCatalogCacheStats(long catalogId) {
        Map<String, Map<String, String>> result = Maps.newHashMap();
        for (Map.Entry<String, ExternalMetaCache> engineEntry : engineCaches.entrySet()) {
            String engineName = engineEntry.getKey();
            for (Map.Entry<String, Map<String, String>> stat : engineEntry.getValue().stats(catalogId).entrySet()) {
                result.put(engineName + "." + stat.getKey(), stat.getValue());
            }
        }
        return result;
    }

    // Registers builtin engines first so they win over SPI duplicates
    // (registerEngineCache keeps the first registration per engine name).
    private void initEngineCaches() {
        registerBuiltinEngineCaches();
        loadSpiEngineCaches();
    }

    // Registers the caches shipped with Doris itself; the "default" cache is the
    // schema-only fallback used by engines without a dedicated implementation.
    private void registerBuiltinEngineCaches() {
        registerEngineCache(new DefaultExternalMetaCache(ENGINE_DEFAULT, commonRefreshExecutor), "builtin");
        registerEngineCache(new HiveExternalMetaCache(commonRefreshExecutor, fileListingExecutor), "builtin");
        registerEngineCache(new HudiExternalMetaCache(commonRefreshExecutor), "builtin");
        registerEngineCache(new IcebergExternalMetaCache(commonRefreshExecutor), "builtin");
        registerEngineCache(new PaimonExternalMetaCache(commonRefreshExecutor), "builtin");
        registerEngineCache(new MaxComputeExternalMetaCache(commonRefreshExecutor), "builtin");
        registerEngineCache(new DorisExternalMetaCache(commonRefreshExecutor), "builtin");
    }

    // Discovers SPI factories from both the classpath and the plugins directory.
    // A failure to read the plugins directory is logged and tolerated so FE
    // startup is not blocked by a misconfigured plugin path.
    private void loadSpiEngineCaches() {
        loadSpiEngineCaches(ServiceLoader.load(ExternalMetaCacheFactory.class), "classpath");
        try {
            loadSpiEngineCaches(ClassLoaderUtils.loadServicesFromDirectory(ExternalMetaCacheFactory.class),
                    Config.external_meta_cache_plugins_dir);
        } catch (IOException e) {
            LOG.warn("failed to load external meta cache SPI factories from directory {}",
                    Config.external_meta_cache_plugins_dir, e);
        }
    }

    /**
     * Registers every SPI factory from the given source one at a time, so a
     * single failing factory does not prevent the others from registering.
     */
    private void loadSpiEngineCaches(Iterable<ExternalMetaCacheFactory> factories, String source) {
        factories.forEach(factory -> registerSpiFactory(factory, source));
    }

    // Creates a cache from a single SPI factory and registers it. Factory
    // failures and null results are logged and skipped rather than propagated,
    // so one bad plugin cannot break engine cache initialization.
    private void registerSpiFactory(ExternalMetaCacheFactory factory, String source) {
        String engineName = normalizeEngineName(factory.engine());
        ExternalMetaCache cache;
        try {
            cache = factory.create(factoryContext);
        } catch (Exception e) {
            LOG.warn("failed to initialize external meta cache SPI factory {} for engine {}",
                    factory.getClass().getName(), engineName, e);
            return;
        }
        if (cache == null) {
            LOG.warn("external meta cache SPI factory {} returns null cache for engine {}",
                    factory.getClass().getName(), engineName);
            return;
        }
        registerEngineCache(cache, "spi(" + source + ")");
    }

    // Registers a cache under its normalized engine name. First registration
    // wins; duplicates are logged and dropped. Builtin registrations log at
    // debug to keep startup logs quiet; SPI registrations log at info.
    private void registerEngineCache(ExternalMetaCache cache, String source) {
        String engineName = normalizeEngineName(cache.engine());
        ExternalMetaCache existing = engineCaches.putIfAbsent(engineName, cache);
        if (existing != null) {
            LOG.warn("skip duplicated external meta cache engine '{}' from {}, existing class: {}, new class: {}",
                    engineName, source, existing.getClass().getName(), cache.getClass().getName());
            return;
        }
        if ("builtin".equals(source)) {
            LOG.debug("registered external meta cache engine '{}' from {}", engineName, source);
        } else {
            LOG.info("registered external meta cache engine '{}' from {}", engineName, source);
        }
    }

    /**
     * Canonicalizes an engine name: trims, lower-cases with the ROOT locale,
     * maps legacy aliases ("hms", "external_doris", "max_compute") onto their
     * canonical engines, and falls back to the default engine for null/blank.
     */
    private static String normalizeEngineName(String engine) {
        if (engine == null) {
            return ENGINE_DEFAULT;
        }
        String canonical = engine.trim().toLowerCase(Locale.ROOT);
        if (canonical.isEmpty()) {
            return ENGINE_DEFAULT;
        }
        if ("hms".equals(canonical)) {
            return ENGINE_HIVE;
        }
        if ("external_doris".equals(canonical)) {
            return ENGINE_DORIS;
        }
        if ("max_compute".equals(canonical)) {
            return ENGINE_MAXCOMPUTE;
        }
        return canonical;
    }

    // Applies the action to every registered engine cache. catalogId is
    // currently unused — presumably kept so a future per-catalog engine
    // routing table can be introduced without changing callers; TODO confirm.
    private void routeCatalogEngines(long catalogId, Consumer<ExternalMetaCache> action) {
        engineCaches.values().forEach(action);
    }

    // Applies the action to exactly one engine cache; engine(...) throws
    // IllegalArgumentException for unknown engines.
    private void routeSpecifiedEngine(String engine, Consumer<ExternalMetaCache> action) {
        action.accept(this.engine(engine));
    }

    /**
     * Runs an invalidation action only when the catalog's cache entry group is
     * initialized on the given engine; otherwise the operation is a quiet
     * no-op (nothing cached means nothing to invalidate).
     *
     * <p>Note: the catch also swallows an IllegalStateException thrown by
     * {@code action.run()} itself, so the exception is attached to the debug
     * log to keep such failures diagnosable.
     */
    private void safeInvalidate(ExternalMetaCache cache, long catalogId, String operation, Runnable action) {
        try {
            cache.checkCatalogInitialized(catalogId);
            action.run();
        } catch (IllegalStateException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("skip {} for catalog {} on engine '{}' because cache entry is absent",
                        operation, catalogId, cache.engine(), e);
            }
        }
    }

    /**
     * Immutable {@link ExternalMetaCacheFactory.Context} that hands SPI
     * factories the executors shared by all engine caches.
     */
    private static class FactoryContext implements ExternalMetaCacheFactory.Context {
        private final ExecutorService refreshPool;
        private final ExecutorService listingPool;

        private FactoryContext(ExecutorService refreshExecutor, ExecutorService fileListingExecutor) {
            this.refreshPool = refreshExecutor;
            this.listingPool = fileListingExecutor;
        }

        @Override
        public ExecutorService refreshExecutor() {
            return refreshPool;
        }

        @Override
        public ExecutorService fileListingExecutor() {
            return listingPool;
        }
    }

    // Snapshots the catalog's properties into a fresh mutable map; returns an
    // empty map when the catalog (or its properties) is unavailable.
    private Map<String, String> getCatalogProperties(long catalogId) {
        CatalogIf<?> catalog = getCatalog(catalogId);
        if (catalog == null || catalog.getProperties() == null) {
            return Maps.newHashMap();
        }
        return Maps.newHashMap(catalog.getProperties());
    }

    /**
     * Looks up a catalog by id, or returns null when the environment or the
     * catalog manager is not available (e.g. during early startup).
     */
    @Nullable
    private CatalogIf<?> getCatalog(long catalogId) {
        // Read Env.getCurrentEnv() once so the null check and the subsequent
        // uses cannot observe different values of the static reference.
        Env env = Env.getCurrentEnv();
        if (env == null || env.getCatalogMgr() == null) {
            return null;
        }
        return env.getCatalogMgr().getCatalog(catalogId);
    }

    /** Legacy-named accessor for the Hive (HMS) engine cache. */
    public HiveExternalMetaCache getMetaStoreCache() {
        return getHiveExternalMetaCache();
    }

    /**
     * Fetches (loading on miss) the schema cache value for the table's key from
     * the "schema" entry of the table's engine cache.
     */
    @SuppressWarnings("unchecked")
    public Optional<SchemaCacheValue> getSchemaCacheValue(ExternalTable table, SchemaCacheKey key) {
        long catalogId = table.getCatalog().getId();
        // Use the key's concrete class (possibly a SchemaCacheKey subclass) to
        // select the matching cache entry key type.
        Class<SchemaCacheKey> keyType = (Class<SchemaCacheKey>) key.getClass();
        String resolvedEngine = table.getMetaCacheEngine();
        return Optional.ofNullable(engine(resolvedEngine)
                .entry(catalogId, ENTRY_SCHEMA, keyType, SchemaCacheValue.class)
                .get(key));
    }

    /** Returns the builtin Hudi engine cache. */
    public HudiExternalMetaCache getHudiExternalMetaCache() {
        return (HudiExternalMetaCache) engine(ENGINE_HUDI);
    }

    /** Returns the builtin Hive engine cache. */
    public HiveExternalMetaCache getHiveExternalMetaCache() {
        return (HiveExternalMetaCache) engine(ENGINE_HIVE);
    }

    /** Returns the builtin Iceberg engine cache. */
    public IcebergExternalMetaCache getIcebergExternalMetaCache() {
        return (IcebergExternalMetaCache) engine(ENGINE_ICEBERG);
    }

    /** Returns the builtin Paimon engine cache. */
    public PaimonExternalMetaCache getPaimonExternalMetaCache() {
        return (PaimonExternalMetaCache) engine(ENGINE_PAIMON);
    }

    /** Returns the builtin MaxCompute engine cache. */
    public MaxComputeExternalMetaCache getMaxComputeExternalMetaCache() {
        return (MaxComputeExternalMetaCache) engine(ENGINE_MAXCOMPUTE);
    }

    /** Returns the per-catalog MaxCompute metadata cache held inside the MaxCompute engine cache. */
    public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
        return getMaxComputeExternalMetaCache().getMaxComputeMetadataCache(catalogId);
    }

    /** Returns the file system cache shared by all catalogs. */
    public FileSystemCache getFsCache() {
        return fsCache;
    }

    /** Returns the async row count cache shared by all external tables. */
    public ExternalRowCountCache getRowCountCache() {
        return rowCountCache;
    }

    /** Returns the builtin external-Doris engine cache. */
    public DorisExternalMetaCache getDorisExternalMetaCache() {
        return (DorisExternalMetaCache) engine(ENGINE_DORIS);
    }

    /**
     * Invalidates one table's cache across all engines using its local
     * (cluster-stripped) db name. Note the debug log prints the remote
     * db/table names while invalidation uses the local ones.
     */
    public void invalidateTableCache(ExternalTable dorisTable) {
        invalidateTable(dorisTable.getCatalog().getId(),
                ClusterNamespace.getNameFromFullName(dorisTable.getDbName()),
                dorisTable.getName());
        if (LOG.isDebugEnabled()) {
            LOG.debug("invalid table cache for {}.{} in catalog {}", dorisTable.getRemoteDbName(),
                    dorisTable.getRemoteName(), dorisTable.getCatalog().getName());
        }
    }

    /** Invalidates one database's cache across all engines, with a debug trace. */
    public void invalidateDbCache(long catalogId, String dbName) {
        invalidateDb(catalogId, dbName);
        if (LOG.isDebugEnabled()) {
            LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId);
        }
    }

    /** Clears one catalog's cached data across all engines, with a debug trace. */
    public void invalidateCatalogCache(long catalogId) {
        // Refresh flows should clear cached data but keep per-engine catalog entry groups.
        invalidateCatalog(catalogId);
        if (LOG.isDebugEnabled()) {
            LOG.debug("invalid catalog cache for {}", catalogId);
        }
    }

    /**
     * Eagerly adds newly created partitions of an HMS table to the Hive engine
     * cache. Tables whose partition column types are unsupported are skipped
     * with a warning instead of failing the caller.
     */
    public void addPartitionsCache(long catalogId, HMSExternalTable table, List<String> partitionNames) {
        String dbName = ClusterNamespace.getNameFromFullName(table.getDbName());
        HiveExternalMetaCache metaCache = getHiveExternalMetaCache();
        List<Type> partitionColumnTypes;
        try {
            partitionColumnTypes = table.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(table));
        } catch (NotSupportedException e) {
            LOG.warn("Ignore not supported hms table, message: {} ", e.getMessage());
            return;
        }
        metaCache.addPartitionsCache(table.getOrBuildNameMapping(), partitionNames, partitionColumnTypes);
        if (LOG.isDebugEnabled()) {
            LOG.debug("add partition cache for {}.{} in catalog {}", dbName, table.getName(), catalogId);
        }
    }

    /**
     * Drops the given partitions of an HMS table from the Hive engine cache.
     * The boolean passed to dropPartitionsCache presumably also invalidates
     * dependent caches — TODO confirm against HiveExternalMetaCache.
     */
    public void dropPartitionsCache(long catalogId, HMSExternalTable table, List<String> partitionNames) {
        String dbName = ClusterNamespace.getNameFromFullName(table.getDbName());
        HiveExternalMetaCache metaCache = getHiveExternalMetaCache();
        metaCache.dropPartitionsCache(table, partitionNames, true);
        if (LOG.isDebugEnabled()) {
            LOG.debug("drop partition cache for {}.{} in catalog {}", dbName, table.getName(), catalogId);
        }
    }

    /**
     * Invalidates individual partitions of a table. Routed only through the
     * Hive engine cache, one partition at a time.
     */
    public void invalidatePartitionsCache(ExternalTable dorisTable, List<String> partitionNames) {
        HiveExternalMetaCache metaCache = getHiveExternalMetaCache();
        for (String partitionName : partitionNames) {
            metaCache.invalidatePartitionCache(dorisTable, partitionName);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("invalidate partition cache for {}.{} in catalog {}",
                    dorisTable.getDbName(), dorisTable.getName(), dorisTable.getCatalog().getName());
        }
    }

    /**
     * Builds a name-addressable {@link MetaCache} backed by the shared common
     * refresh executor and the given loaders/expiry settings.
     */
    public <T> MetaCache<T> buildMetaCache(String name,
            OptionalLong expireAfterAccessSec, OptionalLong refreshAfterWriteSec, long maxSize,
            CacheLoader<String, List<Pair<String, String>>> namesCacheLoader,
            CacheLoader<String, Optional<T>> metaObjCacheLoader,
            RemovalListener<String, Optional<T>> removalListener) {
        return new MetaCache<>(
                name, commonRefreshExecutor, expireAfterAccessSec, refreshAfterWriteSec,
                maxSize, namesCacheLoader, metaObjCacheLoader, removalListener);
    }

    /**
     * Converts Caffeine {@link CacheStats} plus an estimated size into the
     * flat string map used for cache-stats reporting.
     */
    public static Map<String, String> getCacheStats(CacheStats cacheStats, long estimatedSize) {
        Map<String, String> result = Maps.newHashMap();
        long readCount = cacheStats.hitCount() + cacheStats.missCount();
        result.put("hit_ratio", String.valueOf(cacheStats.hitRate()));
        result.put("hit_count", String.valueOf(cacheStats.hitCount()));
        result.put("read_count", String.valueOf(readCount));
        result.put("eviction_count", String.valueOf(cacheStats.evictionCount()));
        result.put("average_load_penalty", String.valueOf(cacheStats.averageLoadPenalty()));
        result.put("estimated_size", String.valueOf(estimatedSize));
        return result;
    }

    /**
     * Fallback implementation of {@link AbstractExternalMetaCache} for engines that do not
     * provide dedicated cache entries.
     *
     * <p>Registered entries:
     * <ul>
     *   <li>{@code schema}: schema-only cache keyed by {@link SchemaCacheKey}</li>
     * </ul>
     *
     * <p>This class keeps compatibility for generic external engines and routes only schema
     * loading/invalidation. No engine-specific metadata (partitions/files/snapshots) is cached.
     */
    private static class DefaultExternalMetaCache extends AbstractExternalMetaCache {
        // Schema entry spec: expire-after-access TTL and max size come from
        // global config; the first (null) argument presumably disables a
        // refresh-after-write TTL — TODO confirm against CacheSpec.fromTtlValue.
        private static final CacheSpec SCHEMA_CACHE_SPEC = CacheSpec.fromTtlValue(
                null, Config.external_cache_expire_time_seconds_after_access, Config.max_external_schema_cache_num);

        private final MetaCacheEntryDef<SchemaCacheKey, SchemaCacheValue> schemaEntryDef;

        DefaultExternalMetaCache(String engine, ExecutorService refreshExecutor) {
            super(engine, refreshExecutor);
            // Register the single "schema" entry; values are loaded on demand
            // via loadSchemaCacheValue.
            schemaEntryDef = MetaCacheEntryDef.of(
                    ENTRY_SCHEMA,
                    SchemaCacheKey.class,
                    SchemaCacheValue.class,
                    this::loadSchemaCacheValue,
                    SCHEMA_CACHE_SPEC);
            registerMetaCacheEntryDef(schemaEntryDef);
        }

        // Maps the legacy catalog property name onto the new per-entry TTL key.
        @Override
        protected Map<String, String> catalogPropertyCompatibilityMap() {
            return Collections.singletonMap(
                    ExternalCatalog.SCHEMA_CACHE_TTL_SECOND,
                    "meta.cache." + engine() + "." + ENTRY_SCHEMA + ".ttl-second");
        }

        @Override
        public void invalidateDb(long catalogId, String dbName) {
            schemaEntry(catalogId).invalidateIf(key -> matchDb(key, catalogId, dbName));
        }

        @Override
        public void invalidateTable(long catalogId, String dbName, String tableName) {
            schemaEntry(catalogId).invalidateIf(key -> matchTable(key, catalogId, dbName, tableName));
        }

        // No partition-level data is cached here, so partition invalidation
        // degrades to invalidating the whole table's schema entries.
        @Override
        public void invalidatePartitions(long catalogId, String dbName, String tableName, List<String> partitions) {
            invalidateTable(catalogId, dbName, tableName);
        }

        private MetaCacheEntry<SchemaCacheKey, SchemaCacheValue> schemaEntry(long catalogId) {
            return entry(catalogId, schemaEntryDef);
        }

        // Matches a key by catalog id and db name, accepting either the full
        // local db name or its cluster-stripped form.
        private boolean matchDb(SchemaCacheKey key, long catalogId, String dbName) {
            return key.getNameMapping().getCtlId() == catalogId
                    && (key.getNameMapping().getLocalDbName().equals(dbName)
                    || ClusterNamespace.getNameFromFullName(key.getNameMapping().getLocalDbName()).equals(dbName));
        }

        private boolean matchTable(SchemaCacheKey key, long catalogId, String dbName, String tableName) {
            return matchDb(key, catalogId, dbName)
                    && key.getNameMapping().getLocalTblName().equals(tableName);
        }

        // Cache loader for the schema entry: resolves the key's catalog and
        // asks it for the schema; a missing catalog/schema becomes a
        // CacheException so the failure is surfaced to the cache caller.
        private SchemaCacheValue loadSchemaCacheValue(SchemaCacheKey key) {
            CatalogIf<?> catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(key.getNameMapping().getCtlId());
            if (!(catalog instanceof ExternalCatalog)) {
                throw new CacheException("catalog %s is not external when loading schema cache",
                        null, key.getNameMapping().getCtlId());
            }
            ExternalCatalog externalCatalog = (ExternalCatalog) catalog;
            return externalCatalog.getSchema(key).orElseThrow(() -> new CacheException(
                    "failed to load schema cache value for: %s.%s.%s",
                    null, key.getNameMapping().getCtlId(),
                    key.getNameMapping().getLocalDbName(),
                    key.getNameMapping().getLocalTblName()));
        }
    }
}