// FileSystemFactory.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.datasource.property.storage.BrokerProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.filesystem.FileSystemProvider;
import org.apache.doris.service.FrontendOptions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
/**
 * Factory for filesystem instances.
 *
 * <p>Call {@link #initPluginManager(FileSystemPluginManager)} at FE startup before any
 * {@code getFileSystem()} call. In production, providers are loaded from the plugin directory
 * configured via {@code filesystem_plugin_root}. In unit tests, providers are discovered from
 * the test classpath via ServiceLoader.
 *
 * <p>The class is a static utility and cannot be instantiated. Its only mutable state is two
 * {@code volatile} static fields: the plugin manager and the fallback provider cache.
 */
public final class FileSystemFactory {
    private static final Logger LOG = LogManager.getLogger(FileSystemFactory.class);

    // Plugin manager singleton, set at FE startup
    private static volatile FileSystemPluginManager pluginManager;

    // Fallback provider cache for non-initialized environments (tests, migration phase)
    private static volatile List<FileSystemProvider> cachedProviders = null;

    private FileSystemFactory() {}

    // =========================================================
    // SPI API — returns spi.FileSystem
    // =========================================================

    /**
     * Sets the plugin manager singleton. Called once at FE startup before any
     * {@code getFileSystem()} invocation.
     *
     * @param manager the plugin manager that subsequent {@code getFileSystem()} calls delegate to
     */
    public static void initPluginManager(FileSystemPluginManager manager) {
        pluginManager = manager;
    }

    /**
     * SPI entry point: selects a provider and creates the filesystem.
     *
     * <p>If {@link #initPluginManager} has been called (production path),
     * delegates to {@link FileSystemPluginManager#createFileSystem}.
     * Otherwise falls back to ServiceLoader discovery (unit-test / migration path), where the
     * first provider whose {@code supports(properties)} returns true is used.
     *
     * @param properties key-value storage configuration
     * @return initialized {@code org.apache.doris.filesystem.FileSystem}
     * @throws IOException if no provider matches or creation fails
     */
    public static org.apache.doris.filesystem.FileSystem getFileSystem(Map<String, String> properties)
            throws IOException {
        // Read the volatile field once so the null check and the call see the same instance.
        FileSystemPluginManager mgr = pluginManager;
        if (mgr != null) {
            return mgr.createFileSystem(properties);
        }
        // Fallback: ServiceLoader discovery (unit-test / migration path)
        List<FileSystemProvider> providers = getProviders();
        // Names of the providers that declined, reported in the error message below.
        List<String> tried = new ArrayList<>();
        for (FileSystemProvider provider : providers) {
            if (provider.supports(properties)) {
                LOG.debug("FileSystemFactory: selected SPI provider '{}' for keys={}",
                        provider.name(), properties.keySet());
                return provider.create(properties);
            }
            tried.add(provider.name());
        }
        throw new IOException(String.format(
                "No FileSystemProvider found for properties %s. Tried: %s. "
                        + "Ensure the corresponding fe-filesystem-xxx jar is on the classpath.",
                properties.keySet(), tried));
    }

    /**
     * SPI entry point accepting legacy {@link StorageProperties}.
     * Converts via {@link StoragePropertiesConverter} then delegates to
     * {@link #getFileSystem(Map)}.
     *
     * @param storageProperties legacy storage configuration to convert
     * @return initialized {@code org.apache.doris.filesystem.FileSystem}
     * @throws IOException if no provider matches or creation fails
     */
    public static org.apache.doris.filesystem.FileSystem getFileSystem(StorageProperties storageProperties)
            throws IOException {
        return getFileSystem(StoragePropertiesConverter.toMap(storageProperties));
    }

    /**
     * Returns all discovered SPI providers via ServiceLoader. Uses cached list after first load.
     * Used only in the fallback path when pluginManager is not initialized.
     * Package-private for testing.
     *
     * @return the discovered providers; never {@code null}
     */
    static List<FileSystemProvider> getProviders() {
        // Work on a local snapshot of the volatile field. Re-reading the field for the return
        // value could observe a concurrent clearProviderCache() and hand null to the caller.
        List<FileSystemProvider> providers = cachedProviders;
        if (providers == null) {
            synchronized (FileSystemFactory.class) {
                providers = cachedProviders;
                if (providers == null) {
                    List<FileSystemProvider> loaded = new ArrayList<>();
                    ServiceLoader<FileSystemProvider> loader = ServiceLoader.load(
                            FileSystemProvider.class,
                            Thread.currentThread().getContextClassLoader());
                    loader.forEach(loaded::add);
                    LOG.info("FileSystemFactory: loaded {} SPI provider(s): {}",
                            loaded.size(),
                            loaded.stream().map(FileSystemProvider::name)
                                    .collect(java.util.stream.Collectors.joining(", ")));
                    // Publish only after the list is fully populated.
                    cachedProviders = loaded;
                    providers = loaded;
                }
            }
        }
        return providers;
    }

    /**
     * Clears the SPI provider cache and plugin manager. For testing only.
     */
    static void clearProviderCache() {
        cachedProviders = null;
        pluginManager = null;
    }

    /**
     * Creates a {@link org.apache.doris.filesystem.FileSystem} for the given {@link BrokerDesc}.
     *
     * <p>For broker storage ({@link BrokerProperties}), resolves the live broker host:port via
     * {@code BrokerMgr} and delegates to {@link #getBrokerFileSystem}. For all other storage
     * types (HDFS, S3, etc.), converts via {@link StoragePropertiesConverter} and delegates to
     * {@link #getFileSystem(StorageProperties)}.
     *
     * <p>This is the preferred entry point for fe-core code that holds a {@code BrokerDesc}
     * and wants to perform filesystem operations (list, delete, read, write) via the unified
     * {@code FileSystem} SPI without caring about the underlying storage type.
     *
     * @param brokerDesc descriptor for the target storage
     * @return initialized {@link org.apache.doris.filesystem.FileSystem}; caller must close it
     * @throws UserException if the broker cannot be resolved or filesystem creation fails
     */
    public static org.apache.doris.filesystem.FileSystem getFileSystem(BrokerDesc brokerDesc)
            throws UserException {
        StorageProperties sp = brokerDesc.getStorageProperties();
        if (sp instanceof BrokerProperties) {
            BrokerProperties bp = (BrokerProperties) sp;
            try {
                String localIP = FrontendOptions.getLocalHostAddress();
                FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getBroker(bp.getBrokerName(), localIP);
                String clientId = NetUtils.getHostPortInAccessibleFormat(localIP, Config.edit_log_port);
                return getBrokerFileSystem(broker.host, broker.port, clientId, bp.getBrokerParams());
            } catch (AnalysisException | IOException e) {
                // Wrap with the descriptor name for context; the original exception stays as cause.
                throw new UserException("Failed to create broker filesystem for '"
                        + brokerDesc.getName() + "': " + e.getMessage(), e);
            }
        }
        try {
            return getFileSystem(sp);
        } catch (IOException e) {
            throw new UserException("Failed to create filesystem for '"
                    + brokerDesc.getName() + "': " + e.getMessage(), e);
        }
    }

    /**
     * Creates a broker-backed {@link org.apache.doris.filesystem.FileSystem} using a
     * pre-resolved broker endpoint.
     *
     * <p>The caller is responsible for resolving the broker name to a live host:port via
     * {@code BrokerMgr.getBroker()} before calling this method. This keeps {@code BrokerMgr}
     * coupling in fe-core only; the {@code fe-filesystem-broker} module has zero fe-core dependency.
     *
     * <p>The given params are copied, never modified. The endpoint keys written here
     * ({@code _STORAGE_TYPE_}, {@code BROKER_HOST}, {@code BROKER_PORT}, {@code BROKER_CLIENT_ID})
     * overwrite any entries with the same key in {@code brokerParams}.
     *
     * @param host live broker host (already resolved from BrokerMgr)
     * @param port live broker Thrift port
     * @param clientId FE identifier sent to broker for logging (e.g. "host:editLogPort")
     * @param brokerParams broker-specific params (username, password, hadoop config, ...);
     *                     {@code null} is treated as an empty map
     * @return initialized {@code org.apache.doris.filesystem.FileSystem}
     * @throws IOException if the broker filesystem provider is not found or creation fails
     */
    public static org.apache.doris.filesystem.FileSystem getBrokerFileSystem(
            String host, int port, String clientId, Map<String, String> brokerParams) throws IOException {
        Map<String, String> props = new HashMap<>();
        if (brokerParams != null) {
            // Copy so the caller's map is left untouched.
            props.putAll(brokerParams);
        }
        props.put("_STORAGE_TYPE_", "BROKER");
        props.put("BROKER_HOST", host);
        props.put("BROKER_PORT", String.valueOf(port));
        props.put("BROKER_CLIENT_ID", clientId);
        return getFileSystem(props);
    }
}