// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property.metastore;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.property.ConnectorProperty;
import org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
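/**
 * Metastore properties for an Iceberg catalog backed by Iceberg's JDBC catalog.
 * <p>
 * This class collects the {@code uri}, {@code warehouse} and {@code jdbc.*} options, optionally loads a
 * user-supplied JDBC driver JAR at runtime, and builds the catalog through
 * {@link CatalogUtil#buildIcebergCatalog}.
 * <p>
 * Illustrative property values (placeholders, not defaults):
 * <pre>
 *   "uri"           = "jdbc:mysql://127.0.0.1:3306/iceberg_catalog"
 *   "warehouse"     = "s3://bucket/warehouse"
 *   "jdbc.user"     = "iceberg"
 *   "jdbc.password" = "***"
 *   "driver_url"    = "mysql-connector-j-8.3.0.jar"
 *   "driver_class"  = "com.mysql.cj.jdbc.Driver"
 * </pre>
 */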
public class IcebergJdbcMetaStoreProperties extends AbstractIcebergProperties {
private static final Logger LOG = LogManager.getLogger(IcebergJdbcMetaStoreProperties.class);
private static final String JDBC_PREFIX = "jdbc.";
    // Keyed by the resolved driver URL string rather than java.net.URL, whose equals()/hashCode()
    // can trigger blocking DNS lookups.
    private static final Map<String, ClassLoader> DRIVER_CLASS_LOADER_CACHE = new ConcurrentHashMap<>();
    // Driver URL + class combinations already registered with DriverManager, to avoid duplicate registrations.
    private static final Set<String> REGISTERED_DRIVERS = ConcurrentHashMap.newKeySet();
private Map<String, String> icebergJdbcCatalogProperties;
@ConnectorProperty(
names = {"uri", "iceberg.jdbc.uri"},
required = true,
description = "JDBC connection URI for the Iceberg JDBC catalog."
)
private String uri = "";
@ConnectorProperty(
names = {"jdbc.user"},
required = false,
description = "Username for the Iceberg JDBC catalog."
)
private String jdbcUser;
@ConnectorProperty(
names = {"jdbc.password"},
required = false,
sensitive = true,
description = "Password for the Iceberg JDBC catalog."
)
private String jdbcPassword;
@ConnectorProperty(
names = {"jdbc.init-catalog-tables"},
required = false,
description = "Whether to create catalog tables if they do not exist."
)
private String jdbcInitCatalogTables;
@ConnectorProperty(
names = {"jdbc.schema-version"},
required = false,
description = "Iceberg JDBC catalog schema version (V0/V1)."
)
private String jdbcSchemaVersion;
@ConnectorProperty(
names = {"jdbc.strict-mode"},
required = false,
description = "Whether to enforce strict JDBC catalog schema checks."
)
private String jdbcStrictMode;
@ConnectorProperty(
names = {"driver_url"},
required = false,
description = "JDBC driver JAR file path or URL. "
+ "Can be a local file name (will look in $DORIS_HOME/plugins/jdbc_drivers/) "
+ "or a full URL (http://, https://, file://)."
)
private String driverUrl;
@ConnectorProperty(
names = {"driver_class"},
required = false,
description = "JDBC driver class name. If not specified, will be auto-detected from the JDBC URI."
)
private String driverClass;
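    /**
     * @param props raw catalog properties as supplied by the user (e.g. in a CREATE CATALOG statement)
     */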
public IcebergJdbcMetaStoreProperties(Map<String, String> props) {
super(props);
}
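    // Identifies this catalog as the Iceberg JDBC flavor (IcebergExternalCatalog.ICEBERG_JDBC).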
@Override
public String getIcebergCatalogType() {
return IcebergExternalCatalog.ICEBERG_JDBC;
}
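    // Runs the base normalization and validation first, then builds the Iceberg JDBC option map once
    // so it can be reused by initCatalog().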
@Override
public void initNormalizeAndCheckProps() {
super.initNormalizeAndCheckProps();
initIcebergJdbcCatalogProperties();
}
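    // In addition to the base checks, the Iceberg JDBC catalog requires an explicit warehouse location.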
@Override
protected void checkRequiredProperties() {
super.checkRequiredProperties();
if (StringUtils.isBlank(warehouse)) {
throw new IllegalArgumentException("Property warehouse is required.");
}
}
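    /**
     * Builds the Iceberg JDBC catalog: merges the JDBC catalog options with FileIO options derived from
     * the storage properties, loads the Hadoop configuration, registers a dynamically loaded JDBC driver
     * if one was configured, and delegates construction to {@link CatalogUtil#buildIcebergCatalog}.
     */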
@Override
public Catalog initCatalog(String catalogName, Map<String, String> catalogProps,
List<StorageProperties> storagePropertiesList) {
Map<String, String> fileIOProperties = Maps.newHashMap();
Configuration conf = new Configuration();
toFileIOProperties(storagePropertiesList, fileIOProperties, conf);
Map<String, String> options = Maps.newHashMap(getIcebergJdbcCatalogProperties());
options.putAll(fileIOProperties);
        // Support dynamic JDBC driver loading.
        // The driver must be registered with DriverManager because Iceberg's JDBC catalog uses
        // DriverManager.getConnection(), which resolves drivers through the caller's ClassLoader
        // and does not consult the thread context ClassLoader.
if (StringUtils.isNotBlank(driverUrl)) {
registerJdbcDriver(driverUrl, driverClass);
LOG.info("Using dynamic JDBC driver from: {}", driverUrl);
}
return CatalogUtil.buildIcebergCatalog(catalogName, options, conf);
}
    /**
     * Registers the JDBC driver with DriverManager.
     * This is necessary because DriverManager.getConnection() resolves drivers through the caller's
     * ClassLoader rather than the thread context ClassLoader; registering the driver (wrapped in a
     * {@link DriverShim}) makes it discoverable.
     *
     * @param driverUrl       path or URL of the JDBC driver JAR
     * @param driverClassName fully qualified driver class name to register
     */
    private void registerJdbcDriver(String driverUrl, String driverClassName) {
        if (StringUtils.isBlank(driverClassName)) {
            throw new IllegalArgumentException("driver_class is required when driver_url is specified");
        }
        try {
            String fullDriverUrl = JdbcResource.getFullDriverUrl(driverUrl);
            // Skip re-registration: repeated initCatalog() calls would otherwise accumulate
            // duplicate DriverShim entries in DriverManager.
            String registrationKey = fullDriverUrl + "#" + driverClassName;
            if (REGISTERED_DRIVERS.contains(registrationKey)) {
                return;
            }
            URL url = new URL(fullDriverUrl);
            // Cache one URLClassLoader per driver URL so the same JAR is not loaded repeatedly.
            ClassLoader classLoader = DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(fullDriverUrl,
                    key -> URLClassLoader.newInstance(new URL[] {url}, getClass().getClassLoader()));
            // Load the driver class and register it with DriverManager
            Class<?> driverClass = Class.forName(driverClassName, true, classLoader);
            java.sql.Driver driver = (java.sql.Driver) driverClass.getDeclaredConstructor().newInstance();
            // Wrap with a shim: DriverManager only accepts drivers whose classes are visible to the calling
            // class's ClassLoader, and the shim itself is loaded by the application ClassLoader.
            java.sql.DriverManager.registerDriver(new DriverShim(driver));
            REGISTERED_DRIVERS.add(registrationKey);
            LOG.info("Successfully registered JDBC driver: {} from {}", driverClassName, fullDriverUrl);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Invalid driver URL: " + driverUrl, e);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Failed to load JDBC driver class: " + driverClassName, e);
        } catch (Exception e) {
            throw new RuntimeException("Failed to register JDBC driver: " + driverClassName, e);
        }
    }
    /**
     * A shim driver that wraps the actual driver loaded from a custom ClassLoader.
     * This is needed because DriverManager only hands out drivers whose classes are visible to the calling
     * class's ClassLoader; the shim itself is loaded by the application ClassLoader and delegates every
     * call to the wrapped driver.
     */
private static class DriverShim implements java.sql.Driver {
private final java.sql.Driver delegate;
DriverShim(java.sql.Driver delegate) {
this.delegate = delegate;
}
@Override
public java.sql.Connection connect(String url, java.util.Properties info) throws java.sql.SQLException {
return delegate.connect(url, info);
}
@Override
public boolean acceptsURL(String url) throws java.sql.SQLException {
return delegate.acceptsURL(url);
}
@Override
public java.sql.DriverPropertyInfo[] getPropertyInfo(String url, java.util.Properties info)
throws java.sql.SQLException {
return delegate.getPropertyInfo(url, info);
}
@Override
public int getMajorVersion() {
return delegate.getMajorVersion();
}
@Override
public int getMinorVersion() {
return delegate.getMinorVersion();
}
@Override
public boolean jdbcCompliant() {
return delegate.jdbcCompliant();
}
@Override
public java.util.logging.Logger getParentLogger() throws java.sql.SQLFeatureNotSupportedException {
return delegate.getParentLogger();
}
}
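    /**
     * @return an unmodifiable view of the Iceberg JDBC catalog options built in initNormalizeAndCheckProps()
     */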
public Map<String, String> getIcebergJdbcCatalogProperties() {
return Collections.unmodifiableMap(icebergJdbcCatalogProperties);
}
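    /**
     * Assembles the option map passed to the Iceberg JDBC catalog: the catalog type and URI, the warehouse
     * location if set, the explicitly declared jdbc.* options, and finally any remaining user-supplied
     * properties with the "jdbc." prefix, passed through verbatim.
     */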
private void initIcebergJdbcCatalogProperties() {
icebergJdbcCatalogProperties = new HashMap<>();
icebergJdbcCatalogProperties.put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_JDBC);
icebergJdbcCatalogProperties.put(CatalogProperties.URI, uri);
if (StringUtils.isNotBlank(warehouse)) {
icebergJdbcCatalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
}
addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.user", jdbcUser);
addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.password", jdbcPassword);
addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.init-catalog-tables", jdbcInitCatalogTables);
addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.schema-version", jdbcSchemaVersion);
addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.strict-mode", jdbcStrictMode);
if (origProps != null) {
for (Map.Entry<String, String> entry : origProps.entrySet()) {
String key = entry.getKey();
if (key != null && key.startsWith(JDBC_PREFIX)
&& !icebergJdbcCatalogProperties.containsKey(key)) {
icebergJdbcCatalogProperties.put(key, entry.getValue());
}
}
}
}
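    // Puts the key/value pair only when the value is non-blank, keeping the option map free of empty entries.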
private static void addIfNotBlank(Map<String, String> props, String key, String value) {
if (StringUtils.isNotBlank(value)) {
props.put(key, value);
}
}
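    /**
     * Translates the catalog's storage properties into Iceberg FileIO options and Hadoop configuration:
     * S3-compatible storages are mapped to S3FileIO options, and any Hadoop storage configuration is
     * merged into the supplied Configuration.
     */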
private static void toFileIOProperties(List<StorageProperties> storagePropertiesList,
Map<String, String> fileIOProperties, Configuration conf) {
for (StorageProperties storageProperties : storagePropertiesList) {
if (storageProperties instanceof AbstractS3CompatibleProperties) {
toS3FileIOProperties((AbstractS3CompatibleProperties) storageProperties, fileIOProperties);
}
if (storageProperties.getHadoopStorageConfig() != null) {
conf.addResource(storageProperties.getHadoopStorageConfig());
}
}
}
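    /**
     * Maps S3-compatible storage settings (endpoint, path-style access, region and credentials) onto the
     * corresponding Iceberg S3FileIO / AWS client property keys, skipping blank values.
     */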
private static void toS3FileIOProperties(AbstractS3CompatibleProperties s3Properties,
Map<String, String> options) {
if (StringUtils.isNotBlank(s3Properties.getEndpoint())) {
options.put(S3FileIOProperties.ENDPOINT, s3Properties.getEndpoint());
}
if (StringUtils.isNotBlank(s3Properties.getUsePathStyle())) {
options.put(S3FileIOProperties.PATH_STYLE_ACCESS, s3Properties.getUsePathStyle());
}
if (StringUtils.isNotBlank(s3Properties.getRegion())) {
options.put(AwsClientProperties.CLIENT_REGION, s3Properties.getRegion());
}
if (StringUtils.isNotBlank(s3Properties.getAccessKey())) {
options.put(S3FileIOProperties.ACCESS_KEY_ID, s3Properties.getAccessKey());
}
if (StringUtils.isNotBlank(s3Properties.getSecretKey())) {
options.put(S3FileIOProperties.SECRET_ACCESS_KEY, s3Properties.getSecretKey());
}
if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
options.put(S3FileIOProperties.SESSION_TOKEN, s3Properties.getSessionToken());
}
}
}