FileSystemFactory.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.fs;

import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.filesystem.spi.FileSystemProvider;
import org.apache.doris.fs.remote.RemoteFileSystem;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.stream.Collectors;

/**
 * Factory for filesystem instances.
 *
 * <h2>Two APIs</h2>
 * <ul>
 *   <li><b>Legacy API</b> ({@code get(StorageProperties)} / {@code get(FileSystemType, Map)}) —
 *       delegates to {@link StorageTypeMapper}; returns {@link RemoteFileSystem}. All existing
 *       fe-core callers continue to use this path unchanged.</li>
 *   <li><b>SPI API</b> ({@code getFileSystem(Map)} / {@code getFileSystem(StorageProperties)}) —
 *       delegates to {@link FileSystemPluginManager}; returns
 *       {@code org.apache.doris.filesystem.spi.FileSystem}. New code should use this path.</li>
 * </ul>
 *
 * <p>Call {@link #initPluginManager(FileSystemPluginManager)} at FE startup before any
 * {@code getFileSystem()} call. In production, providers are loaded from the plugin directory
 * configured via {@code filesystem_plugin_root}. Until a plugin manager is installed (e.g. in
 * unit tests), providers are discovered through {@link ServiceLoader} using the thread context
 * class loader.
 */
public final class FileSystemFactory {

    private static final Logger LOG = LogManager.getLogger(FileSystemFactory.class);

    // Plugin manager singleton, installed at FE startup via initPluginManager().
    private static volatile FileSystemPluginManager pluginManager;

    // Fallback provider cache for non-initialized environments (tests, migration phase).
    // null means "not loaded yet"; once loaded it always holds an unmodifiable list.
    private static volatile List<FileSystemProvider> cachedProviders = null;

    private FileSystemFactory() {}

    // =========================================================
    // Legacy API — backward compatible, returns RemoteFileSystem
    // =========================================================

    /**
     * Legacy entry point. Returns a {@link RemoteFileSystem} via {@link StorageTypeMapper}.
     *
     * @param storageProperties already-parsed storage configuration
     * @return the filesystem created by {@link StorageTypeMapper#create}
     * @deprecated New code should use {@link #getFileSystem(Map)} instead.
     */
    @Deprecated
    public static RemoteFileSystem get(StorageProperties storageProperties) {
        return StorageTypeMapper.create(storageProperties);
    }

    /**
     * Legacy entry point by enum type. Parses {@code properties} into every applicable
     * {@link StorageProperties} and returns a {@link RemoteFileSystem} (via
     * {@link StorageTypeMapper}) for the first one whose storage name equals
     * {@code fileSystemType.name()}, ignoring case.
     *
     * @param fileSystemType the requested filesystem type
     * @param properties     raw key-value storage configuration
     * @return the filesystem for the first matching storage properties
     * @throws UserException    if {@code StorageProperties.createAll} rejects the properties
     * @throws RuntimeException if no parsed storage properties match {@code fileSystemType}
     * @deprecated New code should use {@link #getFileSystem(Map)} instead.
     */
    @Deprecated
    public static RemoteFileSystem get(FileSystemType fileSystemType, Map<String, String> properties)
            throws UserException {
        List<StorageProperties> storagePropertiesList = StorageProperties.createAll(properties);
        for (StorageProperties storageProperties : storagePropertiesList) {
            if (storageProperties.getStorageName().equalsIgnoreCase(fileSystemType.name())) {
                return StorageTypeMapper.create(storageProperties);
            }
        }
        // Unchecked by design: existing callers may depend on this exception type.
        throw new RuntimeException("Unsupported file system type: " + fileSystemType);
    }

    // =========================================================
    // SPI API — returns spi.FileSystem
    // =========================================================

    /**
     * Sets the plugin manager singleton. Called once at FE startup before any
     * {@code getFileSystem()} invocation. Passing {@code null} re-enables the
     * ServiceLoader fallback path.
     *
     * @param manager the plugin manager to delegate SPI calls to
     */
    public static void initPluginManager(FileSystemPluginManager manager) {
        pluginManager = manager;
    }

    /**
     * SPI entry point: selects a provider and creates the filesystem.
     *
     * <p>If {@link #initPluginManager} has been called (production path),
     * delegates to {@link FileSystemPluginManager#createFileSystem}.
     * Otherwise falls back to ServiceLoader discovery (unit-test / migration path), where the
     * first provider (in discovery order) whose {@code supports(properties)} returns true wins.
     *
     * @param properties key-value storage configuration
     * @return initialized {@code org.apache.doris.filesystem.spi.FileSystem}
     * @throws IOException          if no provider matches or creation fails
     * @throws NullPointerException on the fallback path if {@code properties} is null
     */
    public static org.apache.doris.filesystem.spi.FileSystem getFileSystem(Map<String, String> properties)
            throws IOException {
        // Read the volatile field once so the null check and the call use the same instance.
        FileSystemPluginManager mgr = pluginManager;
        if (mgr != null) {
            return mgr.createFileSystem(properties);
        }
        // Fallback: ServiceLoader discovery (unit-test / migration path).
        // A null map would otherwise fail later with an anonymous NPE from keySet().
        Objects.requireNonNull(properties, "properties");
        List<String> tried = new ArrayList<>();
        for (FileSystemProvider provider : getProviders()) {
            if (provider.supports(properties)) {
                LOG.debug("FileSystemFactory: selected SPI provider '{}' for keys={}",
                        provider.name(), properties.keySet());
                return provider.create(properties);
            }
            tried.add(provider.name());
        }
        throw new IOException(String.format(
                "No FileSystemProvider found for properties %s. Tried: %s. "
                        + "Ensure the corresponding fe-filesystem-xxx jar is on the classpath.",
                properties.keySet(), tried));
    }

    /**
     * SPI entry point accepting legacy {@link StorageProperties}.
     * Converts via {@link StoragePropertiesConverter} then delegates to
     * {@link #getFileSystem(Map)}.
     *
     * @param storageProperties legacy storage configuration
     * @return initialized {@code org.apache.doris.filesystem.spi.FileSystem}
     * @throws IOException if no provider matches or creation fails
     */
    public static org.apache.doris.filesystem.spi.FileSystem getFileSystem(StorageProperties storageProperties)
            throws IOException {
        return getFileSystem(StoragePropertiesConverter.toMap(storageProperties));
    }

    /**
     * Returns all discovered SPI providers via ServiceLoader. Uses the cached list after the
     * first load. Used only in the fallback path when pluginManager is not initialized.
     * Package-private for testing.
     *
     * @return an unmodifiable, never-null list of providers (possibly empty)
     */
    static List<FileSystemProvider> getProviders() {
        // Single volatile read per check: reading the field again for the return could observe
        // a concurrent clearProviderCache() and hand back null.
        List<FileSystemProvider> providers = cachedProviders;
        if (providers == null) {
            synchronized (FileSystemFactory.class) {
                providers = cachedProviders;
                if (providers == null) {
                    // Unmodifiable so callers cannot corrupt the list shared by all threads.
                    providers = Collections.unmodifiableList(loadProviders());
                    cachedProviders = providers;
                }
            }
        }
        return providers;
    }

    // Discovers providers through ServiceLoader with the thread context class loader and logs them.
    private static List<FileSystemProvider> loadProviders() {
        List<FileSystemProvider> providers = new ArrayList<>();
        ServiceLoader.load(FileSystemProvider.class, Thread.currentThread().getContextClassLoader())
                .forEach(providers::add);
        LOG.info("FileSystemFactory: loaded {} SPI provider(s): {}",
                providers.size(),
                providers.stream().map(FileSystemProvider::name).collect(Collectors.joining(", ")));
        return providers;
    }

    /**
     * Clears the SPI provider cache and plugin manager. For testing only.
     */
    static void clearProviderCache() {
        cachedProviders = null;
        pluginManager = null;
    }

    /**
     * Creates a broker-backed {@link org.apache.doris.filesystem.spi.FileSystem} using a
     * pre-resolved broker endpoint.
     *
     * <p>The caller is responsible for resolving the broker name to a live host:port via
     * {@code BrokerMgr.getBroker()} before calling this method. This keeps {@code BrokerMgr}
     * coupling in fe-core only; the {@code fe-filesystem-broker} module has zero fe-core dependency.
     *
     * @param host         live broker host (already resolved from BrokerMgr)
     * @param port         live broker Thrift port
     * @param clientId     FE identifier sent to broker for logging (e.g. "host:editLogPort")
     * @param brokerParams broker-specific params (username, password, hadoop config, ...);
     *                     {@code null} is treated as an empty map
     * @return initialized {@code org.apache.doris.filesystem.spi.FileSystem}
     * @throws IOException if the broker filesystem provider is not found or creation fails
     */
    public static org.apache.doris.filesystem.spi.FileSystem getBrokerFileSystem(
            String host, int port, String clientId, Map<String, String> brokerParams) throws IOException {
        // Copy so the caller's map is never mutated.
        Map<String, String> props = brokerParams == null ? new HashMap<>() : new HashMap<>(brokerParams);
        // Reserved routing keys are written last so they override same-named user params.
        props.put("_STORAGE_TYPE_", "BROKER");
        props.put("BROKER_HOST", host);
        props.put("BROKER_PORT", String.valueOf(port));
        props.put("BROKER_CLIENT_ID", clientId);
        return getFileSystem(props);
    }
}