AbstractExternalMetaCache.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.metacache;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.SchemaCacheValue;

import com.google.common.collect.Maps;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Predicate;

/**
 * Base implementation of {@link ExternalMetaCache}.
 * It keeps the shared in-memory layout
 * Map<CatalogId, CatalogEntryGroup>, implements default
 * lifecycle behavior, and provides conservative invalidation fallback.
 * Subclasses register entry definitions during construction and expect callers
 * to initialize a catalog explicitly before accessing entries.
 */
public abstract class AbstractExternalMetaCache implements ExternalMetaCache {
    protected static CacheSpec defaultEntryCacheSpec() {
        return CacheSpec.of(
                true,
                Config.external_cache_expire_time_seconds_after_access,
                Config.max_external_table_cache_num);
    }

    protected static CacheSpec defaultSchemaCacheSpec() {
        return CacheSpec.of(
                true,
                Config.external_cache_expire_time_seconds_after_access,
                Config.max_external_schema_cache_num);
    }

    private final String engine;
    private final ExecutorService refreshExecutor;
    private final Map<Long, CatalogEntryGroup> catalogEntries = Maps.newConcurrentMap();
    private final Map<String, MetaCacheEntryDef<?, ?>> metaCacheEntryDefs = Maps.newConcurrentMap();

    protected AbstractExternalMetaCache(String engine, ExecutorService refreshExecutor) {
        this.engine = engine;
        this.refreshExecutor = Objects.requireNonNull(refreshExecutor, "refreshExecutor can not be null");
    }

    @Override
    public String engine() {
        return engine;
    }

    @Override
    public Collection<String> aliases() {
        return Collections.singleton(engine);
    }

    @Override
    public void initCatalog(long catalogId, Map<String, String> catalogProperties) {
        Map<String, String> safeCatalogProperties = CacheSpec.applyCompatibilityMap(
                catalogProperties, catalogPropertyCompatibilityMap());
        catalogEntries.computeIfAbsent(catalogId, id -> buildCatalogEntryGroup(safeCatalogProperties));
    }

    @Override
    public void checkCatalogInitialized(long catalogId) {
        requireCatalogEntryGroup(catalogId);
    }

    @Override
    public boolean isCatalogInitialized(long catalogId) {
        return catalogEntries.containsKey(catalogId);
    }

    /**
     * Optional compatibility map for legacy catalog properties.
     *
     * <p>Map format: {@code legacyKey -> newKey}. The mapping is applied before
     * entry cache specs are parsed. If both keys exist, new key keeps precedence.
     */
    protected Map<String, String> catalogPropertyCompatibilityMap() {
        return Collections.emptyMap();
    }

    @Override
    @SuppressWarnings("unchecked")
    public <K, V> MetaCacheEntry<K, V> entry(long catalogId, String entryName, Class<K> keyType, Class<V> valueType) {
        CatalogEntryGroup group = requireCatalogEntryGroup(catalogId);
        MetaCacheEntryDef<?, ?> def = requireMetaCacheEntryDef(entryName);
        ensureTypeCompatible(def, keyType, valueType);

        MetaCacheEntry<?, ?> cacheEntry = group.get(entryName);
        if (cacheEntry == null) {
            throw new IllegalStateException(String.format(
                    "Entry '%s' is not initialized for engine '%s', catalog %d.",
                    entryName, engine, catalogId));
        }
        return (MetaCacheEntry<K, V>) cacheEntry;
    }

    @Override
    public void invalidateCatalog(long catalogId) {
        CatalogEntryGroup removed = catalogEntries.remove(catalogId);
        if (removed != null) {
            removed.invalidateAll();
        }
    }

    @Override
    public void invalidateCatalogEntries(long catalogId) {
        CatalogEntryGroup group = catalogEntries.get(catalogId);
        if (group != null) {
            group.invalidateAll();
        }
    }

    @Override
    public void invalidateDb(long catalogId, String dbName) {
        invalidateEntries(catalogId, entryDef -> entryDef.getInvalidation().dbPredicate(dbName));
    }

    @Override
    public void invalidateTable(long catalogId, String dbName, String tableName) {
        invalidateEntries(catalogId, entryDef -> entryDef.getInvalidation().tablePredicate(dbName, tableName));
    }

    @Override
    public void invalidatePartitions(long catalogId, String dbName, String tableName, List<String> partitions) {
        invalidateEntries(catalogId,
                entryDef -> entryDef.getInvalidation().partitionPredicate(dbName, tableName, partitions));
    }

    @Override
    public Map<String, MetaCacheEntryStats> stats(long catalogId) {
        CatalogEntryGroup group = catalogEntries.get(catalogId);
        return group == null ? Maps.newHashMap() : group.stats();
    }

    @Override
    public void close() {
        catalogEntries.values().forEach(CatalogEntryGroup::invalidateAll);
        catalogEntries.clear();
    }

    protected final <K, V> void registerMetaCacheEntryDef(MetaCacheEntryDef<K, V> entryDef) {
        Objects.requireNonNull(entryDef, "entryDef");
        if (!catalogEntries.isEmpty()) {
            throw new IllegalStateException(
                    String.format("Can not register entry '%s' after catalog initialization for engine '%s'.",
                            entryDef.getName(), engine));
        }
        MetaCacheEntryDef<?, ?> existing = metaCacheEntryDefs.putIfAbsent(entryDef.getName(), entryDef);
        if (existing != null) {
            throw new IllegalArgumentException(
                    String.format("Duplicated entry definition '%s' for engine '%s'.", entryDef.getName(), engine));
        }
    }

    protected final <K, V> EntryHandle<K, V> registerEntry(MetaCacheEntryDef<K, V> entryDef) {
        registerMetaCacheEntryDef(entryDef);
        return new EntryHandle<>(entryDef);
    }

    protected final <K, V> MetaCacheEntry<K, V> entry(long catalogId, MetaCacheEntryDef<K, V> entryDef) {
        validateRegisteredMetaCacheEntryDef(entryDef);
        return entry(catalogId, entryDef.getName(), entryDef.getKeyType(), entryDef.getValueType());
    }

    protected final String metaCacheTtlKey(String entryName) {
        return "meta.cache." + engine + "." + entryName + ".ttl-second";
    }

    protected final Map<String, String> singleCompatibilityMap(String legacyKey, String entryName) {
        return Collections.singletonMap(legacyKey, metaCacheTtlKey(entryName));
    }

    protected final boolean matchDb(NameMapping nameMapping, String dbName) {
        return nameMapping.getLocalDbName().equals(dbName);
    }

    protected final boolean matchTable(NameMapping nameMapping, String dbName, String tableName) {
        return matchDb(nameMapping, dbName) && nameMapping.getLocalTblName().equals(tableName);
    }

    protected final ExternalTable findExternalTable(NameMapping nameMapping, String engineNameForError) {
        CatalogIf<?> catalog = getCatalog(nameMapping.getCtlId());
        if (!(catalog instanceof ExternalCatalog)) {
            throw new CacheException("catalog %s is not external when loading %s schema cache",
                    null, nameMapping.getCtlId(), engineNameForError);
        }
        ExternalCatalog externalCatalog = (ExternalCatalog) catalog;
        return externalCatalog.getDb(nameMapping.getLocalDbName())
                .flatMap(db -> db.getTable(nameMapping.getLocalTblName()))
                .orElseThrow(() -> new CacheException(
                        "table %s.%s.%s not found when loading %s schema cache",
                        null, nameMapping.getCtlId(), nameMapping.getLocalDbName(),
                        nameMapping.getLocalTblName(), engineNameForError));
    }

    private CatalogEntryGroup requireCatalogEntryGroup(long catalogId) {
        CatalogEntryGroup group = catalogEntries.get(catalogId);
        if (group == null) {
            throw new IllegalStateException(String.format(
                    "Catalog %d is not initialized for engine '%s'.",
                    catalogId, engine));
        }
        return group;
    }

    protected CatalogIf<?> getCatalog(long catalogId) {
        if (Env.getCurrentEnv() == null || Env.getCurrentEnv().getCatalogMgr() == null) {
            return null;
        }
        return Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
    }

    private MetaCacheEntryDef<?, ?> requireMetaCacheEntryDef(String entryName) {
        MetaCacheEntryDef<?, ?> entryDef = metaCacheEntryDefs.get(entryName);
        if (entryDef == null) {
            throw new IllegalArgumentException(String.format(
                    "Entry '%s' is not registered for engine '%s'.", entryName, engine));
        }
        return entryDef;
    }

    private void ensureTypeCompatible(MetaCacheEntryDef<?, ?> entryDef, Class<?> keyType, Class<?> valueType) {
        if (!entryDef.getKeyType().equals(keyType) || !entryDef.getValueType().equals(valueType)) {
            throw new IllegalArgumentException(String.format(
                    "Entry '%s' for engine '%s' expects key/value types (%s, %s), but got (%s, %s).",
                    entryDef.getName(), engine, entryDef.getKeyType().getName(), entryDef.getValueType().getName(),
                    keyType.getName(), valueType.getName()));
        }
    }

    private <K, V> void validateRegisteredMetaCacheEntryDef(MetaCacheEntryDef<K, V> entryDef) {
        MetaCacheEntryDef<?, ?> registered = requireMetaCacheEntryDef(entryDef.getName());
        ensureTypeCompatible(registered, entryDef.getKeyType(), entryDef.getValueType());
    }

    private void invalidateEntries(long catalogId, Function<MetaCacheEntryDef<?, ?>, Predicate<?>> predicateFactory) {
        CatalogEntryGroup group = catalogEntries.get(catalogId);
        if (group == null) {
            return;
        }
        metaCacheEntryDefs.values().forEach(entryDef -> invalidateEntryIfMatched(group, entryDef, predicateFactory));
    }

    @SuppressWarnings("unchecked")
    private <K, V> void invalidateEntryIfMatched(CatalogEntryGroup group, MetaCacheEntryDef<K, V> entryDef,
            Function<MetaCacheEntryDef<?, ?>, Predicate<?>> predicateFactory) {
        Predicate<K> predicate = (Predicate<K>) predicateFactory.apply(entryDef);
        if (predicate == null) {
            return;
        }
        MetaCacheEntry<K, V> entry = (MetaCacheEntry<K, V>) group.get(entryDef.getName());
        if (entry != null) {
            entry.invalidateIf(predicate);
        }
    }

    private CatalogEntryGroup buildCatalogEntryGroup(Map<String, String> catalogProperties) {
        CatalogEntryGroup group = new CatalogEntryGroup();
        metaCacheEntryDefs.values()
                .forEach(entryDef -> group.put(entryDef.getName(), newMetaCacheEntry(entryDef, catalogProperties)));
        return group;
    }

    @SuppressWarnings("unchecked")
    private <K, V> MetaCacheEntry<K, V> newMetaCacheEntry(
            MetaCacheEntryDef<?, ?> rawEntryDef, Map<String, String> catalogProperties) {
        MetaCacheEntryDef<K, V> entryDef = (MetaCacheEntryDef<K, V>) rawEntryDef;
        CacheSpec cacheSpec = CacheSpec.fromProperties(
                catalogProperties, engine, entryDef.getName(), entryDef.getDefaultCacheSpec());
        return new MetaCacheEntry<>(entryDef.getName(),
                wrapSchemaValidator(entryDef.getLoader(), entryDef.getValueType()),
                cacheSpec,
                refreshExecutor, entryDef.isAutoRefresh(), entryDef.isContextualOnly());
    }

    private <K, V> Function<K, V> wrapSchemaValidator(Function<K, V> loader, Class<V> valueType) {
        if (loader == null) {
            return null;
        }
        if (!SchemaCacheValue.class.isAssignableFrom(valueType)) {
            return loader;
        }
        return key -> {
            V value = loader.apply(key);
            ((SchemaCacheValue) value).validateSchema();
            return value;
        };
    }

    protected final class EntryHandle<K, V> {
        private final MetaCacheEntryDef<K, V> entryDef;

        private EntryHandle(MetaCacheEntryDef<K, V> entryDef) {
            this.entryDef = entryDef;
        }

        public MetaCacheEntry<K, V> get(long catalogId) {
            return entry(catalogId, entryDef);
        }

        public MetaCacheEntry<K, V> getIfInitialized(long catalogId) {
            return isCatalogInitialized(catalogId) ? get(catalogId) : null;
        }
    }
}