IcebergSessionCatalogAdapter.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.iceberg;
import org.apache.doris.datasource.DelegatedCredential;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.property.metastore.IcebergRestProperties.DelegatedTokenMode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.catalog.BaseSessionCatalog;
import org.apache.iceberg.catalog.BaseViewSessionCatalog;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Optional;
class IcebergSessionCatalogAdapter {
private static final String SESSION_CATALOG_FIELD = "sessionCatalog";
private final Catalog catalog;
private final Optional<BaseSessionCatalog> sessionCatalog;
private final DelegatedTokenMode delegatedTokenMode;
IcebergSessionCatalogAdapter(Catalog catalog) {
this(catalog, DelegatedTokenMode.ACCESS_TOKEN);
}
IcebergSessionCatalogAdapter(Catalog catalog, DelegatedTokenMode delegatedTokenMode) {
this.catalog = catalog;
this.sessionCatalog = extractSessionCatalog(catalog);
this.delegatedTokenMode = delegatedTokenMode;
}
Catalog catalog(SessionContext context) {
if (!hasDelegatedCredential(context)) {
return catalog;
}
BaseSessionCatalog activeSessionCatalog = requireSessionCatalog();
return activeSessionCatalog.asCatalog(toIcebergSessionContext(context, delegatedTokenMode));
}
SupportsNamespaces namespaces(SessionContext context) {
return (SupportsNamespaces) catalog(context);
}
Catalog delegatedCatalog(SessionContext context) {
return requireSessionCatalog().asCatalog(toIcebergSessionContext(
requireDelegatedCredential(context), delegatedTokenMode));
}
SupportsNamespaces delegatedNamespaces(SessionContext context) {
return (SupportsNamespaces) delegatedCatalog(context);
}
Optional<ViewCatalog> delegatedViewCatalog(SessionContext context) {
BaseSessionCatalog activeSessionCatalog = requireSessionCatalog();
if (activeSessionCatalog instanceof BaseViewSessionCatalog) {
return Optional.of(((BaseViewSessionCatalog) activeSessionCatalog)
.asViewCatalog(toIcebergSessionContext(requireDelegatedCredential(context), delegatedTokenMode)));
}
requireDelegatedCredential(context);
return Optional.empty();
}
Optional<ViewCatalog> viewCatalog(SessionContext context) {
if (!hasDelegatedCredential(context)) {
return catalog instanceof ViewCatalog ? Optional.of((ViewCatalog) catalog) : Optional.empty();
}
BaseSessionCatalog sessionCatalog = requireSessionCatalog();
if (sessionCatalog instanceof BaseViewSessionCatalog) {
return Optional.of(((BaseViewSessionCatalog) sessionCatalog)
.asViewCatalog(toIcebergSessionContext(context, delegatedTokenMode)));
}
return Optional.empty();
}
private Optional<BaseSessionCatalog> extractSessionCatalog(Catalog catalog) {
Class<?> clazz = catalog.getClass();
while (clazz != null && clazz != Object.class) {
try {
Field field = clazz.getDeclaredField(SESSION_CATALOG_FIELD);
field.setAccessible(true);
Object value = field.get(catalog);
if (value instanceof BaseSessionCatalog) {
return Optional.of((BaseSessionCatalog) value);
}
throw new IllegalStateException("Iceberg REST sessionCatalog field is not a BaseSessionCatalog");
} catch (NoSuchFieldException e) {
clazz = clazz.getSuperclass();
} catch (IllegalAccessException e) {
throw new IllegalStateException("Failed to access Iceberg REST sessionCatalog field", e);
}
}
return Optional.empty();
}
@VisibleForTesting
static org.apache.iceberg.catalog.SessionCatalog.SessionContext toIcebergSessionContext(
SessionContext context) {
return toIcebergSessionContext(context, DelegatedTokenMode.ACCESS_TOKEN);
}
@VisibleForTesting
static org.apache.iceberg.catalog.SessionCatalog.SessionContext toIcebergSessionContext(
SessionContext context, DelegatedTokenMode delegatedTokenMode) {
Map<String, String> credentials = ImmutableMap.of();
if (context.getDelegatedCredential().isPresent()) {
credentials = toIcebergCredentials(context.getDelegatedCredential().get(), delegatedTokenMode);
}
return new org.apache.iceberg.catalog.SessionCatalog.SessionContext(
context.getSessionId(), null, credentials, ImmutableMap.of());
}
private BaseSessionCatalog requireSessionCatalog() {
if (!sessionCatalog.isPresent()) {
throw new IllegalStateException("Iceberg REST user session requires a session-aware Iceberg catalog");
}
return sessionCatalog.get();
}
private static SessionContext requireDelegatedCredential(SessionContext context) {
if (!hasDelegatedCredential(context)) {
throw new IllegalStateException("Iceberg REST user session requires delegated credential");
}
return context;
}
private static Map<String, String> toIcebergCredentials(
DelegatedCredential credential, DelegatedTokenMode delegatedTokenMode) {
if (delegatedTokenMode == DelegatedTokenMode.ACCESS_TOKEN) {
return ImmutableMap.of(OAuth2Properties.TOKEN, credential.getToken());
}
return ImmutableMap.of(credential.getIcebergCredentialKey(), credential.getToken());
}
private static boolean hasDelegatedCredential(SessionContext context) {
return context != null && context.hasDelegatedCredential();
}
}