OdbcTable.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.DeepCopy;
import org.apache.doris.common.io.Text;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TOdbcTable;
import org.apache.doris.thrift.TOdbcTableType;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OdbcTable extends Table {
    private static final Logger LOG = LogManager.getLogger(OlapTable.class);

    public static final String ODBC_CATALOG_RESOURCE = "odbc_catalog_resource";
    public static final String ODBC_HOST = "host";
    public static final String ODBC_PORT = "port";
    public static final String ODBC_USER = "user";
    public static final String ODBC_PASSWORD = "password";
    public static final String ODBC_DATABASE = "database";
    public static final String ODBC_TABLE = "table";
    public static final String ODBC_DRIVER = "driver";
    public static final String ODBC_TYPE = "odbc_type";
    public static final String ODBC_CHARSET = "charset";
    public static final String ODBC_EXTRA_PARAM = "extra_param";


    // map now odbc external table Doris support now
    private static Map<String, TOdbcTableType> TABLE_TYPE_MAP;

    static {
        Map<String, TOdbcTableType> tempMap = new HashMap<>();
        tempMap.put("oracle", TOdbcTableType.ORACLE);
        tempMap.put("mysql", TOdbcTableType.MYSQL);
        tempMap.put("postgresql", TOdbcTableType.POSTGRESQL);
        tempMap.put("sqlserver", TOdbcTableType.SQLSERVER);
        TABLE_TYPE_MAP = Collections.unmodifiableMap(tempMap);
    }

    @SerializedName("ocrn")
    private String odbcCatalogResourceName;
    @SerializedName("h")
    private String host;
    @SerializedName("p")
    private String port;
    @SerializedName("un")
    private String userName;
    @SerializedName("pwd")
    private String passwd;
    @SerializedName("odn")
    private String odbcDatabaseName;
    @SerializedName("otn")
    private String odbcTableName;
    @SerializedName("d")
    private String driver;
    @SerializedName("ottn")
    private String odbcTableTypeName;
    @SerializedName("c")
    private String charset;
    @SerializedName("ep")
    private String extraParam;
    private Map<String, String> resourceProperties;

    public OdbcTable() {
        super(TableType.ODBC);
    }

    public OdbcTable(long id, String name, List<Column> schema, Map<String, String> properties)
            throws DdlException {
        super(id, name, TableType.ODBC, schema);
        validate(properties);
    }

    private void validate(Map<String, String> properties) throws DdlException {
        if (properties == null) {
            throw new DdlException("Please set properties of odbc table, "
                    + "they are: odbc_catalog_resource or [host, port, user, password, driver, odbc_type]"
                    + " and database and table");
        }
        if (properties.containsKey(ODBC_CATALOG_RESOURCE)) {
            odbcCatalogResourceName = properties.get(ODBC_CATALOG_RESOURCE);

            // 1. check whether resource exist
            Resource oriResource = Env.getCurrentEnv().getResourceMgr().getResource(odbcCatalogResourceName);
            if (oriResource == null) {
                throw new DdlException("Resource does not exist. name: " + odbcCatalogResourceName);
            }

            // 2. check resource usage privilege
            if (!Env.getCurrentEnv().getAccessManager().checkResourcePriv(ConnectContext.get(),
                    odbcCatalogResourceName,
                    PrivPredicate.USAGE)) {
                throw new DdlException("USAGE denied to user '" + ConnectContext.get().getQualifiedUser()
                        + "'@'" + ConnectContext.get().getRemoteIP()
                        + "' for resource '" + odbcCatalogResourceName + "'");
            }
            resourceProperties = new HashMap<>(oriResource.getCopiedProperties());
            resourceProperties.remove(ODBC_HOST);
            resourceProperties.remove(ODBC_PORT);
            resourceProperties.remove(ODBC_USER);
            resourceProperties.remove(ODBC_PASSWORD);
            resourceProperties.remove(ODBC_DRIVER);
            resourceProperties.remove(ODBC_CHARSET);
            resourceProperties.remove(ODBC_TYPE);
            resourceProperties.remove("type");
            resourceProperties.remove(ODBC_DATABASE);
        } else {
            Map<String, String> copiedProperties = new HashMap<>();
            copiedProperties.putAll(properties);
            // Set up
            host = properties.get(ODBC_HOST);
            if (Strings.isNullOrEmpty(host)) {
                throw new DdlException("Host of Odbc table is null. "
                        + "Please set proper resource or add properties('host'='xxx.xxx.xxx.xxx') when create table");
            }
            copiedProperties.remove(ODBC_HOST);

            port = properties.get(ODBC_PORT);
            if (Strings.isNullOrEmpty(port)) {
                // Maybe null pointer or number convert
                throw new DdlException("Port of Odbc table is null. "
                        + "Please set odbc_catalog_resource or add properties('port'='3306') when create table");
            } else {
                try {
                    Integer.valueOf(port);
                } catch (Exception e) {
                    throw new DdlException("Port of Odbc table must be a number."
                            + "Please set odbc_catalog_resource or add properties('port'='3306') when create table");

                }
            }
            copiedProperties.remove(ODBC_PORT);

            userName = properties.get(ODBC_USER);
            if (Strings.isNullOrEmpty(userName)) {
                throw new DdlException("User of Odbc table is null. "
                        + "Please set odbc_catalog_resource or add properties('user'='root') when create table");
            }
            copiedProperties.remove(ODBC_USER);

            passwd = properties.get(ODBC_PASSWORD);
            if (passwd == null) {
                throw new DdlException("Password of Odbc table is null. "
                        + "Please set odbc_catalog_resource or add properties('password'='xxxx') when create table");
            }
            copiedProperties.remove(ODBC_PASSWORD);

            driver = properties.get(ODBC_DRIVER);
            if (Strings.isNullOrEmpty(driver)) {
                throw new DdlException("Driver of Odbc table is null. "
                        + "Please set odbc_catalog_resource or add properties('diver'='xxxx') when create table");
            }
            copiedProperties.remove(ODBC_DRIVER);


            charset = properties.get(ODBC_CHARSET);
            copiedProperties.remove(ODBC_CHARSET);

            String tableType = properties.get(ODBC_TYPE);
            if (Strings.isNullOrEmpty(tableType)) {
                throw new DdlException("Type of Odbc table is null. "
                        + "Please set odbc_catalog_resource or add properties('odbc_type'='xxxx') when create table");
            } else {
                odbcTableTypeName = tableType.toLowerCase();
                if (!TABLE_TYPE_MAP.containsKey(odbcTableTypeName)) {
                    throw new DdlException("Invalid Odbc table type:" + tableType
                            + " Now Odbc table type only support:" + supportTableType());
                }
            }
            copiedProperties.remove(ODBC_TYPE);
            copiedProperties.remove(ODBC_DATABASE);
            copiedProperties.remove(ODBC_TABLE);
            extraParam = getExtraParameter(copiedProperties);
        }

        odbcDatabaseName = properties.get(ODBC_DATABASE);
        if (Strings.isNullOrEmpty(odbcDatabaseName)) {
            throw new DdlException("Database of Odbc table is null. "
                    + "Please add properties('database'='xxxx') when create table");
        }

        odbcTableName = properties.get(ODBC_TABLE);
        if (Strings.isNullOrEmpty(odbcTableName)) {
            throw new DdlException("Table of Odbc table is null. "
                    + "Please add properties('table'='xxxx') when create table");
        }
    }

    private String getPropertyFromResource(String propertyName) {
        OdbcCatalogResource odbcCatalogResource = (OdbcCatalogResource)
                (Env.getCurrentEnv().getResourceMgr().getResource(odbcCatalogResourceName));
        if (odbcCatalogResource == null) {
            throw new RuntimeException("Resource does not exist. name: " + odbcCatalogResourceName);
        }

        String property = odbcCatalogResource.getProperty(propertyName);
        if (property == null) {
            throw new RuntimeException("The property:" + propertyName
                    + " do not set in resource " + odbcCatalogResourceName);
        }
        return property;
    }

    public String getExtraParameter(Map<String, String> extraMap) {
        if (extraMap == null || extraMap.isEmpty()) {
            return "";
        }
        return ";" + extraMap.entrySet()
                .stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(";"));
    }

    public String getExtraParam() {
        if (extraParam != null) {
            return extraParam;
        }
        return getExtraParameter(resourceProperties);
    }

    public String getOdbcCatalogResourceName() {
        return odbcCatalogResourceName;
    }

    public String getHost() {
        if (host != null) {
            return host;
        }
        return getPropertyFromResource(ODBC_HOST);
    }

    public String getPort() {
        if (port != null) {
            return port;
        }
        return getPropertyFromResource(ODBC_PORT);
    }

    public String getUserName() {
        if (userName != null) {
            return userName;
        }
        return getPropertyFromResource(ODBC_USER);
    }

    public String getPasswd() {
        if (passwd != null) {
            return passwd;
        }
        return getPropertyFromResource(ODBC_PASSWORD);
    }

    public String getOdbcDatabaseName() {
        return odbcDatabaseName;
    }

    public String getOdbcTableName() {
        return odbcTableName;
    }

    public String getOdbcDriver() {
        if (driver != null) {
            return driver;
        }
        return getPropertyFromResource(ODBC_DRIVER);
    }

    public String getCharset() {
        if (charset != null) {
            return charset;
        }
        String resourceCharset = "utf8";
        try {
            resourceCharset = getPropertyFromResource(ODBC_CHARSET);
        } catch (Exception e) {
            LOG.info(e.getMessage());
        }

        return resourceCharset;
    }

    public String getOdbcTableTypeName() {
        if (odbcTableTypeName != null) {
            return odbcTableTypeName;
        }
        return getPropertyFromResource(ODBC_TYPE);
    }

    public String getConnectString() {
        String connectString = "";
        // different database have different connection string
        switch (getOdbcTableType()) {
            case ORACLE:
                connectString = String.format("Driver=%s;Dbq=//%s:%s/%s;DataBase=%s;Uid=%s;Pwd=%s;charset=%s",
                        getOdbcDriver(),
                        getHost(),
                        getPort(),
                        getOdbcDatabaseName(),
                        getOdbcDatabaseName(),
                        getUserName(),
                        getPasswd(),
                        getCharset());
                break;
            case POSTGRESQL:
                connectString = String.format("Driver=%s;Server=%s;Port=%s;DataBase=%s;"
                                + "Uid=%s;Pwd=%s;charset=%s;UseDeclareFetch=1;Fetch=4096",
                        getOdbcDriver(),
                        getHost(),
                        getPort(),
                        getOdbcDatabaseName(),
                        getUserName(),
                        getPasswd(),
                        getCharset());
                break;
            case MYSQL:
                connectString = String.format("Driver=%s;Server=%s;Port=%s;DataBase=%s;"
                                + "Uid=%s;Pwd=%s;charset=%s;forward_cursor=1;no_cache=1",
                        getOdbcDriver(),
                        getHost(),
                        getPort(),
                        getOdbcDatabaseName(),
                        getUserName(),
                        getPasswd(),
                        getCharset());
                break;
            case SQLSERVER:
                connectString = String.format("Driver=%s;Server=%s,%s;DataBase=%s;Uid=%s;Pwd=%s",
                        getOdbcDriver(),
                        getHost(),
                        getPort(),
                        getOdbcDatabaseName(),
                        getUserName(),
                        getPasswd());
                break;
            default:
        }
        return connectString + getExtraParam();
    }

    public TOdbcTableType getOdbcTableType() {
        return TABLE_TYPE_MAP.get(getOdbcTableTypeName());
    }

    @Override
    public OdbcTable clone() {
        OdbcTable copied = DeepCopy.copy(this, OdbcTable.class, FeConstants.meta_version);
        if (copied == null) {
            LOG.warn("failed to copy odbc table: " + getName());
            return null;
        }
        return copied;
    }

    public void resetIdsForRestore(Env env) {
        id = env.getNextId();
    }

    public TTableDescriptor toThrift() {
        TOdbcTable tOdbcTable = new TOdbcTable();

        tOdbcTable.setHost(getHost());
        tOdbcTable.setPort(getPort());
        tOdbcTable.setUser(getUserName());
        tOdbcTable.setPasswd(getPasswd());
        tOdbcTable.setDb(getOdbcDatabaseName());
        tOdbcTable.setTable(getOdbcTableName());
        tOdbcTable.setDriver(getOdbcDriver());
        tOdbcTable.setType(getOdbcTableType());

        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ODBC_TABLE,
                fullSchema.size(), 0, getName(), "");
        tTableDescriptor.setOdbcTable(tOdbcTable);
        return tTableDescriptor;
    }

    @Override
    public String getSignature(int signatureVersion) {
        StringBuilder sb = new StringBuilder(signatureVersion);
        sb.append(name);
        sb.append(type);
        if (odbcCatalogResourceName != null) {
            sb.append(odbcCatalogResourceName);
            sb.append(odbcDatabaseName);
            sb.append(odbcTableName);
        } else {
            sb.append(host);
            sb.append(port);
            sb.append(userName);
            sb.append(passwd);
            sb.append(driver);
            sb.append(odbcTableTypeName);
            sb.append(charset);
            sb.append(extraParam);
        }
        String md5 = DigestUtils.md5Hex(sb.toString());
        if (LOG.isDebugEnabled()) {
            LOG.debug("get signature of odbc table {}: {}. signature string: {}", name, md5, sb.toString());
        }
        return md5;
    }

    @Deprecated
    public void readFields(DataInput in) throws IOException {
        super.readFields(in);

        // Read Odbc meta
        int size = in.readInt();
        Map<String, String> serializeMap = Maps.newHashMap();
        for (int i = 0; i < size; i++) {
            String key = Text.readString(in);
            String value = Text.readString(in);
            serializeMap.put(key, value);
        }

        odbcCatalogResourceName = serializeMap.get(ODBC_CATALOG_RESOURCE);
        host = serializeMap.get(ODBC_HOST);
        port = serializeMap.get(ODBC_PORT);
        userName = serializeMap.get(ODBC_USER);
        passwd = serializeMap.get(ODBC_PASSWORD);
        odbcDatabaseName = serializeMap.get(ODBC_DATABASE);
        odbcTableName = serializeMap.get(ODBC_TABLE);
        driver = serializeMap.get(ODBC_DRIVER);
        odbcTableTypeName = serializeMap.get(ODBC_TYPE);
        charset = serializeMap.get(ODBC_CHARSET);
        extraParam = serializeMap.get(ODBC_EXTRA_PARAM);
    }

    public static String supportTableType() {
        String supportTable = "";
        for (String table : TABLE_TYPE_MAP.keySet()) {
            supportTable += table + " ";
        }
        return supportTable;
    }
}