IcebergRestProperties.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property.metastore;
import org.apache.doris.datasource.property.ConnectorProperty;
import org.apache.doris.datasource.property.ParamRules;
import org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
import org.apache.doris.datasource.property.storage.HdfsCompatibleProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.logging.log4j.util.Strings;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class IcebergRestProperties extends MetastoreProperties {
// REST catalog property constants
private static final String PREFIX_PROPERTY = "prefix";
private static final String VENDED_CREDENTIALS_HEADER = "header.X-Iceberg-Access-Delegation";
private static final String VENDED_CREDENTIALS_VALUE = "vended-credentials";
private Map<String, String> icebergRestCatalogProperties;
@ConnectorProperty(names = {"iceberg.rest.uri", "uri"},
description = "The uri of the iceberg rest catalog service.")
private String icebergRestUri = "";
@ConnectorProperty(names = {"iceberg.rest.prefix"},
required = false,
description = "The prefix of the iceberg rest catalog service.")
private String icebergRestPrefix = "";
@ConnectorProperty(names = {"iceberg.rest.warehouse", "warehouse"},
required = false,
description = "The warehouse of the iceberg rest catalog service.")
private String icebergRestWarehouse = "";
@ConnectorProperty(names = {"iceberg.rest.security.type"},
required = false,
description = "The security type of the iceberg rest catalog service,"
+ "optional: (none, oauth2), default: none.")
private String icebergRestSecurityType = "none";
@ConnectorProperty(names = {"iceberg.rest.session"},
required = false,
supported = false,
description = "The session type of the iceberg rest catalog service,"
+ "optional: (none, user), default: none.")
private String icebergRestSession = "none";
@ConnectorProperty(names = {"iceberg.rest.session-timeout"},
required = false,
supported = false,
description = "The session timeout of the iceberg rest catalog service.")
private String icebergRestSessionTimeout = "0";
@ConnectorProperty(names = {"iceberg.rest.oauth2.token"},
required = false,
description = "The oauth2 token for the iceberg rest catalog service.")
private String icebergRestOauth2Token;
@ConnectorProperty(names = {"iceberg.rest.oauth2.credential"},
required = false,
description = "The oauth2 credential for the iceberg rest catalog service.")
private String icebergRestOauth2Credential;
@ConnectorProperty(names = {"iceberg.rest.oauth2.scope"},
required = false,
description = "The oauth2 scope for the iceberg rest catalog service.")
private String icebergRestOauth2Scope;
@ConnectorProperty(names = {"iceberg.rest.oauth2.server-uri"},
required = false,
description = "The oauth2 server uri for fetching token.")
private String icebergRestOauth2ServerUri;
@ConnectorProperty(names = {"iceberg.rest.oauth2.token-refresh-enabled"},
required = false,
description = "Enable oauth2 token refresh for the iceberg rest catalog service.")
private String icebergRestOauth2TokenRefreshEnabled = "false";
@ConnectorProperty(names = {"iceberg.rest.vended-credentials-enabled"},
required = false,
description = "Enable vended credentials for the iceberg rest catalog service.")
private String icebergRestVendedCredentialsEnabled = "false";
@ConnectorProperty(names = {"iceberg.rest.nested-namespace-enabled"},
required = false,
supported = false,
description = "Enable nested namespace for the iceberg rest catalog service.")
private String icebergRestNestedNamespaceEnabled = "true";
@ConnectorProperty(names = {"iceberg.rest.case-insensitive-name-matching"},
required = false,
supported = false,
description = "Enable case insensitive name matching for the iceberg rest catalog service.")
private String icebergRestCaseInsensitiveNameMatching = "false";
@ConnectorProperty(names = {"iceberg.rest.case-insensitive-name-matching.cache-ttl"},
required = false,
supported = false,
description = "The cache TTL for case insensitive name matching in ms.")
private String icebergRestCaseInsensitiveNameMatchingCacheTtlMs = "0";
public IcebergRestProperties(Map<String, String> origProps) {
super(Type.ICEBERG_REST, origProps);
}
@Override
public void initNormalizeAndCheckProps() {
super.initNormalizeAndCheckProps();
validateSecurityType();
buildRules().validate();
initIcebergRestCatalogProperties();
}
@Override
protected void checkRequiredProperties() {
}
private void validateSecurityType() {
try {
Security.valueOf(icebergRestSecurityType.toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid security type: " + icebergRestSecurityType
+ ". Supported values are: none, oauth2");
}
}
private ParamRules buildRules() {
ParamRules rules = new ParamRules()
// OAuth2 requires either credential or token, but not both
.mutuallyExclusive(icebergRestOauth2Credential, icebergRestOauth2Token,
"OAuth2 cannot have both credential and token configured")
// If using credential flow, server URI is required
.requireAllIfPresent(icebergRestOauth2Credential,
new String[] {icebergRestOauth2ServerUri},
"OAuth2 credential flow requires server-uri");
// Custom validation: OAuth2 scope should not be used with token
if (Strings.isNotBlank(icebergRestOauth2Token) && Strings.isNotBlank(icebergRestOauth2Scope)) {
throw new IllegalArgumentException("OAuth2 scope is only applicable when using credential, not token");
}
// Custom validation: If OAuth2 is enabled, require either credential or token
if ("oauth2".equalsIgnoreCase(icebergRestSecurityType)) {
boolean hasCredential = Strings.isNotBlank(icebergRestOauth2Credential);
boolean hasToken = Strings.isNotBlank(icebergRestOauth2Token);
if (!hasCredential && !hasToken) {
throw new IllegalArgumentException("OAuth2 requires either credential or token");
}
}
return rules;
}
private void initIcebergRestCatalogProperties() {
icebergRestCatalogProperties = new HashMap<>();
// Core catalog properties
addCoreCatalogProperties();
// Optional properties
addOptionalProperties();
// Authentication properties
addAuthenticationProperties();
}
private void addCoreCatalogProperties() {
// See CatalogUtil.java
icebergRestCatalogProperties.put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
// See CatalogProperties.java
icebergRestCatalogProperties.put(CatalogProperties.URI, icebergRestUri);
}
private void addOptionalProperties() {
if (Strings.isNotBlank(icebergRestPrefix)) {
icebergRestCatalogProperties.put(PREFIX_PROPERTY, icebergRestPrefix);
}
if (Strings.isNotBlank(icebergRestWarehouse)) {
icebergRestCatalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, icebergRestWarehouse);
}
if (isIcebergRestVendedCredentialsEnabled()) {
icebergRestCatalogProperties.put(VENDED_CREDENTIALS_HEADER, VENDED_CREDENTIALS_VALUE);
}
}
private void addAuthenticationProperties() {
Security security = Security.valueOf(icebergRestSecurityType.toUpperCase());
if (security == Security.OAUTH2) {
addOAuth2Properties();
}
}
private void addOAuth2Properties() {
if (Strings.isNotBlank(icebergRestOauth2Credential)) {
// Client Credentials Flow
icebergRestCatalogProperties.put(OAuth2Properties.CREDENTIAL, icebergRestOauth2Credential);
icebergRestCatalogProperties.put(OAuth2Properties.OAUTH2_SERVER_URI, icebergRestOauth2ServerUri);
if (Strings.isNotBlank(icebergRestOauth2Scope)) {
icebergRestCatalogProperties.put(OAuth2Properties.SCOPE, icebergRestOauth2Scope);
}
icebergRestCatalogProperties.put(OAuth2Properties.TOKEN_REFRESH_ENABLED,
icebergRestOauth2TokenRefreshEnabled);
} else {
// Pre-configured Token Flow
icebergRestCatalogProperties.put(OAuth2Properties.TOKEN, icebergRestOauth2Token);
}
}
public Map<String, String> getIcebergRestCatalogProperties() {
return Collections.unmodifiableMap(icebergRestCatalogProperties);
}
public boolean isIcebergRestVendedCredentialsEnabled() {
return Boolean.parseBoolean(icebergRestVendedCredentialsEnabled);
}
/**
* Unified method to configure FileIO properties for Iceberg catalog.
* This method handles all storage types (HDFS, S3, MinIO, etc.) and populates
* the fileIOProperties map and Configuration object accordingly.
*
* @param storagePropertiesMap Map of storage properties
* @param fileIOProperties Options map to be populated
* @param conf Configuration object to be populated (for HDFS), will be created if null and HDFS is used
*/
public void toFileIOProperties(Map<StorageProperties.Type, StorageProperties> storagePropertiesMap,
Map<String, String> fileIOProperties, Configuration conf) {
for (StorageProperties storageProperties : storagePropertiesMap.values()) {
if (storageProperties instanceof HdfsCompatibleProperties) {
storageProperties.getBackendConfigProperties().forEach(conf::set);
} else if (storageProperties instanceof AbstractS3CompatibleProperties) {
// For all S3-compatible storage types, put properties in fileIOProperties map
toS3FileIOProperties((AbstractS3CompatibleProperties) storageProperties, fileIOProperties);
} else {
// For other storage types, just use fileIOProperties map
fileIOProperties.putAll(storageProperties.getBackendConfigProperties());
}
}
}
/**
* Configure S3 FileIO properties for all S3-compatible storage types (S3, MinIO, etc.)
* This method provides a unified way to convert S3-compatible properties to Iceberg S3FileIO format.
*
* @param s3Properties S3-compatible properties
* @param options Options map to be populated with S3 FileIO properties
*/
public void toS3FileIOProperties(AbstractS3CompatibleProperties s3Properties, Map<String, String> options) {
// Common properties - only set if not blank
if (StringUtils.isNotBlank(s3Properties.getEndpoint())) {
options.put(S3FileIOProperties.ENDPOINT, s3Properties.getEndpoint());
}
if (StringUtils.isNotBlank(s3Properties.getUsePathStyle())) {
options.put(S3FileIOProperties.PATH_STYLE_ACCESS, s3Properties.getUsePathStyle());
}
if (StringUtils.isNotBlank(s3Properties.getRegion())) {
options.put(AwsClientProperties.CLIENT_REGION, s3Properties.getRegion());
}
if (StringUtils.isNotBlank(s3Properties.getAccessKey())) {
options.put(S3FileIOProperties.ACCESS_KEY_ID, s3Properties.getAccessKey());
}
if (StringUtils.isNotBlank(s3Properties.getSecretKey())) {
options.put(S3FileIOProperties.SECRET_ACCESS_KEY, s3Properties.getSecretKey());
}
if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
options.put(S3FileIOProperties.SESSION_TOKEN, s3Properties.getSessionToken());
}
}
public enum Security {
NONE,
OAUTH2,
}
}