CooldownConfHandler.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.cooldown;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.util.MasterDaemon;

import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;

/**
 * Master-side daemon that assigns a cooldown replica and bumps the cooldown term of tablets.
 *
 * <p>Callers enqueue {@link CooldownConf}s via {@link #addCooldownConfToUpdate(List)}. On each daemon round
 * the pending confs are processed in batches of {@code UPDATE_BATCH_SIZE}. Each conf whose term still matches
 * the tablet's current term gets a randomly chosen alive replica and an incremented term. The batch is
 * persisted through the edit log and then applied to the in-memory tablets.
 */
public class CooldownConfHandler extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(CooldownConfHandler.class);

    // TODO(plat1ko): better to use `Condition`?
    private static final long INTERVAL_MS = 5000; // 5s
    // Maximum number of confs handled (and written into one edit log entry) per updateCooldownConf call.
    private static final int UPDATE_BATCH_SIZE = 512;

    // Pending confs keyed by tablet id; enqueueing a conf for a tablet replaces any earlier pending one.
    // A concurrent map, so enqueueing does not need to be coordinated with the daemon loop.
    private final Map<Long, CooldownConf> cooldownConfToUpdate = Maps.newConcurrentMap();

    // One shared generator for picking the cooldown replica. SecureRandom is thread-safe and comparatively
    // expensive to create and seed, so it should not be instantiated per tablet.
    private final Random random = new SecureRandom();

    public CooldownConfHandler() {
        super("CooldownConfHandler", INTERVAL_MS);
    }

    /**
     * Queues cooldown confs to be processed on a later daemon round.
     *
     * @param cooldownConfs confs to enqueue; a conf replaces any pending conf for the same tablet id
     */
    public void addCooldownConfToUpdate(List<CooldownConf> cooldownConfs) {
        cooldownConfs.forEach(conf -> cooldownConfToUpdate.put(conf.getTabletId(), conf));
    }

    @Override
    protected void runAfterCatalogReady() {
        if (cooldownConfToUpdate.isEmpty()) {
            return;
        }
        // Snapshot the pending confs; new ones enqueued meanwhile are picked up on a later round.
        List<CooldownConf> cooldownConfList = new ArrayList<>(cooldownConfToUpdate.values());
        for (int start = 0; start < cooldownConfList.size(); start += UPDATE_BATCH_SIZE) {
            updateCooldownConf(
                    cooldownConfList.subList(start, Math.min(start + UPDATE_BATCH_SIZE, cooldownConfList.size())));
        }
    }

    /**
     * Processes one batch of pending confs: picks a cooldown replica, bumps the term, logs, and applies.
     *
     * <p>Confs are skipped when the tablet has no alive replica, when the tablet cannot be found, or when the
     * tablet's current cooldown term differs from the conf's term. Every conf of the batch is removed from the
     * pending map afterwards, whether it was applied or skipped, unless it was replaced in the meantime.
     *
     * @param confToUpdate the batch to process (a view into the snapshot taken by the caller)
     */
    private void updateCooldownConf(List<CooldownConf> confToUpdate) {
        // Kept as ArrayList: it is handed to CooldownConfList below.
        ArrayList<CooldownConf> updatedConf = new ArrayList<>(confToUpdate.size());
        Map<Long, Tablet> tabletMap = new HashMap<>(); // tabletId -> tablet, reused when applying the update

        TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
        for (CooldownConf conf : confToUpdate) {
            // choose cooldown replica from alive replicas
            List<Replica> replicas = invertedIndex.getReplicas(conf.getTabletId()).stream()
                    .filter(Replica::isAlive)
                    .collect(Collectors.toList());
            if (replicas.isEmpty()) {
                continue;
            }
            // find the Tablet to get its current cooldown term
            Tablet tablet = getTablet(conf);
            if (tablet == null || tablet.getCooldownConf().second != conf.cooldownTerm) {
                // If tablet.cooldownTerm != conf.cooldownTerm, means cooldown conf of this tablet has been updated,
                // should skip this update.
                continue;
            }
            // Only mutate confs that will actually be applied.
            conf.setCooldownReplicaId(replicas.get(random.nextInt(replicas.size())).getId());
            ++conf.cooldownTerm;
            updatedConf.add(conf);
            tabletMap.put(conf.tabletId, tablet);
        }

        if (!updatedConf.isEmpty()) {
            // write editlog first, then update the in-memory Tablets
            CooldownConfList list = new CooldownConfList(updatedConf);
            Env.getCurrentEnv().getEditLog().logUpdateCooldownConf(list);

            for (CooldownConf conf : updatedConf) {
                Tablet tablet = tabletMap.get(conf.tabletId);
                tablet.setCooldownConf(conf.cooldownReplicaId, conf.cooldownTerm);
                LOG.info("update cooldown conf. tabletId={} cooldownReplicaId={} cooldownTerm={}", conf.tabletId,
                        conf.cooldownReplicaId, conf.cooldownTerm);
            }
        }

        // Update finished: remove processed confs from the pending map. Use remove(key, value) so that a conf
        // re-enqueued for the same tablet while this batch was running is not lost.
        confToUpdate.forEach(conf -> cooldownConfToUpdate.remove(conf.getTabletId(), conf));

        // TODO(plat1ko): push CooldownConf to BE?
    }

    /**
     * Resolves the tablet referenced by {@code conf} (db -> table -> partition -> index -> tablet).
     *
     * <p>Any failure along the lookup chain (for example a null intermediate or a missing table) surfaces as a
     * RuntimeException and is turned into a {@code null} result. It is logged at debug level when the
     * partition is in the catalog recycle bin, and as a warning (with the cause) otherwise.
     *
     * @param conf conf carrying the db/table/partition/index/tablet ids
     * @return the tablet, or {@code null} if it could not be resolved
     */
    private static Tablet getTablet(CooldownConf conf) {
        try {
            OlapTable table = (OlapTable) Env.getCurrentInternalCatalog().getDbNullable(conf.dbId)
                    .getTable(conf.tableId)
                    .get();
            table.readLock();
            try {
                return table.getPartition(conf.partitionId).getIndex(conf.indexId).getTablet(conf.tabletId);
            } finally {
                table.readUnlock();
            }
        } catch (RuntimeException e) {
            if (Env.getCurrentRecycleBin().isRecyclePartition(conf.dbId, conf.tableId, conf.partitionId)) {
                LOG.debug("failed to get tablet, it's in catalog recycle bin. tabletId={}", conf.tabletId);
            } else {
                LOG.warn("failed to get tablet. tabletId={}", conf.tabletId, e);
            }
            return null;
        }
    }

    /**
     * Re-applies a logged batch of cooldown confs to the in-memory tablets during edit log replay.
     * Confs whose tablet can no longer be resolved are skipped.
     *
     * @param cooldownConfList the batch previously written by {@code logUpdateCooldownConf}
     */
    public static void replayUpdateCooldownConf(CooldownConfList cooldownConfList) {
        cooldownConfList.getCooldownConf().forEach(conf -> {
            Tablet tablet = getTablet(conf);
            if (tablet != null) {
                tablet.setCooldownConf(conf.cooldownReplicaId, conf.cooldownTerm);
            }
        });
    }
}