TabletSlidingWindowAccessStats.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

/**
 * Sliding window access statistics utility class.
 * Supports tracking access statistics for different types of IDs (tablet, replica, backend, etc.)
 */
public class TabletSlidingWindowAccessStats {
    // Class-wide logger.
    private static final Logger LOG = LogManager.getLogger(TabletSlidingWindowAccessStats.class);

    // Lazily created singleton returned by getInstance(); volatile because it is read
    // without holding the lock and written inside the synchronized section there.
    private static volatile TabletSlidingWindowAccessStats instance;

    // Hash function used by getShardIndex() to spread IDs over the shard array.
    private static final HashFunction SHARD_HASH = Hashing.murmur3_128();

    // Ranking used for TopN results: accessCount descending, ties broken by lastAccessTime descending.
    // Under this ordering the "most active" result compares as the smallest element.
    private static final Comparator<AccessStatsResult> TOPN_ACTIVE_COMPARATOR =
            Comparator.comparingLong((AccessStatsResult r) -> r.accessCount).reversed()
                    .thenComparing(Comparator.comparingLong((AccessStatsResult r) -> r.lastAccessTime).reversed());

    // Length of the sliding window in milliseconds. Set in the constructor from
    // Config.active_tablet_sliding_window_time_window_second (a value in seconds).
    // NOTE(review): the original comment states a default of 1 hour; the config default is not visible here.
    private final long timeWindowMs;

    // Bucket size in milliseconds (set in the constructor from DEFAULT_BUCKET_SIZE_SECOND, i.e. 1 minute).
    // The time window is divided into multiple buckets, each bucket stores the access count for one time period.
    // For example: if timeWindowMs=1hour and bucketSizeMs=1minute, there will be 60 buckets.
    // Smaller bucket size = more accurate statistics but more memory usage.
    private final long bucketSizeMs;

    // Number of buckets in the sliding window; derived in the constructor from the window length
    // and the bucket size, and used to size every SlidingWindowCounter.
    private final int numBuckets;

    // Intended cleanup interval in milliseconds (set in the constructor from
    // DEFAULT_CLEANUP_INTERVAL_SECOND, i.e. 5 minutes).
    private final long cleanupIntervalMs;

    // Number of shards. getShardIndex() maps every ID into [0, SHARD_SIZE) so that counters are
    // spread over this many independent maps.
    private static final int SHARD_SIZE = 1024;

    /**
     * Sliding window counter for a single replica/tablet.
     *
     * <p>Counts are kept in a ring of time buckets. Every slot remembers the start time of the
     * bucket it currently represents, so a slot left over from an earlier lap of the ring can be
     * recognised and reset before it is reused.
     *
     * <p>Increments are lock-free. Only the operations that reset a slot (rolling a slot over to a
     * new bucket in {@link #add} and {@link #cleanup}) take this counter's monitor, so that two
     * racing resets cannot wipe out an increment that was already counted for the new bucket.
     */
    private static class SlidingWindowCounter {
        // Each bucket stores count for a time period
        private final AtomicLongArray buckets;
        // Start time of the bucket each slot currently represents (0 = slot unused / cleaned)
        private final AtomicLongArray bucketTimestamps;
        // Last access time (for TopN sorting)
        private volatile long lastAccessTime = 0;
        // Total count in current window (cached for performance); -1 means "invalidated"
        private volatile long cachedTotalCount = 0;
        // Time at which cachedTotalCount was computed
        private volatile long cachedCountTime = 0;

        /**
         * Creates a counter with the given number of ring slots.
         *
         * @param numBuckets requested number of buckets; values below 1 are raised to 1 so that
         *                   the bucket index computation can never divide by zero
         */
        SlidingWindowCounter(int numBuckets) {
            int size = Math.max(1, numBuckets);
            this.buckets = new AtomicLongArray(size);
            this.bucketTimestamps = new AtomicLongArray(size);
        }

        /**
         * Get the ring slot for the bucket that contains {@code currentTimeMs}.
         */
        private int getBucketIndex(long currentTimeMs, long bucketSizeMs, int numBuckets) {
            return (int) ((currentTimeMs / bucketSizeMs) % numBuckets);
        }

        /**
         * Add one access at {@code currentTimeMs}.
         *
         * @param currentTimeMs time of the access in milliseconds
         * @param bucketSizeMs  width of one bucket in milliseconds
         * @param numBuckets    kept for interface compatibility; the actual ring length is used so
         *                      that the index always stays inside the arrays
         */
        void add(long currentTimeMs, long bucketSizeMs, int numBuckets) {
            int bucketIndex = getBucketIndex(currentTimeMs, bucketSizeMs, buckets.length());
            long bucketStartTime = (currentTimeMs / bucketSizeMs) * bucketSizeMs;

            // The slot still holds a different bucket: roll it over. The reset is done under the
            // monitor with a re-check so only one thread resets, and the timestamp is published
            // after the count is cleared so a thread that sees the new timestamp never has its
            // increment wiped by the reset.
            if (bucketTimestamps.get(bucketIndex) != bucketStartTime) {
                synchronized (this) {
                    if (bucketTimestamps.get(bucketIndex) != bucketStartTime) {
                        buckets.set(bucketIndex, 0);
                        bucketTimestamps.set(bucketIndex, bucketStartTime);
                    }
                }
            }

            // Increment count
            buckets.incrementAndGet(bucketIndex);
            lastAccessTime = currentTimeMs;
            cachedTotalCount = -1; // Invalidate cache
        }

        /**
         * Get total count within the time window.
         *
         * <p>The result is cached and reused for calls made less than one second after the cached
         * value was computed, unless an {@link #add} or {@link #cleanup} invalidated it.
         *
         * @param currentTimeMs current time in milliseconds
         * @param timeWindowMs  window length in milliseconds
         * @param bucketSizeMs  unused by the computation; kept for interface compatibility
         * @param numBuckets    kept for interface compatibility; the actual ring length is scanned
         * @return sum of all slots whose bucket start time lies inside the window
         */
        long getCount(long currentTimeMs, long timeWindowMs, long bucketSizeMs, int numBuckets) {
            // Use cached value if still valid (within 1 second)
            if (cachedTotalCount >= 0 && (currentTimeMs - cachedCountTime) < 1000) {
                return cachedTotalCount;
            }

            long windowStart = currentTimeMs - timeWindowMs;
            long count = 0;

            for (int i = 0; i < buckets.length(); i++) {
                long bucketTimestamp = bucketTimestamps.get(i);
                // Timestamp 0 marks a slot that was never used or has been cleaned.
                if (bucketTimestamp >= windowStart && bucketTimestamp > 0) {
                    count += buckets.get(i);
                }
            }

            cachedTotalCount = count;
            cachedCountTime = currentTimeMs;
            return count;
        }

        /**
         * @return the time passed to the most recent {@link #add}, or 0 if nothing was added
         */
        long getLastAccessTime() {
            return lastAccessTime;
        }

        /**
         * Clean up expired buckets: every slot whose bucket started before the window is zeroed
         * and marked unused. Runs under the same monitor as the rollover in {@link #add} so the
         * two resets cannot interleave.
         */
        void cleanup(long currentTimeMs, long timeWindowMs) {
            long windowStart = currentTimeMs - timeWindowMs;
            synchronized (this) {
                for (int i = 0; i < buckets.length(); i++) {
                    long bucketTimestamp = bucketTimestamps.get(i);
                    if (bucketTimestamp > 0 && bucketTimestamp < windowStart) {
                        buckets.set(i, 0);
                        bucketTimestamps.set(i, 0);
                    }
                }
            }
            cachedTotalCount = -1; // Invalidate cache
        }

        /**
         * Check if this counter has any recent activity, i.e. whether the last recorded access
         * is not older than the window.
         */
        boolean hasRecentActivity(long currentTimeMs, long timeWindowMs) {
            return lastAccessTime >= (currentTimeMs - timeWindowMs);
        }
    }

    /**
     * One shard of the statistics: an independent concurrent map from ID to its
     * sliding window counter. IDs are distributed over several shards so that
     * updates for different IDs mostly touch different maps.
     */
    private static class AccessStatsShard {
        // id -> SlidingWindowCounter
        private final ConcurrentHashMap<Long, SlidingWindowCounter> idCounters;

        AccessStatsShard() {
            this.idCounters = new ConcurrentHashMap<>();
        }
    }

    // Sharded access stats to reduce lock contention.
    // The array has SHARD_SIZE slots; each slot is filled with a fresh AccessStatsShard in the constructor.
    private final AccessStatsShard[] shards = new AccessStatsShard[SHARD_SIZE];

    // Access counter for monitoring: incremented once per recordAccess() call and never reset here.
    private final AtomicLong totalAccessCount = new AtomicLong(0);

    // Aggregated stats cache for metrics/observability
    // - recentAccessCountInWindow: sum of accessCount of all active IDs in current time window
    // - activeIdsInWindow: number of IDs that have recent activity in current time window
    // - lastAggregateRefreshTimeMs: time of the last recomputation of the two values above
    // These are computed on-demand with a TTL to avoid expensive full scans on every metric scrape:
    // refreshAggregatesIfNeeded() recomputes them at most once per AGGREGATE_REFRESH_INTERVAL_MS.
    private static final long AGGREGATE_REFRESH_INTERVAL_MS = 10_000L;
    private final AtomicLong recentAccessCountInWindow = new AtomicLong(0);
    private final AtomicLong activeIdsInWindow = new AtomicLong(0);
    private final AtomicLong lastAggregateRefreshTimeMs = new AtomicLong(0);

    // Cleanup daemon. Only created when the feature flag is enabled at construction time; otherwise null.
    private AccessStatsCleanupDaemon cleanupDaemon;

    // Thread pool for async recordAccess execution.
    // Only created when the feature flag is enabled at construction time; recordAccessAsync()
    // treats a null executor as "do nothing".
    private ThreadPoolExecutor asyncExecutor;

    // Default bucket size: 1 minute (60 buckets for 1 hour window)
    private static final long DEFAULT_BUCKET_SIZE_SECOND = 60L;

    // Default cleanup interval: 5 minutes
    private static final long DEFAULT_CLEANUP_INTERVAL_SECOND = 300L;

    TabletSlidingWindowAccessStats() {
        this.timeWindowMs = Config.active_tablet_sliding_window_time_window_second * 1000L;
        this.bucketSizeMs = DEFAULT_BUCKET_SIZE_SECOND * 1000L;
        this.numBuckets = (int) (Config.active_tablet_sliding_window_time_window_second / DEFAULT_BUCKET_SIZE_SECOND);
        this.cleanupIntervalMs = DEFAULT_CLEANUP_INTERVAL_SECOND * 1000L;

        // Initialize shards
        for (int i = 0; i < SHARD_SIZE; i++) {
            shards[i] = new AccessStatsShard();
        }

        // Start cleanup daemon and async executor if enabled
        if (Config.enable_active_tablet_sliding_window_access_stats) {
            this.cleanupDaemon = new AccessStatsCleanupDaemon();
            this.cleanupDaemon.start();
            // Initialize async executor for recordAccess
            // Use a small thread pool with bounded queue to avoid blocking
            // If queue is full, discard the task (statistics can tolerate some loss)
            this.asyncExecutor = new ThreadPoolExecutor(
                    2,  // core pool size
                    4,  // maximum pool size
                    60L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(1000), // queue capacity
                    r -> {
                        Thread t = new Thread(r, "sliding-window-access-stats-async-tablet-record");
                        t.setDaemon(true);
                        return t;
                    },
                    new ThreadPoolExecutor.DiscardPolicy() // discard when queue is full
            );

            LOG.info("SlidingWindowAccessStats initialized for type tablet: timeWindow={}ms, bucketSize={}ms, "
                    + "numBuckets={}, shardSize={}, cleanupInterval={}ms",
                    timeWindowMs, bucketSizeMs, numBuckets, SHARD_SIZE, cleanupIntervalMs);
        }
    }

    /**
     * Maps an ID to the index of the shard that holds its counter.
     *
     * @param id the ID to look up
     * @return an index in {@code [0, SHARD_SIZE)}; floorMod keeps it non-negative even when
     *         the hash is negative
     */
    private int getShardIndex(long id) {
        return Math.floorMod(SHARD_HASH.hashLong(id).asInt(), SHARD_SIZE);
    }

    /**
     * Recomputes the cached aggregates (active ID count and total accesses in the window)
     * when the last refresh is at least AGGREGATE_REFRESH_INTERVAL_MS old.
     *
     * <p>The refresh timestamp is claimed with a compare-and-set, so when several callers
     * find the cache stale at the same time only the one that wins the CAS performs the scan;
     * the others return and read the previous values.
     *
     * @param currentTimeMs current time in milliseconds
     */
    private void refreshAggregatesIfNeeded(long currentTimeMs) {
        long previousRefresh = lastAggregateRefreshTimeMs.get();
        boolean stale = currentTimeMs - previousRefresh >= AGGREGATE_REFRESH_INTERVAL_MS;
        if (!stale || !lastAggregateRefreshTimeMs.compareAndSet(previousRefresh, currentTimeMs)) {
            return;
        }

        int activeCount = 0;
        long accessSum = 0;
        for (int i = 0; i < shards.length; i++) {
            for (SlidingWindowCounter c : shards[i].idCounters.values()) {
                if (!c.hasRecentActivity(currentTimeMs, timeWindowMs)) {
                    continue;
                }
                activeCount++;
                accessSum += c.getCount(currentTimeMs, timeWindowMs, bucketSizeMs, numBuckets);
            }
        }
        activeIdsInWindow.set(activeCount);
        recentAccessCountInWindow.set(accessSum);
    }

    /**
     * Returns the cached sum of access counts of all active IDs in the current window.
     * The cache is refreshed first if it is stale.
     *
     * @return the cached total, or 0 when the feature is disabled
     */
    public long getRecentAccessCountInWindow() {
        if (Config.enable_active_tablet_sliding_window_access_stats) {
            refreshAggregatesIfNeeded(System.currentTimeMillis());
            return recentAccessCountInWindow.get();
        }
        return 0L;
    }

    /**
     * Returns the cached number of IDs with activity in the current window.
     * The cache is refreshed first if it is stale.
     *
     * @return the cached count, or 0 when the feature is disabled
     */
    public long getActiveIdsInWindow() {
        if (Config.enable_active_tablet_sliding_window_access_stats) {
            refreshAggregatesIfNeeded(System.currentTimeMillis());
            return activeIdsInWindow.get();
        }
        return 0L;
    }

    /**
     * Returns the number of accesses recorded by this instance so far.
     *
     * @return the running total, or 0 when the feature is disabled
     */
    public long getTotalAccessCount() {
        return Config.enable_active_tablet_sliding_window_access_stats ? totalAccessCount.get() : 0L;
    }

    /**
     * Hands an access over to the background executor so the caller is not blocked.
     * Intended for high-frequency call paths.
     *
     * <p>Does nothing when no executor exists (it is only created when the feature was
     * enabled at construction time). Failures while submitting or while recording are
     * logged and never propagated to the caller.
     *
     * @param id the ID that was accessed
     */
    public void recordAccessAsync(long id) {
        ThreadPoolExecutor executor = asyncExecutor;
        if (executor == null) {
            return;
        }

        Runnable task = () -> {
            try {
                recordAccess(id);
            } catch (Exception e) {
                // Keep the worker thread alive and the caller unaffected.
                LOG.warn("Failed to record access asynchronously for tablet id={}", id, e);
            }
        };

        try {
            executor.execute(task);
        } catch (Exception e) {
            // Losing a sample is acceptable for statistics; just report it.
            LOG.warn("Failed to submit async recordAccess task for tablet id={}", id, e);
        }
    }

    /**
     * Record an access for the given ID at the current time.
     *
     * <p>Does nothing when the feature is disabled, like the other public entry points of
     * this class: while disabled, cleanup does not run and every reader reports nothing, so
     * recording would only accumulate counters that are never read or removed.
     *
     * @param id the ID that was accessed
     */
    public void recordAccess(long id) {
        if (!Config.enable_active_tablet_sliding_window_access_stats) {
            return;
        }

        long currentTime = System.currentTimeMillis();
        int shardIndex = getShardIndex(id);
        AccessStatsShard shard = shards[shardIndex];

        // Create the counter on first access; computeIfAbsent guarantees one counter per ID.
        SlidingWindowCounter counter = shard.idCounters.computeIfAbsent(id,
                k -> new SlidingWindowCounter(numBuckets));
        counter.add(currentTime, bucketSizeMs, numBuckets);
        totalAccessCount.incrementAndGet();
    }

    /**
     * Looks up the access statistics of a single ID.
     *
     * @param id the ID to query
     * @return the access count within the window and the last access time, or {@code null}
     *         when the feature is disabled or no counter exists for the ID
     */
    public AccessStatsResult getAccessInfo(long id) {
        if (!Config.enable_active_tablet_sliding_window_access_stats) {
            return null;
        }

        SlidingWindowCounter found = shards[getShardIndex(id)].idCounters.get(id);
        if (found == null) {
            return null;
        }

        long now = System.currentTimeMillis();
        long countInWindow = found.getCount(now, timeWindowMs, bucketSizeMs, numBuckets);
        long lastAccess = found.getLastAccessTime();
        return new AccessStatsResult(id, countInWindow, lastAccess);
    }

    /**
     * Immutable snapshot of one ID's statistics, as returned by the single-ID lookup
     * and the top N query.
     */
    public static class AccessStatsResult {
        /** The ID the statistics belong to. */
        public final long id;
        /** Number of accesses counted for the ID. */
        public final long accessCount;
        /** Time of the last recorded access. */
        public final long lastAccessTime;

        public AccessStatsResult(long id, long accessCount, long lastAccessTime) {
            this.id = id;
            this.accessCount = accessCount;
            this.lastAccessTime = lastAccessTime;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("AccessStatsResult{");
            text.append("id=").append(id);
            text.append(", accessCount=").append(accessCount);
            text.append(", lastAccessTime=").append(lastAccessTime);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Get top N most active IDs.
     *
     * <p>Scans all shards and keeps only the best {@code topN} candidates in a bounded heap,
     * so the full result set is never sorted. IDs without recent activity or with a zero
     * count in the window are skipped.
     *
     * @param topN maximum number of results to return
     * @return at most {@code topN} results ordered by access count descending, then last
     *         access time descending; empty when the feature is disabled or {@code topN <= 0}
     */
    public List<AccessStatsResult> getTopNActive(int topN) {
        if (!Config.enable_active_tablet_sliding_window_access_stats || topN <= 0) {
            return Collections.emptyList();
        }

        long currentTime = System.currentTimeMillis();
        // TOPN_ACTIVE_COMPARATOR orders "most active first", so its reverse puts the WEAKEST
        // retained candidate at the head of the heap, where it can be evicted cheaply.
        // The initial capacity is only a hint; cap it so a huge topN neither overflows
        // topN + 1 nor pre-allocates a huge array.
        PriorityQueue<AccessStatsResult> minHeap = new PriorityQueue<>(
                Math.min(topN, 1024) + 1,
                Collections.reverseOrder(TOPN_ACTIVE_COMPARATOR));

        // Collect from all shards and maintain TopN using the heap
        for (AccessStatsShard shard : shards) {
            for (Map.Entry<Long, SlidingWindowCounter> entry : shard.idCounters.entrySet()) {
                long id = entry.getKey();
                SlidingWindowCounter counter = entry.getValue();

                // Skip if no recent activity
                if (!counter.hasRecentActivity(currentTime, timeWindowMs)) {
                    continue;
                }

                long accessCount = counter.getCount(currentTime, timeWindowMs, bucketSizeMs, numBuckets);
                if (accessCount <= 0) {
                    continue;
                }

                AccessStatsResult result = new AccessStatsResult(id, accessCount, counter.getLastAccessTime());
                if (minHeap.size() < topN) {
                    // Heap not full, directly add
                    minHeap.offer(result);
                } else if (TOPN_ACTIVE_COMPARATOR.compare(result, minHeap.peek()) < 0) {
                    // Under the "most active first" ordering a negative result means the new
                    // candidate ranks ahead of the weakest retained one, so it takes its place.
                    minHeap.poll();
                    minHeap.offer(result);
                }
            }
        }

        // Convert heap to list and sort in descending order (TopN)
        List<AccessStatsResult> results = new ArrayList<>(minHeap);
        results.sort(TOPN_ACTIVE_COMPARATOR);
        return results;
    }

    /**
     * Walks every shard, clears expired buckets of each counter and drops counters
     * whose last access lies outside the time window. Does nothing when the feature
     * is disabled.
     */
    private void cleanupExpiredRecords() {
        if (!Config.enable_active_tablet_sliding_window_access_stats) {
            return;
        }

        final long now = System.currentTimeMillis();
        int removedCount = 0;

        for (int i = 0; i < shards.length; i++) {
            ConcurrentHashMap<Long, SlidingWindowCounter> counters = shards[i].idCounters;
            for (Map.Entry<Long, SlidingWindowCounter> e : counters.entrySet()) {
                SlidingWindowCounter c = e.getValue();
                c.cleanup(now, timeWindowMs);

                if (c.hasRecentActivity(now, timeWindowMs)) {
                    continue;
                }
                // Removing while iterating is done through the map itself, not the iterator.
                counters.remove(e.getKey());
                removedCount++;
            }
        }

        if (removedCount > 0 && LOG.isDebugEnabled()) {
            LOG.debug("Cleaned up {} expired access records for type tablet", removedCount);
        }
    }

    /**
     * Builds a one-line, human-readable summary of the configuration and the current
     * aggregate numbers.
     *
     * @return the summary, or a fixed "disabled" message when the feature is off
     */
    public String getStatsSummary() {
        if (!Config.enable_active_tablet_sliding_window_access_stats) {
            return "Active tablet sliding window access stats is disabled";
        }

        refreshAggregatesIfNeeded(System.currentTimeMillis());
        int activeIdCount = (int) getActiveIdsInWindow();
        long windowAccessCount = getRecentAccessCountInWindow();
        long windowSeconds = timeWindowMs / 1000;
        long bucketSeconds = bucketSizeMs / 1000;

        return String.format(
            "SlidingWindowAccessStats{type=tablet, timeWindow=%ds, bucketSize=%ds, numBuckets=%d, "
                + "shardSize=%d, activeIds=%d, "
                + "totalAccess=%d, totalAccessCount=%d}",
            windowSeconds, bucketSeconds, numBuckets, SHARD_SIZE,
            activeIdCount, windowAccessCount, totalAccessCount.get());
    }

    /**
     * Cleanup daemon for expired records.
     *
     * <p>Each cycle removes expired access records and, at debug level, logs the stats
     * summary together with the 10 most active IDs. Cycles are skipped when the current
     * environment is not the master.
     */
    private class AccessStatsCleanupDaemon extends MasterDaemon {
        public AccessStatsCleanupDaemon() {
            // Pass the interval as its own argument. Concatenating it into the name left the
            // daemon without the configured cleanup interval and put the number in the thread name.
            // NOTE(review): relies on the MasterDaemon(String name, long intervalMs) constructor.
            super("sliding-window-access-stats-cleanup-tablet", cleanupIntervalMs);
        }

        @Override
        protected void runAfterCatalogReady() {
            if (!Env.getCurrentEnv().isMaster()) {
                return;
            }

            try {
                cleanupExpiredRecords();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("tablet stat = {}, top 10 active = {}",
                            getStatsSummary(), getTopNActive(10));
                }
            } catch (Exception e) {
                // A failed cycle must not kill the daemon thread; report and try again next cycle.
                LOG.warn("Failed to cleanup expired access records for type tablet", e);
            }
        }
    }

    /**
     * Returns the process-wide instance, creating it on first use.
     * The fast path reads the volatile field without locking; creation happens at most
     * once, inside the class-level lock, after re-checking the field.
     */
    public static TabletSlidingWindowAccessStats getInstance() {
        TabletSlidingWindowAccessStats current = instance;
        if (current != null) {
            return current;
        }
        synchronized (TabletSlidingWindowAccessStats.class) {
            current = instance;
            if (current == null) {
                current = new TabletSlidingWindowAccessStats();
                instance = current;
            }
            return current;
        }
    }

    /**
     * Records an access to the given tablet ID asynchronously on the shared instance.
     *
     * @param id the tablet ID that was accessed
     */
    public static void recordTablet(long id) {
        getInstance().recordAccessAsync(id);
    }
}