OSSProperties.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property.storage;
import org.apache.doris.datasource.property.ConnectorPropertiesUtils;
import org.apache.doris.datasource.property.ConnectorProperty;
import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Stream;
/**
 * Storage properties for Alibaba Cloud Object Storage Service (OSS), handled through the S3-compatible
 * property layer ({@link AbstractS3CompatibleProperties}).
 * <p>
 * Besides the {@code oss.*} keys, every property also accepts the generic S3 aliases ({@code s3.*},
 * {@code AWS_*}, bare names) and, where listed, the Alibaba Cloud DLF aliases ({@code dlf.*}).
 */
public class OSSProperties extends AbstractS3CompatibleProperties {

    /**
     * The OSS endpoint. It may be left empty and derived from {@link #region} later
     * (see {@link #setEndpointIfPossible()}).
     */
    @Setter
    @Getter
    @ConnectorProperty(names = {"oss.endpoint", "s3.endpoint", "AWS_ENDPOINT", "endpoint", "ENDPOINT", "dlf.endpoint",
            "dlf.catalog.endpoint"},
            required = false,
            description = "The endpoint of OSS.")
    protected String endpoint = "";

    /** The OSS access key id. */
    @Getter
    @ConnectorProperty(names = {"oss.access_key", "s3.access_key", "AWS_ACCESS_KEY", "access_key", "ACCESS_KEY",
            "dlf.access_key", "dlf.catalog.accessKeyId"},
            required = false,
            description = "The access key of OSS.")
    protected String accessKey = "";

    /** The OSS access key secret. */
    @Getter
    @ConnectorProperty(names = {"oss.secret_key", "s3.secret_key", "AWS_SECRET_KEY", "secret_key", "SECRET_KEY",
            "dlf.secret_key", "dlf.catalog.secret_key"},
            required = false,
            description = "The secret key of OSS.")
    protected String secretKey = "";

    /** The OSS region, e.g. {@code cn-hangzhou}. */
    @Getter
    @Setter
    @ConnectorProperty(names = {"oss.region", "s3.region", "AWS_REGION", "region", "REGION", "dlf.region"},
            required = false,
            description = "The region of OSS.")
    protected String region;

    /**
     * Whether a derived OSS endpoint should be the public one ({@code "true"}) or the
     * {@code -internal} one (default). The value is parsed with {@link BooleanUtils#toBoolean(String)}.
     */
    @ConnectorProperty(names = {"dlf.access.public", "dlf.catalog.accessPublic"},
            required = false,
            description = "Enable public access to Aliyun DLF.")
    protected String dlfAccessPublic = "false";

    /** Optional session token for temporary credentials. */
    @Getter
    @ConnectorProperty(names = {"oss.session_token", "s3.session_token", "session_token"},
            required = false,
            description = "The session token of OSS.")
    protected String sessionToken = "";

    /**
     * The maximum number of concurrent connections that can be made to the object storage system.
     * This value is optional and can be configured by the user.
     */
    @Getter
    @ConnectorProperty(names = {"oss.connection.maximum", "s3.connection.maximum"}, required = false,
            description = "Maximum number of connections.")
    protected String maxConnections = "100";

    /**
     * The timeout for requests made to the object storage system.
     * This value is optional and can be configured by the user.
     * <p>
     * NOTE(review): the unit is inconsistent. The field name and the property description say seconds, but the
     * default (10000) suggests milliseconds. Confirm against the consumer of {@code getRequestTimeoutS()}.
     */
    @Getter
    @ConnectorProperty(names = {"oss.connection.request.timeout", "s3.connection.request.timeout"}, required = false,
            description = "Request timeout in seconds.")
    protected String requestTimeoutS = "10000";

    /**
     * The timeout for establishing a connection to the object storage system.
     * This value is optional and can be configured by the user.
     * <p>
     * NOTE(review): same seconds-vs-milliseconds inconsistency as {@link #requestTimeoutS}. Confirm against the
     * consumer of {@code getConnectionTimeoutS()}.
     */
    @Getter
    @ConnectorProperty(names = {"oss.connection.timeout", "s3.connection.timeout"}, required = false,
            description = "Connection timeout in seconds.")
    protected String connectionTimeoutS = "10000";

    /**
     * Flag indicating whether to use path-style URLs for the object storage system.
     * This value is optional and can be configured by the user.
     */
    @Setter
    @Getter
    @ConnectorProperty(names = {"oss.use_path_style", "use_path_style", "s3.path-style-access"}, required = false,
            description = "Whether to use path style URL for the storage.")
    protected String usePathStyle = "false";

    /**
     * Flag indicating whether storage URIs must be parsed strictly as standard URIs.
     * This value is optional and can be configured by the user.
     */
    @ConnectorProperty(names = {"oss.force_parsing_by_standard_uri", "force_parsing_by_standard_uri"}, required = false,
            description = "Whether to force parsing the URI by the standard URI format.")
    @Setter
    @Getter
    protected String forceParsingByStandardUrl = "false";

    /**
     * Patterns used to extract the region from an Alibaba Cloud endpoint. In every pattern, group(1) captures
     * the region name (e.g. {@code cn-hangzhou}).
     * <p>
     * Supported formats (see
     * <a href="https://help.aliyun.com/zh/oss/user-guide/regions-and-endpoints">Aliyun OSS regions and endpoints</a>):
     * <ul>
     *   <li>{@code oss-cn-hangzhou.aliyuncs.com}: region {@code cn-hangzhou}</li>
     *   <li>{@code https://oss-cn-shanghai.aliyuncs.com}: region {@code cn-shanghai}</li>
     *   <li>{@code oss-cn-beijing-internal.aliyuncs.com}: region {@code cn-beijing} (internal endpoint)</li>
     *   <li>{@code http://oss-cn-shenzhen-internal.aliyuncs.com}: region {@code cn-shenzhen}</li>
     *   <li>{@code s3.oss-cn-hangzhou.aliyuncs.com}: region {@code cn-hangzhou} (S3-compatible endpoint, see
     *   <a href="https://help.aliyun.com/zh/oss/developer-reference/use-amazon-s3-sdks-to-access-oss">docs</a>)</li>
     *   <li>{@code cn-hangzhou.oss-dls.aliyuncs.com}: region {@code cn-hangzhou} (OSS-HDFS endpoint)</li>
     *   <li>{@code dlf.cn-hangzhou.aliyuncs.com} or {@code dlf-vpc.cn-hangzhou.aliyuncs.com}:
     *   region {@code cn-hangzhou} (DLF endpoint)</li>
     * </ul>
     */
    public static final Set<Pattern> ENDPOINT_PATTERN = ImmutableSet.of(Pattern
                    .compile("^(?:https?://)?(?:s3\\.)?oss-([a-z0-9-]+?)(?:-internal)?\\.aliyuncs\\.com$"),
            Pattern.compile("(?:https?://)?([a-z]{2}-[a-z0-9-]+)\\.oss-dls\\.aliyuncs\\.com"),
            Pattern.compile("^(?:https?://)?dlf(?:-vpc)?\\.([a-z0-9-]+)\\.aliyuncs\\.com(?:/.*)?$"));

    /** Property keys (matched case-insensitively) whose value may be a storage URI. */
    private static final List<String> URI_KEYWORDS = Arrays.asList("uri", "warehouse");

    /** Catalog-type keys whose value {@code dlf} (case-insensitive) marks an Aliyun DLF metastore. */
    private static final List<String> DLF_TYPE_KEYWORDS = Arrays.asList("hive.metastore.type",
            "iceberg.catalog.type", "paimon.catalog.type");

    protected OSSProperties(Map<String, String> origProps) {
        super(Type.OSS, origProps);
    }

    /**
     * Creates a fully initialized instance. The steps are: bind the connector properties, normalize and
     * validate them, then build the Hadoop storage configuration.
     *
     * @param properties raw user-supplied properties
     * @return the initialized {@code OSSProperties}
     */
    public static OSSProperties of(Map<String, String> properties) {
        OSSProperties propertiesObj = new OSSProperties(properties);
        ConnectorPropertiesUtils.bindConnectorProperties(propertiesObj, properties);
        propertiesObj.initNormalizeAndCheckProps();
        propertiesObj.initializeHadoopStorageConfig();
        return propertiesObj;
    }

    /**
     * Guesses whether the given raw properties describe OSS storage.
     * <p>
     * The checks run in order, and the first one that applies decides:
     * <ol>
     *   <li>If the first non-null endpoint alias is non-blank, the result is whether it contains
     *   {@code aliyuncs.com}.</li>
     *   <li>A non-blank {@code oss.region} means OSS.</li>
     *   <li>A DLF metastore type means OSS.</li>
     *   <li>Otherwise, the result is whether a {@code uri}/{@code warehouse} value looks like OSS storage.</li>
     * </ol>
     *
     * @param origProps raw user-supplied properties
     * @return {@code true} if these properties should be handled as OSS
     */
    protected static boolean guessIsMe(Map<String, String> origProps) {
        String endpointValue = Stream.of("oss.endpoint", "s3.endpoint", "AWS_ENDPOINT", "endpoint", "ENDPOINT",
                        "dlf.endpoint", "dlf.catalog.endpoint")
                .map(origProps::get)
                .filter(Objects::nonNull)
                .findFirst()
                .orElse(null);
        if (StringUtils.isNotBlank(endpointValue)) {
            return endpointValue.contains("aliyuncs.com");
        }
        if (StringUtils.isNotBlank(origProps.get("oss.region"))) {
            return true;
        }
        if (isDlfMSType(origProps)) {
            return true;
        }
        return findKnownObjectStorageUri(origProps).isPresent();
    }

    /**
     * Returns the first {@code uri}/{@code warehouse} value (keys matched case-insensitively) that looks like
     * OSS storage according to {@link #isKnownObjectStorage(String)}.
     */
    private static Optional<String> findKnownObjectStorageUri(Map<String, String> props) {
        return props.entrySet().stream()
                .filter(e -> URI_KEYWORDS.stream().anyMatch(key -> key.equalsIgnoreCase(e.getKey())))
                .map(Map.Entry::getValue)
                .filter(Objects::nonNull)
                .filter(OSSProperties::isKnownObjectStorage)
                .findFirst();
    }

    /**
     * Returns {@code true} for {@code oss://} URIs and for {@code aliyuncs.com} URIs that contain
     * {@code oss-}, {@code s3.} or {@code dls}. Returns {@code false} for {@code null}.
     */
    private static boolean isKnownObjectStorage(String value) {
        if (value == null) {
            return false;
        }
        if (value.startsWith("oss://")) {
            return true;
        }
        if (!value.contains("aliyuncs.com")) {
            return false;
        }
        boolean isAliyunOss = value.contains("oss-");
        boolean isAmazonS3 = value.contains("s3.");
        boolean isDls = value.contains("dls");
        return isAliyunOss || isAmazonS3 || isDls;
    }

    /** Returns {@code true} if any catalog-type key in {@code params} is {@code dlf} (case-insensitive). */
    private static boolean isDlfMSType(Map<String, String> params) {
        // equalsIgnoreCase is null-safe, so absent or blank values simply don't match.
        return DLF_TYPE_KEYWORDS.stream()
                .map(params::get)
                .anyMatch(type -> StringUtils.equalsIgnoreCase("dlf", type));
    }

    /**
     * Derives an OSS endpoint from {@link #region} when no endpoint was given. After that, it always delegates to
     * {@code super.setEndpointIfPossible()}.
     * <p>
     * Derivation happens only when the catalog is a DLF metastore, or when a {@code uri}/{@code warehouse}
     * value is a recognized non-HTTP OSS location (e.g. {@code oss://bucket/path}).
     */
    @Override
    protected void setEndpointIfPossible() {
        if (StringUtils.isBlank(this.endpoint) && StringUtils.isNotBlank(this.region)) {
            if (isDlfMSType(origProps)) {
                this.endpoint = getOssEndpoint(region, BooleanUtils.toBoolean(dlfAccessPublic));
            } else {
                Optional<String> uriValueOpt = findKnownObjectStorageUri(origProps);
                // http(s) URIs are not used to derive the endpoint here; presumably they are handled
                // elsewhere (e.g. by super.setEndpointIfPossible()) - not visible in this class.
                // NOTE(review): dlfAccessPublic also selects public vs internal for non-DLF catalogs.
                if (uriValueOpt.isPresent() && !isHttpUri(uriValueOpt.get())) {
                    this.endpoint = getOssEndpoint(region, BooleanUtils.toBoolean(dlfAccessPublic));
                }
            }
        }
        super.setEndpointIfPossible();
    }

    /** Returns {@code true} if {@code uri} starts with {@code http://} or {@code https://}. */
    private static boolean isHttpUri(String uri) {
        return uri.startsWith("http://") || uri.startsWith("https://");
    }

    /**
     * Normalizes and validates the properties through the parent class. Afterwards, a DLF or OSS-HDFS
     * ({@code oss-dls}) endpoint is replaced by the plain OSS endpoint for {@link #region}.
     */
    @Override
    public void initNormalizeAndCheckProps() {
        super.initNormalizeAndCheckProps();
        // StringUtils.contains is null-safe: endpoint has a public setter and may have been set to null.
        if (StringUtils.contains(endpoint, "dlf") || StringUtils.contains(endpoint, "oss-dls")) {
            this.endpoint = getOssEndpoint(region, BooleanUtils.toBoolean(dlfAccessPublic));
        }
    }

    /**
     * Builds the OSS endpoint for a region.
     *
     * @param region       region name, e.g. {@code cn-hangzhou}
     * @param publicAccess {@code true} for the public endpoint, {@code false} for the {@code -internal} one
     * @return e.g. {@code oss-cn-hangzhou.aliyuncs.com} or {@code oss-cn-hangzhou-internal.aliyuncs.com}
     */
    private static String getOssEndpoint(String region, boolean publicAccess) {
        String prefix = "oss-";
        String suffix = ".aliyuncs.com";
        if (!publicAccess) {
            suffix = "-internal" + suffix;
        }
        return prefix + region + suffix;
    }

    @Override
    protected Set<Pattern> endpointPatterns() {
        return ENDPOINT_PATTERN;
    }

    /**
     * Returns the parent's credentials provider if it has one. Otherwise it returns an anonymous provider when
     * neither the access key nor the secret key is set, and {@code null} in all other cases.
     */
    @Override
    public AwsCredentialsProvider getAwsCredentialsProvider() {
        AwsCredentialsProvider credentialsProvider = super.getAwsCredentialsProvider();
        if (credentialsProvider != null) {
            return credentialsProvider;
        }
        if (StringUtils.isBlank(accessKey) && StringUtils.isBlank(secretKey)) {
            // For anonymous access (no credentials required)
            return AnonymousCredentialsProvider.create();
        }
        return null;
    }

    /**
     * Extends the parent's Hadoop configuration with the Aliyun OSS file system implementation, credentials
     * and endpoint.
     */
    @Override
    public void initializeHadoopStorageConfig() {
        super.initializeHadoopStorageConfig();
        hadoopStorageConfig.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
        hadoopStorageConfig.set("fs.oss.accessKeyId", accessKey);
        hadoopStorageConfig.set("fs.oss.accessKeySecret", secretKey);
        hadoopStorageConfig.set("fs.oss.endpoint", endpoint);
    }
}