S3Properties.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.property.constants;

import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.proto.Cloud.CredProviderTypePB;
import org.apache.doris.cloud.proto.Cloud.ObjectStoreInfoPB.Provider;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.credentials.CloudCredential;
import org.apache.doris.common.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.common.credentials.DataLakeAWSCredentialsProvider;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.thrift.TCredProviderType;
import org.apache.doris.thrift.TS3StorageParam;

import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
import com.amazonaws.auth.WebIdentityTokenCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
import org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider;
import org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;

public class S3Properties extends BaseProperties {

    public static final String S3_PREFIX = "s3.";
    public static final String S3_FS_PREFIX = "fs.s3";

    public static final String CREDENTIALS_PROVIDER = "s3.credentials.provider";
    public static final String ENDPOINT = "s3.endpoint";
    public static final String EXTERNAL_ENDPOINT = "s3.external_endpoint";
    public static final String REGION = "s3.region";
    public static final String ACCESS_KEY = "s3.access_key";
    public static final String SECRET_KEY = "s3.secret_key";
    public static final String SESSION_TOKEN = "s3.session_token";

    public static final String ROLE_ARN = "s3.role_arn";
    public static final String EXTERNAL_ID = "s3.external_id";

    public static final String MAX_CONNECTIONS = "s3.connection.maximum";
    public static final String REQUEST_TIMEOUT_MS = "s3.connection.request.timeout";
    public static final String CONNECTION_TIMEOUT_MS = "s3.connection.timeout";
    public static final String S3_PROVIDER = "S3";

    // required by storage policy
    public static final String ROOT_PATH = "s3.root.path";
    public static final String BUCKET = "s3.bucket";
    public static final String VALIDITY_CHECK = "s3_validity_check";
    public static final String PROVIDER = "provider";
    public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT);
    public static final List<String> TVF_REQUIRED_FIELDS = Arrays.asList(ACCESS_KEY, SECRET_KEY);
    public static final List<String> FS_KEYS = Arrays.asList(ENDPOINT, REGION, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN,
            ROOT_PATH, BUCKET, MAX_CONNECTIONS, REQUEST_TIMEOUT_MS, CONNECTION_TIMEOUT_MS);

    // Accepted values of the "provider" property; checkProvider compares against the upper-cased value.
    public static final List<String> PROVIDERS = Arrays.asList("COS", "OSS", "S3", "OBS", "BOS", "AZURE", "GCP");

    public static final List<String> AWS_CREDENTIALS_PROVIDERS = Arrays.asList(
            DataLakeAWSCredentialsProvider.class.getName(),
            TemporaryAWSCredentialsProvider.class.getName(),
            SimpleAWSCredentialsProvider.class.getName(),
            EnvironmentVariableCredentialsProvider.class.getName(),
            SystemPropertiesCredentialsProvider.class.getName(),
            InstanceProfileCredentialsProvider.class.getName(),
            ProfileCredentialsProvider.class.getName(),
            WebIdentityTokenCredentialsProvider.class.getName(),
            IAMInstanceCredentialsProvider.class.getName());

    // Matches an "a.b.c.d:port" host; such endpoints carry no region information.
    private static final Pattern IPV4_PORT_PATTERN = Pattern.compile("((?:\\d{1,3}\\.){3}\\d{1,3}:\\d{1,5})");

    // Error reported by checkProvider for any unsupported or missing provider value.
    private static final String INVALID_PROVIDER_MSG = "Provider must be one of OSS, OBS, AZURE, BOS, COS, S3, GCP";

    /**
     * Converts a credential into a map keyed by the standard {@code s3.*} property names.
     *
     * <p>Endpoint and region are always put (their values may be null). Access/secret key are only
     * put when {@code credential.isWhole()}, and the session token only when
     * {@code credential.isTemporary()}.
     *
     * @param credential source credential
     * @return a new mutable map of {@code s3.*} properties
     */
    public static Map<String, String> credentialToMap(CloudCredentialWithEndpoint credential) {
        Map<String, String> resMap = new HashMap<>();
        resMap.put(S3Properties.ENDPOINT, credential.getEndpoint());
        resMap.put(S3Properties.REGION, credential.getRegion());
        if (credential.isWhole()) {
            resMap.put(S3Properties.ACCESS_KEY, credential.getAccessKey());
            resMap.put(S3Properties.SECRET_KEY, credential.getSecretKey());
        }
        if (credential.isTemporary()) {
            resMap.put(S3Properties.SESSION_TOKEN, credential.getSessionToken());
        }
        return resMap;
    }

    /**
     * Legacy {@code AWS_*} property names, kept for compatibility with old versions.
     * {@link S3Properties#convertToStdProperties(Map)} copies them onto the {@code s3.*} names.
     */
    public static class Env {
        public static final String PROPERTIES_PREFIX = "AWS";
        // required
        public static final String ENDPOINT = "AWS_ENDPOINT";
        public static final String REGION = "AWS_REGION";
        public static final String ACCESS_KEY = "AWS_ACCESS_KEY";
        public static final String SECRET_KEY = "AWS_SECRET_KEY";
        public static final String TOKEN = "AWS_TOKEN";
        // required by storage policy
        public static final String ROOT_PATH = "AWS_ROOT_PATH";
        public static final String BUCKET = "AWS_BUCKET";
        // optional
        public static final String MAX_CONNECTIONS = "AWS_MAX_CONNECTIONS";
        public static final String REQUEST_TIMEOUT_MS = "AWS_REQUEST_TIMEOUT_MS";
        public static final String CONNECTION_TIMEOUT_MS = "AWS_CONNECTION_TIMEOUT_MS";
        public static final String DEFAULT_MAX_CONNECTIONS = "50";
        public static final String DEFAULT_REQUEST_TIMEOUT_MS = "3000";
        public static final String DEFAULT_CONNECTION_TIMEOUT_MS = "1000";
        public static final String NEED_OVERRIDE_ENDPOINT = "AWS_NEED_OVERRIDE_ENDPOINT";

        public static final String ROLE_ARN = "AWS_ROLE_ARN";
        public static final String EXTERNAL_ID = "AWS_EXTERNAL_ID";

        public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT);
        public static final List<String> FS_KEYS = Arrays.asList(ENDPOINT, REGION, ACCESS_KEY, SECRET_KEY, TOKEN,
                ROOT_PATH, BUCKET, MAX_CONNECTIONS, REQUEST_TIMEOUT_MS, CONNECTION_TIMEOUT_MS);
    }

    /**
     * Pairs of {legacy Env key, standard s3.* key}, in the order convertToStdProperties applies them.
     * Every target key is distinct, so the order does not affect the result.
     */
    private static final String[][] ENV_TO_STD_KEYS = {
            {S3Properties.Env.ENDPOINT, S3Properties.ENDPOINT},
            {S3Properties.Env.REGION, S3Properties.REGION},
            {S3Properties.Env.ACCESS_KEY, S3Properties.ACCESS_KEY},
            {S3Properties.Env.SECRET_KEY, S3Properties.SECRET_KEY},
            {S3Properties.Env.TOKEN, S3Properties.SESSION_TOKEN},
            {S3Properties.Env.MAX_CONNECTIONS, S3Properties.MAX_CONNECTIONS},
            {S3Properties.Env.REQUEST_TIMEOUT_MS, S3Properties.REQUEST_TIMEOUT_MS},
            {S3Properties.Env.CONNECTION_TIMEOUT_MS, S3Properties.CONNECTION_TIMEOUT_MS},
            {S3Properties.Env.ROOT_PATH, S3Properties.ROOT_PATH},
            {S3Properties.Env.BUCKET, S3Properties.BUCKET},
            {S3Properties.Env.ROLE_ARN, S3Properties.ROLE_ARN},
            {S3Properties.Env.EXTERNAL_ID, S3Properties.EXTERNAL_ID},
    };

    /**
     * Builds a credential from the standard {@code s3.access_key}, {@code s3.secret_key} and
     * {@code s3.session_token} properties via {@code getCloudCredential}.
     */
    public static CloudCredential getCredential(Map<String, String> props) {
        return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
    }

    /**
     * Builds a credential with endpoint from the legacy {@code AWS_*} properties.
     *
     * <p>If {@code AWS_REGION} is absent, the region is derived from the endpoint via
     * {@link #getRegionOfEndpoint(String)}. The value returned by
     * {@code PropertyConverter.checkRegion} is both stored into {@code props} (when
     * {@code AWS_REGION} is absent) and used for the returned credential, so the two always agree.
     *
     * @param props legacy properties; may be modified (AWS_REGION is filled in when absent)
     * @return credential carrying endpoint and region
     * @throws IllegalArgumentException if {@code AWS_ENDPOINT} is missing or empty
     */
    public static CloudCredentialWithEndpoint getEnvironmentCredentialWithEndpoint(Map<String, String> props) {
        CloudCredential credential = getCloudCredential(props, Env.ACCESS_KEY, Env.SECRET_KEY,
                Env.TOKEN);
        String endpoint = props.get(Env.ENDPOINT);
        // A present-but-empty endpoint is as unusable as a missing one; reject it up front instead
        // of failing later inside region derivation.
        if (Strings.isNullOrEmpty(endpoint)) {
            throw new IllegalArgumentException("Missing 'AWS_ENDPOINT' property. ");
        }
        String candidateRegion = props.getOrDefault(Env.REGION, S3Properties.getRegionOfEndpoint(endpoint));
        // NOTE(review): checkRegion is defined in PropertyConverter (not visible here); its return
        // value is what gets persisted, so use the same value for the returned credential.
        String region = PropertyConverter.checkRegion(endpoint, candidateRegion, Env.REGION);
        props.putIfAbsent(Env.REGION, region);
        return new CloudCredentialWithEndpoint(endpoint, region, credential);
    }

    /**
     * Derives a region from an endpoint host name.
     *
     * <p>Returns null for a null/empty endpoint, for an IPv4:port endpoint, or when the host has
     * fewer than two dot-separated labels. For hosts whose first label contains {@code "oss-"}
     * (e.g. {@code oss-cn-beijing.aliyuncs.com}) the first label is returned; otherwise the second.
     *
     * @param endpoint endpoint, optionally prefixed with {@code http://} or {@code https://}
     * @return derived region, or null if none can be derived
     */
    public static String getRegionOfEndpoint(String endpoint) {
        if (Strings.isNullOrEmpty(endpoint)) {
            return null;
        }
        if (IPV4_PORT_PATTERN.matcher(endpoint).find()) {
            // if endpoint contains '192.168.0.1:8999', return null region
            return null;
        }
        String[] endpointSplit = endpoint.replace("http://", "")
                .replace("https://", "")
                .split("\\.");
        if (endpointSplit.length < 2) {
            return null;
        }
        if (endpointSplit[0].contains("oss-")) {
            // compatible with the endpoint: oss-cn-bejing.aliyuncs.com
            return endpointSplit[0];
        }
        return endpointSplit[1];
    }

    /**
     * Returns a new map in which keys starting with an S3-compatible vendor prefix
     * ({@code OssProperties.OSS_PREFIX}, {@code GCSProperties.GCS_PREFIX},
     * {@code CosProperties.COS_PREFIX}, {@code ObsProperties.OBS_PREFIX},
     * {@code MinioProperties.MINIO_PREFIX}, checked in that order) have that leading prefix
     * replaced by {@code s3.}. All other entries are copied unchanged. The input map is not
     * modified.
     *
     * <p>NOTE(review): if both a vendor key and the equivalent {@code s3.*} key are present, the
     * winner depends on the input map's iteration order.
     */
    public static Map<String, String> prefixToS3(Map<String, String> properties) {
        String[] vendorPrefixes = {
                OssProperties.OSS_PREFIX,
                GCSProperties.GCS_PREFIX,
                CosProperties.COS_PREFIX,
                ObsProperties.OBS_PREFIX,
                MinioProperties.MINIO_PREFIX,
        };
        Map<String, String> s3Properties = Maps.newHashMap();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            String key = entry.getKey();
            String s3Key = key;
            for (String prefix : vendorPrefixes) {
                if (key.startsWith(prefix)) {
                    // Replace only the leading prefix; String.replace would also rewrite any later
                    // occurrence of the prefix inside the key.
                    s3Key = S3Properties.S3_PREFIX + key.substring(prefix.length());
                    break;
                }
            }
            s3Properties.put(s3Key, entry.getValue());
        }
        return s3Properties;
    }

    /**
     * Validates that every key in {@link #TVF_REQUIRED_FIELDS} has a non-empty value.
     *
     * @return the same {@code properties} instance
     * @throws AnalysisException wrapping the {@link DdlException} for the first missing key
     */
    public static Map<String, String> requiredS3TVFProperties(Map<String, String> properties)
            throws AnalysisException {
        try {
            for (String field : S3Properties.TVF_REQUIRED_FIELDS) {
                checkRequiredProperty(properties, field);
            }
        } catch (DdlException e) {
            throw new AnalysisException(e.getMessage(), e);
        }
        return properties;
    }

    /**
     * Normalizes the optional {@code provider} property to upper case in place and validates it
     * against {@link #PROVIDERS}.
     */
    private static void checkProvider(Map<String, String> properties) throws DdlException {
        if (!properties.containsKey(PROVIDER)) {
            return;
        }
        String provider = properties.get(PROVIDER);
        if (provider == null) {
            throw new DdlException(INVALID_PROVIDER_MSG);
        }
        // S3 Provider properties should be case insensitive. Locale.ROOT keeps the upper-casing
        // independent of the JVM default locale (e.g. the Turkish dotted/dotless i).
        String normalized = provider.toUpperCase(Locale.ROOT);
        properties.put(PROVIDER, normalized);
        if (!PROVIDERS.contains(normalized)) {
            throw new DdlException(INVALID_PROVIDER_MSG);
        }
    }

    /**
     * Converts legacy properties to standard ones, then validates the required endpoint and the
     * optional provider.
     *
     * <p>When {@code AWS_ENDPOINT} is present but {@code s3.endpoint} is not, the legacy required
     * fields are checked; otherwise the standard ones are.
     *
     * @param properties properties to check; modified in place
     * @throws DdlException if a required property is missing or the provider is invalid
     */
    public static void requiredS3Properties(Map<String, String> properties) throws DdlException {
        // Try to convert env properties to uniform properties
        // compatible with old version
        S3Properties.convertToStdProperties(properties);
        if (properties.containsKey(S3Properties.Env.ENDPOINT)
                    && !properties.containsKey(S3Properties.ENDPOINT)) {
            for (String field : S3Properties.Env.REQUIRED_FIELDS) {
                checkRequiredProperty(properties, field);
            }
        } else {
            for (String field : S3Properties.REQUIRED_FIELDS) {
                checkRequiredProperty(properties, field);
            }
        }
        checkProvider(properties);
    }

    /**
     * Same checks as {@link #requiredS3Properties(Map)}, plus a non-empty {@code s3.bucket}.
     */
    public static void requiredS3PingProperties(Map<String, String> properties) throws DdlException {
        requiredS3Properties(properties);
        checkRequiredProperty(properties, S3Properties.BUCKET);
    }

    /**
     * Throws if {@code propertyKey} is absent or maps to a null/empty value.
     *
     * @throws DdlException with message {@code "Missing [<key>] in properties."}
     */
    public static void checkRequiredProperty(Map<String, String> properties, String propertyKey)
            throws DdlException {
        String value = properties.get(propertyKey);
        if (Strings.isNullOrEmpty(value)) {
            throw new DdlException("Missing [" + propertyKey + "] in properties.");
        }
    }

    /**
     * Fills in default connection settings (max connections, request timeout, connection timeout)
     * under both the standard {@code s3.*} names and the legacy {@code AWS_*} names, without
     * overwriting existing values.
     */
    public static void optionalS3Property(Map<String, String> properties) {
        properties.putIfAbsent(S3Properties.MAX_CONNECTIONS, S3Properties.Env.DEFAULT_MAX_CONNECTIONS);
        properties.putIfAbsent(S3Properties.REQUEST_TIMEOUT_MS, S3Properties.Env.DEFAULT_REQUEST_TIMEOUT_MS);
        properties.putIfAbsent(S3Properties.CONNECTION_TIMEOUT_MS, S3Properties.Env.DEFAULT_CONNECTION_TIMEOUT_MS);
        // compatible with old version
        properties.putIfAbsent(S3Properties.Env.MAX_CONNECTIONS, S3Properties.Env.DEFAULT_MAX_CONNECTIONS);
        properties.putIfAbsent(S3Properties.Env.REQUEST_TIMEOUT_MS, S3Properties.Env.DEFAULT_REQUEST_TIMEOUT_MS);
        properties.putIfAbsent(S3Properties.Env.CONNECTION_TIMEOUT_MS, S3Properties.Env.DEFAULT_CONNECTION_TIMEOUT_MS);
    }

    /**
     * Copies each legacy {@code AWS_*} property present in {@code properties} onto its standard
     * {@code s3.*} key, unless the standard key is already present. Legacy keys are left in place.
     *
     * @param properties properties to normalize; modified in place
     */
    public static void convertToStdProperties(Map<String, String> properties) {
        for (String[] mapping : ENV_TO_STD_KEYS) {
            String envKey = mapping[0];
            if (properties.containsKey(envKey)) {
                properties.putIfAbsent(mapping[1], properties.get(envKey));
            }
        }
    }

    /**
     * Builds the thrift storage parameter from standard {@code s3.*} properties.
     *
     * <p>When {@code s3.role_arn} is set, the role ARN (and optional external id) are passed along
     * and the credential provider type is set to {@code INSTANCE_PROFILE}. Missing connection
     * settings fall back to the {@link Env} defaults; {@code use_path_style} defaults to false.
     *
     * @throws NumberFormatException if a connection setting is not a valid integer
     */
    public static TS3StorageParam getS3TStorageParam(Map<String, String> properties) {
        TS3StorageParam s3Info = new TS3StorageParam();

        if (properties.containsKey(S3Properties.ROLE_ARN)) {
            s3Info.setRoleArn(properties.get(S3Properties.ROLE_ARN));
            if (properties.containsKey(S3Properties.EXTERNAL_ID)) {
                s3Info.setExternalId(properties.get(S3Properties.EXTERNAL_ID));
            }
            s3Info.setCredProviderType(TCredProviderType.INSTANCE_PROFILE);
        }

        s3Info.setEndpoint(properties.get(S3Properties.ENDPOINT));
        s3Info.setRegion(properties.get(S3Properties.REGION));
        s3Info.setAk(properties.get(S3Properties.ACCESS_KEY));
        s3Info.setSk(properties.get(S3Properties.SECRET_KEY));
        s3Info.setToken(properties.get(S3Properties.SESSION_TOKEN));

        s3Info.setRootPath(properties.get(S3Properties.ROOT_PATH));
        s3Info.setBucket(properties.get(S3Properties.BUCKET));
        s3Info.setMaxConn(parseIntOrDefault(properties, S3Properties.MAX_CONNECTIONS,
                S3Properties.Env.DEFAULT_MAX_CONNECTIONS));
        s3Info.setRequestTimeoutMs(parseIntOrDefault(properties, S3Properties.REQUEST_TIMEOUT_MS,
                S3Properties.Env.DEFAULT_REQUEST_TIMEOUT_MS));
        s3Info.setConnTimeoutMs(parseIntOrDefault(properties, S3Properties.CONNECTION_TIMEOUT_MS,
                S3Properties.Env.DEFAULT_CONNECTION_TIMEOUT_MS));
        String usePathStyle = properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false");
        s3Info.setUsePathStyle(Boolean.parseBoolean(usePathStyle));
        return s3Info;
    }

    /** Parses {@code properties[key]} as an int, using {@code defaultValue} when the value is null. */
    private static int parseIntOrDefault(Map<String, String> properties, String key, String defaultValue) {
        String value = properties.get(key);
        return Integer.parseInt(value == null ? defaultValue : value);
    }

    /**
     * Builds a cloud {@code ObjectStoreInfoPB} from standard {@code s3.*} properties, setting only
     * the fields whose keys are present.
     *
     * @throws IllegalArgumentException if {@code s3.root.path} is present but empty, if
     *         {@code use_path_style} is not "true"/"false" (case-insensitive), or if {@code provider}
     *         is not a {@code Provider} enum constant
     */
    public static Cloud.ObjectStoreInfoPB.Builder getObjStoreInfoPB(Map<String, String> properties) {
        Cloud.ObjectStoreInfoPB.Builder builder = Cloud.ObjectStoreInfoPB.newBuilder();
        if (properties.containsKey(S3Properties.ENDPOINT)) {
            builder.setEndpoint(properties.get(S3Properties.ENDPOINT));
        }
        if (properties.containsKey(S3Properties.REGION)) {
            builder.setRegion(properties.get(S3Properties.REGION));
        }
        if (properties.containsKey(S3Properties.ACCESS_KEY)) {
            builder.setAk(properties.get(S3Properties.ACCESS_KEY));
        }
        if (properties.containsKey(S3Properties.SECRET_KEY)) {
            builder.setSk(properties.get(S3Properties.SECRET_KEY));
        }
        if (properties.containsKey(S3Properties.ROOT_PATH)) {
            Preconditions.checkArgument(!Strings.isNullOrEmpty(properties.get(S3Properties.ROOT_PATH)),
                    "%s cannot be empty", S3Properties.ROOT_PATH);
            builder.setPrefix(properties.get(S3Properties.ROOT_PATH));
        }
        if (properties.containsKey(S3Properties.BUCKET)) {
            builder.setBucket(properties.get(S3Properties.BUCKET));
        }
        if (properties.containsKey(S3Properties.EXTERNAL_ENDPOINT)) {
            builder.setExternalEndpoint(properties.get(S3Properties.EXTERNAL_ENDPOINT));
        }
        if (properties.containsKey(S3Properties.PROVIDER)) {
            // S3 Provider properties should be case insensitive; Locale.ROOT avoids locale-dependent
            // upper-casing before the enum lookup.
            builder.setProvider(Provider.valueOf(properties.get(S3Properties.PROVIDER).toUpperCase(Locale.ROOT)));
        }

        if (properties.containsKey(PropertyConverter.USE_PATH_STYLE)) {
            String value = properties.get(PropertyConverter.USE_PATH_STYLE);
            Preconditions.checkArgument(!Strings.isNullOrEmpty(value), "use_path_style cannot be empty");
            Preconditions.checkArgument(value.equalsIgnoreCase("true")
                    || value.equalsIgnoreCase("false"),
                    "Invalid use_path_style value: %s only 'true' or 'false' is acceptable", value);
            builder.setUsePathStyle(value.equalsIgnoreCase("true"));
        }

        if (properties.containsKey(S3Properties.ROLE_ARN)) {
            builder.setRoleArn(properties.get(S3Properties.ROLE_ARN));
            if (properties.containsKey(S3Properties.EXTERNAL_ID)) {
                builder.setExternalId(properties.get(S3Properties.EXTERNAL_ID));
            }
            builder.setCredProviderType(CredProviderTypePB.INSTANCE_PROFILE);
        }

        return builder;
    }
}