MaxComputeExternalCatalog.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.maxcompute;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.credentials.CloudCredential;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.property.constants.MCProperties;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.Partition;
import com.aliyun.odps.Project;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AccountFormat;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.security.SecurityManager;
import com.aliyun.odps.table.configuration.RestOptions;
import com.aliyun.odps.table.configuration.SplitOptions;
import com.aliyun.odps.table.enviroment.Credentials;
import com.aliyun.odps.table.enviroment.EnvironmentSettings;
import com.aliyun.odps.utils.StringUtils;
import com.google.common.collect.ImmutableList;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.log4j.Logger;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class MaxComputeExternalCatalog extends ExternalCatalog {
    private static final Logger LOG = Logger.getLogger(MaxComputeExternalCatalog.class);
    // Endpoint pattern used by generatorEndpoint() when only a region is configured: "{}" is replaced
    // by the region name, and "-inc" is stripped when public access is enabled.
    // Endpoint reference: https://help.aliyun.com/zh/maxcompute/user-guide/endpoints
    private static final String endpointTemplate = "http://service.{}.maxcompute.aliyun-inc.com/api";
    // ODPS client, created in initLocalObjectsImpl().
    private Odps odps;
    // Access key / secret key taken from MCProperties.getCredential(props) in initLocalObjectsImpl().
    private String accessKey;
    private String secretKey;
    // Service endpoint resolved by generatorEndpoint().
    private String endpoint;
    // Value of MCProperties.PROJECT; also set as the default project of the ODPS client.
    private String defaultProject;
    // Quota name handed to EnvironmentSettings.withQuotaName().
    private String quota;
    // Environment settings (credentials, endpoint, quota, REST options) built in initLocalObjectsImpl().
    private EnvironmentSettings settings;
    // "DisplayName" returned by the "whoami" query; resolved once and cached by listDatabaseNames().
    private String catalogOwner;
    // Split configuration; parsed both in checkProperties() and in initLocalObjectsImpl().
    private String splitStrategy;
    private SplitOptions splitOptions;
    private long splitRowCount;
    private long splitByteSize;
    // REST client options; parsed both in checkProperties() and in initLocalObjectsImpl().
    private int connectTimeout;
    private int readTimeout;
    private int retryTimes;
    // Value of MCProperties.DATETIME_PREDICATE_PUSH_DOWN, set in initLocalObjectsImpl().
    public boolean dateTimePredicatePushDown;
    // Account format applied to the ODPS client; stays DISPLAYNAME unless the property selects ID.
    AccountFormat accountFormat = AccountFormat.DISPLAYNAME;
    // Region name -> time zone lookup, populated by the static initializer and read by
    // getProjectDateTimeZone().
    private static final Map<String, ZoneId> REGION_ZONE_MAP;
    // Properties that checkProperties() requires to be present.
    private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
            MCProperties.PROJECT,
            MCProperties.ENDPOINT
    );
    static {
        Map<String, ZoneId> map = new HashMap<>();
        map.put("cn-hangzhou", ZoneId.of("Asia/Shanghai"));
        map.put("cn-shanghai", ZoneId.of("Asia/Shanghai"));
        map.put("cn-shanghai-finance-1", ZoneId.of("Asia/Shanghai"));
        map.put("cn-beijing", ZoneId.of("Asia/Shanghai"));
        map.put("cn-north-2-gov-1", ZoneId.of("Asia/Shanghai"));
        map.put("cn-zhangjiakou", ZoneId.of("Asia/Shanghai"));
        map.put("cn-wulanchabu", ZoneId.of("Asia/Shanghai"));
        map.put("cn-shenzhen", ZoneId.of("Asia/Shanghai"));
        map.put("cn-shenzhen-finance-1", ZoneId.of("Asia/Shanghai"));
        map.put("cn-chengdu", ZoneId.of("Asia/Shanghai"));
        map.put("cn-hongkong", ZoneId.of("Asia/Shanghai"));
        map.put("ap-southeast-1", ZoneId.of("Asia/Singapore"));
        map.put("ap-southeast-2", ZoneId.of("Australia/Sydney"));
        map.put("ap-southeast-3", ZoneId.of("Asia/Kuala_Lumpur"));
        map.put("ap-southeast-5", ZoneId.of("Asia/Jakarta"));
        map.put("ap-northeast-1", ZoneId.of("Asia/Tokyo"));
        map.put("eu-central-1", ZoneId.of("Europe/Berlin"));
        map.put("eu-west-1", ZoneId.of("Europe/London"));
        map.put("us-west-1", ZoneId.of("America/Los_Angeles"));
        map.put("us-east-1", ZoneId.of("America/New_York"));
        map.put("me-east-1", ZoneId.of("Asia/Dubai"));
        REGION_ZONE_MAP = Collections.unmodifiableMap(map);
    }
    /**
     * Creates a MaxCompute external catalog.
     *
     * @param catalogId id passed to the parent catalog constructor
     * @param name catalog name passed to the parent catalog constructor
     * @param resource resource name stored in the catalog property
     * @param props catalog properties stored in the catalog property
     * @param comment comment passed to the parent catalog constructor
     */
    public MaxComputeExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
                                     String comment) {
        super(catalogId, name, InitCatalogLog.Type.MAX_COMPUTE, comment);
        this.catalogProperty = new CatalogProperty(resource, props);
    }
    /**
     * Resolves the {@code endpoint} field from the catalog properties while staying compatible
     * with catalogs created by previous versions.
     *
     * <p>The first property that is present wins, in this order: {@code MCProperties.ENDPOINT},
     * {@code MCProperties.TUNNEL_SDK_ENDPOINT}, {@code MCProperties.ODPS_ENDPOINT},
     * {@code MCProperties.REGION}. If none is present the field is left untouched.
     */
    protected void generatorEndpoint() {
        final Map<String, String> properties = catalogProperty.getProperties();

        if (properties.containsKey(MCProperties.ENDPOINT)) {
            // New-style property: usable as is, no conversion required.
            endpoint = properties.get(MCProperties.ENDPOINT);
            return;
        }

        if (properties.containsKey(MCProperties.TUNNEL_SDK_ENDPOINT)) {
            // A customized `mc.tunnel_endpoint` targeted the tunnel API, so it has to be converted
            // into the matching service endpoint.
            final String legacyTunnelEndpoint = properties.get(MCProperties.TUNNEL_SDK_ENDPOINT);
            endpoint = legacyTunnelEndpoint.replace("//dt", "//service") + "/api";
            return;
        }

        if (properties.containsKey(MCProperties.ODPS_ENDPOINT)) {
            // A customized `mc.odps_endpoint` is equivalent to the new `mc.endpoint`.
            endpoint = properties.get(MCProperties.ODPS_ENDPOINT);
            return;
        }

        if (!properties.containsKey(MCProperties.REGION)) {
            // MCProperties.REGION was a required property in previous versions and
            // MCProperties.ENDPOINT is required in the current one, so there is nothing
            // else to fall back to here.
            return;
        }

        // Region-based logic carried over from the original implementation.
        String regionName = properties.get(MCProperties.REGION);
        if (regionName.startsWith("oss-")) {
            // Values such as oss-cn-beijing may have been used; stay compatible with them.
            regionName = regionName.replace("oss-", "");
        }
        final String publicAccessValue =
                properties.getOrDefault(MCProperties.PUBLIC_ACCESS, MCProperties.DEFAULT_PUBLIC_ACCESS);
        final String internalEndpoint = endpointTemplate.replace("{}", regionName);
        endpoint = Boolean.parseBoolean(publicAccessValue)
                ? internalEndpoint.replace("-inc", "")
                : internalEndpoint;
    }
    /**
     * Reads the catalog properties and builds the local objects derived from them: the resolved
     * endpoint, split options, REST options, the ODPS client and the environment settings.
     */
    @Override
    protected void initLocalObjectsImpl() {
        final Map<String, String> properties = catalogProperty.getProperties();

        generatorEndpoint();
        this.defaultProject = properties.get(MCProperties.PROJECT);
        this.quota = properties.getOrDefault(MCProperties.QUOTA, MCProperties.DEFAULT_QUOTA);

        // Split configuration: by byte size when that strategy is selected, by row offset otherwise.
        final String crossPartitionValue = properties.getOrDefault(
                MCProperties.SPLIT_CROSS_PARTITION, MCProperties.DEFAULT_SPLIT_CROSS_PARTITION);
        final boolean crossPartition = Boolean.parseBoolean(crossPartitionValue);
        this.splitStrategy = properties.getOrDefault(
                MCProperties.SPLIT_STRATEGY, MCProperties.DEFAULT_SPLIT_STRATEGY);
        final boolean splitByBytes = splitStrategy.equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY);
        if (!splitByBytes) {
            final String rowCountValue = properties.getOrDefault(
                    MCProperties.SPLIT_ROW_COUNT, MCProperties.DEFAULT_SPLIT_ROW_COUNT);
            this.splitRowCount = Long.parseLong(rowCountValue);
            this.splitOptions = SplitOptions.newBuilder()
                    .SplitByRowOffset()
                    .withCrossPartition(crossPartition)
                    .build();
        } else {
            final String byteSizeValue = properties.getOrDefault(
                    MCProperties.SPLIT_BYTE_SIZE, MCProperties.DEFAULT_SPLIT_BYTE_SIZE);
            this.splitByteSize = Long.parseLong(byteSizeValue);
            this.splitOptions = SplitOptions.newBuilder()
                    .SplitByByteSize(splitByteSize)
                    .withCrossPartition(crossPartition)
                    .build();
        }

        // REST client options.
        final String connectTimeoutValue = properties.getOrDefault(
                MCProperties.CONNECT_TIMEOUT, MCProperties.DEFAULT_CONNECT_TIMEOUT);
        this.connectTimeout = Integer.parseInt(connectTimeoutValue);
        final String readTimeoutValue = properties.getOrDefault(
                MCProperties.READ_TIMEOUT, MCProperties.DEFAULT_READ_TIMEOUT);
        this.readTimeout = Integer.parseInt(readTimeoutValue);
        final String retryCountValue = properties.getOrDefault(
                MCProperties.RETRY_COUNT, MCProperties.DEFAULT_RETRY_COUNT);
        this.retryTimes = Integer.parseInt(retryCountValue);
        final RestOptions restOptions = RestOptions.newBuilder()
                .withConnectTimeout(connectTimeout)
                .withReadTimeout(readTimeout)
                .withRetryTimes(retryTimes)
                .build();

        // Credentials.
        final CloudCredential cloudCredential = MCProperties.getCredential(properties);
        this.accessKey = cloudCredential.getAccessKey();
        this.secretKey = cloudCredential.getSecretKey();

        final String pushDownValue = properties.getOrDefault(
                MCProperties.DATETIME_PREDICATE_PUSH_DOWN, MCProperties.DEFAULT_DATETIME_PREDICATE_PUSH_DOWN);
        this.dateTimePredicatePushDown = Boolean.parseBoolean(pushDownValue);

        // ODPS client bound to the default project and the resolved endpoint.
        final Account aliyunAccount = new AliyunAccount(accessKey, secretKey);
        this.odps = new Odps(aliyunAccount);
        this.odps.setDefaultProject(defaultProject);
        this.odps.setEndpoint(endpoint);

        // Any value other than the two known formats leaves accountFormat unchanged.
        final String accountFormatValue = properties.getOrDefault(
                MCProperties.ACCOUNT_FORMAT, MCProperties.DEFAULT_ACCOUNT_FORMAT);
        if (accountFormatValue.equals(MCProperties.ACCOUNT_FORMAT_NAME)) {
            this.accountFormat = AccountFormat.DISPLAYNAME;
        } else if (accountFormatValue.equals(MCProperties.ACCOUNT_FORMAT_ID)) {
            this.accountFormat = AccountFormat.ID;
        }
        this.odps.setAccountFormat(accountFormat);

        final Credentials tableApiCredentials = Credentials.newBuilder()
                .withAccount(odps.getAccount())
                .withAppAccount(odps.getAppAccount())
                .build();
        this.settings = EnvironmentSettings.newBuilder()
                .withCredentials(tableApiCredentials)
                .withServiceEndpoint(odps.getEndpoint())
                .withQuotaName(quota)
                .withRestOptions(restOptions)
                .build();
    }
    /**
     * Returns the ODPS client after calling {@code makeSureInitialized()}.
     *
     * @return the ODPS client held by this catalog
     */
    public Odps getClient() {
        makeSureInitialized();
        return this.odps;
    }
    /**
     * Lists the MaxCompute projects exposed as databases of this catalog.
     *
     * <p>The default project is always the first entry and is listed exactly once. It is followed
     * by every other project returned by {@code odps.projects().iterator(catalogOwner)}. The
     * catalog owner is resolved on first use from the {@code DisplayName} field of the
     * {@code whoami} query result and then cached in {@code catalogOwner}.
     *
     * @return project names, default project first, without duplicates of the default project
     * @throws RuntimeException wrapping the {@link OdpsException} raised by the ODPS client
     */
    protected List<String> listDatabaseNames() {
        List<String> result = new ArrayList<>();
        // Add the default project once; the loop below skips it so it is never duplicated.
        result.add(defaultProject);
        try {
            if (StringUtils.isNullOrEmpty(catalogOwner)) {
                SecurityManager sm = odps.projects().get().getSecurityManager();
                String whoami = sm.runQuery("whoami", false);
                JsonObject js = JsonParser.parseString(whoami).getAsJsonObject();
                catalogOwner = js.get("DisplayName").getAsString();
            }
            Iterator<Project> iterator = odps.projects().iterator(catalogOwner);
            while (iterator.hasNext()) {
                String projectName = iterator.next().getName();
                if (!projectName.equals(defaultProject)) {
                    result.add(projectName);
                }
            }
        } catch (OdpsException e) {
            throw new RuntimeException(e);
        }
        return result;
    }
    /**
     * Checks whether a table exists in the given project.
     *
     * @param ctx session context (not used by this implementation)
     * @param dbName project name
     * @param tblName table name
     * @return the result of the ODPS client's table existence check
     * @throws RuntimeException wrapping the {@link OdpsException} raised by the ODPS client
     */
    @Override
    public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
        makeSureInitialized();
        final boolean exists;
        try {
            exists = getClient().tables().exists(dbName, tblName);
        } catch (OdpsException odpsError) {
            throw new RuntimeException(odpsError);
        }
        return exists;
    }
    /**
     * Lists the names of all partitions of a table.
     *
     * @param dbName project name
     * @param tbl table name
     * @return partition spec strings of every partition
     */
    public List<String> listPartitionNames(String dbName, String tbl) {
        // A negative limit makes the paged overload return every partition.
        final long unlimited = -1;
        return listPartitionNames(dbName, tbl, 0, unlimited);
    }

    /**
     * Lists partition names of a table, optionally skipping leading partitions and capping the
     * number of results.
     *
     * @param dbName project name; must exist
     * @param tbl table name
     * @param skip number of leading partitions to skip (negative values are treated as 0);
     *             only applied when {@code limit} is non-negative
     * @param limit maximum number of partitions to return; a negative value returns all partitions
     * @return partition spec strings rendered with {@code toString(false, true)}
     * @throws RuntimeException wrapping an {@link OdpsException}, including the one raised when
     *                          the project does not exist
     */
    public List<String> listPartitionNames(String dbName, String tbl, long skip, long limit) {
        try {
            if (!getClient().projects().exists(dbName)) {
                throw new OdpsException("Max compute project: " + dbName + " not exists.");
            }

            final List<Partition> selected;
            if (limit < 0) {
                selected = getClient().tables().get(dbName, tbl).getPartitions();
            } else {
                selected = new ArrayList<>();
                final long toSkip = Math.max(skip, 0L);
                final Iterator<Partition> cursor = getClient().tables().get(dbName, tbl).getPartitionIterator();
                int skipped = 0;
                while (cursor.hasNext()) {
                    if (skipped < toSkip) {
                        // Still inside the skipped prefix: consume and discard.
                        skipped++;
                        cursor.next();
                        continue;
                    }
                    if (selected.size() >= limit) {
                        break;
                    }
                    selected.add(cursor.next());
                }
            }

            final List<String> partitionNames = new ArrayList<>();
            for (Partition partition : selected) {
                partitionNames.add(partition.getPartitionSpec().toString(false, true));
            }
            return partitionNames;
        } catch (OdpsException odpsError) {
            throw new RuntimeException(odpsError);
        }
    }
    /**
     * Lists the names of all tables in the given project.
     *
     * @param ctx session context (not used by this implementation)
     * @param dbName project name
     * @return table names in the order the ODPS client yields them
     */
    @Override
    public List<String> listTableNames(SessionContext ctx, String dbName) {
        makeSureInitialized();
        final List<String> tableNames = new ArrayList<>();
        final Odps client = getClient();
        client.tables().iterable(dbName).forEach(table -> {
            tableNames.add(table.getName());
        });
        return tableNames;
    }
    /** Returns the access key read from the catalog credential, after {@code makeSureInitialized()}. */
    public String getAccessKey() {
        makeSureInitialized();
        return this.accessKey;
    }

    /** Returns the secret key read from the catalog credential, after {@code makeSureInitialized()}. */
    public String getSecretKey() {
        makeSureInitialized();
        return this.secretKey;
    }

    /** Returns the resolved service endpoint, after {@code makeSureInitialized()}. */
    public String getEndpoint() {
        makeSureInitialized();
        return this.endpoint;
    }

    /** Returns the default project name, after {@code makeSureInitialized()}. */
    public String getDefaultProject() {
        makeSureInitialized();
        return this.defaultProject;
    }

    /** Returns the configured retry count, after {@code makeSureInitialized()}. */
    public int getRetryTimes() {
        makeSureInitialized();
        return this.retryTimes;
    }

    /** Returns the configured connect timeout, after {@code makeSureInitialized()}. */
    public int getConnectTimeout() {
        makeSureInitialized();
        return this.connectTimeout;
    }

    /** Returns the configured read timeout, after {@code makeSureInitialized()}. */
    public int getReadTimeout() {
        makeSureInitialized();
        return this.readTimeout;
    }

    /**
     * Returns the value of the datetime-predicate-push-down flag.
     */
    public boolean getDateTimePredicatePushDown() {
        // NOTE(review): unlike the getters above, this one does not call makeSureInitialized();
        // confirm that callers only read it after the catalog has been initialized.
        return this.dateTimePredicatePushDown;
    }
    /**
     * Derives the time zone of the project from the region embedded in the endpoint.
     *
     * <p>The endpoint is split on {@code '.'} and the second segment is taken as the region, with
     * any {@code -vpc} / {@code -intranet} suffix removed. Unknown regions and endpoints that
     * cannot be split fall back to the system default zone, with a warning logged.
     *
     * @return the zone registered for the region, or {@link ZoneId#systemDefault()}
     */
    public ZoneId getProjectDateTimeZone() {
        makeSureInitialized();

        final String[] segments = endpoint.split("\\.");
        if (segments.length < 2) {
            LOG.warn("Split EndPoint " + endpoint + "fill. use systemDefault.");
            return ZoneId.systemDefault();
        }

        // http://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api => cn-hangzhou-vpc => cn-hangzhou
        final String region = segments[1].replace("-vpc", "").replace("-intranet", "");
        final ZoneId knownZone = REGION_ZONE_MAP.get(region);
        if (knownZone != null) {
            return knownZone;
        }

        LOG.warn("Not exist region. region = " + region + ". endpoint = " + endpoint + ". use systemDefault.");
        return ZoneId.systemDefault();
    }
    // NOTE(review): none of the getters in this group call makeSureInitialized(); confirm that
    // callers only use them after the catalog has been initialized.

    /** Returns the quota name read from the catalog properties. */
    public String getQuota() {
        return this.quota;
    }

    /** Returns the split options built from the catalog properties. */
    public SplitOptions getSplitOption() {
        return this.splitOptions;
    }

    /** Returns the environment settings built from the catalog properties. */
    public EnvironmentSettings getSettings() {
        return this.settings;
    }

    /** Returns the configured split strategy name. */
    public String getSplitStrategy() {
        return this.splitStrategy;
    }

    /** Returns the parsed split row count. */
    public long getSplitRowCount() {
        return this.splitRowCount;
    }

    /** Returns the parsed split byte size. */
    public long getSplitByteSize() {
        return this.splitByteSize;
    }
    /**
     * Validates the catalog properties.
     *
     * <p>Checks, in order: presence of every entry in {@code REQUIRED_PROPERTIES}; the split
     * strategy and its size/row-count value; the account format; the connect timeout, read
     * timeout and retry count; and finally that the credential is complete. Values parsed along
     * the way are stored in the corresponding fields of this catalog.
     *
     * @throws DdlException if a required property is missing or any value is invalid
     */
    @Override
    public void checkProperties() throws DdlException {
        super.checkProperties();
        Map<String, String> props = catalogProperty.getProperties();
        for (String requiredProperty : REQUIRED_PROPERTIES) {
            if (!props.containsKey(requiredProperty)) {
                throw new DdlException("Required property '" + requiredProperty + "' is missing");
            }
        }

        // Split strategy and the value that belongs to the selected strategy.
        final long minSplitByteSize = 10485760L; // 10 * 1024 * 1024
        try {
            splitStrategy = props.getOrDefault(MCProperties.SPLIT_STRATEGY, MCProperties.DEFAULT_SPLIT_STRATEGY);
            if (splitStrategy.equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY)) {
                splitByteSize = Long.parseLong(props.getOrDefault(MCProperties.SPLIT_BYTE_SIZE,
                        MCProperties.DEFAULT_SPLIT_BYTE_SIZE));
                if (splitByteSize < minSplitByteSize) {
                    // Report the property that was actually validated (byte size, not row count).
                    throw new DdlException(MCProperties.SPLIT_BYTE_SIZE
                            + " must be greater than or equal to " + minSplitByteSize);
                }
            } else if (splitStrategy.equals(MCProperties.SPLIT_BY_ROW_COUNT_STRATEGY)) {
                splitRowCount = Long.parseLong(props.getOrDefault(MCProperties.SPLIT_ROW_COUNT,
                        MCProperties.DEFAULT_SPLIT_ROW_COUNT));
                if (splitRowCount <= 0) {
                    throw new DdlException(MCProperties.SPLIT_ROW_COUNT + " must be greater than 0");
                }
            } else {
                throw new DdlException("property " + MCProperties.SPLIT_STRATEGY + " must be "
                        + MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY + " or " + MCProperties.SPLIT_BY_ROW_COUNT_STRATEGY);
            }
        } catch (NumberFormatException e) {
            throw new DdlException("property " + MCProperties.SPLIT_BYTE_SIZE + "/"
                    + MCProperties.SPLIT_ROW_COUNT + " must be an integer");
        }

        // Account format: only the two known values are accepted here.
        String accountFormatProp = props.getOrDefault(MCProperties.ACCOUNT_FORMAT, MCProperties.DEFAULT_ACCOUNT_FORMAT);
        if (accountFormatProp.equals(MCProperties.ACCOUNT_FORMAT_NAME)) {
            accountFormat = AccountFormat.DISPLAYNAME;
        } else if (accountFormatProp.equals(MCProperties.ACCOUNT_FORMAT_ID)) {
            accountFormat = AccountFormat.ID;
        } else {
            throw new DdlException("property " + MCProperties.ACCOUNT_FORMAT + " only support name and id");
        }

        // REST client options: all three must be positive integers.
        try {
            connectTimeout = Integer.parseInt(
                    props.getOrDefault(MCProperties.CONNECT_TIMEOUT, MCProperties.DEFAULT_CONNECT_TIMEOUT));
            readTimeout = Integer.parseInt(
                    props.getOrDefault(MCProperties.READ_TIMEOUT, MCProperties.DEFAULT_READ_TIMEOUT));
            retryTimes = Integer.parseInt(
                    props.getOrDefault(MCProperties.RETRY_COUNT, MCProperties.DEFAULT_RETRY_COUNT));
            if (connectTimeout <= 0) {
                throw new DdlException(MCProperties.CONNECT_TIMEOUT + " must be greater than 0");
            }
            if (readTimeout <= 0) {
                throw new DdlException(MCProperties.READ_TIMEOUT + " must be greater than 0");
            }
            if (retryTimes <= 0) {
                throw new DdlException(MCProperties.RETRY_COUNT + " must be greater than 0");
            }
        } catch (NumberFormatException e) {
            throw new DdlException("property " + MCProperties.CONNECT_TIMEOUT + "/"
                    + MCProperties.READ_TIMEOUT + "/" + MCProperties.RETRY_COUNT + " must be an integer");
        }

        // Both the access key and the secret key have to be supplied.
        CloudCredential credential = MCProperties.getCredential(props);
        if (!credential.isWhole()) {
            throw new DdlException("Max-Compute credential properties '"
                    + MCProperties.ACCESS_KEY + "' and  '" + MCProperties.SECRET_KEY + "' are required.");
        }
    }
}