EvictableCache.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EvictableCache.java
// and modified by Doris
package org.apache.doris.common;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Verify;
import com.google.common.cache.AbstractLoadingCache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.CacheStats;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalCause;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
/**
* A {@link Cache} and {@link LoadingCache} implementation similar to ones
* produced by {@link CacheBuilder#build()}, but one that does not
* exhibit <a href="https://github.com/google/guava/issues/1881">Guava issue #1881</a>:
* a cache inspection with {@link #getIfPresent(Object)} or {@link #get(Object, Callable)}
* is guaranteed to return fresh state after {@link #invalidate(Object)},
* {@link #invalidateAll(Iterable)} or {@link #invalidateAll()} were called.
*
* @see EvictableCacheBuilder
*/
// @ElementTypesAreNonnullByDefault
class EvictableCache<K, V>
extends AbstractLoadingCache<K, V>
implements LoadingCache<K, V> {
// Invariant: for every (K, token) entry in the tokens map, there is a live
// cache entry (token, ?) in dataCache, that, upon eviction, will cause the tokens'
// entry to be removed.
private final ConcurrentHashMap<K, Token<K>> tokens = new ConcurrentHashMap<>();
// The dataCache can have entries with no corresponding tokens in the tokens map.
// For example, this can happen when invalidation concurs with load.
// The dataCache must be bounded.
private final LoadingCache<Token<K>, V> dataCache;
private final AtomicInteger invalidations = new AtomicInteger();
EvictableCache(CacheBuilder<? super Token<K>, ? super V> cacheBuilder, CacheLoader<? super K, V> cacheLoader) {
dataCache = buildUnsafeCache(
cacheBuilder
.<Token<K>, V>removalListener(removal -> {
Token<K> token = removal.getKey();
Verify.verify(token != null, "token is null");
if (removal.getCause() != RemovalCause.REPLACED) {
tokens.remove(token.getKey(), token);
}
}),
new TokenCacheLoader<>(cacheLoader));
}
// @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden,
// advising to use this class as a safety-adding wrapper.
private static <K, V> LoadingCache<K, V> buildUnsafeCache(CacheBuilder<? super K, ? super V> cacheBuilder,
CacheLoader<? super K, V> cacheLoader) {
return cacheBuilder.build(cacheLoader);
}
@Override
public V getIfPresent(Object key) {
@SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K
Token<K> token = tokens.get(key);
if (token == null) {
return null;
}
return dataCache.getIfPresent(token);
}
@Override
public V get(K key, Callable<? extends V> valueLoader)
throws ExecutionException {
Token<K> newToken = new Token<>(key);
int invalidations = this.invalidations.get();
Token<K> token = tokens.computeIfAbsent(key, ignored -> newToken);
try {
V value = dataCache.get(token, valueLoader);
if (invalidations == this.invalidations.get()) {
// Revive token if it got expired before reloading
if (tokens.putIfAbsent(key, token) == null) {
// Revived
if (!dataCache.asMap().containsKey(token)) {
// We revived, but the token does not correspond to a live entry anymore.
// It would stay in tokens forever, so let's remove it.
tokens.remove(key, token);
}
}
}
return value;
} catch (Throwable e) {
if (newToken == token) {
// Failed to load and it was our new token persisted in tokens map.
// No cache entry exists for the token (unless concurrent load happened),
// so we need to remove it.
tokens.remove(key, newToken);
}
throw e;
}
}
@Override
public V get(K key)
throws ExecutionException {
Token<K> newToken = new Token<>(key);
int invalidations = this.invalidations.get();
Token<K> token = tokens.computeIfAbsent(key, ignored -> newToken);
try {
V value = dataCache.get(token);
if (invalidations == this.invalidations.get()) {
// Revive token if it got expired before reloading
if (tokens.putIfAbsent(key, token) == null) {
// Revived
if (!dataCache.asMap().containsKey(token)) {
// We revived, but the token does not correspond to a live entry anymore.
// It would stay in tokens forever, so let's remove it.
tokens.remove(key, token);
}
}
}
return value;
} catch (Throwable e) {
if (newToken == token) {
// Failed to load and it was our new token persisted in tokens map.
// No cache entry exists for the token (unless concurrent load happened),
// so we need to remove it.
tokens.remove(key, newToken);
}
throw e;
}
}
@Override
public ImmutableMap<K, V> getAll(Iterable<? extends K> keys)
throws ExecutionException {
List<Token<K>> newTokens = new ArrayList<>();
List<Token<K>> temporaryTokens = new ArrayList<>();
try {
Map<K, V> result = new LinkedHashMap<>();
for (K key : keys) {
if (result.containsKey(key)) {
continue;
}
// This is not bulk, but is fast local operation
Token<K> newToken = new Token<>(key);
Token<K> oldToken = tokens.putIfAbsent(key, newToken);
if (oldToken != null) {
// Token exists but a data may not exist (e.g. due to concurrent eviction)
V value = dataCache.getIfPresent(oldToken);
if (value != null) {
result.put(key, value);
continue;
}
// Old token exists but value wasn't found. This can happen when there is concurrent
// eviction/invalidation or when the value is still being loaded.
// The new token is not registered in tokens, so won't be used by subsequent invocations.
temporaryTokens.add(newToken);
}
newTokens.add(newToken);
}
Map<Token<K>, V> values = dataCache.getAll(newTokens);
for (Map.Entry<Token<K>, V> entry : values.entrySet()) {
Token<K> newToken = entry.getKey();
result.put(newToken.getKey(), entry.getValue());
}
return ImmutableMap.copyOf(result);
} catch (Throwable e) {
for (Token<K> token : newTokens) {
// Failed to load and it was our new token (potentially) persisted in tokens map.
// No cache entry exists for the token (unless concurrent load happened),
// so we need to remove it.
tokens.remove(token.getKey(), token);
}
throw e;
} finally {
dataCache.invalidateAll(temporaryTokens);
}
}
@Override
public void refresh(K key) {
// The refresh loads a new entry, if it wasn't in the cache yet. Thus, we would create a new Token.
// However, dataCache.refresh is asynchronous and may fail, so no cache entry may be created.
// In such case we would leak the newly created token.
throw new UnsupportedOperationException();
}
@Override
public long size() {
return dataCache.size();
}
@Override
public void cleanUp() {
dataCache.cleanUp();
}
@VisibleForTesting
int tokensCount() {
return tokens.size();
}
@Override
public void invalidate(Object key) {
invalidations.incrementAndGet();
@SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K
Token<K> token = tokens.remove(key);
if (token != null) {
dataCache.invalidate(token);
}
}
@Override
public void invalidateAll() {
invalidations.incrementAndGet();
dataCache.invalidateAll();
tokens.clear();
}
// Not thread safe, test only.
@VisibleForTesting
void clearDataCacheOnly() {
Map<K, Token<K>> tokensCopy = new HashMap<>(tokens);
dataCache.asMap().clear();
Verify.verify(tokens.isEmpty(), "Clearing dataCache should trigger tokens eviction");
tokens.putAll(tokensCopy);
}
@Override
public CacheStats stats() {
return dataCache.stats();
}
@Override
public ConcurrentMap<K, V> asMap() {
return new ConcurrentMap<K, V>() {
private final ConcurrentMap<Token<K>, V> dataCacheMap = dataCache.asMap();
@Override
public V putIfAbsent(K key, V value) {
throw new UnsupportedOperationException("The operation is not supported,"
+ " as in inherently races with cache invalidation");
}
@Override
public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
// default implementation of ConcurrentMap#compute uses not supported putIfAbsent in some cases
throw new UnsupportedOperationException("The operation is not supported, as in inherently"
+ " races with cache invalidation");
}
@Override
public boolean remove(Object key, Object value) {
@SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K
Token<K> token = tokens.get(key);
if (token != null) {
return dataCacheMap.remove(token, value);
}
return false;
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
Token<K> token = tokens.get(key);
if (token != null) {
return dataCacheMap.replace(token, oldValue, newValue);
}
return false;
}
@Override
public V replace(K key, V value) {
throw new UnsupportedOperationException("The operation is not supported, as in inherently races"
+ " with cache invalidation");
}
@Override
public int size() {
return dataCache.asMap().size();
}
@Override
public boolean isEmpty() {
return dataCache.asMap().isEmpty();
}
@Override
public boolean containsKey(Object key) {
return tokens.containsKey(key);
}
@Override
public boolean containsValue(Object value) {
return values().contains(value);
}
@Override
@Nullable
public V get(Object key) {
return getIfPresent(key);
}
@Override
public V put(K key, V value) {
throw new UnsupportedOperationException("The operation is not supported, as in inherently"
+ " races with cache invalidation. Use get(key, callable) instead.");
}
@Override
@Nullable
public V remove(Object key) {
Token<K> token = tokens.remove(key);
if (token != null) {
return dataCacheMap.remove(token);
}
return null;
}
@Override
public void putAll(Map<? extends K, ? extends V> m) {
throw new UnsupportedOperationException("The operation is not supported, as in inherently"
+ " races with cache invalidation. Use get(key, callable) instead.");
}
@Override
public void clear() {
dataCacheMap.clear();
tokens.clear();
}
@Override
public Set<K> keySet() {
return tokens.keySet();
}
@Override
public Collection<V> values() {
return dataCacheMap.values();
}
@Override
public Set<Map.Entry<K, V>> entrySet() {
throw new UnsupportedOperationException();
}
};
}
// instance-based equality
static final class Token<K> {
private final K key;
Token(K key) {
this.key = Objects.requireNonNull(key, "key is null");
}
K getKey() {
return key;
}
@Override
public String toString() {
return String.format("CacheToken(%s; %s)", Integer.toHexString(hashCode()), key);
}
}
private static class TokenCacheLoader<K, V>
extends CacheLoader<Token<K>, V> {
private final CacheLoader<? super K, V> delegate;
public TokenCacheLoader(CacheLoader<? super K, V> delegate) {
this.delegate = Objects.requireNonNull(delegate, "delegate is null");
}
@Override
public V load(Token<K> token)
throws Exception {
return delegate.load(token.getKey());
}
@Override
public ListenableFuture<V> reload(Token<K> token, V oldValue)
throws Exception {
return delegate.reload(token.getKey(), oldValue);
}
@Override
public Map<Token<K>, V> loadAll(Iterable<? extends Token<K>> tokens)
throws Exception {
List<Token<K>> tokenList = ImmutableList.copyOf(tokens);
List<K> keys = new ArrayList<>();
for (Token<K> token : tokenList) {
keys.add(token.getKey());
}
Map<? super K, V> values;
try {
values = delegate.loadAll(keys);
} catch (UnsupportedLoadingOperationException e) {
// Guava uses UnsupportedLoadingOperationException in LoadingCache.loadAll
// to fall back from bulk loading (without load sharing) to loading individual
// values (with load sharing). EvictableCache implementation does not currently
// support the fallback mechanism, so the individual values would be loaded
// without load sharing. This would be an unintentional and non-obvious behavioral
// discrepancy between EvictableCache and Guava Caches, so the mechanism is disabled.
throw new UnsupportedOperationException("LoadingCache.getAll() is not supported by EvictableCache"
+ " when CacheLoader.loadAll is not implemented", e);
}
ImmutableMap.Builder<Token<K>, V> result = ImmutableMap.builder();
for (int i = 0; i < tokenList.size(); i++) {
Token<K> token = tokenList.get(i);
K key = keys.get(i);
V value = values.get(key);
// CacheLoader.loadAll is not guaranteed to return values for all the keys
if (value != null) {
result.put(token, value);
}
}
return result.buildOrThrow();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.addValue(delegate)
.toString();
}
}
}