MetricRepo.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.metric;

import org.apache.doris.alter.Alter;
import org.apache.doris.alter.AlterJobV2.JobType;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.Version;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.metric.Metric.MetricUnit;
import org.apache.doris.monitor.jvm.JvmService;
import org.apache.doris.monitor.jvm.JvmStats;
import org.apache.doris.persist.EditLog;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.transaction.TransactionStatus;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

public final class MetricRepo {
    private static final Logger LOG = LogManager.getLogger(MetricRepo.class);

    // METRIC_REGISTER is only used for histogram metrics
    public static final MetricRegistry METRIC_REGISTER = new MetricRegistry();
    public static final DorisMetricRegistry DORIS_METRIC_REGISTER = new DorisMetricRegistry();

    public static volatile boolean isInit = false;
    public static final SystemMetrics SYSTEM_METRICS = new SystemMetrics();

    public static final String TABLET_NUM = "tablet_num";
    public static final String TABLET_MAX_COMPACTION_SCORE = "tablet_max_compaction_score";
    public static final String CLOUD_TAG = "cloud";

    public static LongCounterMetric COUNTER_REQUEST_ALL;
    public static LongCounterMetric COUNTER_QUERY_ALL;
    public static LongCounterMetric COUNTER_QUERY_ERR;
    public static LongCounterMetric COUNTER_QUERY_SLOW;
    public static LongCounterMetric COUNTER_QUERY_TABLE;
    public static LongCounterMetric COUNTER_QUERY_OLAP_TABLE;
    public static LongCounterMetric COUNTER_QUERY_HIVE_TABLE;

    public static LongCounterMetric HTTP_COUNTER_COPY_INFO_UPLOAD_REQUEST;
    public static LongCounterMetric HTTP_COUNTER_COPY_INFO_UPLOAD_ERR;
    public static LongCounterMetric HTTP_COUNTER_COPY_INFO_QUERY_REQUEST;
    public static LongCounterMetric HTTP_COUNTER_COPY_INFO_QUERY_ERR;

    public static AutoMappedMetric<LongCounterMetric> USER_COUNTER_QUERY_ALL;
    public static AutoMappedMetric<LongCounterMetric> USER_COUNTER_QUERY_ERR;
    public static Histogram HISTO_QUERY_LATENCY;
    public static AutoMappedMetric<Histogram> USER_HISTO_QUERY_LATENCY;
    public static AutoMappedMetric<GaugeMetricImpl<Long>> USER_GAUGE_QUERY_INSTANCE_NUM;
    public static AutoMappedMetric<GaugeMetricImpl<Integer>> USER_GAUGE_CONNECTIONS;
    public static AutoMappedMetric<LongCounterMetric> USER_COUNTER_QUERY_INSTANCE_BEGIN;
    public static AutoMappedMetric<LongCounterMetric> BE_COUNTER_QUERY_RPC_ALL;
    public static AutoMappedMetric<LongCounterMetric> BE_COUNTER_QUERY_RPC_FAILED;
    public static AutoMappedMetric<LongCounterMetric> BE_COUNTER_QUERY_RPC_SIZE;

    public static LongCounterMetric COUNTER_CACHE_ADDED_SQL;
    public static LongCounterMetric COUNTER_CACHE_ADDED_PARTITION;
    public static LongCounterMetric COUNTER_CACHE_HIT_SQL;
    public static LongCounterMetric COUNTER_CACHE_HIT_PARTITION;

    public static LongCounterMetric COUNTER_UPDATE_TABLET_STAT_FAILED;

    public static LongCounterMetric COUNTER_EDIT_LOG_WRITE;
    public static LongCounterMetric COUNTER_EDIT_LOG_READ;
    public static LongCounterMetric COUNTER_EDIT_LOG_CURRENT;
    public static LongCounterMetric COUNTER_EDIT_LOG_SIZE_BYTES;
    public static LongCounterMetric COUNTER_CURRENT_EDIT_LOG_SIZE_BYTES;
    public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_SUCCESS;
    public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_FAILED;
    public static LongCounterMetric COUNTER_LARGE_EDIT_LOG;

    public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY;
    public static Histogram HISTO_JOURNAL_BATCH_SIZE;
    public static Histogram HISTO_JOURNAL_BATCH_DATA_SIZE;
    public static Histogram HISTO_HTTP_COPY_INTO_UPLOAD_LATENCY;
    public static Histogram HISTO_HTTP_COPY_INTO_QUERY_LATENCY;

    public static LongCounterMetric COUNTER_IMAGE_WRITE_SUCCESS;
    public static LongCounterMetric COUNTER_IMAGE_WRITE_FAILED;
    public static LongCounterMetric COUNTER_IMAGE_PUSH_SUCCESS;
    public static LongCounterMetric COUNTER_IMAGE_PUSH_FAILED;
    public static LongCounterMetric COUNTER_IMAGE_CLEAN_SUCCESS;
    public static LongCounterMetric COUNTER_IMAGE_CLEAN_FAILED;

    public static LongCounterMetric COUNTER_TXN_REJECT;
    public static LongCounterMetric COUNTER_TXN_BEGIN;
    public static LongCounterMetric COUNTER_TXN_FAILED;
    public static LongCounterMetric COUNTER_TXN_SUCCESS;
    public static Histogram HISTO_TXN_EXEC_LATENCY;
    public static Histogram HISTO_TXN_PUBLISH_LATENCY;
    public static AutoMappedMetric<GaugeMetricImpl<Long>> DB_GAUGE_TXN_NUM;
    public static AutoMappedMetric<GaugeMetricImpl<Long>> DB_GAUGE_PUBLISH_TXN_NUM;

    public static LongCounterMetric COUNTER_ROUTINE_LOAD_ROWS;
    public static LongCounterMetric COUNTER_ROUTINE_LOAD_RECEIVED_BYTES;
    public static LongCounterMetric COUNTER_ROUTINE_LOAD_ERROR_ROWS;

    public static GaugeMetric<Long> GAUGE_ROUTINE_LOAD_PROGRESS;
    public static GaugeMetric<Long> GAUGE_ROUTINE_LOAD_LAG;
    public static GaugeMetric<Long> GAUGE_ROUTINE_LOAD_ABORT_TASK_NUM;
    public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_LANTENCY;
    public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_COUNT;
    public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT;
    public static LongCounterMetric COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME;
    public static LongCounterMetric COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT;

    public static LongCounterMetric COUNTER_HIT_SQL_BLOCK_RULE;

    public static AutoMappedMetric<LongCounterMetric> THRIFT_COUNTER_RPC_ALL;
    public static AutoMappedMetric<LongCounterMetric> THRIFT_COUNTER_RPC_LATENCY;

    // following metrics will be updated by metric calculator
    public static GaugeMetricImpl<Double> GAUGE_QUERY_PER_SECOND;
    public static GaugeMetricImpl<Double> GAUGE_REQUEST_PER_SECOND;
    public static GaugeMetricImpl<Double> GAUGE_QUERY_ERR_RATE;
    public static GaugeMetricImpl<Double> GAUGE_QUERY_SLOW_RATE;
    public static GaugeMetricImpl<Long> GAUGE_MAX_TABLET_COMPACTION_SCORE;

    public static Histogram HISTO_COMMIT_AND_PUBLISH_LATENCY;

    public static Histogram HISTO_GET_DELETE_BITMAP_UPDATE_LOCK_LATENCY;
    public static Histogram HISTO_GET_COMMIT_LOCK_LATENCY;
    public static Histogram HISTO_CALCULATE_DELETE_BITMAP_LATENCY;
    public static Histogram HISTO_COMMIT_TO_MS_LATENCY;

    // Catlaog/Database/Table num
    public static GaugeMetric<Integer> GAUGE_CATALOG_NUM;
    public static GaugeMetric<Integer> GAUGE_INTERNAL_DATABASE_NUM;
    public static GaugeMetric<Integer> GAUGE_INTERNAL_TABLE_NUM;
    // Table/Partition/Tablet DataSize
    public static GaugeMetricImpl<Long> GAUGE_MAX_TABLE_SIZE_BYTES;
    public static GaugeMetricImpl<Long> GAUGE_MAX_PARTITION_SIZE_BYTES;
    public static GaugeMetricImpl<Long> GAUGE_MAX_TABLET_SIZE_BYTES;
    public static GaugeMetricImpl<Long> GAUGE_MIN_TABLE_SIZE_BYTES;
    public static GaugeMetricImpl<Long> GAUGE_MIN_PARTITION_SIZE_BYTES;
    public static GaugeMetricImpl<Long> GAUGE_MIN_TABLET_SIZE_BYTES;
    public static GaugeMetricImpl<Long> GAUGE_AVG_TABLE_SIZE_BYTES;
    public static GaugeMetricImpl<Long> GAUGE_AVG_PARTITION_SIZE_BYTES;
    public static GaugeMetricImpl<Long> GAUGE_AVG_TABLET_SIZE_BYTES;

    // Agent task
    public static LongCounterMetric COUNTER_AGENT_TASK_REQUEST_TOTAL;
    public static AutoMappedMetric<LongCounterMetric> COUNTER_AGENT_TASK_TOTAL;
    public static AutoMappedMetric<LongCounterMetric> COUNTER_AGENT_TASK_RESEND_TOTAL;

    private static Map<Pair<EtlJobType, JobState>, Long> loadJobNum = Maps.newHashMap();

    private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1,
            "metric-timer-pool", true);
    private static MetricCalculator metricCalculator = new MetricCalculator();

    // init() should only be called after catalog is contructed.
    public static synchronized void init() {
        if (isInit) {
            return;
        }

        // version
        GaugeMetric<Long> feVersion = new GaugeMetric<Long>("version", MetricUnit.NOUNIT, "") {
            @Override
            public Long getValue() {
                try {
                    return Long.parseLong("" + Version.DORIS_BUILD_VERSION_MAJOR + "0"
                                            + Version.DORIS_BUILD_VERSION_MINOR + "0"
                                            + Version.DORIS_BUILD_VERSION_PATCH
                                            + (Version.DORIS_BUILD_VERSION_HOTFIX > 0
                                                ? ("0" + Version.DORIS_BUILD_VERSION_HOTFIX)
                                                : ""));
                } catch (Throwable t) {
                    LOG.warn("failed to init version metrics", t);
                    return 0L;
                }
            }
        };
        DORIS_METRIC_REGISTER.addMetrics(feVersion);

        // load jobs
        for (EtlJobType jobType : EtlJobType.values()) {
            if (jobType == EtlJobType.UNKNOWN) {
                continue;
            }
            for (JobState state : JobState.values()) {
                GaugeMetric<Long> gauge = new GaugeMetric<Long>("job", MetricUnit.NOUNIT, "job statistics") {
                    @Override
                    public Long getValue() {
                        if (!Env.getCurrentEnv().isMaster()) {
                            return 0L;
                        }
                        return MetricRepo.getLoadJobNum(jobType, state);
                    }
                };
                gauge.addLabel(new MetricLabel("job", "load")).addLabel(new MetricLabel("type", jobType.name()))
                        .addLabel(new MetricLabel("state", state.name()));
                DORIS_METRIC_REGISTER.addMetrics(gauge);
            }
        }

        initRoutineLoadJobMetrics();

        // running alter job
        Alter alter = Env.getCurrentEnv().getAlterInstance();
        for (JobType jobType : JobType.values()) {
            if (jobType != JobType.SCHEMA_CHANGE && jobType != JobType.ROLLUP) {
                continue;
            }

            GaugeMetric<Long> gauge = new GaugeMetric<Long>("job", MetricUnit.NOUNIT, "job statistics") {
                @Override
                public Long getValue() {
                    if (!Env.getCurrentEnv().isMaster()) {
                        return 0L;
                    }
                    if (jobType == JobType.SCHEMA_CHANGE) {
                        return alter.getSchemaChangeHandler()
                                .getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.RUNNING);
                    } else {
                        return alter.getMaterializedViewHandler().getAlterJobV2Num(
                                org.apache.doris.alter.AlterJobV2.JobState.RUNNING);
                    }
                }
            };
            gauge.addLabel(new MetricLabel("job", "alter")).addLabel(new MetricLabel("type", jobType.name()))
                    .addLabel(new MetricLabel("state", "running"));
            DORIS_METRIC_REGISTER.addMetrics(gauge);
        }

        // capacity
        generateBackendsTabletMetrics();

        // connections
        USER_GAUGE_CONNECTIONS = addLabeledMetrics("user", () ->
                new GaugeMetricImpl<>("connection_total", MetricUnit.CONNECTIONS,
                        "total connections", 0));
        GaugeMetric<Integer> connections = new GaugeMetric<Integer>("connection_total",
                MetricUnit.CONNECTIONS, "total connections") {
            @Override
            public Integer getValue() {
                ExecuteEnv.getInstance().getScheduler().getUserConnectionMap()
                        .forEach((k, v) -> USER_GAUGE_CONNECTIONS.getOrAdd(k).setValue(v.get()));
                return ExecuteEnv.getInstance().getScheduler().getConnectionNum();
            }
        };
        DORIS_METRIC_REGISTER.addMetrics(connections);

        // journal id
        GaugeMetric<Long> maxJournalId = new GaugeMetric<Long>("max_journal_id", MetricUnit.NOUNIT,
                "max journal id of this frontends") {
            @Override
            public Long getValue() {
                EditLog editLog = Env.getCurrentEnv().getEditLog();
                if (editLog == null) {
                    return -1L;
                }
                return editLog.getMaxJournalId();
            }
        };
        DORIS_METRIC_REGISTER.addMetrics(maxJournalId);

        // scheduled tablet num
        GaugeMetric<Long> scheduledTabletNum = new GaugeMetric<Long>("scheduled_tablet_num", MetricUnit.NOUNIT,
                "number of tablets being scheduled") {
            @Override
            public Long getValue() {
                if (!Env.getCurrentEnv().isMaster()) {
                    return 0L;
                }
                return (long) Env.getCurrentEnv().getTabletScheduler().getTotalNum();
            }
        };
        DORIS_METRIC_REGISTER.addMetrics(scheduledTabletNum);

        // txn status
        for (TransactionStatus status : TransactionStatus.values()) {
            GaugeMetric<Long> gauge = new GaugeMetric<Long>("txn_status", MetricUnit.NOUNIT, "txn statistics") {
                @Override
                public Long getValue() {
                    if (!Env.getCurrentEnv().isMaster()) {
                        return 0L;
                    }
                    return Env.getCurrentGlobalTransactionMgr().getTxnNumByStatus(status);
                }
            };
            gauge.addLabel(new MetricLabel("type", status.name().toLowerCase()));
            DORIS_METRIC_REGISTER.addMetrics(gauge);
        }

        // qps, rps and error rate
        // these metrics should be set an init value, in case that metric calculator is not running
        GAUGE_QUERY_PER_SECOND = new GaugeMetricImpl<>("qps", MetricUnit.NOUNIT, "query per second", 0.0);
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_QUERY_PER_SECOND);
        GAUGE_REQUEST_PER_SECOND = new GaugeMetricImpl<>("rps", MetricUnit.NOUNIT, "request per second", 0.0);
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_REQUEST_PER_SECOND);
        GAUGE_QUERY_ERR_RATE = new GaugeMetricImpl<>("query_err_rate", MetricUnit.NOUNIT, "query error rate", 0.0);
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_QUERY_ERR_RATE);
        GAUGE_QUERY_SLOW_RATE = new GaugeMetricImpl<>("query_slow_rate", MetricUnit.NOUNIT, "query slow rate", 0.0);
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_QUERY_SLOW_RATE);
        GAUGE_MAX_TABLET_COMPACTION_SCORE = new GaugeMetricImpl<>("max_tablet_compaction_score", MetricUnit.NOUNIT,
                "max tablet compaction score of all backends", 0L);
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_MAX_TABLET_COMPACTION_SCORE);

        // query
        COUNTER_REQUEST_ALL = new LongCounterMetric("request_total", MetricUnit.REQUESTS, "total request");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_REQUEST_ALL);
        COUNTER_QUERY_ALL = new LongCounterMetric("query_total", MetricUnit.REQUESTS, "total query");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_ALL);
        COUNTER_QUERY_ERR = new LongCounterMetric("query_err", MetricUnit.REQUESTS, "total error query");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_ERR);
        COUNTER_QUERY_SLOW = new LongCounterMetric("query_slow", MetricUnit.REQUESTS, "total slow query");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_SLOW);
        COUNTER_QUERY_TABLE = new LongCounterMetric("query_table", MetricUnit.REQUESTS, "total query from table");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_TABLE);
        COUNTER_QUERY_OLAP_TABLE = new LongCounterMetric("query_olap_table", MetricUnit.REQUESTS,
                "total query from olap table");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_OLAP_TABLE);
        COUNTER_QUERY_HIVE_TABLE = new LongCounterMetric("query_hive_table", MetricUnit.REQUESTS,
                "total query from hive table");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_HIVE_TABLE);
        USER_COUNTER_QUERY_ALL = new AutoMappedMetric<>(name -> {
            LongCounterMetric userCountQueryAll  = new LongCounterMetric("query_total", MetricUnit.REQUESTS,
                    "total query for single user");
            userCountQueryAll.addLabel(new MetricLabel("user", name));
            DORIS_METRIC_REGISTER.addMetrics(userCountQueryAll);
            return userCountQueryAll;
        });
        USER_COUNTER_QUERY_ERR = new AutoMappedMetric<>(name -> {
            LongCounterMetric userCountQueryErr  = new LongCounterMetric("query_err", MetricUnit.REQUESTS,
                    "total error query for single user");
            userCountQueryErr.addLabel(new MetricLabel("user", name));
            DORIS_METRIC_REGISTER.addMetrics(userCountQueryErr);
            return userCountQueryErr;
        });
        HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram(
                MetricRegistry.name("query", "latency", "ms"));
        USER_HISTO_QUERY_LATENCY = new AutoMappedMetric<>(name -> {
            String metricName = MetricRegistry.name("query", "latency", "ms", "user=" + name);
            return METRIC_REGISTER.histogram(metricName);
        });
        USER_COUNTER_QUERY_INSTANCE_BEGIN = addLabeledMetrics("user", () ->
                new LongCounterMetric("query_instance_begin", MetricUnit.NOUNIT,
                "number of query instance begin"));
        USER_GAUGE_QUERY_INSTANCE_NUM = addLabeledMetrics("user", () ->
                new GaugeMetricImpl<>("query_instance_num", MetricUnit.NOUNIT,
                "number of running query instances of current user", 0L));
        GaugeMetric<Long> queryInstanceNum = new GaugeMetric<Long>("query_instance_num",
                MetricUnit.NOUNIT, "number of query instances of all current users") {
            @Override
            public Long getValue() {
                QeProcessorImpl qe = ((QeProcessorImpl) QeProcessorImpl.INSTANCE);
                long totalInstanceNum = 0;
                for (Map.Entry<String, Integer> e : qe.getInstancesNumPerUser().entrySet()) {
                    long value = e.getValue() == null ? 0L : e.getValue().longValue();
                    totalInstanceNum += value;
                    USER_GAUGE_QUERY_INSTANCE_NUM.getOrAdd(e.getKey()).setValue(value);
                }
                return totalInstanceNum;
            }
        };
        DORIS_METRIC_REGISTER.addMetrics(queryInstanceNum);
        BE_COUNTER_QUERY_RPC_ALL = addLabeledMetrics("be", () ->
            new LongCounterMetric("query_rpc_total", MetricUnit.NOUNIT, ""));
        BE_COUNTER_QUERY_RPC_FAILED = addLabeledMetrics("be", () ->
            new LongCounterMetric("query_rpc_failed", MetricUnit.NOUNIT, ""));
        BE_COUNTER_QUERY_RPC_SIZE = addLabeledMetrics("be", () ->
            new LongCounterMetric("query_rpc_size", MetricUnit.BYTES, ""));

        // cache
        COUNTER_CACHE_ADDED_SQL = new LongCounterMetric("cache_added", MetricUnit.REQUESTS,
                "Number of SQL mode cache added");
        COUNTER_CACHE_ADDED_SQL.addLabel(new MetricLabel("type", "sql"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_CACHE_ADDED_SQL);
        COUNTER_CACHE_ADDED_PARTITION = new LongCounterMetric("cache_added", MetricUnit.REQUESTS,
                "Number of Partition mode cache added");
        COUNTER_CACHE_ADDED_PARTITION.addLabel(new MetricLabel("type", "partition"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_CACHE_ADDED_PARTITION);
        COUNTER_CACHE_HIT_SQL = new LongCounterMetric("cache_hit", MetricUnit.REQUESTS,
                "total hits query by sql model");
        COUNTER_CACHE_HIT_SQL.addLabel(new MetricLabel("type", "sql"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_CACHE_HIT_SQL);
        COUNTER_CACHE_HIT_PARTITION = new LongCounterMetric("cache_hit", MetricUnit.REQUESTS,
                "total hits query by partition model");
        COUNTER_CACHE_HIT_PARTITION.addLabel(new MetricLabel("type", "partition"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_CACHE_HIT_PARTITION);

        // edit log
        COUNTER_EDIT_LOG_WRITE = new LongCounterMetric("edit_log", MetricUnit.OPERATIONS,
                "counter of edit log write into bdbje");
        COUNTER_EDIT_LOG_WRITE.addLabel(new MetricLabel("type", "write"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_WRITE);
        COUNTER_EDIT_LOG_READ = new LongCounterMetric("edit_log", MetricUnit.OPERATIONS,
                "counter of edit log read from bdbje");
        COUNTER_EDIT_LOG_READ.addLabel(new MetricLabel("type", "read"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_READ);
        COUNTER_EDIT_LOG_CURRENT = new LongCounterMetric("edit_log", MetricUnit.OPERATIONS,
                "counter of current edit log in bdbje");
        COUNTER_EDIT_LOG_CURRENT.addLabel(new MetricLabel("type", "current"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_CURRENT);
        COUNTER_EDIT_LOG_SIZE_BYTES = new LongCounterMetric("edit_log", MetricUnit.BYTES,
                "size of accumulated edit log");
        COUNTER_EDIT_LOG_SIZE_BYTES.addLabel(new MetricLabel("type", "accumulated_bytes"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_SIZE_BYTES);
        COUNTER_CURRENT_EDIT_LOG_SIZE_BYTES = new LongCounterMetric("edit_log", MetricUnit.BYTES,
                "size of current edit log");
        COUNTER_CURRENT_EDIT_LOG_SIZE_BYTES.addLabel(new MetricLabel("type", "current_bytes"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_CURRENT_EDIT_LOG_SIZE_BYTES);

        COUNTER_LARGE_EDIT_LOG = new LongCounterMetric("edit_log", MetricUnit.OPERATIONS,
                "counter of large edit log write into bdbje");
        COUNTER_LARGE_EDIT_LOG.addLabel(new MetricLabel("type", "large_write"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_LARGE_EDIT_LOG);

        HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram(
                MetricRegistry.name("editlog", "write", "latency", "ms"));
        HISTO_JOURNAL_BATCH_SIZE = METRIC_REGISTER.histogram(
                MetricRegistry.name("journal", "write", "batch_size"));
        HISTO_JOURNAL_BATCH_DATA_SIZE = METRIC_REGISTER.histogram(
                MetricRegistry.name("journal", "write", "batch_data_size"));

        // edit log clean
        COUNTER_EDIT_LOG_CLEAN_SUCCESS = new LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS,
            "counter of edit log succeed in cleaning");
        COUNTER_EDIT_LOG_CLEAN_SUCCESS.addLabel(new MetricLabel("type", "success"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_CLEAN_SUCCESS);
        COUNTER_EDIT_LOG_CLEAN_FAILED = new LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS,
            "counter of edit log failed to clean");
        COUNTER_EDIT_LOG_CLEAN_FAILED.addLabel(new MetricLabel("type", "failed"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_CLEAN_FAILED);

        // image generate
        COUNTER_IMAGE_WRITE_SUCCESS = new LongCounterMetric("image_write", MetricUnit.OPERATIONS,
                "counter of image succeed in write");
        COUNTER_IMAGE_WRITE_SUCCESS.addLabel(new MetricLabel("type", "success"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_IMAGE_WRITE_SUCCESS);
        COUNTER_IMAGE_WRITE_FAILED = new LongCounterMetric("image_write", MetricUnit.OPERATIONS,
                "counter of image failed to write");
        COUNTER_IMAGE_WRITE_FAILED.addLabel(new MetricLabel("type", "failed"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_IMAGE_WRITE_FAILED);

        COUNTER_IMAGE_PUSH_SUCCESS = new LongCounterMetric("image_push", MetricUnit.OPERATIONS,
                "counter of image succeeded in pushing to other frontends");
        COUNTER_IMAGE_PUSH_SUCCESS.addLabel(new MetricLabel("type", "success"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_IMAGE_PUSH_SUCCESS);
        COUNTER_IMAGE_PUSH_FAILED = new LongCounterMetric("image_push", MetricUnit.OPERATIONS,
                "counter of image failed to other frontends");
        COUNTER_IMAGE_PUSH_FAILED.addLabel(new MetricLabel("type", "failed"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_IMAGE_PUSH_FAILED);

        // image clean
        COUNTER_IMAGE_CLEAN_SUCCESS = new LongCounterMetric("image_clean", MetricUnit.OPERATIONS,
                "counter of image succeeded in cleaning");
        COUNTER_IMAGE_CLEAN_SUCCESS.addLabel(new MetricLabel("type", "success"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_IMAGE_CLEAN_SUCCESS);
        COUNTER_IMAGE_CLEAN_FAILED = new LongCounterMetric("image_clean", MetricUnit.OPERATIONS,
                "counter of image failed to clean");
        COUNTER_IMAGE_CLEAN_FAILED.addLabel(new MetricLabel("type", "failed"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_IMAGE_CLEAN_FAILED);

        // txn
        COUNTER_TXN_REJECT = new LongCounterMetric("txn_counter", MetricUnit.REQUESTS,
                "counter of rejected transactions");
        COUNTER_TXN_REJECT.addLabel(new MetricLabel("type", "reject"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_TXN_REJECT);
        COUNTER_TXN_BEGIN = new LongCounterMetric("txn_counter", MetricUnit.REQUESTS,
                "counter of beginning transactions");
        COUNTER_TXN_BEGIN.addLabel(new MetricLabel("type", "begin"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_TXN_BEGIN);
        COUNTER_TXN_SUCCESS = new LongCounterMetric("txn_counter", MetricUnit.REQUESTS,
                "counter of success transactions");
        COUNTER_TXN_SUCCESS.addLabel(new MetricLabel("type", "success"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_TXN_SUCCESS);
        COUNTER_TXN_FAILED = new LongCounterMetric("txn_counter", MetricUnit.REQUESTS,
                "counter of failed transactions");
        COUNTER_TXN_FAILED.addLabel(new MetricLabel("type", "failed"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_TXN_FAILED);
        COUNTER_UPDATE_TABLET_STAT_FAILED = new LongCounterMetric("update_tablet_stat_failed", MetricUnit.REQUESTS,
            "counter of failed to update tablet stat");
        COUNTER_UPDATE_TABLET_STAT_FAILED.addLabel(new MetricLabel("type", "failed"));
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_UPDATE_TABLET_STAT_FAILED);
        HISTO_TXN_EXEC_LATENCY = METRIC_REGISTER.histogram(
            MetricRegistry.name("txn", "exec", "latency", "ms"));
        HISTO_TXN_PUBLISH_LATENCY = METRIC_REGISTER.histogram(
            MetricRegistry.name("txn", "publish", "latency", "ms"));
        GaugeMetric<Long> txnNum = new GaugeMetric<Long>("txn_num", MetricUnit.NOUNIT,
                "number of running transactions") {
            @Override
            public Long getValue() {
                return Env.getCurrentGlobalTransactionMgr().getAllRunningTxnNum();
            }
        };
        DORIS_METRIC_REGISTER.addMetrics(txnNum);
        DB_GAUGE_TXN_NUM = addLabeledMetrics("db", () ->
                new GaugeMetricImpl<>("txn_num", MetricUnit.NOUNIT, "number of running transactions", 0L));
        GaugeMetric<Long> publishTxnNum = new GaugeMetric<Long>("publish_txn_num", MetricUnit.NOUNIT,
                "number of publish transactions") {
            @Override
            public Long getValue() {
                return Env.getCurrentGlobalTransactionMgr().getAllPublishTxnNum();
            }
        };
        DORIS_METRIC_REGISTER.addMetrics(publishTxnNum);
        DB_GAUGE_PUBLISH_TXN_NUM = addLabeledMetrics("db",
                () -> new GaugeMetricImpl<>("publish_txn_num", MetricUnit.NOUNIT,
                "number of publish transactions", 0L));
        COUNTER_ROUTINE_LOAD_ROWS = new LongCounterMetric("routine_load_rows", MetricUnit.ROWS,
                "total rows of routine load");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_ROWS);
        COUNTER_ROUTINE_LOAD_RECEIVED_BYTES = new LongCounterMetric("routine_load_receive_bytes", MetricUnit.BYTES,
                "total received bytes of routine load");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_RECEIVED_BYTES);
        COUNTER_ROUTINE_LOAD_ERROR_ROWS = new LongCounterMetric("routine_load_error_rows", MetricUnit.ROWS,
                "total error rows of routine load");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_ERROR_ROWS);
        COUNTER_ROUTINE_LOAD_GET_META_LANTENCY = new LongCounterMetric("routine_load_get_meta_latency",
                MetricUnit.MILLISECONDS, "get meta lantency of routine load");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_GET_META_LANTENCY);
        COUNTER_ROUTINE_LOAD_GET_META_COUNT = new LongCounterMetric("routine_load_get_meta_count", MetricUnit.NOUNIT,
                "get meta count of routine load");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_GET_META_COUNT);
        COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT = new LongCounterMetric("routine_load_get_meta_fail_count",
                MetricUnit.NOUNIT, "get meta fail count of routine load");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT);
        COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME = new LongCounterMetric("routine_load_task_execute_time",
                MetricUnit.MILLISECONDS, "task execute time of routine load");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME);
        COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT = new LongCounterMetric("routine_load_task_execute_count",
                MetricUnit.MILLISECONDS, "task execute count of routine load");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT);

        COUNTER_HIT_SQL_BLOCK_RULE = new LongCounterMetric("counter_hit_sql_block_rule", MetricUnit.ROWS,
                "total hit sql block rule query");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_HIT_SQL_BLOCK_RULE);

        THRIFT_COUNTER_RPC_ALL = addLabeledMetrics("method", () ->
                new LongCounterMetric("thrift_rpc_total", MetricUnit.NOUNIT, ""));
        THRIFT_COUNTER_RPC_LATENCY = addLabeledMetrics("method", () ->
                new LongCounterMetric("thrift_rpc_latency_ms", MetricUnit.MILLISECONDS, ""));

        // copy into
        HTTP_COUNTER_COPY_INFO_UPLOAD_REQUEST = new LongCounterMetric("http_copy_into_upload_request_total",
                MetricUnit.REQUESTS, "http copy into upload total request");
        DORIS_METRIC_REGISTER.addMetrics(HTTP_COUNTER_COPY_INFO_UPLOAD_REQUEST);
        HTTP_COUNTER_COPY_INFO_UPLOAD_ERR = new LongCounterMetric("http_copy_into_upload_err_total",
                MetricUnit.REQUESTS, "http copy into upload err request");
        DORIS_METRIC_REGISTER.addMetrics(HTTP_COUNTER_COPY_INFO_UPLOAD_ERR);
        HTTP_COUNTER_COPY_INFO_QUERY_REQUEST = new LongCounterMetric("http_copy_into_query_request_total",
                MetricUnit.REQUESTS, "http copy into total query request");
        DORIS_METRIC_REGISTER.addMetrics(HTTP_COUNTER_COPY_INFO_QUERY_REQUEST);
        HTTP_COUNTER_COPY_INFO_QUERY_ERR = new LongCounterMetric("http_copy_into_upload_err_total",
                MetricUnit.REQUESTS, "http copy into err query request");
        DORIS_METRIC_REGISTER.addMetrics(HTTP_COUNTER_COPY_INFO_QUERY_ERR);
        HISTO_HTTP_COPY_INTO_UPLOAD_LATENCY = METRIC_REGISTER.histogram(
            MetricRegistry.name("http_copy_into_upload", "latency", "ms"));
        HISTO_HTTP_COPY_INTO_QUERY_LATENCY = METRIC_REGISTER.histogram(
            MetricRegistry.name("http_copy_into_query", "latency", "ms"));

        HISTO_COMMIT_AND_PUBLISH_LATENCY = METRIC_REGISTER.histogram(
                MetricRegistry.name("txn_commit_and_publish", "latency", "ms"));

        GaugeMetric<Integer> commitQueueLength = new GaugeMetric<Integer>("commit_queue_length",
                MetricUnit.NOUNIT, "commit queue length") {
            @Override
            public Integer getValue() {
                return Env.getCurrentEnv().getGlobalTransactionMgr().getQueueLength();
            }
        };
        DORIS_METRIC_REGISTER.addMetrics(commitQueueLength);

        HISTO_GET_DELETE_BITMAP_UPDATE_LOCK_LATENCY = METRIC_REGISTER.histogram(
                MetricRegistry.name("get_delete_bitmap_update_lock", "latency", "ms"));
        HISTO_GET_COMMIT_LOCK_LATENCY = METRIC_REGISTER.histogram(
                MetricRegistry.name("get_commit_lock", "latency", "ms"));
        HISTO_CALCULATE_DELETE_BITMAP_LATENCY = METRIC_REGISTER.histogram(
                MetricRegistry.name("calculate_delete_bitmap", "latency", "ms"));
        HISTO_COMMIT_TO_MS_LATENCY = METRIC_REGISTER.histogram(
                MetricRegistry.name("commit_to_ms", "latency", "ms"));

        GAUGE_CATALOG_NUM = new GaugeMetric<Integer>("catalog_num",
                MetricUnit.NOUNIT, "total catalog num") {
            @Override
            public Integer getValue() {
                return Env.getCurrentEnv().getCatalogMgr().getCatalogNum();
            }
        };
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_CATALOG_NUM);

        GAUGE_INTERNAL_DATABASE_NUM = new GaugeMetric<Integer>("internal_database_num",
                MetricUnit.NOUNIT, "total internal database num") {
            @Override
            public Integer getValue() {
                return Env.getCurrentEnv().getCatalogMgr().getInternalCatalog().getDbNum();
            }
        };
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_INTERNAL_DATABASE_NUM);

        GAUGE_INTERNAL_TABLE_NUM = new GaugeMetric<Integer>("internal_table_num",
                MetricUnit.NOUNIT, "total internal table num") {
            @Override
            public Integer getValue() {
                return Env.getCurrentEnv().getCatalogMgr().getInternalCatalog().getAllDbs().stream()
                        .map(d -> (Database) d).map(Database::getTableNum).reduce(0, Integer::sum);
            }
        };
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_INTERNAL_TABLE_NUM);

        GAUGE_MAX_TABLE_SIZE_BYTES = new GaugeMetricImpl<>("max_table_size_bytes", MetricUnit.BYTES, "", 0L);
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_MAX_TABLE_SIZE_BYTES);

        GAUGE_MAX_PARTITION_SIZE_BYTES = new GaugeMetricImpl<>("max_partition_size_bytes", MetricUnit.BYTES, "", 0L);
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_MAX_PARTITION_SIZE_BYTES);

        GAUGE_MAX_TABLET_SIZE_BYTES = new GaugeMetricImpl<>("max_tablet_size_bytes", MetricUnit.BYTES, "", 0L);
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_MAX_TABLET_SIZE_BYTES);

        GAUGE_MIN_TABLE_SIZE_BYTES = new GaugeMetricImpl<>("min_table_size_bytes", MetricUnit.BYTES, "", 0L);
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_MIN_TABLE_SIZE_BYTES);

        GAUGE_MIN_PARTITION_SIZE_BYTES = new GaugeMetricImpl<>("min_partition_size_bytes", MetricUnit.BYTES, "", 0L);
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_MIN_PARTITION_SIZE_BYTES);

        GAUGE_MIN_TABLET_SIZE_BYTES = new GaugeMetricImpl<>("min_tablet_size_bytes", MetricUnit.BYTES, "", 0L);
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_MIN_TABLET_SIZE_BYTES);

        GAUGE_AVG_TABLE_SIZE_BYTES = new GaugeMetricImpl<>("avg_table_size_bytes", MetricUnit.BYTES, "", 0L);
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_AVG_TABLE_SIZE_BYTES);

        GAUGE_AVG_PARTITION_SIZE_BYTES = new GaugeMetricImpl<>("avg_partition_size_bytes", MetricUnit.BYTES, "", 0L);
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_AVG_PARTITION_SIZE_BYTES);

        GAUGE_AVG_TABLET_SIZE_BYTES = new GaugeMetricImpl<>("avg_tablet_size_bytes", MetricUnit.BYTES, "", 0L);
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_AVG_TABLET_SIZE_BYTES);

        COUNTER_AGENT_TASK_REQUEST_TOTAL = new LongCounterMetric("agent_task_request_total", MetricUnit.NOUNIT,
                "total agent batch task request send to BE");
        DORIS_METRIC_REGISTER.addMetrics(COUNTER_AGENT_TASK_REQUEST_TOTAL);
        COUNTER_AGENT_TASK_TOTAL = addLabeledMetrics("task", () ->
                new LongCounterMetric("agent_task_total", MetricUnit.NOUNIT, "total agent task"));
        COUNTER_AGENT_TASK_RESEND_TOTAL = addLabeledMetrics("task", () ->
                new LongCounterMetric("agent_task_resend_total", MetricUnit.NOUNIT, "total agent task resend"));

        // init system metrics
        initSystemMetrics();
        CloudMetrics.init();

        updateMetrics();
        isInit = true;

        if (Config.enable_metric_calculator) {
            metricTimer.scheduleAtFixedRate(metricCalculator, 0, 15 * 1000L, TimeUnit.MILLISECONDS);
        }
    }

    private static void initRoutineLoadJobMetrics() {
        //  routine load jobs
        RoutineLoadManager routineLoadManager = Env.getCurrentEnv().getRoutineLoadManager();
        for (RoutineLoadJob.JobState jobState : RoutineLoadJob.JobState.values()) {
            if (jobState == RoutineLoadJob.JobState.PAUSED) {
                addRoutineLoadJobStateGaugeMetric(routineLoadManager, jobState, "USER_PAUSED",
                        job -> job.getPauseReason() != null
                            && job.getPauseReason().getCode() == InternalErrorCode.MANUAL_PAUSE_ERR);
                addRoutineLoadJobStateGaugeMetric(routineLoadManager, jobState, "ABNORMAL_PAUSED",
                        job -> job.getPauseReason() != null
                            && job.getPauseReason().getCode() != InternalErrorCode.MANUAL_PAUSE_ERR);
            }
            addRoutineLoadJobStateGaugeMetric(routineLoadManager, jobState, jobState.name(), job -> true);
        }
        GAUGE_ROUTINE_LOAD_PROGRESS = new GaugeMetric<Long>("routine_load_progress",
                MetricUnit.NOUNIT, "total routine load progress") {
            @Override
            public Long getValue() {
                if (!Env.getCurrentEnv().isMaster()) {
                    return 0L;
                }
                return routineLoadManager
                        .getActiveRoutineLoadJobs().stream()
                        .mapToLong(RoutineLoadJob::totalProgress)
                        .sum();
            }
        };
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_ROUTINE_LOAD_PROGRESS);
        GAUGE_ROUTINE_LOAD_LAG = new GaugeMetric<Long>("routine_load_lag",
                MetricUnit.NOUNIT, "total routine load lag") {
            @Override
            public Long getValue() {
                if (!Env.getCurrentEnv().isMaster()) {
                    return 0L;
                }
                return routineLoadManager
                        .getActiveRoutineLoadJobs().stream()
                        .mapToLong(RoutineLoadJob::totalLag)
                        .sum();
            }
        };
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_ROUTINE_LOAD_LAG);
        GAUGE_ROUTINE_LOAD_ABORT_TASK_NUM = new GaugeMetric<Long>("routine_load_abort_task_num",
                MetricUnit.NOUNIT, "total number of aborted tasks in active routine load jobs") {
            @Override
            public Long getValue() {
                if (!Env.getCurrentEnv().isMaster()) {
                    return 0L;
                }
                return routineLoadManager
                        .getActiveRoutineLoadJobs().stream()
                        .mapToLong(job -> job.getRoutineLoadStatistic().abortedTaskNum)
                        .sum();
            }
        };
        DORIS_METRIC_REGISTER.addMetrics(GAUGE_ROUTINE_LOAD_ABORT_TASK_NUM);
    }

    private static void addRoutineLoadJobStateGaugeMetric(RoutineLoadManager routineLoadManager,
                                            RoutineLoadJob.JobState jobState,
                                            String stateLabel, Predicate<RoutineLoadJob> filter) {
        GaugeMetric<Long> gauge = new GaugeMetric<Long>("job", MetricUnit.NOUNIT, "routine load job statistics") {
            @Override
            public Long getValue() {
                if (!Env.getCurrentEnv().isMaster()) {
                    return 0L;
                }
                return routineLoadManager
                        .getRoutineLoadJobByState(Collections.singleton(jobState))
                        .stream()
                        .filter(filter)
                        .count();
            }
        };
        gauge.addLabel(new MetricLabel("job", "load"))
                .addLabel(new MetricLabel("type", "ROUTINE_LOAD"))
                .addLabel(new MetricLabel("state", stateLabel));
        DORIS_METRIC_REGISTER.addMetrics(gauge);
    }

    private static void initSystemMetrics() {
        // TCP retransSegs
        GaugeMetric<Long> tcpRetransSegs = (GaugeMetric<Long>) new GaugeMetric<Long>(
                "snmp", MetricUnit.NOUNIT, "All TCP packets retransmitted") {
            @Override
            public Long getValue() {
                return SYSTEM_METRICS.tcpRetransSegs;
            }
        };
        tcpRetransSegs.addLabel(new MetricLabel("name", "tcp_retrans_segs"));
        DORIS_METRIC_REGISTER.addSystemMetrics(tcpRetransSegs);

        // TCP inErrs
        GaugeMetric<Long> tpcInErrs = (GaugeMetric<Long>) new GaugeMetric<Long>(
                "snmp", MetricUnit.NOUNIT, "The number of all problematic TCP packets received") {
            @Override
            public Long getValue() {
                return SYSTEM_METRICS.tcpInErrs;
            }
        };
        tpcInErrs.addLabel(new MetricLabel("name", "tcp_in_errs"));
        DORIS_METRIC_REGISTER.addSystemMetrics(tpcInErrs);

        // TCP inSegs
        GaugeMetric<Long> tpcInSegs = (GaugeMetric<Long>) new GaugeMetric<Long>(
                "snmp", MetricUnit.NOUNIT, "The number of all TCP packets received") {
            @Override
            public Long getValue() {
                return SYSTEM_METRICS.tcpInSegs;
            }
        };
        tpcInSegs.addLabel(new MetricLabel("name", "tcp_in_segs"));
        DORIS_METRIC_REGISTER.addSystemMetrics(tpcInSegs);

        // TCP outSegs
        GaugeMetric<Long> tpcOutSegs = (GaugeMetric<Long>) new GaugeMetric<Long>(
                "snmp", MetricUnit.NOUNIT, "The number of all TCP packets send with RST") {
            @Override
            public Long getValue() {
                return SYSTEM_METRICS.tcpOutSegs;
            }
        };
        tpcOutSegs.addLabel(new MetricLabel("name", "tcp_out_segs"));
        DORIS_METRIC_REGISTER.addSystemMetrics(tpcOutSegs);

        // Memory Total
        GaugeMetric<Long> memTotal = (GaugeMetric<Long>) new GaugeMetric<Long>(
                "meminfo", MetricUnit.BYTES, "Total usable memory") {
            @Override
            public Long getValue() {
                return SYSTEM_METRICS.memTotal;
            }
        };
        memTotal.addLabel(new MetricLabel("name", "memory_total"));
        DORIS_METRIC_REGISTER.addSystemMetrics(memTotal);

        // Memory Free
        GaugeMetric<Long> memFree = (GaugeMetric<Long>) new GaugeMetric<Long>(
                "meminfo", MetricUnit.BYTES, "The amount of physical memory not used by the system") {
            @Override
            public Long getValue() {
                return SYSTEM_METRICS.memFree;
            }
        };
        memFree.addLabel(new MetricLabel("name", "memory_free"));
        DORIS_METRIC_REGISTER.addSystemMetrics(memFree);

        // Memory Total
        GaugeMetric<Long> memAvailable = (GaugeMetric<Long>) new GaugeMetric<Long>("meminfo", MetricUnit.BYTES,
                "An estimate of how much memory is available for starting new applications, without swapping") {
            @Override
            public Long getValue() {
                return SYSTEM_METRICS.memAvailable;
            }
        };
        memAvailable.addLabel(new MetricLabel("name", "memory_available"));
        DORIS_METRIC_REGISTER.addSystemMetrics(memAvailable);

        // Buffers
        GaugeMetric<Long> buffers = (GaugeMetric<Long>) new GaugeMetric<Long>("meminfo", MetricUnit.BYTES,
                "Memory in buffer cache, so relatively temporary storage for raw disk blocks") {
            @Override
            public Long getValue() {
                return SYSTEM_METRICS.buffers;
            }
        };
        buffers.addLabel(new MetricLabel("name", "buffers"));
        DORIS_METRIC_REGISTER.addSystemMetrics(buffers);

        // Cached
        GaugeMetric<Long> cached = (GaugeMetric<Long>) new GaugeMetric<Long>(
                "meminfo", MetricUnit.BYTES, "Memory in the pagecache (Diskcache and Shared Memory)") {
            @Override
            public Long getValue() {
                return SYSTEM_METRICS.cached;
            }
        };
        cached.addLabel(new MetricLabel("name", "cached"));
        DORIS_METRIC_REGISTER.addSystemMetrics(cached);
    }

    // to generate the metrics related to tablets of each backends
    // this metric is reentrant, so that we can add or remove metric along with the backend add or remove
    // at runtime.
    public static void generateBackendsTabletMetrics() {
        // remove all previous 'tablet' metric
        DORIS_METRIC_REGISTER.removeMetrics(TABLET_NUM);
        DORIS_METRIC_REGISTER.removeMetrics(TABLET_MAX_COMPACTION_SCORE);

        SystemInfoService infoService = Env.getCurrentSystemInfo();

        for (Long beId : infoService.getAllBackendIds(false)) {
            Backend be = infoService.getBackend(beId);
            if (be == null) {
                continue;
            }

            // tablet number of each backends
            GaugeMetric<Long> tabletNum = new GaugeMetric<Long>(TABLET_NUM, MetricUnit.NOUNIT, "tablet number") {
                @Override
                public Long getValue() {
                    if (!Env.getCurrentEnv().isMaster()) {
                        return 0L;
                    }
                    return (long) infoService.getTabletNumByBackendId(beId);
                }
            };
            tabletNum.addLabel(new MetricLabel("backend",
                    NetUtils.getHostPortInAccessibleFormat(be.getHost(), be.getHeartbeatPort())));
            DORIS_METRIC_REGISTER.addMetrics(tabletNum);

            // max compaction score of tablets on each backends
            GaugeMetric<Long> tabletMaxCompactionScore = new GaugeMetric<Long>(TABLET_MAX_COMPACTION_SCORE,
                    MetricUnit.NOUNIT, "tablet max compaction score") {
                @Override
                public Long getValue() {
                    if (!Env.getCurrentEnv().isMaster()) {
                        return 0L;
                    }
                    return be.getTabletMaxCompactionScore();
                }
            };
            tabletMaxCompactionScore.addLabel(new MetricLabel("backend",
                    NetUtils.getHostPortInAccessibleFormat(be.getHost(), be.getHeartbeatPort())));
            DORIS_METRIC_REGISTER.addMetrics(tabletMaxCompactionScore);

        } // end for backends
    }

    public static synchronized String getMetric(MetricVisitor visitor) {
        if (!isInit) {
            return "";
        }

        // update the metrics first
        updateMetrics();

        // update load job metrics
        updateLoadJobMetrics();

        // jvm
        JvmService jvmService = new JvmService();
        JvmStats jvmStats = jvmService.stats();
        visitor.visitJvm(jvmStats);

        // doris metrics and system metrics.
        DORIS_METRIC_REGISTER.accept(visitor);

        // histogram
        SortedMap<String, Histogram> histograms = METRIC_REGISTER.getHistograms();
        for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
            visitor.visitHistogram(MetricVisitor.FE_PREFIX, entry.getKey(), entry.getValue());
        }

        visitor.visitNodeInfo();

        visitor.visitCloudTableStats();

        visitor.visitWorkloadGroup();

        return visitor.finish();
    }

    public static <M extends Metric<?>> AutoMappedMetric<M> addLabeledMetrics(String label, Supplier<M> metric) {
        return new AutoMappedMetric<>(value -> {
            M m = metric.get();
            m.addLabel(new MetricLabel(label, value));
            MetricRepo.DORIS_METRIC_REGISTER.addMetrics(m);
            return m;
        });
    }

    // update some metrics to make a ready to be visited
    private static void updateMetrics() {
        SYSTEM_METRICS.update();
    }

    public static synchronized List<Metric> getMetricsByName(String name) {
        return DORIS_METRIC_REGISTER.getMetricsByName(name);
    }

    private static void updateLoadJobMetrics() {
        LoadManager loadManager = Env.getCurrentEnv().getLoadManager();
        MetricRepo.loadJobNum = loadManager.getLoadJobNum();
    }

    private static long getLoadJobNum(EtlJobType jobType, JobState jobState) {
        return MetricRepo.loadJobNum.getOrDefault(Pair.of(jobType, jobState), 0L);
    }

    public static void registerCloudMetrics(String clusterId, String clusterName) {
        if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterName)
                || Strings.isNullOrEmpty(clusterId)) {
            return;
        }
        List<MetricLabel> labels = new ArrayList<>();
        labels.add(new MetricLabel("cluster_id", clusterId));
        labels.add(new MetricLabel("cluster_name", clusterName));

        LongCounterMetric requestAllCounter = CloudMetrics.CLUSTER_REQUEST_ALL_COUNTER.getOrAdd(clusterId);
        requestAllCounter.setLabels(labels);
        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(requestAllCounter);

        LongCounterMetric queryAllCounter = CloudMetrics.CLUSTER_QUERY_ALL_COUNTER.getOrAdd(clusterId);
        queryAllCounter.setLabels(labels);
        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(queryAllCounter);

        LongCounterMetric queryErrCounter = CloudMetrics.CLUSTER_QUERY_ERR_COUNTER.getOrAdd(clusterId);
        queryErrCounter.setLabels(labels);
        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(queryErrCounter);

        GaugeMetricImpl<Double> requestPerSecondGauge = CloudMetrics.CLUSTER_REQUEST_PER_SECOND_GAUGE
                .getOrAdd(clusterId);
        requestPerSecondGauge.setLabels(labels);
        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(requestPerSecondGauge);

        GaugeMetricImpl<Double> queryPerSecondGauge = CloudMetrics.CLUSTER_QUERY_PER_SECOND_GAUGE.getOrAdd(clusterId);
        queryPerSecondGauge.setLabels(labels);
        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(queryPerSecondGauge);

        GaugeMetricImpl<Double> queryErrRateGauge = CloudMetrics.CLUSTER_QUERY_ERR_RATE_GAUGE.getOrAdd(clusterId);
        queryErrRateGauge.setLabels(labels);
        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(queryErrRateGauge);

        String key = clusterId + CloudMetrics.CLOUD_CLUSTER_DELIMITER + clusterName;
        CloudMetrics.CLUSTER_QUERY_LATENCY_HISTO.getOrAdd(key);
    }

    public static void increaseClusterRequestAll(String clusterName) {
        if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterName)) {
            return;
        }
        String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
                .getCloudClusterNameToId().get(clusterName);
        if (Strings.isNullOrEmpty(clusterId)) {
            return;
        }
        LongCounterMetric counter = CloudMetrics.CLUSTER_REQUEST_ALL_COUNTER.getOrAdd(clusterId);
        List<MetricLabel> labels = new ArrayList<>();
        counter.increase(1L);
        labels.add(new MetricLabel("cluster_id", clusterId));
        labels.add(new MetricLabel("cluster_name", clusterName));
        counter.setLabels(labels);
        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(counter);
    }

    public static void increaseClusterQueryAll(String clusterName) {
        if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterName)) {
            return;
        }
        String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
                .getCloudClusterNameToId().get(clusterName);
        if (Strings.isNullOrEmpty(clusterId)) {
            return;
        }
        LongCounterMetric counter = CloudMetrics.CLUSTER_QUERY_ALL_COUNTER.getOrAdd(clusterId);
        List<MetricLabel> labels = new ArrayList<>();
        counter.increase(1L);
        labels.add(new MetricLabel("cluster_id", clusterId));
        labels.add(new MetricLabel("cluster_name", clusterName));
        counter.setLabels(labels);
        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(counter);
    }

    public static void increaseClusterQueryErr(String clusterName) {
        if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterName)) {
            return;
        }
        String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
                .getCloudClusterNameToId().get(clusterName);
        if (Strings.isNullOrEmpty(clusterId)) {
            return;
        }
        LongCounterMetric counter = CloudMetrics.CLUSTER_QUERY_ERR_COUNTER.getOrAdd(clusterId);
        List<MetricLabel> labels = new ArrayList<>();
        counter.increase(1L);
        labels.add(new MetricLabel("cluster_id", clusterId));
        labels.add(new MetricLabel("cluster_name", clusterName));
        counter.setLabels(labels);
        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(counter);
    }

    public static void updateClusterRequestPerSecond(String clusterId, double value, List<MetricLabel> labels) {
        if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterId)) {
            return;
        }
        GaugeMetricImpl<Double> gauge = CloudMetrics.CLUSTER_REQUEST_PER_SECOND_GAUGE.getOrAdd(clusterId);
        gauge.setValue(value);
        gauge.setLabels(labels);
        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge);
    }

    public static void updateClusterQueryPerSecond(String clusterId, double value, List<MetricLabel> labels) {
        if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterId)) {
            return;
        }
        GaugeMetricImpl<Double> gauge = CloudMetrics.CLUSTER_QUERY_PER_SECOND_GAUGE.getOrAdd(clusterId);
        gauge.setValue(value);
        gauge.setLabels(labels);
        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge);
    }

    public static void updateClusterQueryErrRate(String clusterId, double value, List<MetricLabel> labels) {
        if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterId)) {
            return;
        }
        GaugeMetricImpl<Double> gauge = CloudMetrics.CLUSTER_QUERY_ERR_RATE_GAUGE.getOrAdd(clusterId);
        gauge.setValue(value);
        gauge.setLabels(labels);
        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge);
    }

    public static void updateClusterBackendAlive(String clusterName, String clusterId, String ipAddress,
            boolean alive) {
        if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterName)
                || Strings.isNullOrEmpty(clusterId) || Strings.isNullOrEmpty(ipAddress)) {
            return;
        }
        String key = clusterId + "_" + ipAddress;
        GaugeMetricImpl<Integer> metric = CloudMetrics.CLUSTER_BACKEND_ALIVE.getOrAdd(key);
        metric.setValue(alive ? 1 : 0);
        List<MetricLabel> labels = new ArrayList<>();
        labels.add(new MetricLabel("cluster_id", clusterId));
        labels.add(new MetricLabel("cluster_name", clusterName));
        labels.add(new MetricLabel("address", ipAddress));
        metric.setLabels(labels);
        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(metric);
    }

    public static void updateClusterBackendAliveTotal(String clusterName, String clusterId, int aliveNum) {
        if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterName)
                || Strings.isNullOrEmpty(clusterId)) {
            return;
        }
        GaugeMetricImpl<Integer> gauge = CloudMetrics.CLUSTER_BACKEND_ALIVE_TOTAL.getOrAdd(clusterId);
        gauge.setValue(aliveNum);
        List<MetricLabel> labels = new ArrayList<>();
        labels.add(new MetricLabel("cluster_id", clusterId));
        labels.add(new MetricLabel("cluster_name", clusterName));
        gauge.setLabels(labels);
        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge);
    }

    public static void updateClusterQueryLatency(String clusterName, long elapseMs) {
        if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterName)) {
            return;
        }
        String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
                .getCloudClusterNameToId().get(clusterName);
        if (Strings.isNullOrEmpty(clusterId)) {
            return;
        }
        String key = clusterId + CloudMetrics.CLOUD_CLUSTER_DELIMITER + clusterName;
        CloudMetrics.CLUSTER_QUERY_LATENCY_HISTO.getOrAdd(key).update(elapseMs);
    }
}