MovesCacheMap.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.clone;

import org.apache.doris.common.Pair;
import org.apache.doris.resource.Tag;
import org.apache.doris.thrift.TStorageMedium;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/*
 * MovesCacheMap stores MovesCache for every tag and medium.
 * MovesCache is a simple encapsulation of Guava Cache. Use it by calling MovesCache.get().
 * MovesCache's expireAfterAccess can be reset when updating the cache mapping. If expireAfterAccess reset,
 * all MovesCaches will be cleared and recreated.
 */
public class MovesCacheMap {
    private static final Logger LOG = LogManager.getLogger(MovesCacheMap.class);

    // tag -> medium -> MovesCache
    private final Map<Tag, Map<TStorageMedium, MovesCache>> cacheMap = Maps.newHashMap();
    private long lastExpireConfig = -1L;

    // TabletId -> Pair<Move, ToDeleteReplicaId>, 'ToDeleteReplicaId == -1'
    // means this move haven't been scheduled successfully.
    public static class MovesCache {
        Cache<Long, Pair<PartitionRebalancer.TabletMove, Long>> cache;

        MovesCache(long duration, TimeUnit unit) {
            cache = CacheBuilder.newBuilder().expireAfterAccess(duration, unit).build();
        }

        public Cache<Long, Pair<PartitionRebalancer.TabletMove, Long>> get() {
            return cache;
        }
    }

    // Cyclical update the cache mapping, cuz the tag may be deleted, we should delete the corresponding cache too.
    public void updateMapping(Map<Tag, LoadStatisticForTag> statisticMap, long expireAfterAccessSecond) {
        if (expireAfterAccessSecond > 0 && lastExpireConfig != expireAfterAccessSecond) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Reset expireAfterAccess, last {} s, now {} s. Moves will be cleared.",
                        lastExpireConfig, expireAfterAccessSecond);
            }
            cacheMap.clear();
            lastExpireConfig = expireAfterAccessSecond;
        }

        cacheMap.entrySet().stream().filter(c -> !statisticMap.containsKey(c.getKey())).forEach(
                c -> cacheMap.remove(c.getKey()));

        List<Map.Entry<Tag, LoadStatisticForTag>> toAdd = statisticMap.entrySet().stream()
                .filter(c -> !cacheMap.containsKey(c.getKey()))
                .collect(Collectors.toList());
        for (Map.Entry<Tag, LoadStatisticForTag> entry : toAdd) {
            Map<TStorageMedium, MovesCache> newCacheMap = Maps.newHashMap();
            Arrays.stream(TStorageMedium.values())
                    .forEach(m -> newCacheMap.put(m, new MovesCache(expireAfterAccessSecond, TimeUnit.SECONDS)));
            this.cacheMap.put(entry.getKey(), newCacheMap);
        }
    }

    public Map<Tag, Map<TStorageMedium, MovesCache>> getCacheMap() {
        return cacheMap;
    }

    public MovesCache getCache(Tag tag, TStorageMedium medium) {
        Map<TStorageMedium, MovesCache> mediumMoves = cacheMap.get(tag);
        if (mediumMoves != null) {
            return mediumMoves.get(medium);
        }
        return null;
    }

    public void invalidateTablet(TabletSchedCtx tabletCtx) {
        Map<TStorageMedium, MovesCache> mediumMoves = cacheMap.get(tabletCtx.getTag());
        if (mediumMoves != null) {
            MovesCache cache = mediumMoves.get(tabletCtx.getStorageMedium());
            if (cache != null) {
                cache.get().invalidate(tabletCtx.getTabletId());
            } else {
                mediumMoves.values().forEach(it -> it.get().invalidate(tabletCtx.getTabletId()));
            }
        }
    }

    // For given tablet ctx, find it in cacheMap
    public Pair<PartitionRebalancer.TabletMove, Long> getTabletMove(TabletSchedCtx tabletCtx) {
        for (Map<TStorageMedium, MovesCache> mediumMap : cacheMap.values()) {
            MovesCache cache = mediumMap.get(tabletCtx.getStorageMedium());
            if (cache == null) {
                continue;
            }
            return cache.get().getIfPresent(tabletCtx.getTabletId());
        }
        return null;
    }


    // For each MovesCache, performs any pending maintenance operations needed by the cache.
    public void maintain() {
        cacheMap.values().forEach(maps -> maps.values().forEach(map -> map.get().cleanUp()));
    }

    public long size() {
        return cacheMap.values().stream().mapToLong(
                maps -> maps.values().stream().mapToLong(map -> map.get().size()).sum()).sum();
    }

    @Override
    public String toString() {
        StringJoiner sj = new StringJoiner("\n", "MovesInProgress detail:\n", "");
        cacheMap.entrySet().forEach(c -> c.getValue().forEach((k, v)
                -> sj.add("(" + k + ": " + v.get().asMap() + ")")));
        return sj.toString();
    }
}