TabletChecker.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.clone;

import org.apache.doris.analysis.AdminCancelRepairTableStmt;
import org.apache.doris.analysis.AdminRepairTableStmt;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MysqlCompatibleDatabase;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Partition.PartitionState;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Tablet.TabletHealth;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.clone.TabletScheduler.AddResult;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.metric.GaugeMetric;
import org.apache.doris.metric.Metric;
import org.apache.doris.metric.MetricLabel;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.nereids.trees.plans.commands.AdminCancelRepairTableCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminRepairTableCommand;
import org.apache.doris.system.SystemInfoService;

import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Table.Cell;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/*
 * This checker is responsible for finding all unhealthy tablets.
 * It is not responsible for scheduling tablet repair or balance.
 */
public class TabletChecker extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(TabletChecker.class);

    // collaborators handed in through the constructor
    private Env env;
    private SystemInfoService infoService;
    private TabletScheduler tabletScheduler;
    private TabletSchedulerStat stat;

    // Tablet counts of the latest check round, keyed by status name. initMetrics() exports every
    // entry as a "tablet_status_count" gauge labelled with its key.
    // This is a plain HashMap on purpose: a double-brace initializer would create an anonymous
    // HashMap subclass that keeps a hidden reference to the enclosing checker.
    HashMap<String, AtomicLong> tabletCountByStatus = newTabletCountByStatus();

    // db id -> (tbl id -> PrioPart)
    // priority of replicas of partitions in this table will be set to VERY_HIGH if not healthy.
    // Readers and writers synchronize on the table object itself.
    private com.google.common.collect.Table<Long, Long, Set<PrioPart>> prios = HashBasedTable.create();

    // Builds the per-status counter map with every reported status preset to zero.
    private static HashMap<String, AtomicLong> newTabletCountByStatus() {
        HashMap<String, AtomicLong> counts = new HashMap<>();
        counts.put("total", new AtomicLong(0L));
        counts.put("unhealthy", new AtomicLong(0L));
        counts.put("added", new AtomicLong(0L));
        counts.put("in_sched", new AtomicLong(0L));
        counts.put("not_ready", new AtomicLong(0L));
        counts.put("exceed_limit", new AtomicLong(0L));
        return counts;
    }

    // A partition that should be repaired ahead of the others, together with the time it was
    // registered and how long that registration stays valid.
    // Two PrioParts are equal when they refer to the same partition id; addTime and timeoutMs
    // take no part in equals/hashCode.
    public static class PrioPart {
        public long partId;
        public long addTime;
        public long timeoutMs;

        public PrioPart(long partId, long addTime, long timeoutMs) {
            this.partId = partId;
            this.addTime = addTime;
            this.timeoutMs = timeoutMs;
        }

        // true once more than timeoutMs milliseconds have passed since addTime
        public boolean isTimeout() {
            long elapsedMs = System.currentTimeMillis() - addTime;
            return elapsedMs > timeoutMs;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof PrioPart) {
                PrioPart other = (PrioPart) obj;
                return other.partId == partId;
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(partId);
        }
    }

    // Target of a repair / cancel-repair request: one table and the ids of its affected partitions.
    public static class RepairTabletInfo {
        public long dbId;
        public long tblId;
        // kept by reference, the list is not copied
        public List<Long> partIds;

        public RepairTabletInfo(Long dbId, Long tblId, List<Long> partIds) {
            this.partIds = partIds;
            this.tblId = tblId;
            this.dbId = dbId;
        }
    }

    // Creates the checker daemon named "tablet checker", running at Config.tablet_checker_interval_ms,
    // and registers its status gauges.
    public TabletChecker(Env env, SystemInfoService infoService, TabletScheduler tabletScheduler,
                         TabletSchedulerStat stat) {
        super("tablet checker", Config.tablet_checker_interval_ms);
        this.stat = stat;
        this.tabletScheduler = tabletScheduler;
        this.infoService = infoService;
        this.env = env;

        // expose the per-status tablet counters as metrics
        initMetrics();
    }

    // Registers one "tablet_status_count" gauge per key of tabletCountByStatus. Each gauge carries
    // the key as its "type" label and looks its counter up in the map whenever it is read.
    private void initMetrics() {
        tabletCountByStatus.keySet().forEach(statusName -> {
            GaugeMetric<Long> statusGauge = new GaugeMetric<Long>("tablet_status_count",
                    Metric.MetricUnit.NOUNIT, "tablet count on different status") {
                @Override
                public Long getValue() {
                    AtomicLong count = tabletCountByStatus.get(statusName);
                    return count.get();
                }
            };
            MetricLabel typeLabel = new MetricLabel("type", statusName);
            statusGauge.addLabel(typeLabel);
            MetricRepo.DORIS_METRIC_REGISTER.addMetrics(statusGauge);
        });
    }

    // Registers the given partitions in "prios", all stamped with the current time and the given
    // timeout, then raises the priority of their tablets that the scheduler already holds.
    // The partition list must not be empty.
    private void addPrios(RepairTabletInfo repairTabletInfo, long timeoutMs) {
        Preconditions.checkArgument(!repairTabletInfo.partIds.isEmpty());
        final long dbId = repairTabletInfo.dbId;
        final long tblId = repairTabletInfo.tblId;
        final long now = System.currentTimeMillis();

        synchronized (prios) {
            Set<PrioPart> registered = prios.get(dbId, tblId);
            if (registered == null) {
                // first request for this table: create its set
                registered = Sets.newHashSet();
                prios.put(dbId, tblId, registered);
            }
            for (long partId : repairTabletInfo.partIds) {
                registered.add(new PrioPart(partId, now, timeoutMs));
            }
        }

        // tablets that are already in the scheduler need their priority changed as well
        tabletScheduler.changeTabletsPriorityToVeryHigh(dbId, tblId, repairTabletInfo.partIds);
    }

    // Drops the given partitions from "prios"; the table entry itself is removed once it has no
    // partitions left. A table that has no entry is ignored. The partition list must not be empty.
    private void removePrios(RepairTabletInfo repairTabletInfo) {
        Preconditions.checkArgument(!repairTabletInfo.partIds.isEmpty());
        final long dbId = repairTabletInfo.dbId;
        final long tblId = repairTabletInfo.tblId;

        synchronized (prios) {
            Set<PrioPart> registered = prios.get(dbId, tblId);
            if (registered == null) {
                return;
            }
            for (long partId : repairTabletInfo.partIds) {
                // PrioPart equality only looks at the partition id, so a probe object is enough
                PrioPart probe = new PrioPart(partId, -1, -1);
                registered.remove(probe);
            }
            if (registered.isEmpty()) {
                prios.remove(dbId, tblId);
            }
        }
    }

    /*
     * One daemon cycle: check the tablets of all OlapTables, then drop stale entries from "prios".
     * For every tablet found unhealthy a TabletSchedCtx is created and handed to TabletScheduler.
     * The whole cycle is skipped while the scheduler already holds more pending or running
     * tablets than Config.max_scheduling_tablets.
     */
    @Override
    public void runAfterCatalogReady() {
        final int pendingNum = tabletScheduler.getPendingNum();
        final int runningNum = tabletScheduler.getRunningNum();
        boolean withinLimit = pendingNum <= Config.max_scheduling_tablets
                && runningNum <= Config.max_scheduling_tablets;
        if (!withinLimit) {
            LOG.info("too many tablets are being scheduled. pending: {}, running: {}, limit: {}. skip check",
                    pendingNum, runningNum, Config.max_scheduling_tablets);
            return;
        }

        checkTablets();
        removePriosIfNecessary();

        stat.counterTabletCheckRound.incrementAndGet();
        if (LOG.isDebugEnabled()) {
            LOG.debug(stat.incrementalBrief());
        }
    }

    // Plain tallies collected during one check round; every counter starts at zero.
    public static class CheckerCounter {
        // tablets visited
        public long totalTabletNum;
        // tablets whose status was not HEALTHY
        public long unhealthyTabletNum;
        // tablets the scheduler accepted with AddResult.ADDED
        public long addToSchedulerTabletNum;
        // tablets skipped because the scheduler already contained them
        public long tabletInScheduler;
        public long tabletNotReady;
        // tablets answered with AddResult.REPLACE_ADDED or AddResult.LIMIT_EXCEED
        public long tabletExceedLimit;
    }

    // Tells the partition loops in checkTablets() what to do after a partition was handled.
    private enum LoopControlStatus {
        // go on with the next partition
        CONTINUE,
        // leave the enclosing database loop
        BREAK_OUT
    }

    /*
     * Runs one check round over all managed, non-colocate tables and publishes the resulting
     * counters to "stat", to the status gauges and to the log.
     *
     * The round has two phases:
     * 1. partitions registered in "prios" are checked first (with isInPrios = true), so that they
     *    are not left unscheduled because the queue in the tablet scheduler is full;
     * 2. all remaining partitions are checked.
     *
     * A partition is handled at most once per round. Phase 1 only touches partitions that were in
     * "prios" when the round started, and phase 2 skips exactly those partitions, even if phase 1
     * has meanwhile removed them from "prios" because they turned out healthy. Without this,
     * non-prio partitions of a prio table would be handled with VERY_HIGH priority and every
     * partition handled twice would also be counted twice.
     */
    private void checkTablets() {
        long start = System.currentTimeMillis();
        CheckerCounter counter = new CheckerCounter();

        // Snapshot of the partition ids in "prios": db id -> (tbl id -> partition ids).
        // Only ids are copied, so nothing is shared with the live sets that other threads
        // modify under the "prios" lock.
        com.google.common.collect.Table<Long, Long, Set<Long>> prioPartIds = HashBasedTable.create();
        synchronized (prios) {
            for (Cell<Long, Long, Set<PrioPart>> cell : prios.cellSet()) {
                Set<Long> partIds = Sets.newHashSet();
                for (PrioPart part : cell.getValue()) {
                    partIds.add(part.partId);
                }
                prioPartIds.put(cell.getRowKey(), cell.getColumnKey(), partIds);
            }
        }

        ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();

        // 1. Traverse partitions in "prios" first
        OUT:
        for (long dbId : prioPartIds.rowKeySet()) {
            Database db = env.getInternalCatalog().getDbNullable(dbId);
            if (db == null) {
                continue;
            }
            List<Long> aliveBeIds = infoService.getAllBackendIds(true);
            for (Map.Entry<Long, Set<Long>> tblEntry : prioPartIds.row(dbId).entrySet()) {
                long tblId = tblEntry.getKey();
                Set<Long> partIds = tblEntry.getValue();
                Table tbl = db.getTableNullable(tblId);
                if (tbl == null || !tbl.isManagedTable()) {
                    continue;
                }
                OlapTable olapTable = (OlapTable) tbl;
                olapTable.readLock();
                try {
                    if (colocateTableIndex.isColocateTable(olapTable.getId())) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("table {} is a colocate table, skip tablet checker.", olapTable.getName());
                        }
                        continue;
                    }
                    for (Partition partition : olapTable.getAllPartitions()) {
                        // the other partitions of this table are left to phase 2
                        if (!partIds.contains(partition.getId())) {
                            continue;
                        }
                        LoopControlStatus st = handlePartitionTablet(db, olapTable, partition, true, aliveBeIds, start,
                                counter);
                        if (st == LoopControlStatus.BREAK_OUT) {
                            break OUT;
                        }
                    }
                } finally {
                    olapTable.readUnlock();
                }
            }
        }

        // 2. Traverse other partitions not in "prios"
        List<Long> dbIds = env.getInternalCatalog().getDbIds();
        OUT:
        for (Long dbId : dbIds) {
            Database db = env.getInternalCatalog().getDbNullable(dbId);
            if (db == null) {
                continue;
            }

            if (db instanceof MysqlCompatibleDatabase) {
                continue;
            }

            List<Table> tableList = db.getTables();
            List<Long> aliveBeIds = infoService.getAllBackendIds(true);

            for (Table table : tableList) {
                if (!table.isManagedTable()) {
                    continue;
                }

                table.readLock();
                try {
                    if (colocateTableIndex.isColocateTable(table.getId())) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("table {} is a colocate table, skip tablet checker.", table.getName());
                        }
                        continue;
                    }

                    OlapTable tbl = (OlapTable) table;
                    Set<Long> coveredByPhase1 = prioPartIds.get(db.getId(), tbl.getId());
                    for (Partition partition : tbl.getAllPartitions()) {
                        // skip partitions that belonged to phase 1 of this round
                        if (coveredByPhase1 != null && coveredByPhase1.contains(partition.getId())) {
                            continue;
                        }
                        // skip partitions that were added to prios while this round was running
                        if (isInPrios(db.getId(), tbl.getId(), partition.getId())) {
                            continue;
                        }

                        LoopControlStatus st = handlePartitionTablet(db, tbl, partition, false, aliveBeIds, start,
                                counter);
                        if (st == LoopControlStatus.BREAK_OUT) {
                            break OUT;
                        }
                    } // partitions
                } finally {
                    table.readUnlock();
                }
            } // tables
        } // end for dbs

        long cost = System.currentTimeMillis() - start;
        stat.counterTabletCheckCostMs.addAndGet(cost);
        stat.counterTabletChecked.addAndGet(counter.totalTabletNum);
        stat.counterUnhealthyTabletNum.addAndGet(counter.unhealthyTabletNum);
        stat.counterTabletAddToBeScheduled.addAndGet(counter.addToSchedulerTabletNum);

        // the gauges always show the numbers of the latest round
        tabletCountByStatus.get("unhealthy").set(counter.unhealthyTabletNum);
        tabletCountByStatus.get("total").set(counter.totalTabletNum);
        tabletCountByStatus.get("added").set(counter.addToSchedulerTabletNum);
        tabletCountByStatus.get("in_sched").set(counter.tabletInScheduler);
        tabletCountByStatus.get("not_ready").set(counter.tabletNotReady);
        tabletCountByStatus.get("exceed_limit").set(counter.tabletExceedLimit);

        LOG.info("finished to check tablets. unhealth/total/added/in_sched/not_ready/exceed_limit: {}/{}/{}/{}/{}/{},"
                + "cost: {} ms",
                counter.unhealthyTabletNum, counter.totalTabletNum, counter.addToSchedulerTabletNum,
                counter.tabletInScheduler, counter.tabletNotReady, counter.tabletExceedLimit, cost);
    }

    /*
     * Checks every tablet in the VISIBLE indices of one partition and offers the unhealthy ones
     * that are ready to be repaired to the tablet scheduler.
     *
     * db, tbl, partition: the partition to check and its owners
     * isInPrios: true if the partition is handled as a priority partition; its unhealthy tablets
     *            then get priority VERY_HIGH, and the partition is removed from "prios" when no
     *            such tablet was found
     * aliveBeIds: backend ids passed on to the tablet health check
     * startTime: start of the current check round, recorded on healthy tablets
     * counter: tallies of the current round, updated in place
     *
     * Returns BREAK_OUT when the scheduler answers DISABLED, CONTINUE otherwise.
     */
    private LoopControlStatus handlePartitionTablet(Database db, OlapTable tbl, Partition partition, boolean isInPrios,
            List<Long> aliveBeIds, long startTime, CheckerCounter counter) {
        if (partition.getState() != PartitionState.NORMAL) {
            // when alter job is in FINISHING state, partition state will be set to NORMAL,
            // and we can schedule the tablets in it.
            return LoopControlStatus.CONTINUE;
        }
        boolean prioPartIsHealthy = true;
        boolean isUniqKeyMergeOnWrite = tbl.isUniqKeyMergeOnWrite();
        /*
         * Tablet in SHADOW index can not be repaired or balanced
         */
        for (MaterializedIndex idx : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
            for (Tablet tablet : idx.getTablets()) {
                counter.totalTabletNum++;

                if (tabletScheduler.containsTablet(tablet.getId())) {
                    counter.tabletInScheduler++;
                    continue;
                }

                TabletHealth tabletHealth = tablet.getHealth(infoService, partition.getVisibleVersion(),
                        tbl.getPartitionInfo().getReplicaAllocation(partition.getId()), aliveBeIds);

                if (tabletHealth.status == TabletStatus.HEALTHY) {
                    // Only set last status check time when status is healthy.
                    tablet.setLastStatusCheckTime(startTime);
                    continue;
                } else if (tabletHealth.status == TabletStatus.UNRECOVERABLE) {
                    // This tablet is not recoverable, do not set it into tablet scheduler
                    // all UNRECOVERABLE tablet can be seen from "show proc '/statistic'"
                    counter.unhealthyTabletNum++;
                    continue;
                } else if (isInPrios) {
                    tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
                    prioPartIsHealthy = false;
                }

                counter.unhealthyTabletNum++;
                if (!tablet.readyToBeRepaired(infoService, tabletHealth.priority)) {
                    // counted so that the "not_ready" gauge and the round summary report it
                    counter.tabletNotReady++;
                    continue;
                }

                TabletSchedCtx tabletCtx = new TabletSchedCtx(
                        TabletSchedCtx.Type.REPAIR,
                        db.getId(), tbl.getId(),
                        partition.getId(), idx.getId(), tablet.getId(),
                        tbl.getPartitionInfo().getReplicaAllocation(partition.getId()),
                        System.currentTimeMillis());
                // the tablet status will be set again when being scheduled
                tabletCtx.setTabletHealth(tabletHealth);
                tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite);

                AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */);
                if (res == AddResult.DISABLED) {
                    LOG.info("tablet scheduler return: {}. stop tablet checker", res.name());
                    return LoopControlStatus.BREAK_OUT;
                } else if (res == AddResult.ADDED) {
                    counter.addToSchedulerTabletNum++;
                } else if (res == AddResult.REPLACE_ADDED || res == AddResult.LIMIT_EXCEED) {
                    counter.tabletExceedLimit++;
                }
            }
        } // indices

        if (prioPartIsHealthy && isInPrios) {
            // no tablet of this partition had to be raised to VERY_HIGH in this round,
            // so the partition does not need to stay in the priorities.
            LOG.info("partition is healthy, remove from prios: {}-{}-{}",
                    db.getId(), tbl.getId(), partition.getId());
            removePrios(new RepairTabletInfo(db.getId(),
                    tbl.getId(), Lists.newArrayList(partition.getId())));
        }
        return LoopControlStatus.CONTINUE;
    }

    // Tells whether the given partition is currently registered in "prios".
    private boolean isInPrios(long dbId, long tblId, long partId) {
        synchronized (prios) {
            Set<PrioPart> registered = prios.get(dbId, tblId);
            if (registered == null) {
                return false;
            }
            // PrioPart equality only looks at the partition id
            PrioPart probe = new PrioPart(partId, -1, -1);
            return registered.contains(probe);
        }
    }

    // remove partition from prios if:
    // 1. timeout
    // 2. meta not found (database, table or partition no longer exists)
    //
    // "prios" is pruned in place while holding its lock. It must not be replaced by a modified
    // copy: other threads synchronize on the table object, and entries they add while the copy is
    // being processed would be lost. Table read locks are only taken while the "prios" lock is
    // NOT held, because handlePartitionTablet() takes the two locks in the opposite order.
    private void removePriosIfNecessary() {
        // 1. snapshot the registered ids: db id -> (tbl id -> partition ids)
        com.google.common.collect.Table<Long, Long, Set<Long>> snapshot = HashBasedTable.create();
        synchronized (prios) {
            for (Cell<Long, Long, Set<PrioPart>> cell : prios.cellSet()) {
                Set<Long> partIds = Sets.newHashSet();
                for (PrioPart part : cell.getValue()) {
                    partIds.add(part.partId);
                }
                snapshot.put(cell.getRowKey(), cell.getColumnKey(), partIds);
            }
        }

        // 2. find out, without holding the "prios" lock, which meta can no longer be found
        List<Pair<Long, Long>> missingTables = Lists.newArrayList();
        com.google.common.collect.Table<Long, Long, Set<Long>> missingParts = HashBasedTable.create();
        for (Cell<Long, Long, Set<Long>> cell : snapshot.cellSet()) {
            long dbId = cell.getRowKey();
            long tblId = cell.getColumnKey();
            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
            OlapTable tbl = db == null ? null : (OlapTable) db.getTableNullable(tblId);
            if (tbl == null) {
                missingTables.add(Pair.of(dbId, tblId));
                continue;
            }
            Set<Long> gone = Sets.newHashSet();
            tbl.readLock();
            try {
                for (long partId : cell.getValue()) {
                    if (tbl.getPartition(partId) == null) {
                        gone.add(partId);
                    }
                }
            } finally {
                tbl.readUnlock();
            }
            if (!gone.isEmpty()) {
                missingParts.put(dbId, tblId, gone);
            }
        }

        // 3. prune the live table
        synchronized (prios) {
            for (Pair<Long, Long> key : missingTables) {
                prios.remove(key.first, key.second);
            }

            List<Pair<Long, Long>> emptied = Lists.newArrayList();
            for (Cell<Long, Long, Set<PrioPart>> cell : prios.cellSet()) {
                Set<Long> gone = missingParts.get(cell.getRowKey(), cell.getColumnKey());
                Iterator<PrioPart> partIter = cell.getValue().iterator();
                while (partIter.hasNext()) {
                    PrioPart part = partIter.next();
                    // the timeout is evaluated on the live entry
                    if (part.isTimeout() || (gone != null && gone.contains(part.partId))) {
                        partIter.remove();
                    }
                }
                if (cell.getValue().isEmpty()) {
                    emptied.add(Pair.of(cell.getRowKey(), cell.getColumnKey()));
                }
            }
            // table entries are removed after the iteration so that the table is not
            // structurally modified while its cells are being walked
            for (Pair<Long, Long> key : emptied) {
                prios.remove(key.first, key.second);
            }
        }
    }

    /*
     * Handles the ADMIN REPAIR TABLE command sent by a user.
     * The partitions named by the command (resolved by getRepairTabletInfo) are added to 'prios',
     * so that their tablets are set to VERY_HIGH when being scheduled.
     */
    public void repairTable(AdminRepairTableCommand command) throws DdlException {
        RepairTabletInfo target = getRepairTabletInfo(
                command.getDbName(), command.getTblName(), command.getPartitions());
        // the command carries its timeout in seconds, 'prios' keeps milliseconds
        long timeoutMs = command.getTimeoutS() * 1000;
        addPrios(target, timeoutMs);
        LOG.info("repair database: {}, table: {}, partition: {}",
                target.dbId, target.tblId, target.partIds);
    }

    // Statement flavour of ADMIN REPAIR TABLE: resolves the target partitions and adds them to 'prios'.
    public void repairTable(AdminRepairTableStmt stmt) throws DdlException {
        RepairTabletInfo target = getRepairTabletInfo(
                stmt.getDbName(), stmt.getTblName(), stmt.getPartitions());
        // the statement carries its timeout in seconds, 'prios' keeps milliseconds
        long timeoutMs = stmt.getTimeoutS() * 1000;
        addPrios(target, timeoutMs);
        LOG.info("repair database: {}, table: {}, partition: {}",
                target.dbId, target.tblId, target.partIds);
    }

    /*
     * Handles the ADMIN CANCEL REPAIR TABLE command sent by a user.
     * The partitions named by the command are removed from 'prios'.
     */
    public void cancelRepairTable(AdminCancelRepairTableCommand command) throws DdlException {
        RepairTabletInfo target = getRepairTabletInfo(
                command.getDbName(), command.getTblName(), command.getPartitions());
        removePrios(target);
        LOG.info("cancel repair database: {}, table: {}, partition: {}",
                target.dbId, target.tblId, target.partIds);
    }

    // Statement flavour of ADMIN CANCEL REPAIR TABLE: removes the named partitions from 'prios'.
    public void cancelRepairTable(AdminCancelRepairTableStmt stmt) throws DdlException {
        RepairTabletInfo target = getRepairTabletInfo(
                stmt.getDbName(), stmt.getTblName(), stmt.getPartitions());
        removePrios(target);
        LOG.info("cancel repair database: {}, table: {}, partition: {}",
                target.dbId, target.tblId, target.partIds);
    }

    // Number of partitions currently registered in 'prios', summed over all tables.
    public int getPrioPartitionNum() {
        synchronized (prios) {
            return prios.values().stream().mapToInt(Set::size).sum();
        }
    }

    // Lists the content of 'prios', one row per registered partition:
    // db id, table id, partition id, milliseconds left until the entry times out
    // (the last value is negative once the timeout has passed).
    public List<List<String>> getPriosInfo() {
        List<List<String>> rows = Lists.newArrayList();
        synchronized (prios) {
            for (Cell<Long, Long, Set<PrioPart>> cell : prios.cellSet()) {
                for (PrioPart part : cell.getValue()) {
                    String dbId = cell.getRowKey().toString();
                    String tblId = cell.getColumnKey().toString();
                    String partId = String.valueOf(part.partId);
                    long elapsedMs = System.currentTimeMillis() - part.addTime;
                    String remainingMs = String.valueOf(part.timeoutMs - elapsedMs);
                    rows.add(Lists.newArrayList(dbId, tblId, partId, remainingMs));
                }
            }
        }
        return rows;
    }

    // Resolves database, table and partition names to ids.
    // A null or empty partition list selects every partition returned by OlapTable.getPartitions().
    // Throws DdlException if the database or the OLAP table cannot be resolved, or if a named
    // partition does not exist.
    public static RepairTabletInfo getRepairTabletInfo(String dbName, String tblName,
            List<String> partitions) throws DdlException {
        Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrDdlException(dbName);
        OlapTable olapTable = db.getOlapTableOrDdlException(tblName);

        long tblId = -1;
        List<Long> partIds;
        olapTable.readLock();
        try {
            tblId = olapTable.getId();

            boolean wholeTable = partitions == null || partitions.isEmpty();
            if (wholeTable) {
                partIds = olapTable.getPartitions().stream().map(Partition::getId).collect(Collectors.toList());
            } else {
                partIds = Lists.newArrayList();
                for (String partName : partitions) {
                    Partition named = olapTable.getPartition(partName);
                    if (named == null) {
                        throw new DdlException("Partition does not exist: " + partName);
                    }
                    partIds.add(named.getId());
                }
            }
        } finally {
            olapTable.readUnlock();
        }

        Preconditions.checkState(tblId != -1);

        return new RepairTabletInfo(db.getId(), tblId, partIds);
    }
}