JdbcExternalCatalog.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.jdbc;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.jdbc.client.JdbcClientException;
import org.apache.doris.datasource.mapping.IdentifierMapping;
import org.apache.doris.datasource.mapping.JdbcIdentifierMapping;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PJdbcTestConnectionRequest;
import org.apache.doris.proto.InternalService.PJdbcTestConnectionResult;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@Getter
public class JdbcExternalCatalog extends ExternalCatalog {
private static final Logger LOG = LogManager.getLogger(JdbcExternalCatalog.class);
private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
JdbcResource.JDBC_URL,
JdbcResource.DRIVER_URL,
JdbcResource.DRIVER_CLASS
);
// Must add "transient" for Gson to ignore this field,
// or Gson will throw exception with HikariCP
private transient JdbcClient jdbcClient;
private IdentifierMapping identifierMapping;
public JdbcExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
String comment)
throws DdlException {
super(catalogId, name, InitCatalogLog.Type.JDBC, comment);
this.catalogProperty = new CatalogProperty(resource, processCompatibleProperties(props));
this.identifierMapping = new JdbcIdentifierMapping(
(Env.isTableNamesCaseInsensitive() || Env.isStoredTableNamesLowerCase()),
Boolean.parseBoolean(getLowerCaseMetaNames()),
getMetaNamesMapping());
}
@Override
public void checkProperties() throws DdlException {
super.checkProperties();
for (String requiredProperty : REQUIRED_PROPERTIES) {
if (!catalogProperty.getProperties().containsKey(requiredProperty)) {
throw new DdlException("Required property '" + requiredProperty + "' is missing");
}
}
JdbcResource.checkBooleanProperty(JdbcResource.ONLY_SPECIFIED_DATABASE, getOnlySpecifiedDatabase());
JdbcResource.checkBooleanProperty(JdbcResource.LOWER_CASE_META_NAMES, getLowerCaseMetaNames());
JdbcResource.checkBooleanProperty(JdbcResource.CONNECTION_POOL_KEEP_ALIVE,
String.valueOf(isConnectionPoolKeepAlive()));
JdbcResource.checkBooleanProperty(JdbcResource.TEST_CONNECTION, String.valueOf(isTestConnection()));
JdbcResource.checkDatabaseListProperties(getOnlySpecifiedDatabase(), getIncludeDatabaseMap(),
getExcludeDatabaseMap());
JdbcResource.checkConnectionPoolProperties(getConnectionPoolMinSize(), getConnectionPoolMaxSize(),
getConnectionPoolMaxWaitTime(), getConnectionPoolMaxLifeTime());
}
@Override
public void setDefaultPropsIfMissing(boolean isReplay) {
super.setDefaultPropsIfMissing(isReplay);
// Modify lower_case_table_names to lower_case_meta_names if it exists
if (catalogProperty.getProperties().containsKey("lower_case_table_names") && isReplay) {
String lowerCaseTableNamesValue = catalogProperty.getProperties().get("lower_case_table_names");
catalogProperty.addProperty("lower_case_meta_names", lowerCaseTableNamesValue);
catalogProperty.deleteProperty("lower_case_table_names");
LOG.info("Modify lower_case_table_names to lower_case_meta_names, value: {}", lowerCaseTableNamesValue);
} else if (catalogProperty.getProperties().containsKey("lower_case_table_names") && !isReplay) {
throw new IllegalArgumentException("Jdbc catalog property lower_case_table_names is not supported,"
+ " please use lower_case_meta_names instead.");
}
}
@Override
public void resetToUninitialized(boolean invalidCache) {
super.resetToUninitialized(invalidCache);
this.identifierMapping = new JdbcIdentifierMapping(
(Env.isTableNamesCaseInsensitive() || Env.isStoredTableNamesLowerCase()),
Boolean.parseBoolean(getLowerCaseMetaNames()),
getMetaNamesMapping());
}
@Override
public void onClose() {
super.onClose();
if (jdbcClient != null) {
jdbcClient.closeClient();
jdbcClient = null;
}
}
protected Map<String, String> processCompatibleProperties(Map<String, String> props)
throws DdlException {
Map<String, String> properties = Maps.newHashMap();
for (Map.Entry<String, String> kv : props.entrySet()) {
properties.put(StringUtils.removeStart(kv.getKey(), JdbcResource.JDBC_PROPERTIES_PREFIX), kv.getValue());
}
String jdbcUrl = properties.getOrDefault(JdbcResource.JDBC_URL, "");
if (!Strings.isNullOrEmpty(jdbcUrl)) {
jdbcUrl = JdbcResource.handleJdbcUrl(jdbcUrl);
properties.put(JdbcResource.JDBC_URL, jdbcUrl);
}
return properties;
}
public String getDatabaseTypeName() {
return jdbcClient.getDbType();
}
public String getJdbcUser() {
return catalogProperty.getOrDefault(JdbcResource.USER, "");
}
public String getJdbcPasswd() {
return catalogProperty.getOrDefault(JdbcResource.PASSWORD, "");
}
public String getJdbcUrl() {
return catalogProperty.getOrDefault(JdbcResource.JDBC_URL, "");
}
public String getDriverUrl() {
return catalogProperty.getOrDefault(JdbcResource.DRIVER_URL, "");
}
public String getDriverClass() {
return catalogProperty.getOrDefault(JdbcResource.DRIVER_CLASS, "");
}
public String getCheckSum() {
return catalogProperty.getOrDefault(JdbcResource.CHECK_SUM, "");
}
public String getOnlySpecifiedDatabase() {
return catalogProperty.getOrDefault(JdbcResource.ONLY_SPECIFIED_DATABASE, JdbcResource.getDefaultPropertyValue(
JdbcResource.ONLY_SPECIFIED_DATABASE));
}
public int getConnectionPoolMinSize() {
return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_MIN_SIZE, JdbcResource
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MIN_SIZE)));
}
public int getConnectionPoolMaxSize() {
return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_MAX_SIZE, JdbcResource
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_SIZE)));
}
public int getConnectionPoolMaxWaitTime() {
return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME, JdbcResource
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME)));
}
public int getConnectionPoolMaxLifeTime() {
return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME, JdbcResource
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME)));
}
public boolean isConnectionPoolKeepAlive() {
return Boolean.parseBoolean(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_KEEP_ALIVE, JdbcResource
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_KEEP_ALIVE)));
}
public boolean isTestConnection() {
return Boolean.parseBoolean(catalogProperty.getOrDefault(JdbcResource.TEST_CONNECTION, JdbcResource
.getDefaultPropertyValue(JdbcResource.TEST_CONNECTION)));
}
@Override
protected void initLocalObjectsImpl() {
jdbcClient = createJdbcClient();
}
private JdbcClient createJdbcClient() {
JdbcClientConfig jdbcClientConfig = new JdbcClientConfig()
.setCatalog(this.name)
.setUser(getJdbcUser())
.setPassword(getJdbcPasswd())
.setJdbcUrl(getJdbcUrl())
.setDriverUrl(getDriverUrl())
.setDriverClass(getDriverClass())
.setOnlySpecifiedDatabase(getOnlySpecifiedDatabase())
.setIncludeDatabaseMap(getIncludeDatabaseMap())
.setExcludeDatabaseMap(getExcludeDatabaseMap())
.setConnectionPoolMinSize(getConnectionPoolMinSize())
.setConnectionPoolMaxSize(getConnectionPoolMaxSize())
.setConnectionPoolMaxLifeTime(getConnectionPoolMaxLifeTime())
.setConnectionPoolMaxWaitTime(getConnectionPoolMaxWaitTime())
.setConnectionPoolKeepAlive(isConnectionPoolKeepAlive());
return JdbcClient.createJdbcClient(jdbcClientConfig);
}
@Override
public void gsonPostProcess() throws IOException {
super.gsonPostProcess();
if (this.identifierMapping == null) {
identifierMapping = new JdbcIdentifierMapping(
(Env.isTableNamesCaseInsensitive() || Env.isStoredTableNamesLowerCase()),
Boolean.parseBoolean(getLowerCaseMetaNames()),
getMetaNamesMapping());
}
}
@Override
public List<String> listDatabaseNames() {
return jdbcClient.getDatabaseNameList();
}
@Override
public String fromRemoteDatabaseName(String remoteDatabaseName) {
return identifierMapping.fromRemoteDatabaseName(remoteDatabaseName);
}
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
return jdbcClient.getTablesNameList(dbName);
}
@Override
public String fromRemoteTableName(String remoteDatabaseName, String remoteTableName) {
return identifierMapping.fromRemoteTableName(remoteDatabaseName, remoteTableName);
}
@Override
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
makeSureInitialized();
ExternalDatabase<?> database = this.getDbNullable(dbName);
if (database == null) {
return false;
}
ExternalTable tbl = database.getTableNullable(tblName);
if (tbl == null) {
return false;
}
String remoteDbName = ((ExternalDatabase<?>) tbl.getDatabase()).getRemoteName();
String remoteTblName = tbl.getRemoteName();
return jdbcClient.isTableExist(remoteDbName, remoteTblName);
}
public List<Column> listColumns(String remoteDbName, String remoteTblName) {
makeSureInitialized();
return jdbcClient.getColumnsFromJdbc(remoteDbName, remoteTblName);
}
@Override
public void checkWhenCreating() throws DdlException {
super.checkWhenCreating();
Map<String, String> properties = catalogProperty.getProperties();
if (properties.containsKey(JdbcResource.DRIVER_URL)) {
String computedChecksum = JdbcResource.computeObjectChecksum(properties.get(JdbcResource.DRIVER_URL));
if (properties.containsKey(JdbcResource.CHECK_SUM)) {
String providedChecksum = properties.get(JdbcResource.CHECK_SUM);
if (!providedChecksum.equals(computedChecksum)) {
throw new DdlException(
"The provided checksum (" + providedChecksum
+ ") does not match the computed checksum (" + computedChecksum
+ ") for the driver_url."
);
}
} else {
catalogProperty.addProperty(JdbcResource.CHECK_SUM, computedChecksum);
}
}
testJdbcConnection();
}
/**
* Execute stmt direct via jdbc
*
* @param stmt, the raw stmt string
*/
public void executeStmt(String stmt) {
makeSureInitialized();
jdbcClient.executeStmt(stmt);
}
/**
* Get columns from query
*
* @param query, the query string
* @return the columns
*/
public List<Column> getColumnsFromQuery(String query) {
makeSureInitialized();
return jdbcClient.getColumnsFromQuery(query);
}
public void configureJdbcTable(JdbcTable jdbcTable, String tableName) {
makeSureInitialized();
setCommonJdbcTableProperties(jdbcTable, tableName, this.jdbcClient);
}
private void setCommonJdbcTableProperties(JdbcTable jdbcTable, String tableName, JdbcClient jdbcClient) {
jdbcTable.setCatalogId(this.getId());
jdbcTable.setExternalTableName(tableName);
jdbcTable.setJdbcTypeName(jdbcClient.getDbType());
jdbcTable.setJdbcUrl(this.getJdbcUrl());
jdbcTable.setJdbcUser(this.getJdbcUser());
jdbcTable.setJdbcPasswd(this.getJdbcPasswd());
jdbcTable.setDriverClass(this.getDriverClass());
jdbcTable.setDriverUrl(this.getDriverUrl());
jdbcTable.setCheckSum(this.getCheckSum());
jdbcTable.setResourceName(this.getResource());
jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize());
jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize());
jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime());
jdbcTable.setConnectionPoolMaxWaitTime(this.getConnectionPoolMaxWaitTime());
jdbcTable.setConnectionPoolKeepAlive(this.isConnectionPoolKeepAlive());
}
private void testJdbcConnection() throws DdlException {
if (FeConstants.runningUnitTest) {
// skip test connection in unit test
return;
}
if (isTestConnection()) {
JdbcClient testClient = null;
try {
testClient = createJdbcClient();
testFeToJdbcConnection(testClient);
testBeToJdbcConnection(testClient);
} finally {
if (testClient != null) {
testClient.closeClient();
testClient = null;
}
}
}
}
private void testFeToJdbcConnection(JdbcClient testClient) throws DdlException {
try {
testClient.testConnection();
} catch (JdbcClientException e) {
String errorMessage = "Test FE Connection to JDBC Failed: " + e.getMessage();
LOG.warn(errorMessage, e);
throw new DdlException(errorMessage, e);
}
}
private void testBeToJdbcConnection(JdbcClient testClient) throws DdlException {
Backend aliveBe = null;
try {
for (Backend be : Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values()) {
if (be.isAlive()) {
aliveBe = be;
}
}
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
if (aliveBe == null) {
throw new DdlException("Test BE Connection to JDBC Failed: No Alive backends");
}
TNetworkAddress address = new TNetworkAddress(aliveBe.getHost(), aliveBe.getBrpcPort());
try {
JdbcTable testTable = getTestConnectionJdbcTable(testClient);
PJdbcTestConnectionRequest request = InternalService.PJdbcTestConnectionRequest.newBuilder()
.setJdbcTable(ByteString.copyFrom(new TSerializer().serialize(testTable.toThrift())))
.setJdbcTableType(testTable.getJdbcTableType().getValue())
.setQueryStr(testClient.getTestQuery()).build();
InternalService.PJdbcTestConnectionResult result = null;
Future<PJdbcTestConnectionResult> future = BackendServiceProxy.getInstance()
.testJdbcConnection(address, request);
result = future.get();
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
throw new DdlException("Test BE Connection to JDBC Failed: " + result.getStatus().getErrorMsgs(0));
}
} catch (TException | RpcException | ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
private JdbcTable getTestConnectionJdbcTable(JdbcClient testClient) throws DdlException {
JdbcTable testTable = new JdbcTable(0, "test_jdbc_connection", Lists.newArrayList(),
TableType.JDBC_EXTERNAL_TABLE);
setCommonJdbcTableProperties(testTable, "test_jdbc_connection", testClient);
// Special checksum computation
testTable.setCheckSum(JdbcResource.computeObjectChecksum(this.getDriverUrl()));
return testTable;
}
}