PaimonJdbcMetaStoreProperties.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.property.metastore;

import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.property.ConnectorProperty;
import org.apache.doris.datasource.property.storage.HdfsProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.jdbc.JdbcCatalogFactory;
import org.apache.paimon.options.CatalogOptions;

import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Metastore properties for a Paimon catalog backed by a JDBC metastore.
 *
 * <p>Binds the JDBC connection settings (URI, user, password) from the user supplied catalog
 * properties, forwards any additional {@code jdbc.*} properties to the Paimon catalog options,
 * and optionally loads a JDBC driver from an external JAR and registers it with
 * {@link java.sql.DriverManager} before the catalog is created.
 */
public class PaimonJdbcMetaStoreProperties extends AbstractPaimonProperties {
    private static final Logger LOG = LogManager.getLogger(PaimonJdbcMetaStoreProperties.class);
    /** Prefix of user properties that are forwarded to the Paimon catalog options as-is. */
    private static final String JDBC_PREFIX = "jdbc.";
    /**
     * Class loaders created for driver JARs, keyed by the resolved driver URL string.
     * The key is a String rather than a {@link URL} because {@code URL.hashCode()} and
     * {@code URL.equals()} may perform blocking host name resolution.
     */
    private static final Map<String, ClassLoader> DRIVER_CLASS_LOADER_CACHE = new ConcurrentHashMap<>();
    /**
     * Drivers that have already been registered with {@link java.sql.DriverManager}, keyed by
     * "resolved driver URL" + '#' + "driver class name". Guarded by its own monitor so that the
     * check and the registration happen atomically.
     */
    private static final Map<String, Boolean> REGISTERED_DRIVERS = new ConcurrentHashMap<>();

    @ConnectorProperty(
            names = {"uri", "paimon.jdbc.uri"},
            required = true,
            description = "JDBC connection URI for the Paimon JDBC catalog."
    )
    private String uri = "";

    @ConnectorProperty(
            names = {"paimon.jdbc.user", "jdbc.user"},
            required = false,
            description = "Username for the Paimon JDBC catalog."
    )
    private String jdbcUser;

    @ConnectorProperty(
            names = {"paimon.jdbc.password", "jdbc.password"},
            required = false,
            sensitive = true,
            description = "Password for the Paimon JDBC catalog."
    )
    private String jdbcPassword;

    @ConnectorProperty(
            names = {"paimon.jdbc.driver_url", "jdbc.driver_url"},
            required = false,
            description = "JDBC driver JAR file path or URL. "
                    + "Can be a local file name (will look in $DORIS_HOME/plugins/jdbc_drivers/) "
                    + "or a full URL (http://, https://, file://)."
    )
    private String driverUrl;

    @ConnectorProperty(
            names = {"paimon.jdbc.driver_class", "jdbc.driver_class"},
            required = false,
            description = "JDBC driver class name. If specified with paimon.jdbc.driver_url, "
                    + "the driver will be loaded dynamically."
    )
    private String driverClass;

    /**
     * Creates the properties holder from the raw catalog properties.
     *
     * @param props user supplied catalog properties, passed unchanged to the superclass
     */
    protected PaimonJdbcMetaStoreProperties(Map<String, String> props) {
        super(props);
    }

    @Override
    public String getPaimonCatalogType() {
        return PaimonExternalCatalog.PAIMON_JDBC;
    }

    /**
     * Runs the inherited required-property checks and additionally requires a non-blank warehouse.
     *
     * @throws IllegalArgumentException if {@code warehouse} is blank
     */
    @Override
    protected void checkRequiredProperties() {
        super.checkRequiredProperties();
        if (StringUtils.isBlank(warehouse)) {
            throw new IllegalArgumentException("Property warehouse is required.");
        }
    }

    /**
     * Builds the Hadoop configuration and catalog options and creates the Paimon catalog.
     *
     * <p>When a driver URL is configured, the JDBC driver is loaded and registered before the
     * catalog is created. Catalog creation runs through the inherited execution authenticator,
     * which is replaced by a Hadoop authenticator when an HDFS storage is present in
     * {@code storagePropertiesList}.
     *
     * @param catalogName name of the catalog (not used by this implementation)
     * @param storagePropertiesList storage properties whose Hadoop configurations are merged
     * @return the created Paimon catalog
     * @throws IllegalArgumentException if the driver settings are invalid
     * @throws RuntimeException if the driver cannot be registered or the catalog cannot be created
     */
    @Override
    public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) {
        buildCatalogOptions();
        Configuration conf = new Configuration();
        for (StorageProperties storageProperties : storagePropertiesList) {
            if (storageProperties.getHadoopStorageConfig() != null) {
                conf.addResource(storageProperties.getHadoopStorageConfig());
            }
            if (storageProperties.getType().equals(StorageProperties.Type.HDFS)) {
                this.executionAuthenticator = new HadoopExecutionAuthenticator(((HdfsProperties) storageProperties)
                        .getHadoopAuthenticator());
            }
        }
        appendUserHadoopConfig(conf);
        if (StringUtils.isNotBlank(driverUrl)) {
            registerJdbcDriver(driverUrl, driverClass);
            LOG.info("Using dynamic JDBC driver for Paimon JDBC catalog from: {}", driverUrl);
        }
        CatalogContext catalogContext = CatalogContext.create(catalogOptions, conf);
        try {
            return this.executionAuthenticator.execute(() -> CatalogFactory.createCatalog(catalogContext));
        } catch (Exception e) {
            throw new RuntimeException("Failed to create Paimon catalog with JDBC metastore: " + e.getMessage(), e);
        }
    }

    /**
     * Adds the JDBC specific options: the connection URI, the optional user and password, and
     * every remaining raw {@code jdbc.*} property that has not been set already.
     */
    @Override
    protected void appendCustomCatalogOptions() {
        catalogOptions.set(CatalogOptions.URI.key(), uri);
        addIfNotBlank("jdbc.user", jdbcUser);
        addIfNotBlank("jdbc.password", jdbcPassword);
        appendRawJdbcCatalogOptions();
    }

    @Override
    protected String getMetastoreType() {
        return JdbcCatalogFactory.IDENTIFIER;
    }

    /** Sets {@code key} in the catalog options only when {@code value} is not blank. */
    private void addIfNotBlank(String key, String value) {
        if (StringUtils.isNotBlank(value)) {
            catalogOptions.set(key, value);
        }
    }

    /**
     * Copies every original property whose key starts with {@code jdbc.} into the catalog
     * options, without overriding keys that are already present.
     */
    private void appendRawJdbcCatalogOptions() {
        origProps.forEach((key, value) -> {
            if (key != null && key.startsWith(JDBC_PREFIX) && !catalogOptions.keySet().contains(key)) {
                catalogOptions.set(key, value);
            }
        });
    }

    /**
     * Loads the JDBC driver from the given JAR and registers it with DriverManager.
     * This is necessary because DriverManager.getConnection() doesn't use Thread.contextClassLoader.
     *
     * <p>Each (resolved driver URL, driver class) pair is registered at most once per JVM.
     * Every registration wraps the driver in a fresh {@link DriverShim}, which DriverManager
     * treats as a distinct driver, so registering on every call would accumulate duplicate
     * entries each time a catalog is initialized.
     *
     * @param driverUrl driver JAR file name or URL as configured by the user
     * @param driverClassName fully qualified name of the driver class inside the JAR
     * @throws IllegalArgumentException if the class name is blank, the URL is malformed,
     *         or the class cannot be found in the JAR
     * @throws RuntimeException if the driver cannot be instantiated or registered
     */
    private void registerJdbcDriver(String driverUrl, String driverClassName) {
        // Validate before touching any cache so that invalid input never creates a class loader.
        if (StringUtils.isBlank(driverClassName)) {
            throw new IllegalArgumentException(
                    "jdbc.driver_class or paimon.jdbc.driver_class is required when jdbc.driver_url "
                            + "or paimon.jdbc.driver_url is specified");
        }
        try {
            String fullDriverUrl = JdbcResource.getFullDriverUrl(driverUrl);
            URL url = new URL(fullDriverUrl);
            String registrationKey = fullDriverUrl + "#" + driverClassName;

            // Check-then-register must be atomic, otherwise two concurrent catalog
            // initializations could both register the same driver.
            synchronized (REGISTERED_DRIVERS) {
                if (REGISTERED_DRIVERS.containsKey(registrationKey)) {
                    LOG.debug("JDBC driver for Paimon catalog already registered: {} from {}",
                            driverClassName, fullDriverUrl);
                    return;
                }

                ClassLoader classLoader = DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(fullDriverUrl,
                        key -> URLClassLoader.newInstance(new URL[] {url}, getClass().getClassLoader()));

                // asSubclass rejects non-driver classes before any instance is created.
                Class<? extends java.sql.Driver> loadedDriverClass =
                        Class.forName(driverClassName, true, classLoader).asSubclass(java.sql.Driver.class);
                java.sql.Driver driver = loadedDriverClass.getDeclaredConstructor().newInstance();
                java.sql.DriverManager.registerDriver(new DriverShim(driver));
                // Mark as registered only after registerDriver succeeded so failures can be retried.
                REGISTERED_DRIVERS.put(registrationKey, Boolean.TRUE);
            }
            LOG.info("Successfully registered JDBC driver for Paimon catalog: {} from {}",
                    driverClassName, fullDriverUrl);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Invalid driver URL: " + driverUrl, e);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Failed to load JDBC driver class: " + driverClassName, e);
        } catch (IllegalArgumentException e) {
            // Keep argument errors (e.g. from resolving the driver URL) unwrapped.
            throw e;
        } catch (Exception e) {
            throw new RuntimeException("Failed to register JDBC driver: " + driverClassName, e);
        }
    }

    /**
     * Driver wrapper that forwards every call to a delegate driver.
     *
     * <p>The shim class is defined alongside this class, while the delegate is loaded from a
     * separate {@link URLClassLoader}; registering the shim instead of the delegate lets
     * DriverManager hand out a driver whose own class is not visible to the calling code.
     */
    private static class DriverShim implements java.sql.Driver {
        private final java.sql.Driver delegate;

        DriverShim(java.sql.Driver delegate) {
            this.delegate = delegate;
        }

        @Override
        public java.sql.Connection connect(String url, java.util.Properties info) throws java.sql.SQLException {
            return delegate.connect(url, info);
        }

        @Override
        public boolean acceptsURL(String url) throws java.sql.SQLException {
            return delegate.acceptsURL(url);
        }

        @Override
        public java.sql.DriverPropertyInfo[] getPropertyInfo(String url, java.util.Properties info)
                throws java.sql.SQLException {
            return delegate.getPropertyInfo(url, info);
        }

        @Override
        public int getMajorVersion() {
            return delegate.getMajorVersion();
        }

        @Override
        public int getMinorVersion() {
            return delegate.getMinorVersion();
        }

        @Override
        public boolean jdbcCompliant() {
            return delegate.jdbcCompliant();
        }

        @Override
        public java.util.logging.Logger getParentLogger() throws java.sql.SQLFeatureNotSupportedException {
            return delegate.getParentLogger();
        }
    }
}