// FrontendsProcNode.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.proc;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.DiskUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.FeDiskInfo;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService.HostInfo;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/**
 * Proc node that lists the frontends (FEs) currently added to the cluster.
 * Backs {@code SHOW PROC /frontends/}.
 */
public class FrontendsProcNode implements ProcNodeInterface {
    private static final Logger LOG = LogManager.getLogger(FrontendsProcNode.class);

    /** Column titles, in row order, of the rows produced by {@link #getFrontendsInfo(Env, List)}. */
    public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
            .add("Name").add("Host").add("EditLogPort").add("HttpPort").add("QueryPort").add("RpcPort")
            .add("ArrowFlightSqlPort").add("Role").add("IsMaster").add("ClusterId").add("Join").add("Alive")
            .add("ReplayedJournalId").add("LastStartTime").add("LastHeartbeat")
            .add("IsHelper").add("ErrMsg").add("Version")
            .add("CurrentConnected")
            .build();

    /** Column titles, in row order, of the rows produced by {@link #getFrontendsDiskInfo(Env, List)}. */
    public static final ImmutableList<String> DISK_TITLE_NAMES = new ImmutableList.Builder<String>()
            .add("Name").add("Host").add("DirType").add("Dir").add("Filesystem")
            .add("Capacity").add("Used").add("Available").add("UseRate").add("MountOn")
            .build();

    private final Env env;

    public FrontendsProcNode(Env env) {
        this.env = env;
    }

    /**
     * Builds the proc result: one row per frontend, with {@link #TITLE_NAMES} as the header.
     */
    @Override
    public ProcResult fetchResult() {
        BaseProcResult result = new BaseProcResult();
        result.setNames(TITLE_NAMES);

        List<List<String>> infos = Lists.newArrayList();
        getFrontendsInfo(env, infos);
        for (List<String> info : infos) {
            result.addRow(info);
        }
        return result;
    }

    /**
     * Appends frontend rows of the requested kind to {@code infos}.
     *
     * @param env        environment to read the frontends from
     * @param detailType {@code null} for the general rows ({@link #TITLE_NAMES}), or {@code "disks"}
     *                   (case-insensitive) for the disk rows ({@link #DISK_TITLE_NAMES});
     *                   any other value leaves {@code infos} untouched
     * @param infos      output list the rows are appended to
     */
    public static void getFrontendsInfo(Env env, String detailType, List<List<String>> infos) {
        if (detailType == null) {
            getFrontendsInfo(env, infos);
        } else if (detailType.equalsIgnoreCase("disks")) {
            getFrontendsDiskInfo(env, infos);
        }
    }

    /**
     * Returns the (host, rpc port) pair of every frontend known to {@code env}.
     *
     * @param env         environment to read the frontends from
     * @param includeSelf when {@code false}, the frontend whose host equals the currently connected
     *                    FE host (see {@link #resolveConnectedFeHost()}) is left out
     * @return a new mutable list of (host, rpc port) pairs
     */
    public static List<Pair<String, Integer>> getFrontendWithRpcPort(Env env, boolean includeSelf) {
        List<Pair<String, Integer>> allFe = new ArrayList<>();
        String connectedHost = resolveConnectedFeHost();
        for (Frontend fe : env.getFrontends(null)) {
            if (includeSelf || !fe.getHost().equals(connectedHost)) {
                allFe.add(Pair.of(fe.getHost(), fe.getRpcPort()));
            }
        }
        return allFe;
    }

    /**
     * Appends one row per frontend to {@code infos}; columns follow {@link #TITLE_NAMES}.
     *
     * @param env   environment to read the frontends, HA state and edit log from
     * @param infos output list the rows are appended to
     */
    public static void getFrontendsInfo(Env env, List<List<String>> infos) {
        InetSocketAddress master = null;
        try {
            master = env.getHaProtocol().getLeader();
        } catch (Exception e) {
            // this may happen when majority of FOLLOWERS are down and no MASTER right now.
            LOG.warn("failed to get leader: {}", e.getMessage());
        }

        // Collect all nodes which have joined the bdb group into a list owned by this method,
        // so that the collections handed out by the HA protocol are never modified here.
        // NOTE(review): a null result is treated as "no nodes" - confirm whether these getters can return null.
        List<InetSocketAddress> joinedNodes = new ArrayList<>();
        appendNodes(joinedNodes, env.getHaProtocol().getElectableNodes(true /* include leader */));
        appendNodes(joinedNodes, env.getHaProtocol().getObserverNodes());
        List<HostInfo> helperNodes = env.getHelperNodes();

        // The `show frontends` stmt may have been forwarded from another FE. If we only looked at the
        // self node of the current catalog, the "CurrentConnected" field would always point to the Master FE.
        String connectedHost = resolveConnectedFeHost();
        // Host of the FE executing this code; loop-invariant, so resolved once.
        String localHost = env.getSelfNode().getHost();

        List<Frontend> envFes = env.getFrontends(null /* all */);
        LOG.info("bdbje fes {}, env fes {}", joinedNodes, envFes);
        for (Frontend fe : envFes) {
            boolean isLocal = fe.getHost().equals(localHost);

            List<String> info = new ArrayList<String>();
            info.add(fe.getNodeName());
            info.add(fe.getHost());
            info.add(Integer.toString(fe.getEditLogPort()));
            // NOTE(review): every row reports this FE's own configured http_port;
            // presumably all FEs are expected to share the same value - confirm.
            info.add(Integer.toString(Config.http_port));

            if (isLocal) {
                // For the local FE, the ports come from the local config.
                info.add(Integer.toString(Config.query_port));
                info.add(Integer.toString(Config.rpc_port));
                info.add(Integer.toString(Config.arrow_flight_sql_port));
            } else {
                info.add(Integer.toString(fe.getQueryPort()));
                info.add(Integer.toString(fe.getRpcPort()));
                info.add(Integer.toString(fe.getArrowFlightSqlPort()));
            }

            info.add(fe.getRole().name());
            // An IPv6 address may be written in different textual forms, so we compare
            // InetSocketAddress objects instead of IP strings.
            InetSocketAddress socketAddress = new InetSocketAddress(fe.getHost(), fe.getEditLogPort());
            info.add(String.valueOf(socketAddress.equals(master)));

            info.add(Integer.toString(env.getClusterId()));
            info.add(String.valueOf(isJoin(joinedNodes, fe)));

            if (isLocal) {
                // The local FE is reported as alive, with the max journal id of the local edit log.
                info.add("true");
                info.add(Long.toString(env.getEditLog().getMaxJournalId()));
            } else {
                info.add(String.valueOf(fe.isAlive()));
                info.add(Long.toString(fe.getReplayedJournalId()));
            }
            info.add(TimeUtils.longToTimeString(fe.getLastStartupTime()));
            info.add(TimeUtils.longToTimeString(fe.getLastUpdateTime()));
            info.add(String.valueOf(isHelperNode(helperNodes, fe)));
            info.add(fe.getHeartbeatErrMsg());
            info.add(fe.getVersion());
            // To indicate which FE we are currently connected to
            info.add(fe.getHost().equals(connectedHost) ? "Yes" : "No");

            infos.add(info);
        }
    }

    /**
     * Finds the frontend whose host equals the host of {@code env}'s self node.
     * Despite the method name, the whole {@link Frontend} is returned, not only its version.
     *
     * @return the matching frontend, or {@code null} if no frontend matches
     */
    public static Frontend getCurrentFrontendVersion(Env env) {
        String localHost = env.getSelfNode().getHost();
        for (Frontend fe : env.getFrontends(null /* all */)) {
            if (fe.getHost().equals(localHost)) {
                return fe;
            }
        }
        return null;
    }

    /**
     * Appends one row per (frontend, disk) to {@code infos}; columns follow {@link #DISK_TITLE_NAMES}.
     * Frontends whose disk info list is {@code null} contribute no rows.
     *
     * @param env   environment to read the frontends from
     * @param infos output list the rows are appended to
     */
    public static void getFrontendsDiskInfo(Env env, List<List<String>> infos) {
        for (Frontend fe : env.getFrontends(null /* all */)) {
            if (fe.getDiskInfos() == null) {
                continue;
            }
            for (FeDiskInfo disk : fe.getDiskInfos()) {
                List<String> info = new ArrayList<String>();
                info.add(fe.getNodeName());
                info.add(fe.getHost());
                info.add(disk.getDirType());
                info.add(disk.getDir());
                info.add(disk.getSpaceInfo().fileSystem);
                // Multiply with a long literal so the product is always computed in 64 bits.
                // NOTE(review): the counters look like they are in units of 1024 bytes - confirm.
                info.add(DiskUtils.sizeFormat(disk.getSpaceInfo().blocks * 1024L));
                info.add(DiskUtils.sizeFormat(disk.getSpaceInfo().used * 1024L));
                info.add(DiskUtils.sizeFormat(disk.getSpaceInfo().available * 1024L));
                info.add(Integer.toString(disk.getSpaceInfo().useRate) + "%");
                info.add(disk.getSpaceInfo().mountedOn);
                infos.add(info);
            }
        }
    }

    /**
     * Resolves the host used to decide which frontend the client is connected to: the
     * "current connected FE ip" of the thread's {@link ConnectContext} when there is a context
     * and that value is non-empty, otherwise the host of the current env's self node.
     */
    private static String resolveConnectedFeHost() {
        ConnectContext ctx = ConnectContext.get();
        if (ctx != null) {
            String connectedIp = ctx.getCurrentConnectedFEIp();
            if (!Strings.isNullOrEmpty(connectedIp)) {
                return connectedIp;
            }
        }
        return Env.getCurrentEnv().getSelfNode().getHost();
    }

    /** Appends every element of {@code nodes} to {@code target}; a {@code null} source adds nothing. */
    private static void appendNodes(List<InetSocketAddress> target, Iterable<? extends InetSocketAddress> nodes) {
        if (nodes == null) {
            return;
        }
        for (InetSocketAddress node : nodes) {
            target.add(node);
        }
    }

    /** Returns whether any of {@code helperNodes} is the same host info as {@code fe}. */
    private static boolean isHelperNode(List<HostInfo> helperNodes, Frontend fe) {
        return helperNodes.stream().anyMatch(p -> fe.toHostInfo().isSame(p));
    }

    /**
     * Returns whether {@code fe} matches one of {@code allFeHosts}: same edit log port, and the
     * frontend host equals either the address' IP string or its host name.
     */
    private static boolean isJoin(List<InetSocketAddress> allFeHosts, Frontend fe) {
        // First pass: compare against the IP string of each address.
        for (InetSocketAddress addr : allFeHosts) {
            if (fe.getEditLogPort() != addr.getPort()) {
                continue;
            }
            // If the hostname of the InetSocketAddress is an ip, addr.getHostName() may not be equal
            // to the frontend host, so compare the frontend host with address.getHostAddress().
            InetAddress address = addr.getAddress();
            if (null == address) {
                LOG.warn("Failed to get InetAddress {}", addr);
                continue;
            }
            if (fe.getHost().equals(address.getHostAddress())) {
                return true;
            }
        }

        // Second pass: compare against host names. This is kept as a separate loop, entered only
        // when no IP matched, to avoid calling getHostName multiple times - don't merge the loops.
        for (InetSocketAddress addr : allFeHosts) {
            // Check the port first so getHostName is only called for candidate addresses.
            if (fe.getEditLogPort() != addr.getPort()) {
                continue;
            }
            // https://bugs.openjdk.org/browse/JDK-8143378#:~:text=getHostName()%3B%20takes%20about%205,millisecond%20on%20JDK%20update%2051
            // getHostName sometimes hits this bug and takes about 5s
            String host = addr.getHostName();
            if (!Strings.isNullOrEmpty(host) && host.equals(fe.getHost())) {
                return true;
            }
        }
        return false;
    }
}