DeployManager.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.deploy;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.deploy.impl.LocalFileDeployManager;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

/*
 * This deploy manager is to support Kubernetes, Ambari or other system for automating deployment.
 * The deploy manager will try to get the helper node when initialize catalog.
 * When this FE becomes the Master, it starts a polling thread to detect node changes in up to 5
 * service groups of the remote deployment system:
 *
 *      electableFeServiceGroup: contains Master and Follower FE
 *      backendServiceGroup: contains Backends
 *      observerFeServiceGroup:  contains Observer FE (optional, k8s only)
 *      brokerServiceGroup: contains Broker (optional, Ambari only)
 *      cnServiceGroup: contains CN (compute node) Backends (optional)
 *
 * When node changing is detected, the deploy manager will try to ADD or DROP the new or missing node.
 *
 * Current support operations:
 *
 * A. Startup
 * 1. Start 1 Frontend(FE), and automatically transfer to the single startup Master.
 * 2. Start 3 FEs, they will reach a consensus on choosing first FE in node list as startup Master.
 *
 * B. Expansion
 * 1. With 1 existing FE(Master), add 2 FEs to reach HA.
 * 2. With 1 or 3 existing FE(Master + Follower), add more FE(observer).
 * 3. With 1 or 3 existing FE(Master + Follower), add more Backends(BE).
 * 4. With 1 or 3 existing FE(Master + Follower), add more Brokers.
 *
 * C. Shrink
 * 1. With 3 existing FEs, drop 2 FEs.
 * 2. With 1 or 3 existing FE(Master + Follower), drop existing FE(observer).
 * 3. With 1 or 3 existing FE(Master + Follower), drop existing BE.
 * 4. With 1 or 3 existing FE(Master + Follower), drop existing Brokers.
 *
 * Environment variables:
 *
 * FE_EXIST_ENDPOINT:
 *      The existing FEs (Master + Follower) before the new FE starts up, given as a
 *      comma-separated list of host:edit_log_port.
 *      The main reason of this var is to indicate whether there is already an alive Master
 *      or the consensus of who is master is needed.
 *
 * FE_INIT_NUMBER:
 *      Number of newly started FEs (Master + Follower); can only be 1 or 3.
 *
 * Only one of FE_EXIST_ENDPOINT and FE_INIT_NUMBER needs to be set. If FE_EXIST_ENDPOINT is set,
 * FE_INIT_NUMBER is ignored.
 *
 * eg:
 *
 *  1. Start 1 FE as a single Master
 *      set FE_EXIST_ENDPOINT as empty
 *      set FE_INIT_NUMBER = 1
 *
 *  2. Start 3 FE(Master + Follower)
 *      set FE_EXIST_ENDPOINT as empty
 *      set FE_INIT_NUMBER = 3
 *
 *  3. With 1 existing FE(Master), add 2 FEs to reach HA.
 *      set FE_EXIST_ENDPOINT=existing_fe_host:edit_log_port
 *      set FE_INIT_NUMBER as empty
 *
 */
public class DeployManager extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(DeployManager.class);

    public static final String ENV_FE_EXIST_ENDPOINT = "FE_EXIST_ENDPOINT";
    public static final String ENV_FE_INIT_NUMBER = "FE_INIT_NUMBER";
    // we arbitrarily set all broker name as what ENV_BROKER_NAME specified.
    public static final String ENV_BROKER_NAME = "BROKER_NAME";

    // Delay between attempts while waiting for the electable FE group to report FE_INIT_NUMBER nodes.
    private static final long HELPER_NODE_RETRY_INTERVAL_MS = 5000L;

    /** Kinds of nodes this deploy manager can detect, add and drop. */
    public enum NodeType {
        ELECTABLE, OBSERVER, BACKEND, BROKER, BACKEND_CN
    }

    protected Env env;

    // Host identifier (HostInfo#getIdent) -> number of consecutive checks in which the host was missing
    // from its remote service group.
    // eg:
    // In k8s, when a node is down, the endpoint may disappear immediately from the service group,
    // but k8s will start it up again soon.
    // In the gap between the node going down and starting up again, the deploy manager may detect the
    // missing node and do an unnecessary drop.
    // So this map counts the consecutive missing detections; a node is only dropped once it has been
    // missing for more than maxMissingTime consecutive checks (maxMissingTime <= 0 drops immediately).
    protected Map<String, Integer> counterMap = Maps.newHashMap();
    protected Integer maxMissingTime = 5;
    // If true, startListener() starts a listener (startListenerInternal) that is expected to push
    // node-change events into 'nodeChangeQueue'. Each cycle consumes at most one queued event and falls
    // back to polling the remote service groups when the queue is empty.
    // If false, node information is only pulled by polling getGroupHostInfos().
    protected boolean listenRequired;
    protected BlockingQueue<Event> nodeChangeQueue;
    protected Map<NodeType, NodeTypeAttr> nodeTypeAttrMap = Maps.newHashMap();
    // Guards against overlapping cycles; reset in a finally block so a failed cycle never blocks later ones.
    private boolean isRunning;

    public DeployManager(Env env, long intervalMs) {
        this(env, intervalMs, false);
    }

    public DeployManager(Env env, long intervalMs, boolean listenRequired) {
        super("deployManager", intervalMs);
        this.env = env;
        this.listenRequired = listenRequired;
        this.isRunning = false;
        if (listenRequired) {
            // Pushed events are authoritative, so missing nodes are dropped without a grace period.
            this.maxMissingTime = 0;
            this.nodeChangeQueue = Queues.newLinkedBlockingDeque();
        }
        // Init a NodeTypeAttr for every NodeType, so lookups by NodeType never return null.
        for (NodeType nodeType : NodeType.values()) {
            nodeTypeAttrMap.put(nodeType, new NodeTypeAttr(false));
        }
    }

    // Init all environment variables.
    // Each argument is the NAME of an environment variable whose value is the service group of that node type;
    // a node type whose variable is unset or empty is not managed.
    // Derived classes can override this method to init more private env variables,
    // but must call the parent's init first.
    protected void initEnvVariables(String envElectableFeServiceGroup, String envObserverFeServiceGroup,
            String envBackendServiceGroup, String envBrokerServiceGroup, String envCnServiceGroup) {

        String electableFeServiceGroup = Strings.nullToEmpty(System.getenv(envElectableFeServiceGroup));
        String observerFeServiceGroup = Strings.nullToEmpty(System.getenv(envObserverFeServiceGroup));
        String backendServiceGroup = Strings.nullToEmpty(System.getenv(envBackendServiceGroup));
        String brokerServiceGroup = Strings.nullToEmpty(System.getenv(envBrokerServiceGroup));
        String cnServiceGroup = Strings.nullToEmpty(System.getenv(envCnServiceGroup));

        LOG.info("get deploy env: {}, {}, {}, {}, {}", envElectableFeServiceGroup, envObserverFeServiceGroup,
                envBackendServiceGroup, envBrokerServiceGroup, envCnServiceGroup);

        registerServiceGroup(NodeType.ELECTABLE, electableFeServiceGroup, "Electable");
        registerServiceGroup(NodeType.OBSERVER, observerFeServiceGroup, "Observer");
        registerServiceGroup(NodeType.BACKEND, backendServiceGroup, "Backend");
        registerServiceGroup(NodeType.BROKER, brokerServiceGroup, "Broker");
        registerServiceGroup(NodeType.BACKEND_CN, cnServiceGroup, "Cn");

        LOG.info("get electableFeServiceGroup: {}, observerFeServiceGroup: {}, backendServiceGroup: {}"
                        + " brokerServiceGroup: {}, cnServiceGroup: {}",
                electableFeServiceGroup, observerFeServiceGroup, backendServiceGroup, brokerServiceGroup,
                cnServiceGroup);
    }

    // Marks nodeType as managed by serviceGroup; does nothing when serviceGroup is empty.
    private void registerServiceGroup(NodeType nodeType, String serviceGroup, String label) {
        if (Strings.isNullOrEmpty(serviceGroup)) {
            return;
        }
        LOG.info("{} service group is found", label);
        NodeTypeAttr attr = nodeTypeAttrMap.get(nodeType);
        attr.setHasService(true);
        attr.setServiceName(serviceGroup);
    }

    // Starts the push-based listener, but only when this manager was created with listenRequired = true.
    public void startListener() {
        if (listenRequired) {
            startListenerInternal();
        }
    }

    // Must be implemented by derived classes that set listenRequired = true.
    public void startListenerInternal() {
        throw new NotImplementedException("startListenerInternal not implemented");
    }

    // Called before each cycle of runAfterCatalogReady.
    // Default does nothing. Can be overridden in derived classes.
    // Return false if init failed; the current cycle is then skipped.
    protected boolean init() {
        return true;
    }

    // Get all host port pairs from the service group of the given node type.
    // Must be implemented in derived classes.
    // If errors are encountered, return null.
    protected List<HostInfo> getGroupHostInfos(NodeType nodeType) {
        throw new NotImplementedException("getGroupHostInfos not implemented");
    }

    // Reads the broker name from ENV_BROKER_NAME; exits the process if it is unset or empty.
    protected String getBrokerName() {
        String brokerName = System.getenv(ENV_BROKER_NAME);
        if (Strings.isNullOrEmpty(brokerName)) {
            LOG.error("failed to get broker name from env: {}", ENV_BROKER_NAME);
            System.exit(-1);
        }
        return brokerName;
    }

    /**
     * Returns the helper nodes this FE should use when it starts.
     *
     * <p>If FE_EXIST_ENDPOINT is set, every host:port it lists is a helper. Otherwise this is the very first
     * start of the FE cluster, and the master candidate is chosen as follows:
     * 1. read the expected number of FEs from FE_INIT_NUMBER;
     * 2. wait until the electable service group reports exactly that many hosts;
     * 3. sort those hosts;
     * 4. choose the first one as the single helper (master candidate).
     * Any unrecoverable configuration error terminates the process.
     */
    public List<HostInfo> getHelperNodes() {
        String existFeHosts = System.getenv(ENV_FE_EXIST_ENDPOINT);
        if (!Strings.isNullOrEmpty(existFeHosts)) {
            // Some Frontends already exist in the service group. We consider them as helper nodes.
            return parseExistingFeHosts(existFeHosts);
        }

        // 1. get num of fe
        int numOfFe = getInitFeNumber();

        // 2. get electable fe hosts from remote
        List<HostInfo> feHostInfos = waitForElectableFeHosts(numOfFe);

        // 3. sort a copy, so the list returned by getGroupHostInfos is never modified
        List<HostInfo> sortedHostInfos = Lists.newArrayList(feHostInfos);
        Collections.sort(sortedHostInfos);
        LOG.info("sorted fe host list: {}", sortedHostInfos);

        // 4. return the first one as helper
        HostInfo first = sortedHostInfos.get(0);
        return Lists.newArrayList(new HostInfo(first.getHost(), first.getPort()));
    }

    // Parses the comma-separated host:port list of FE_EXIST_ENDPOINT; exits on any malformed entry.
    private List<HostInfo> parseExistingFeHosts(String existFeHosts) {
        List<HostInfo> helperNodes = Lists.newArrayList();
        for (String host : existFeHosts.split(",")) {
            try {
                helperNodes.add(SystemInfoService.getHostAndPort(host));
            } catch (AnalysisException e) {
                LOG.error("Invalid exist fe hosts: {}. will exit", existFeHosts, e);
                System.exit(-1);
            }
        }
        return helperNodes;
    }

    // Reads FE_INIT_NUMBER; exits if it is missing, not an integer, or not positive.
    private int getInitFeNumber() {
        final String numOfFeStr = System.getenv(ENV_FE_INIT_NUMBER);
        if (Strings.isNullOrEmpty(numOfFeStr)) {
            LOG.error("No init FE num is specified. will exit");
            System.exit(-1);
        }

        int numOfFe = -1;
        try {
            numOfFe = Integer.parseInt(numOfFeStr);
        } catch (NumberFormatException e) {
            LOG.error("Invalid format of num of fe: {}. will exit", numOfFeStr);
            System.exit(-1);
        }
        // A non-positive count could never be matched (negative) or leaves no host to pick (zero).
        if (numOfFe <= 0) {
            LOG.error("Invalid num of fe: {}, it must be positive. will exit", numOfFe);
            System.exit(-1);
        }
        LOG.info("get init num of fe from env: {}", numOfFe);
        return numOfFe;
    }

    // Blocks until the electable service group reports exactly numOfFe hosts, retrying periodically.
    private List<HostInfo> waitForElectableFeHosts(int numOfFe) {
        while (true) {
            List<HostInfo> feHostInfos = fetchElectableFeHosts(numOfFe);
            if (feHostInfos != null) {
                LOG.info("get electable fe host from remote: {}", feHostInfos);
                return feHostInfos;
            }
            try {
                Thread.sleep(HELPER_NODE_RETRY_INTERVAL_MS);
            } catch (InterruptedException e) {
                LOG.error("get InterruptedException when sleep", e);
                Thread.currentThread().interrupt();
                System.exit(-1);
            }
        }
    }

    // One attempt: returns the electable FE hosts if exactly numOfFe are reported, otherwise null.
    private List<HostInfo> fetchElectableFeHosts(int numOfFe) {
        try {
            List<HostInfo> feHostInfos = getGroupHostInfos(NodeType.ELECTABLE);
            if (feHostInfos == null) {
                return null;
            }
            if (feHostInfos.size() != numOfFe) {
                LOG.error("num of fe get from remote [{}] does not equal to the expected num: {}",
                        feHostInfos, numOfFe);
                return null;
            }
            return feHostInfos;
        } catch (Exception e) {
            LOG.error("failed to get electable fe hosts from remote.", e);
            return null;
        }
    }

    @Override
    protected void runAfterCatalogReady() {
        if ("disable".equals(Config.enable_deploy_manager)) {
            LOG.warn("Config enable_deploy_manager is disable. Exit deploy manager");
            exit();
            return;
        }
        // 0. init
        if (!init()) {
            return;
        }

        if (isRunning) {
            LOG.warn("Last task not finished, ignore current task.");
            return;
        }
        isRunning = true;

        // Queue handling is inside the try so that a failure there cannot leave isRunning stuck at true,
        // which would silently disable every later cycle.
        try {
            if (listenRequired && processQueue()) {
                return;
            }
            processPolling();
        } catch (Exception e) {
            LOG.warn("failed to process polling", e);
        } finally {
            isRunning = false;
        }
    }

    // Pulls the remote hosts of every node type that has a service group and reconciles them with local state.
    private void processPolling() {
        for (NodeType nodeType : NodeType.values()) {
            NodeTypeAttr nodeTypeAttr = nodeTypeAttrMap.get(nodeType);
            if (!nodeTypeAttr.hasService()) {
                continue;
            }
            List<HostInfo> remoteHosts = getGroupHostInfos(nodeType);
            if (LOG.isDebugEnabled()) {
                LOG.debug("get serviceName: {},remoteHosts: {}", nodeTypeAttr.getServiceName(), remoteHosts);
            }
            process(nodeType, remoteHosts);
        }
    }

    // Handles at most one pushed event. Returns true if an event was consumed, false if the queue was empty.
    private boolean processQueue() {
        Event event = nodeChangeQueue.poll();
        if (event == null) {
            return false;
        }
        process(event.getNodeType(), event.getHostInfos());
        return true;
    }

    // Reconciles local state for nodeType against remoteHosts. A null list (remote error) is ignored, as is an
    // empty electable group, which would otherwise drop every follower.
    private void process(NodeType nodeType, List<HostInfo> remoteHosts) {
        if (remoteHosts == null) {
            return;
        }
        if (nodeType == NodeType.ELECTABLE && remoteHosts.isEmpty()) {
            LOG.warn("electable fe service is empty, which should not happen");
            return;
        }
        List<HostInfo> localHosts = getLocalHosts(nodeType);
        inspectNodeChange(remoteHosts, localHosts, nodeType);
    }

    // Returns the hosts of nodeType currently registered in this cluster.
    private List<HostInfo> getLocalHosts(NodeType nodeType) {
        switch (nodeType) {
            case ELECTABLE:
                return convertFesToHostInfos(env.getFrontends(FrontendNodeType.FOLLOWER));
            case OBSERVER:
                return convertFesToHostInfos(env.getFrontends(FrontendNodeType.OBSERVER));
            case BACKEND:
                return convertBesToHostInfos(Env.getCurrentSystemInfo().getMixBackends());
            case BACKEND_CN:
                return convertBesToHostInfos(Env.getCurrentSystemInfo().getCnBackends());
            case BROKER:
                List<FsBroker> localBrokers = env.getBrokerMgr().getBrokerListMap().get(getBrokerName());
                if (localBrokers == null) {
                    localBrokers = Lists.newArrayList();
                }
                return convertBrokersToHostInfos(localBrokers);
            default:
                // Unreachable while every NodeType is handled above.
                throw new IllegalStateException("unsupported node type: " + nodeType);
        }
    }

    // Decides whether a local node should be dropped, using counterMap to tolerate short absences.
    // A node that is found again has its counter reset. A node is dropped on its (maxMissingTime + 1)-th
    // consecutive missing detection, or immediately when maxMissingTime <= 0.
    private boolean needDrop(boolean found, HostInfo localHostInfo) {
        String ident = localHostInfo.getIdent();
        if (found) {
            counterMap.remove(ident);
            return false;
        }
        if (maxMissingTime <= 0) {
            return true;
        }
        Integer times = counterMap.get(ident);
        if (times == null) {
            // First detected downtime. Add to the map and ignore.
            LOG.warn("downtime node: {} detected times: 1",
                    localHostInfo);
            counterMap.put(ident, 1);
            return false;
        }
        if (times < maxMissingTime) {
            LOG.warn("downtime node: {} detected times: {}",
                    localHostInfo, times + 1);
            counterMap.put(ident, times + 1);
            return false;
        }
        // Reset the counter and do the dropping operation.
        LOG.warn("downtime node: {} detected times: {}. drop it",
                localHostInfo, times + 1);
        counterMap.remove(ident);
        return true;
    }

    /*
     * Inspect the node change of one node type.
     * 1. Drop local nodes that have been missing from the remote list long enough (see needDrop).
     * 2. Add remote nodes that are not registered locally.
     */
    private void inspectNodeChange(List<HostInfo> remoteHostInfos,
            List<HostInfo> localHostInfos,
            NodeType nodeType) {

        if (LOG.isDebugEnabled()) {
            for (HostInfo hostInfo : remoteHostInfos) {
                LOG.debug("inspectNodeChange: remote host info: {}", hostInfo);
            }
            for (HostInfo hostInfo : localHostInfos) {
                LOG.debug("inspectNodeChange: local host info: {}", hostInfo);
            }
        }

        // 1. Find local nodes which need to be dropped.
        for (HostInfo localHostInfo : localHostInfos) {
            boolean found = getFromHostInfos(remoteHostInfos, localHostInfo) != null;
            if (!needDrop(found, localHostInfo)) {
                continue;
            }
            if (this instanceof LocalFileDeployManager && Config.disable_local_deploy_manager_drop_node) {
                LOG.warn("For now, Local File Deploy Manager dose not handle shrinking operations");
                continue;
            }
            dealDropLocal(localHostInfo, nodeType);
        }

        // 2. Find remote nodes which need to be added.
        for (HostInfo remoteHostInfo : remoteHostInfos) {
            if (getFromHostInfos(localHostInfos, remoteHostInfo) == null) {
                dealAddRemote(remoteHostInfo, nodeType);
            }
        }
    }

    // Drops a local node that disappeared from the remote group. If the node is this FE itself, the process exits.
    private void dealDropLocal(HostInfo localHostInfo, NodeType nodeType) {
        Integer localPort = localHostInfo.getPort();
        String localHost = localHostInfo.getHost();
        // Double check if it is itself
        if (isSelf(localHostInfo)) {
            // This is itself. Shut down now.
            LOG.error("self host {} does not exist in remote hosts. master is: {}:{}. Showdown.",
                    localHostInfo, env.getMasterHost(), Config.edit_log_port);
            System.exit(-1);
        }

        // Can not find local host from remote host list,
        // which means this node should be dropped.
        try {
            switch (nodeType) {
                case ELECTABLE:
                    env.dropFrontend(FrontendNodeType.FOLLOWER, localHost, localPort);
                    break;
                case OBSERVER:
                    env.dropFrontend(FrontendNodeType.OBSERVER, localHost, localPort);
                    break;
                case BACKEND:
                case BACKEND_CN:
                    Env.getCurrentSystemInfo().dropBackend(localHost, localPort);
                    break;
                case BROKER:
                    env.getBrokerMgr().dropBrokers(getBrokerName(), Lists.newArrayList(Pair.of(localHost, localPort)));
                    break;
                default:
                    break;
            }
            // Only report success when the drop did not fail.
            LOG.info("Finished to drop {} node: {}:{}", nodeType, localHost, localPort);
        } catch (DdlException e) {
            LOG.error("Failed to drop {} node: {}:{}", nodeType, localHost, localPort, e);
        }
    }

    // Registers a remote node that is not yet known locally.
    private void dealAddRemote(HostInfo remoteHostInfo, NodeType nodeType) {
        Integer remotePort = remoteHostInfo.getPort();
        String remoteHost = remoteHostInfo.getHost();

        try {
            switch (nodeType) {
                case ELECTABLE:
                    env.addFrontend(FrontendNodeType.FOLLOWER, remoteHost, remotePort);
                    break;
                case OBSERVER:
                    env.addFrontend(FrontendNodeType.OBSERVER, remoteHost, remotePort);
                    break;
                case BACKEND:
                case BACKEND_CN:
                    List<HostInfo> newBackends = Lists.newArrayList();
                    newBackends.add(new HostInfo(remoteHost, remotePort));
                    Env.getCurrentSystemInfo().addBackends(newBackends, false);
                    break;
                case BROKER:
                    env.getBrokerMgr().addBrokers(getBrokerName(), Lists.newArrayList(Pair.of(remoteHost, remotePort)));
                    break;
                default:
                    break;
            }
            // Only report success when the add did not fail.
            LOG.info("Finished to add {} node: {}:{}", nodeType, remoteHost, remotePort);
        } catch (UserException e) {
            LOG.error("Failed to add {} node: {}:{}", nodeType, remoteHost, remotePort, e);
        }
    }

    // Returns the element of hostInfos with the same host string and port as hostInfo, or null if none.
    private HostInfo getFromHostInfos(List<HostInfo> hostInfos, HostInfo hostInfo) {
        for (HostInfo candidate : hostInfos) {
            if (hostInfo.getHost().equals(candidate.getHost()) && hostInfo.getPort() == (candidate.getPort())) {
                return candidate;
            }
        }
        return null;
    }

    // Frontends are identified by host and edit log port.
    private List<HostInfo> convertFesToHostInfos(List<Frontend> frontends) {
        List<HostInfo> hostInfos = Lists.newArrayList();
        for (Frontend fe : frontends) {
            hostInfos.add(convertToHostInfo(fe));
        }
        return hostInfos;
    }

    // Brokers are identified by host and broker port.
    private List<HostInfo> convertBrokersToHostInfos(List<FsBroker> brokers) {
        List<HostInfo> hostInfos = Lists.newArrayList();
        for (FsBroker broker : brokers) {
            hostInfos.add(convertToHostInfo(broker));
        }
        return hostInfos;
    }

    // Backends are identified by host and heartbeat port.
    private List<HostInfo> convertBesToHostInfos(List<Backend> backends) {
        List<HostInfo> hostInfos = Lists.newArrayList();
        for (Backend be : backends) {
            hostInfos.add(convertToHostInfo(be));
        }
        return hostInfos;
    }

    private HostInfo convertToHostInfo(Frontend frontend) {
        return new HostInfo(frontend.getHost(), frontend.getEditLogPort());
    }

    private HostInfo convertToHostInfo(FsBroker broker) {
        return new HostInfo(broker.host, broker.port);
    }

    private HostInfo convertToHostInfo(Backend backend) {
        return new HostInfo(backend.getHost(), backend.getHeartbeatPort());
    }

    // True if hostInfo matches the master host and this process's edit log port. The polling runs on the
    // Master, so the master host is expected to be this FE's host.
    private boolean isSelf(HostInfo hostInfo) {
        return env.getMasterHost().equals(hostInfo.getHost()) && Config.edit_log_port == hostInfo.getPort();
    }

    /** A pushed node-change notification: the complete current host list of one node type. */
    protected class Event {
        private NodeType nodeType;
        private List<HostInfo> hostInfos;

        public Event(NodeType nodeType, List<HostInfo> hostInfos) {
            this.nodeType = nodeType;
            this.hostInfos = hostInfos;
        }

        public NodeType getNodeType() {
            return nodeType;
        }

        public List<HostInfo> getHostInfos() {
            return hostInfos;
        }

        @Override
        public String toString() {
            return "Event{"
                    + "nodeType=" + nodeType
                    + ", hostInfos=" + hostInfos
                    + '}';
        }
    }

    /** Per-node-type configuration: whether a remote service group exists and its name. */
    protected class NodeTypeAttr {
        private boolean hasService;
        private String serviceName;
        // Extra attribute for derived classes; not read by this class.
        private String subAttr;

        public NodeTypeAttr(boolean hasService) {
            this.hasService = hasService;
        }

        public boolean hasService() {
            return hasService;
        }

        public void setHasService(boolean hasService) {
            this.hasService = hasService;
        }

        public String getServiceName() {
            return serviceName;
        }

        public void setServiceName(String serviceName) {
            this.serviceName = serviceName;
        }

        public String getSubAttr() {
            return subAttr;
        }

        public void setSubAttr(String subAttr) {
            this.subAttr = subAttr;
        }
    }
}