JdbcResource.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.EnvUtils;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalCatalog;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.codec.binary.Hex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* External JDBC Catalog resource for external table query.
* <p>
* create external resource jdbc_mysql
* properties (
* "type"="jdbc",
* "user"="root",
* "password"="123456",
* "jdbc_url"="jdbc:mysql://127.0.0.1:3306/test",
* "driver_url"="http://127.0.0.1:8888/mysql-connector-java-5.1.47.jar",
* "driver_class"="com.mysql.jdbc.Driver"
* );
* <p>
* DROP RESOURCE "jdbc_mysql";
*/
public class JdbcResource extends Resource {
private static final Logger LOG = LogManager.getLogger(JdbcResource.class);
public static final String JDBC_MYSQL = "jdbc:mysql";
public static final String JDBC_MARIADB = "jdbc:mariadb";
public static final String JDBC_POSTGRESQL = "jdbc:postgresql";
public static final String JDBC_ORACLE = "jdbc:oracle";
public static final String JDBC_SQLSERVER = "jdbc:sqlserver";
public static final String JDBC_CLICKHOUSE = "jdbc:clickhouse";
public static final String JDBC_SAP_HANA = "jdbc:sap";
public static final String JDBC_TRINO = "jdbc:trino";
public static final String JDBC_PRESTO = "jdbc:presto";
public static final String JDBC_OCEANBASE = "jdbc:oceanbase";
public static final String JDBC_DB2 = "jdbc:db2";
public static final String JDBC_GBASE = "jdbc:gbase";
public static final String MYSQL = "MYSQL";
public static final String POSTGRESQL = "POSTGRESQL";
public static final String ORACLE = "ORACLE";
public static final String SQLSERVER = "SQLSERVER";
public static final String CLICKHOUSE = "CLICKHOUSE";
public static final String SAP_HANA = "SAP_HANA";
public static final String TRINO = "TRINO";
public static final String PRESTO = "PRESTO";
public static final String OCEANBASE = "OCEANBASE";
public static final String OCEANBASE_ORACLE = "OCEANBASE_ORACLE";
public static final String DB2 = "DB2";
public static final String GBASE = "GBASE";
public static final String JDBC_PROPERTIES_PREFIX = "jdbc.";
public static final String JDBC_URL = "jdbc_url";
public static final String USER = "user";
public static final String PASSWORD = "password";
public static final String DRIVER_CLASS = "driver_class";
public static final String DRIVER_URL = "driver_url";
public static final String TYPE = "type";
public static final String ONLY_SPECIFIED_DATABASE = "only_specified_database";
public static final String CONNECTION_POOL_MIN_SIZE = "connection_pool_min_size";
public static final String CONNECTION_POOL_MAX_SIZE = "connection_pool_max_size";
public static final String CONNECTION_POOL_MAX_WAIT_TIME = "connection_pool_max_wait_time";
public static final String CONNECTION_POOL_MAX_LIFE_TIME = "connection_pool_max_life_time";
public static final String CONNECTION_POOL_KEEP_ALIVE = "connection_pool_keep_alive";
public static final String CHECK_SUM = "checksum";
public static final String CREATE_TIME = "create_time";
public static final String TEST_CONNECTION = "test_connection";
private static final ImmutableList<String> ALL_PROPERTIES = new ImmutableList.Builder<String>().add(
JDBC_URL,
USER,
PASSWORD,
DRIVER_CLASS,
DRIVER_URL,
TYPE,
CREATE_TIME,
ONLY_SPECIFIED_DATABASE,
LOWER_CASE_META_NAMES,
META_NAMES_MAPPING,
INCLUDE_DATABASE_LIST,
EXCLUDE_DATABASE_LIST,
CONNECTION_POOL_MIN_SIZE,
CONNECTION_POOL_MAX_SIZE,
CONNECTION_POOL_MAX_LIFE_TIME,
CONNECTION_POOL_MAX_WAIT_TIME,
CONNECTION_POOL_KEEP_ALIVE,
TEST_CONNECTION,
ExternalCatalog.USE_META_CACHE
).build();
// The default value of optional properties
// if one optional property is not specified, will use default value
private static final Map<String, String> OPTIONAL_PROPERTIES_DEFAULT_VALUE = Maps.newHashMap();
static {
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(ONLY_SPECIFIED_DATABASE, "false");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_META_NAMES, "false");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(META_NAMES_MAPPING, "");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(INCLUDE_DATABASE_LIST, "");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(EXCLUDE_DATABASE_LIST, "");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MIN_SIZE, "1");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_SIZE, "30");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_LIFE_TIME, "1800000");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_WAIT_TIME, "5000");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_KEEP_ALIVE, "false");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(TEST_CONNECTION, "true");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(ExternalCatalog.USE_META_CACHE,
String.valueOf(ExternalCatalog.DEFAULT_USE_META_CACHE));
}
// timeout for both connection and read. 10 seconds is long enough.
private static final int HTTP_TIMEOUT_MS = 10000;
@SerializedName(value = "configs")
private Map<String, String> configs;
public JdbcResource() {
super();
}
public JdbcResource(String name) {
this(name, Maps.newHashMap());
}
public JdbcResource(String name, Map<String, String> configs) {
super(name, ResourceType.JDBC);
this.configs = configs;
}
@Override
public void modifyProperties(Map<String, String> properties) throws DdlException {
// modify properties
for (String propertyKey : ALL_PROPERTIES) {
replaceIfEffectiveValue(this.configs, propertyKey, properties.get(propertyKey));
}
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL)));
super.modifyProperties(properties);
}
@Override
public void checkProperties(Map<String, String> properties) throws AnalysisException {
Map<String, String> copiedProperties = Maps.newHashMap(properties);
// check properties
for (String propertyKey : ALL_PROPERTIES) {
copiedProperties.remove(propertyKey);
}
if (!copiedProperties.isEmpty()) {
throw new AnalysisException("Unknown JDBC catalog resource properties: " + copiedProperties);
}
}
@Override
protected void setProperties(ImmutableMap<String, String> properties) throws DdlException {
Preconditions.checkState(properties != null);
this.configs = Maps.newHashMap(properties);
validateProperties(this.configs);
applyDefaultProperties();
String currentDateTime = LocalDateTime.now(ZoneId.systemDefault()).toString().replace("T", " ");
configs.put(CREATE_TIME, currentDateTime);
// check properties
for (String property : ALL_PROPERTIES) {
String value = configs.get(property);
if (value == null) {
throw new DdlException("JdbcResource Missing " + property + " in properties");
}
}
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL)));
configs.put(CHECK_SUM, computeObjectChecksum(getProperty(DRIVER_URL)));
}
/**
* This function used to handle optional arguments
* eg: only_specified_databasećlower_case_table_names
*/
@Override
public void applyDefaultProperties() {
for (String s : OPTIONAL_PROPERTIES_DEFAULT_VALUE.keySet()) {
if (!configs.containsKey(s)) {
configs.put(s, OPTIONAL_PROPERTIES_DEFAULT_VALUE.get(s));
}
}
}
@Override
public Map<String, String> getCopiedProperties() {
return Maps.newHashMap(configs);
}
@Override
protected void getProcNodeData(BaseProcResult result) {
String lowerCaseType = type.name().toLowerCase();
for (Map.Entry<String, String> entry : configs.entrySet()) {
// it's dangerous to show password in show jdbc resource
// so we use empty string to replace the real password
if (entry.getKey().equals(PASSWORD)) {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), ""));
} else {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
}
}
}
public String getProperty(String propertiesKey) {
// check the properties key
return configs.get(propertiesKey);
}
public static String computeObjectChecksum(String driverPath) throws DdlException {
if (FeConstants.runningUnitTest) {
// skip checking checksum when running ut
return "";
}
String fullDriverUrl = getFullDriverUrl(driverPath);
try (InputStream inputStream =
Util.getInputStreamFromUrl(fullDriverUrl, null, HTTP_TIMEOUT_MS, HTTP_TIMEOUT_MS)) {
MessageDigest digest = MessageDigest.getInstance("MD5");
byte[] buf = new byte[4096];
int bytesRead = 0;
do {
bytesRead = inputStream.read(buf);
if (bytesRead < 0) {
break;
}
digest.update(buf, 0, bytesRead);
} while (true);
return Hex.encodeHexString(digest.digest());
} catch (IOException e) {
throw new DdlException("compute driver checksum from url: " + driverPath
+ " meet an IOException: " + e.getMessage());
} catch (NoSuchAlgorithmException e) {
throw new DdlException("compute driver checksum from url: " + driverPath
+ " could not find algorithm: " + e.getMessage());
}
}
private static void checkCloudWhiteList(String driverUrl) throws IllegalArgumentException {
// For compatibility with cloud mode, we use both `jdbc_driver_url_white_list`
// and jdbc_driver_secure_path to check whitelist
List<String> cloudWhiteList = new ArrayList<>(Arrays.asList(Config.jdbc_driver_url_white_list));
cloudWhiteList.removeIf(String::isEmpty);
if (!cloudWhiteList.isEmpty() && !cloudWhiteList.contains(driverUrl)) {
throw new IllegalArgumentException("Driver URL does not match any allowed paths" + driverUrl);
}
}
public static String getFullDriverUrl(String driverUrl) throws IllegalArgumentException {
if (!(driverUrl.startsWith("file://") || driverUrl.startsWith("http://")
|| driverUrl.startsWith("https://") || driverUrl.matches("^[^:/]+\\.jar$"))) {
throw new IllegalArgumentException("Invalid driver URL format. Supported formats are: "
+ "file://xxx.jar, http://xxx.jar, https://xxx.jar, or xxx.jar (without prefix).");
}
try {
URI uri = new URI(driverUrl);
String schema = uri.getScheme();
checkCloudWhiteList(driverUrl);
if (schema == null && !driverUrl.startsWith("/")) {
return checkAndReturnDefaultDriverUrl(driverUrl);
}
if ("*".equals(Config.jdbc_driver_secure_path)) {
return driverUrl;
}
boolean isAllowed = Arrays.stream(Config.jdbc_driver_secure_path.split(";"))
.anyMatch(allowedPath -> driverUrl.startsWith(allowedPath.trim()));
if (!isAllowed) {
throw new IllegalArgumentException("Driver URL does not match any allowed paths: " + driverUrl);
}
return driverUrl;
} catch (URISyntaxException e) {
LOG.warn("invalid jdbc driver url: " + driverUrl);
return driverUrl;
}
}
private static String checkAndReturnDefaultDriverUrl(String driverUrl) {
final String defaultDriverUrl = EnvUtils.getDorisHome() + "/plugins/jdbc_drivers";
final String defaultOldDriverUrl = EnvUtils.getDorisHome() + "/jdbc_drivers";
if (Config.jdbc_drivers_dir.equals(defaultDriverUrl)) {
// If true, which means user does not set `jdbc_drivers_dir` and use the default one.
// Because in new version, we change the default value of `jdbc_drivers_dir`
// from `DORIS_HOME/jdbc_drivers` to `DORIS_HOME/plugins/jdbc_drivers`,
// so we need to check the old default dir for compatibility.
File file = new File(defaultDriverUrl + "/" + driverUrl);
if (file.exists()) {
return "file://" + defaultDriverUrl + "/" + driverUrl;
} else {
// use old one
return "file://" + defaultOldDriverUrl + "/" + driverUrl;
}
} else {
// Return user specified driver url directly.
return "file://" + Config.jdbc_drivers_dir + "/" + driverUrl;
}
}
public static String parseDbType(String url) throws DdlException {
if (url.startsWith(JDBC_MYSQL) || url.startsWith(JDBC_MARIADB)) {
return MYSQL;
} else if (url.startsWith(JDBC_POSTGRESQL)) {
return POSTGRESQL;
} else if (url.startsWith(JDBC_ORACLE)) {
return ORACLE;
} else if (url.startsWith(JDBC_SQLSERVER)) {
return SQLSERVER;
} else if (url.startsWith(JDBC_CLICKHOUSE)) {
return CLICKHOUSE;
} else if (url.startsWith(JDBC_SAP_HANA)) {
return SAP_HANA;
} else if (url.startsWith(JDBC_TRINO)) {
return TRINO;
} else if (url.startsWith(JDBC_PRESTO)) {
return PRESTO;
} else if (url.startsWith(JDBC_OCEANBASE)) {
return OCEANBASE;
} else if (url.startsWith(JDBC_DB2)) {
return DB2;
} else if (url.startsWith(JDBC_GBASE)) {
return GBASE;
}
throw new DdlException("Unsupported jdbc database type, please check jdbcUrl: " + url);
}
public static String handleJdbcUrl(String jdbcUrl) throws DdlException {
// delete all space in jdbcUrl
String newJdbcUrl = jdbcUrl.replaceAll(" ", "");
String dbType = parseDbType(newJdbcUrl);
if (dbType.equals(MYSQL) || dbType.equals(OCEANBASE)) {
// `yearIsDateType` is a parameter of JDBC, and the default is true.
// We force the use of `yearIsDateType=false`
newJdbcUrl = checkAndSetJdbcBoolParam(dbType, newJdbcUrl, "yearIsDateType", "true", "false");
// MySQL Types and Return Values for GetColumnTypeName and GetColumnClassName
// are presented in https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-type-conversions.html
// When mysql's tinyint stores non-0 or 1, we need to read the data correctly,
// so we need tinyInt1isBit=false
newJdbcUrl = checkAndSetJdbcBoolParam(dbType, newJdbcUrl, "tinyInt1isBit", "true", "false");
// set useUnicode and characterEncoding to false and utf-8
newJdbcUrl = checkAndSetJdbcBoolParam(dbType, newJdbcUrl, "useUnicode", "false", "true");
newJdbcUrl = checkAndSetJdbcBoolParam(dbType, newJdbcUrl, "rewriteBatchedStatements", "false", "true");
newJdbcUrl = checkAndSetJdbcParam(dbType, newJdbcUrl, "characterEncoding", "utf-8");
if (dbType.equals(OCEANBASE)) {
// set useCursorFetch to true
newJdbcUrl = checkAndSetJdbcBoolParam(dbType, newJdbcUrl, "useCursorFetch", "false", "true");
}
}
if (dbType.equals(POSTGRESQL)) {
newJdbcUrl = checkAndSetJdbcBoolParam(dbType, newJdbcUrl, "reWriteBatchedInserts", "false", "true");
}
if (dbType.equals(SQLSERVER)) {
if (Config.force_sqlserver_jdbc_encrypt_false) {
newJdbcUrl = checkAndSetJdbcBoolParam(dbType, newJdbcUrl, "encrypt", "true", "false");
}
newJdbcUrl = checkAndSetJdbcBoolParam(dbType, newJdbcUrl, "useBulkCopyForBatchInsert", "false", "true");
}
return newJdbcUrl;
}
/**
* Check jdbcUrl param, if the param is not set, set it to the expected value.
* If the param is set to an unexpected value, replace it with the expected value.
* If the param is set to the expected value, do nothing.
*
* @param jdbcUrl
* @param params
* @param unexpectedVal
* @param expectedVal
* @return
*/
private static String checkAndSetJdbcBoolParam(String dbType, String jdbcUrl, String params, String unexpectedVal,
String expectedVal) {
String delimiter = getDelimiter(jdbcUrl, dbType);
String unexpectedParams = params + "=" + unexpectedVal;
String expectedParams = params + "=" + expectedVal;
if (jdbcUrl.contains(expectedParams)) {
return jdbcUrl;
} else if (jdbcUrl.contains(unexpectedParams)) {
jdbcUrl = jdbcUrl.replaceAll(unexpectedParams, expectedParams);
} else {
if (!jdbcUrl.endsWith(delimiter)) {
jdbcUrl += delimiter;
}
jdbcUrl += expectedParams;
}
return jdbcUrl;
}
/**
* Check jdbcUrl param, if the param is set, do thing.
* If the param is not set, set it to expected value.
*
* @param jdbcUrl
* @param params
* @return
*/
private static String checkAndSetJdbcParam(String dbType, String jdbcUrl, String params, String expectedVal) {
String delimiter = getDelimiter(jdbcUrl, dbType);
String expectedParams = params + "=" + expectedVal;
if (jdbcUrl.contains(expectedParams)) {
return jdbcUrl;
} else {
if (!jdbcUrl.endsWith(delimiter)) {
jdbcUrl += delimiter;
}
jdbcUrl += expectedParams;
}
return jdbcUrl;
}
private static String getDelimiter(String jdbcUrl, String dbType) {
if (dbType.equals(SQLSERVER) || dbType.equals(DB2)) {
return ";";
} else if (jdbcUrl.contains("?")) {
return "&";
} else {
return "?";
}
}
public static String getDefaultPropertyValue(String propertyName) {
return OPTIONAL_PROPERTIES_DEFAULT_VALUE.getOrDefault(propertyName, "");
}
public static void validateProperties(Map<String, String> properties) throws DdlException {
for (String key : properties.keySet()) {
if (!ALL_PROPERTIES.contains(key)) {
throw new DdlException("JDBC resource Property of " + key + " is unknown");
}
}
}
public static void checkBooleanProperty(String propertyName, String propertyValue) throws DdlException {
if (!propertyValue.equalsIgnoreCase("true") && !propertyValue.equalsIgnoreCase("false")) {
throw new DdlException(propertyName + " must be true or false");
}
}
public static void checkDatabaseListProperties(String onlySpecifiedDatabase,
Map<String, Boolean> includeDatabaseList, Map<String, Boolean> excludeDatabaseList) throws DdlException {
if (!onlySpecifiedDatabase.equalsIgnoreCase("true")) {
if ((includeDatabaseList != null && !includeDatabaseList.isEmpty()) || (excludeDatabaseList != null
&& !excludeDatabaseList.isEmpty())) {
throw new DdlException(
"include_database_list and exclude_database_list "
+ "cannot be set when only_specified_database is false");
}
}
}
public static void checkConnectionPoolProperties(int minSize, int maxSize, int maxWaitTime, int maxLifeTime)
throws DdlException {
if (minSize < 0) {
throw new DdlException("connection_pool_min_size must be greater than or equal to 0");
}
if (maxSize < 1) {
throw new DdlException("connection_pool_max_size must be greater than or equal to 1");
}
if (maxSize < minSize) {
throw new DdlException(
"connection_pool_max_size must be greater than or equal to connection_pool_min_size");
}
if (maxWaitTime < 0) {
throw new DdlException("connection_pool_max_wait_time must be greater than or equal to 0");
}
if (maxWaitTime > 30000) {
throw new DdlException("connection_pool_max_wait_time must be less than or equal to 30000");
}
if (maxLifeTime < 150000) {
throw new DdlException("connection_pool_max_life_time must be greater than or equal to 150000");
}
}
}