FileSystemCache.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs;
import org.apache.doris.common.CacheFactory;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.filesystem.FileSystem;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.function.Function;
public class FileSystemCache {
private static final Logger LOG = LogManager.getLogger(FileSystemCache.class);
private final LoadingCache<FileSystemCacheKey, FileSystemHolder> fileSystemCache;
private final Function<FileSystemCacheKey, FileSystem> loader;
public FileSystemCache() {
this(
Config.max_remote_file_system_cache_num,
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
FileSystemCache::loadFileSystem);
}
@VisibleForTesting
FileSystemCache(long maxSize, OptionalLong expireAfterAccessSec, Function<FileSystemCacheKey, FileSystem> loader) {
this.loader = Objects.requireNonNull(loader, "loader");
if (maxSize == 0) {
fileSystemCache = null;
return;
}
// no need to set refreshAfterWrite, because the FileSystem is created once and never changed
CacheFactory fsCacheFactory = new CacheFactory(
expireAfterAccessSec,
OptionalLong.empty(),
maxSize,
false,
null);
fileSystemCache = fsCacheFactory.buildCacheWithSyncRemovalListener(
key -> new FileSystemHolder(key, loader.apply(key)), (key, holder, cause) -> {
if (holder != null) {
holder.markEvicted();
}
});
}
private static FileSystem loadFileSystem(FileSystemCacheKey key) {
try {
return FileSystemFactory.getFileSystem(key.properties);
} catch (IOException e) {
throw new RuntimeException("Failed to create filesystem for key: " + key, e);
}
}
public FileSystemLease getFileSystem(FileSystemCacheKey key) {
if (fileSystemCache == null) {
return FileSystemLease.direct(key, loader.apply(key));
}
while (true) {
FileSystemHolder holder = fileSystemCache.get(key);
FileSystemLease lease = holder.acquire();
if (lease != null) {
return lease;
}
fileSystemCache.asMap().remove(key, holder);
}
}
@VisibleForTesting
void cleanUp() {
if (fileSystemCache != null) {
fileSystemCache.cleanUp();
}
}
private static final class FileSystemHolder {
private final FileSystemCacheKey key;
private final FileSystem fileSystem;
private int referenceCount = 0;
private boolean evicted = false;
private boolean closed = false;
private FileSystemHolder(FileSystemCacheKey key, FileSystem fileSystem) {
this.key = Objects.requireNonNull(key, "key");
this.fileSystem = Objects.requireNonNull(fileSystem, "fileSystem");
}
private synchronized FileSystemLease acquire() {
if (evicted || closed) {
return null;
}
referenceCount++;
return new FileSystemLease(this, fileSystem);
}
private synchronized void release() {
Preconditions.checkState(referenceCount > 0, "FileSystem lease has been released more than once");
referenceCount--;
closeIfIdle();
}
private synchronized void markEvicted() {
evicted = true;
closeIfIdle();
}
private void closeIfIdle() {
if (!evicted || referenceCount != 0 || closed) {
return;
}
closed = true;
try {
fileSystem.close();
} catch (IOException e) {
LOG.warn("Failed to close evicted FileSystem for key: {}", key, e);
}
}
}
public static final class FileSystemLease implements AutoCloseable {
private final FileSystemHolder holder;
private final FileSystem fileSystem;
private final FileSystemCacheKey directKey;
private boolean closed = false;
private FileSystemLease(FileSystemHolder holder, FileSystem fileSystem) {
this(holder, fileSystem, null);
}
private FileSystemLease(FileSystemHolder holder, FileSystem fileSystem, FileSystemCacheKey directKey) {
this.holder = holder;
this.fileSystem = fileSystem;
this.directKey = directKey;
}
private static FileSystemLease direct(FileSystemCacheKey key, FileSystem fileSystem) {
return new FileSystemLease(null, fileSystem, key);
}
public FileSystem fileSystem() {
return fileSystem;
}
@Override
public void close() {
if (closed) {
return;
}
closed = true;
if (holder != null) {
holder.release();
return;
}
try {
fileSystem.close();
} catch (IOException e) {
LOG.warn("Failed to close uncached FileSystem for key: {}", directKey, e);
}
}
}
public static class FileSystemCacheKey {
// eg: hdfs://nameservices1
private final String fsIdent;
private final StorageProperties properties;
public FileSystemCacheKey(String fsIdent, StorageProperties properties) {
this.fsIdent = fsIdent;
this.properties = properties;
}
public StorageProperties getProperties() {
return properties;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof FileSystemCacheKey)) {
return false;
}
FileSystemCacheKey o = (FileSystemCacheKey) obj;
return fsIdent.equals(o.fsIdent)
&& properties.equals(o.properties);
}
@Override
public int hashCode() {
return Objects.hash(properties, fsIdent);
}
}
}