PropertyConverter.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property;
import org.apache.doris.common.credentials.CloudCredential;
import org.apache.doris.common.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.InitCatalogLog.Type;
import org.apache.doris.datasource.property.constants.AzureProperties;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.GCSProperties;
import org.apache.doris.datasource.property.constants.MinioProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.PaimonProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.CosFileSystem;
import org.apache.hadoop.fs.CosNConfigKeys;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
import org.apache.hadoop.fs.obs.OBSConstants;
import org.apache.hadoop.fs.obs.OBSFileSystem;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider;
import org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.Map;
public class PropertyConverter {

    private static final Logger LOG = LogManager.getLogger(PropertyConverter.class);

    /** Doris property key controlling S3 path-style access; treated as "false" when absent. */
    public static final String USE_PATH_STYLE = "use_path_style";

    /**
     * Convert properties defined at the Doris level into Hadoop FileSystem properties for the FE client.
     * Support for other cloud clients can be added here.
     *
     * <p>The target storage is chosen by which endpoint key is present, checked in this order:
     * OBS, GCS, OSS, COS, MinIO, Azure, S3, and finally the legacy environment-style S3 endpoint
     * (checked last because OBS/OSS/COS may also be accessed through the AWS-compatible client).
     *
     * <p>Note: the legacy environment-style branch writes the translated S3 keys back into {@code props},
     * and the MinIO branch may add a default region to {@code props}.
     *
     * @param props Doris-level properties
     * @return the converted Hadoop properties, or {@code props} itself when no known endpoint key is present
     */
    public static Map<String, String> convertToHadoopFSProperties(Map<String, String> props) {
        if (props.containsKey(ObsProperties.ENDPOINT)) {
            return convertToOBSProperties(props, ObsProperties.getCredential(props));
        } else if (props.containsKey(GCSProperties.ENDPOINT)) {
            return convertToGCSProperties(props, GCSProperties.getCredential(props));
        } else if (props.containsKey(OssProperties.ENDPOINT)) {
            return convertToOSSProperties(props, OssProperties.getCredential(props));
        } else if (props.containsKey(CosProperties.ENDPOINT)) {
            return convertToCOSProperties(props, CosProperties.getCredential(props));
        } else if (props.containsKey(MinioProperties.ENDPOINT)) {
            return convertToMinioProperties(props, MinioProperties.getCredential(props));
        } else if (props.containsKey(AzureProperties.ENDPOINT)) {
            return convertToAzureProperties(props, AzureProperties.getCredential(props));
        } else if (props.containsKey(S3Properties.ENDPOINT)) {
            CloudCredential s3Credential = S3Properties.getCredential(props);
            Map<String, String> s3Properties = convertToS3Properties(props, s3Credential);
            String s3CliEndpoint = props.get(S3Properties.ENDPOINT);
            return convertToCompatibleS3Properties(props, s3CliEndpoint, s3Credential, s3Properties);
        } else if (props.containsKey(S3Properties.Env.ENDPOINT)) {
            // Check the legacy env-style keys last: they stay compatible with s3, obs, oss and cos
            // when those services are accessed through the AWS client.
            CloudCredentialWithEndpoint envCredentials = S3Properties.getEnvironmentCredentialWithEndpoint(props);
            Map<String, String> s3Properties = convertToS3EnvProperties(props, envCredentials, false);
            String s3CliEndpoint = envCredentials.getEndpoint();
            return convertToCompatibleS3Properties(props, s3CliEndpoint, envCredentials, s3Properties);
        }
        return props;
    }

    /**
     * Azure property conversion is not implemented yet.
     *
     * <p>Returns an empty mutable map instead of {@code null}, so callers that merge or iterate the
     * result of {@link #convertToHadoopFSProperties(Map)} do not fail with a NullPointerException.
     */
    private static Map<String, String> convertToAzureProperties(Map<String, String> props, CloudCredential credential) {
        return Maps.newHashMap();
    }

    /**
     * Add native Hadoop connector properties for COS, OBS or OSS on top of already-converted S3A
     * properties when the S3 endpoint points at one of those services.
     *
     * <p>{@code props} is copied before the service-specific endpoint key is filled in, so the caller's map
     * is not modified here.
     *
     * @param props         original Doris properties
     * @param s3CliEndpoint endpoint the S3 properties were built for; may be null or empty
     * @param credential    credential forwarded to the native connector conversion
     * @param s3Properties  S3A properties produced for the same input
     * @return a new map holding {@code s3Properties} plus any service-specific properties
     */
    private static Map<String, String> convertToCompatibleS3Properties(Map<String, String> props,
                                                                       String s3CliEndpoint,
                                                                       CloudCredential credential,
                                                                       Map<String, String> s3Properties) {
        Map<String, String> heteroProps = new HashMap<>(s3Properties);
        if (Strings.isNullOrEmpty(s3CliEndpoint)) {
            // No endpoint to match against; keep the plain S3 properties instead of throwing an NPE.
            return heteroProps;
        }
        Map<String, String> copiedProps = new HashMap<>(props);
        if (s3CliEndpoint.contains(CosProperties.COS_PREFIX)) {
            copiedProps.putIfAbsent(CosProperties.ENDPOINT, s3CliEndpoint);
            // CosN is not compatible with S3, so S3-style properties are also converted to CosN properties.
            heteroProps.putAll(convertToCOSProperties(copiedProps, credential));
        } else if (s3CliEndpoint.contains(ObsProperties.OBS_PREFIX)) {
            copiedProps.putIfAbsent(ObsProperties.ENDPOINT, s3CliEndpoint);
            heteroProps.putAll(convertToOBSProperties(copiedProps, credential));
        } else if (s3CliEndpoint.contains(OssProperties.OSS_REGION_PREFIX)) {
            copiedProps.putIfAbsent(OssProperties.ENDPOINT, s3CliEndpoint);
            heteroProps.putAll(convertToOSSProperties(copiedProps, credential));
        }
        return heteroProps;
    }

    /**
     * Build Hadoop OBS connector properties: endpoint, filesystem impl, optional key pair and session
     * token, plus every user property whose key starts with {@code ObsProperties.OBS_FS_PREFIX} (copied as-is).
     */
    private static Map<String, String> convertToOBSProperties(Map<String, String> props,
                                                              CloudCredential credential) {
        Map<String, String> obsProperties = Maps.newHashMap();
        obsProperties.put(OBSConstants.ENDPOINT, props.get(ObsProperties.ENDPOINT));
        obsProperties.put("fs.obs.impl", getHadoopFSImplByScheme("obs"));
        if (credential.isWhole()) {
            obsProperties.put(OBSConstants.ACCESS_KEY, credential.getAccessKey());
            obsProperties.put(OBSConstants.SECRET_KEY, credential.getSecretKey());
        }
        if (credential.isTemporary()) {
            obsProperties.put(ObsProperties.FS.SESSION_TOKEN, credential.getSessionToken());
        }
        for (Map.Entry<String, String> entry : props.entrySet()) {
            if (entry.getKey().startsWith(ObsProperties.OBS_FS_PREFIX)) {
                obsProperties.put(entry.getKey(), entry.getValue());
            }
        }
        return obsProperties;
    }

    /**
     * Map a filesystem scheme (case-insensitive) to the Hadoop FileSystem implementation class name.
     *
     * @param fsScheme scheme such as "obs", "file", "oss", "cosn" or "lakefs"
     * @return the implementation class name; any unrecognized scheme maps to {@link S3AFileSystem}
     */
    public static String getHadoopFSImplByScheme(String fsScheme) {
        if (fsScheme.equalsIgnoreCase("obs")) {
            return OBSFileSystem.class.getName();
        } else if (fsScheme.equalsIgnoreCase("file")) {
            return LocalFileSystem.class.getName();
        } else if (fsScheme.equalsIgnoreCase("oss")) {
            return AliyunOSSFileSystem.class.getName();
        } else if (fsScheme.equalsIgnoreCase("cosn") || fsScheme.equalsIgnoreCase("lakefs")) {
            return CosFileSystem.class.getName();
        } else {
            return S3AFileSystem.class.getName();
        }
    }

    /**
     * Translate legacy environment-style S3 keys ({@code S3Properties.Env.*}) onto the current
     * {@code S3Properties.*} keys. This writes into {@code properties} in place.
     *
     * @param properties Doris properties; mutated by this call
     * @param credential credential and endpoint resolved from the legacy keys
     * @param isMeta     when true, return the translated {@code properties} without building S3A keys
     * @return {@code properties} when {@code isMeta}, otherwise the result of {@code convertToS3Properties}
     */
    private static Map<String, String> convertToS3EnvProperties(Map<String, String> properties,
                                                                CloudCredentialWithEndpoint credential,
                                                                boolean isMeta) {
        // Old properties to new properties
        properties.put(S3Properties.ENDPOINT, credential.getEndpoint());
        properties.put(S3Properties.REGION,
                checkRegion(credential.getEndpoint(), credential.getRegion(), S3Properties.Env.REGION));
        properties.put(S3Properties.ACCESS_KEY, credential.getAccessKey());
        properties.put(S3Properties.SECRET_KEY, credential.getSecretKey());
        if (properties.containsKey(S3Properties.Env.TOKEN)) {
            properties.put(S3Properties.SESSION_TOKEN, credential.getSessionToken());
        }
        if (properties.containsKey(S3Properties.Env.MAX_CONNECTIONS)) {
            properties.put(S3Properties.MAX_CONNECTIONS, properties.get(S3Properties.Env.MAX_CONNECTIONS));
        }
        if (properties.containsKey(S3Properties.Env.REQUEST_TIMEOUT_MS)) {
            properties.put(S3Properties.REQUEST_TIMEOUT_MS, properties.get(S3Properties.Env.REQUEST_TIMEOUT_MS));
        }
        if (properties.containsKey(S3Properties.Env.CONNECTION_TIMEOUT_MS)) {
            // Map onto the connection timeout key; previously this overwrote the request timeout instead.
            properties.put(S3Properties.CONNECTION_TIMEOUT_MS,
                    properties.get(S3Properties.Env.CONNECTION_TIMEOUT_MS));
        }
        if (properties.containsKey(S3Properties.Env.ROLE_ARN)) {
            properties.put(S3Properties.ROLE_ARN, properties.get(S3Properties.Env.ROLE_ARN));
        }
        if (properties.containsKey(S3Properties.Env.EXTERNAL_ID)) {
            properties.put(S3Properties.EXTERNAL_ID, properties.get(S3Properties.Env.EXTERNAL_ID));
        }
        if (isMeta) {
            return properties;
        }
        return convertToS3Properties(properties, credential);
    }

    /**
     * Build Hadoop S3A properties from Doris S3 properties.
     *
     * <p>Paimon catalogs are delegated to {@code PaimonProperties.convertToS3Properties}. Otherwise the
     * endpoint, region, optional connection/timeout limits and access settings are filled in. Then all of
     * {@code properties} is copied over, so explicitly provided keys override computed ones. Finally the
     * keys in {@code S3Properties.FS_KEYS} are removed.
     *
     * @param properties Doris S3 properties (not modified)
     * @param credential credential used for access configuration
     * @return a new map of S3A properties
     */
    private static Map<String, String> convertToS3Properties(Map<String, String> properties,
                                                             CloudCredential credential) {
        // s3 property in paimon is personalized
        String type = properties.get(CatalogMgr.CATALOG_TYPE_PROP);
        if (type != null && type.equalsIgnoreCase(Type.PAIMON.toString())) {
            return PaimonProperties.convertToS3Properties(properties, credential);
        }
        Map<String, String> s3Properties = Maps.newHashMap();
        String endpoint = properties.get(S3Properties.ENDPOINT);
        s3Properties.put(Constants.ENDPOINT, endpoint);
        s3Properties.put(Constants.AWS_REGION,
                checkRegion(endpoint, properties.get(S3Properties.REGION), S3Properties.REGION));
        if (properties.containsKey(S3Properties.MAX_CONNECTIONS)) {
            s3Properties.put(Constants.MAXIMUM_CONNECTIONS, properties.get(S3Properties.MAX_CONNECTIONS));
        }
        if (properties.containsKey(S3Properties.REQUEST_TIMEOUT_MS)) {
            s3Properties.put(Constants.REQUEST_TIMEOUT, properties.get(S3Properties.REQUEST_TIMEOUT_MS));
        }
        if (properties.containsKey(S3Properties.CONNECTION_TIMEOUT_MS)) {
            s3Properties.put(Constants.SOCKET_TIMEOUT, properties.get(S3Properties.CONNECTION_TIMEOUT_MS));
        }
        setS3FsAccess(s3Properties, properties, credential);
        s3Properties.putAll(properties);
        // remove extra meta properties
        S3Properties.FS_KEYS.forEach(s3Properties::remove);
        if (LOG.isDebugEnabled()) {
            // Log key names only: the values include access keys, secret keys and session tokens.
            LOG.debug("s3Properties keys:{}\nproperties keys:{}", s3Properties.keySet(), properties.keySet());
        }
        return s3Properties;
    }

    /**
     * Resolve the region to use for an S3 endpoint.
     *
     * @param endpoint  endpoint used to infer the region when {@code region} is null or empty
     * @param region    explicitly configured region, may be null or empty
     * @param regionKey property key name, used only in the warning message
     * @return {@code region} if set, else the region inferred from {@code endpoint}, else "us-east-1"
     */
    public static String checkRegion(String endpoint, String region, String regionKey) {
        if (Strings.isNullOrEmpty(region)) {
            region = S3Properties.getRegionOfEndpoint(endpoint);
        }
        if (Strings.isNullOrEmpty(region)) {
            LOG.warn("No '{}' info found, using SDK default region: us-east-1", regionKey);
            return "us-east-1";
        }
        return region;
    }

    /**
     * Fill S3A access settings into {@code s3Properties}.
     *
     * <p>Credential-provider precedence, as applied by the statement order below: the list from
     * {@link #getAWSCredentialsProviders(Map)} is replaced by {@link TemporaryAWSCredentialsProvider} for
     * temporary credentials, which in turn is replaced by {@link AssumedRoleCredentialProvider} when a
     * non-empty role ARN is configured. An external ID is not supported and only triggers a warning.
     */
    private static void setS3FsAccess(Map<String, String> s3Properties, Map<String, String> properties,
                                      CloudCredential credential) {
        s3Properties.put(Constants.MAX_ERROR_RETRIES, "2");
        s3Properties.putIfAbsent("fs.s3.impl", S3AFileSystem.class.getName());
        String credentialsProviders = getAWSCredentialsProviders(properties);
        s3Properties.put(Constants.AWS_CREDENTIALS_PROVIDER, credentialsProviders);
        if (credential.isWhole()) {
            s3Properties.put(Constants.ACCESS_KEY, credential.getAccessKey());
            s3Properties.put(Constants.SECRET_KEY, credential.getSecretKey());
        }
        if (credential.isTemporary()) {
            s3Properties.put(Constants.SESSION_TOKEN, credential.getSessionToken());
            s3Properties.put(Constants.AWS_CREDENTIALS_PROVIDER, TemporaryAWSCredentialsProvider.class.getName());
        }
        s3Properties.put(Constants.PATH_STYLE_ACCESS, properties.getOrDefault(USE_PATH_STYLE, "false"));
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            if (entry.getKey().startsWith(S3Properties.S3_FS_PREFIX)) {
                s3Properties.put(entry.getKey(), entry.getValue());
            }
        }
        String roleArn = properties.get(S3Properties.ROLE_ARN);
        if (!Strings.isNullOrEmpty(roleArn)) {
            // refer to https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/assumed_roles.html
            // https://issues.apache.org/jira/browse/HADOOP-19201
            s3Properties.put(Constants.AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.class.getName());
            s3Properties.put(Constants.ASSUMED_ROLE_ARN, roleArn);
            s3Properties.put(Constants.ASSUMED_ROLE_CREDENTIALS_PROVIDER,
                    InstanceProfileCredentialsProvider.class.getName());
            if (!Strings.isNullOrEmpty(properties.get(S3Properties.EXTERNAL_ID))) {
                LOG.warn("External ID is not supported for assumed role credential provider");
            }
        }
    }

    /**
     * Choose the AWS credentials provider list.
     *
     * @param properties Doris properties
     * @return the Hadoop {@code Constants.AWS_CREDENTIALS_PROVIDER} value if present, else
     *         {@code S3Properties.CREDENTIALS_PROVIDER}, else the comma-joined default provider list
     */
    public static String getAWSCredentialsProviders(Map<String, String> properties) {
        String hadoopCredProviders = properties.get(Constants.AWS_CREDENTIALS_PROVIDER);
        if (hadoopCredProviders != null) {
            return hadoopCredProviders;
        }
        String defaultProviderList = String.join(",", S3Properties.AWS_CREDENTIALS_PROVIDERS);
        return properties.getOrDefault(S3Properties.CREDENTIALS_PROVIDER, defaultProviderList);
    }

    /** GCS is currently accessed through the S3 client: re-prefix the keys to S3 and convert. */
    private static Map<String, String> convertToGCSProperties(Map<String, String> props, CloudCredential credential) {
        return convertToS3Properties(S3Properties.prefixToS3(props), credential);
    }

    /**
     * Build Hadoop OSS connector properties, switching to the Jindo (OSS-HDFS) implementation when the
     * endpoint is an OSS-HDFS endpoint or {@code OssProperties.OSS_HDFS_ENABLED} is "true".
     */
    private static Map<String, String> convertToOSSProperties(Map<String, String> props, CloudCredential credential) {
        Map<String, String> ossProperties = Maps.newHashMap();
        String endpoint = props.get(OssProperties.ENDPOINT);
        if (endpoint.startsWith(OssProperties.OSS_PREFIX)) {
            // may use oss.oss-cn-beijing.aliyuncs.com; strip only the leading prefix
            endpoint = endpoint.substring(OssProperties.OSS_PREFIX.length());
        }
        ossProperties.put(org.apache.hadoop.fs.aliyun.oss.Constants.ENDPOINT_KEY, endpoint);
        boolean hdfsEnabled = Boolean.parseBoolean(props.getOrDefault(OssProperties.OSS_HDFS_ENABLED, "false"));
        if (LocationPath.isHdfsOnOssEndpoint(endpoint) || hdfsEnabled) {
            // use endpoint or enable hdfs
            rewriteHdfsOnOssProperties(ossProperties, endpoint);
        } else {
            ossProperties.put("fs.oss.impl", getHadoopFSImplByScheme("oss"));
        }
        if (credential.isWhole()) {
            ossProperties.put(org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_ID, credential.getAccessKey());
            ossProperties.put(org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET, credential.getSecretKey());
        }
        if (credential.isTemporary()) {
            ossProperties.put(org.apache.hadoop.fs.aliyun.oss.Constants.SECURITY_TOKEN, credential.getSessionToken());
        }
        for (Map.Entry<String, String> entry : props.entrySet()) {
            if (entry.getKey().startsWith(OssProperties.OSS_FS_PREFIX)) {
                ossProperties.put(entry.getKey(), entry.getValue());
            }
        }
        return ossProperties;
    }

    /**
     * Configure the Jindo OSS filesystem classes. If {@code endpoint} is not already an OSS-HDFS endpoint,
     * the endpoint entry is rewritten to the "{region}.oss-dls.aliyuncs.com" form.
     */
    private static void rewriteHdfsOnOssProperties(Map<String, String> ossProperties, String endpoint) {
        if (!LocationPath.isHdfsOnOssEndpoint(endpoint)) {
            // just for robustness here, avoid wrong endpoint when oss-hdfs is enabled.
            // convert "oss-cn-beijing.aliyuncs.com" to "cn-beijing.oss-dls.aliyuncs.com"
            // reference link: https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
            String[] endpointSplit = endpoint.split("\\.");
            if (endpointSplit.length > 0) {
                String region = endpointSplit[0].replace("oss-", "").replace("-internal", "");
                ossProperties.put(org.apache.hadoop.fs.aliyun.oss.Constants.ENDPOINT_KEY,
                        region + ".oss-dls.aliyuncs.com");
            }
        }
        ossProperties.put("fs.oss.impl", "com.aliyun.jindodata.oss.JindoOssFileSystem");
        ossProperties.put("fs.AbstractFileSystem.oss.impl", "com.aliyun.jindodata.oss.OSS");
    }

    /**
     * Build Hadoop CosN properties: endpoint suffix, cosn/lakefs filesystem impls, optional key pair,
     * plus every user property whose key starts with {@code CosProperties.COS_FS_PREFIX} (copied as-is).
     */
    private static Map<String, String> convertToCOSProperties(Map<String, String> props, CloudCredential credential) {
        Map<String, String> cosProperties = Maps.newHashMap();
        cosProperties.put(CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY, props.get(CosProperties.ENDPOINT));
        cosProperties.put("fs.cosn.impl", getHadoopFSImplByScheme("cosn"));
        cosProperties.put("fs.lakefs.impl", getHadoopFSImplByScheme("lakefs"));
        if (credential.isWhole()) {
            cosProperties.put(CosNConfigKeys.COSN_USERINFO_SECRET_ID_KEY, credential.getAccessKey());
            cosProperties.put(CosNConfigKeys.COSN_USERINFO_SECRET_KEY_KEY, credential.getSecretKey());
        }
        // session token is unsupported
        for (Map.Entry<String, String> entry : props.entrySet()) {
            if (entry.getKey().startsWith(CosProperties.COS_FS_PREFIX)) {
                cosProperties.put(entry.getKey(), entry.getValue());
            }
        }
        return cosProperties;
    }

    /**
     * MinIO is accessed through the S3 client. {@code MinioProperties.DEFAULT_REGION} is written into
     * {@code props} (in place) when no region key is present, then the keys are re-prefixed to S3 and converted.
     */
    private static Map<String, String> convertToMinioProperties(Map<String, String> props, CloudCredential credential) {
        if (!props.containsKey(MinioProperties.REGION)) {
            props.put(MinioProperties.REGION, MinioProperties.DEFAULT_REGION);
        }
        return convertToS3Properties(S3Properties.prefixToS3(props), credential);
    }
}