TabletStatMgr.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Status;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TTabletStat;
import org.apache.doris.thrift.TTabletStatResult;

import com.google.common.collect.ImmutableMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/*
 * TabletStatMgr is for collecting tablet(replica) statistics from backends.
 * Each FE will collect by itself.
 */
public class TabletStatMgr extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(TabletStatMgr.class);

    private final ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool(
            Config.tablet_stat_mgr_threads_num > 0
                    ? Config.tablet_stat_mgr_threads_num
                    : Runtime.getRuntime().availableProcessors(),
            1024, "tablet-stat-mgr", true);

    private MarkedCountDownLatch<Long, Backend> updateTabletStatsLatch = null;

    public TabletStatMgr() {
        super("tablet stat mgr", Config.tablet_stat_update_interval_second * 1000);
    }

    @Override
    protected void runAfterCatalogReady() {
        ImmutableMap<Long, Backend> backends;
        try {
            backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
        } catch (AnalysisException e) {
            LOG.warn("can't get backends info", e);
            return;
        }
        long start = System.currentTimeMillis();
        // no need to get tablet stat if backend is not alive
        List<Backend> aliveBackends = backends.values().stream().filter(Backend::isAlive)
                .collect(Collectors.toList());
        updateTabletStatsLatch = new MarkedCountDownLatch<>(aliveBackends.size());
        aliveBackends.forEach(backend -> {
            updateTabletStatsLatch.addMark(backend.getId(), backend);
            executor.submit(() -> {
                BackendService.Client client = null;
                TNetworkAddress address = null;
                boolean ok = false;
                try {
                    address = new TNetworkAddress(backend.getHost(), backend.getBePort());
                    client = ClientPool.backendPool.borrowObject(address);
                    TTabletStatResult result = client.getTabletStat();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("get tablet stat from backend: {}, num: {}", backend.getId(),
                                result.getTabletsStatsSize());
                    }
                    updateTabletStat(backend.getId(), result);
                    updateTabletStatsLatch.markedCountDown(backend.getId(), backend);
                    ok = true;
                } catch (Throwable e) {
                    updateTabletStatsLatch.markedCountDownWithStatus(backend.getId(), backend, Status.CANCELLED);
                    LOG.warn("task exec error. backend[{}]", backend.getId(), e);
                }

                try {
                    if (ok) {
                        ClientPool.backendPool.returnObject(address, client);
                    } else {
                        ClientPool.backendPool.invalidateObject(address, client);
                    }
                } catch (Throwable e) {
                    LOG.warn("client pool recyle error. backend[{}]", backend.getId(), e);
                }
            });
        });
        waitForTabletStatUpdate();

        if (LOG.isDebugEnabled()) {
            LOG.debug("finished to get tablet stat of all backends. cost: {} ms",
                    (System.currentTimeMillis() - start));
        }

        // after update replica in all backends, update index row num
        start = System.currentTimeMillis();
        Pair<String, Long> maxTabletSize = Pair.of(/* tablet id= */null, /* byte size= */0L);
        Pair<String, Long> maxPartitionSize = Pair.of(/* partition id= */null, /* byte size= */0L);
        Pair<String, Long> maxTableSize = Pair.of(/* table id= */null, /* byte size= */0L);
        Pair<String, Long> minTabletSize = Pair.of(/* tablet id= */null, /* byte size= */Long.MAX_VALUE);
        Pair<String, Long> minPartitionSize = Pair.of(/* partition id= */null, /* byte size= */Long.MAX_VALUE);
        Pair<String, Long> minTableSize = Pair.of(/* tablet id= */null, /* byte size= */Long.MAX_VALUE);
        Long totalTableSize = 0L;
        Long tabletCount = 0L;
        Long partitionCount = 0L;
        Long tableCount = 0L;
        List<Long> dbIds = Env.getCurrentInternalCatalog().getDbIds();
        for (Long dbId : dbIds) {
            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
            if (db == null) {
                continue;
            }
            List<Table> tableList = db.getTables();
            for (Table table : tableList) {
                // Will process OlapTable and MTMV
                if (!table.isManagedTable()) {
                    continue;
                }
                tableCount++;
                OlapTable olapTable = (OlapTable) table;

                Long tableDataSize = 0L;
                Long tableTotalReplicaDataSize = 0L;

                Long tableTotalLocalIndexSize = 0L;
                Long tableTotalLocalSegmentSize = 0L;
                Long tableTotalRemoteIndexSize = 0L;
                Long tableTotalRemoteSegmentSize = 0L;

                Long tableRemoteDataSize = 0L;

                Long tableReplicaCount = 0L;

                Long tableRowCount = 0L;

                if (!table.readLockIfExist()) {
                    continue;
                }
                try {
                    List<Partition> allPartitions = olapTable.getAllPartitions();
                    partitionCount += allPartitions.size();
                    for (Partition partition : allPartitions) {
                        Long partitionDataSize = 0L;
                        long version = partition.getVisibleVersion();
                        for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
                            long indexRowCount = 0L;
                            boolean indexReported = true;
                            List<Tablet> tablets = index.getTablets();
                            tabletCount += tablets.size();
                            for (Tablet tablet : tablets) {

                                Long tabletDataSize = 0L;
                                Long tabletRemoteDataSize = 0L;

                                Long tabletRowCount = Long.MAX_VALUE;

                                boolean tabletReported = false;
                                for (Replica replica : tablet.getReplicas()) {
                                    LOG.debug("Table {} replica {} current version {}, report version {}",
                                            olapTable.getName(), replica.getId(),
                                            replica.getVersion(), replica.getLastReportVersion());
                                    // Replica with less row count is more accurate than the others
                                    // when replicas' version are identical. Because less row count
                                    // means this replica does more compaction than the others.
                                    if (replica.checkVersionCatchUp(version, false)
                                            && replica.getRowCount() < tabletRowCount) {
                                        // 1. If replica version and reported replica version are all equal to
                                        // PARTITION_INIT_VERSION, set tabletReported to true, which indicates this
                                        // tablet is empty for sure when previous report.
                                        // 2. If last report version is larger than PARTITION_INIT_VERSION, set
                                        // tabletReported to true as well. That is, we only guarantee all replicas of
                                        // the tablet are reported for the init version.
                                        // e.g. When replica version is 2, but last reported version is 1,
                                        // tabletReported would be false.
                                        if (replica.getVersion() == Partition.PARTITION_INIT_VERSION
                                                && replica.getLastReportVersion() == Partition.PARTITION_INIT_VERSION
                                                || replica.getLastReportVersion() > Partition.PARTITION_INIT_VERSION) {
                                            tabletReported = true;
                                        }
                                        tabletRowCount = replica.getRowCount();
                                    }

                                    if (replica.getDataSize() > tabletDataSize) {
                                        tabletDataSize = replica.getDataSize();
                                    }
                                    tableTotalReplicaDataSize += replica.getDataSize();

                                    if (replica.getRemoteDataSize() > tabletRemoteDataSize) {
                                        tabletRemoteDataSize = replica.getRemoteDataSize();
                                    }
                                    tableReplicaCount++;
                                    tableTotalLocalIndexSize += replica.getLocalInvertedIndexSize();
                                    tableTotalLocalSegmentSize += replica.getLocalSegmentSize();
                                    tableTotalRemoteIndexSize += replica.getRemoteInvertedIndexSize();
                                    tableTotalRemoteSegmentSize += replica.getRemoteSegmentSize();
                                }

                                tableDataSize += tabletDataSize;
                                partitionDataSize += tabletDataSize;
                                tableRemoteDataSize += tabletRemoteDataSize;
                                if (maxTabletSize.second <= tabletDataSize) {
                                    maxTabletSize = Pair.of("" + tablet.getId(), tabletDataSize);
                                }
                                if (minTabletSize.second >= tabletDataSize) {
                                    minTabletSize = Pair.of("" + tablet.getId(), tabletDataSize);
                                }

                                // When all BEs are down, avoid set Long.MAX_VALUE to index and table row count. Use 0.
                                if (tabletRowCount == Long.MAX_VALUE) {
                                    tabletRowCount = 0L;
                                }
                                tableRowCount += tabletRowCount;
                                indexRowCount += tabletRowCount;
                                // Only when all tablets of this index are reported, we set indexReported to true.
                                indexReported = indexReported && tabletReported;
                            } // end for tablets
                            index.setRowCountReported(indexReported);
                            index.setRowCount(indexRowCount);
                            LOG.debug("Table {} index {} all tablets reported[{}], row count {}",
                                    olapTable.getName(), olapTable.getIndexNameById(index.getId()),
                                    indexReported, indexRowCount);
                        } // end for indices
                        if (maxPartitionSize.second <= partitionDataSize) {
                            maxPartitionSize = Pair.of("" + partition.getId(), partitionDataSize);
                        }
                        if (minPartitionSize.second >= partitionDataSize) {
                            minPartitionSize = Pair.of("" + partition.getId(), partitionDataSize);
                        }
                    } // end for partitions
                    if (maxTableSize.second <= tableDataSize) {
                        maxTableSize = Pair.of("" + table.getId(), tableDataSize);
                    }
                    if (minTableSize.second >= tableDataSize) {
                        minTableSize = Pair.of("" + table.getId(), tableDataSize);
                    }

                    // this is only one thread to update table statistics, readLock is enough
                    olapTable.setStatistics(new OlapTable.Statistics(db.getName(), table.getName(),
                            tableDataSize, tableTotalReplicaDataSize,
                            tableRemoteDataSize, tableReplicaCount, tableRowCount, 0L, 0L,
                            tableTotalLocalIndexSize, tableTotalLocalSegmentSize,
                            tableTotalRemoteIndexSize, tableTotalRemoteSegmentSize));

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("finished to set row num for table: {} in database: {}",
                                 table.getName(), db.getFullName());
                    }
                } finally {
                    table.readUnlock();
                }
                totalTableSize += tableDataSize;
            }
        }
        MetricRepo.GAUGE_MAX_TABLE_SIZE_BYTES.setValue(maxTableSize.second);
        MetricRepo.GAUGE_MAX_PARTITION_SIZE_BYTES.setValue(maxPartitionSize.second);
        MetricRepo.GAUGE_MAX_TABLET_SIZE_BYTES.setValue(maxTabletSize.second);
        long minTableSizeTmp = minTableSize.second == Long.MAX_VALUE ? 0 : minTableSize.second;
        MetricRepo.GAUGE_MIN_TABLE_SIZE_BYTES.setValue(minTableSizeTmp);
        long minPartitionSizeTmp = minPartitionSize.second == Long.MAX_VALUE ? 0 : minPartitionSize.second;
        MetricRepo.GAUGE_MIN_PARTITION_SIZE_BYTES.setValue(minPartitionSizeTmp);
        long minTabletSizeTmp = minTabletSize.second == Long.MAX_VALUE ? 0 : minTabletSize.second;
        MetricRepo.GAUGE_MIN_TABLET_SIZE_BYTES.setValue(minTabletSizeTmp);
        long avgTableSize = totalTableSize / Math.max(1, tableCount); // avoid ArithmeticException: / by zero
        MetricRepo.GAUGE_AVG_TABLE_SIZE_BYTES.setValue(avgTableSize);
        long avgPartitionSize = totalTableSize / Math.max(1, partitionCount); // avoid ArithmeticException: / by zero
        MetricRepo.GAUGE_AVG_PARTITION_SIZE_BYTES.setValue(avgPartitionSize);
        long avgTabletSize = totalTableSize / Math.max(1, tabletCount); // avoid ArithmeticException: / by zero
        MetricRepo.GAUGE_AVG_TABLET_SIZE_BYTES.setValue(avgTabletSize);
        LOG.info("finished to update index row num of all databases. cost: {} ms",
                (System.currentTimeMillis() - start));
        LOG.info("Olap table num=" + tableCount + ", partition num=" + partitionCount + ", tablet num=" + tabletCount
                + ", max tablet byte size=" + maxTabletSize.second + "(tablet_id=" + maxTableSize.first + ")"
                + ", min tablet byte size=" + minTabletSizeTmp + "(tablet_id=" + minTabletSize.first + ")"
                + ", avg tablet byte size=" + avgTabletSize
                + ", max partition byte size=" + maxPartitionSize.second + "(partition_id=" + maxPartitionSize.first
                + ")"
                + ", min partition byte size=" + minPartitionSizeTmp + "(partition_id=" + minPartitionSize.first + ")"
                + ", avg partition byte size=" + avgPartitionSize
                + ", max table byte size=" + maxTableSize.second + "(table_id=" + maxTableSize.first + ")"
                + ", min table byte size=" + minTableSizeTmp + "(table_id=" + minTableSize.first + ")"
                + ", avg table byte size=" + avgTableSize);
    }

    public void waitForTabletStatUpdate() {
        boolean ok = true;
        try {
            if (!updateTabletStatsLatch.await(600, TimeUnit.SECONDS)) {
                LOG.info("timeout waiting {} update tablet stats tasks finish after {} seconds.",
                        updateTabletStatsLatch.getCount(), 600);
                ok = false;
            }
        } catch (InterruptedException e) {
            LOG.warn("InterruptedException, {}", this, e);
        }
        if (!ok || !updateTabletStatsLatch.getStatus().ok()) {
            List<Long> unfinishedBackendIds = updateTabletStatsLatch.getLeftMarks().stream()
                    .map(Map.Entry::getKey).collect(Collectors.toList());
            Status status = Status.TIMEOUT;
            if (!updateTabletStatsLatch.getStatus().ok()) {
                status = updateTabletStatsLatch.getStatus();
            }
            LOG.warn("Failed to update tablet stats reason: {}, unfinished backends: {}",
                    status.getErrorMsg(), unfinishedBackendIds);
            if (MetricRepo.isInit) {
                MetricRepo.COUNTER_UPDATE_TABLET_STAT_FAILED.increase(1L);
            }
        }
    }

    private void updateTabletStat(Long beId, TTabletStatResult result) {
        TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
        if (result.isSetTabletStatList()) {
            for (TTabletStat stat : result.getTabletStatList()) {
                if (invertedIndex.getTabletMeta(stat.getTabletId()) != null) {
                    Replica replica = invertedIndex.getReplica(stat.getTabletId(), beId);
                    if (replica != null) {
                        replica.setDataSize(stat.getDataSize());
                        replica.setRemoteDataSize(stat.getRemoteDataSize());
                        replica.setLocalInvertedIndexSize(stat.getLocalIndexSize());
                        replica.setLocalSegmentSize(stat.getLocalSegmentSize());
                        replica.setRemoteInvertedIndexSize(stat.getRemoteIndexSize());
                        replica.setRemoteSegmentSize(stat.getRemoteSegmentSize());
                        replica.setRowCount(stat.getRowCount());
                        replica.setTotalVersionCount(stat.getTotalVersionCount());
                        replica.setVisibleVersionCount(stat.isSetVisibleVersionCount() ? stat.getVisibleVersionCount()
                                : stat.getTotalVersionCount());
                        // Older version BE doesn't set visible version. Set it to max for compatibility.
                        replica.setLastReportVersion(stat.isSetVisibleVersion() ? stat.getVisibleVersion()
                                : Long.MAX_VALUE);
                    }
                }
            }
        } else {
            for (Map.Entry<Long, TTabletStat> entry : result.getTabletsStats().entrySet()) {
                if (invertedIndex.getTabletMeta(entry.getKey()) == null) {
                    // the replica is obsolete, ignore it.
                    continue;
                }
                Replica replica = invertedIndex.getReplica(entry.getKey(), beId);
                if (replica == null) {
                    // replica may be deleted from catalog, ignore it.
                    continue;
                }
                // TODO(cmy) no db lock protected. I think it is ok even we get wrong row num
                replica.setDataSize(entry.getValue().getDataSize());
                replica.setRowCount(entry.getValue().getRowCount());
            }
        }
    }
}