ExternalMetaCacheMgr.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.datasource.doris.DorisExternalMetaCache;
import org.apache.doris.datasource.hive.HiveExternalMetaCache;
import org.apache.doris.datasource.hudi.HudiExternalMetaCache;
import org.apache.doris.datasource.iceberg.IcebergExternalMetaCache;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalMetaCache;
import org.apache.doris.datasource.metacache.AbstractExternalMetaCache;
import org.apache.doris.datasource.metacache.ExternalMetaCache;
import org.apache.doris.datasource.metacache.ExternalMetaCacheRegistry;
import org.apache.doris.datasource.metacache.ExternalMetaCacheRouteResolver;
import org.apache.doris.datasource.metacache.LegacyMetaCacheFactory;
import org.apache.doris.datasource.metacache.MetaCacheEntryDef;
import org.apache.doris.datasource.metacache.MetaCacheEntryInvalidation;
import org.apache.doris.datasource.metacache.MetaCacheEntryStats;
import org.apache.doris.datasource.paimon.PaimonExternalMetaCache;
import org.apache.doris.fs.FileSystemCache;

import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import javax.annotation.Nullable;

/**
 * Manages the metadata caches of external catalogs.
 *
 * <p>The manager owns:
 * <ul>
 *   <li>the executors used to load and refresh cache entries,</li>
 *   <li>the per-engine {@link ExternalMetaCache} instances, held in an {@link ExternalMetaCacheRegistry},</li>
 *   <li>the {@link FileSystemCache} shared by all catalogs, and</li>
 *   <li>the {@link ExternalRowCountCache} for all external tables.</li>
 * </ul>
 *
 * <p>Catalog-wide operations (for example {@link #invalidateCatalog(long)}) are fanned out to
 * every engine cache that the {@link ExternalMetaCacheRouteResolver} resolves for the catalog,
 * while the {@code *ByEngine} variants target exactly one engine cache.
 */
public class ExternalMetaCacheMgr {
    private static final Logger LOG = LogManager.getLogger(ExternalMetaCacheMgr.class);
    private static final String ENTRY_SCHEMA = "schema";
    private static final String ENGINE_DEFAULT = "default";
    private static final String ENGINE_HIVE = "hive";
    private static final String ENGINE_HUDI = "hudi";
    private static final String ENGINE_ICEBERG = "iceberg";
    private static final String ENGINE_PAIMON = "paimon";
    private static final String ENGINE_MAXCOMPUTE = "maxcompute";
    private static final String ENGINE_DORIS = "doris";

    /**
     * Executors for loading caches
     * 1. rowCountRefreshExecutor
     * For row count cache.
     * Row count cache is an async loading cache, and we can ignore the result
     * if cache missing or thread pool is full.
     * So use a separate executor for this cache.
     * <p>
     * 2. commonRefreshExecutor
     * For other caches. Other caches are sync loading cache.
     * But commonRefreshExecutor will be used for async refresh.
     * That is, if cache entry is missing, the cache value will be loaded in caller thread, synchronously.
     * If cache entry need refresh, it will be reloaded in commonRefreshExecutor.
     * <p>
     * 3. fileListingExecutor
     * File listing is a heavy operation, so use a separate executor for it.
     * For fileCache, the refresh operation will still use commonRefreshExecutor to trigger refresh.
     * And fileListingExecutor will be used to list file.
     */
    private final ExecutorService rowCountRefreshExecutor;
    private final ExecutorService commonRefreshExecutor;
    private final ExecutorService fileListingExecutor;
    // This executor is used to schedule the getting split tasks
    private final ExecutorService scheduleExecutor;
    private final ExternalMetaCacheRegistry cacheRegistry;
    private final ExternalMetaCacheRouteResolver routeResolver;
    private final LegacyMetaCacheFactory legacyMetaCacheFactory;

    // all catalogs could share the same fsCache.
    private final FileSystemCache fsCache;
    // all external table row count cache.
    private final ExternalRowCountCache rowCountCache;

    /**
     * Creates the executors, the shared caches and the built-in engine caches.
     *
     * @param isCheckpointCatalog whether this manager belongs to the checkpoint catalog; it selects
     *                            the kind of thread pool and the pool name prefix
     *                            (see {@link #newThreadPool})
     */
    public ExternalMetaCacheMgr(boolean isCheckpointCatalog) {
        rowCountRefreshExecutor = newThreadPool(isCheckpointCatalog,
                Config.max_external_cache_loader_thread_pool_size,
                Config.max_external_cache_loader_thread_pool_size * 1000,
                "RowCountRefreshExecutor", 0, true);

        commonRefreshExecutor = newThreadPool(isCheckpointCatalog,
                Config.max_external_cache_loader_thread_pool_size,
                Config.max_external_cache_loader_thread_pool_size * 10000,
                "CommonRefreshExecutor", 10, true);

        // The queue size should be large enough,
        // because there may be thousands of partitions being queried at the same time.
        fileListingExecutor = newThreadPool(isCheckpointCatalog,
                Config.max_external_cache_loader_thread_pool_size,
                Config.max_external_cache_loader_thread_pool_size * 1000,
                "FileListingExecutor", 10, true);

        scheduleExecutor = newThreadPool(isCheckpointCatalog,
                Config.max_external_cache_loader_thread_pool_size,
                Config.max_external_cache_loader_thread_pool_size * 1000,
                "scheduleExecutor", 10, true);

        fsCache = new FileSystemCache();
        rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);
        cacheRegistry = new ExternalMetaCacheRegistry();
        routeResolver = new ExternalMetaCacheRouteResolver(cacheRegistry);
        legacyMetaCacheFactory = new LegacyMetaCacheFactory(commonRefreshExecutor);

        // Must run last: the engine caches are built on top of the executors created above.
        initEngineCaches();
    }

    /**
     * Creates one of the executors used by this manager.
     *
     * <p>For the checkpoint catalog a daemon cache thread pool is created and {@code queueSize}
     * and {@code timeoutSeconds} are not used; otherwise a daemon fixed thread pool with a
     * bounded queue is created.
     *
     * @param isCheckpointCatalog whether the pool is created for the checkpoint catalog
     * @param numThread           number of threads passed to the pool factory
     * @param queueSize           task queue size (fixed pool only)
     * @param poolName            pool name; prefixed with "Checkpoint" or "NotCheckpoint"
     * @param timeoutSeconds      timeout passed to the fixed pool factory (fixed pool only)
     * @param needRegisterMetric  whether metrics are registered for the pool
     * @return the newly created executor
     */
    private ExecutorService newThreadPool(boolean isCheckpointCatalog, int numThread, int queueSize,
            String poolName, int timeoutSeconds,
            boolean needRegisterMetric) {
        String executorNamePrefix = isCheckpointCatalog ? "Checkpoint" : "NotCheckpoint";
        String realPoolName = executorNamePrefix + poolName;
        // Business threads require a fixed size thread pool and use queues to store unprocessed tasks.
        // Checkpoint threads have almost no business and need to be released in a timely manner to avoid thread leakage
        if (isCheckpointCatalog) {
            return ThreadPoolManager.newDaemonCacheThreadPool(numThread, realPoolName, needRegisterMetric);
        } else {
            return ThreadPoolManager.newDaemonFixedThreadPool(numThread, queueSize, realPoolName, timeoutSeconds,
                    needRegisterMetric);
        }
    }

    /** Returns the executor dedicated to file listing. */
    public ExecutorService getFileListingExecutor() {
        return fileListingExecutor;
    }

    /** Returns the executor used to schedule the getting split tasks. */
    public ExecutorService getScheduleExecutor() {
        return scheduleExecutor;
    }

    /**
     * Resolves the engine cache for {@code engine} through the registry.
     *
     * @param engine engine name, e.g. {@code "hive"}
     * @return whatever {@link ExternalMetaCacheRegistry#resolve} yields for that name
     */
    ExternalMetaCache engine(String engine) {
        return cacheRegistry.resolve(engine);
    }

    /**
     * Initializes the hive engine cache for the catalog and returns it.
     *
     * @param catalogId id of the catalog
     * @return the hive engine cache
     * @throws IllegalStateException if the catalog does not exist
     */
    public HiveExternalMetaCache hive(long catalogId) {
        prepareCatalogByEngine(catalogId, ENGINE_HIVE);
        return (HiveExternalMetaCache) engine(ENGINE_HIVE);
    }

    /**
     * Initializes the hudi engine cache for the catalog and returns it.
     *
     * @param catalogId id of the catalog
     * @return the hudi engine cache
     * @throws IllegalStateException if the catalog does not exist
     */
    public HudiExternalMetaCache hudi(long catalogId) {
        prepareCatalogByEngine(catalogId, ENGINE_HUDI);
        return (HudiExternalMetaCache) engine(ENGINE_HUDI);
    }

    /**
     * Initializes the iceberg engine cache for the catalog and returns it.
     *
     * @param catalogId id of the catalog
     * @return the iceberg engine cache
     * @throws IllegalStateException if the catalog does not exist
     */
    public IcebergExternalMetaCache iceberg(long catalogId) {
        prepareCatalogByEngine(catalogId, ENGINE_ICEBERG);
        return (IcebergExternalMetaCache) engine(ENGINE_ICEBERG);
    }

    /**
     * Initializes the paimon engine cache for the catalog and returns it.
     *
     * @param catalogId id of the catalog
     * @return the paimon engine cache
     * @throws IllegalStateException if the catalog does not exist
     */
    public PaimonExternalMetaCache paimon(long catalogId) {
        prepareCatalogByEngine(catalogId, ENGINE_PAIMON);
        return (PaimonExternalMetaCache) engine(ENGINE_PAIMON);
    }

    /**
     * Initializes the maxcompute engine cache for the catalog and returns it.
     *
     * @param catalogId id of the catalog
     * @return the maxcompute engine cache
     * @throws IllegalStateException if the catalog does not exist
     */
    public MaxComputeExternalMetaCache maxCompute(long catalogId) {
        prepareCatalogByEngine(catalogId, ENGINE_MAXCOMPUTE);
        return (MaxComputeExternalMetaCache) engine(ENGINE_MAXCOMPUTE);
    }

    /**
     * Initializes the doris engine cache for the catalog and returns it.
     *
     * @param catalogId id of the catalog
     * @return the doris engine cache
     * @throws IllegalStateException if the catalog does not exist
     */
    public DorisExternalMetaCache doris(long catalogId) {
        prepareCatalogByEngine(catalogId, ENGINE_DORIS);
        return (DorisExternalMetaCache) engine(ENGINE_DORIS);
    }

    /**
     * Calls {@code initCatalog} on every engine cache resolved for the catalog, passing a copy
     * of the catalog's properties.
     *
     * @param catalogId id of the catalog
     * @throws IllegalStateException if the catalog does not exist
     */
    public void prepareCatalog(long catalogId) {
        Map<String, String> catalogProperties = getCatalogProperties(catalogId);
        routeCatalogEngines(catalogId, cache -> cache.initCatalog(catalogId, catalogProperties));
    }

    /**
     * Calls {@code initCatalog} on one engine cache, using the properties of the registered catalog.
     *
     * @param catalogId id of the catalog
     * @param engine    name of the engine cache to initialize
     * @throws IllegalStateException if the catalog does not exist
     */
    public void prepareCatalogByEngine(long catalogId, String engine) {
        prepareCatalogByEngine(catalogId, engine, getCatalogProperties(catalogId));
    }

    /**
     * Calls {@code initCatalog} on one engine cache with explicitly supplied properties.
     *
     * @param catalogId         id of the catalog
     * @param engine            name of the engine cache to initialize
     * @param catalogProperties catalog properties; {@code null} is treated as empty. The map is
     *                          copied, so later changes by the caller are not seen by the cache.
     */
    public void prepareCatalogByEngine(long catalogId, String engine, Map<String, String> catalogProperties) {
        Map<String, String> safeCatalogProperties = catalogProperties == null
                ? Maps.newHashMap()
                : Maps.newHashMap(catalogProperties);
        routeSpecifiedEngine(engine, cache -> cache.initCatalog(catalogId, safeCatalogProperties));
    }

    /**
     * Calls {@code invalidateCatalogEntries} on every engine cache resolved for the catalog,
     * skipping caches in which the catalog is not initialized.
     *
     * @param catalogId id of the catalog
     */
    public void invalidateCatalog(long catalogId) {
        routeCatalogEngines(catalogId, cache -> safeInvalidate(
                cache, catalogId, "invalidateCatalog",
                () -> cache.invalidateCatalogEntries(catalogId)));
    }

    /**
     * Calls {@code invalidateCatalogEntries} on one engine cache if the catalog is initialized in it.
     *
     * @param catalogId id of the catalog
     * @param engine    name of the engine cache
     */
    public void invalidateCatalogByEngine(long catalogId, String engine) {
        routeSpecifiedEngine(engine, cache -> safeInvalidate(
                cache, catalogId, "invalidateCatalogByEngine",
                () -> cache.invalidateCatalogEntries(catalogId)));
    }

    /**
     * Calls {@code invalidateCatalog} (as opposed to {@code invalidateCatalogEntries}) on every
     * engine cache resolved for the catalog, skipping caches in which the catalog is not initialized.
     *
     * @param catalogId id of the catalog
     */
    public void removeCatalog(long catalogId) {
        routeCatalogEngines(catalogId, cache -> safeInvalidate(
                cache, catalogId, "removeCatalog",
                () -> cache.invalidateCatalog(catalogId)));
    }

    /**
     * Calls {@code invalidateCatalog} on one engine cache if the catalog is initialized in it.
     *
     * @param catalogId id of the catalog
     * @param engine    name of the engine cache
     */
    public void removeCatalogByEngine(long catalogId, String engine) {
        routeSpecifiedEngine(engine, cache -> safeInvalidate(
                cache, catalogId, "removeCatalogByEngine",
                () -> cache.invalidateCatalog(catalogId)));
    }

    /**
     * Invalidates a database in every engine cache resolved for the catalog, skipping caches
     * in which the catalog is not initialized.
     *
     * @param catalogId id of the catalog
     * @param dbName    name of the database
     */
    public void invalidateDb(long catalogId, String dbName) {
        routeCatalogEngines(catalogId, cache -> safeInvalidate(
                cache, catalogId, "invalidateDb", () -> cache.invalidateDb(catalogId, dbName)));
    }

    /**
     * Invalidates a table in every engine cache resolved for the catalog, skipping caches
     * in which the catalog is not initialized.
     *
     * @param catalogId id of the catalog
     * @param dbName    name of the database
     * @param tableName name of the table
     */
    public void invalidateTable(long catalogId, String dbName, String tableName) {
        routeCatalogEngines(catalogId, cache -> safeInvalidate(
                cache, catalogId, "invalidateTable",
                () -> cache.invalidateTable(catalogId, dbName, tableName)));
    }

    /**
     * Invalidates a table in one engine cache if the catalog is initialized in it.
     *
     * @param catalogId id of the catalog
     * @param engine    name of the engine cache
     * @param dbName    name of the database
     * @param tableName name of the table
     */
    public void invalidateTableByEngine(long catalogId, String engine, String dbName, String tableName) {
        routeSpecifiedEngine(engine, cache -> safeInvalidate(
                cache, catalogId, "invalidateTableByEngine",
                () -> cache.invalidateTable(catalogId, dbName, tableName)));
    }

    /**
     * Invalidates the given partitions of a table in every engine cache resolved for the catalog,
     * skipping caches in which the catalog is not initialized.
     *
     * @param catalogId  id of the catalog
     * @param dbName     name of the database
     * @param tableName  name of the table
     * @param partitions partitions to invalidate, passed through to the engine caches unchanged
     */
    public void invalidatePartitions(long catalogId,
            String dbName, String tableName, List<String> partitions) {
        routeCatalogEngines(catalogId, cache -> safeInvalidate(
                cache, catalogId, "invalidatePartitions",
                () -> cache.invalidatePartitions(catalogId, dbName, tableName, partitions)));
    }

    /**
     * Collects the per-entry statistics that every registered engine cache reports for the catalog.
     *
     * @param catalogId id of the catalog
     * @return a new list sorted by engine name, then by entry name
     */
    public List<CatalogMetaCacheStats> getCatalogCacheStats(long catalogId) {
        List<CatalogMetaCacheStats> stats = new ArrayList<>();
        cacheRegistry.allCaches().forEach(externalMetaCache -> externalMetaCache.stats(catalogId)
                .forEach((entryName, entryStats) -> stats.add(
                        new CatalogMetaCacheStats(externalMetaCache.engine(), entryName, entryStats))));
        stats.sort(Comparator.comparing(CatalogMetaCacheStats::getEngineName)
                .thenComparing(CatalogMetaCacheStats::getEntryName));
        return stats;
    }

    /**
     * Immutable holder pairing the statistics of one cache entry with the names of the engine
     * and of the entry they belong to. None of the components may be {@code null}.
     */
    public static final class CatalogMetaCacheStats {
        private final String engineName;
        private final String entryName;
        private final MetaCacheEntryStats entryStats;

        /**
         * @param engineName name of the engine cache
         * @param entryName  name of the cache entry
         * @param entryStats statistics of the cache entry
         * @throws NullPointerException if any argument is {@code null}
         */
        public CatalogMetaCacheStats(String engineName, String entryName, MetaCacheEntryStats entryStats) {
            this.engineName = Objects.requireNonNull(engineName, "engineName");
            this.entryName = Objects.requireNonNull(entryName, "entryName");
            this.entryStats = Objects.requireNonNull(entryStats, "entryStats");
        }

        public String getEngineName() {
            return engineName;
        }

        public String getEntryName() {
            return entryName;
        }

        public MetaCacheEntryStats getEntryStats() {
            return entryStats;
        }
    }

    private void initEngineCaches() {
        registerBuiltinEngineCaches();
    }

    /** Registers the default cache and one cache per built-in engine. */
    private void registerBuiltinEngineCaches() {
        cacheRegistry.register(new DefaultExternalMetaCache(ENGINE_DEFAULT, commonRefreshExecutor));
        cacheRegistry.register(new HiveExternalMetaCache(commonRefreshExecutor, fileListingExecutor));
        cacheRegistry.register(new HudiExternalMetaCache(commonRefreshExecutor));
        cacheRegistry.register(new IcebergExternalMetaCache(commonRefreshExecutor));
        cacheRegistry.register(new PaimonExternalMetaCache(commonRefreshExecutor));
        cacheRegistry.register(new MaxComputeExternalMetaCache(commonRefreshExecutor));
        cacheRegistry.register(new DorisExternalMetaCache(commonRefreshExecutor));
    }

    /** Applies {@code action} to every engine cache the route resolver yields for the catalog. */
    private void routeCatalogEngines(long catalogId, Consumer<ExternalMetaCache> action) {
        routeResolver.resolveCatalogCaches(catalogId, getCatalog(catalogId)).forEach(action);
    }

    /** Applies {@code action} to the single engine cache resolved for {@code engine}. */
    private void routeSpecifiedEngine(String engine, Consumer<ExternalMetaCache> action) {
        action.accept(this.engine(engine));
    }

    /**
     * Test hook: returns the engine names of the caches the route resolver yields for the catalog.
     *
     * @param catalog   catalog handed to the resolver; may be {@code null}
     * @param catalogId id of the catalog
     * @return a new, caller-owned list of engine names in resolver order
     */
    List<String> resolveCatalogEngineNamesForTest(@Nullable CatalogIf<?> catalog, long catalogId) {
        List<String> resolved = new ArrayList<>();
        routeResolver.resolveCatalogCaches(catalogId, catalog).forEach(cache -> resolved.add(cache.engine()));
        // The list is created here and never shared, so it can be returned without a copy.
        return resolved;
    }

    /**
     * Runs {@code action} only if the catalog is initialized in {@code cache}; otherwise the
     * operation is skipped (and logged at debug level).
     *
     * @param cache     engine cache the action targets
     * @param catalogId id of the catalog
     * @param operation operation name, used only in the debug log
     * @param action    the invalidation to run
     */
    private void safeInvalidate(ExternalMetaCache cache, long catalogId, String operation, Runnable action) {
        if (!cache.isCatalogInitialized(catalogId)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("skip {} for catalog {} on engine '{}' because cache entry is absent",
                        operation, catalogId, cache.engine());
            }
            return;
        }
        action.run();
    }

    /**
     * Returns a mutable copy of the catalog's properties.
     *
     * @param catalogId id of the catalog
     * @return a new map; empty if the catalog reports {@code null} properties
     * @throws IllegalStateException if the catalog cannot be found
     */
    private Map<String, String> getCatalogProperties(long catalogId) {
        CatalogIf<?> catalog = getCatalog(catalogId);
        if (catalog == null) {
            throw new IllegalStateException(String.format("Catalog %d does not exist.", catalogId));
        }
        // Read the properties once so the null check and the copy see the same value.
        Map<String, String> properties = catalog.getProperties();
        if (properties == null) {
            return Maps.newHashMap();
        }
        return Maps.newHashMap(properties);
    }

    /**
     * Looks up a catalog through the current {@link Env}.
     *
     * @param catalogId id of the catalog
     * @return the catalog manager's lookup result, or {@code null} if the current env or its
     *         catalog manager is not available
     */
    @Nullable
    private CatalogIf<?> getCatalog(long catalogId) {
        // Resolve the env once so that all checks below apply to the same instance.
        Env env = Env.getCurrentEnv();
        if (env == null || env.getCatalogMgr() == null) {
            return null;
        }
        return env.getCatalogMgr().getCatalog(catalogId);
    }

    /**
     * Fetches the schema cache value of a table from the engine cache named by
     * {@link ExternalTable#getMetaCacheEngine()}, initializing that cache for the table's catalog first.
     *
     * @param table table whose catalog and meta cache engine are used
     * @param key   schema cache key
     * @return the result of {@code getSchemaValue} on the resolved engine cache
     * @throws IllegalStateException if the table's catalog is not registered
     */
    // NOTE(review): the suppression looks like a leftover; no unchecked operation is visible here.
    // Confirm against ExternalMetaCache#getSchemaValue and drop it if so.
    @SuppressWarnings("unchecked")
    public Optional<SchemaCacheValue> getSchemaCacheValue(ExternalTable table, SchemaCacheKey key) {
        long catalogId = table.getCatalog().getId();
        String resolvedEngine = table.getMetaCacheEngine();
        prepareCatalogByEngine(catalogId, resolvedEngine);
        return engine(resolvedEngine).getSchemaValue(catalogId, key);
    }

    /** Returns the file system cache shared by all catalogs. */
    public FileSystemCache getFsCache() {
        return fsCache;
    }

    /** Returns the row count cache of all external tables. */
    public ExternalRowCountCache getRowCountCache() {
        return rowCountCache;
    }

    /**
     * Invalidates the cached metadata of one table, addressed by the table's catalog id,
     * {@code getDbName()} and {@code getName()}.
     *
     * @param dorisTable the table whose cache entries are invalidated
     */
    public void invalidateTableCache(ExternalTable dorisTable) {
        invalidateTable(dorisTable.getCatalog().getId(),
                dorisTable.getDbName(),
                dorisTable.getName());
        if (LOG.isDebugEnabled()) {
            LOG.debug("invalid table cache for {}.{} in catalog {}", dorisTable.getRemoteDbName(),
                    dorisTable.getRemoteName(), dorisTable.getCatalog().getName());
        }
    }

    /** Returns the legacy meta cache factory, which is backed by the common refresh executor. */
    public LegacyMetaCacheFactory legacyMetaCacheFactory() {
        return legacyMetaCacheFactory;
    }

    /**
     * Renders Caffeine cache statistics as a string map.
     *
     * @param cacheStats    statistics snapshot of the cache
     * @param estimatedSize estimated number of entries, reported as {@code estimated_size}
     * @return a new map with the keys {@code hit_ratio}, {@code hit_count}, {@code read_count}
     *         (hits plus misses), {@code eviction_count}, {@code average_load_penalty} and
     *         {@code estimated_size}
     */
    public static Map<String, String> getCacheStats(CacheStats cacheStats, long estimatedSize) {
        Map<String, String> stats = Maps.newHashMap();
        stats.put("hit_ratio", String.valueOf(cacheStats.hitRate()));
        stats.put("hit_count", String.valueOf(cacheStats.hitCount()));
        stats.put("read_count", String.valueOf(cacheStats.hitCount() + cacheStats.missCount()));
        stats.put("eviction_count", String.valueOf(cacheStats.evictionCount()));
        stats.put("average_load_penalty", String.valueOf(cacheStats.averageLoadPenalty()));
        stats.put("estimated_size", String.valueOf(estimatedSize));
        return stats;
    }

    /** Test hook: resets the registry with the given engine caches. */
    void replaceEngineCachesForTest(List<? extends ExternalMetaCache> caches) {
        cacheRegistry.resetForTest(caches);
    }

    /**
     * Fallback implementation of {@link AbstractExternalMetaCache} for engines that do not
     * provide dedicated cache entries.
     *
     * <p>Registered entries:
     * <ul>
     *   <li>{@code schema}: schema-only cache keyed by {@link SchemaCacheKey}</li>
     * </ul>
     *
     * <p>This class keeps compatibility for generic external engines and routes only schema
     * loading/invalidation. No engine-specific metadata (partitions/files/snapshots) is cached.
     */
    private static class DefaultExternalMetaCache extends AbstractExternalMetaCache {
        DefaultExternalMetaCache(String engine, ExecutorService refreshExecutor) {
            super(engine, refreshExecutor);
            // Table-level invalidation matches entries by the local db/table names of the key.
            registerEntry(MetaCacheEntryDef.of(
                    ENTRY_SCHEMA,
                    SchemaCacheKey.class,
                    SchemaCacheValue.class,
                    this::loadSchemaCacheValue,
                    defaultSchemaCacheSpec(),
                    MetaCacheEntryInvalidation.forTableIdentity(
                            key -> key.getNameMapping().getLocalDbName(),
                            key -> key.getNameMapping().getLocalTblName())));
        }

        @Override
        protected Map<String, String> catalogPropertyCompatibilityMap() {
            return singleCompatibilityMap(ExternalCatalog.SCHEMA_CACHE_TTL_SECOND, ENTRY_SCHEMA);
        }

        /**
         * Loads the schema for {@code key} from the external catalog identified by the key's
         * name mapping.
         *
         * @param key schema cache key
         * @return the schema cache value provided by the catalog
         * @throws CacheException if the catalog is not an {@link ExternalCatalog} or it
         *                        provides no schema for the key
         */
        private SchemaCacheValue loadSchemaCacheValue(SchemaCacheKey key) {
            CatalogIf<?> catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(key.getNameMapping().getCtlId());
            if (!(catalog instanceof ExternalCatalog)) {
                throw new CacheException("catalog %s is not external when loading schema cache",
                        null, key.getNameMapping().getCtlId());
            }
            ExternalCatalog externalCatalog = (ExternalCatalog) catalog;
            return externalCatalog.getSchema(key).orElseThrow(() -> new CacheException(
                    "failed to load schema cache value for: %s.%s.%s",
                    null, key.getNameMapping().getCtlId(),
                    key.getNameMapping().getLocalDbName(),
                    key.getNameMapping().getLocalTblName()));
        }
    }
}