JdbcTable.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.catalog.Resource.ResourceType;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.DeepCopy;
import org.apache.doris.common.io.Text;
import org.apache.doris.thrift.TJdbcTable;
import org.apache.doris.thrift.TOdbcTableType;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import lombok.Setter;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Setter
public class JdbcTable extends Table {
    private static final Logger LOG = LogManager.getLogger(JdbcTable.class);

    private static final ObjectMapper objectMapper = new ObjectMapper();

    private static final String CATALOG_ID = "catalog_id";
    private static final String TABLE = "table";
    private static final String REMOTE_DATABASE = "remote_database";
    private static final String REMOTE_TABLE = "remote_table";
    private static final String REMOTE_COLUMNS = "remote_columns";
    private static final String RESOURCE = "resource";
    private static final String TABLE_TYPE = "table_type";
    private static final String URL = "jdbc_url";
    private static final String USER = "user";
    private static final String PASSWORD = "password";
    private static final String DRIVER_CLASS = "driver_class";
    private static final String DRIVER_URL = "driver_url";
    private static final String CHECK_SUM = "checksum";
    private static Map<String, TOdbcTableType> TABLE_TYPE_MAP;
    @SerializedName("rn")
    private String resourceName;
    @SerializedName("etn")
    private String externalTableName;

    // real name only for jdbc catalog
    @SerializedName("rdn")
    private String remoteDatabaseName;
    @SerializedName("rtn")
    private String remoteTableName;
    @SerializedName("rcn")
    private Map<String, String> remoteColumnNames;

    @SerializedName("jtn")
    private String jdbcTypeName;

    @SerializedName("jurl")
    private String jdbcUrl;
    @SerializedName("jusr")
    private String jdbcUser;
    @SerializedName("jpwd")
    private String jdbcPasswd;
    @SerializedName("dc")
    private String driverClass;
    @SerializedName("du")
    private String driverUrl;
    @SerializedName("cs")
    private String checkSum;

    @SerializedName("cid")
    private long catalogId = -1;

    private int connectionPoolMinSize;
    private int connectionPoolMaxSize;
    private int connectionPoolMaxWaitTime;
    private int connectionPoolMaxLifeTime;
    private boolean connectionPoolKeepAlive;

    static {
        Map<String, TOdbcTableType> tempMap = new CaseInsensitiveMap();
        tempMap.put("mysql", TOdbcTableType.MYSQL);
        tempMap.put("postgresql", TOdbcTableType.POSTGRESQL);
        tempMap.put("sqlserver", TOdbcTableType.SQLSERVER);
        tempMap.put("oracle", TOdbcTableType.ORACLE);
        tempMap.put("clickhouse", TOdbcTableType.CLICKHOUSE);
        tempMap.put("sap_hana", TOdbcTableType.SAP_HANA);
        tempMap.put("trino", TOdbcTableType.TRINO);
        tempMap.put("presto", TOdbcTableType.PRESTO);
        tempMap.put("oceanbase", TOdbcTableType.OCEANBASE);
        tempMap.put("oceanbase_oracle", TOdbcTableType.OCEANBASE_ORACLE);
        tempMap.put("db2", TOdbcTableType.DB2);
        tempMap.put("gbase", TOdbcTableType.GBASE);
        TABLE_TYPE_MAP = Collections.unmodifiableMap(tempMap);
    }

    public JdbcTable() {
        super(TableType.JDBC);
    }

    public JdbcTable(long id, String name, List<Column> schema, Map<String, String> properties)
            throws DdlException {
        super(id, name, TableType.JDBC, schema);
        validate(properties);
    }

    public JdbcTable(long id, String name, List<Column> schema, TableType type) {
        super(id, name, type, schema);
    }

    public String getInsertSql(List<String> insertCols) {
        StringBuilder sb = new StringBuilder("INSERT INTO ");
        sb.append(getProperRemoteFullTableName(TABLE_TYPE_MAP.get(getTableTypeName())));
        sb.append("(");
        List<String> transformedInsertCols = insertCols.stream()
                .map(col -> getProperRemoteColumnName(TABLE_TYPE_MAP.get(getTableTypeName()), col))
                .collect(Collectors.toList());
        sb.append(String.join(",", transformedInsertCols));
        sb.append(")");
        sb.append(" VALUES (");
        for (int i = 0; i < insertCols.size(); ++i) {
            if (i != 0) {
                sb.append(", ");
            }
            sb.append("?");
        }
        sb.append(")");
        return sb.toString();
    }

    public String getCheckSum() {
        return checkSum;
    }

    public String getExternalTableName() {
        return externalTableName;
    }

    public String getJdbcTypeName() {
        return jdbcTypeName;
    }

    public String getJdbcUrl() {
        return getFromJdbcResourceOrDefault(JdbcResource.JDBC_URL, jdbcUrl);
    }

    public String getJdbcUser() {
        return getFromJdbcResourceOrDefault(JdbcResource.USER, jdbcUser);
    }

    public String getJdbcPasswd() {
        return getFromJdbcResourceOrDefault(JdbcResource.PASSWORD, jdbcPasswd);
    }

    public String getDriverClass() {
        return getFromJdbcResourceOrDefault(JdbcResource.DRIVER_CLASS, driverClass);
    }

    public String getDriverUrl() {
        return getFromJdbcResourceOrDefault(JdbcResource.DRIVER_URL, driverUrl);
    }

    public long getCatalogId() {
        return catalogId;
    }

    public int getConnectionPoolMinSize() {
        return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MIN_SIZE,
                String.valueOf(connectionPoolMinSize)));
    }

    public int getConnectionPoolMaxSize() {
        return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MAX_SIZE,
                String.valueOf(connectionPoolMaxSize)));
    }

    public int getConnectionPoolMaxWaitTime() {
        return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME,
                String.valueOf(connectionPoolMaxWaitTime)));
    }

    public int getConnectionPoolMaxLifeTime() {
        return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME,
                String.valueOf(connectionPoolMaxLifeTime)));
    }

    public boolean isConnectionPoolKeepAlive() {
        return Boolean.parseBoolean(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_KEEP_ALIVE,
                String.valueOf(connectionPoolKeepAlive)));
    }

    private String getFromJdbcResourceOrDefault(String key, String defaultVal) {
        if (Strings.isNullOrEmpty(resourceName)) {
            return defaultVal;
        }
        Resource resource = Env.getCurrentEnv().getResourceMgr().getResource(resourceName);
        if (resource instanceof JdbcResource) {
            return ((JdbcResource) resource).getProperty(key);
        }
        return defaultVal;
    }

    @Override
    public TTableDescriptor toThrift() {
        TJdbcTable tJdbcTable = new TJdbcTable();
        tJdbcTable.setCatalogId(catalogId);
        tJdbcTable.setJdbcUrl(getJdbcUrl());
        tJdbcTable.setJdbcUser(getJdbcUser());
        tJdbcTable.setJdbcPassword(getJdbcPasswd());
        tJdbcTable.setJdbcTableName(externalTableName);
        tJdbcTable.setJdbcDriverClass(getDriverClass());
        tJdbcTable.setJdbcDriverUrl(getDriverUrl());
        tJdbcTable.setJdbcResourceName(resourceName);
        tJdbcTable.setJdbcDriverChecksum(checkSum);
        tJdbcTable.setConnectionPoolMinSize(getConnectionPoolMinSize());
        tJdbcTable.setConnectionPoolMaxSize(getConnectionPoolMaxSize());
        tJdbcTable.setConnectionPoolMaxWaitTime(getConnectionPoolMaxWaitTime());
        tJdbcTable.setConnectionPoolMaxLifeTime(getConnectionPoolMaxLifeTime());
        tJdbcTable.setConnectionPoolKeepAlive(isConnectionPoolKeepAlive());
        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.JDBC_TABLE, fullSchema.size(), 0,
                getName(), "");
        tTableDescriptor.setJdbcTable(tJdbcTable);
        return tTableDescriptor;
    }

    @Deprecated
    @Override
    public void readFields(DataInput in) throws IOException {
        super.readFields(in);

        int size = in.readInt();
        Map<String, String> serializeMap = Maps.newHashMap();
        for (int i = 0; i < size; i++) {
            String key = Text.readString(in);
            String value = Text.readString(in);
            serializeMap.put(key, value);
        }
        catalogId = serializeMap.get(CATALOG_ID) != null ? Long.parseLong(serializeMap.get(CATALOG_ID)) : -1;
        externalTableName = serializeMap.get(TABLE);
        resourceName = serializeMap.get(RESOURCE);
        jdbcTypeName = serializeMap.get(TABLE_TYPE);
        jdbcUrl = serializeMap.get(URL);
        jdbcUser = serializeMap.get(USER);
        jdbcPasswd = serializeMap.get(PASSWORD);
        driverClass = serializeMap.get(DRIVER_CLASS);
        driverUrl = serializeMap.get(DRIVER_URL);
        checkSum = serializeMap.get(CHECK_SUM);
        remoteDatabaseName = serializeMap.get(REMOTE_DATABASE);
        remoteTableName = serializeMap.get(REMOTE_TABLE);
        String realColumnNamesJson = serializeMap.get(REMOTE_COLUMNS);
        if (realColumnNamesJson != null) {
            remoteColumnNames = objectMapper.readValue(realColumnNamesJson, new TypeReference<Map<String, String>>() {
            });
        } else {
            remoteColumnNames = Maps.newHashMap();
        }
    }

    public String getResourceName() {
        return resourceName;
    }

    public String getJdbcTable() {
        return externalTableName;
    }

    public String getRemoteDatabaseName() {
        return remoteDatabaseName;
    }

    public String getRemoteTableName() {
        return remoteTableName;
    }

    public String getProperRemoteFullTableName(TOdbcTableType tableType) {
        if (remoteDatabaseName == null || remoteTableName == null) {
            return databaseProperName(tableType, externalTableName);
        } else {
            return properNameWithRemoteName(tableType, remoteDatabaseName) + "." + properNameWithRemoteName(tableType,
                    remoteTableName);
        }
    }

    public String getProperRemoteColumnName(TOdbcTableType tableType, String columnName) {
        if (remoteColumnNames == null || remoteColumnNames.isEmpty() || !remoteColumnNames.containsKey(columnName)) {
            return databaseProperName(tableType, columnName);
        } else {
            return properNameWithRemoteName(tableType, remoteColumnNames.get(columnName));
        }
    }

    public String getTableTypeName() {
        return jdbcTypeName;
    }

    public TOdbcTableType getJdbcTableType() {
        return TABLE_TYPE_MAP.get(getTableTypeName());
    }

    @Override
    public String getSignature(int signatureVersion) {
        StringBuilder sb = new StringBuilder(signatureVersion);
        sb.append(name);
        sb.append(type);
        sb.append(resourceName);
        sb.append(externalTableName);
        sb.append(jdbcUrl);
        sb.append(jdbcUser);
        sb.append(jdbcPasswd);
        sb.append(driverClass);
        sb.append(driverUrl);
        sb.append(checkSum);

        String md5 = DigestUtils.md5Hex(sb.toString());
        if (LOG.isDebugEnabled()) {
            LOG.debug("get signature of odbc table {}: {}. signature string: {}", name, md5, sb.toString());
        }
        return md5;
    }

    @Override
    public JdbcTable clone() {
        JdbcTable copied = DeepCopy.copy(this, JdbcTable.class, FeConstants.meta_version);
        if (copied == null) {
            LOG.warn("failed to copy jdbc table: " + getName());
            return null;
        }
        return copied;
    }

    private void validate(Map<String, String> properties) throws DdlException {
        if (properties == null) {
            throw new DdlException("Please set properties of jdbc table, "
                    + "they are: host, port, user, password, database and table");
        }

        externalTableName = properties.get(TABLE);
        if (Strings.isNullOrEmpty(externalTableName)) {
            throw new DdlException("property " + TABLE + " must be set");
        }

        resourceName = properties.get(RESOURCE);
        if (Strings.isNullOrEmpty(resourceName)) {
            throw new DdlException("property " + RESOURCE + " must be set");
        }

        jdbcTypeName = properties.get(TABLE_TYPE);
        if (Strings.isNullOrEmpty(jdbcTypeName)) {
            throw new DdlException("property " + TABLE_TYPE + " must be set");
        }

        if (!TABLE_TYPE_MAP.containsKey(jdbcTypeName.toLowerCase())) {
            throw new DdlException("Unknown jdbc table type: " + jdbcTypeName);
        }

        Resource resource = Env.getCurrentEnv().getResourceMgr().getResource(resourceName);
        if (resource == null) {
            throw new DdlException("jdbc resource [" + resourceName + "] not exists");
        }
        if (resource.getType() != ResourceType.JDBC) {
            throw new DdlException("resource [" + resourceName + "] is not jdbc resource");
        }

        JdbcResource jdbcResource = (JdbcResource) resource;
        jdbcUrl = jdbcResource.getProperty(URL);
        jdbcUser = jdbcResource.getProperty(USER);
        jdbcPasswd = jdbcResource.getProperty(PASSWORD);
        driverClass = jdbcResource.getProperty(DRIVER_CLASS);
        driverUrl = jdbcResource.getProperty(DRIVER_URL);
        checkSum = jdbcResource.getProperty(CHECK_SUM);
        connectionPoolMinSize = Integer.parseInt(jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MIN_SIZE));
        connectionPoolMaxSize = Integer.parseInt(jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MAX_SIZE));
        connectionPoolMaxWaitTime = Integer.parseInt(
                jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME));
        connectionPoolMaxLifeTime = Integer.parseInt(
                jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME));
        connectionPoolKeepAlive = Boolean.parseBoolean(
                jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_KEEP_ALIVE));

        String urlType = jdbcUrl.split(":")[1];
        if (!jdbcTypeName.equalsIgnoreCase(urlType)) {
            if (!(jdbcTypeName.equalsIgnoreCase("oceanbase_oracle") && urlType.equalsIgnoreCase("oceanbase"))
                    && !(jdbcTypeName.equalsIgnoreCase("sap_hana") && urlType.equalsIgnoreCase("sap"))) {
                throw new DdlException("property " + TABLE_TYPE + " must be same with resource url");
            }
        }
    }

    /**
     * Formats the provided name (for example, a database, table, or schema name) according to the specified parameters.
     *
     * @param name The name to be formatted.
     * @param wrapStart The character(s) to be added at the start of each name component.
     * @param wrapEnd The character(s) to be added at the end of each name component.
     * @param toUpperCase If true, convert the name to upper case.
     * @param toLowerCase If true, convert the name to lower case.
     *         <p>
     *         Note: If both toUpperCase and toLowerCase are true, the name will ultimately be converted to lower case.
     *         <p>
     *         The name is expected to be in the format of 'schemaName.tableName'. If there is no '.',
     *         the function will treat the entire string as one name component.
     *         If there is a '.', the function will treat the string before the first '.' as the schema name
     *         and the string after the '.' as the table name.
     * @return The formatted name.
     */
    public static String formatName(String name, String wrapStart, String wrapEnd, boolean toUpperCase,
            boolean toLowerCase) {
        int index = name.indexOf(".");
        if (index == -1) { // No dot in the name
            String newName = toUpperCase ? name.toUpperCase() : name;
            newName = toLowerCase ? newName.toLowerCase() : newName;
            return wrapStart + newName + wrapEnd;
        } else {
            String schemaName = toUpperCase ? name.substring(0, index).toUpperCase() : name.substring(0, index);
            schemaName = toLowerCase ? schemaName.toLowerCase() : schemaName;
            String tableName = toUpperCase ? name.substring(index + 1).toUpperCase() : name.substring(index + 1);
            tableName = toLowerCase ? tableName.toLowerCase() : tableName;
            return wrapStart + schemaName + wrapEnd + "." + wrapStart + tableName + wrapEnd;
        }
    }

    /**
     * Formats a database name according to the database type.
     * <p>
     * Rules:
     * - MYSQL, OCEANBASE: Wrap with backticks (`), case unchanged. Example: mySchema.myTable -> `mySchema.myTable`
     * - SQLSERVER: Wrap with square brackets ([]), case unchanged. Example: mySchema.myTable -> [mySchema].[myTable]
     * - POSTGRESQL, CLICKHOUSE, TRINO, OCEANBASE_ORACLE, SAP_HANA: Wrap with double quotes ("), case unchanged.
     * Example: mySchema.myTable -> "mySchema"."myTable"
     * - ORACLE: Wrap with double quotes ("), convert to upper case. Example: mySchema.myTable -> "MYSCHEMA"."MYTABLE"
     * For other types, the name is returned as is.
     *
     * @param tableType The database type.
     * @param name The name to be formatted, expected in 'schemaName.tableName' format. If no '.', treats entire string
     *         as one name component. If '.', treats string before first '.' as schema name and after as table name.
     * @return The formatted name.
     */
    public static String databaseProperName(TOdbcTableType tableType, String name) {
        switch (tableType) {
            case MYSQL:
            case OCEANBASE:
            case GBASE:
                return formatName(name, "`", "`", false, false);
            case SQLSERVER:
                return formatName(name, "[", "]", false, false);
            case POSTGRESQL:
            case CLICKHOUSE:
            case TRINO:
            case PRESTO:
            case OCEANBASE_ORACLE:
            case SAP_HANA:
                return formatName(name, "\"", "\"", false, false);
            case ORACLE:
            case DB2:
                return formatName(name, "\"", "\"", true, false);
            default:
                return name;
        }
    }

    public static String properNameWithRemoteName(TOdbcTableType tableType, String remoteName) {
        switch (tableType) {
            case MYSQL:
            case OCEANBASE:
            case GBASE:
                return formatNameWithRemoteName(remoteName, "`", "`");
            case SQLSERVER:
                return formatNameWithRemoteName(remoteName, "[", "]");
            case POSTGRESQL:
            case CLICKHOUSE:
            case TRINO:
            case PRESTO:
            case OCEANBASE_ORACLE:
            case ORACLE:
            case SAP_HANA:
            case DB2:
                return formatNameWithRemoteName(remoteName, "\"", "\"");
            default:
                return remoteName;
        }
    }

    public static String formatNameWithRemoteName(String remoteName, String wrapStart, String wrapEnd) {
        return wrapStart + remoteName + wrapEnd;
    }
}