IcebergExternalMetaCache.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.iceberg.cache.ManifestCacheValue;
import org.apache.doris.datasource.metacache.AbstractExternalMetaCache;
import org.apache.doris.datasource.metacache.CacheSpec;
import org.apache.doris.datasource.metacache.MetaCacheEntry;
import org.apache.doris.datasource.metacache.MetaCacheEntryDef;
import org.apache.doris.mtmv.MTMVRelatedTableIf;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.view.View;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

/**
 * {@link AbstractExternalMetaCache} implementation for the Iceberg engine.
 *
 * <p>The constructor registers four entries:
 * <ul>
 *   <li>{@code table}: an {@link IcebergTableCacheValue} wrapping the loaded Iceberg {@link Table}, keyed by
 *   {@link NameMapping}</li>
 *   <li>{@code view}: the loaded Iceberg {@link View}, keyed by {@link NameMapping}</li>
 *   <li>{@code manifest}: the parsed {@link ManifestCacheValue} of a single manifest, keyed by manifest path and
 *   content type</li>
 *   <li>{@code schema}: a {@link SchemaCacheValue}, keyed by table mapping plus schema id</li>
 * </ul>
 *
 * <p>Manifest keys are built from the manifest alone, not from the table that references it. A manifest shared
 * by several tables of one catalog therefore occupies a single cache entry.
 *
 * <p>Invalidation:
 * <ul>
 *   <li>catalog: drops the Iceberg {@link ManifestFiles} IO cache of every cached table, then delegates to the
 *   base class</li>
 *   <li>db / table: evicts matching table, view and schema entries; manifest entries are kept</li>
 *   <li>partitions: handled as a table-level invalidation</li>
 * </ul>
 */
public class IcebergExternalMetaCache extends AbstractExternalMetaCache {
    private static final Logger LOG = LogManager.getLogger(IcebergExternalMetaCache.class);

    public static final String ENGINE = "iceberg";
    public static final String ENTRY_TABLE = "table";
    public static final String ENTRY_VIEW = "view";
    public static final String ENTRY_MANIFEST = "manifest";
    public static final String ENTRY_SCHEMA = "schema";

    // Schema entries: no explicit TTL string; access expiry and size limit come from Config.
    private static final CacheSpec SCHEMA_CACHE_SPEC = CacheSpec.fromTtlValue(
            null, Config.external_cache_expire_time_seconds_after_access, Config.max_external_schema_cache_num);
    // Manifest entries: built with CacheSpec.CACHE_NO_TTL and sized by the external table cache limit.
    private static final CacheSpec MANIFEST_CACHE_SPEC = CacheSpec.fromTtlValue(
            String.valueOf(CacheSpec.CACHE_NO_TTL), CacheSpec.CACHE_NO_TTL, Config.max_external_table_cache_num);

    // Entry definitions; all four are created and then registered by the constructor.
    private final MetaCacheEntryDef<NameMapping, IcebergTableCacheValue> tableDef;
    private final MetaCacheEntryDef<NameMapping, View> viewDef;
    private final MetaCacheEntryDef<IcebergManifestEntryKey, ManifestCacheValue> manifestDef;
    private final MetaCacheEntryDef<IcebergSchemaCacheKey, SchemaCacheValue> schemaDef;

    /**
     * Creates the cache and registers its table, view, manifest and schema entries.
     *
     * @param refreshExecutor executor handed to the base class
     */
    public IcebergExternalMetaCache(ExecutorService refreshExecutor) {
        super(ENGINE, refreshExecutor);
        this.tableDef = MetaCacheEntryDef.of(ENTRY_TABLE, NameMapping.class, IcebergTableCacheValue.class,
                this::loadTableCacheValue, DEFAULT_ENTRY_CACHE_SPEC);
        this.viewDef = MetaCacheEntryDef.of(ENTRY_VIEW, NameMapping.class, View.class,
                this::loadView, DEFAULT_ENTRY_CACHE_SPEC);
        // The key-only loader for manifests always throws. Real loads go through getManifestCacheValue,
        // which supplies the manifest and table. NOTE(review): the trailing `false` flag is interpreted by
        // MetaCacheEntryDef; confirm its meaning there.
        this.manifestDef = MetaCacheEntryDef.of(ENTRY_MANIFEST, IcebergManifestEntryKey.class,
                ManifestCacheValue.class, this::rejectKeyOnlyManifestLoad, MANIFEST_CACHE_SPEC, false);
        this.schemaDef = MetaCacheEntryDef.of(ENTRY_SCHEMA, IcebergSchemaCacheKey.class, SchemaCacheValue.class,
                this::loadSchemaCacheValue, SCHEMA_CACHE_SPEC);
        registerMetaCacheEntryDef(tableDef);
        registerMetaCacheEntryDef(viewDef);
        registerMetaCacheEntryDef(manifestDef);
        registerMetaCacheEntryDef(schemaDef);
    }

    /**
     * Returns the Iceberg table backing {@code dorisTable}.
     *
     * <p>The lookup goes through the {@code table} entry of the table's catalog; that entry's loader is
     * {@code loadTableCacheValue}.
     */
    public Table getIcebergTable(ExternalTable dorisTable) {
        return tableCacheValueOf(dorisTable).getIcebergTable();
    }

    /**
     * Returns the snapshot/partition information attached to the cached table value.
     *
     * <p>This delegates to {@link IcebergTableCacheValue#getSnapshotCacheValue}. The supplier passed in
     * computes the value via {@code loadSnapshot}.
     */
    public IcebergSnapshotCacheValue getSnapshotCache(ExternalTable dorisTable) {
        IcebergTableCacheValue cached = tableCacheValueOf(dorisTable);
        return cached.getSnapshotCacheValue(() -> loadSnapshot(dorisTable, cached.getIcebergTable()));
    }

    /**
     * Returns a new mutable list holding every snapshot of the cached Iceberg table.
     */
    public List<Snapshot> getSnapshotList(ExternalTable dorisTable) {
        return com.google.common.collect.Lists.newArrayList(getIcebergTable(dorisTable).snapshots());
    }

    /**
     * Returns the Iceberg view for {@code dorisTable} from the catalog's {@code view} entry.
     *
     * <p>The entry's loader returns {@code null} for catalogs that are not {@link IcebergExternalCatalog}.
     */
    public View getIcebergView(ExternalTable dorisTable) {
        NameMapping mapping = dorisTable.getOrBuildNameMapping();
        MetaCacheEntry<NameMapping, View> views = viewEntry(mapping.getCtlId());
        return views.get(mapping);
    }

    /**
     * Returns the cached schema of a table at a given schema id.
     *
     * <p>The stored {@link SchemaCacheValue} is cast to {@link IcebergSchemaCacheValue}.
     */
    public IcebergSchemaCacheValue getIcebergSchemaCacheValue(NameMapping nameMapping, long schemaId) {
        IcebergSchemaCacheKey cacheKey = new IcebergSchemaCacheKey(nameMapping, schemaId);
        return (IcebergSchemaCacheValue) schemaEntry(nameMapping.getCtlId()).get(cacheKey);
    }

    /**
     * Returns the parsed content of {@code manifest} from the catalog's {@code manifest} entry.
     *
     * @param dorisTable table whose catalog id selects the manifest entry
     * @param manifest manifest to look up; its path and content type form the key
     * @param icebergTable table whose FileIO (and partition specs, for delete manifests) the loader uses
     * @param cacheHitRecorder optional; receives whether the key was already present before this call
     * @return the cached or freshly read manifest content
     */
    public ManifestCacheValue getManifestCacheValue(ExternalTable dorisTable,
            org.apache.iceberg.ManifestFile manifest,
            Table icebergTable,
            Consumer<Boolean> cacheHitRecorder) {
        NameMapping mapping = dorisTable.getOrBuildNameMapping();
        MetaCacheEntry<IcebergManifestEntryKey, ManifestCacheValue> manifests = manifestEntry(mapping.getCtlId());
        IcebergManifestEntryKey manifestKey = IcebergManifestEntryKey.of(manifest);
        // Probe before loading so a recorder can tell hits from misses. The probe runs even when no recorder
        // is supplied.
        boolean present = manifests.getIfPresent(manifestKey) != null;
        if (cacheHitRecorder != null) {
            cacheHitRecorder.accept(present);
        }
        return manifests.get(manifestKey,
                unused -> readManifestCacheValue(manifest, icebergTable, manifestKey.getContent()));
    }

    @Override
    public void invalidateCatalog(long catalogId) {
        // Runs before the base invalidation because it walks the cached table entries to reach each FileIO.
        dropManifestFileIoCacheForCatalog(catalogId);
        super.invalidateCatalog(catalogId);
    }

    @Override
    public void invalidateCatalogEntries(long catalogId) {
        // Same ordering reason as invalidateCatalog.
        dropManifestFileIoCacheForCatalog(catalogId);
        super.invalidateCatalogEntries(catalogId);
    }

    @Override
    public void invalidateDb(long catalogId, String dbName) {
        // Manifest entries are keyed by path, not by table, so they are deliberately left in place.
        tableEntry(catalogId).invalidateIf(mapping -> matchDb(mapping, catalogId, dbName));
        viewEntry(catalogId).invalidateIf(mapping -> matchDb(mapping, catalogId, dbName));
        schemaEntry(catalogId).invalidateIf(schemaKey -> matchDb(schemaKey.getNameMapping(), catalogId, dbName));
    }

    @Override
    public void invalidateTable(long catalogId, String dbName, String tableName) {
        // Manifest entries are keyed by path, not by table, so they are deliberately left in place.
        tableEntry(catalogId).invalidateIf(mapping -> matchTable(mapping, catalogId, dbName, tableName));
        viewEntry(catalogId).invalidateIf(mapping -> matchTable(mapping, catalogId, dbName, tableName));
        schemaEntry(catalogId).invalidateIf(
                schemaKey -> matchTable(schemaKey.getNameMapping(), catalogId, dbName, tableName));
    }

    @Override
    public void invalidatePartitions(long catalogId, String dbName, String tableName, List<String> partitions) {
        // No partition-scoped entries exist here, so the partition list is not consulted.
        invalidateTable(catalogId, dbName, tableName);
    }

    // Per-catalog accessors for the four registered entries.

    private MetaCacheEntry<NameMapping, IcebergTableCacheValue> tableEntry(long catalogId) {
        return entry(catalogId, tableDef);
    }

    private MetaCacheEntry<NameMapping, View> viewEntry(long catalogId) {
        return entry(catalogId, viewDef);
    }

    private MetaCacheEntry<IcebergManifestEntryKey, ManifestCacheValue> manifestEntry(long catalogId) {
        return entry(catalogId, manifestDef);
    }

    private MetaCacheEntry<IcebergSchemaCacheKey, SchemaCacheValue> schemaEntry(long catalogId) {
        return entry(catalogId, schemaDef);
    }

    // Resolves the Doris table's name mapping and fetches its value from the catalog's table entry.
    private IcebergTableCacheValue tableCacheValueOf(ExternalTable dorisTable) {
        NameMapping mapping = dorisTable.getOrBuildNameMapping();
        return tableEntry(mapping.getCtlId()).get(mapping);
    }

    // Looks up the catalog referenced by a name mapping; the result may be null.
    private CatalogIf<?> catalogFor(NameMapping nameMapping) {
        return Env.getCurrentEnv().getCatalogMgr().getCatalog(nameMapping.getCtlId());
    }

    /**
     * Loader for the {@code table} entry.
     *
     * <p>It loads the remote Iceberg table through the catalog's metadata ops, under the catalog's execution
     * authenticator.
     *
     * @throws RuntimeException if the catalog is missing, is of an unsupported type, or loading fails (the
     *     failure is wrapped with its root-cause message)
     */
    private IcebergTableCacheValue loadTableCacheValue(NameMapping nameMapping) {
        CatalogIf catalog = catalogFor(nameMapping);
        if (catalog == null) {
            throw new RuntimeException(String.format("Cannot find catalog %d when loading table %s/%s.",
                    nameMapping.getCtlId(), nameMapping.getLocalDbName(), nameMapping.getLocalTblName()));
        }

        IcebergMetadataOps metadataOps = resolveMetadataOps(catalog);
        try {
            Table loaded = ((ExternalCatalog) catalog).getExecutionAuthenticator().execute(
                    () -> metadataOps.loadTable(nameMapping.getRemoteDbName(), nameMapping.getRemoteTblName()));
            return new IcebergTableCacheValue(loaded);
        } catch (Exception e) {
            throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
        }
    }

    /**
     * Loader for the {@code view} entry.
     *
     * <p>It handles only {@link IcebergExternalCatalog}; any other catalog, including a missing one, yields
     * {@code null}.
     *
     * @throws RuntimeException wrapping any failure raised while loading the view
     */
    private View loadView(NameMapping nameMapping) {
        CatalogIf catalog = catalogFor(nameMapping);
        if (!(catalog instanceof IcebergExternalCatalog)) {
            return null;
        }
        IcebergMetadataOps metadataOps = (IcebergMetadataOps) ((IcebergExternalCatalog) catalog).getMetadataOps();
        try {
            return ((ExternalCatalog) catalog).getExecutionAuthenticator().execute(
                    () -> metadataOps.loadView(nameMapping.getRemoteDbName(), nameMapping.getRemoteTblName()));
        } catch (Exception e) {
            throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
        }
    }

    // Key-only loader registered for the manifest entry. A manifest cannot be read without the ManifestFile
    // and its table, so this always fails; getManifestCacheValue passes a contextual loader instead.
    private ManifestCacheValue rejectKeyOnlyManifestLoad(IcebergManifestEntryKey key) {
        throw new CacheException("Manifest cache entry '%s' requires contextual loader for %s",
                null, ENTRY_MANIFEST, key.getManifestPath());
    }

    /**
     * Reads a manifest and copies its entries into a {@link ManifestCacheValue}.
     *
     * <p>Delete manifests ({@link ManifestContent#DELETES}) are read as delete files; any other content type
     * is read as data files.
     *
     * @throws CacheException if {@code manifest} or {@code icebergTable} is null, or reading fails
     */
    private ManifestCacheValue readManifestCacheValue(org.apache.iceberg.ManifestFile manifest,
            Table icebergTable, ManifestContent content) {
        if (manifest == null || icebergTable == null) {
            throw new CacheException("Manifest cache loader context is missing for %s",
                    null, manifest == null ? "null" : manifest.path());
        }
        try {
            return content == ManifestContent.DELETES
                    ? ManifestCacheValue.forDeleteFiles(loadDeleteFiles(manifest, icebergTable))
                    : ManifestCacheValue.forDataFiles(loadDataFiles(manifest, icebergTable));
        } catch (IOException e) {
            throw new CacheException("Failed to read manifest %s", e, manifest.path());
        }
    }

    /**
     * Loader for the {@code schema} entry.
     *
     * <p>It asks the Doris table to build the schema for the key.
     *
     * @throws CacheException if the table cannot be found or produces no schema value
     */
    private SchemaCacheValue loadSchemaCacheValue(IcebergSchemaCacheKey key) {
        NameMapping mapping = key.getNameMapping();
        ExternalTable dorisTable = findExternalTable(mapping);
        return dorisTable.initSchemaAndUpdateTime(key).orElseThrow(() -> new CacheException(
                "failed to load iceberg schema cache value for: %s.%s.%s, schemaId: %s",
                null, mapping.getCtlId(), mapping.getLocalDbName(), mapping.getLocalTblName(),
                key.getSchemaId()));
    }

    /**
     * Builds the snapshot cache value.
     *
     * <p>The value holds the latest Iceberg snapshot plus partition info. The partition info is empty when the
     * table reports it is not a valid related table.
     *
     * @throws RuntimeException if {@code dorisTable} is not an {@link MTMVRelatedTableIf}, or wrapping an
     *     {@link AnalysisException} raised while loading
     */
    private IcebergSnapshotCacheValue loadSnapshot(ExternalTable dorisTable, Table icebergTable) {
        if (!(dorisTable instanceof MTMVRelatedTableIf)) {
            throw new RuntimeException(String.format("Table %s.%s is not a valid MTMV related table.",
                    dorisTable.getDbName(), dorisTable.getName()));
        }
        MTMVRelatedTableIf relatedTable = (MTMVRelatedTableIf) dorisTable;
        try {
            IcebergSnapshot latest = IcebergUtils.getLatestIcebergSnapshot(icebergTable);
            IcebergPartitionInfo partitionInfo = relatedTable.isValidRelatedTable()
                    ? IcebergUtils.loadPartitionInfo(dorisTable, icebergTable,
                            latest.getSnapshotId(), latest.getSchemaId())
                    : IcebergPartitionInfo.empty();
            return new IcebergSnapshotCacheValue(partitionInfo, latest);
        } catch (AnalysisException e) {
            throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
        }
    }

    // Picks the Iceberg metadata ops for HMS or native Iceberg catalogs, checking HMS first.
    private IcebergMetadataOps resolveMetadataOps(CatalogIf catalog) {
        if (catalog instanceof HMSExternalCatalog) {
            return ((HMSExternalCatalog) catalog).getIcebergMetadataOps();
        }
        if (catalog instanceof IcebergExternalCatalog) {
            return (IcebergMetadataOps) ((IcebergExternalCatalog) catalog).getMetadataOps();
        }
        throw new RuntimeException("Only support 'hms' and 'iceberg' type for iceberg table");
    }

    @Override
    protected Map<String, String> catalogPropertyCompatibilityMap() {
        // Maps the catalog property ExternalCatalog.SCHEMA_CACHE_TTL_SECOND to the per-entry key of the
        // schema entry ("meta.cache.iceberg.schema.ttl-second").
        String schemaTtlKey = "meta.cache." + ENGINE + "." + ENTRY_SCHEMA + ".ttl-second";
        return Collections.singletonMap(ExternalCatalog.SCHEMA_CACHE_TTL_SECOND, schemaTtlKey);
    }

    // Reads every data file of a data manifest and keeps a copy of each entry.
    private List<org.apache.iceberg.DataFile> loadDataFiles(org.apache.iceberg.ManifestFile manifest, Table table)
            throws IOException {
        List<org.apache.iceberg.DataFile> copies = com.google.common.collect.Lists.newArrayList();
        try (ManifestReader<org.apache.iceberg.DataFile> reader = ManifestFiles.read(manifest, table.io())) {
            reader.forEach(dataFile -> copies.add(dataFile.copy()));
        }
        return copies;
    }

    // Reads every delete file of a delete manifest, using the table's partition specs, and keeps a copy of each.
    private List<org.apache.iceberg.DeleteFile> loadDeleteFiles(org.apache.iceberg.ManifestFile manifest, Table table)
            throws IOException {
        List<org.apache.iceberg.DeleteFile> copies = com.google.common.collect.Lists.newArrayList();
        try (ManifestReader<org.apache.iceberg.DeleteFile> reader = ManifestFiles.readDeleteManifest(manifest,
                table.io(), table.specs())) {
            reader.forEach(deleteFile -> copies.add(deleteFile.copy()));
        }
        return copies;
    }

    // Drops the ManifestFiles IO cache for every cached table of the given catalog.
    private void dropManifestFileIoCacheForCatalog(long catalogId) {
        tableEntry(catalogId).forEach((mapping, cached) -> {
            if (mapping.getCtlId() != catalogId) {
                return;
            }
            dropManifestFileIoCache(cached);
        });
    }

    // Best effort: a failure is logged and does not abort invalidation.
    private void dropManifestFileIoCache(IcebergTableCacheValue cached) {
        try {
            ManifestFiles.dropCache(cached.getIcebergTable().io());
        } catch (Exception e) {
            LOG.warn("Failed to drop iceberg manifest files cache", e);
        }
    }

    // Resolves the Doris external table for a mapping; used by the schema loader.
    private ExternalTable findExternalTable(NameMapping nameMapping) {
        CatalogIf<?> catalog = catalogFor(nameMapping);
        if (!(catalog instanceof ExternalCatalog)) {
            throw new CacheException("catalog %s is not external when loading iceberg schema cache",
                    null, nameMapping.getCtlId());
        }
        return ((ExternalCatalog) catalog).getDb(nameMapping.getLocalDbName())
                .flatMap(db -> db.getTable(nameMapping.getLocalTblName()))
                .orElseThrow(() -> new CacheException(
                        "table %s.%s.%s not found when loading iceberg schema cache",
                        null, nameMapping.getCtlId(), nameMapping.getLocalDbName(),
                        nameMapping.getLocalTblName()));
    }

    // True when the mapping belongs to the given catalog and local database.
    private boolean matchDb(NameMapping nameMapping, long catalogId, String dbName) {
        if (nameMapping.getCtlId() != catalogId) {
            return false;
        }
        return nameMapping.getLocalDbName().equals(dbName);
    }

    // True when the mapping belongs to the given catalog, local database and local table.
    private boolean matchTable(NameMapping nameMapping, long catalogId, String dbName, String tableName) {
        if (!matchDb(nameMapping, catalogId, dbName)) {
            return false;
        }
        return nameMapping.getLocalTblName().equals(tableName);
    }
}