// IcebergExternalMetaCache.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.iceberg;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.iceberg.cache.ManifestCacheValue;
import org.apache.doris.datasource.metacache.AbstractExternalMetaCache;
import org.apache.doris.datasource.metacache.CacheSpec;
import org.apache.doris.datasource.metacache.MetaCacheEntry;
import org.apache.doris.datasource.metacache.MetaCacheEntryDef;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.view.View;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
/**
* Iceberg engine implementation of {@link AbstractExternalMetaCache}.
*
* <p>Registered entries:
* <ul>
* <li>{@code table}: loaded Iceberg {@link Table} instances per Doris table mapping</li>
* <li>{@code view}: loaded Iceberg {@link View} instances</li>
* <li>{@code manifest}: parsed manifest payload ({@link ManifestCacheValue}) keyed by
* manifest path and content type</li>
* <li>{@code schema}: schema cache keyed by table identity + schema id</li>
* </ul>
*
* <p>Manifest entry keys are path-based and intentionally not table-scoped. This allows
* shared manifests to reuse one cache entry across tables in the same catalog.
*
* <p>Invalidation behavior:
* <ul>
* <li>catalog invalidation clears all entries and drops Iceberg {@link ManifestFiles} IO cache</li>
* <li>db/table invalidation clears table/view/schema entries, while keeping manifest entries</li>
* <li>partition-level invalidation falls back to table-level invalidation</li>
* </ul>
*/
public class IcebergExternalMetaCache extends AbstractExternalMetaCache {
// Class logger; used when dropping the Iceberg manifest IO cache fails.
private static final Logger LOG = LogManager.getLogger(IcebergExternalMetaCache.class);
// Engine name handed to the AbstractExternalMetaCache constructor and embedded in the
// property key built by catalogPropertyCompatibilityMap().
public static final String ENGINE = "iceberg";
// Names under which the four entry definitions are created in the constructor.
public static final String ENTRY_TABLE = "table";
public static final String ENTRY_VIEW = "view";
public static final String ENTRY_MANIFEST = "manifest";
public static final String ENTRY_SCHEMA = "schema";
// Spec for the schema entry, built from Config.external_cache_expire_time_seconds_after_access
// and Config.max_external_schema_cache_num.
// NOTE(review): the leading null is presumably "no explicit TTL property value" - confirm
// against CacheSpec.fromTtlValue.
private static final CacheSpec SCHEMA_CACHE_SPEC = CacheSpec.fromTtlValue(
null, Config.external_cache_expire_time_seconds_after_access, Config.max_external_schema_cache_num);
// Spec for the manifest entry: CacheSpec.CACHE_NO_TTL is passed for both TTL arguments and
// Config.max_external_table_cache_num for the last argument.
private static final CacheSpec MANIFEST_CACHE_SPEC = CacheSpec.fromTtlValue(
String.valueOf(CacheSpec.CACHE_NO_TTL), CacheSpec.CACHE_NO_TTL, Config.max_external_table_cache_num);
// Entry definitions; assigned and registered once in the constructor and used by the
// tableEntry/viewEntry/manifestEntry/schemaEntry accessors to resolve per-catalog entries.
private final MetaCacheEntryDef<NameMapping, IcebergTableCacheValue> tableEntryDef;
private final MetaCacheEntryDef<NameMapping, View> viewEntryDef;
private final MetaCacheEntryDef<IcebergManifestEntryKey, ManifestCacheValue> manifestEntryDef;
private final MetaCacheEntryDef<IcebergSchemaCacheKey, SchemaCacheValue> schemaEntryDef;
/**
 * Creates the Iceberg meta cache and registers its four entry definitions
 * (table, view, manifest, schema), in that order.
 *
 * @param refreshExecutor executor forwarded to the {@link AbstractExternalMetaCache} constructor
 */
public IcebergExternalMetaCache(ExecutorService refreshExecutor) {
    super(ENGINE, refreshExecutor);

    // Each definition is created and then registered right away; the registration
    // order (table, view, manifest, schema) is kept.
    this.tableEntryDef = MetaCacheEntryDef.of(
            ENTRY_TABLE, NameMapping.class, IcebergTableCacheValue.class,
            this::loadTableCacheValue, DEFAULT_ENTRY_CACHE_SPEC);
    registerMetaCacheEntryDef(this.tableEntryDef);

    this.viewEntryDef = MetaCacheEntryDef.of(
            ENTRY_VIEW, NameMapping.class, View.class,
            this::loadView, DEFAULT_ENTRY_CACHE_SPEC);
    registerMetaCacheEntryDef(this.viewEntryDef);

    // The single-argument loadManifestCacheValue overload is bound here; it always throws,
    // because manifests are loaded through the contextual loader in getManifestCacheValue.
    // NOTE(review): the meaning of the trailing 'false' flag is defined by MetaCacheEntryDef.of.
    this.manifestEntryDef = MetaCacheEntryDef.of(
            ENTRY_MANIFEST, IcebergManifestEntryKey.class, ManifestCacheValue.class,
            this::loadManifestCacheValue, MANIFEST_CACHE_SPEC, false);
    registerMetaCacheEntryDef(this.manifestEntryDef);

    this.schemaEntryDef = MetaCacheEntryDef.of(
            ENTRY_SCHEMA, IcebergSchemaCacheKey.class, SchemaCacheValue.class,
            this::loadSchemaCacheValue, SCHEMA_CACHE_SPEC);
    registerMetaCacheEntryDef(this.schemaEntryDef);
}
/**
 * Returns the Iceberg {@link Table} held by the table cache entry for the given Doris table.
 *
 * @param dorisTable Doris-side table whose name mapping is used as the cache key
 * @return the Iceberg table wrapped by the cached {@link IcebergTableCacheValue}
 */
public Table getIcebergTable(ExternalTable dorisTable) {
    NameMapping mapping = dorisTable.getOrBuildNameMapping();
    IcebergTableCacheValue cached = tableEntry(mapping.getCtlId()).get(mapping);
    return cached.getIcebergTable();
}
/**
 * Returns the snapshot cache value attached to the cached table value of the given Doris table.
 *
 * <p>The table cache value is handed a supplier that builds the snapshot value through
 * {@code loadSnapshot}; whether and when that supplier runs is decided by
 * {@link IcebergTableCacheValue#getSnapshotCacheValue}.
 *
 * @param dorisTable Doris-side table whose name mapping is used as the cache key
 * @return the snapshot cache value for the table
 */
public IcebergSnapshotCacheValue getSnapshotCache(ExternalTable dorisTable) {
    NameMapping mapping = dorisTable.getOrBuildNameMapping();
    MetaCacheEntry<NameMapping, IcebergTableCacheValue> tables = tableEntry(mapping.getCtlId());
    IcebergTableCacheValue cached = tables.get(mapping);
    return cached.getSnapshotCacheValue(() -> loadSnapshot(dorisTable, cached.getIcebergTable()));
}
/**
 * Copies all snapshots of the cached Iceberg table into a new mutable list.
 *
 * @param dorisTable Doris-side table whose Iceberg table is looked up in the cache
 * @return a fresh list holding the elements of {@code Table.snapshots()} in iteration order
 */
public List<Snapshot> getSnapshotList(ExternalTable dorisTable) {
    Iterable<Snapshot> tableSnapshots = getIcebergTable(dorisTable).snapshots();
    return com.google.common.collect.Lists.newArrayList(tableSnapshots);
}
/**
 * Returns the Iceberg {@link View} from the view cache entry for the given Doris table.
 *
 * @param dorisTable Doris-side table whose name mapping is used as the cache key
 * @return whatever the view entry yields for that mapping
 */
public View getIcebergView(ExternalTable dorisTable) {
    NameMapping mapping = dorisTable.getOrBuildNameMapping();
    MetaCacheEntry<NameMapping, View> views = viewEntry(mapping.getCtlId());
    return views.get(mapping);
}
/**
 * Looks up the schema cache value for a table at a specific Iceberg schema id.
 *
 * @param nameMapping table identity; its catalog id selects the per-catalog schema entry
 * @param schemaId Iceberg schema id, combined with the mapping into the cache key
 * @return the cached value, cast to {@link IcebergSchemaCacheValue}
 */
public IcebergSchemaCacheValue getIcebergSchemaCacheValue(NameMapping nameMapping, long schemaId) {
    IcebergSchemaCacheKey schemaKey = new IcebergSchemaCacheKey(nameMapping, schemaId);
    MetaCacheEntry<IcebergSchemaCacheKey, SchemaCacheValue> schemas = schemaEntry(nameMapping.getCtlId());
    return (IcebergSchemaCacheValue) schemas.get(schemaKey);
}
/**
 * Returns the parsed content of a manifest, loading it through the manifest cache entry of
 * the table's catalog.
 *
 * <p>Before the lookup, the entry is probed with {@code getIfPresent}; when a recorder is
 * supplied it is told whether the key was already present at that moment.
 *
 * @param dorisTable Doris-side table; only its catalog id is used to pick the cache entry
 * @param manifest manifest file to read; also the source of the cache key
 * @param icebergTable Iceberg table passed to the contextual loader
 * @param cacheHitRecorder optional callback receiving {@code true} on a pre-existing entry;
 *         may be {@code null}
 * @return the cached or freshly loaded manifest payload
 */
public ManifestCacheValue getManifestCacheValue(ExternalTable dorisTable,
        org.apache.iceberg.ManifestFile manifest,
        Table icebergTable,
        Consumer<Boolean> cacheHitRecorder) {
    NameMapping mapping = dorisTable.getOrBuildNameMapping();
    MetaCacheEntry<IcebergManifestEntryKey, ManifestCacheValue> manifests =
            manifestEntry(mapping.getCtlId());
    IcebergManifestEntryKey manifestKey = IcebergManifestEntryKey.of(manifest);

    // The probe always runs, even without a recorder, exactly as before.
    ManifestCacheValue alreadyCached = manifests.getIfPresent(manifestKey);
    if (cacheHitRecorder != null) {
        cacheHitRecorder.accept(alreadyCached != null);
    }

    // The loader needs the manifest and table, which the key alone does not carry.
    return manifests.get(manifestKey,
            unusedKey -> loadManifestCacheValue(manifest, icebergTable, manifestKey.getContent()));
}
/**
 * Invalidates the given catalog: first drops the Iceberg {@link ManifestFiles} IO cache for
 * every cached table of the catalog, then delegates to the superclass implementation.
 *
 * @param catalogId id of the catalog to invalidate
 */
@Override
public void invalidateCatalog(long catalogId) {
// Done before delegating; presumably the superclass call clears the table entries
// that this step iterates - confirm against AbstractExternalMetaCache.
dropManifestFileIoCacheForCatalog(catalogId);
super.invalidateCatalog(catalogId);
}
/**
 * Invalidates the entries of the given catalog: first drops the Iceberg {@link ManifestFiles}
 * IO cache for every cached table of the catalog, then delegates to the superclass
 * implementation.
 *
 * @param catalogId id of the catalog whose entries are invalidated
 */
@Override
public void invalidateCatalogEntries(long catalogId) {
// Done before delegating; presumably the superclass call clears the table entries
// that this step iterates - confirm against AbstractExternalMetaCache.
dropManifestFileIoCacheForCatalog(catalogId);
super.invalidateCatalogEntries(catalogId);
}
/**
 * Removes the table, view and schema entries that belong to one database of a catalog.
 * The manifest entry is not touched here.
 *
 * @param catalogId id of the owning catalog
 * @param dbName local database name to match
 */
@Override
public void invalidateDb(long catalogId, String dbName) {
    tableEntry(catalogId).invalidateIf(mapping -> matchDb(mapping, catalogId, dbName));
    viewEntry(catalogId).invalidateIf(mapping -> matchDb(mapping, catalogId, dbName));
    // Schema keys wrap a NameMapping, so the match is done on the wrapped mapping.
    schemaEntry(catalogId).invalidateIf(
            schemaKey -> matchDb(schemaKey.getNameMapping(), catalogId, dbName));
}
/**
 * Removes the table, view and schema entries that belong to one table of a catalog.
 * The manifest entry is not touched here.
 *
 * @param catalogId id of the owning catalog
 * @param dbName local database name to match
 * @param tableName local table name to match
 */
@Override
public void invalidateTable(long catalogId, String dbName, String tableName) {
    tableEntry(catalogId).invalidateIf(mapping -> matchTable(mapping, catalogId, dbName, tableName));
    viewEntry(catalogId).invalidateIf(mapping -> matchTable(mapping, catalogId, dbName, tableName));
    // Schema keys wrap a NameMapping, so the match is done on the wrapped mapping.
    schemaEntry(catalogId).invalidateIf(
            schemaKey -> matchTable(schemaKey.getNameMapping(), catalogId, dbName, tableName));
}
/**
 * Partition-level invalidation; falls back to invalidating the whole table.
 *
 * @param catalogId id of the owning catalog
 * @param dbName local database name
 * @param tableName local table name
 * @param partitions partition names; not used by this implementation
 */
@Override
public void invalidatePartitions(long catalogId, String dbName, String tableName, List<String> partitions) {
invalidateTable(catalogId, dbName, tableName);
}
/** Resolves the {@code table} cache entry of the given catalog via the inherited {@code entry} lookup. */
private MetaCacheEntry<NameMapping, IcebergTableCacheValue> tableEntry(long catalogId) {
return entry(catalogId, tableEntryDef);
}
/** Resolves the {@code view} cache entry of the given catalog via the inherited {@code entry} lookup. */
private MetaCacheEntry<NameMapping, View> viewEntry(long catalogId) {
return entry(catalogId, viewEntryDef);
}
/** Resolves the {@code manifest} cache entry of the given catalog via the inherited {@code entry} lookup. */
private MetaCacheEntry<IcebergManifestEntryKey, ManifestCacheValue> manifestEntry(long catalogId) {
return entry(catalogId, manifestEntryDef);
}
/** Resolves the {@code schema} cache entry of the given catalog via the inherited {@code entry} lookup. */
private MetaCacheEntry<IcebergSchemaCacheKey, SchemaCacheValue> schemaEntry(long catalogId) {
return entry(catalogId, schemaEntryDef);
}
/**
 * Loader for the {@code table} entry: loads the Iceberg table named by the mapping's remote
 * database/table names, running the load through the catalog's execution authenticator.
 *
 * @param nameMapping identity of the table to load
 * @return a new cache value wrapping the loaded Iceberg table
 * @throws RuntimeException if the catalog cannot be found, is neither an HMS nor an Iceberg
 *         catalog, or the load fails (the message is the root-cause message, the original
 *         exception is kept as cause)
 */
private IcebergTableCacheValue loadTableCacheValue(NameMapping nameMapping) {
    // Wildcard type instead of a raw CatalogIf, consistent with findExternalTable.
    CatalogIf<?> catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(nameMapping.getCtlId());
    if (catalog == null) {
        throw new RuntimeException(String.format("Cannot find catalog %d when loading table %s/%s.",
                nameMapping.getCtlId(), nameMapping.getLocalDbName(), nameMapping.getLocalTblName()));
    }
    IcebergMetadataOps ops = resolveMetadataOps(catalog);
    try {
        Table table = ((ExternalCatalog) catalog).getExecutionAuthenticator()
                .execute(() -> ops.loadTable(nameMapping.getRemoteDbName(), nameMapping.getRemoteTblName()));
        return new IcebergTableCacheValue(table);
    } catch (Exception e) {
        throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
    }
}
/**
 * Loader for the {@code view} entry: loads the Iceberg view named by the mapping's remote
 * database/table names, running the load through the catalog's execution authenticator.
 *
 * @param nameMapping identity of the view to load
 * @return the loaded view, or {@code null} when the catalog is not an
 *         {@link IcebergExternalCatalog} (this includes the case where no catalog is found)
 * @throws RuntimeException if the load fails (the message is the root-cause message, the
 *         original exception is kept as cause)
 */
private View loadView(NameMapping nameMapping) {
    // Wildcard type instead of a raw CatalogIf, consistent with findExternalTable.
    CatalogIf<?> catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(nameMapping.getCtlId());
    if (!(catalog instanceof IcebergExternalCatalog)) {
        // Unlike loadTableCacheValue this does not throw; callers receive null.
        return null;
    }
    IcebergMetadataOps ops = (IcebergMetadataOps) (((IcebergExternalCatalog) catalog).getMetadataOps());
    try {
        return ((ExternalCatalog) catalog).getExecutionAuthenticator().execute(
                () -> ops.loadView(nameMapping.getRemoteDbName(), nameMapping.getRemoteTblName()));
    } catch (Exception e) {
        throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
    }
}
/**
 * Key-only loader bound to the {@code manifest} entry definition. It never produces a value:
 * a manifest can only be loaded with the manifest file and table at hand, so this always throws.
 *
 * @param key manifest key the cache asked to load
 * @return never returns normally
 * @throws CacheException always
 */
private ManifestCacheValue loadManifestCacheValue(IcebergManifestEntryKey key) {
    String manifestPath = key.getManifestPath();
    throw new CacheException("Manifest cache entry '%s' requires contextual loader for %s",
            null, ENTRY_MANIFEST, manifestPath);
}
/**
 * Contextual manifest loader: reads the manifest with the table's IO and wraps the copied
 * file entries in a {@link ManifestCacheValue}.
 *
 * @param manifest manifest file to read
 * @param icebergTable table supplying the IO (and partition specs for delete manifests)
 * @param content content type; {@link ManifestContent#DELETES} selects the delete-file path,
 *         anything else the data-file path
 * @return the parsed manifest payload
 * @throws CacheException if {@code manifest} or {@code icebergTable} is {@code null}, or if
 *         reading the manifest raises an {@link IOException}
 */
private ManifestCacheValue loadManifestCacheValue(org.apache.iceberg.ManifestFile manifest, Table icebergTable,
        ManifestContent content) {
    if (manifest == null) {
        throw new CacheException("Manifest cache loader context is missing for %s",
                null, "null");
    }
    if (icebergTable == null) {
        throw new CacheException("Manifest cache loader context is missing for %s",
                null, manifest.path());
    }
    try {
        return content == ManifestContent.DELETES
                ? ManifestCacheValue.forDeleteFiles(loadDeleteFiles(manifest, icebergTable))
                : ManifestCacheValue.forDataFiles(loadDataFiles(manifest, icebergTable));
    } catch (IOException e) {
        throw new CacheException("Failed to read manifest %s", e, manifest.path());
    }
}
/**
 * Loader for the {@code schema} entry: resolves the Doris table for the key's mapping and
 * asks it to initialise the schema for that key.
 *
 * @param key schema cache key (table identity plus schema id)
 * @return the schema value produced by {@code initSchemaAndUpdateTime}
 * @throws CacheException if the table cannot be resolved or yields no schema value
 */
private SchemaCacheValue loadSchemaCacheValue(IcebergSchemaCacheKey key) {
    NameMapping mapping = key.getNameMapping();
    ExternalTable dorisTable = findExternalTable(mapping);
    return dorisTable.initSchemaAndUpdateTime(key).orElseThrow(() ->
            new CacheException("failed to load iceberg schema cache value for: %s.%s.%s, schemaId: %s",
                    null, mapping.getCtlId(), mapping.getLocalDbName(),
                    mapping.getLocalTblName(), key.getSchemaId()));
}
/**
 * Builds the snapshot cache value for a table from its latest Iceberg snapshot.
 *
 * <p>Partition info is loaded only when the table reports itself as a valid related table;
 * otherwise an empty partition info is used.
 *
 * @param dorisTable Doris-side table; must implement {@link MTMVRelatedTableIf}
 * @param icebergTable Iceberg table to read the latest snapshot from
 * @return snapshot cache value combining partition info and the latest snapshot
 * @throws RuntimeException if the table is not an {@link MTMVRelatedTableIf}, or wrapping an
 *         {@link AnalysisException} raised while building the value
 */
private IcebergSnapshotCacheValue loadSnapshot(ExternalTable dorisTable, Table icebergTable) {
    if (!(dorisTable instanceof MTMVRelatedTableIf)) {
        throw new RuntimeException(String.format("Table %s.%s is not a valid MTMV related table.",
                dorisTable.getDbName(), dorisTable.getName()));
    }
    try {
        MTMVRelatedTableIf relatedTable = (MTMVRelatedTableIf) dorisTable;
        IcebergSnapshot latest = IcebergUtils.getLatestIcebergSnapshot(icebergTable);
        IcebergPartitionInfo partitionInfo = relatedTable.isValidRelatedTable()
                ? IcebergUtils.loadPartitionInfo(dorisTable, icebergTable,
                        latest.getSnapshotId(), latest.getSchemaId())
                : IcebergPartitionInfo.empty();
        return new IcebergSnapshotCacheValue(partitionInfo, latest);
    } catch (AnalysisException e) {
        throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
    }
}
/**
 * Picks the {@link IcebergMetadataOps} to use for a catalog.
 *
 * @param catalog catalog to inspect; declared with a wildcard rather than as a raw type
 * @return the HMS catalog's Iceberg metadata ops, or the Iceberg catalog's metadata ops
 * @throws RuntimeException if the catalog is neither an {@link HMSExternalCatalog} nor an
 *         {@link IcebergExternalCatalog}
 */
private IcebergMetadataOps resolveMetadataOps(CatalogIf<?> catalog) {
    if (catalog instanceof HMSExternalCatalog) {
        return ((HMSExternalCatalog) catalog).getIcebergMetadataOps();
    }
    if (catalog instanceof IcebergExternalCatalog) {
        return (IcebergMetadataOps) (((IcebergExternalCatalog) catalog).getMetadataOps());
    }
    throw new RuntimeException("Only support 'hms' and 'iceberg' type for iceberg table");
}
/**
 * Maps the {@code ExternalCatalog.SCHEMA_CACHE_TTL_SECOND} catalog property to this engine's
 * schema-entry TTL property key.
 *
 * @return a single-entry immutable map from the former key to the latter
 */
@Override
protected Map<String, String> catalogPropertyCompatibilityMap() {
    String schemaTtlKey = "meta.cache." + ENGINE + "." + ENTRY_SCHEMA + ".ttl-second";
    return Collections.singletonMap(ExternalCatalog.SCHEMA_CACHE_TTL_SECOND, schemaTtlKey);
}
/**
 * Reads a data manifest and returns copies of all data files it lists.
 *
 * @param manifest manifest to read
 * @param table table whose IO is used to open the manifest
 * @return a new list with one {@code copy()} per entry, in reader order
 * @throws IOException if the manifest reader fails (e.g. when it is closed)
 */
private List<org.apache.iceberg.DataFile> loadDataFiles(org.apache.iceberg.ManifestFile manifest, Table table)
        throws IOException {
    List<org.apache.iceberg.DataFile> copies = com.google.common.collect.Lists.newArrayList();
    try (ManifestReader<org.apache.iceberg.DataFile> manifestReader = ManifestFiles.read(manifest, table.io())) {
        // Each entry is copied before it is retained in the list.
        manifestReader.forEach(entry -> copies.add(entry.copy()));
    }
    return copies;
}
/**
 * Reads a delete manifest and returns copies of all delete files it lists.
 *
 * @param manifest manifest to read
 * @param table table whose IO and partition specs are used to open the manifest
 * @return a new list with one {@code copy()} per entry, in reader order
 * @throws IOException if the manifest reader fails (e.g. when it is closed)
 */
private List<org.apache.iceberg.DeleteFile> loadDeleteFiles(org.apache.iceberg.ManifestFile manifest, Table table)
        throws IOException {
    List<org.apache.iceberg.DeleteFile> copies = com.google.common.collect.Lists.newArrayList();
    try (ManifestReader<org.apache.iceberg.DeleteFile> manifestReader =
            ManifestFiles.readDeleteManifest(manifest, table.io(), table.specs())) {
        // Each entry is copied before it is retained in the list.
        manifestReader.forEach(entry -> copies.add(entry.copy()));
    }
    return copies;
}
/**
 * Walks the table cache entry of a catalog and drops the Iceberg manifest IO cache for each
 * cached table whose key carries that catalog id.
 *
 * @param catalogId id of the catalog being invalidated
 */
private void dropManifestFileIoCacheForCatalog(long catalogId) {
    tableEntry(catalogId).forEach((mapping, cachedTable) -> {
        if (mapping.getCtlId() != catalogId) {
            return;
        }
        dropManifestFileIoCache(cachedTable);
    });
}
/**
 * Drops the {@link ManifestFiles} cache associated with the IO of one cached table.
 * Best effort: any exception is logged at WARN level and not propagated.
 *
 * @param tableCacheValue cached table value whose Iceberg table IO is used
 */
private void dropManifestFileIoCache(IcebergTableCacheValue tableCacheValue) {
    try {
        Table icebergTable = tableCacheValue.getIcebergTable();
        ManifestFiles.dropCache(icebergTable.io());
    } catch (Exception e) {
        LOG.warn("Failed to drop iceberg manifest files cache", e);
    }
}
/**
 * Resolves the Doris {@link ExternalTable} for a name mapping through the catalog manager,
 * using the mapping's local database and table names.
 *
 * @param nameMapping identity of the table to resolve
 * @return the matching external table
 * @throws CacheException if the catalog is not an {@link ExternalCatalog} (including when it
 *         is absent) or if the database/table lookup yields nothing
 */
private ExternalTable findExternalTable(NameMapping nameMapping) {
    CatalogIf<?> catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(nameMapping.getCtlId());
    if (catalog instanceof ExternalCatalog) {
        return ((ExternalCatalog) catalog).getDb(nameMapping.getLocalDbName())
                .flatMap(db -> db.getTable(nameMapping.getLocalTblName()))
                .orElseThrow(() -> new CacheException(
                        "table %s.%s.%s not found when loading iceberg schema cache",
                        null, nameMapping.getCtlId(), nameMapping.getLocalDbName(),
                        nameMapping.getLocalTblName()));
    }
    throw new CacheException("catalog %s is not external when loading iceberg schema cache",
            null, nameMapping.getCtlId());
}
/**
 * Tells whether a name mapping belongs to the given catalog and local database.
 *
 * @param nameMapping mapping to test
 * @param catalogId expected catalog id
 * @param dbName expected local database name
 * @return {@code true} only if both the catalog id and the local database name match
 */
private boolean matchDb(NameMapping nameMapping, long catalogId, String dbName) {
    if (nameMapping.getCtlId() != catalogId) {
        return false;
    }
    return nameMapping.getLocalDbName().equals(dbName);
}
/**
 * Tells whether a name mapping identifies the given table of the given catalog and database.
 *
 * @param nameMapping mapping to test
 * @param catalogId expected catalog id
 * @param dbName expected local database name
 * @param tableName expected local table name
 * @return {@code true} only if the database matches (see {@code matchDb}) and the local
 *         table name equals {@code tableName}
 */
private boolean matchTable(NameMapping nameMapping, long catalogId, String dbName, String tableName) {
    if (!matchDb(nameMapping, catalogId, dbName)) {
        return false;
    }
    return nameMapping.getLocalTblName().equals(tableName);
}
}