// IcebergRestExternalCatalog.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.iceberg;
import org.apache.doris.catalog.InfoSchemaDb;
import org.apache.doris.catalog.MysqlDb;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.property.metastore.IcebergRestProperties;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
private static final Logger LOG = LogManager.getLogger(IcebergRestExternalCatalog.class);
public IcebergRestExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
String comment) {
super(catalogId, name, comment);
catalogProperty = new CatalogProperty(resource, props);
}
@Override
public List<String> getDbNames() {
SessionContext ctx = SessionContext.current();
if (!shouldBypassDatabaseCache(ctx)) {
return super.getDbNames();
}
makeSureInitialized();
return listLocalDatabaseNamesWithoutCache(ctx);
}
@Override
public List<String> getDbNamesOrEmpty() {
SessionContext ctx = SessionContext.current();
if (!shouldBypassDatabaseCache(ctx)) {
return super.getDbNamesOrEmpty();
}
try {
return getDbNames();
} catch (Exception e) {
LOG.warn("failed to get db names in catalog {}", getName(), e);
return Collections.emptyList();
}
}
@Override
public ExternalDatabase<? extends ExternalTable> getDbNullable(String dbName) {
if (dbName == null || dbName.isEmpty() || isSystemDatabase(dbName)) {
return super.getDbNullable(dbName);
}
SessionContext ctx = SessionContext.current();
if (!shouldBypassDatabaseCache(ctx)) {
return super.getDbNullable(dbName);
}
try {
makeSureInitialized();
} catch (Exception e) {
LOG.warn("failed to get db {} in catalog {}", dbName, getName(), e);
return null;
}
return getDbNullableWithoutCache(ctx, dbName);
}
@Override
protected boolean shouldBypassTableNameCache(SessionContext ctx) {
return shouldBypassDatabaseCache(ctx);
}
@Override
protected SessionContext getCatalogInitializationSessionContext() {
SessionContext ctx = SessionContext.current();
return shouldBypassDatabaseCache(ctx) ? ctx : SessionContext.empty();
}
protected List<String> listDatabaseNames(SessionContext ctx) {
return ((IcebergMetadataOps) metadataOps).listDatabaseNames(ctx);
}
private ExternalDatabase<? extends ExternalTable> getDbNullableWithoutCache(SessionContext ctx,
String requestedLocalDbName) {
Optional<String> remoteDbName = resolveRemoteDatabaseName(ctx, requestedLocalDbName);
if (!remoteDbName.isPresent()) {
return null;
}
String localDbName = localDatabaseName(remoteDbName.get());
return buildDbForInit(remoteDbName.get(), localDbName, Util.genIdByName(getName(), localDbName),
InitCatalogLog.Type.ICEBERG, false);
}
private Optional<String> resolveRemoteDatabaseName(SessionContext ctx, String requestedLocalDbName) {
return listFilteredRemoteDatabaseNames(ctx).stream()
.filter(remoteDbName -> localDatabaseNameMatches(localDatabaseName(remoteDbName), requestedLocalDbName))
.findFirst();
}
private List<String> listLocalDatabaseNamesWithoutCache(SessionContext ctx) {
List<String> localDbNames = listFilteredRemoteDatabaseNames(ctx).stream()
.map(this::localDatabaseName)
.collect(Collectors.toCollection(Lists::newArrayList));
// System databases are Doris-internal and always visible. The cached path
// (ExternalCatalog#getFilteredDatabaseNames) appends them unconditionally, but that path is
// bypassed for user-session metadata, so mirror the same injection here.
localDbNames.remove(InfoSchemaDb.DATABASE_NAME);
localDbNames.add(InfoSchemaDb.DATABASE_NAME);
localDbNames.remove(MysqlDb.DATABASE_NAME);
localDbNames.add(MysqlDb.DATABASE_NAME);
return localDbNames;
}
private List<String> listFilteredRemoteDatabaseNames(SessionContext ctx) {
Map<String, Boolean> includeDatabaseMap = getIncludeDatabaseMap();
Map<String, Boolean> excludeDatabaseMap = getExcludeDatabaseMap();
return Lists.newArrayList(listDatabaseNames(ctx)).stream()
.filter(dbName -> isDatabaseVisible(dbName, includeDatabaseMap, excludeDatabaseMap))
.collect(Collectors.toList());
}
private boolean isDatabaseVisible(String dbName, Map<String, Boolean> includeDatabaseMap,
Map<String, Boolean> excludeDatabaseMap) {
if (excludeDatabaseMap.containsKey(dbName)) {
return false;
}
return includeDatabaseMap.isEmpty() || includeDatabaseMap.containsKey(dbName);
}
private boolean localDatabaseNameMatches(String localDbName, String requestedLocalDbName) {
if (getLowerCaseDatabaseNames() == 1) {
return localDbName.equals(requestedLocalDbName.toLowerCase());
}
if (getLowerCaseDatabaseNames() == 2) {
return localDbName.equalsIgnoreCase(requestedLocalDbName);
}
return localDbName.equals(requestedLocalDbName);
}
private String localDatabaseName(String remoteDbName) {
String localDbName = fromRemoteDatabaseName(remoteDbName);
if (getLowerCaseDatabaseNames() == 1) {
return localDbName.toLowerCase();
}
if (getLowerCaseDatabaseNames() == 2) {
return remoteDbName;
}
return localDbName;
}
private boolean isSystemDatabase(String dbName) {
return dbName.equalsIgnoreCase(InfoSchemaDb.DATABASE_NAME) || dbName.equalsIgnoreCase(MysqlDb.DATABASE_NAME);
}
private boolean shouldBypassDatabaseCache(SessionContext ctx) {
return ctx != null && ctx.hasDelegatedCredential() && isIcebergRestUserSessionPropertyEnabled();
}
/**
* Whether REST user-session mode is enabled, decided purely from catalog properties.
*
* <p>Unlike {@link #isIcebergRestUserSessionEnabled()} (which calls {@code makeSureInitialized()}
* and reads the initialized metastore properties), this check is intentionally usable
* <em>before</em> catalog initialization: it drives the cache-bypass decision in
* {@link #getDbNames()} / {@link #getDbNullable(String)} which must run before
* {@code makeSureInitialized()}. {@code CatalogProperty#getMetastoreProperties()} lazily parses
* and never returns null, so this does not force catalog initialization.
*/
private boolean isIcebergRestUserSessionPropertyEnabled() {
return catalogProperty.getMetastoreProperties() instanceof IcebergRestProperties
&& ((IcebergRestProperties) catalogProperty.getMetastoreProperties()).isIcebergRestUserSessionEnabled();
}
}