FileSystemCache.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs;
import org.apache.doris.common.CacheFactory;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.filesystem.DorisInputFile;
import org.apache.doris.filesystem.DorisOutputFile;
import org.apache.doris.filesystem.FileEntry;
import org.apache.doris.filesystem.FileIterator;
import org.apache.doris.filesystem.FileSystem;
import org.apache.doris.filesystem.GlobListing;
import org.apache.doris.filesystem.Location;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class FileSystemCache {
private static final Logger LOG = LogManager.getLogger(FileSystemCache.class);
private final LoadingCache<FileSystemCacheKey, CachedFileSystem> fileSystemCache;
public FileSystemCache() {
// no need to set refreshAfterWrite, because the FileSystem is created once and never changed
CacheFactory fsCacheFactory = new CacheFactory(
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
OptionalLong.empty(),
Config.max_remote_file_system_cache_num,
false,
null);
fileSystemCache = fsCacheFactory.buildCacheWithSyncRemovalListener(
new CacheLoader<FileSystemCacheKey, CachedFileSystem>() {
@Override
public CachedFileSystem load(FileSystemCacheKey key) {
return loadFileSystem(key);
}
}, new RemovalListener<FileSystemCacheKey, CachedFileSystem>() {
@Override
public void onRemoval(FileSystemCacheKey key, CachedFileSystem cachedFs,
RemovalCause cause) {
if (cachedFs != null) {
try {
cachedFs.retire();
} catch (IOException e) {
LOG.warn("Failed to close evicted FileSystem for key: {}", key, e);
}
}
}
});
}
private CachedFileSystem loadFileSystem(FileSystemCacheKey key) {
try {
return new CachedFileSystem(FileSystemFactory.getFileSystem(key.properties));
} catch (IOException e) {
throw new RuntimeException("Failed to create filesystem for key: " + key, e);
}
}
public FileSystem getFileSystem(FileSystemCacheKey key) {
while (true) {
CachedFileSystem cachedFs = fileSystemCache.get(key);
FileSystem leasedFs = cachedFs.tryLease();
if (leasedFs != null) {
return leasedFs;
}
fileSystemCache.asMap().remove(key, cachedFs);
}
}
LoadingCache<FileSystemCacheKey, ?> getNativeCacheForTest() {
return fileSystemCache;
}
private static class CachedFileSystem {
private final FileSystem delegate;
private final AtomicInteger refCount = new AtomicInteger(0);
private final AtomicBoolean retiring = new AtomicBoolean(false);
private final AtomicBoolean closed = new AtomicBoolean(false);
private CachedFileSystem(FileSystem delegate) {
this.delegate = delegate;
}
private FileSystem tryLease() {
while (true) {
if (retiring.get() || closed.get()) {
return null;
}
int refs = refCount.get();
if (refCount.compareAndSet(refs, refs + 1)) {
if (retiring.get() || closed.get()) {
release();
return null;
}
return new LeasedFileSystem(this);
}
}
}
private void retire() throws IOException {
retiring.set(true);
closeIfIdle();
}
private void release() {
int refs = refCount.decrementAndGet();
if (refs < 0) {
throw new IllegalStateException("FileSystem cache lease released more than once");
}
if (refs == 0 && retiring.get()) {
try {
closeIfIdle();
} catch (IOException e) {
LOG.warn("Failed to close retired FileSystem", e);
}
}
}
private void closeIfIdle() throws IOException {
if (refCount.get() == 0 && closed.compareAndSet(false, true)) {
delegate.close();
}
}
}
private static class LeasedFileSystem implements FileSystem {
private final CachedFileSystem cachedFs;
private final AtomicBoolean closed = new AtomicBoolean(false);
private LeasedFileSystem(CachedFileSystem cachedFs) {
this.cachedFs = cachedFs;
}
@Override
public boolean exists(Location location) throws IOException {
return cachedFs.delegate.exists(location);
}
@Override
public void mkdirs(Location location) throws IOException {
cachedFs.delegate.mkdirs(location);
}
@Override
public void delete(Location location, boolean recursive) throws IOException {
cachedFs.delegate.delete(location, recursive);
}
@Override
public void deleteFiles(Collection<Location> locations) throws IOException {
cachedFs.delegate.deleteFiles(locations);
}
@Override
public void rename(Location src, Location dst) throws IOException {
cachedFs.delegate.rename(src, dst);
}
@Override
public FileIterator list(Location location) throws IOException {
return cachedFs.delegate.list(location);
}
@Override
public List<FileEntry> listFiles(Location dir) throws IOException {
return cachedFs.delegate.listFiles(dir);
}
@Override
public List<FileEntry> listFilesRecursive(Location dir) throws IOException {
return cachedFs.delegate.listFilesRecursive(dir);
}
@Override
public Set<String> listDirectories(Location dir) throws IOException {
return cachedFs.delegate.listDirectories(dir);
}
@Override
public void renameDirectory(Location src, Location dst, Runnable whenSrcNotExists)
throws IOException {
cachedFs.delegate.renameDirectory(src, dst, whenSrcNotExists);
}
@Override
public DorisInputFile newInputFile(Location location) throws IOException {
return cachedFs.delegate.newInputFile(location);
}
@Override
public DorisInputFile newInputFile(Location location, long length) throws IOException {
return cachedFs.delegate.newInputFile(location, length);
}
@Override
public DorisOutputFile newOutputFile(Location location) throws IOException {
return cachedFs.delegate.newOutputFile(location);
}
@Override
public GlobListing globListWithLimit(Location path, String startAfter, long maxBytes,
long maxFiles) throws IOException {
return cachedFs.delegate.globListWithLimit(path, startAfter, maxBytes, maxFiles);
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
cachedFs.release();
}
}
}
public static class FileSystemCacheKey {
// eg: hdfs://nameservices1
private final String fsIdent;
private final StorageProperties properties;
public FileSystemCacheKey(String fsIdent, StorageProperties properties) {
this.fsIdent = fsIdent;
this.properties = properties;
}
public StorageProperties getProperties() {
return properties;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof FileSystemCacheKey)) {
return false;
}
FileSystemCacheKey o = (FileSystemCacheKey) obj;
return fsIdent.equals(o.fsIdent)
&& properties.equals(o.properties);
}
@Override
public int hashCode() {
return Objects.hash(properties, fsIdent);
}
}
}