McStructureHelper.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.maxcompute;


import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.Partition;
import com.aliyun.odps.Project;
import com.aliyun.odps.Schema;
import com.aliyun.odps.Table;
import com.aliyun.odps.security.SecurityManager;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.utils.StringUtils;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;


/**
 * Due to the introduction of the `mc.enable.namespace.schema` property, most interfaces using the
 * ODPS client have changed, and the mapping structure between Doris and MaxCompute has also changed.
 * Different property values correspond to different implementation classes.
 * It's important to note that when external functions are called through the interface, the structure
 * mapped by Doris (database/table) is used, and the MaxCompute concept does not need to be considered.
 */
public interface McStructureHelper {
    /**
     * Lists the names of the tables that belong to a Doris database.
     *
     * @param mcClient ODPS client used for the lookup
     * @param dbName Doris database name (a MaxCompute schema or project, depending on the implementation)
     * @return table names
     */
    List<String> listTableNames(Odps mcClient, String dbName);

    /**
     * Lists the names that Doris should expose as databases.
     *
     * @param mcClient ODPS client used for the lookup
     * @param defaultProject default project name; ProjectTableHelper puts it at the head of the result,
     *                       ProjectSchemaTableHelper does not read this parameter
     * @return database names
     */
    List<String> listDatabaseNames(Odps mcClient, String defaultProject);

    /**
     * Checks whether a table exists.
     *
     * @param mcClient ODPS client used for the lookup
     * @param dbName Doris database name
     * @param tableName table name
     * @return the result reported by the ODPS client
     * @throws RuntimeException the implementations in this file wrap OdpsException in it
     */
    boolean tableExist(Odps mcClient, String dbName, String tableName) throws RuntimeException;

    /**
     * Checks whether a database exists.
     * The implementations in this file rethrow OdpsException wrapped in a RuntimeException.
     *
     * @param mcClient ODPS client used for the lookup
     * @param dbName Doris database name
     * @return the result reported by the ODPS client
     */
    boolean databaseExist(Odps mcClient, String dbName);

    /**
     * Builds the MaxCompute TableIdentifier for a Doris database/table pair.
     * No ODPS client is involved.
     *
     * @param dbName Doris database name
     * @param tableName table name
     * @return identifier of the table
     */
    TableIdentifier getTableIdentifier(String dbName, String tableName);

    /**
     * Returns the partitions of a table, as reported by Table.getPartitions().
     *
     * @param mcClient ODPS client used for the lookup
     * @param dbName Doris database name
     * @param tableName table name
     * @return partitions of the table
     */
    List<Partition> getPartitions(Odps mcClient, String dbName, String tableName);

    /**
     * Returns an iterator over the partitions of a table.
     *
     * @param mcClient ODPS client used for the lookup
     * @param dbName Doris database name
     * @param tableName table name
     * @return iterator over the partitions of the table
     */
    Iterator<Partition> getPartitionIterator(Odps mcClient, String dbName, String tableName);

    /**
     * Returns the ODPS Table object for a Doris database/table pair.
     *
     * @param mcClient ODPS client used for the lookup
     * @param dbName Doris database name
     * @param tableName table name
     * @return the ODPS table handle
     */
    Table getOdpsTable(Odps mcClient, String dbName, String tableName);

    /**
     * Helper used when `mc.enable.namespace.schema` = true.
     * Mapping structure between Doris and MaxCompute:
     * Doris     : catalog, dbName, tableName
     * MaxCompute: project, schema, table
     *
     * Table and schema listings are addressed with the project name given to the constructor;
     * the Doris database name is passed as the MaxCompute schema name.
     */
    class ProjectSchemaTableHelper implements McStructureHelper {
        // Project name handed to the ODPS calls below; set once in the constructor.
        private final String defaultProjectName;

        /**
         * @param defaultProjectName MaxCompute project that all schemas/tables are resolved in
         */
        public ProjectSchemaTableHelper(String defaultProjectName) {
            this.defaultProjectName = defaultProjectName;
        }

        /**
         * Collects the names of the tables found in schema {@code dbName} of the configured project.
         */
        @Override
        public List<String> listTableNames(Odps mcClient, String dbName) {
            List<String> tableNames = new ArrayList<>();
            for (Table table : mcClient.tables().iterable(defaultProjectName, dbName, null, false)) {
                tableNames.add(table.getName());
            }
            return tableNames;
        }

        /**
         * Collects the schema names of the configured project.
         * The {@code defaultProject} argument is not read; the constructor-supplied project is used.
         */
        @Override
        public List<String> listDatabaseNames(Odps mcClient, String defaultProject) {
            List<String> schemaNames = new ArrayList<>();
            Iterator<Schema> schemas = mcClient.schemas().iterator(defaultProjectName);
            schemas.forEachRemaining(schema -> schemaNames.add(schema.getName()));
            return schemaNames;
        }

        /**
         * Returns the partitions of the given table.
         */
        @Override
        public List<Partition> getPartitions(Odps mcClient, String dbName, String tableName) {
            Table table = mcClient.tables().get(defaultProjectName, dbName, tableName);
            return table.getPartitions();
        }

        /**
         * Returns an iterator over the partition list of the given table.
         */
        @Override
        public Iterator<Partition> getPartitionIterator(Odps mcClient, String dbName, String tableName) {
            Table table = mcClient.tables().get(defaultProjectName, dbName, tableName);
            List<Partition> partitions = table.getPartitions();
            return partitions.iterator();
        }

        /**
         * Asks the ODPS client whether the table exists.
         *
         * @throws RuntimeException wrapping any OdpsException raised by the client
         */
        @Override
        public boolean tableExist(Odps mcClient, String dbName, String tableName) throws RuntimeException {
            final boolean found;
            try {
                found = mcClient.tables().exists(defaultProjectName, dbName, tableName);
            } catch (OdpsException cause) {
                throw new RuntimeException(cause);
            }
            return found;
        }

        /**
         * Asks the ODPS client whether schema {@code dbName} exists.
         * Only the schema name is passed to the client; no project name is supplied in this call.
         *
         * @throws RuntimeException wrapping any OdpsException raised by the client
         */
        @Override
        public boolean databaseExist(Odps mcClient, String dbName) throws RuntimeException {
            final boolean found;
            try {
                found = mcClient.schemas().exists(dbName);
            } catch (OdpsException cause) {
                throw new RuntimeException(cause);
            }
            return found;
        }

        /**
         * Builds a three-part identifier: project, schema ({@code dbName}), table.
         */
        @Override
        public TableIdentifier getTableIdentifier(String dbName, String tableName) {
            TableIdentifier identifier = TableIdentifier.of(defaultProjectName, dbName, tableName);
            return identifier;
        }

        /**
         * Returns the ODPS table handle for project / schema ({@code dbName}) / table.
         */
        @Override
        public Table getOdpsTable(Odps mcClient, String dbName, String tableName) {
            Table table = mcClient.tables().get(defaultProjectName, dbName, tableName);
            return table;
        }
    }

    /**
     * `mc.enable.namespace.schema` = false.
     * mapping structure between Doris and MaxCompute:
     * Doris     : dbName, tableName
     * MaxCompute: project, table
     *
     * The Doris database name is passed to the ODPS client as the project name.
     */
    class ProjectTableHelper implements McStructureHelper {
        // "DisplayName" taken from the "whoami" query; resolved on the first listDatabaseNames
        // call that finds it null or empty, then reused.
        private String catalogOwner = null;

        /**
         * Asks the ODPS client whether the table exists in project {@code dbName}.
         *
         * @throws RuntimeException wrapping any OdpsException raised by the client
         */
        @Override
        public boolean tableExist(Odps mcClient, String dbName, String tableName) throws RuntimeException {
            try {
                return mcClient.tables().exists(dbName, tableName);
            } catch (OdpsException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Collects the names of the tables found in project {@code dbName}.
         */
        @Override
        public List<String> listTableNames(Odps mcClient, String dbName) {
            List<String> result = new ArrayList<>();
            mcClient.tables().iterable(dbName).forEach(e -> result.add(e.getName()));
            return result;
        }

        /**
         * Lists the project names exposed as Doris databases.
         * The default project comes first and appears exactly once; the remaining entries are the
         * projects returned by {@code mcClient.projects().iterator(catalogOwner)}.
         *
         * @param mcClient ODPS client used for the "whoami" query and the project listing
         * @param defaultProject name placed at the head of the result
         * @return {@code defaultProject} followed by the other project names
         * @throws RuntimeException wrapping any OdpsException raised by the client
         */
        @Override
        public List<String> listDatabaseNames(Odps mcClient, String defaultProject) {
            List<String> result = new ArrayList<>();
            // Add the default project once; the loop below skips it so it is never duplicated.
            result.add(defaultProject);
            try {
                if (StringUtils.isNullOrEmpty(catalogOwner)) {
                    // Resolve the current account's display name and cache it for later calls.
                    SecurityManager sm = mcClient.projects().get().getSecurityManager();
                    String whoami = sm.runQuery("whoami", false);

                    JsonObject js = JsonParser.parseString(whoami).getAsJsonObject();
                    catalogOwner = js.get("DisplayName").getAsString();
                }
                Iterator<Project> iterator = mcClient.projects().iterator(catalogOwner);
                while (iterator.hasNext()) {
                    Project project = iterator.next();
                    if (!project.getName().equals(defaultProject)) {
                        result.add(project.getName());
                    }
                }
            } catch (OdpsException e) {
                throw new RuntimeException(e);
            }
            return result;
        }

        /**
         * Returns the partitions of the given table in project {@code dbName}.
         */
        @Override
        public List<Partition> getPartitions(Odps mcClient, String dbName, String tableName) {
            return mcClient.tables().get(dbName, tableName).getPartitions();
        }

        /**
         * Returns an iterator over the partition list of the given table.
         */
        @Override
        public Iterator<Partition> getPartitionIterator(Odps mcClient, String dbName, String tableName) {
            return mcClient.tables().get(dbName, tableName).getPartitions().iterator();
        }

        /**
         * Asks the ODPS client whether project {@code dbName} exists.
         *
         * @throws RuntimeException wrapping any OdpsException raised by the client
         */
        @Override
        public boolean databaseExist(Odps mcClient, String dbName) throws RuntimeException {
            try {
                return mcClient.projects().exists(dbName);
            } catch (OdpsException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Builds a two-part identifier: project ({@code dbName}), table.
         */
        @Override
        public TableIdentifier getTableIdentifier(String dbName, String tableName) {
            return TableIdentifier.of(dbName, tableName);
        }

        /**
         * Returns the ODPS table handle for project ({@code dbName}) / table.
         */
        @Override
        public Table getOdpsTable(Odps mcClient, String dbName, String tableName) {
            return mcClient.tables().get(dbName, tableName);
        }
    }

    /**
     * Creates the helper that matches the `mc.enable.namespace.schema` setting.
     *
     * @param isEnableNamespaceSchema true selects ProjectSchemaTableHelper, false selects ProjectTableHelper
     * @param defaultProjectName project name passed to ProjectSchemaTableHelper; not used otherwise
     * @return a newly constructed helper
     */
    static McStructureHelper getHelper(boolean isEnableNamespaceSchema, String defaultProjectName) {
        if (!isEnableNamespaceSchema) {
            return new ProjectTableHelper();
        }
        return new ProjectSchemaTableHelper(defaultProjectName);
    }
}