ConnectorPluginManager.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.connector;
import org.apache.doris.connector.api.Connector;
import org.apache.doris.connector.spi.ConnectorContext;
import org.apache.doris.connector.spi.ConnectorProvider;
import org.apache.doris.extension.loader.ClassLoadingPolicy;
import org.apache.doris.extension.loader.DirectoryPluginRuntimeManager;
import org.apache.doris.extension.loader.LoadFailure;
import org.apache.doris.extension.loader.LoadReport;
import org.apache.doris.extension.loader.PluginHandle;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Manages lifecycle of ConnectorProvider plugins.
*
* <p>Discovery order:
* 1. ServiceLoader scan (classpath-based built-ins / test overrides)
* 2. DirectoryPluginRuntimeManager scan (production plugin directories)
*
* <p>The first provider that returns {@code supports(catalogType, props) == true} is used.
* Classpath providers have higher priority than directory-loaded providers.
*
* <p>Unlike {@link org.apache.doris.fs.FileSystemPluginManager}, this class returns
* {@code null} from {@link #createConnector} when no provider matches, allowing
* fe-core to gracefully fall back to the existing hardcoded CatalogFactory logic
* during the migration period.
*/
public class ConnectorPluginManager {
private static final Logger LOG = LogManager.getLogger(ConnectorPluginManager.class);
/** The API version that this FE build supports. Increment on breaking SPI changes. */
static final int CURRENT_API_VERSION = 1;
// Connector SPI and filesystem SPI classes must be parent-first so that all
// instances of shared interfaces/classes are loaded by a single ClassLoader.
private static final List<String> CONNECTOR_PARENT_FIRST_PREFIXES =
Arrays.asList("org.apache.doris.connector.", "org.apache.doris.filesystem.");
private final List<ConnectorProvider> providers = new CopyOnWriteArrayList<>();
private final DirectoryPluginRuntimeManager<ConnectorProvider> runtimeManager =
new DirectoryPluginRuntimeManager<>();
private final ClassLoadingPolicy classLoadingPolicy =
new ClassLoadingPolicy(CONNECTOR_PARENT_FIRST_PREFIXES);
/** Called at FE startup to load built-in providers from classpath. */
public void loadBuiltins() {
ServiceLoader.load(ConnectorProvider.class)
.forEach(p -> {
providers.add(p);
LOG.info("Registered built-in connector provider: {}", p.getType());
});
}
/**
* Loads connector provider plugins from plugin root directories.
* Failures are logged as warnings; partial success is allowed.
*
* @param pluginRoots directories to scan for connector plugin subdirectories
*/
public void loadPlugins(List<Path> pluginRoots) {
LoadReport<ConnectorProvider> report = runtimeManager.loadAll(
pluginRoots,
ConnectorPluginManager.class.getClassLoader(),
ConnectorProvider.class,
classLoadingPolicy);
LOG.info("Connector plugin load summary: rootsScanned={}, dirsScanned={}, "
+ "successCount={}, failureCount={}",
report.getRootsScanned(), report.getDirsScanned(),
report.getSuccesses().size(), report.getFailures().size());
for (LoadFailure failure : report.getFailures()) {
LOG.warn("Connector plugin load failure: dir={}, stage={}, message={}, cause={}",
failure.getPluginDir(), failure.getStage(), failure.getMessage(),
failure.getCause());
}
for (PluginHandle<ConnectorProvider> handle : report.getSuccesses()) {
providers.add(handle.getFactory());
LOG.info("Loaded connector plugin: name={}, pluginDir={}, jarCount={}",
handle.getPluginName(), handle.getPluginDir(),
handle.getResolvedJars().size());
}
}
/**
* Creates a Connector for the given catalog type by selecting the first supporting provider.
*
* <p>Returns {@code null} if no provider supports the given catalog type.
* This allows fe-core to gracefully fall back to the existing hardcoded CatalogFactory
* switch-case during the migration period.
*
* @param catalogType the catalog type (e.g. "hive", "iceberg", "es")
* @param properties catalog configuration properties
* @param context runtime context provided by fe-core
* @return a ready-to-use Connector, or {@code null} if no provider matches
*/
public Connector createConnector(
String catalogType, Map<String, String> properties, ConnectorContext context) {
for (ConnectorProvider provider : providers) {
if (provider.supports(catalogType, properties)) {
int providerVersion = provider.apiVersion();
if (providerVersion != CURRENT_API_VERSION) {
LOG.warn("Skipping connector provider '{}': apiVersion={} (expected {})",
provider.getType(), providerVersion, CURRENT_API_VERSION);
continue;
}
LOG.info("Creating connector via provider '{}' for catalogType='{}'",
provider.getType(), catalogType);
return provider.create(properties, context);
}
}
LOG.debug("No ConnectorProvider supports catalogType='{}'. Registered: {}",
catalogType, providerNames());
return null;
}
/** Returns the type names of all registered providers. */
public List<String> getRegisteredTypes() {
List<String> types = new ArrayList<>();
for (ConnectorProvider p : providers) {
types.add(p.getType());
}
return types;
}
/**
* Validates catalog properties using the matching provider.
* Does nothing if no provider matches.
*
* @throws IllegalArgumentException if validation fails
*/
public void validateProperties(String catalogType, Map<String, String> properties) {
for (ConnectorProvider provider : providers) {
if (provider.supports(catalogType, properties)) {
if (provider.apiVersion() != CURRENT_API_VERSION) {
throw new IllegalArgumentException(
"Connector provider '" + provider.getType()
+ "' has incompatible API version " + provider.apiVersion()
+ " (expected " + CURRENT_API_VERSION + ")");
}
provider.validateProperties(properties);
return;
}
}
}
/** Registers a provider at highest priority (index 0). For testing overrides. */
public void registerProvider(ConnectorProvider provider) {
providers.add(0, provider);
}
private List<String> providerNames() {
List<String> names = new ArrayList<>();
for (ConnectorProvider p : providers) {
names.add(p.getType());
}
return names;
}
}