DiagnoseClusterBalanceProcDir.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.proc;

import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Env;
import org.apache.doris.clone.BackendLoadStatistic;
import org.apache.doris.clone.BackendLoadStatistic.Classification;
import org.apache.doris.clone.LoadStatisticForTag;
import org.apache.doris.clone.RootPathLoadStatistic;
import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletSchedCtx;
import org.apache.doris.clone.TabletScheduler;
import org.apache.doris.common.Config;
import org.apache.doris.common.proc.DiagnoseProcDir.DiagnoseItem;
import org.apache.doris.common.proc.DiagnoseProcDir.DiagnoseStatus;
import org.apache.doris.common.proc.DiagnoseProcDir.SubProcDir;
import org.apache.doris.common.proc.TabletHealthProcDir.DBTabletStatistic;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;

import com.google.common.collect.Lists;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Backs {@code show proc "/diagnose/cluster_balance";}.
 *
 * <p>{@link #getDiagnoseResult()} returns one {@link DiagnoseItem} per check, in this order:
 * <ol>
 *   <li>tablet health;</li>
 *   <li>base balance (partition tablet counts or backend load, depending on {@code tablet_rebalancer_type});</li>
 *   <li>disk balance;</li>
 *   <li>colocate group stability;</li>
 *   <li>recent tablet scheduling failures.</li>
 * </ol>
 */
public class DiagnoseClusterBalanceProcDir extends SubProcDir {

    // The per-database statistic collection in diagnoseTabletHealth is submitted to this pool, so its
    // parallel stream runs on these workers.
    private final ForkJoinPool taskPool = new ForkJoinPool();

    /**
     * Runs every cluster balance check.
     *
     * <p>Two flags shared by the checks are computed first:
     * <ul>
     *   <li>{@code schedReady}: some alive MASTER or FOLLOWER frontend started at least one minute ago;</li>
     *   <li>{@code schedRecent}: the tablet scheduler has a pending or running tablet, or a history tablet
     *       was created or visited within the last 15 minutes.</li>
     * </ul>
     * The history check only reports failures when every other item is already OK.
     *
     * @return diagnose items: tablet health, base balance, disk balance, colocate, history sched
     */
    @Override
    public List<DiagnoseItem> getDiagnoseResult() {
        long now = System.currentTimeMillis();
        long minToMs = 60 * 1000L;

        Env env = Env.getCurrentEnv();
        TabletScheduler tabletScheduler = env.getTabletScheduler();
        List<TabletSchedCtx> pendingTablets = tabletScheduler.getPendingTablets(1);
        List<TabletSchedCtx> runningTablets = tabletScheduler.getRunningTablets(1);
        List<TabletSchedCtx> historyTablets = tabletScheduler.getHistoryTablets(10000);
        long historyLastVisitTime = historyTablets.stream()
                .mapToLong(tablet -> Math.max(tablet.getCreateTime(), tablet.getLastVisitedTime()))
                .max().orElse(-1);
        boolean schedReady = env.getFrontends(null).stream().anyMatch(
                fe -> fe.isAlive() && now >= fe.getLastStartupTime() + minToMs
                        && (fe.getRole() == FrontendNodeType.MASTER || fe.getRole() == FrontendNodeType.FOLLOWER));
        boolean schedRecent = !pendingTablets.isEmpty() || !runningTablets.isEmpty()
                || historyLastVisitTime >= now - 15 * minToMs;

        List<DiagnoseItem> items = Lists.newArrayList();
        items.add(diagnoseTabletHealth(schedReady, schedRecent));
        DiagnoseItem baseBalance = diagnoseBaseBalance(schedReady, schedRecent);
        items.add(baseBalance);
        items.add(diagnoseDiskBalance(schedReady, schedRecent, baseBalance.status == DiagnoseStatus.OK));
        items.add(diagnoseColocateRebalance(schedReady, schedRecent));
        items.add(diagnoseHistorySched(historyTablets,
                items.stream().allMatch(item -> item.status == DiagnoseStatus.OK)));

        return items;
    }

    /**
     * Attaches the scheduler-related suggestion shared by all balance checks to a failed item.
     *
     * <p>When balancing is disabled, only the suggestion is set and the status is left unchanged. When the
     * frontends are not ready yet, or scheduling happened recently, the status is also downgraded to WARNING.
     *
     * @param item the failed diagnose item to annotate
     * @param balanceDisabled whether a config switch disables the relevant balancing
     * @param disabledConfigs the config assignments to recommend when {@code balanceDisabled} is true
     * @param schedReady see {@link #getDiagnoseResult()}
     * @param schedRecent see {@link #getDiagnoseResult()}
     * @return true if one of the branches applied a suggestion, false if none matched
     */
    private static boolean applySchedSuggestion(DiagnoseItem item, boolean balanceDisabled, String disabledConfigs,
            boolean schedReady, boolean schedRecent) {
        if (balanceDisabled) {
            item.suggestion = "has disable tablet balance, ensure master fe config: " + disabledConfigs;
        } else if (!schedReady) {
            item.suggestion = "check all fe are ready, and then wait some minutes for "
                    + "sheduler to migrate tablets";
            item.status = DiagnoseStatus.WARNING;
        } else if (schedRecent) {
            item.suggestion = "tablet is still scheduling, run 'show proc \"/cluster_balance\"'";
            item.status = DiagnoseStatus.WARNING;
        } else {
            return false;
        }
        return true;
    }

    /**
     * Compares the total tablet count with the healthy tablet count over all internal-catalog databases
     * (database id 0 is skipped).
     *
     * <p>A mismatch is an ERROR. It is downgraded to WARNING only if there are no unrecoverable tablets AND a
     * scheduler-related cause applies (scheduler disabled, frontends not ready, or scheduling in progress).
     */
    private DiagnoseItem diagnoseTabletHealth(boolean schedReady, boolean schedRecent) {
        DiagnoseItem tabletHealth = new DiagnoseItem();
        tabletHealth.name = "Tablet Health";
        tabletHealth.status = DiagnoseStatus.OK;

        Env env = Env.getCurrentEnv();
        List<DBTabletStatistic> statistics = taskPool.submit(() ->
                env.getInternalCatalog().getDbIds().parallelStream()
                    // skip information_schema database
                    .flatMap(id -> Stream.of(id == 0 ? null : env.getInternalCatalog().getDbNullable(id)))
                    .filter(Objects::nonNull).map(DBTabletStatistic::new)
                    // sort by dbName
                    .sorted(Comparator.comparing(db -> db.db.getFullName())).collect(Collectors.toList())
        ).join();

        DBTabletStatistic total = statistics.stream().reduce(new DBTabletStatistic(), DBTabletStatistic::reduce);
        if (total.tabletNum != total.healthyNum) {
            tabletHealth.status = DiagnoseStatus.ERROR;
            tabletHealth.content = String.format("healthy tablet num %s < total tablet num %s",
                    total.healthyNum, total.tabletNum);
            tabletHealth.detailCmd = "show proc \"/cluster_health/tablet_health\";";
            boolean hasSuggestion = applySchedSuggestion(tabletHealth, Config.disable_tablet_scheduler,
                    "disable_tablet_scheduler = false", schedReady, schedRecent);
            // Unlike the other checks, even the "scheduler disabled" case becomes a WARNING here, but only
            // while no tablet is unrecoverable.
            tabletHealth.status = hasSuggestion && total.unrecoverableNum == 0
                    ? DiagnoseStatus.WARNING : DiagnoseStatus.ERROR;
        }

        return tabletHealth;
    }

    /**
     * Checks the base balance among schedulable backends.
     *
     * <p>With {@code tablet_rebalancer_type = partition}, the check compares per-backend tablet counts.
     * Otherwise it looks for LOW or HIGH load backends per tag and storage medium.
     */
    private DiagnoseItem diagnoseBaseBalance(boolean schedReady, boolean schedRecent) {
        SystemInfoService infoService = Env.getCurrentSystemInfo();

        DiagnoseItem baseBalance = new DiagnoseItem();
        baseBalance.status = DiagnoseStatus.OK;

        List<Long> availableBeIds = infoService.getAllBackendIds(true).stream()
                .filter(beId -> infoService.checkBackendScheduleAvailable(beId))
                .collect(Collectors.toList());
        if (Config.tablet_rebalancer_type.equalsIgnoreCase("partition")) {
            baseBalance.name = "Partition Balance";
            checkPartitionBalance(baseBalance, infoService, availableBeIds);
        } else {
            baseBalance.name = "BeLoad Balance";
            checkBeLoadBalance(baseBalance, availableBeIds);
        }

        if (baseBalance.status != DiagnoseStatus.OK) {
            applySchedSuggestion(baseBalance, Config.disable_tablet_scheduler || Config.disable_balance,
                    "disable_tablet_scheduler = false, disable_balance = false", schedReady, schedRecent);
        }

        return baseBalance;
    }

    // Marks item as ERROR when the tablet-count spread between available backends exceeds the tolerance.
    private static void checkPartitionBalance(DiagnoseItem item, SystemInfoService infoService,
            List<Long> availableBeIds) {
        // tabletNums.get(i) is the tablet count of availableBeIds.get(i).
        List<Integer> tabletNums = availableBeIds.stream()
                .map(beId -> infoService.getTabletNumByBackendId(beId))
                .collect(Collectors.toList());
        if (tabletNums.isEmpty()) {
            return;
        }
        int minTabletNum = tabletNums.stream().mapToInt(Integer::intValue).min().orElse(0);
        int maxTabletNum = tabletNums.stream().mapToInt(Integer::intValue).max().orElse(0);
        if (maxTabletNum <= Math.max(minTabletNum * Config.diagnose_balance_max_tablet_num_ratio,
                    minTabletNum + Config.diagnose_balance_max_tablet_num_diff)) {
            return;
        }
        // Search the parallel tabletNums list. Searching availableBeIds (Longs) for an Integer count never
        // matches, and get(-1) would throw.
        long minBeId = availableBeIds.get(tabletNums.indexOf(minTabletNum));
        long maxBeId = availableBeIds.get(tabletNums.indexOf(maxTabletNum));
        item.status = DiagnoseStatus.ERROR;
        item.content = String.format("tablets not balance, be %s has %s tablets, be %s has %s tablets",
                minBeId, minTabletNum, maxBeId, maxTabletNum);
        item.detailCmd = "show backends";
    }

    // Marks item as ERROR for the first tag/storage medium having available backends classified LOW or HIGH.
    private static void checkBeLoadBalance(DiagnoseItem item, List<Long> availableBeIds) {
        Map<Tag, LoadStatisticForTag> loadStatisticMap = Env.getCurrentEnv().getTabletScheduler().getStatisticMap();
        for (LoadStatisticForTag stat : loadStatisticMap.values()) {
            for (TStorageMedium storageMedium : TStorageMedium.values()) {
                List<Long> lowBEs = stat.getBackendLoadStatistics().stream()
                        .filter(be -> availableBeIds.contains(be.getBeId())
                                && be.getClazz(storageMedium) == Classification.LOW)
                        .map(BackendLoadStatistic::getBeId)
                        .collect(Collectors.toList());
                List<Long> highBEs = stat.getBackendLoadStatistics().stream()
                        .filter(be -> availableBeIds.contains(be.getBeId())
                                && be.getClazz(storageMedium) == Classification.HIGH)
                        .map(BackendLoadStatistic::getBeId)
                        .collect(Collectors.toList());
                if (!lowBEs.isEmpty() || !highBEs.isEmpty()) {
                    item.status = DiagnoseStatus.ERROR;
                    item.content = String.format("backend load not balance for tag %s, storage medium %s, "
                            + "low load backends %s, high load backends %s",
                            stat.getTag(), storageMedium.name().toUpperCase(), lowBEs, highBEs);
                    item.detailCmd = String.format("show proc \"/cluster_balance/cluster_load_stat/%s/%s\"",
                            stat.getTag().toKey(), storageMedium.name().toUpperCase());
                    return;
                }
            }
        }
    }

    /**
     * Checks disk (root path) balance inside each backend.
     *
     * <p>If no scheduler-related cause explains a failure, but the base balance is not OK, the item is
     * downgraded to WARNING, because disk balance waits for backend balance.
     */
    private DiagnoseItem diagnoseDiskBalance(boolean schedReady, boolean schedRecent, boolean baseBalanceOk) {
        DiagnoseItem diskBalance = new DiagnoseItem();
        diskBalance.name = "Disk Balance";
        diskBalance.status = DiagnoseStatus.OK;

        checkDiskBalance(diskBalance);

        if (diskBalance.status != DiagnoseStatus.OK) {
            boolean hasSuggestion = applySchedSuggestion(diskBalance,
                    Config.disable_tablet_scheduler || Config.disable_balance || Config.disable_disk_balance,
                    "disable_tablet_scheduler = false, disable_balance = false, disable_disk_balance = false",
                    schedReady, schedRecent);
            if (!hasSuggestion && !baseBalanceOk) {
                diskBalance.suggestion = "disk balance run after all be balance, need them finished";
                diskBalance.status = DiagnoseStatus.WARNING;
            }
        }

        return diskBalance;
    }

    // Marks item as ERROR for the first backend/storage medium that has LOW or HIGH root paths.
    private static void checkDiskBalance(DiagnoseItem item) {
        Map<Tag, LoadStatisticForTag> loadStatisticMap = Env.getCurrentEnv().getTabletScheduler().getStatisticMap();
        // Reused buffers, cleared before every getPathStatisticByClass call.
        List<RootPathLoadStatistic> lowPaths = Lists.newArrayList();
        List<RootPathLoadStatistic> midPaths = Lists.newArrayList();
        List<RootPathLoadStatistic> highPaths = Lists.newArrayList();
        for (LoadStatisticForTag stat : loadStatisticMap.values()) {
            for (TStorageMedium storageMedium : TStorageMedium.values()) {
                for (BackendLoadStatistic beStat : stat.getBackendLoadStatistics()) {
                    lowPaths.clear();
                    midPaths.clear();
                    highPaths.clear();
                    beStat.getPathStatisticByClass(lowPaths, midPaths, highPaths, storageMedium);
                    if (!lowPaths.isEmpty() || !highPaths.isEmpty()) {
                        item.status = DiagnoseStatus.ERROR;
                        item.content = String.format("backend %s is not disk balance, low paths { %s }, "
                                + "high paths { %s }", beStat.getBeId(),
                                lowPaths.stream().map(RootPathLoadStatistic::getPath).collect(Collectors.toList()),
                                highPaths.stream().map(RootPathLoadStatistic::getPath).collect(Collectors.toList()));
                        item.detailCmd = String.format(
                                "show proc \"/cluster_balance/cluster_load_stat/%s/%s/%s\"",
                                stat.getTag().toKey(), storageMedium.name().toUpperCase(), beStat.getBeId());
                        return;
                    }
                }
            }
        }
    }

    /** Reports colocate groups that the colocate table index currently lists as unstable. */
    private DiagnoseItem diagnoseColocateRebalance(boolean schedReady, boolean schedRecent) {
        DiagnoseItem colocateBalance = new DiagnoseItem();
        colocateBalance.status = DiagnoseStatus.OK;
        colocateBalance.name = "Colocate Group Stable";
        Set<GroupId> unstableGroups = Env.getCurrentEnv().getColocateTableIndex().getUnstableGroupIds();
        if (!unstableGroups.isEmpty()) {
            colocateBalance.status = DiagnoseStatus.ERROR;
            colocateBalance.content = String.format("colocate groups are unstable: %s", unstableGroups);
            colocateBalance.detailCmd = "show proc \"/colocation_group\"";
            applySchedSuggestion(colocateBalance,
                    Config.disable_tablet_scheduler || Config.disable_colocate_balance,
                    "disable_tablet_scheduler = false, disable_colocate_balance = false", schedReady, schedRecent);
        }

        return colocateBalance;
    }

    /**
     * Reports up to five of the most recently visited history tablets that were CANCELLED or UNEXPECTED in the
     * last 30 minutes. Failures with sub-codes WAITING_SLOT, WAITING_DECOMMISSION and DIAGNOSE_IGNORE are
     * ignored.
     *
     * @param historyTablets tablets taken from the scheduler history
     * @param ignoreErrs when true, the check is skipped and the item stays OK
     */
    private DiagnoseItem diagnoseHistorySched(List<TabletSchedCtx> historyTablets, boolean ignoreErrs) {
        DiagnoseItem historySched = new DiagnoseItem();
        historySched.name = "History Tablet Sched";
        historySched.status = DiagnoseStatus.OK;

        if (!ignoreErrs) {
            long now = System.currentTimeMillis();
            List<TabletSchedCtx> failedTablets = historyTablets.stream()
                    .filter(tablet -> tablet.getLastVisitedTime() >= now - 1800 * 1000L
                            && tablet.getSchedFailedCode() != SubCode.WAITING_SLOT
                            && tablet.getSchedFailedCode() != SubCode.WAITING_DECOMMISSION
                            && tablet.getSchedFailedCode() != SubCode.DIAGNOSE_IGNORE
                            && (tablet.getState() == TabletSchedCtx.State.CANCELLED
                                    || tablet.getState() == TabletSchedCtx.State.UNEXPECTED))
                    .sorted(Comparator.comparing(TabletSchedCtx::getLastVisitedTime).reversed())
                    .limit(5).collect(Collectors.toList());
            if (!failedTablets.isEmpty()) {
                historySched.status = DiagnoseStatus.WARNING;
                historySched.content = String.format("tablet sched has failed: %s", failedTablets.stream()
                        .map(tablet -> String.format("tablet %s error: %s", tablet.getTabletId(), tablet.getErrMsg()))
                        .collect(Collectors.toList()));
                historySched.detailCmd = "show proc \"/cluster_balance/history_tablets\"";
            }
        }

        return historySched;
    }

}