K8sDeployManager.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.deploy.impl;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.deploy.DeployManager;
import org.apache.doris.system.SystemInfoService.HostInfo;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.EndpointAddress;
import io.fabric8.kubernetes.api.model.EndpointPort;
import io.fabric8.kubernetes.api.model.EndpointSubset;
import io.fabric8.kubernetes.api.model.Endpoints;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServicePort;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import jline.internal.Log;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
public class K8sDeployManager extends DeployManager {
    private static final Logger LOG = LogManager.getLogger(K8sDeployManager.class);
    public static final String ENV_APP_NAMESPACE = "APP_NAMESPACE";
    public static final String ENV_DOMAIN_LTD = "DOMAIN_LTD";
    public static final String DEFAULT_APP_NAMESPACE = "default";
    public static final String DEFAULT_DOMAIN_LTD = "svc.cluster.local";
    // each SERVICE (FE/BE/OBSERVER/BROKER) represents a module of Doris, such as Frontends, Backends, ...
    // and each service has a name in k8s.
    public static final String ENV_FE_SERVICE = "FE_SERVICE";
    public static final String ENV_FE_OBSERVER_SERVICE = "FE_OBSERVER_SERVICE";
    public static final String ENV_BE_SERVICE = "BE_SERVICE";
    public static final String ENV_BROKER_SERVICE = "BROKER_SERVICE";
    public static final String ENV_CN_SERVICE = "CN_SERVICE";
    public static final String ENV_FE_STATEFULSET = "FE_STATEFULSET";
    public static final String ENV_FE_OBSERVER_STATEFULSET = "FE_OBSERVER_STATEFULSET";
    public static final String ENV_BE_STATEFULSET = "BE_STATEFULSET";
    public static final String ENV_BROKER_STATEFULSET = "BROKER_STATEFULSET";
    public static final String ENV_CN_STATEFULSET = "CN_STATEFULSET";
    public static final String FE_PORT = "edit-log-port"; // k8s only support -, not _
    public static final String BE_PORT = "heartbeat-port";
    public static final String BROKER_PORT = "broker-port";
    // corresponding to the environment variable ENV_APP_NAMESPACE.
    // App represents a Palo cluster in K8s, and has a namespace, and default namespace is 'default'
    private String appNamespace;
    private String domainLTD;
    private KubernetesClient client = null;
    private Watch statefulSetWatch = null;
    // =======for test only==========
    public static final String K8S_CA_CERT_FILE = "cce-ca.pem";
    public static final String K8S_CLIENT_CERT_FILE = "cce-admin.pem";
    public static final String K8S_CLIENT_KEY_FILE = "cce-admin-key.pem";
    public static final String TEST_MASTER_URL = "https://127.0.0.1:1111/";
    public static final String TEST_NAMESPACE = "default";
    public static final String TEST_SERVICENAME = "palo-fe";
    // =======for test only==========
    public K8sDeployManager(Env env, long intervalMs) {
        // if enable fqdn,we wait for k8s to actively push the statefulset change
        super(env, intervalMs, Config.enable_fqdn_mode);
        initEnvVariables(ENV_FE_SERVICE, ENV_FE_OBSERVER_SERVICE, ENV_BE_SERVICE, ENV_BROKER_SERVICE, ENV_CN_SERVICE);
    }
    @Override
    protected void initEnvVariables(String envElectableFeServiceGroup, String envObserverFeServiceGroup,
            String envBackendServiceGroup, String envBrokerServiceGroup, String envCnServiceGroup) {
        super.initEnvVariables(envElectableFeServiceGroup, envObserverFeServiceGroup, envBackendServiceGroup,
                envBrokerServiceGroup, envCnServiceGroup);
        // namespace
        appNamespace = Strings.nullToEmpty(System.getenv(ENV_APP_NAMESPACE));
        if (Strings.isNullOrEmpty(appNamespace)) {
            appNamespace = DEFAULT_APP_NAMESPACE;
        }
        LOG.info("use namespace: {}", appNamespace);
        domainLTD = Strings.nullToEmpty(System.getenv(ENV_DOMAIN_LTD));
        if (Strings.isNullOrEmpty(domainLTD)) {
            domainLTD = DEFAULT_DOMAIN_LTD;
        }
        LOG.info("use domainLTD: {}", domainLTD);
        if (Config.enable_fqdn_mode) {
            //Fill NodeTypeAttr.subAttr1 with statefulName
            //If serviceName is configured, the corresponding statefulSetName must be configured
            for (NodeType nodeType : NodeType.values()) {
                NodeTypeAttr nodeTypeAttr = nodeTypeAttrMap.get(nodeType);
                if (nodeTypeAttr.hasService()) {
                    String statefulSetEnvName = getStatefulSetEnvName(nodeType);
                    Log.info("Env name of: {} is: {}", nodeType.name(), statefulSetEnvName);
                    String statefulSetName = Strings.nullToEmpty(System.getenv(statefulSetEnvName));
                    if (Strings.isNullOrEmpty(statefulSetName)) {
                        LOG.error("failed to init statefulSetName: {}", statefulSetEnvName);
                        System.exit(-1);
                    }
                    LOG.info("use statefulSetName: {}, {}", nodeType.name(), statefulSetName);
                    nodeTypeAttr.setSubAttr(statefulSetName);
                }
            }
        }
    }
    @Override
    public void startListenerInternal() {
        statefulSetWatch = getWatch(client());
        LOG.info("Start listen statefulSets change event.");
    }
    private String getStatefulSetEnvName(NodeType nodeType) {
        switch (nodeType) {
            case ELECTABLE:
                return ENV_FE_STATEFULSET;
            case OBSERVER:
                return ENV_FE_OBSERVER_STATEFULSET;
            case BACKEND:
                return ENV_BE_STATEFULSET;
            case BACKEND_CN:
                return ENV_CN_STATEFULSET;
            case BROKER:
                return ENV_BROKER_STATEFULSET;
            default:
                return null;
        }
    }
    @Override
    protected List<HostInfo> getGroupHostInfos(NodeType nodeType) {
        if (Config.enable_fqdn_mode) {
            return getGroupHostInfosByStatefulSet(nodeType);
        } else {
            return getGroupHostInfosByEndpoint(nodeType);
        }
    }
    private List<HostInfo> getGroupHostInfosByStatefulSet(NodeType nodeType) {
        String statefulSetName = nodeTypeAttrMap.get(nodeType).getSubAttr();
        Preconditions.checkNotNull(statefulSetName);
        StatefulSet statefulSet = statefulSet(appNamespace, nodeTypeAttrMap.get(nodeType).getSubAttr());
        if (statefulSet == null) {
            LOG.warn("get null statefulSet in namespace {}, statefulSetName: {}", appNamespace, statefulSetName);
            return null;
        }
        return getHostInfosByNum(nodeType, statefulSet.getSpec().getReplicas());
    }
    private List<HostInfo> getGroupHostInfosByEndpoint(NodeType nodeType) {
        // get portName
        String portName = getPortName(nodeType);
        Preconditions.checkNotNull(portName);
        // get serviceName
        String serviceName = nodeTypeAttrMap.get(nodeType).getServiceName();
        Preconditions.checkNotNull(serviceName);
        // get endpoint
        Endpoints endpoints = endpoints(appNamespace, serviceName);
        if (endpoints == null) {
            // endpoints may be null if service does not exist;
            LOG.warn("get null endpoints of namespace: {} in service: {}", appNamespace, serviceName);
            return null;
        }
        // get host port
        List<HostInfo> result = Lists.newArrayList();
        List<EndpointSubset> subsets = endpoints.getSubsets();
        for (EndpointSubset subset : subsets) {
            Integer port = -1;
            List<EndpointPort> ports = subset.getPorts();
            for (EndpointPort eport : ports) {
                if (eport.getName().equals(portName)) {
                    port = eport.getPort();
                    break;
                }
            }
            if (port == -1) {
                LOG.warn("failed to get {} port", portName);
                return null;
            }
            List<EndpointAddress> addrs = subset.getAddresses();
            for (EndpointAddress eaddr : addrs) {
                result.add(new HostInfo(eaddr.getIp(), port));
            }
        }
        LOG.info("get host port from group: {}: {}", serviceName, result);
        return result;
    }
    // The rules for the domain name of k8s are $(podName).$(servicename).$(namespace).svc.cluster.local
    // and The podName rule of k8s is $(statefulset name) - $(sequence number)
    // see https://www.cnblogs.com/xiaokantianse/p/14267987.html#_label1_4
    public String getDomainName(NodeType nodeType, int index) {
        String statefulSetName = nodeTypeAttrMap.get(nodeType).getSubAttr();
        Preconditions.checkNotNull(statefulSetName);
        String serviceName = nodeTypeAttrMap.get(nodeType).getServiceName();
        Preconditions.checkNotNull(serviceName);
        StringBuilder builder = new StringBuilder();
        builder.append(statefulSetName + "-" + index);
        builder.append(".");
        builder.append(serviceName);
        builder.append(".");
        builder.append(appNamespace);
        builder.append(".");
        builder.append(domainLTD);
        return builder.toString();
    }
    private Endpoints endpoints(String namespace, String serviceName) {
        try {
            return client().endpoints().inNamespace(namespace).withName(serviceName).get();
        } catch (Exception e) {
            LOG.warn("encounter exception when get endpoint from namespace {}, service: {}",
                    appNamespace, serviceName, e);
            return null;
        }
    }
    public Service service(String namespace, String serviceName) {
        try {
            return client().services().inNamespace(namespace).withName(serviceName).get();
        } catch (Exception e) {
            LOG.warn("encounter exception when get service from namespace {}, service: {}",
                    appNamespace, serviceName, e);
            return null;
        }
    }
    public StatefulSet statefulSet(String namespace, String statefulSetName) {
        try {
            return client().apps().statefulSets().inNamespace(namespace).withName(statefulSetName).get();
        } catch (Exception e) {
            LOG.warn("encounter exception when get statefulSet from namespace {}, statefulSet: {}",
                    appNamespace, statefulSetName, e);
            return null;
        }
    }
    private void dealEvent(String statefulsetName, Integer num) {
        NodeType nodeType = getNodeType(statefulsetName);
        if (nodeType == null) {
            return;
        }
        List<HostInfo> hostInfosByNum = getHostInfosByNum(nodeType, num);
        if (hostInfosByNum == null) {
            return;
        }
        Event event = new Event(nodeType, hostInfosByNum);
        nodeChangeQueue.offer(event);
    }
    public List<HostInfo> getHostInfosByNum(NodeType nodeType, Integer num) {
        int servicePort = getServicePort(nodeType);
        if (servicePort == -1) {
            LOG.warn("get servicePort failed,{}", nodeType.name());
            return null;
        }
        List<HostInfo> hostInfos = Lists.newArrayList();
        for (int i = 0; i < num; i++) {
            String domainName = getDomainName(nodeType, i);
            hostInfos.add(new HostInfo(domainName, servicePort));
            if (LOG.isDebugEnabled()) {
                LOG.debug("get hostInfo from domainName: {}, hostInfo: {}", domainName, hostInfos.get(i).toString());
            }
        }
        return hostInfos;
    }
    private int getServicePort(NodeType nodeType) {
        Integer port = -1;
        String serviceName = nodeTypeAttrMap.get(nodeType).getServiceName();
        Preconditions.checkNotNull(serviceName);
        Service service = service(appNamespace, serviceName);
        if (service == null) {
            LOG.warn("get null service in namespace: {}, serviceName: {}", appNamespace, serviceName);
            return port;
        }
        String portName = getPortName(nodeType);
        Preconditions.checkNotNull(portName);
        List<ServicePort> ports = service.getSpec().getPorts();
        for (ServicePort servicePort : ports) {
            if (servicePort.getName().equals(portName)) {
                port = servicePort.getPort();
                break;
            }
        }
        return port;
    }
    private String getPortName(NodeType nodeType) {
        switch (nodeType) {
            case BROKER:
                return BROKER_PORT;
            case ELECTABLE:
            case OBSERVER:
                return FE_PORT;
            case BACKEND:
            case BACKEND_CN:
                return BE_PORT;
            default:
                return null;
        }
    }
    private NodeType getNodeType(String ststefulSetName) {
        if (StringUtils.isEmpty(ststefulSetName)) {
            return null;
        }
        for (Map.Entry<NodeType, NodeTypeAttr> entry : nodeTypeAttrMap.entrySet()) {
            if (ststefulSetName.equals(entry.getValue().getSubAttr())) {
                return entry.getKey();
            }
        }
        return null;
    }
    private synchronized KubernetesClient client() {
        if (client != null) {
            return client;
        }
        try {
            if (Config.with_k8s_certs) {
                // for test only
                ConfigBuilder configBuilder = new ConfigBuilder().withMasterUrl(TEST_MASTER_URL)
                        .withTrustCerts(true)
                        .withCaCertFile(K8S_CA_CERT_FILE).withClientCertFile(K8S_CLIENT_CERT_FILE)
                        .withClientKeyFile(K8S_CLIENT_KEY_FILE);
                client = new DefaultKubernetesClient(configBuilder.build());
            } else {
                // When accessing k8s api within the pod, no params need to be provided.
                client = new DefaultKubernetesClient();
            }
        } catch (KubernetesClientException e) {
            LOG.warn("failed to get k8s client.", e);
            throw e;
        }
        return client;
    }
    private Watch getWatch(KubernetesClient client) {
        return client.apps().statefulSets().inNamespace(appNamespace).watch(new Watcher<StatefulSet>() {
            @Override
            public void onClose(WatcherException e) {
                LOG.warn("Watch error received: {}.", e.getMessage());
            }
            @Override
            public void eventReceived(Action action, StatefulSet statefulSet) {
                LOG.info("Watch event received {}: {}: {}", action.name(), statefulSet.getMetadata().getName(),
                        statefulSet.getSpec().getReplicas());
                dealEvent(statefulSet.getMetadata().getName(), statefulSet.getSpec().getReplicas());
            }
            @Override
            public void onClose() {
                LOG.info("Watch gracefully closed.");
            }
        });
    }
    public void close() {
        if (statefulSetWatch != null) {
            statefulSetWatch.close();
        }
        if (client != null) {
            client.close();
        }
    }
}