CatalogPluginLoader.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.spi;

import org.apache.doris.DorisFE;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.File;
import java.io.FilenameFilter;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Loads catalog plugins from the {@code connectors/} directory under Doris FE home.
 *
 * <p>Each subdirectory under {@code connectors/} represents one catalog plugin.
 * All JAR files in that subdirectory are loaded with an isolated {@link URLClassLoader},
 * providing ClassLoader-level isolation between different catalog plugins to prevent
 * dependency conflicts.</p>
 *
 * <p>This loader must be invoked <b>before</b> {@code Env.loadImage()} to ensure
 * all catalog types are registered before EditLog replay.</p>
 *
 * <h3>Directory Layout</h3>
 * <pre>
 * output/fe/
 * ├── lib/                    # fe-core JARs
 * └── lib/
 *     ├── doris-fe.jar           # fe-core
 *     └── connectors/            # Connector plugins
 *         ├── es/
 *         │   └── doris-connector-es.jar
 *         ├── iceberg/
 *         │   └── doris-connector-iceberg.jar
 *         └── ...
 * </pre>
 */
public class CatalogPluginLoader {
    private static final Logger LOG = LogManager.getLogger(CatalogPluginLoader.class);

    private static final String CONNECTORS_DIR = "lib/connectors";

    /** ClassLoaders for each loaded plugin, keyed by catalog type */
    private static final Map<String, ClassLoader> PLUGIN_CLASSLOADERS = new ConcurrentHashMap<>();

    /**
     * Scan the catalogs/ directory and load all catalog plugins.
     * Each subdirectory is treated as a separate plugin with its own ClassLoader.
     */
    public static void loadPlugins() {
        String dorisHome = DorisFE.DORIS_HOME_DIR;
        if (dorisHome == null || dorisHome.isEmpty()) {
            LOG.warn("DORIS_HOME is not set. Skipping catalog plugin loading.");
            return;
        }

        File pluginDir = new File(dorisHome, CONNECTORS_DIR);
        if (!pluginDir.exists() || !pluginDir.isDirectory()) {
            LOG.info("No catalog plugin directory found at {}. "
                    + "External catalog plugins will not be loaded.", pluginDir.getAbsolutePath());
            return;
        }

        LOG.info("Loading catalog plugins from: {}", pluginDir.getAbsolutePath());

        File[] pluginDirs = pluginDir.listFiles(File::isDirectory);
        if (pluginDirs == null || pluginDirs.length == 0) {
            LOG.info("No catalog plugins found in {}", pluginDir.getAbsolutePath());
            return;
        }

        for (File dir : pluginDirs) {
            try {
                loadPlugin(dir);
            } catch (Exception e) {
                LOG.error("Failed to load catalog plugin from {}: {}", dir.getName(), e.getMessage(), e);
            }
        }

        LOG.info("Catalog plugin loading complete. Registered types: {}",
                CatalogProviderRegistry.getAllProviders().keySet());
    }

    private static void loadPlugin(File pluginDir) throws Exception {
        URL[] jarUrls = findJars(pluginDir);
        if (jarUrls.length == 0) {
            LOG.warn("No JAR files found in plugin directory: {}", pluginDir.getName());
            return;
        }

        LOG.info("Loading catalog plugin from '{}' with {} JAR(s)", pluginDir.getName(), jarUrls.length);

        // Create isolated ClassLoader with fe-core as parent
        URLClassLoader pluginClassLoader = new URLClassLoader(
                jarUrls, CatalogPluginLoader.class.getClassLoader());

        ServiceLoader<CatalogProvider> providers = ServiceLoader.load(
                CatalogProvider.class, pluginClassLoader);

        int count = 0;
        for (CatalogProvider provider : providers) {
            CatalogProviderRegistry.register(provider);
            PLUGIN_CLASSLOADERS.put(provider.getType(), pluginClassLoader);
            count++;
        }

        if (count == 0) {
            LOG.warn("Plugin directory '{}' contains JAR files but no CatalogProvider implementation. "
                    + "Ensure META-INF/services/{} is correctly configured.",
                    pluginDir.getName(), CatalogProvider.class.getName());
            pluginClassLoader.close();
        }
    }

    private static URL[] findJars(File dir) throws Exception {
        FilenameFilter jarFilter = (d, name) -> name.endsWith(".jar");
        File[] jarFiles = dir.listFiles(jarFilter);
        if (jarFiles == null) {
            return new URL[0];
        }

        List<URL> urls = new ArrayList<>(jarFiles.length);
        for (File jar : jarFiles) {
            urls.add(jar.toURI().toURL());
        }
        return urls.toArray(new URL[0]);
    }

    /**
     * Get the ClassLoader for a specific catalog type.
     *
     * @param type catalog type
     * @return the plugin ClassLoader, or null if not loaded via plugin
     */
    public static ClassLoader getPluginClassLoader(String type) {
        return PLUGIN_CLASSLOADERS.get(type);
    }
}