// CloudTabletStatMgr.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.cloud.proto.Cloud.GetTabletStatsRequest;
import org.apache.doris.cloud.proto.Cloud.GetTabletStatsResponse;
import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
import org.apache.doris.cloud.proto.Cloud.TabletIndexPB;
import org.apache.doris.cloud.proto.Cloud.TabletStatsPB;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.rpc.RpcException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * CloudTabletStatMgr collects tablet (replica) statistics through
 * {@link MetaServiceProxy#getTabletStats} and aggregates them.
 *
 * <p>Each round, driven by {@link #runAfterCatalogReady()}, does three things:
 * <ol>
 *   <li>walks every managed table and batches its tablets into {@code GetTabletStatsRequest}s;</li>
 *   <li>sends the batches on a shared thread pool and writes the returned numbers onto the
 *       first replica of each tablet;</li>
 *   <li>folds the replica numbers into per-index row counts, per-table
 *       {@link OlapTable.Statistics}, the {@link #getCloudTableStatsMap()} snapshot and the
 *       size gauges in {@link MetricRepo}.</li>
 * </ol>
 *
 * <p>Original note: each FE will collect by itself.
 * NOTE(review): the class extends MasterDaemon - confirm which FEs actually run it.
 */
public class CloudTabletStatMgr extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(CloudTabletStatMgr.class);

    // <(dbId, tableId) -> OlapTable.Statistics>
    // Rebuilt from scratch every round and swapped in with a single volatile write.
    private volatile Map<Pair<Long, Long>, OlapTable.Statistics> cloudTableStatsMap = new HashMap<>();

    // Shared by all rounds; sized by Config.max_get_tablet_stat_task_threads_num.
    private static final ExecutorService GET_TABLET_STATS_THREAD_POOL = Executors.newFixedThreadPool(
            Config.max_get_tablet_stat_task_threads_num);

    /**
     * Creates the daemon; the second argument handed to the super constructor is
     * {@code Config.tablet_stat_update_interval_second * 1000}.
     */
    public CloudTabletStatMgr() {
        super("cloud tablet stat mgr", Config.tablet_stat_update_interval_second * 1000);
    }

    /**
     * Runs one full collection round: fetch tablet stats, apply them to replicas, then
     * recompute index row counts, table statistics and size metrics.
     */
    @Override
    protected void runAfterCatalogReady() {
        LOG.info("cloud tablet stat begin");
        long start = System.currentTimeMillis();

        List<Long> dbIds = Env.getCurrentInternalCatalog().getDbIds();
        List<GetTabletStatsRequest> reqList = buildTabletStatsRequests(dbIds);
        fetchAllTabletStats(reqList);

        LOG.info("finished to get tablet stat of all backends. cost: {} ms",
                (System.currentTimeMillis() - start));

        // after update replica in all backends, update index row num
        start = System.currentTimeMillis();
        Pair<String, Long> maxTabletSize = Pair.of(/* tablet id= */null, /* byte size= */0L);
        Pair<String, Long> maxPartitionSize = Pair.of(/* partition id= */null, /* byte size= */0L);
        Pair<String, Long> maxTableSize = Pair.of(/* table id= */null, /* byte size= */0L);
        Pair<String, Long> minTabletSize = Pair.of(/* tablet id= */null, /* byte size= */Long.MAX_VALUE);
        Pair<String, Long> minPartitionSize = Pair.of(/* partition id= */null, /* byte size= */Long.MAX_VALUE);
        Pair<String, Long> minTableSize = Pair.of(/* table id= */null, /* byte size= */Long.MAX_VALUE);
        // Primitive accumulators: these are bumped in tight loops, boxing them is pure overhead.
        long totalTableSize = 0L;
        long tabletCount = 0L;
        long partitionCount = 0L;
        long tableCount = 0L;
        Map<Pair<Long, Long>, OlapTable.Statistics> newCloudTableStatsMap = new HashMap<>();
        for (Long dbId : dbIds) {
            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
            if (db == null) {
                continue;
            }
            List<Table> tableList = db.getTables();
            for (Table table : tableList) {
                if (!table.isManagedTable()) {
                    continue;
                }
                tableCount++;
                OlapTable olapTable = (OlapTable) table;

                long tableDataSize = 0L;
                long tableTotalReplicaDataSize = 0L;
                long tableTotalLocalIndexSize = 0L;
                long tableTotalLocalSegmentSize = 0L;
                long tableReplicaCount = 0L;
                long tableRowCount = 0L;
                long tableRowsetCount = 0L;
                long tableSegmentCount = 0L;

                if (!table.readLockIfExist()) {
                    continue;
                }
                try {
                    List<Partition> allPartitions = olapTable.getAllPartitions();
                    partitionCount += allPartitions.size();
                    for (Partition partition : allPartitions) {
                        long partitionDataSize = 0L;
                        for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
                            long indexRowCount = 0L;
                            List<Tablet> tablets = index.getTablets();
                            tabletCount += tablets.size();
                            for (Tablet tablet : tablets) {
                                // Per tablet, every figure is the maximum seen across its replicas.
                                long tabletDataSize = 0L;
                                long tabletRowsetCount = 0L;
                                long tabletSegmentCount = 0L;
                                long tabletRowCount = 0L;
                                long tabletIndexSize = 0L;
                                long tabletSegmentSize = 0L;

                                for (Replica replica : tablet.getReplicas()) {
                                    if (replica.getDataSize() > tabletDataSize) {
                                        tabletDataSize = replica.getDataSize();
                                        // NOTE(review): only replicas that raise the running maximum are
                                        // added here, so the total depends on replica order when a tablet
                                        // has several replicas - confirm this is intended.
                                        tableTotalReplicaDataSize += replica.getDataSize();
                                    }
                                    if (replica.getRowCount() > tabletRowCount) {
                                        tabletRowCount = replica.getRowCount();
                                    }
                                    if (replica.getRowsetCount() > tabletRowsetCount) {
                                        tabletRowsetCount = replica.getRowsetCount();
                                    }
                                    if (replica.getSegmentCount() > tabletSegmentCount) {
                                        tabletSegmentCount = replica.getSegmentCount();
                                    }
                                    if (replica.getLocalInvertedIndexSize() > tabletIndexSize) {
                                        tabletIndexSize = replica.getLocalInvertedIndexSize();
                                    }
                                    if (replica.getLocalSegmentSize() > tabletSegmentSize) {
                                        tabletSegmentSize = replica.getLocalSegmentSize();
                                    }
                                    tableReplicaCount++;
                                }

                                tableDataSize += tabletDataSize;
                                partitionDataSize += tabletDataSize;
                                if (maxTabletSize.second <= tabletDataSize) {
                                    maxTabletSize = Pair.of("" + tablet.getId(), tabletDataSize);
                                }
                                if (minTabletSize.second >= tabletDataSize) {
                                    minTabletSize = Pair.of("" + tablet.getId(), tabletDataSize);
                                }

                                tableRowCount += tabletRowCount;
                                indexRowCount += tabletRowCount;
                                tableRowsetCount += tabletRowsetCount;
                                tableSegmentCount += tabletSegmentCount;
                                tableTotalLocalIndexSize += tabletIndexSize;
                                tableTotalLocalSegmentSize += tabletSegmentSize;
                            } // end for tablets
                            index.setRowCountReported(true);
                            index.setRowCount(indexRowCount);
                        } // end for indices
                        if (maxPartitionSize.second <= partitionDataSize) {
                            maxPartitionSize = Pair.of("" + partition.getId(), partitionDataSize);
                        }
                        if (minPartitionSize.second >= partitionDataSize) {
                            minPartitionSize = Pair.of("" + partition.getId(), partitionDataSize);
                        }
                    } // end for partitions
                    if (maxTableSize.second <= tableDataSize) {
                        maxTableSize = Pair.of("" + table.getId(), tableDataSize);
                    }
                    if (minTableSize.second >= tableDataSize) {
                        minTableSize = Pair.of("" + table.getId(), tableDataSize);
                    }

                    // this is only one thread to update table statistics, readLock is enough
                    olapTable.setStatistics(new OlapTable.Statistics(db.getName(),
                            table.getName(), tableDataSize, tableTotalReplicaDataSize, 0L,
                            tableReplicaCount, tableRowCount, tableRowsetCount, tableSegmentCount,
                            tableTotalLocalIndexSize, tableTotalLocalSegmentSize, 0L, 0L));
                    LOG.debug("finished to set row num for table: {} in database: {}",
                            table.getName(), db.getFullName());
                } finally {
                    table.readUnlock();
                }

                totalTableSize += tableDataSize;
                // Publish the same figures that were just set on the table, including the local
                // index/segment sizes, so both views of one round agree with each other.
                newCloudTableStatsMap.put(Pair.of(dbId, table.getId()), new OlapTable.Statistics(db.getName(),
                        table.getName(), tableDataSize, tableTotalReplicaDataSize, 0L,
                        tableReplicaCount, tableRowCount, tableRowsetCount, tableSegmentCount,
                        tableTotalLocalIndexSize, tableTotalLocalSegmentSize, 0L, 0L));
            }
        }
        this.cloudTableStatsMap = newCloudTableStatsMap;

        MetricRepo.GAUGE_MAX_TABLE_SIZE_BYTES.setValue(maxTableSize.second);
        MetricRepo.GAUGE_MAX_PARTITION_SIZE_BYTES.setValue(maxPartitionSize.second);
        MetricRepo.GAUGE_MAX_TABLET_SIZE_BYTES.setValue(maxTabletSize.second);
        // Long.MAX_VALUE means "nothing was seen"; report 0 instead of the sentinel.
        long minTableSizeTmp = minTableSize.second == Long.MAX_VALUE ? 0 : minTableSize.second;
        MetricRepo.GAUGE_MIN_TABLE_SIZE_BYTES.setValue(minTableSizeTmp);
        long minPartitionSizeTmp = minPartitionSize.second == Long.MAX_VALUE ? 0 : minPartitionSize.second;
        MetricRepo.GAUGE_MIN_PARTITION_SIZE_BYTES.setValue(minPartitionSizeTmp);
        long minTabletSizeTmp = minTabletSize.second == Long.MAX_VALUE ? 0 : minTabletSize.second;
        MetricRepo.GAUGE_MIN_TABLET_SIZE_BYTES.setValue(minTabletSizeTmp);
        long avgTableSize = totalTableSize / Math.max(1, tableCount); // avoid ArithmeticException: / by zero
        MetricRepo.GAUGE_AVG_TABLE_SIZE_BYTES.setValue(avgTableSize);
        long avgPartitionSize = totalTableSize / Math.max(1, partitionCount); // avoid ArithmeticException: / by zero
        MetricRepo.GAUGE_AVG_PARTITION_SIZE_BYTES.setValue(avgPartitionSize);
        long avgTabletSize = totalTableSize / Math.max(1, tabletCount); // avoid ArithmeticException: / by zero
        MetricRepo.GAUGE_AVG_TABLET_SIZE_BYTES.setValue(avgTabletSize);

        LOG.info("finished to update index row num of all databases. cost: {} ms",
                (System.currentTimeMillis() - start));
        LOG.info("Olap table num=" + tableCount + ", partition num=" + partitionCount + ", tablet num=" + tabletCount
                + ", max tablet byte size=" + maxTabletSize.second + "(tablet_id=" + maxTabletSize.first + ")"
                + ", min tablet byte size=" + minTabletSizeTmp + "(tablet_id=" + minTabletSize.first + ")"
                + ", avg tablet byte size=" + avgTabletSize
                + ", max partition byte size=" + maxPartitionSize.second + "(partition_id=" + maxPartitionSize.first
                + ")"
                + ", min partition byte size=" + minPartitionSizeTmp + "(partition_id=" + minPartitionSize.first + ")"
                + ", avg partition byte size=" + avgPartitionSize
                + ", max table byte size=" + maxTableSize.second + "(table_id=" + maxTableSize.first + ")"
                + ", min table byte size=" + minTableSizeTmp + "(table_id=" + minTableSize.first + ")"
                + ", avg table byte size=" + avgTableSize);
    }

    /**
     * Builds the batched stat requests for every tablet of every visible index of every
     * managed table in the given databases.
     *
     * @param dbIds ids of the databases to scan; ids that no longer resolve are skipped
     * @return requests holding at most {@code Config.get_tablet_stat_batch_size} tablets each
     */
    private List<GetTabletStatsRequest> buildTabletStatsRequests(List<Long> dbIds) {
        List<GetTabletStatsRequest> reqList = new ArrayList<>();
        GetTabletStatsRequest.Builder builder = GetTabletStatsRequest.newBuilder();
        for (Long dbId : dbIds) {
            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
            if (db == null) {
                continue;
            }
            List<Table> tableList = db.getTables();
            for (Table table : tableList) {
                if (!table.isManagedTable()) {
                    continue;
                }
                // Same locking call as the aggregation pass in runAfterCatalogReady().
                if (!table.readLockIfExist()) {
                    continue;
                }
                try {
                    OlapTable tbl = (OlapTable) table;
                    for (Partition partition : tbl.getAllPartitions()) {
                        for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
                            for (Long tabletId : index.getTabletIdsInOrder()) {
                                Tablet tablet = index.getTablet(tabletId);
                                TabletIndexPB.Builder tabletBuilder = TabletIndexPB.newBuilder();
                                tabletBuilder.setDbId(dbId);
                                tabletBuilder.setTableId(table.getId());
                                tabletBuilder.setIndexId(index.getId());
                                tabletBuilder.setPartitionId(partition.getId());
                                tabletBuilder.setTabletId(tablet.getId());
                                builder.addTabletIdx(tabletBuilder);

                                // Cut a new batch once the current one is full.
                                if (builder.getTabletIdxCount() >= Config.get_tablet_stat_batch_size) {
                                    reqList.add(builder.build());
                                    builder = GetTabletStatsRequest.newBuilder();
                                }
                            }
                        }
                    } // partitions
                } finally {
                    table.readUnlock();
                }
            } // tables
        } // end for dbs

        // Flush the last, partially filled batch.
        if (builder.getTabletIdxCount() > 0) {
            reqList.add(builder.build());
        }
        return reqList;
    }

    /**
     * Submits one task per request to the shared pool and waits for all of them.
     *
     * <p>A failing task is logged and does not stop the wait for the remaining tasks. If the
     * waiting thread is interrupted, the interrupt flag is restored and the wait ends early.
     *
     * @param reqList the batched requests to send
     */
    private void fetchAllTabletStats(List<GetTabletStatsRequest> reqList) {
        List<Future<Void>> futures = new ArrayList<>(reqList.size());
        for (GetTabletStatsRequest req : reqList) {
            futures.add(GET_TABLET_STATS_THREAD_POOL.submit(() -> {
                fetchOneBatch(req);
                return null;
            }));
        }

        for (Future<Void> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                LOG.error("Error waiting for get tablet stats tasks to complete", e);
            } catch (InterruptedException e) {
                LOG.error("Error waiting for get tablet stats tasks to complete", e);
                // Keep the interruption visible to whoever runs this thread.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Fetches the stats for one batch and applies them to the replicas.
     * An RPC exception is logged and the batch is skipped.
     *
     * @param req the batch to fetch
     */
    private void fetchOneBatch(GetTabletStatsRequest req) {
        GetTabletStatsResponse resp;
        try {
            resp = getTabletStats(req);
        } catch (RpcException e) {
            LOG.warn("get tablet stats exception:", e);
            // No response at all: there is nothing to log per tablet and nothing to apply.
            return;
        }

        if (resp.getStatus().getCode() != MetaServiceCode.OK) {
            LOG.warn("get tablet stats return failed.");
        }

        if (LOG.isDebugEnabled()) {
            List<TabletIndexPB> requested = req.getTabletIdxList();
            List<TabletStatsPB> returned = resp.getTabletStatsList();
            // The response may hold fewer entries than were requested; never index past it.
            int pairCount = Math.min(requested.size(), returned.size());
            for (int i = 0; i < pairCount; i++) {
                TabletIndexPB idx = requested.get(i);
                LOG.debug("db_id: {} table_id: {} index_id: {} tablet_id: {} size: {}",
                        idx.getDbId(), idx.getTableId(), idx.getIndexId(),
                        idx.getTabletId(), returned.get(i).getDataSize());
            }
        }
        updateTabletStat(resp);
    }

    /**
     * Copies the numbers of every returned tablet stat onto the first replica registered for
     * that tablet in the inverted index. Tablets without meta or without replicas are skipped.
     *
     * @param response the meta service response to apply
     */
    private void updateTabletStat(GetTabletStatsResponse response) {
        TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
        for (TabletStatsPB stat : response.getTabletStatsList()) {
            long tabletId = stat.getIdx().getTabletId();
            if (invertedIndex.getTabletMeta(tabletId) == null) {
                continue;
            }
            List<Replica> replicas = invertedIndex.getReplicasByTabletId(tabletId);
            if (replicas == null || replicas.isEmpty() || replicas.get(0) == null) {
                continue;
            }
            Replica replica = replicas.get(0);
            replica.setDataSize(stat.getDataSize());
            replica.setRowsetCount(stat.getNumRowsets());
            replica.setSegmentCount(stat.getNumSegments());
            replica.setRowCount(stat.getNumRows());
            replica.setLocalInvertedIndexSize(stat.getIndexSize());
            replica.setLocalSegmentSize(stat.getSegmentSize());
        }
    }

    /**
     * Sends one request through {@link MetaServiceProxy}.
     *
     * @param request the batch to send
     * @return the response returned by the proxy
     * @throws RpcException if the proxy call fails; it is logged here and rethrown
     */
    private GetTabletStatsResponse getTabletStats(GetTabletStatsRequest request)
            throws RpcException {
        GetTabletStatsResponse response;
        try {
            response = MetaServiceProxy.getInstance().getTabletStats(request);
        } catch (RpcException e) {
            LOG.info("get tablet stat get exception:", e);
            throw e;
        }
        return response;
    }

    /**
     * Returns the table statistics published by the most recent completed round.
     *
     * @return map keyed by (dbId, tableId); empty until the first round completes
     */
    public Map<Pair<Long, Long>, OlapTable.Statistics> getCloudTableStatsMap() {
        return this.cloudTableStatsMap;
    }
}