// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.catalog.Type;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hudi.source.HudiCachedFsViewProcessor;
import org.apache.doris.datasource.hudi.source.HudiCachedMetaClientProcessor;
import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr;
import org.apache.doris.datasource.hudi.source.HudiPartitionProcessor;
import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
import org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr;
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache;
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr;
import org.apache.doris.datasource.metacache.MetaCache;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.paimon.PaimonMetadataCache;
import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr;
import org.apache.doris.fsv2.FileSystemCache;
import org.apache.doris.nereids.exceptions.NotSupportedException;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;

/**
 * Caches metadata of external catalogs:
 * 1. Hive metastore metadata, mainly partitions.
 * 2. Table schema cache.
 * 3. Row count cache.
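 * <p>
 * A minimal usage sketch (illustrative only; in practice the manager is owned by
 * the catalog environment rather than constructed directly, and {@code catalogId}
 * is a placeholder):
 * <pre>{@code
 * ExternalMetaCacheMgr mgr = new ExternalMetaCacheMgr(false); // non-checkpoint
 * ExternalRowCountCache rowCounts = mgr.getRowCountCache();
 * // Drop all cached metadata of a catalog, e.g. after it is dropped or altered:
 * mgr.invalidateCatalogCache(catalogId);
 * }</pre>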
 */
public class ExternalMetaCacheMgr {
    private static final Logger LOG = LogManager.getLogger(ExternalMetaCacheMgr.class);

    /**
 * Executors for loading caches.
 * 1. rowCountRefreshExecutor
 * For the row count cache.
 * The row count cache is an async loading cache, and we can ignore the result
 * if the cache misses or the thread pool is full.
 * So a separate executor is used for this cache.
 * <p>
 * 2. commonRefreshExecutor
 * For the other caches, which are sync loading caches.
 * But commonRefreshExecutor is used for async refresh.
 * That is, if a cache entry is missing, the value is loaded in the caller thread, synchronously;
 * if a cache entry needs a refresh, it is reloaded in commonRefreshExecutor.
 * <p>
 * 3. fileListingExecutor
 * File listing is a heavy operation, so a separate executor is used for it.
 * For the file cache, refreshes are still triggered via commonRefreshExecutor,
 * while fileListingExecutor does the actual file listing.
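 * <p>
 * A minimal Caffeine sketch of the sync-load/async-refresh pattern from point 2
 * (illustrative only; the real caches are built in their own classes, and
 * {@code loadSchema} is a hypothetical loader):
 * <pre>{@code
 * LoadingCache<String, Object> cache = Caffeine.newBuilder()
 *         .maximumSize(10_000)
 *         .refreshAfterWrite(Duration.ofMinutes(10))
 *         .executor(commonRefreshExecutor) // refreshes run on this pool
 *         .build(key -> loadSchema(key));  // misses load in the caller thread
 * }</pre>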
     */
    private ExecutorService rowCountRefreshExecutor;
    private ExecutorService commonRefreshExecutor;
    private ExecutorService fileListingExecutor;
    // This executor is used to schedule the tasks that fetch splits.
    private ExecutorService scheduleExecutor;

    // catalog id -> HiveMetaStoreCache
    private final Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap();
    // catalog id -> table schema cache.
    // Use a concurrent map because getSchemaCache() reads it without holding the lock.
    private final Map<Long, ExternalSchemaCache> schemaCacheMap = Maps.newConcurrentMap();
    // hudi partition manager
    private final HudiMetadataCacheMgr hudiMetadataCacheMgr;
    // all catalogs could share the same fsCache.
    private FileSystemCache fsCache;
    // row count cache for all external tables.
    private ExternalRowCountCache rowCountCache;
    private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;
    private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
    private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;

    public ExternalMetaCacheMgr(boolean isCheckpointCatalog) {
        rowCountRefreshExecutor = newThreadPool(isCheckpointCatalog,
                Config.max_external_cache_loader_thread_pool_size,
                Config.max_external_cache_loader_thread_pool_size * 1000,
                "RowCountRefreshExecutor", 0, true);

        commonRefreshExecutor = newThreadPool(isCheckpointCatalog,
                Config.max_external_cache_loader_thread_pool_size,
                Config.max_external_cache_loader_thread_pool_size * 10000,
                "CommonRefreshExecutor", 10, true);

        // The queue size should be large enough,
        // because there may be thousands of partitions being queried at the same time.
        fileListingExecutor = newThreadPool(isCheckpointCatalog,
                Config.max_external_cache_loader_thread_pool_size,
                Config.max_external_cache_loader_thread_pool_size * 1000,
                "FileListingExecutor", 10, true);

        scheduleExecutor = newThreadPool(isCheckpointCatalog,
                Config.max_external_cache_loader_thread_pool_size,
                Config.max_external_cache_loader_thread_pool_size * 1000,
                "scheduleExecutor", 10, true);

        fsCache = new FileSystemCache();
        rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);

        hudiMetadataCacheMgr = new HudiMetadataCacheMgr(commonRefreshExecutor);
        icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor);
        maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
        paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor);
    }

    private ExecutorService newThreadPool(boolean isCheckpointCatalog, int numThread, int queueSize,
            String poolName, int timeoutSeconds,
            boolean needRegisterMetric) {
        String executorNamePrefix = isCheckpointCatalog ? "Checkpoint" : "NotCheckpoint";
        String realPoolName = executorNamePrefix + poolName;
        // Business threads require a fixed-size thread pool and use a queue to hold unprocessed tasks.
        // Checkpoint threads carry almost no workload and should be released promptly to avoid thread leakage.
        if (isCheckpointCatalog) {
            return ThreadPoolManager.newDaemonCacheThreadPool(numThread, realPoolName, needRegisterMetric);
        } else {
            return ThreadPoolManager.newDaemonFixedThreadPool(numThread, queueSize, realPoolName, timeoutSeconds,
                    needRegisterMetric);
        }
    }

    public ExecutorService getFileListingExecutor() {
        return fileListingExecutor;
    }

    public ExecutorService getScheduleExecutor() {
        return scheduleExecutor;
    }

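    /**
     * Lazily creates and returns the {@link HiveMetaStoreCache} for the given catalog.
     * Uses double-checked locking on the concurrent map so that at most one cache
     * instance is created per catalog id.
     */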
    public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
        HiveMetaStoreCache cache = cacheMap.get(catalog.getId());
        if (cache == null) {
            synchronized (cacheMap) {
                if (!cacheMap.containsKey(catalog.getId())) {
                    cacheMap.put(catalog.getId(),
                            new HiveMetaStoreCache(catalog, commonRefreshExecutor, fileListingExecutor));
                }
                cache = cacheMap.get(catalog.getId());
            }
        }
        return cache;
    }

    public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) {
        ExternalSchemaCache cache = schemaCacheMap.get(catalog.getId());
        if (cache == null) {
            synchronized (schemaCacheMap) {
                if (!schemaCacheMap.containsKey(catalog.getId())) {
                    schemaCacheMap.put(catalog.getId(), new ExternalSchemaCache(catalog, commonRefreshExecutor));
                }
                cache = schemaCacheMap.get(catalog.getId());
            }
        }
        return cache;
    }

    public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog catalog) {
        return hudiMetadataCacheMgr.getPartitionProcessor(catalog);
    }

    public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog catalog) {
        return hudiMetadataCacheMgr.getFsViewProcessor(catalog);
    }

    public HudiCachedMetaClientProcessor getMetaClientProcessor(ExternalCatalog catalog) {
        return hudiMetadataCacheMgr.getHudiMetaClientProcessor(catalog);
    }

    public HudiMetadataCacheMgr getHudiMetadataCacheMgr() {
        return hudiMetadataCacheMgr;
    }

    public IcebergMetadataCache getIcebergMetadataCache() {
        return icebergMetadataCacheMgr.getIcebergMetadataCache();
    }

    public PaimonMetadataCache getPaimonMetadataCache() {
        return paimonMetadataCacheMgr.getPaimonMetadataCache();
    }

    public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
        return maxComputeMetadataCacheMgr.getMaxComputeMetadataCache(catalogId);
    }

    public FileSystemCache getFsCache() {
        return fsCache;
    }

    public ExternalRowCountCache getRowCountCache() {
        return rowCountCache;
    }

    public void removeCache(long catalogId) {
        if (cacheMap.remove(catalogId) != null) {
            LOG.info("remove hive metastore cache for catalog {}", catalogId);
        }
        synchronized (schemaCacheMap) {
            if (schemaCacheMap.remove(catalogId) != null) {
                LOG.info("remove schema cache for catalog {}", catalogId);
            }
        }
        hudiMetadataCacheMgr.removeCache(catalogId);
        icebergMetadataCacheMgr.removeCache(catalogId);
        maxComputeMetadataCacheMgr.removeCache(catalogId);
        paimonMetadataCacheMgr.removeCache(catalogId);
    }

    public void invalidateTableCache(long catalogId, String dbName, String tblName) {
        dbName = ClusterNamespace.getNameFromFullName(dbName);
        synchronized (schemaCacheMap) {
            ExternalSchemaCache schemaCache = schemaCacheMap.get(catalogId);
            if (schemaCache != null) {
                schemaCache.invalidateTableCache(dbName, tblName);
            }
        }
        HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
        if (metaCache != null) {
            metaCache.invalidateTableCache(dbName, tblName);
        }
        hudiMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
        icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
        maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
        paimonMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
        if (LOG.isDebugEnabled()) {
            LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, tblName, catalogId);
        }
    }

    public void invalidateDbCache(long catalogId, String dbName) {
        dbName = ClusterNamespace.getNameFromFullName(dbName);
        synchronized (schemaCacheMap) {
            ExternalSchemaCache schemaCache = schemaCacheMap.get(catalogId);
            if (schemaCache != null) {
                schemaCache.invalidateDbCache(dbName);
            }
        }
        HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
        if (metaCache != null) {
            metaCache.invalidateDbCache(dbName);
        }
        hudiMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
        icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
        maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
        paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
        if (LOG.isDebugEnabled()) {
            LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId);
        }
    }

    public void invalidateCatalogCache(long catalogId) {
        synchronized (schemaCacheMap) {
            schemaCacheMap.remove(catalogId);
        }
        HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
        if (metaCache != null) {
            metaCache.invalidateAll();
        }
        hudiMetadataCacheMgr.invalidateCatalogCache(catalogId);
        icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
        maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
        paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
        if (LOG.isDebugEnabled()) {
            LOG.debug("invalid catalog cache for {}", catalogId);
        }
    }

    public void invalidSchemaCache(long catalogId) {
        synchronized (schemaCacheMap) {
            schemaCacheMap.remove(catalogId);
        }
    }

    public void addPartitionsCache(long catalogId, HMSExternalTable table, List<String> partitionNames) {
        String dbName = ClusterNamespace.getNameFromFullName(table.getDbName());
        HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
        if (metaCache != null) {
            List<Type> partitionColumnTypes;
            try {
                partitionColumnTypes = table.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(table));
            } catch (NotSupportedException e) {
                LOG.warn("Ignore not supported hms table, message: {} ", e.getMessage());
                return;
            }
            metaCache.addPartitionsCache(dbName, table.getName(), partitionNames, partitionColumnTypes);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("add partition cache for {}.{} in catalog {}", dbName, table.getName(), catalogId);
        }
    }

    public void dropPartitionsCache(long catalogId, HMSExternalTable table, List<String> partitionNames) {
        String dbName = ClusterNamespace.getNameFromFullName(table.getDbName());
        HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
        if (metaCache != null) {
            metaCache.dropPartitionsCache(dbName, table.getName(), partitionNames, true);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("drop partition cache for {}.{} in catalog {}", dbName, table.getName(), catalogId);
        }
    }

    public void invalidatePartitionsCache(long catalogId, String dbName, String tableName,
            List<String> partitionNames) {
        // Normalize the db name up front so the debug log below is consistent
        // with the other invalidate methods, even when there is no meta cache.
        dbName = ClusterNamespace.getNameFromFullName(dbName);
        HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
        if (metaCache != null) {
            for (String partitionName : partitionNames) {
                metaCache.invalidatePartitionCache(dbName, tableName, partitionName);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("invalidated partition cache for {}.{} in catalog {}", dbName, tableName, catalogId);
        }
    }

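    /**
     * Builds a generic {@link MetaCache} backed by commonRefreshExecutor.
     * A minimal usage sketch (illustrative only; the loaders and listener below
     * are hypothetical):
     * <pre>{@code
     * MetaCache<Object> cache = mgr.buildMetaCache(
     *         "my_catalog_cache",
     *         OptionalLong.of(86400L),  // expire entries 1 day after write
     *         OptionalLong.empty(),     // no refresh-after-write
     *         10000L,                   // max cache size
     *         namesLoader, objLoader, removalListener);
     * }</pre>
     */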
    public <T> MetaCache<T> buildMetaCache(String name,
            OptionalLong expireAfterWriteSec, OptionalLong refreshAfterWriteSec, long maxSize,
            CacheLoader<String, List<Pair<String, String>>> namesCacheLoader,
            CacheLoader<String, Optional<T>> metaObjCacheLoader,
            RemovalListener<String, Optional<T>> removalListener) {
        return new MetaCache<>(name, commonRefreshExecutor, expireAfterWriteSec, refreshAfterWriteSec,
                maxSize, namesCacheLoader, metaObjCacheLoader, removalListener);
    }

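    /**
     * Converts Caffeine {@link CacheStats} into a flat string map for reporting.
     * A minimal sketch, assuming {@code cache} is any Caffeine cache built with
     * {@code recordStats()} enabled:
     * <pre>{@code
     * Map<String, String> stats =
     *         ExternalMetaCacheMgr.getCacheStats(cache.stats(), cache.estimatedSize());
     * }</pre>
     */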
    public static Map<String, String> getCacheStats(CacheStats cacheStats, long estimatedSize) {
        Map<String, String> stats = Maps.newHashMap();
        stats.put("hit_ratio", String.valueOf(cacheStats.hitRate()));
        stats.put("hit_count", String.valueOf(cacheStats.hitCount()));
        stats.put("read_count", String.valueOf(cacheStats.hitCount() + cacheStats.missCount()));
        stats.put("eviction_count", String.valueOf(cacheStats.evictionCount()));
        stats.put("average_load_penalty", String.valueOf(cacheStats.averageLoadPenalty()));
        stats.put("estimated_size", String.valueOf(estimatedSize));
        return stats;
    }
}