// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property.storage;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.ConnectorProperty;
import org.apache.doris.datasource.property.ParamRules;
import org.apache.doris.datasource.property.storage.exception.AzureAuthType;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.conf.Configuration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
/**
* AzureProperties is a specialized configuration class for accessing Azure Blob Storage
* using an S3-compatible interface.
*
* <p>This class extends {@link StorageProperties} and adapts Azure-specific properties
* to a format that is compatible with the backend engine (BE), which expects configurations
* similar to Amazon S3. This is necessary because the backend is designed to work with
* S3-style parameters regardless of the actual cloud provider.
*
* <p>Although Azure Blob Storage does not use all of the S3 parameters (e.g., region),
* this class maps and provides dummy or compatible values to satisfy the expected format.
* It also tags the provider as "azure" in the final configuration map.
*
* <p>The class supports common parameters such as access key, secret key, endpoint, and
* path-style access, while ensuring compatibility with existing S3 processing
* logic by delegating some functionality to {@code S3PropertyUtils}.
*
* <p>Typical usage includes validation of required parameters, transformation to a
* backend-compatible configuration map, and conversion of URLs to storage paths.
*
* <p>Note: This class may evolve as the backend introduces native Azure support
* or adopts a more flexible configuration model.
*
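* <p>Example (a minimal sketch; the values are placeholders, and in practice
* instances may be created through factory code rather than directly):
* <pre>{@code
* Map<String, String> props = new HashMap<>();
* props.put("azure.endpoint", "myaccount.blob.core.windows.net");
* props.put("azure.account_name", "myaccount");
* props.put("azure.account_key", "<account-key>");
* AzureProperties azure = new AzureProperties(props);
* azure.initNormalizeAndCheckProps(); // validates and normalizes the endpoint
* Map<String, String> beProps = azure.getBackendConfigProperties();
* }</pre>
*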
* @see StorageProperties
* @see S3PropertyUtils
*/
public class AzureProperties extends StorageProperties {
@Getter
@ConnectorProperty(names = {"azure.endpoint", "s3.endpoint", "AWS_ENDPOINT", "endpoint", "ENDPOINT"},
description = "The endpoint of S3.")
protected String endpoint = "";
@Getter
@ConnectorProperty(names = {"azure.account_name", "azure.access_key", "s3.access_key",
"AWS_ACCESS_KEY", "ACCESS_KEY", "access_key"},
required = false,
sensitive = true,
description = "The access key of S3.")
protected String accountName = "";
@Getter
@ConnectorProperty(names = {"azure.account_key", "azure.secret_key", "s3.secret_key",
"AWS_SECRET_KEY", "secret_key"},
sensitive = true,
required = false,
description = "The secret key of S3.")
protected String accountKey = "";
@ConnectorProperty(names = {"azure.oauth2_client_id"},
required = false,
description = "The client id of Azure AD application.")
private String clientId;
@ConnectorProperty(names = {"azure.oauth2_client_secret"},
required = false,
sensitive = true,
description = "The client secret of Azure AD application.")
private String clientSecret;
@ConnectorProperty(names = {"azure.oauth2_server_uri"},
required = false,
description = "The account host of Azure blob.")
private String oauthServerUri;
@ConnectorProperty(names = {"azure.oauth2_account_host"},
required = false,
description = "The account host of Azure blob.")
private String accountHost;
@ConnectorProperty(names = {"azure.auth_type"},
required = false,
description = "The auth type of Azure blob.")
private String azureAuthType = AzureAuthType.SharedKey.name();
@Getter
@ConnectorProperty(names = {"container", "azure.bucket", "s3.bucket"},
required = false,
description = "The container of Azure blob.")
protected String container = "";
/**
* Flag indicating whether to use path-style URLs for the object storage system.
* This value is optional and can be configured by the user.
*/
@Setter
@Getter
@ConnectorProperty(names = {"use_path_style", "s3.path-style-access"}, required = false,
description = "Whether to use path style URL for the storage.")
protected String usePathStyle = "false";
@ConnectorProperty(names = {"force_parsing_by_standard_uri"}, required = false,
description = "Whether to use path style URL for the storage.")
@Getter
protected String forceParsingByStandardUrl = "false";
public AzureProperties(Map<String, String> origProps) {
super(Type.AZURE, origProps);
}
private static final String AZURE_ENDPOINT_SUFFIX = ".blob.core.windows.net";
@Override
public void initNormalizeAndCheckProps() {
super.initNormalizeAndCheckProps();
// Normalize the endpoint (add a scheme, or rebuild it from the account name if forced).
this.endpoint = formatAzureEndpoint(endpoint, accountName);
buildRules().validate();
if (AzureAuthType.OAuth2.name().equals(azureAuthType) && (!isIcebergRestCatalog())) {
throw new UnsupportedOperationException("OAuth2 auth type is only supported for Iceberg REST catalogs");
}
}
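/**
* Heuristically decides whether the given raw properties target Azure Blob Storage:
* either the provider key is explicitly set to "azure", or the first endpoint value
* found ends with ".blob.core.windows.net".
*/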
public static boolean guessIsMe(Map<String, String> origProps) {
if (origProps.containsKey(FS_PROVIDER_KEY)
&& origProps.get(FS_PROVIDER_KEY).equalsIgnoreCase("azure")) {
return true;
}
String value = Stream.of("azure.endpoint", "s3.endpoint", "AWS_ENDPOINT", "endpoint", "ENDPOINT")
.map(origProps::get)
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
if (!Strings.isNullOrEmpty(value)) {
return value.endsWith(AZURE_ENDPOINT_SUFFIX);
}
return false;
}
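/**
* Converts these properties into the S3-style key/value map expected by the backend (BE).
* For SharedKey auth, the account name and key are exposed as AWS access/secret keys and a
* dummy region is supplied; for OAuth2, the prepared Hadoop configuration is forwarded as-is.
*/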
@Override
public Map<String, String> getBackendConfigProperties() {
if (!AzureAuthType.OAuth2.name().equalsIgnoreCase(azureAuthType)) {
Map<String, String> s3Props = new HashMap<>();
s3Props.put("AWS_ENDPOINT", endpoint);
s3Props.put("AWS_REGION", "dummy_region");
s3Props.put("AWS_ACCESS_KEY", accountName);
s3Props.put("AWS_SECRET_KEY", accountKey);
s3Props.put("AWS_NEED_OVERRIDE_ENDPOINT", "true");
s3Props.put("provider", "azure");
s3Props.put("use_path_style", usePathStyle);
return s3Props;
}
// OAuth2: forward the prepared Hadoop configuration to the backend as-is.
Map<String, String> s3Props = new HashMap<>();
hadoopStorageConfig.forEach(entry -> s3Props.put(entry.getKey(), entry.getValue()));
return s3Props;
}
public static final String AZURE_ENDPOINT_TEMPLATE = "https://%s.blob.core.windows.net";
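/**
* Normalizes an Azure endpoint. When {@code Config.force_azure_blob_global_endpoint} is set,
* the endpoint is rebuilt from the account name using {@code AZURE_ENDPOINT_TEMPLATE};
* otherwise an endpoint without a scheme defaults to HTTPS, e.g.
* "myaccount.blob.core.windows.net" becomes "https://myaccount.blob.core.windows.net".
*/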
public static String formatAzureEndpoint(String endpoint, String accountName) {
if (Config.force_azure_blob_global_endpoint) {
return String.format(AZURE_ENDPOINT_TEMPLATE, accountName);
}
if (endpoint.contains("://")) {
return endpoint;
}
return "https://" + endpoint;
}
@Override
public String validateAndNormalizeUri(String url) throws UserException {
return AzurePropertyUtils.validateAndNormalizeUri(url);
}
@Override
public String validateAndGetUri(Map<String, String> loadProps) throws UserException {
return AzurePropertyUtils.validateAndGetUri(loadProps);
}
@Override
public String getStorageName() {
return "AZURE";
}
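/**
* Builds the Hadoop {@link Configuration} used for ABFS/WASB access: per-scheme FileSystem
* caching is disabled, user-supplied "fs.azure.*" keys are passed through, and then either
* SharedKey account keys or OAuth2 client-credential settings are applied.
*/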
@Override
public void initializeHadoopStorageConfig() {
hadoopStorageConfig = new Configuration();
// Disable Azure ABFS/WASB FileSystem caching so each configuration gets a fresh instance.
for (String scheme : new String[]{"abfs", "abfss", "wasb", "wasbs"}) {
hadoopStorageConfig.set("fs." + scheme + ".impl.disable.cache", "true");
}
origProps.forEach((k, v) -> {
if (k.startsWith("fs.azure.")) {
hadoopStorageConfig.set(k, v);
}
});
if (AzureAuthType.OAuth2.name().equalsIgnoreCase(azureAuthType)) {
setHDFSAzureOauth2Config(hadoopStorageConfig);
} else {
setHDFSAzureAccountKeys(hadoopStorageConfig, accountName, accountKey);
}
}
@Override
protected Set<String> schemas() {
return ImmutableSet.of("wasb", "wasbs", "abfs", "abfss");
}
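/**
* Registers the shared account key for both the DFS (ABFS) and Blob (WASB) endpoints of the
* given account, plus the generic "fs.azure.account.key" fallback.
*/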
private static void setHDFSAzureAccountKeys(Configuration conf, String accountName, String accountKey) {
String[] endpointSuffixes = {
"dfs.core.windows.net",
"blob.core.windows.net"
};
for (String suffix : endpointSuffixes) {
String key = String.format("fs.azure.account.key.%s.%s", accountName, suffix);
conf.set(key, accountKey);
}
conf.set("fs.azure.account.key", accountKey);
}
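/**
* Configures Hadoop ABFS OAuth2 client-credentials authentication for the configured account
* host, using the standard "fs.azure.account.oauth2.*" keys and the ClientCredsTokenProvider.
*/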
private void setHDFSAzureOauth2Config(Configuration conf) {
conf.set(String.format("fs.azure.account.auth.type.%s", accountHost), "OAuth");
conf.set(String.format("fs.azure.account.oauth.provider.type.%s", accountHost),
"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider");
conf.set(String.format("fs.azure.account.oauth2.client.id.%s", accountHost), clientId);
conf.set(String.format("fs.azure.account.oauth2.client.secret.%s", accountHost), clientSecret);
conf.set(String.format("fs.azure.account.oauth2.client.endpoint.%s", accountHost), oauthServerUri);
}
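/**
* Validation rules per auth type: OAuth2 requires the account host, client id, client secret,
* and OAuth server URI; SharedKey requires the account name and account key.
*/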
private ParamRules buildRules() {
return new ParamRules()
// OAuth2 requires the account host, client id, client secret, and server URI
.requireIf(azureAuthType, AzureAuthType.OAuth2.name(),
new String[]{accountHost, clientId, clientSecret, oauthServerUri},
"When auth_type is OAuth2, oauth2_account_host, oauth2_client_id, "
+ "oauth2_client_secret, and oauth2_server_uri are required.")
.requireIf(azureAuthType, AzureAuthType.SharedKey.name(), new String[]{accountName, accountKey},
"When auth_type is SharedKey, account_name and account_key are required.");
}
// Temporary check: OAuth2 is currently used to access OneLake storage via HDFS.
// In the future, OAuth2 will be supported via the native SDK to reduce maintenance.
// For now, OAuth2 authentication is only allowed for Iceberg REST catalogs.
// TODO: Remove this temporary check later.
private static final String ICEBERG_CATALOG_TYPE_KEY = "iceberg.catalog.type";
private static final String ICEBERG_CATALOG_TYPE_REST = "rest";
private static final String TYPE_KEY = "type";
private static final String ICEBERG_VALUE = "iceberg";
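/**
* Returns true only for an Iceberg REST catalog: if an explicit "type" key is present it must
* be "iceberg", and "iceberg.catalog.type" must equal "rest".
*/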
private boolean isIcebergRestCatalog() {
// If an explicit catalog "type" key is present, it must be "iceberg".
boolean hasIcebergType = origProps.entrySet().stream()
.anyMatch(entry -> TYPE_KEY.equalsIgnoreCase(entry.getKey())
&& ICEBERG_VALUE.equalsIgnoreCase(entry.getValue()));
if (!hasIcebergType && origProps.keySet().stream().anyMatch(TYPE_KEY::equalsIgnoreCase)) {
return false;
}
return origProps.entrySet().stream()
.anyMatch(entry -> ICEBERG_CATALOG_TYPE_KEY.equalsIgnoreCase(entry.getKey())
&& ICEBERG_CATALOG_TYPE_REST.equalsIgnoreCase(entry.getValue()));
}
}