AbstractExternalMetaCache.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.metacache;

import org.apache.doris.common.Config;

import com.google.common.collect.Maps;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;

/**
 * Base implementation of {@link ExternalMetaCache}.
 * It keeps the shared in-memory layout
 * Map<CatalogId, CatalogEntryGroup>, implements default
 * lifecycle behavior, and provides conservative invalidation fallback.
 * Subclasses register entry definitions during construction and entries are
 * initialized eagerly by {@link #initCatalog(long)}.
 */
public abstract class AbstractExternalMetaCache implements ExternalMetaCache {
    protected static final CacheSpec DEFAULT_ENTRY_CACHE_SPEC =
            CacheSpec.fromTtlValue(
                    null,
                    Config.external_cache_expire_time_seconds_after_access,
                    Config.max_external_table_cache_num);

    private final String engine;
    private final ExecutorService refreshExecutor;
    private final Map<Long, CatalogEntryGroup> catalogEntries = Maps.newConcurrentMap();
    private final Map<String, MetaCacheEntryDef<?, ?>> metaCacheEntryDefs = Maps.newConcurrentMap();

    protected AbstractExternalMetaCache(String engine, ExecutorService refreshExecutor) {
        this.engine = engine;
        this.refreshExecutor = Objects.requireNonNull(refreshExecutor, "refreshExecutor can not be null");
    }

    @Override
    public String engine() {
        return engine;
    }

    @Override
    public void initCatalog(long catalogId, Map<String, String> catalogProperties) {
        Map<String, String> safeCatalogProperties = CacheSpec.applyCompatibilityMap(
                catalogProperties, catalogPropertyCompatibilityMap());
        catalogEntries.computeIfAbsent(catalogId, id -> buildCatalogEntryGroup(safeCatalogProperties));
    }

    @Override
    public void checkCatalogInitialized(long catalogId) {
        requireCatalogEntryGroup(catalogId);
    }

    /**
     * Optional compatibility map for legacy catalog properties.
     *
     * <p>Map format: {@code legacyKey -> newKey}. The mapping is applied before
     * entry cache specs are parsed. If both keys exist, new key keeps precedence.
     */
    protected Map<String, String> catalogPropertyCompatibilityMap() {
        return Collections.emptyMap();
    }

    @Override
    @SuppressWarnings("unchecked")
    public <K, V> MetaCacheEntry<K, V> entry(long catalogId, String entryName, Class<K> keyType, Class<V> valueType) {
        CatalogEntryGroup group = requireCatalogEntryGroup(catalogId);
        MetaCacheEntryDef<?, ?> def = requireMetaCacheEntryDef(entryName);
        ensureTypeCompatible(def, keyType, valueType);

        MetaCacheEntry<?, ?> cacheEntry = group.get(entryName);
        if (cacheEntry == null) {
            throw new IllegalStateException(String.format(
                    "Entry '%s' is not initialized for engine '%s', catalog %d.",
                    entryName, engine, catalogId));
        }
        return (MetaCacheEntry<K, V>) cacheEntry;
    }

    @Override
    public void invalidateCatalog(long catalogId) {
        CatalogEntryGroup removed = catalogEntries.remove(catalogId);
        if (removed != null) {
            removed.invalidateAll();
        }
    }

    @Override
    public void invalidateCatalogEntries(long catalogId) {
        CatalogEntryGroup group = catalogEntries.get(catalogId);
        if (group != null) {
            group.invalidateAll();
        }
    }

    @Override
    public void invalidateDb(long catalogId, String dbName) {
        // Generic key mapping is unknown in base class; fallback to catalog-scope invalidation.
        invalidateCatalog(catalogId);
    }

    @Override
    public void invalidateTable(long catalogId, String dbName, String tableName) {
        // Generic key mapping is unknown in base class; fallback to catalog-scope invalidation.
        invalidateCatalog(catalogId);
    }

    @Override
    public void invalidatePartitions(long catalogId, String dbName, String tableName, List<String> partitions) {
        // Generic key mapping is unknown in base class; fallback to catalog-scope invalidation.
        invalidateCatalog(catalogId);
    }

    @Override
    public Map<String, Map<String, String>> stats(long catalogId) {
        CatalogEntryGroup group = catalogEntries.get(catalogId);
        return group == null ? Maps.newHashMap() : group.stats();
    }

    @Override
    public void close() {
        catalogEntries.values().forEach(CatalogEntryGroup::invalidateAll);
        catalogEntries.clear();
    }

    protected final <K, V> void registerMetaCacheEntryDef(MetaCacheEntryDef<K, V> entryDef) {
        Objects.requireNonNull(entryDef, "entryDef");
        if (!catalogEntries.isEmpty()) {
            throw new IllegalStateException(
                    String.format("Can not register entry '%s' after catalog initialization for engine '%s'.",
                            entryDef.getName(), engine));
        }
        MetaCacheEntryDef<?, ?> existing = metaCacheEntryDefs.putIfAbsent(entryDef.getName(), entryDef);
        if (existing != null) {
            throw new IllegalArgumentException(
                    String.format("Duplicated entry definition '%s' for engine '%s'.", entryDef.getName(), engine));
        }
    }

    protected final <K, V> MetaCacheEntry<K, V> entry(long catalogId, MetaCacheEntryDef<K, V> entryDef) {
        validateRegisteredMetaCacheEntryDef(entryDef);
        return entry(catalogId, entryDef.getName(), entryDef.getKeyType(), entryDef.getValueType());
    }

    private CatalogEntryGroup requireCatalogEntryGroup(long catalogId) {
        CatalogEntryGroup group = catalogEntries.get(catalogId);
        if (group == null) {
            throw new IllegalStateException(String.format(
                    "Catalog %d is not initialized for engine '%s'. Call initCatalog first.",
                    catalogId, engine));
        }
        return group;
    }

    private MetaCacheEntryDef<?, ?> requireMetaCacheEntryDef(String entryName) {
        MetaCacheEntryDef<?, ?> entryDef = metaCacheEntryDefs.get(entryName);
        if (entryDef == null) {
            throw new IllegalArgumentException(String.format(
                    "Entry '%s' is not registered for engine '%s'.", entryName, engine));
        }
        return entryDef;
    }

    private void ensureTypeCompatible(MetaCacheEntryDef<?, ?> entryDef, Class<?> keyType, Class<?> valueType) {
        if (!entryDef.getKeyType().equals(keyType) || !entryDef.getValueType().equals(valueType)) {
            throw new IllegalArgumentException(String.format(
                    "Entry '%s' for engine '%s' expects key/value types (%s, %s), but got (%s, %s).",
                    entryDef.getName(), engine, entryDef.getKeyType().getName(), entryDef.getValueType().getName(),
                    keyType.getName(), valueType.getName()));
        }
    }

    private <K, V> void validateRegisteredMetaCacheEntryDef(MetaCacheEntryDef<K, V> entryDef) {
        MetaCacheEntryDef<?, ?> registered = requireMetaCacheEntryDef(entryDef.getName());
        ensureTypeCompatible(registered, entryDef.getKeyType(), entryDef.getValueType());
    }

    private CatalogEntryGroup buildCatalogEntryGroup(Map<String, String> catalogProperties) {
        CatalogEntryGroup group = new CatalogEntryGroup();
        metaCacheEntryDefs.values()
                .forEach(entryDef -> group.put(entryDef.getName(), newMetaCacheEntry(entryDef, catalogProperties)));
        return group;
    }

    @SuppressWarnings("unchecked")
    private <K, V> MetaCacheEntry<K, V> newMetaCacheEntry(
            MetaCacheEntryDef<?, ?> rawEntryDef, Map<String, String> catalogProperties) {
        MetaCacheEntryDef<K, V> entryDef = (MetaCacheEntryDef<K, V>) rawEntryDef;
        CacheSpec cacheSpec = CacheSpec.fromProperties(
                catalogProperties, engine, entryDef.getName(), entryDef.getDefaultCacheSpec());
        return new MetaCacheEntry<>(entryDef.getName(), entryDef.getLoader(), cacheSpec,
                refreshExecutor, entryDef.isAutoRefresh());
    }
}