PointQueryVersionCache.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.cloud.catalog;

import org.apache.doris.catalog.Partition;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.rpc.VersionHelper;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;

import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

/**
 * A request-coalescing version cache for point queries in cloud mode.
 *
 * <p>When {@code enable_snapshot_point_query=true}, every point query needs to fetch
 * the partition's visible version from MetaService. Under high concurrency, this causes
 * N RPCs for N concurrent point queries on the same partition.</p>
 *
 * <p>This cache optimizes the version fetching by:
 * <ul>
 *   <li><b>Short TTL caching</b>: Partition versions are cached for a configurable duration
 *       ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the TTL window,
 *       concurrent queries reuse the cached version.</li>
 *   <li><b>Request coalescing</b>: When the cache expires, only the first request issues
 *       the MetaService RPC. Concurrent requests for the same partition wait on the inflight
 *       result via a {@link CompletableFuture}.</li>
 * </ul>
 * </p>
 */
public class PointQueryVersionCache {
    private static final Logger LOG = LogManager.getLogger(PointQueryVersionCache.class);

    private static volatile PointQueryVersionCache instance;

    /**
     * Cache entry holding the version and the timestamp when it was cached.
     */
    static class VersionEntry {
        final long version;
        final long cachedTimeMs;

        VersionEntry(long version, long cachedTimeMs) {
            this.version = version;
            this.cachedTimeMs = cachedTimeMs;
        }

        boolean isExpired(long ttlMs) {
            if (ttlMs <= 0) {
                return true;
            }
            return System.currentTimeMillis() - cachedTimeMs > ttlMs;
        }
    }

    // partitionId -> cached VersionEntry
    private final ConcurrentHashMap<Long, VersionEntry> cache = new ConcurrentHashMap<>();

    // partitionId -> inflight RPC future (for request coalescing)
    private final ConcurrentHashMap<Long, CompletableFuture<Long>> inflightRequests = new ConcurrentHashMap<>();

    @VisibleForTesting
    public PointQueryVersionCache() {
    }

    public static PointQueryVersionCache getInstance() {
        if (instance == null) {
            synchronized (PointQueryVersionCache.class) {
                if (instance == null) {
                    instance = new PointQueryVersionCache();
                }
            }
        }
        return instance;
    }

    @VisibleForTesting
    public static void setInstance(PointQueryVersionCache cache) {
        instance = cache;
    }

    /**
     * Get the visible version for a partition, using TTL-based caching and request coalescing.
     *
     * @param partition  the cloud partition to get version for
     * @param ttlMs      TTL in milliseconds; 0 or negative disables caching
     * @return the visible version
     * @throws RpcException if the MetaService RPC fails
     */
    public long getVersion(CloudPartition partition, long ttlMs) throws RpcException {
        long partitionId = partition.getId();

        // If cache is disabled, fetch directly
        if (ttlMs <= 0) {
            return fetchVersionFromMs(partition);
        }

        // Check cache first
        VersionEntry entry = cache.get(partitionId);
        if (entry != null && !entry.isExpired(ttlMs)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("point query version cache hit, partition={}, version={}", partitionId, entry.version);
            }
            return entry.version;
        }

        // Cache miss or expired: use request coalescing
        return getVersionWithCoalescing(partition, partitionId, ttlMs);
    }

    private long getVersionWithCoalescing(CloudPartition partition, long partitionId, long ttlMs)
            throws RpcException {
        // Try to become the leader request for this partition
        CompletableFuture<Long> myFuture = new CompletableFuture<>();
        CompletableFuture<Long> existingFuture = inflightRequests.putIfAbsent(partitionId, myFuture);

        if (existingFuture != null) {
            // Another request is already in flight ��� wait for its result
            if (LOG.isDebugEnabled()) {
                LOG.debug("point query version coalescing, waiting for inflight request, partition={}",
                        partitionId);
            }
            try {
                return existingFuture.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RpcException("get version", "interrupted while waiting for coalesced request");
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof RpcException) {
                    throw (RpcException) cause;
                }
                throw new RpcException("get version", cause != null ? cause.getMessage() : e.getMessage());
            }
        }

        // We are the leader ��� fetch version from MetaService
        try {
            long version = fetchVersionFromMs(partition);
            // Update cache
            cache.put(partitionId, new VersionEntry(version, System.currentTimeMillis()));
            // Also update the partition's cached version
            partition.setCachedVisibleVersion(version, System.currentTimeMillis());
            // Complete the future so waiting requests get the result
            myFuture.complete(version);
            if (LOG.isDebugEnabled()) {
                LOG.debug("point query version fetched from MS, partition={}, version={}",
                        partitionId, version);
            }
            return version;
        } catch (Exception e) {
            // Complete exceptionally so waiting requests also get the error
            myFuture.completeExceptionally(e);
            if (e instanceof RpcException) {
                throw (RpcException) e;
            }
            throw new RpcException("get version", e.getMessage());
        } finally {
            // Remove the inflight request entry
            inflightRequests.remove(partitionId, myFuture);
        }
    }

    /**
     * Fetch visible version from MetaService for a single partition.
     * This method is package-private to allow mocking in tests.
     */
    @VisibleForTesting
    protected long fetchVersionFromMs(CloudPartition partition) throws RpcException {
        Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder()
                .setRequestIp(FrontendOptions.getLocalHostAddressCached())
                .setDbId(partition.getDbId())
                .setTableId(partition.getTableId())
                .setPartitionId(partition.getId())
                .setBatchMode(false)
                .build();

        Cloud.GetVersionResponse resp = VersionHelper.getVersionFromMeta(request);
        if (resp.getStatus().getCode() == Cloud.MetaServiceCode.OK) {
            return resp.getVersion();
        } else if (resp.getStatus().getCode() == Cloud.MetaServiceCode.VERSION_NOT_FOUND) {
            return Partition.PARTITION_INIT_VERSION;
        } else {
            throw new RpcException("get version", "unexpected status " + resp.getStatus());
        }
    }

    /**
     * Clear all cached entries. Primarily for testing.
     */
    @VisibleForTesting
    public void clear() {
        cache.clear();
        inflightRequests.clear();
    }

    /**
     * Get the number of cached entries. Primarily for testing.
     */
    @VisibleForTesting
    public int cacheSize() {
        return cache.size();
    }
}