SpiSwitchingFileSystem.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.filesystem.spi.DorisInputFile;
import org.apache.doris.filesystem.spi.DorisOutputFile;
import org.apache.doris.filesystem.spi.FileEntry;
import org.apache.doris.filesystem.spi.FileIterator;
import org.apache.doris.filesystem.spi.FileSystem;
import org.apache.doris.filesystem.spi.GlobListing;
import org.apache.doris.filesystem.spi.Location;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* SPI-compatible replacement for the legacy {@code SwitchingFileSystem}.
*
* <p>Implements {@link FileSystem} and routes each operation to the appropriate
* {@link FileSystem} based on the URI scheme / authority of the path operand. The
* storage-type → {@link StorageProperties} mapping comes from the catalog's
* {@code storagePropertiesMap}.
*
* <p>Resolved {@link FileSystem} instances are cached per {@link StorageProperties} reference
* (identity-based) to avoid recreating connections on every call.
*/
public class SpiSwitchingFileSystem implements FileSystem {
private final Map<StorageProperties.Type, StorageProperties> storagePropertiesMap;
/** Non-null only when created via the test constructor — all paths delegate here. */
private FileSystem testDelegate;
/**
* Cache: StorageProperties (identity) → spi.FileSystem.
* Using identity comparison is correct because the properties map values are stable
* objects owned by the catalog instance.
*/
private final Map<StorageProperties, FileSystem> cache = new ConcurrentHashMap<>();
public SpiSwitchingFileSystem(Map<StorageProperties.Type, StorageProperties> storagePropertiesMap) {
this.storagePropertiesMap = storagePropertiesMap;
}
/**
* Testing constructor: routes every path to the supplied {@link FileSystem} delegate.
* Should never be used in production code.
*/
@VisibleForTesting
public SpiSwitchingFileSystem(FileSystem testDelegate) {
this.storagePropertiesMap = java.util.Collections.emptyMap();
this.testDelegate = testDelegate;
}
/** Resolves the appropriate {@link FileSystem} for the given URI string. */
public FileSystem forPath(String uri) throws IOException {
if (testDelegate != null) {
return testDelegate;
}
LocationPath lp = LocationPath.of(uri, storagePropertiesMap);
StorageProperties sp = lp.getStorageProperties();
if (sp == null) {
throw new IOException("No StorageProperties found for path: " + uri);
}
return cache.computeIfAbsent(sp, props -> {
try {
return FileSystemFactory.getFileSystem(props);
} catch (IOException e) {
throw new RuntimeException("Failed to create FileSystem for path " + uri, e);
}
});
}
/** Resolves the appropriate {@link FileSystem} for the given {@link Location}. */
public FileSystem forLocation(Location location) throws IOException {
return forPath(location.uri());
}
// -----------------------------------------------------------------------
// FileSystem interface — each method delegates to the per-path filesystem
// -----------------------------------------------------------------------
@Override
public boolean exists(Location location) throws IOException {
return forLocation(location).exists(location);
}
@Override
public void mkdirs(Location location) throws IOException {
forLocation(location).mkdirs(location);
}
@Override
public void delete(Location location, boolean recursive) throws IOException {
forLocation(location).delete(location, recursive);
}
@Override
public void rename(Location src, Location dst) throws IOException {
forLocation(src).rename(src, dst);
}
@Override
public FileIterator list(Location location) throws IOException {
return forLocation(location).list(location);
}
@Override
public List<FileEntry> listFiles(Location dir) throws IOException {
return forLocation(dir).listFiles(dir);
}
@Override
public List<FileEntry> listFilesRecursive(Location dir) throws IOException {
return forLocation(dir).listFilesRecursive(dir);
}
@Override
public Set<String> listDirectories(Location dir) throws IOException {
return forLocation(dir).listDirectories(dir);
}
@Override
public void renameDirectory(Location src, Location dst, Runnable whenSrcNotExists)
throws IOException {
forLocation(src).renameDirectory(src, dst, whenSrcNotExists);
}
@Override
public DorisInputFile newInputFile(Location location) throws IOException {
return forLocation(location).newInputFile(location);
}
@Override
public DorisInputFile newInputFile(Location location, long length) throws IOException {
return forLocation(location).newInputFile(location, length);
}
@Override
public DorisOutputFile newOutputFile(Location location) throws IOException {
return forLocation(location).newOutputFile(location);
}
@Override
public GlobListing globListWithLimit(Location path, String startAfter, long maxBytes,
long maxFiles) throws IOException {
return forLocation(path).globListWithLimit(path, startAfter, maxBytes, maxFiles);
}
@Override
public void close() throws IOException {
// The cached FileSystem instances are shared; do not close here.
// Lifecycle is managed by the catalog.
}
}