// SystemMetrics.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.metric;

import org.apache.doris.common.FeConstants;

import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;

/**
 * Holds system metrics read from the Linux {@code /proc} filesystem: a few TCP counters
 * from {@code /proc/net/snmp} and memory statistics from {@code /proc/meminfo}.
 *
 * <p>Call {@link #update()} to refresh the values. Reading is best-effort: when a proc file
 * is missing or malformed (for example on a non-Linux system) a warning is logged and the
 * corresponding group of fields keeps its previous values.
 *
 * <p>TODO: Add more metrics (CPU, IO, ...) gradually
 */
public class SystemMetrics {
    private static final Logger LOG = LogManager.getLogger(SystemMetrics.class);

    private static final String SNMP_PROC_FILE = "/proc/net/snmp";
    private static final String MEMINFO_PROC_FILE = "/proc/meminfo";
    // Resource used in place of /proc/net/snmp when FeConstants.runningUnitTest is set
    private static final String SNMP_TEST_RESOURCE = "data/net_snmp_normal";
    // Both the header line and the value line of the TCP section start with this prefix
    private static final String TCP_LINE_PREFIX = "Tcp: ";
    // Keys of /proc/meminfo that are collected
    private static final String[] MEMORY_METRICS = {"MemTotal", "MemFree", "MemAvailable", "Buffers", "Cached"};

    // NOTICE: The following 4 tcp metrics are read from /proc/net/snmp,
    // so they can only be obtained on a Linux system.
    // All TCP segments retransmitted (RetransSegs)
    protected long tcpRetransSegs = 0;
    // All TCP segments received in error (InErrs)
    protected long tcpInErrs = 0;
    // All received TCP segments (InSegs)
    protected long tcpInSegs = 0;
    // All sent TCP segments (OutSegs)
    protected long tcpOutSegs = 0;

    // The following memory metrics are read from /proc/meminfo. The value found in the file is
    // multiplied by 1024 (the file reports kB), and a metric missing from the file is set to -1.
    // Total usable memory
    protected long memTotal = 0;
    // The amount of physical memory not used by the system
    protected long memFree = 0;
    // An estimate of how much memory is available for starting new applications, without swapping
    protected long memAvailable = 0;
    // Memory in buffer cache, so relatively temporary storage for raw disk blocks
    protected long buffers = 0;
    // Memory in the pagecache (Diskcache and Shared Memory)
    protected long cached = 0;

    /**
     * Refreshes the TCP metrics and then the memory metrics. Failures are logged, not thrown.
     */
    public synchronized void update() {
        updateSnmpMetrics();
        updateMemoryMetrics();
    }

    /**
     * Reads the "Tcp:" header/value line pair of /proc/net/snmp and updates the TCP fields.
     * The fields are only written once all four counters have been parsed successfully, so a
     * malformed file never leaves a partially updated set of values.
     */
    private void updateSnmpMetrics() {
        String procFile = SNMP_PROC_FILE;
        if (FeConstants.runningUnitTest) {
            java.net.URL testResource = getClass().getClassLoader().getResource(SNMP_TEST_RESOURCE);
            if (testResource == null) {
                // Previously this surfaced as a NullPointerException escaping update()
                LOG.warn("failed to get /proc/net/snmp: test resource {} not found", SNMP_TEST_RESOURCE);
                return;
            }
            procFile = testResource.getFile();
        }
        try (FileReader fileReader = new FileReader(procFile);
                BufferedReader br = new BufferedReader(fileReader)) {
            // The first line starting with "Tcp: " is the header naming each column
            String headerLine = null;
            String line;
            while ((line = br.readLine()) != null) {
                if (line.startsWith(TCP_LINE_PREFIX)) {
                    headerLine = line;
                    break;
                }
            }
            if (headerLine == null) {
                throw new IllegalStateException("can not find tcp metrics");
            }

            // Map every header name to its column index
            String[] headers = headerLine.split("\\s+");
            Map<String, Integer> headerMap = Maps.newHashMap();
            for (int i = 0; i < headers.length; i++) {
                headerMap.put(headers[i], i);
            }

            // The line right after the header carries the values
            String valueLine = br.readLine();
            if (valueLine == null) {
                throw new IllegalStateException("failed to read metrics of TCP");
            }

            // eg: Tcp: 1 200 120000 -1 38920626 10487279 105581903 300009 305
            // 18079291213 15411998945 11808180 22905 4174570 0
            String[] parts = valueLine.split("\\s+");
            if (parts.length != headers.length) {
                throw new IllegalStateException(
                        "invalid tcp metrics: " + valueLine + ". header size: " + headers.length);
            }

            long retransSegs = parseTcpCounter(headerMap, parts, "RetransSegs");
            long inErrs = parseTcpCounter(headerMap, parts, "InErrs");
            long inSegs = parseTcpCounter(headerMap, parts, "InSegs");
            long outSegs = parseTcpCounter(headerMap, parts, "OutSegs");

            tcpRetransSegs = retransSegs;
            tcpInErrs = inErrs;
            tcpInSegs = inSegs;
            tcpOutSegs = outSegs;
        } catch (Exception e) {
            // Best effort: the file does not exist on non-Linux systems
            LOG.warn("failed to get /proc/net/snmp: {}", e.getMessage());
        }
    }

    /**
     * Returns the value of the TCP column called {@code name}.
     *
     * @param headerMap column name to column index
     * @param values tokens of the value line, aligned with the header line
     * @param name column to look up
     * @throws IllegalStateException if the header has no such column
     * @throws NumberFormatException if the value is not a valid long
     */
    private static long parseTcpCounter(Map<String, Integer> headerMap, String[] values, String name) {
        Integer index = headerMap.get(name);
        if (index == null) {
            throw new IllegalStateException("can not find tcp metric in header: " + name);
        }
        return Long.parseLong(values[index]);
    }

    /**
     * Reads the entries listed in {@link #MEMORY_METRICS} from /proc/meminfo and updates the
     * memory fields. If the file cannot be read or a wanted line is malformed, a warning is
     * logged and the fields are left untouched.
     */
    private void updateMemoryMetrics() {
        Map<String, Long> memInfoMap = Maps.newHashMap();

        try (FileReader fileReader = new FileReader(MEMINFO_PROC_FILE);
                BufferedReader br = new BufferedReader(fileReader)) {
            String line;
            while ((line = br.readLine()) != null) {
                for (String memoryMetric : MEMORY_METRICS) {
                    // Match the whole key ("Cached:"), not just a prefix of a longer key
                    if (memInfoMap.containsKey(memoryMetric) || !line.startsWith(memoryMetric + ":")) {
                        continue;
                    }
                    // eg: MemTotal:       16384256 kB
                    String[] parts = line.split("\\s+");
                    if (parts.length != 3) {
                        throw new IllegalStateException("invalid memory metrics: " + line);
                    }
                    memInfoMap.put(memoryMetric, Long.parseLong(parts[1]) * 1024);
                    break;
                }
                // Stop reading as soon as every wanted metric has been found
                if (memInfoMap.size() == MEMORY_METRICS.length) {
                    break;
                }
            }
            // if can not get metrics from /proc/meminfo, we will set -1 as default value
            memTotal = memInfoMap.getOrDefault("MemTotal", -1L);
            memFree = memInfoMap.getOrDefault("MemFree", -1L);
            memAvailable = memInfoMap.getOrDefault("MemAvailable", -1L);
            buffers = memInfoMap.getOrDefault("Buffers", -1L);
            cached = memInfoMap.getOrDefault("Cached", -1L);
        } catch (Exception e) {
            // Best effort: the file does not exist on non-Linux systems
            LOG.warn("failed to get /proc/meminfo: {}", e.getMessage());
        }
    }

}