IcebergRestExternalCatalog.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg;

import org.apache.doris.catalog.InfoSchemaDb;
import org.apache.doris.catalog.MysqlDb;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.property.metastore.IcebergRestProperties;
import org.apache.doris.datasource.property.metastore.MetastoreProperties;

import com.google.common.collect.Lists;
import org.apache.iceberg.rest.RESTSessionCatalog;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class IcebergRestExternalCatalog extends IcebergExternalCatalog implements IcebergUserSessionCatalog {

    private static final Logger LOG = LogManager.getLogger(IcebergRestExternalCatalog.class);

    public IcebergRestExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
            String comment) {
        super(catalogId, name, comment);
        catalogProperty = new CatalogProperty(resource, props);
    }

    @Override
    public void onClose() {
        super.onClose();
        // The default Catalog is RESTSessionCatalog.asCatalog(empty), which is not itself Closeable; the
        // closeable REST client/auth lives on the RESTSessionCatalog owned by IcebergRestProperties, so close
        // it here. For a REST catalog the metastore properties are always IcebergRestProperties; they are only
        // null before any properties have been parsed (e.g. closing a never-initialized catalog).
        MetastoreProperties metaProps = catalogProperty.getMetastoreProperties();
        if (metaProps != null) {
            ((IcebergRestProperties) metaProps).closeRestSessionCatalog();
        }
    }

    @Override
    public List<String> getDbNames() {
        SessionContext ctx = SessionContext.current();
        if (!shouldBypassDatabaseCache(ctx)) {
            return super.getDbNames();
        }
        makeSureInitialized();
        return listLocalDatabaseNamesWithoutCache(ctx);
    }

    @Override
    public List<String> getDbNamesOrEmpty() {
        SessionContext ctx = SessionContext.current();
        if (!shouldBypassDatabaseCache(ctx)) {
            return super.getDbNamesOrEmpty();
        }
        try {
            return getDbNames();
        } catch (Exception e) {
            LOG.warn("failed to get db names in catalog {}", getName(), e);
            return Collections.emptyList();
        }
    }

    @Override
    public ExternalDatabase<? extends ExternalTable> getDbNullable(String dbName) {
        if (dbName == null || dbName.isEmpty() || isSystemDatabase(dbName)) {
            return super.getDbNullable(dbName);
        }
        SessionContext ctx = SessionContext.current();
        if (!shouldBypassDatabaseCache(ctx)) {
            return super.getDbNullable(dbName);
        }
        try {
            makeSureInitialized();
        } catch (Exception e) {
            LOG.warn("failed to get db {} in catalog {}", dbName, getName(), e);
            return null;
        }
        return getDbNullableWithoutCache(ctx, dbName);
    }

    @Override
    protected boolean shouldBypassTableNameCache(SessionContext ctx) {
        return shouldBypassDatabaseCache(ctx);
    }

    @Override
    protected SessionContext getCatalogInitializationSessionContext() {
        SessionContext ctx = SessionContext.current();
        return shouldBypassDatabaseCache(ctx) ? ctx : SessionContext.empty();
    }

    protected List<String> listDatabaseNames(SessionContext ctx) {
        return ((IcebergMetadataOps) metadataOps).listDatabaseNames(ctx);
    }

    private ExternalDatabase<? extends ExternalTable> getDbNullableWithoutCache(SessionContext ctx,
            String requestedLocalDbName) {
        Optional<String> remoteDbName = resolveRemoteDatabaseName(ctx, requestedLocalDbName);
        if (!remoteDbName.isPresent()) {
            return null;
        }
        String localDbName = localDatabaseName(remoteDbName.get());
        return buildDbForInit(remoteDbName.get(), localDbName, Util.genIdByName(getName(), localDbName),
                InitCatalogLog.Type.ICEBERG, false);
    }

    private Optional<String> resolveRemoteDatabaseName(SessionContext ctx, String requestedLocalDbName) {
        return listFilteredRemoteDatabaseNames(ctx).stream()
                .filter(remoteDbName -> localDatabaseNameMatches(localDatabaseName(remoteDbName), requestedLocalDbName))
                .findFirst();
    }

    private List<String> listLocalDatabaseNamesWithoutCache(SessionContext ctx) {
        List<String> localDbNames = listFilteredRemoteDatabaseNames(ctx).stream()
                .map(this::localDatabaseName)
                .collect(Collectors.toCollection(Lists::newArrayList));
        // System databases are Doris-internal and always visible. The cached path
        // (ExternalCatalog#getFilteredDatabaseNames) appends them unconditionally, but that path is
        // bypassed for user-session metadata, so mirror the same injection here.
        localDbNames.remove(InfoSchemaDb.DATABASE_NAME);
        localDbNames.add(InfoSchemaDb.DATABASE_NAME);
        localDbNames.remove(MysqlDb.DATABASE_NAME);
        localDbNames.add(MysqlDb.DATABASE_NAME);
        return localDbNames;
    }

    private List<String> listFilteredRemoteDatabaseNames(SessionContext ctx) {
        Map<String, Boolean> includeDatabaseMap = getIncludeDatabaseMap();
        Map<String, Boolean> excludeDatabaseMap = getExcludeDatabaseMap();
        return Lists.newArrayList(listDatabaseNames(ctx)).stream()
                .filter(dbName -> isDatabaseVisible(dbName, includeDatabaseMap, excludeDatabaseMap))
                .collect(Collectors.toList());
    }

    private boolean isDatabaseVisible(String dbName, Map<String, Boolean> includeDatabaseMap,
            Map<String, Boolean> excludeDatabaseMap) {
        if (excludeDatabaseMap.containsKey(dbName)) {
            return false;
        }
        return includeDatabaseMap.isEmpty() || includeDatabaseMap.containsKey(dbName);
    }

    private boolean localDatabaseNameMatches(String localDbName, String requestedLocalDbName) {
        if (getLowerCaseDatabaseNames() == 1) {
            return localDbName.equals(requestedLocalDbName.toLowerCase());
        }
        if (getLowerCaseDatabaseNames() == 2) {
            return localDbName.equalsIgnoreCase(requestedLocalDbName);
        }
        return localDbName.equals(requestedLocalDbName);
    }

    private String localDatabaseName(String remoteDbName) {
        String localDbName = fromRemoteDatabaseName(remoteDbName);
        if (getLowerCaseDatabaseNames() == 1) {
            return localDbName.toLowerCase();
        }
        if (getLowerCaseDatabaseNames() == 2) {
            return remoteDbName;
        }
        return localDbName;
    }

    private boolean isSystemDatabase(String dbName) {
        return dbName.equalsIgnoreCase(InfoSchemaDb.DATABASE_NAME) || dbName.equalsIgnoreCase(MysqlDb.DATABASE_NAME);
    }

    private boolean shouldBypassDatabaseCache(SessionContext ctx) {
        // Bypassing the shared cache and using a per-user session catalog are the same condition.
        return useSessionCatalog(ctx);
    }

    /**
     * Whether the given request should use a per-user session catalog. This is the single source of truth for
     * that decision, shared by the cache-bypass logic above and by {@link IcebergMetadataOps} via
     * {@link IcebergUserSessionCatalog}.
     *
     * <p>Three outcomes:
     * <ul>
     *   <li>dynamic identity disabled: {@code false} — use the shared default path;</li>
     *   <li>dynamic identity enabled and the request carries a delegated credential: {@code true} — use the
     *       per-user session catalog;</li>
     *   <li>dynamic identity enabled but the request has <em>no</em> delegated credential: throw. A catalog
     *       configured with {@code iceberg.rest.session=user} has no shared/default identity to fall back on,
     *       and silently borrowing another request's token would leak credentials across users. So a session
     *       without a token (e.g. a password login) is rejected here rather than served by the default path.</li>
     * </ul>
     */
    @Override
    public boolean useSessionCatalog(SessionContext ctx) {
        if (!isIcebergRestUserSessionEnabled()) {
            return false;
        }
        if (ctx == null || !ctx.hasDelegatedCredential()) {
            throw new IllegalStateException("Catalog " + getName() + " is configured with dynamic identity "
                    + "(iceberg.rest.session=user) but the current session has no delegated credential. "
                    + "Access requires a token-based identity (e.g. OAuth/OIDC/JWT).");
        }
        return true;
    }

    @Override
    public RESTSessionCatalog getRestSessionCatalog() {
        IcebergRestProperties props = restProperties();
        return props == null ? null : props.getRestSessionCatalog();
    }

    @Override
    public IcebergRestProperties.DelegatedTokenMode getDelegatedTokenMode() {
        IcebergRestProperties props = restProperties();
        return props == null ? IcebergRestProperties.DelegatedTokenMode.ACCESS_TOKEN : props.getDelegatedTokenMode();
    }

    @Override
    public boolean isViewEnabled() {
        IcebergRestProperties props = restProperties();
        return props != null && props.isIcebergRestViewEnabled();
    }

    @Override
    public boolean isNestedNamespaceEnabled() {
        IcebergRestProperties props = restProperties();
        return props != null && props.isIcebergRestNestedNamespaceEnabled();
    }

    /**
     * Whether REST user-session mode is enabled for this catalog.
     *
     * <p>This is REST-specific behavior, so it lives on {@link IcebergRestExternalCatalog} rather than the
     * generic {@link IcebergExternalCatalog} base class. The decision is made purely from catalog
     * properties via {@code CatalogProperty#getMetastoreProperties()}, which lazily parses and never
     * returns null. It therefore does <em>not</em> force {@code makeSureInitialized()}, and is safe to
     * call before catalog initialization, e.g. from the cache-bypass decision in {@link #getDbNames()} /
     * {@link #getDbNullable(String)} which runs before initialization.
     */
    public boolean isIcebergRestUserSessionEnabled() {
        IcebergRestProperties props = restProperties();
        return props != null && props.isIcebergRestUserSessionEnabled();
    }

    private IcebergRestProperties restProperties() {
        MetastoreProperties metaProps = catalogProperty.getMetastoreProperties();
        return metaProps instanceof IcebergRestProperties ? (IcebergRestProperties) metaProps : null;
    }
}