ExternalMetaCacheMgr.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Type;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.ClassLoaderUtils;
import org.apache.doris.datasource.doris.DorisExternalMetaCache;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveExternalMetaCache;
import org.apache.doris.datasource.hudi.HudiExternalMetaCache;
import org.apache.doris.datasource.iceberg.IcebergExternalMetaCache;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalMetaCache;
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache;
import org.apache.doris.datasource.metacache.AbstractExternalMetaCache;
import org.apache.doris.datasource.metacache.CacheSpec;
import org.apache.doris.datasource.metacache.ExternalMetaCache;
import org.apache.doris.datasource.metacache.ExternalMetaCacheFactory;
import org.apache.doris.datasource.metacache.MetaCache;
import org.apache.doris.datasource.metacache.MetaCacheEntry;
import org.apache.doris.datasource.metacache.MetaCacheEntryDef;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.paimon.PaimonExternalMetaCache;
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import javax.annotation.Nullable;
/**
* Cache meta of external catalog
* 1. Meta for hive meta store, mainly for partition.
* 2. Table Schema cache.
* 3. Row count cache.
*/
public class ExternalMetaCacheMgr {
private static final Logger LOG = LogManager.getLogger(ExternalMetaCacheMgr.class);
// Name of the schema cache entry looked up on an engine cache.
private static final String ENTRY_SCHEMA = "schema";
// Engine names used as lookup keys for the registered engine caches.
private static final String ENGINE_DEFAULT = "default";
private static final String ENGINE_HIVE = "hive";
private static final String ENGINE_HUDI = "hudi";
private static final String ENGINE_ICEBERG = "iceberg";
private static final String ENGINE_PAIMON = "paimon";
private static final String ENGINE_MAXCOMPUTE = "maxcompute";
private static final String ENGINE_DORIS = "doris";
/**
 * Executors for loading caches
 * 1. rowCountRefreshExecutor
 * For row count cache.
 * Row count cache is an async loading cache, and we can ignore the result
 * if the cache entry is missing or the thread pool is full.
 * So use a separate executor for this cache.
 * <p>
 * 2. commonRefreshExecutor
 * For other caches. Other caches are sync loading caches.
 * But commonRefreshExecutor will be used for async refresh.
 * That is, if a cache entry is missing, the cache value will be loaded in the caller thread, synchronously.
 * If a cache entry needs a refresh, it will be reloaded in commonRefreshExecutor.
 * <p>
 * 3. fileListingExecutor
 * File listing is a heavy operation, so use a separate executor for it.
 * For fileCache, the refresh operation will still use commonRefreshExecutor to trigger refresh.
 * And fileListingExecutor will be used to list files.
 */
// All fields below are assigned exactly once in the constructor; declaring them final
// documents that and gives final-field publication guarantees to other threads.
private final ExecutorService rowCountRefreshExecutor;
private final ExecutorService commonRefreshExecutor;
private final ExecutorService fileListingExecutor;
// This executor is used to schedule the getting split tasks
private final ExecutorService scheduleExecutor;
private final ExternalMetaCacheFactory.Context factoryContext;
// Registered engine caches, keyed by normalized engine name.
private final Map<String, ExternalMetaCache> engineCaches = Maps.newConcurrentMap();
// all catalogs could share the same fsCache.
private final FileSystemCache fsCache;
// all external table row count cache.
private final ExternalRowCountCache rowCountCache;
public ExternalMetaCacheMgr(boolean isCheckpointCatalog) {
rowCountRefreshExecutor = newThreadPool(isCheckpointCatalog,
Config.max_external_cache_loader_thread_pool_size,
Config.max_external_cache_loader_thread_pool_size * 1000,
"RowCountRefreshExecutor", 0, true);
commonRefreshExecutor = newThreadPool(isCheckpointCatalog,
Config.max_external_cache_loader_thread_pool_size,
Config.max_external_cache_loader_thread_pool_size * 10000,
"CommonRefreshExecutor", 10, true);
// The queue size should be large enough,
// because there may be thousands of partitions being queried at the same time.
fileListingExecutor = newThreadPool(isCheckpointCatalog,
Config.max_external_cache_loader_thread_pool_size,
Config.max_external_cache_loader_thread_pool_size * 1000,
"FileListingExecutor", 10, true);
scheduleExecutor = newThreadPool(isCheckpointCatalog,
Config.max_external_cache_loader_thread_pool_size,
Config.max_external_cache_loader_thread_pool_size * 1000,
"scheduleExecutor", 10, true);
fsCache = new FileSystemCache();
rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);
factoryContext = new FactoryContext(commonRefreshExecutor, fileListingExecutor);
initEngineCaches();
}
/**
 * Creates one loader thread pool.
 *
 * <p>Business (non-checkpoint) threads get a fixed-size pool with a bounded queue for unprocessed tasks.
 * Checkpoint threads have almost no work and must be released in time to avoid thread leakage,
 * so they get a cache thread pool; {@code queueSize} and {@code timeoutSeconds} are not used for it.
 *
 * @param isCheckpointCatalog selects the pool flavour and the "Checkpoint"/"NotCheckpoint" name prefix
 * @param numThread number of threads passed to the pool factory
 * @param queueSize task queue size of the fixed pool
 * @param poolName pool name suffix
 * @param timeoutSeconds timeout passed to the fixed pool factory
 * @param needRegisterMetric whether the pool registers metrics
 * @return the created executor
 */
private ExecutorService newThreadPool(boolean isCheckpointCatalog, int numThread, int queueSize,
        String poolName, int timeoutSeconds,
        boolean needRegisterMetric) {
    if (isCheckpointCatalog) {
        return ThreadPoolManager.newDaemonCacheThreadPool(
                numThread, "Checkpoint" + poolName, needRegisterMetric);
    }
    return ThreadPoolManager.newDaemonFixedThreadPool(
            numThread, queueSize, "NotCheckpoint" + poolName, timeoutSeconds, needRegisterMetric);
}
/**
 * @return the executor dedicated to file listing
 */
public ExecutorService getFileListingExecutor() {
    return this.fileListingExecutor;
}
/**
 * @return the executor used to schedule the getting split tasks
 */
public ExecutorService getScheduleExecutor() {
    return this.scheduleExecutor;
}
/**
 * Looks up the engine cache registered under the normalized form of {@code engine}.
 *
 * @param engine engine name or alias; must not be null
 * @return the registered engine cache
 * @throws NullPointerException if {@code engine} is null
 * @throws IllegalArgumentException if no cache is registered for the normalized name
 */
public ExternalMetaCache engine(String engine) {
    Objects.requireNonNull(engine, "engine is null");
    String normalizedEngine = normalizeEngineName(engine);
    ExternalMetaCache registered = engineCaches.get(normalizedEngine);
    if (registered == null) {
        throw new IllegalArgumentException(
                String.format("unsupported external meta cache engine '%s'", normalizedEngine));
    }
    return registered;
}
/**
 * Calls {@code initCatalog} for the catalog on every registered engine cache,
 * passing a copy of the catalog's current properties.
 *
 * @param catalogId id of the catalog to prepare
 */
public void prepareCatalog(long catalogId) {
    Map<String, String> properties = getCatalogProperties(catalogId);
    routeCatalogEngines(catalogId, engineCache -> {
        engineCache.initCatalog(catalogId, properties);
    });
}
/**
 * Prepares the catalog on a single engine cache, using the catalog's current properties.
 *
 * @param catalogId id of the catalog to prepare
 * @param engine engine name or alias
 */
public void prepareCatalogByEngine(long catalogId, String engine) {
    Map<String, String> properties = getCatalogProperties(catalogId);
    prepareCatalogByEngine(catalogId, engine, properties);
}
/**
 * Calls {@code initCatalog} for the catalog on a single engine cache.
 *
 * @param catalogId id of the catalog to prepare
 * @param engine engine name or alias
 * @param catalogProperties properties handed to the engine cache; copied first, null is treated as empty
 */
public void prepareCatalogByEngine(long catalogId, String engine, Map<String, String> catalogProperties) {
    Map<String, String> propertiesCopy;
    if (catalogProperties != null) {
        propertiesCopy = Maps.newHashMap(catalogProperties);
    } else {
        propertiesCopy = Maps.newHashMap();
    }
    routeSpecifiedEngine(engine, engineCache -> engineCache.initCatalog(catalogId, propertiesCopy));
}
/**
 * Calls {@code invalidateCatalogEntries} for the catalog on every registered engine cache.
 * Engines on which the catalog is not initialized are skipped by {@code safeInvalidate}.
 *
 * @param catalogId id of the catalog whose cached entries are cleared
 */
public void invalidateCatalog(long catalogId) {
    routeCatalogEngines(catalogId, engineCache -> {
        Runnable clearEntries = () -> engineCache.invalidateCatalogEntries(catalogId);
        safeInvalidate(engineCache, catalogId, "invalidateCatalog", clearEntries);
    });
}
/**
 * Alias of {@code invalidateCatalog(long)}.
 *
 * @param catalogId id of the catalog whose cached entries are cleared
 */
public void invalidateCatalogEntries(long catalogId) {
    this.invalidateCatalog(catalogId);
}
/**
 * Calls {@code invalidateCatalogEntries} for the catalog on a single engine cache.
 *
 * @param catalogId id of the catalog whose cached entries are cleared
 * @param engine engine name or alias
 */
public void invalidateCatalogByEngine(long catalogId, String engine) {
    routeSpecifiedEngine(engine, engineCache -> {
        Runnable clearEntries = () -> engineCache.invalidateCatalogEntries(catalogId);
        safeInvalidate(engineCache, catalogId, "invalidateCatalogByEngine", clearEntries);
    });
}
/**
 * Calls {@code invalidateCatalog} for the catalog on every registered engine cache.
 * Engines on which the catalog is not initialized are skipped by {@code safeInvalidate}.
 *
 * @param catalogId id of the catalog to remove from the engine caches
 */
public void removeCatalog(long catalogId) {
    routeCatalogEngines(catalogId, engineCache -> {
        Runnable dropCatalog = () -> engineCache.invalidateCatalog(catalogId);
        safeInvalidate(engineCache, catalogId, "removeCatalog", dropCatalog);
    });
}
/**
 * Calls {@code invalidateCatalog} for the catalog on a single engine cache.
 *
 * @param catalogId id of the catalog to remove
 * @param engine engine name or alias
 */
public void removeCatalogByEngine(long catalogId, String engine) {
    routeSpecifiedEngine(engine, engineCache -> {
        Runnable dropCatalog = () -> engineCache.invalidateCatalog(catalogId);
        safeInvalidate(engineCache, catalogId, "removeCatalogByEngine", dropCatalog);
    });
}
/**
 * Invalidates one database of the catalog on every registered engine cache.
 *
 * @param catalogId id of the owning catalog
 * @param dbName database name; passed through {@code ClusterNamespace.getNameFromFullName} first
 */
public void invalidateDb(long catalogId, String dbName) {
    String localDbName = ClusterNamespace.getNameFromFullName(dbName);
    routeCatalogEngines(catalogId, engineCache -> {
        Runnable dropDb = () -> engineCache.invalidateDb(catalogId, localDbName);
        safeInvalidate(engineCache, catalogId, "invalidateDb", dropDb);
    });
}
/**
 * Invalidates one table of the catalog on every registered engine cache.
 *
 * @param catalogId id of the owning catalog
 * @param dbName database name; passed through {@code ClusterNamespace.getNameFromFullName} first
 * @param tableName table name, forwarded unchanged
 */
public void invalidateTable(long catalogId, String dbName, String tableName) {
    String localDbName = ClusterNamespace.getNameFromFullName(dbName);
    routeCatalogEngines(catalogId, engineCache -> {
        Runnable dropTable = () -> engineCache.invalidateTable(catalogId, localDbName, tableName);
        safeInvalidate(engineCache, catalogId, "invalidateTable", dropTable);
    });
}
/**
 * Invalidates one table of the catalog on a single engine cache.
 *
 * @param catalogId id of the owning catalog
 * @param engine engine name or alias
 * @param dbName database name; passed through {@code ClusterNamespace.getNameFromFullName} first
 * @param tableName table name, forwarded unchanged
 */
public void invalidateTableByEngine(long catalogId, String engine, String dbName, String tableName) {
    String localDbName = ClusterNamespace.getNameFromFullName(dbName);
    routeSpecifiedEngine(engine, engineCache -> {
        Runnable dropTable = () -> engineCache.invalidateTable(catalogId, localDbName, tableName);
        safeInvalidate(engineCache, catalogId, "invalidateTableByEngine", dropTable);
    });
}
/**
 * Invalidates the given partitions of a table on every registered engine cache.
 *
 * @param catalogId id of the owning catalog
 * @param dbName database name; passed through {@code ClusterNamespace.getNameFromFullName} first
 * @param tableName table name, forwarded unchanged
 * @param partitions partition names, forwarded unchanged
 */
public void invalidatePartitions(long catalogId,
        String dbName, String tableName, List<String> partitions) {
    String localDbName = ClusterNamespace.getNameFromFullName(dbName);
    routeCatalogEngines(catalogId, engineCache -> {
        Runnable dropPartitions =
                () -> engineCache.invalidatePartitions(catalogId, localDbName, tableName, partitions);
        safeInvalidate(engineCache, catalogId, "invalidatePartitions", dropPartitions);
    });
}
/**
 * Collects the cache statistics of one catalog from every registered engine cache.
 *
 * @param catalogId id of the catalog
 * @return a new map keyed by {@code "<engineName>.<entryName>"} holding each entry's statistics
 */
public Map<String, Map<String, String>> getCatalogCacheStats(long catalogId) {
    Map<String, Map<String, String>> merged = Maps.newHashMap();
    for (Map.Entry<String, ExternalMetaCache> registered : engineCaches.entrySet()) {
        String keyPrefix = registered.getKey() + ".";
        registered.getValue().stats(catalogId)
                .forEach((entryName, entryStats) -> merged.put(keyPrefix + entryName, entryStats));
    }
    return merged;
}
/**
 * Registers all engine caches. Built-in caches are registered before SPI ones;
 * registration keeps the first cache for an engine name, so a built-in engine is never replaced.
 */
private void initEngineCaches() {
    this.registerBuiltinEngineCaches();
    this.loadSpiEngineCaches();
}
/**
 * Registers the engine caches shipped with this class under the source label "builtin".
 * All of them use the common refresh executor; the hive cache also gets the file listing executor.
 */
private void registerBuiltinEngineCaches() {
    final String source = "builtin";
    final ExecutorService refreshPool = commonRefreshExecutor;
    registerEngineCache(new DefaultExternalMetaCache(ENGINE_DEFAULT, refreshPool), source);
    registerEngineCache(new HiveExternalMetaCache(refreshPool, fileListingExecutor), source);
    registerEngineCache(new HudiExternalMetaCache(refreshPool), source);
    registerEngineCache(new IcebergExternalMetaCache(refreshPool), source);
    registerEngineCache(new PaimonExternalMetaCache(refreshPool), source);
    registerEngineCache(new MaxComputeExternalMetaCache(refreshPool), source);
    registerEngineCache(new DorisExternalMetaCache(refreshPool), source);
}
/**
 * Loads SPI cache factories, first from the classpath via {@link ServiceLoader}, then from
 * the configured plugin directory. An {@link IOException} from the directory lookup is
 * logged and otherwise ignored.
 */
private void loadSpiEngineCaches() {
    ServiceLoader<ExternalMetaCacheFactory> classpathFactories =
            ServiceLoader.load(ExternalMetaCacheFactory.class);
    loadSpiEngineCaches(classpathFactories, "classpath");

    try {
        loadSpiEngineCaches(
                ClassLoaderUtils.loadServicesFromDirectory(ExternalMetaCacheFactory.class),
                Config.external_meta_cache_plugins_dir);
    } catch (IOException e) {
        LOG.warn("failed to load external meta cache SPI factories from directory {}",
                Config.external_meta_cache_plugins_dir, e);
    }
}
/**
 * Registers every factory yielded by {@code factories}.
 *
 * <p>A {@link ServiceLoader} iterator locates and instantiates providers lazily and reports a
 * broken one by throwing {@code ServiceConfigurationError} from {@code hasNext()} or
 * {@code next()}. Such a provider is logged and skipped, so one faulty plugin does not abort
 * construction of this manager. Recovery of the iterator after an error is best-effort only,
 * so the number of skipped providers is bounded; once the bound is hit, loading from this
 * source stops.
 *
 * @param factories factories to register
 * @param source label describing where the factories came from, used in log messages
 */
private void loadSpiEngineCaches(Iterable<ExternalMetaCacheFactory> factories, String source) {
    // Upper bound on skipped providers; guards against an iterator that fails without advancing.
    final int maxBrokenProviders = 32;
    int brokenProviders = 0;
    java.util.Iterator<ExternalMetaCacheFactory> iterator = factories.iterator();
    while (true) {
        ExternalMetaCacheFactory factory;
        try {
            if (!iterator.hasNext()) {
                return;
            }
            factory = iterator.next();
        } catch (java.util.ServiceConfigurationError e) {
            brokenProviders++;
            LOG.warn("skip broken external meta cache SPI provider from {}", source, e);
            if (brokenProviders >= maxBrokenProviders) {
                LOG.warn("stop loading external meta cache SPI providers from {} after {} failures",
                        source, brokenProviders);
                return;
            }
            continue;
        }
        registerSpiFactory(factory, source);
    }
}
/**
 * Creates the engine cache of one SPI factory and registers it.
 *
 * <p>Both {@code factory.engine()} and {@code factory.create(...)} run plugin code, so both are
 * guarded: if either throws, or {@code create} returns null, the factory is logged and skipped
 * instead of failing construction of this manager.
 *
 * @param factory SPI factory to instantiate a cache from
 * @param source label describing where the factory came from, used in log messages
 */
private void registerSpiFactory(ExternalMetaCacheFactory factory, String source) {
    String factoryClassName = factory.getClass().getName();
    // Placeholder used in the log message when engine() itself is what failed.
    String engineName = "<unknown>";
    ExternalMetaCache cache;
    try {
        engineName = normalizeEngineName(factory.engine());
        cache = factory.create(factoryContext);
    } catch (Exception e) {
        LOG.warn("failed to initialize external meta cache SPI factory {} for engine {}",
                factoryClassName, engineName, e);
        return;
    }
    if (cache == null) {
        LOG.warn("external meta cache SPI factory {} returns null cache for engine {}",
                factoryClassName, engineName);
        return;
    }
    registerEngineCache(cache, "spi(" + source + ")");
}
/**
 * Registers {@code cache} under its normalized engine name unless that name is already taken.
 * The first registration wins; a duplicate is logged at warn level and dropped.
 * Successful registrations are logged at debug level for "builtin" and at info level otherwise.
 *
 * @param cache engine cache to register
 * @param source label describing where the cache came from
 */
private void registerEngineCache(ExternalMetaCache cache, String source) {
    String engineName = normalizeEngineName(cache.engine());
    ExternalMetaCache previous = engineCaches.putIfAbsent(engineName, cache);
    if (previous == null) {
        if ("builtin".equals(source)) {
            LOG.debug("registered external meta cache engine '{}' from {}", engineName, source);
        } else {
            LOG.info("registered external meta cache engine '{}' from {}", engineName, source);
        }
    } else {
        LOG.warn("skip duplicated external meta cache engine '{}' from {}, existing class: {}, new class: {}",
                engineName, source, previous.getClass().getName(), cache.getClass().getName());
    }
}
/**
 * Maps an engine name or alias to the key used for engine cache lookup.
 *
 * <p>The name is trimmed and lower-cased with {@link Locale#ROOT}. Null or blank input maps to
 * the default engine; the aliases "hms", "external_doris" and "max_compute" map to the hive,
 * doris and maxcompute engines. Any other value is returned in its trimmed, lower-cased form.
 *
 * @param engine raw engine name, may be null
 * @return the normalized engine name, never null
 */
private static String normalizeEngineName(String engine) {
    String canonical = engine == null ? "" : engine.trim().toLowerCase(Locale.ROOT);
    if (canonical.isEmpty()) {
        return ENGINE_DEFAULT;
    }
    if ("hms".equals(canonical)) {
        return ENGINE_HIVE;
    }
    if ("external_doris".equals(canonical)) {
        return ENGINE_DORIS;
    }
    if ("max_compute".equals(canonical)) {
        return ENGINE_MAXCOMPUTE;
    }
    return canonical;
}
/**
 * Applies {@code action} to every registered engine cache.
 *
 * @param catalogId currently unused; the action is applied to all engines regardless of catalog
 * @param action operation to run on each engine cache
 */
private void routeCatalogEngines(long catalogId, Consumer<ExternalMetaCache> action) {
    engineCaches.values().forEach(engineCache -> action.accept(engineCache));
}
/**
 * Applies {@code action} to the single engine cache resolved from {@code engine}.
 *
 * @param engine engine name or alias
 * @param action operation to run on the resolved engine cache
 * @throws IllegalArgumentException if no cache is registered for the engine
 */
private void routeSpecifiedEngine(String engine, Consumer<ExternalMetaCache> action) {
    ExternalMetaCache target = this.engine(engine);
    action.accept(target);
}
/**
 * Runs an invalidation on one engine cache, tolerating {@link IllegalStateException}.
 *
 * <p>{@code checkCatalogInitialized} is called first and {@code action} afterwards. An
 * {@code IllegalStateException} from either of them is swallowed and only reported at debug level.
 * NOTE(review): an {@code IllegalStateException} thrown by {@code action} itself is reported
 * with the same "cache entry is absent" message.
 *
 * @param cache engine cache to operate on
 * @param catalogId id of the catalog being invalidated
 * @param operation operation name used in the debug log
 * @param action the invalidation to run
 */
private void safeInvalidate(ExternalMetaCache cache, long catalogId, String operation, Runnable action) {
    try {
        cache.checkCatalogInitialized(catalogId);
        action.run();
    } catch (IllegalStateException e) {
        if (!LOG.isDebugEnabled()) {
            return;
        }
        LOG.debug("skip {} for catalog {} on engine '{}' because cache entry is absent",
                operation, catalogId, cache.engine());
    }
}
/**
 * Immutable {@link ExternalMetaCacheFactory.Context} handing the manager's executors to SPI factories.
 */
private static class FactoryContext implements ExternalMetaCacheFactory.Context {
    private final ExecutorService refreshPool;
    private final ExecutorService listingPool;

    private FactoryContext(ExecutorService refreshExecutor, ExecutorService fileListingExecutor) {
        refreshPool = refreshExecutor;
        listingPool = fileListingExecutor;
    }

    /** @return the executor given as {@code refreshExecutor} at construction */
    @Override
    public ExecutorService refreshExecutor() {
        return refreshPool;
    }

    /** @return the executor given as {@code fileListingExecutor} at construction */
    @Override
    public ExecutorService fileListingExecutor() {
        return listingPool;
    }
}
/**
 * Returns a fresh, mutable copy of the catalog's properties.
 *
 * @param catalogId id of the catalog
 * @return a copy of the properties, or an empty map if the catalog or its properties are unavailable
 */
private Map<String, String> getCatalogProperties(long catalogId) {
    CatalogIf<?> catalog = getCatalog(catalogId);
    if (catalog != null && catalog.getProperties() != null) {
        return Maps.newHashMap(catalog.getProperties());
    }
    return Maps.newHashMap();
}
/**
 * Resolves a catalog through the current {@code Env}'s catalog manager.
 *
 * @param catalogId id of the catalog
 * @return the catalog manager's result, or null when the env or its catalog manager is not available
 */
@Nullable
private CatalogIf<?> getCatalog(long catalogId) {
    boolean catalogMgrAvailable = Env.getCurrentEnv() != null
            && Env.getCurrentEnv().getCatalogMgr() != null;
    if (!catalogMgrAvailable) {
        return null;
    }
    return Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
}
/**
 * Alias of {@code getHiveExternalMetaCache()}.
 *
 * @return the hive engine cache
 */
public HiveExternalMetaCache getMetaStoreCache() {
    return this.getHiveExternalMetaCache();
}
/**
 * Reads the schema cache value for {@code key} from the engine cache that
 * {@code table.getMetaCacheEngine()} resolves to.
 *
 * @param table table whose catalog id and engine select the cache entry
 * @param key schema cache key; its runtime class is used as the entry's key type
 * @return the value returned by the "schema" entry, or empty if that value is null
 * @throws IllegalArgumentException if the table's engine has no registered cache
 */
@SuppressWarnings("unchecked")
public Optional<SchemaCacheValue> getSchemaCacheValue(ExternalTable table, SchemaCacheKey key) {
    long catalogId = table.getCatalog().getId();
    // Unchecked: the entry is typed by the key's concrete runtime class.
    Class<SchemaCacheKey> keyType = (Class<SchemaCacheKey>) key.getClass();
    ExternalMetaCache engineCache = engine(table.getMetaCacheEngine());
    SchemaCacheValue cached = engineCache
            .entry(catalogId, ENTRY_SCHEMA, keyType, SchemaCacheValue.class)
            .get(key);
    return Optional.ofNullable(cached);
}
/**
 * @return the cache registered for the hudi engine, cast to its concrete type
 * @throws IllegalArgumentException if no hudi engine cache is registered
 */
public HudiExternalMetaCache getHudiExternalMetaCache() {
    ExternalMetaCache hudiCache = engine(ENGINE_HUDI);
    return (HudiExternalMetaCache) hudiCache;
}
/**
 * @return the cache registered for the hive engine, cast to its concrete type
 * @throws IllegalArgumentException if no hive engine cache is registered
 */
public HiveExternalMetaCache getHiveExternalMetaCache() {
    ExternalMetaCache hiveCache = engine(ENGINE_HIVE);
    return (HiveExternalMetaCache) hiveCache;
}
/**
 * @return the cache registered for the iceberg engine, cast to its concrete type
 * @throws IllegalArgumentException if no iceberg engine cache is registered
 */
public IcebergExternalMetaCache getIcebergExternalMetaCache() {
    ExternalMetaCache icebergCache = engine(ENGINE_ICEBERG);
    return (IcebergExternalMetaCache) icebergCache;
}
/**
 * @return the cache registered for the paimon engine, cast to its concrete type
 * @throws IllegalArgumentException if no paimon engine cache is registered
 */
public PaimonExternalMetaCache getPaimonExternalMetaCache() {
    ExternalMetaCache paimonCache = engine(ENGINE_PAIMON);
    return (PaimonExternalMetaCache) paimonCache;
}
/**
 * @return the cache registered for the maxcompute engine, cast to its concrete type
 * @throws IllegalArgumentException if no maxcompute engine cache is registered
 */
public MaxComputeExternalMetaCache getMaxComputeExternalMetaCache() {
    ExternalMetaCache maxComputeCache = engine(ENGINE_MAXCOMPUTE);
    return (MaxComputeExternalMetaCache) maxComputeCache;
}
/**
 * Fetches the MaxCompute metadata cache of one catalog from the maxcompute engine cache.
 *
 * @param catalogId id of the catalog
 * @return the metadata cache returned by the maxcompute engine cache
 */
public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
    MaxComputeExternalMetaCache maxComputeCache = getMaxComputeExternalMetaCache();
    return maxComputeCache.getMaxComputeMetadataCache(catalogId);
}
/**
 * @return the file-system cache held by this manager
 */
public FileSystemCache getFsCache() {
    return this.fsCache;
}
/**
 * @return the external table row-count cache held by this manager
 */
public ExternalRowCountCache getRowCountCache() {
    return this.rowCountCache;
}
/**
 * @return the cache registered for the doris engine, cast to its concrete type
 * @throws IllegalArgumentException if no doris engine cache is registered
 */
public DorisExternalMetaCache getDorisExternalMetaCache() {
    ExternalMetaCache dorisCache = engine(ENGINE_DORIS);
    return (DorisExternalMetaCache) dorisCache;
}
/**
 * Invalidates the cached metadata of one external table on every registered engine cache.
 *
 * @param dorisTable table whose catalog id, database name and name identify the entries to drop
 */
public void invalidateTableCache(ExternalTable dorisTable) {
    long catalogId = dorisTable.getCatalog().getId();
    String localDbName = ClusterNamespace.getNameFromFullName(dorisTable.getDbName());
    invalidateTable(catalogId, localDbName, dorisTable.getName());
    if (!LOG.isDebugEnabled()) {
        return;
    }
    LOG.debug("invalid table cache for {}.{} in catalog {}", dorisTable.getRemoteDbName(),
            dorisTable.getRemoteName(), dorisTable.getCatalog().getName());
}
/**
 * Invalidates one database via {@code invalidateDb} and logs the operation at debug level.
 *
 * @param catalogId id of the owning catalog
 * @param dbName database name
 */
public void invalidateDbCache(long catalogId, String dbName) {
    invalidateDb(catalogId, dbName);
    if (!LOG.isDebugEnabled()) {
        return;
    }
    LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId);
}
/**
 * Clears the cached data of a catalog and logs the operation at debug level.
 *
 * @param catalogId id of the catalog to refresh
 */
public void invalidateCatalogCache(long catalogId) {
    // Refresh flows clear cached data but keep the per-engine catalog entry groups,
    // hence invalidateCatalog rather than removeCatalog.
    invalidateCatalog(catalogId);
    if (!LOG.isDebugEnabled()) {
        return;
    }
    LOG.debug("invalid catalog cache for {}", catalogId);
}
/**
 * Adds the given partitions of an HMS table to the hive engine cache.
 * If resolving the partition column types throws {@code NotSupportedException},
 * a warning is logged and nothing is added.
 *
 * @param catalogId id of the owning catalog; only used for logging
 * @param table HMS table the partitions belong to
 * @param partitionNames names of the partitions to add
 */
public void addPartitionsCache(long catalogId, HMSExternalTable table, List<String> partitionNames) {
    String localDbName = ClusterNamespace.getNameFromFullName(table.getDbName());
    HiveExternalMetaCache hiveCache = getHiveExternalMetaCache();
    List<Type> columnTypes;
    try {
        columnTypes = table.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(table));
    } catch (NotSupportedException e) {
        LOG.warn("Ignore not supported hms table, message: {} ", e.getMessage());
        return;
    }
    hiveCache.addPartitionsCache(table.getOrBuildNameMapping(), partitionNames, columnTypes);
    if (!LOG.isDebugEnabled()) {
        return;
    }
    LOG.debug("add partition cache for {}.{} in catalog {}", localDbName, table.getName(), catalogId);
}
/**
 * Drops the given partitions of an HMS table from the hive engine cache.
 *
 * @param catalogId id of the owning catalog; only used for logging
 * @param table HMS table the partitions belong to
 * @param partitionNames names of the partitions to drop
 */
public void dropPartitionsCache(long catalogId, HMSExternalTable table, List<String> partitionNames) {
    String localDbName = ClusterNamespace.getNameFromFullName(table.getDbName());
    HiveExternalMetaCache hiveCache = getHiveExternalMetaCache();
    hiveCache.dropPartitionsCache(table, partitionNames, true);
    if (!LOG.isDebugEnabled()) {
        return;
    }
    LOG.debug("drop partition cache for {}.{} in catalog {}", localDbName, table.getName(), catalogId);
}
/**
 * Invalidates the given partitions of a table in the hive engine cache, one partition at a time.
 *
 * @param dorisTable table the partitions belong to
 * @param partitionNames names of the partitions to invalidate
 */
public void invalidatePartitionsCache(ExternalTable dorisTable, List<String> partitionNames) {
    HiveExternalMetaCache hiveCache = getHiveExternalMetaCache();
    partitionNames.forEach(partitionName -> hiveCache.invalidatePartitionCache(dorisTable, partitionName));
    if (!LOG.isDebugEnabled()) {
        return;
    }
    LOG.debug("invalidate partition cache for {}.{} in catalog {}",
            dorisTable.getDbName(), dorisTable.getName(), dorisTable.getCatalog().getName());
}
/**
 * Creates a {@link MetaCache} that uses the common refresh executor of this manager.
 *
 * @param name cache name
 * @param expireAfterAccessSec expire-after-access setting, forwarded unchanged
 * @param refreshAfterWriteSec refresh-after-write setting, forwarded unchanged
 * @param maxSize maximum size, forwarded unchanged
 * @param namesCacheLoader loader for the names cache
 * @param metaObjCacheLoader loader for the meta object cache
 * @param removalListener removal listener for the meta object cache
 * @param <T> type of the cached meta object
 * @return the newly created cache
 */
public <T> MetaCache<T> buildMetaCache(String name,
        OptionalLong expireAfterAccessSec, OptionalLong refreshAfterWriteSec, long maxSize,
        CacheLoader<String, List<Pair<String, String>>> namesCacheLoader,
        CacheLoader<String, Optional<T>> metaObjCacheLoader,
        RemovalListener<String, Optional<T>> removalListener) {
    return new MetaCache<>(
            name, commonRefreshExecutor, expireAfterAccessSec, refreshAfterWriteSec,
            maxSize, namesCacheLoader, metaObjCacheLoader, removalListener);
}
/**
 * Renders cache statistics as a string map.
 *
 * <p>Keys: {@code hit_ratio}, {@code hit_count}, {@code read_count} (hits plus misses),
 * {@code eviction_count}, {@code average_load_penalty} and {@code estimated_size}.
 *
 * @param cacheStats statistics snapshot to render
 * @param estimatedSize estimated number of cached entries, reported as-is
 * @return a new mutable map with the six entries above
 */
public static Map<String, String> getCacheStats(CacheStats cacheStats, long estimatedSize) {
    long hits = cacheStats.hitCount();
    long reads = hits + cacheStats.missCount();

    Map<String, String> rendered = Maps.newHashMap();
    rendered.put("hit_ratio", String.valueOf(cacheStats.hitRate()));
    rendered.put("hit_count", String.valueOf(hits));
    rendered.put("read_count", String.valueOf(reads));
    rendered.put("eviction_count", String.valueOf(cacheStats.evictionCount()));
    rendered.put("average_load_penalty", String.valueOf(cacheStats.averageLoadPenalty()));
    rendered.put("estimated_size", String.valueOf(estimatedSize));
    return rendered;
}
/**
 * Schema-only {@link AbstractExternalMetaCache} used for engines without a dedicated cache.
 *
 * <p>Registered entries:
 * <ul>
 * <li>{@code schema}: values keyed by {@link SchemaCacheKey}</li>
 * </ul>
 *
 * <p>Only schema loading and invalidation are handled here; no partition, file or snapshot
 * metadata is cached. Partition invalidation falls back to invalidating the whole table.
 */
private static class DefaultExternalMetaCache extends AbstractExternalMetaCache {
    private static final CacheSpec SCHEMA_CACHE_SPEC = CacheSpec.fromTtlValue(
            null, Config.external_cache_expire_time_seconds_after_access, Config.max_external_schema_cache_num);

    private final MetaCacheEntryDef<SchemaCacheKey, SchemaCacheValue> schemaEntryDef;

    DefaultExternalMetaCache(String engine, ExecutorService refreshExecutor) {
        super(engine, refreshExecutor);
        schemaEntryDef = MetaCacheEntryDef.of(
                ENTRY_SCHEMA,
                SchemaCacheKey.class,
                SchemaCacheValue.class,
                this::loadSchemaCacheValue,
                SCHEMA_CACHE_SPEC);
        registerMetaCacheEntryDef(schemaEntryDef);
    }

    /** Maps the catalog-level schema cache TTL property to this engine's per-entry TTL property name. */
    @Override
    protected Map<String, String> catalogPropertyCompatibilityMap() {
        String entryTtlProperty = "meta.cache." + engine() + "." + ENTRY_SCHEMA + ".ttl-second";
        return Collections.singletonMap(ExternalCatalog.SCHEMA_CACHE_TTL_SECOND, entryTtlProperty);
    }

    /** Drops every cached schema whose key belongs to the given database. */
    @Override
    public void invalidateDb(long catalogId, String dbName) {
        MetaCacheEntry<SchemaCacheKey, SchemaCacheValue> schemas = schemaEntry(catalogId);
        schemas.invalidateIf(key -> isInDb(key, catalogId, dbName));
    }

    /** Drops every cached schema whose key belongs to the given table. */
    @Override
    public void invalidateTable(long catalogId, String dbName, String tableName) {
        MetaCacheEntry<SchemaCacheKey, SchemaCacheValue> schemas = schemaEntry(catalogId);
        schemas.invalidateIf(key -> isTable(key, catalogId, dbName, tableName));
    }

    /** No partition data is cached here, so the whole table is invalidated instead. */
    @Override
    public void invalidatePartitions(long catalogId, String dbName, String tableName, List<String> partitions) {
        this.invalidateTable(catalogId, dbName, tableName);
    }

    private MetaCacheEntry<SchemaCacheKey, SchemaCacheValue> schemaEntry(long catalogId) {
        return entry(catalogId, schemaEntryDef);
    }

    /**
     * Tells whether {@code key} belongs to the catalog and database. The key's local database
     * name matches either verbatim or after {@code ClusterNamespace.getNameFromFullName}.
     */
    private boolean isInDb(SchemaCacheKey key, long catalogId, String dbName) {
        if (key.getNameMapping().getCtlId() != catalogId) {
            return false;
        }
        if (key.getNameMapping().getLocalDbName().equals(dbName)) {
            return true;
        }
        return ClusterNamespace.getNameFromFullName(key.getNameMapping().getLocalDbName()).equals(dbName);
    }

    /** Tells whether {@code key} belongs to the catalog, database and table. */
    private boolean isTable(SchemaCacheKey key, long catalogId, String dbName, String tableName) {
        if (!isInDb(key, catalogId, dbName)) {
            return false;
        }
        return key.getNameMapping().getLocalTblName().equals(tableName);
    }

    /**
     * Loads the schema for {@code key} from its external catalog.
     *
     * @throws CacheException if the catalog is not an {@link ExternalCatalog} or returns no schema
     */
    private SchemaCacheValue loadSchemaCacheValue(SchemaCacheKey key) {
        CatalogIf<?> catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(key.getNameMapping().getCtlId());
        if (catalog instanceof ExternalCatalog) {
            ExternalCatalog externalCatalog = (ExternalCatalog) catalog;
            return externalCatalog.getSchema(key).orElseThrow(() -> new CacheException(
                    "failed to load schema cache value for: %s.%s.%s",
                    null, key.getNameMapping().getCtlId(),
                    key.getNameMapping().getLocalDbName(),
                    key.getNameMapping().getLocalTblName()));
        }
        throw new CacheException("catalog %s is not external when loading schema cache",
                null, key.getNameMapping().getCtlId());
    }
}
}