// CatalogMetaCache.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.metacache;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalTable;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
 * Per-catalog cache container. One catalog can own multiple engine caches.
 *
 * <p>Engine caches are keyed by engine type after normalization (trimmed and lower-cased with
 * {@link Locale#ROOT}), so {@code "Hive"} and {@code " hive "} address the same entry.
 */
public class CatalogMetaCache {
    private final long catalogId;
    // Keyed by the normalized engine type, see normalizeEngineType.
    private final Map<String, EngineMetaCache> engineCaches = new ConcurrentHashMap<>();

    /**
     * Creates an empty container for the given catalog.
     *
     * @param catalogId id of the owning catalog; only used for reporting in error messages and via
     *                  {@link #getCatalogId()}
     */
    public CatalogMetaCache(long catalogId) {
        this.catalogId = catalogId;
    }

    /**
     * @return the catalog id this container was created with
     */
    public long getCatalogId() {
        return catalogId;
    }

    /**
     * Returns the engine cache registered for {@code engineType}, creating, initializing and
     * registering one from {@code cacheSupplier} when none exists yet.
     *
     * <p>Creation runs inside {@link ConcurrentHashMap#computeIfAbsent}, so the supplier and
     * {@code EngineMetaCache.init} must not add or remove entries of this container. When creation
     * fails, no entry is registered and a later call will try again.
     *
     * @param engineType    engine type; surrounding whitespace and case are ignored
     * @param cacheSupplier factory used only when no cache is registered for the engine type
     * @param catalog       catalog handed to {@code EngineMetaCache.init} for a newly created cache
     * @return the existing or newly initialized engine cache
     * @throws NullPointerException     if any argument is null, or the supplier returns null
     * @throws IllegalArgumentException if {@code engineType} is empty after trimming
     * @throws IllegalStateException    if initializing the new cache fails; the original failure is the cause
     */
    public EngineMetaCache getOrCreateEngineMetaCache(String engineType,
            Supplier<? extends EngineMetaCache> cacheSupplier, ExternalCatalog catalog) {
        // Validation order is kept stable so callers observe the same exception precedence.
        String normalizedEngineType = normalizeEngineType(engineType);
        ExternalCatalog nonNullCatalog = Objects.requireNonNull(catalog, "catalog cannot be null");
        Objects.requireNonNull(cacheSupplier, "cacheSupplier cannot be null");
        return engineCaches.computeIfAbsent(normalizedEngineType, key -> {
            EngineMetaCache newCache = Objects.requireNonNull(cacheSupplier.get(), "cacheSupplier returned null");
            try {
                newCache.init(nonNullCatalog);
                return newCache;
            } catch (Exception e) {
                // Best-effort cleanup of the half-initialized cache; it must never hide the init failure.
                try {
                    newCache.invalidateCatalog();
                } catch (Exception cleanupException) {
                    // Throwable.addSuppressed rejects self-suppression with IllegalArgumentException.
                    if (cleanupException != e) {
                        e.addSuppressed(cleanupException);
                    }
                }
                throw new IllegalStateException(String.format("Failed to initialize engine cache: catalogId=%d,"
                        + " engineType=%s", catalogId, normalizedEngineType), e);
            }
        });
    }

    /**
     * Looks up the engine cache registered for {@code engineType}.
     *
     * @param engineType engine type; surrounding whitespace and case are ignored
     * @return the registered cache, or {@code null} when none is registered
     * @throws NullPointerException     if {@code engineType} is null
     * @throws IllegalArgumentException if {@code engineType} is empty after trimming
     */
    public EngineMetaCache getEngineMetaCache(String engineType) {
        return engineCaches.get(normalizeEngineType(engineType));
    }

    /**
     * Unregisters the engine cache for {@code engineType}. The removed cache is returned as-is;
     * this method does not invalidate it.
     *
     * @param engineType engine type; surrounding whitespace and case are ignored
     * @return the removed cache, or {@code null} when none was registered
     * @throws NullPointerException     if {@code engineType} is null
     * @throws IllegalArgumentException if {@code engineType} is empty after trimming
     */
    public EngineMetaCache removeEngineMetaCache(String engineType) {
        return engineCaches.remove(normalizeEngineType(engineType));
    }

    /**
     * @return an immutable snapshot of the currently registered engine caches
     */
    public List<EngineMetaCache> getEngineMetaCaches() {
        return ImmutableList.copyOf(engineCaches.values());
    }

    /**
     * Invalidates the whole catalog in every registered engine cache. All engine caches are
     * attempted even when one of them fails; see {@link #forEachEngineCache(Consumer)}.
     */
    public void invalidateCatalog() {
        forEachEngineCache(EngineMetaCache::invalidateCatalog);
    }

    /**
     * Invalidates one database in every registered engine cache. All engine caches are attempted
     * even when one of them fails.
     *
     * @param dbName database name, passed through unchanged to each engine cache
     */
    public void invalidateDb(String dbName) {
        forEachEngineCache(cache -> cache.invalidateDb(dbName));
    }

    /**
     * Invalidates one table in every registered engine cache. All engine caches are attempted
     * even when one of them fails.
     *
     * @param table table, passed through unchanged to each engine cache
     */
    public void invalidateTable(ExternalTable table) {
        forEachEngineCache(cache -> cache.invalidateTable(table));
    }

    /**
     * Applies {@code action} to every registered engine cache without letting one failing engine
     * prevent the others from being processed. After all caches were attempted, the first failure
     * is rethrown with any later failures attached as suppressed exceptions.
     */
    private void forEachEngineCache(Consumer<EngineMetaCache> action) {
        RuntimeException firstFailure = null;
        for (EngineMetaCache cache : engineCaches.values()) {
            try {
                action.accept(cache);
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else if (e != firstFailure) {
                    // Guard against self-suppression when the same instance is thrown twice.
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Trims and lower-cases (locale-independent) an engine type so it can be used as a map key.
     *
     * @throws NullPointerException     if {@code engineType} is null
     * @throws IllegalArgumentException if {@code engineType} is empty after trimming
     */
    private static String normalizeEngineType(String engineType) {
        String trimmedEngineType = Objects.requireNonNull(engineType, "engineType cannot be null").trim();
        if (trimmedEngineType.isEmpty()) {
            throw new IllegalArgumentException("engineType cannot be empty");
        }
        return trimmedEngineType.toLowerCase(Locale.ROOT);
    }
}