NodeAction.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.httpv2.rest.manager;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.InfoSchemaDb;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.common.proc.ProcService;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.rest.RestBaseController;
import org.apache.doris.httpv2.rest.SetConfigAction;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/*
 * Used to return all node information, configuration information and modify node config.
 */
@RestController
@RequestMapping("/rest/v2/manager/node")
public class NodeAction extends RestBaseController {
    private static final Logger LOG = LogManager.getLogger(NodeAction.class);
    private static final Pattern PATTERN = Pattern.compile(":");

    public static final String AUTHORIZATION = "Authorization";
    private static final int HTTP_WAIT_TIME_SECONDS = 2;

    public static final String CONFIG = "配置项";
    public static final String NODE_IP_PORT = "节点";
    public static final String NODE_TYPE = "节点类型";
    public static final String CONFIG_TYPE = "配置值类型";
    public static final String MASTER_ONLY = "MasterOnly";
    public static final String CONFIG_VALUE = "配置值";
    public static final String IS_MUTABLE = "可修改";

    public static final ImmutableList<String> FE_CONFIG_TITLE_NAMES = new ImmutableList.Builder<String>().add(CONFIG)
            .add(NODE_IP_PORT).add(NODE_TYPE).add(CONFIG_TYPE).add(MASTER_ONLY).add(CONFIG_VALUE).add(IS_MUTABLE)
            .build();

    public static final ImmutableList<String> BE_CONFIG_TITLE_NAMES = new ImmutableList.Builder<String>().add(CONFIG)
            .add(NODE_IP_PORT).add(NODE_TYPE).add(CONFIG_TYPE).add(CONFIG_VALUE).add(IS_MUTABLE).build();

    private Object httpExecutorLock = new Object();
    private static volatile ExecutorService httpExecutor = null;

    // Returns all fe information, similar to 'show frontends'.
    @RequestMapping(path = "/frontends", method = RequestMethod.GET)
    public Object frontends_info(HttpServletRequest request, HttpServletResponse response) throws Exception {
        executeCheckPassword(request, response);
        checkDbAuth(ConnectContext.get().getCurrentUserIdentity(), InfoSchemaDb.DATABASE_NAME, PrivPredicate.SELECT);

        return fetchNodeInfo(request, response, "/frontends");
    }

    // Returns all be information, similar to 'show backends'.
    @RequestMapping(path = "/backends", method = RequestMethod.GET)
    public Object backends_info(HttpServletRequest request, HttpServletResponse response) throws Exception {
        executeCheckPassword(request, response);
        checkDbAuth(ConnectContext.get().getCurrentUserIdentity(), InfoSchemaDb.DATABASE_NAME, PrivPredicate.SELECT);

        return fetchNodeInfo(request, response, "/backends");
    }

    // Returns all broker information, similar to 'show broker'.
    @RequestMapping(path = "/brokers", method = RequestMethod.GET)
    public Object brokers_info(HttpServletRequest request, HttpServletResponse response) throws Exception {
        executeCheckPassword(request, response);
        checkDbAuth(ConnectContext.get().getCurrentUserIdentity(), InfoSchemaDb.DATABASE_NAME, PrivPredicate.SELECT);

        return fetchNodeInfo(request, response, "/brokers");
    }

    // {
    //   "column_names": [
    //     ""
    //   ],
    //   "rows": [
    //     [
    //       ""
    //     ]
    //   ]
    // }
    private Object fetchNodeInfo(HttpServletRequest request, HttpServletResponse response, String procPath)
            throws Exception {
        try {
            if (needRedirect(request.getScheme())) {
                return redirectToHttps(request);
            }

            if (checkForwardToMaster(request)) {
                return forwardToMaster(request, null);
            }

            ProcResult procResult = ProcService.getInstance().open(procPath).fetchResult();
            List<String> columnNames = Lists.newArrayList(procResult.getColumnNames());
            return ResponseEntityBuilder.ok(new NodeInfo(columnNames, procResult.getRows()));
        } catch (Exception e) {
            LOG.warn(e);
            throw e;
        }
    }

    @Getter
    @Setter
    public static class NodeInfo {
        public List<String> columnNames;
        public List<List<String>> rows;

        public NodeInfo(List<String> columnNames, List<List<String>> rows) {
            this.columnNames = columnNames;
            this.rows = rows;
        }
    }

    // Return fe and be all configuration names.
    // {
    //   "frontend": [
    //     ""
    //   ],
    //   "backend": [
    //     ""
    //   ]
    // }
    @RequestMapping(path = "/configuration_name", method = RequestMethod.GET)
    public Object configurationName(HttpServletRequest request, HttpServletResponse response) {
        executeCheckPassword(request, response);
        checkDbAuth(ConnectContext.get().getCurrentUserIdentity(), InfoSchemaDb.DATABASE_NAME, PrivPredicate.SELECT);

        Map<String, List<String>> result = Maps.newHashMap();
        try {
            result.put("frontend", Lists.newArrayList(Config.dump().keySet()));

            List<String> beConfigNames = Lists.newArrayList();
            List<Long> beIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
            if (!beIds.isEmpty()) {
                Backend be = Env.getCurrentSystemInfo().getBackend(beIds.get(0));
                String url = "http://" + NetUtils.getHostPortInAccessibleFormat(be.getHost(), be.getHttpPort())
                        + "/api/show_config";
                String questResult = HttpUtils.doGet(url, null);
                List<List<String>> configs = GsonUtils.GSON.fromJson(questResult, new TypeToken<List<List<String>>>() {
                }.getType());
                for (List<String> config : configs) {
                    beConfigNames.add(config.get(0));
                }
            }
            result.put("backend", beConfigNames);
        } catch (Exception e) {
            LOG.warn(e);
            return ResponseEntityBuilder.internalError(e.getMessage());
        }
        return ResponseEntityBuilder.ok(result);
    }

    // Return all fe and be nodes.
    // {
    //   "frontend": [
    //     "host:httpPort"
    //   ],
    //   "backend": [
    //     "host:httpPort""
    //   ]
    // }
    @RequestMapping(path = "/node_list", method = RequestMethod.GET)
    public Object nodeList(HttpServletRequest request, HttpServletResponse response) {
        executeCheckPassword(request, response);
        checkDbAuth(ConnectContext.get().getCurrentUserIdentity(), InfoSchemaDb.DATABASE_NAME, PrivPredicate.SELECT);

        Map<String, List<String>> result = Maps.newHashMap();
        result.put("frontend", getFeList());
        result.put("backend", getBeList());
        return ResponseEntityBuilder.ok(result);
    }

    private static List<String> getFeList() {
        return Env.getCurrentEnv().getFrontends(null).stream()
                .map(fe -> NetUtils.getHostPortInAccessibleFormat(fe.getHost(), Config.http_port))
                .collect(Collectors.toList());
    }

    private static List<String> getBeList() {
        return Env.getCurrentSystemInfo().getAllBackendIds(false).stream().map(beId -> {
            Backend be = Env.getCurrentSystemInfo().getBackend(beId);
            return NetUtils.getHostPortInAccessibleFormat(be.getHost(), be.getHttpPort());
        }).collect(Collectors.toList());
    }

    /*
     * this http interface is used to return configuration information requested by other fe.
     */
    @RequestMapping(path = "/config", method = RequestMethod.GET)
    public Object config(HttpServletRequest request, HttpServletResponse response) {
        executeCheckPassword(request, response);
        checkDbAuth(ConnectContext.get().getCurrentUserIdentity(), InfoSchemaDb.DATABASE_NAME, PrivPredicate.SELECT);

        List<List<String>> configs = ConfigBase.getConfigInfo(null);
        // Sort all configs by config key.
        configs.sort(Comparator.comparing(o -> o.get(0)));

        // reorder the fields
        List<List<String>> results = Lists.newArrayList();
        for (List<String> config : configs) {
            List<String> list = Lists.newArrayList();
            list.add(config.get(0));
            list.add(config.get(2));
            list.add(config.get(4));
            list.add(config.get(1));
            list.add(config.get(3));
            results.add(list);
        }
        return results;
    }

    // Return the configuration information of fe or be.
    //
    // for fe:
    // {
    //   "column_names": [
    //     "配置项",
    //     "节点",
    //     "节点类型",
    //     "配置类型",
    //     "仅master",
    //     "配置值",
    //     "可修改"
    //   ],
    //   "rows": [
    //     [
    //       ""
    //     ]
    //   ]
    // }
    //
    // for be:
    // {
    //   "column_names": [
    //     "配置项",
    //     "节点",
    //     "节点类型",
    //     "配置类型",
    //     "配置值",
    //     "可修改"
    //   ],
    //   "rows": [
    //     [
    //       ""
    //     ]
    //   ]
    // }
    @RequestMapping(path = "/configuration_info", method = RequestMethod.POST)
    public Object configurationInfo(HttpServletRequest request, HttpServletResponse response,
            @RequestParam(value = "type") String type,
            @RequestBody(required = false) ConfigInfoRequestBody requestBody) {
        executeCheckPassword(request, response);
        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN_OR_NODE);

        initHttpExecutor();

        if (requestBody == null) {
            requestBody = new ConfigInfoRequestBody();
        }
        List<Pair<String, Integer>> hostPorts;
        if (type.equalsIgnoreCase("fe")) {
            if (requestBody.getNodes() != null && !requestBody.getNodes().isEmpty()) {
                hostPorts = parseHostPort(requestBody.getNodes());
            } else {
                hostPorts = parseHostPort(getFeList());
            }

            List<Map.Entry<String, Integer>> errNodes = Lists.newArrayList();
            List<List<String>> data = handleConfigurationInfo(hostPorts, request.getHeader(AUTHORIZATION),
                    "/rest/v2/manager/node/config", "FE", requestBody.getConfNames(), errNodes);
            if (!errNodes.isEmpty()) {
                LOG.warn("Failed to get fe node configuration information from:{}", errNodes.toString());
            }
            return ResponseEntityBuilder.ok(new NodeInfo(FE_CONFIG_TITLE_NAMES, data));
        } else if (type.equalsIgnoreCase("be")) {
            if (requestBody.getNodes() != null && !requestBody.getNodes().isEmpty()) {
                hostPorts = parseHostPort(requestBody.getNodes());
            } else {
                hostPorts = parseHostPort(getBeList());
            }

            List<Map.Entry<String, Integer>> errNodes = Lists.newArrayList();
            List<List<String>> data = handleConfigurationInfo(hostPorts, request.getHeader(AUTHORIZATION),
                    "/api/show_config", "BE", requestBody.getConfNames(), errNodes);
            if (!errNodes.isEmpty()) {
                LOG.warn("Failed to get be node configuration information from:{}", errNodes.toString());
            }
            return ResponseEntityBuilder.ok(new NodeInfo(BE_CONFIG_TITLE_NAMES, data));
        }
        return ResponseEntityBuilder.badRequest(
                "Unsupported type: " + type + ". Only types of fe or be are " + "supported");
    }

    // Use thread pool to concurrently fetch configuration information from specified fe or be nodes.
    private List<List<String>> handleConfigurationInfo(List<Pair<String, Integer>> hostPorts, String authorization,
            String questPath, String nodeType, List<String> confNames, List<Map.Entry<String, Integer>> errNodes) {
        // The configuration information returned by each node is a List<List<String>> type,
        // configInfoTotal is used to store the configuration information of all nodes.
        List<List<List<String>>> configInfoTotal = Lists.newArrayList();
        MarkedCountDownLatch<String, Integer> configRequestDoneSignal = new MarkedCountDownLatch<>(hostPorts.size());
        for (int i = 0; i < hostPorts.size(); ++i) {
            configInfoTotal.add(Lists.newArrayList());

            Pair<String, Integer> hostPort = hostPorts.get(i);
            String address = NetUtils.getHostPortInAccessibleFormat(hostPort.first, hostPort.second);
            configRequestDoneSignal.addMark(address, -1);
            String url = "http://" + address + questPath;
            httpExecutor.submit(
                    new HttpConfigInfoTask(url, hostPort, authorization, nodeType, confNames, configRequestDoneSignal,
                            configInfoTotal.get(i)));
        }
        List<List<String>> resultConfigs = Lists.newArrayList();
        try {
            configRequestDoneSignal.await(HTTP_WAIT_TIME_SECONDS, TimeUnit.SECONDS);
            for (List<List<String>> lists : configInfoTotal) {
                resultConfigs.addAll(lists);
            }
        } catch (InterruptedException e) {
            errNodes.addAll(configRequestDoneSignal.getLeftMarks());
        }

        return resultConfigs;
    }

    private void initHttpExecutor() {
        if (httpExecutor == null) {
            synchronized (httpExecutorLock) {
                if (httpExecutor == null) {
                    httpExecutor = ThreadPoolManager.newDaemonFixedThreadPool(5, 100, "node-config-update-pool", true);
                }
            }
        }
    }

    static List<Pair<String, Integer>> parseHostPort(List<String> nodes) {
        List<Pair<String, Integer>> hostPorts = Lists.newArrayList();
        for (String node : nodes) {
            try {
                Pair<String, Integer> ipPort = SystemInfoService.validateHostAndPort(node);
                hostPorts.add(ipPort);
            } catch (Exception e) {
                LOG.warn(e);
            }
        }
        return hostPorts;
    }

    private class HttpConfigInfoTask implements Runnable {
        private String url;
        private Pair<String, Integer> hostPort;
        private String authorization;
        private String nodeType;
        private List<String> confNames;
        private MarkedCountDownLatch<String, Integer> configRequestDoneSignal;
        private List<List<String>> config;

        public HttpConfigInfoTask(String url, Pair<String, Integer> hostPort, String authorization, String nodeType,
                List<String> confNames, MarkedCountDownLatch<String, Integer> configRequestDoneSignal,
                List<List<String>> config) {
            this.url = url;
            this.hostPort = hostPort;
            this.authorization = authorization;
            this.nodeType = nodeType;
            this.confNames = confNames;
            this.configRequestDoneSignal = configRequestDoneSignal;
            this.config = config;
        }

        @Override
        public void run() {
            String configInfo;
            try {
                configInfo = HttpUtils.doGet(url,
                        ImmutableMap.<String, String>builder().put(AUTHORIZATION, authorization).build());
                List<List<String>> configs = GsonUtils.GSON.fromJson(configInfo, new TypeToken<List<List<String>>>() {
                }.getType());
                for (List<String> conf : configs) {
                    if (confNames == null || confNames.isEmpty() || confNames.contains(conf.get(0))) {
                        addConfig(conf);
                    }
                }
                configRequestDoneSignal.markedCountDown(NetUtils
                        .getHostPortInAccessibleFormat(hostPort.first, hostPort.second), -1);
            } catch (Exception e) {
                LOG.warn("get config from {}:{} failed.", hostPort.first, hostPort.second, e);
                configRequestDoneSignal.countDown();
            }
        }

        private void addConfig(List<String> conf) {
            conf.add(1, NetUtils
                    .getHostPortInAccessibleFormat(hostPort.first, hostPort.second));
            conf.add(2, nodeType);
            config.add(conf);
        }
    }

    // Modify fe configuration.
    //
    // request body:
    // {
    //   "config_name":{
    //     "node":[
    //       ""
    //     ],
    //     "value":"",
    //     "persist":""
    //   }
    // }
    //
    // return data:
    // {
    //   "failed":[
    //     {
    //       "config_name":"",
    //       "value"="",
    //       "node":"",
    //       "err_info":""
    //     }
    //   ]
    //  }
    @RequestMapping(path = "/set_config/fe", method = RequestMethod.POST)
    public Object setConfigFe(HttpServletRequest request, HttpServletResponse response,
            @RequestBody Map<String, SetConfigRequestBody> requestBody) {
        executeCheckPassword(request, response);
        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);

        List<Map<String, String>> failedTotal = Lists.newArrayList();
        List<NodeConfigs> nodeConfigList = parseSetConfigNodes(requestBody, failedTotal);
        List<Pair<String, Integer>> aliveFe = Env.getCurrentEnv().getFrontends(null).stream().filter(Frontend::isAlive)
                .map(fe -> Pair.of(fe.getHost(), Config.http_port)).collect(Collectors.toList());
        checkNodeIsAlive(nodeConfigList, aliveFe, failedTotal);

        Map<String, String> header = Maps.newHashMap();
        header.put(AUTHORIZATION, request.getHeader(AUTHORIZATION));

        for (NodeConfigs nodeConfigs : nodeConfigList) {
            if (!nodeConfigs.getConfigs(true).isEmpty()) {
                String url = concatFeSetConfigUrl(nodeConfigs, true);
                try {
                    String responsePersist = HttpUtils.doGet(url, header);
                    parseFeSetConfigResponse(responsePersist, nodeConfigs.getHostPort(), failedTotal);
                } catch (Exception e) {
                    addSetConfigErrNode(nodeConfigs.getConfigs(true), nodeConfigs.getHostPort(), e.getMessage(),
                            failedTotal);
                }
            }
            if (!nodeConfigs.getConfigs(false).isEmpty()) {
                String url = concatFeSetConfigUrl(nodeConfigs, false);
                try {
                    String responseTemp = HttpUtils.doGet(url, header);
                    parseFeSetConfigResponse(responseTemp, nodeConfigs.getHostPort(), failedTotal);
                } catch (Exception e) {
                    addSetConfigErrNode(nodeConfigs.getConfigs(false), nodeConfigs.getHostPort(), e.getMessage(),
                            failedTotal);
                }
            }

        }
        Map<String, List<Map<String, String>>> data = Maps.newHashMap();
        data.put("failed", failedTotal);
        return ResponseEntityBuilder.ok(data);
    }

    private void addSetConfigErrNode(Map<String, String> configs, Pair<String, Integer> hostPort, String err,
            List<Map<String, String>> failedTotal) {
        for (Map.Entry<String, String> entry : configs.entrySet()) {
            Map<String, String> failed = Maps.newHashMap();
            addFailedConfig(entry.getKey(), entry.getValue(), NetUtils
                    .getHostPortInAccessibleFormat(hostPort.first, hostPort.second), err, failed);
            failedTotal.add(failed);
        }
    }

    private void parseFeSetConfigResponse(String response, Pair<String, Integer> hostPort,
            List<Map<String, String>> failedTotal) throws Exception {
        JsonObject jsonObject = JsonParser.parseString(response).getAsJsonObject();
        if (jsonObject.get("code").getAsInt() != HttpUtils.REQUEST_SUCCESS_CODE) {
            throw new Exception(jsonObject.get("msg").getAsString());
        }
        SetConfigAction.SetConfigEntity setConfigEntity = GsonUtils.GSON.fromJson(
                jsonObject.get("data").getAsJsonObject(), SetConfigAction.SetConfigEntity.class);
        for (SetConfigAction.ErrConfig errConfig : setConfigEntity.getErrConfigs()) {
            Map<String, String> failed = Maps.newHashMap();
            addFailedConfig(errConfig.getConfigName(), errConfig.getConfigValue(),
                    NetUtils.getHostPortInAccessibleFormat(hostPort.first, hostPort.second), errConfig.getErrInfo(),
                    failed);
            failedTotal.add(failed);
        }
    }

    private static void addFailedConfig(String configName, String value, String node, String errInfo,
            Map<String, String> failed) {
        failed.put("config_name", configName);
        failed.put("value", value);
        failed.put("node", node);
        failed.put("err_info", errInfo);
    }

    private String concatFeSetConfigUrl(NodeConfigs nodeConfigs, boolean isPersist) {
        StringBuilder sb = new StringBuilder();
        Pair<String, Integer> hostPort = nodeConfigs.getHostPort();
        sb.append("http://").append(hostPort.first).append(":").append(hostPort.second).append("/api/_set_config");
        Map<String, String> configs = nodeConfigs.getConfigs(isPersist);
        boolean addAnd = false;
        for (Map.Entry<String, String> entry : configs.entrySet()) {
            if (addAnd) {
                sb.append("&");
            } else {
                sb.append("?");
                addAnd = true;
            }
            sb.append(entry.getKey()).append("=").append(entry.getValue());
        }
        if (isPersist) {
            sb.append("&persist=true&reset_persist=false");
        }
        return sb.toString();
    }

    // Modify fe configuration.
    // The request body and return data are in the same format as fe
    @RequestMapping(path = "/set_config/be", method = RequestMethod.POST)
    public Object setConfigBe(HttpServletRequest request, HttpServletResponse response,
            @RequestBody Map<String, SetConfigRequestBody> requestBody) {
        executeCheckPassword(request, response);
        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);

        List<Map<String, String>> failedTotal = Lists.newArrayList();
        List<NodeConfigs> nodeConfigList = parseSetConfigNodes(requestBody, failedTotal);
        List<Pair<String, Integer>> aliveBe = Env.getCurrentSystemInfo().getAllBackendIds(true).stream().map(beId -> {
            Backend be = Env.getCurrentSystemInfo().getBackend(beId);
            return Pair.of(be.getHost(), be.getHttpPort());
        }).collect(Collectors.toList());
        checkNodeIsAlive(nodeConfigList, aliveBe, failedTotal);

        handleBeSetConfig(nodeConfigList, request.getHeader(AUTHORIZATION), failedTotal);
        failedTotal = failedTotal.stream().filter(e -> !e.isEmpty()).collect(Collectors.toList());

        Map<String, List<Map<String, String>>> data = Maps.newHashMap();
        data.put("failed", failedTotal);
        return ResponseEntityBuilder.ok(data);
    }

    @PostMapping("/{action}/be")
    public Object operateBackend(HttpServletRequest request, HttpServletResponse response, @PathVariable String action,
            @RequestBody BackendReqInfo reqInfo) {
        try {
            if (needRedirect(request.getScheme())) {
                return redirectToHttps(request);
            }

            if (checkForwardToMaster(request)) {
                return forwardToMaster(request, reqInfo);
            }

            List<String> hostPorts = reqInfo.getHostPorts();
            List<HostInfo> hostInfos = new ArrayList<>();
            for (String hostPort : hostPorts) {
                hostInfos.add(SystemInfoService.getHostAndPort(hostPort));
            }
            SystemInfoService currentSystemInfo = Env.getCurrentSystemInfo();
            if ("ADD".equals(action)) {
                Map<String, String> properties;
                if (reqInfo.getProperties() == null) {
                    properties = new HashMap<>();
                } else {
                    properties = reqInfo.getProperties();
                }
                Map<String, String> tagMap = PropertyAnalyzer.analyzeBackendTagsProperties(properties,
                        Tag.DEFAULT_BACKEND_TAG);
                currentSystemInfo.addBackends(hostInfos, tagMap);
            } else if ("DROP".equals(action)) {
                currentSystemInfo.dropBackends(hostInfos);
            } else if ("DECOMMISSION".equals(action)) {
                ImmutableMap<Long, Backend> backendsInCluster = currentSystemInfo.getAllBackendsByAllCluster();
                backendsInCluster.forEach((k, v) -> {
                    hostInfos.stream()
                            .filter(h -> v.getHost().equals(h.getHost()) && v.getHeartbeatPort() == h.getPort())
                            .findFirst().ifPresent(h -> {
                                v.setDecommissioned(true);
                                Env.getCurrentEnv().getEditLog().logBackendStateChange(v);
                            });
                });
            }
        } catch (Exception e) {
            return ResponseEntityBuilder.okWithCommonError(e.getMessage());
        }
        return ResponseEntityBuilder.ok();
    }

    @PostMapping("/{action}/fe")
    public Object operateFrontends(HttpServletRequest request, HttpServletResponse response,
            @PathVariable String action, @RequestBody FrontendReqInfo reqInfo) {
        try {
            if (needRedirect(request.getScheme())) {
                return redirectToHttps(request);
            }

            if (checkForwardToMaster(request)) {
                return forwardToMaster(request, reqInfo);
            }

            String role = reqInfo.getRole();
            Env currentEnv = Env.getCurrentEnv();
            FrontendNodeType frontendNodeType;
            if (FrontendNodeType.FOLLOWER.name().equals(role)) {
                frontendNodeType = FrontendNodeType.FOLLOWER;
            } else {
                frontendNodeType = FrontendNodeType.OBSERVER;
            }
            HostInfo info = SystemInfoService.getHostAndPort(reqInfo.getHostPort());
            if ("ADD".equals(action)) {
                currentEnv.addFrontend(frontendNodeType, info.getHost(), info.getPort());
            } else if ("DROP".equals(action)) {
                currentEnv.dropFrontend(frontendNodeType, info.getHost(), info.getPort());
            }
        } catch (Exception e) {
            return ResponseEntityBuilder.okWithCommonError(e.getMessage());
        }
        return ResponseEntityBuilder.ok();
    }

    @Data
    private static class BackendReqInfo {

        private List<String> hostPorts;

        private Map<String, String> properties;
    }

    @Data
    private static class FrontendReqInfo {

        private String role;

        private String hostPort;
    }

    // Parsing request body into List<NodeConfigs>
    private List<NodeConfigs> parseSetConfigNodes(Map<String, SetConfigRequestBody> requestBody,
            List<Map<String, String>> errNodes) {
        List<NodeConfigs> nodeConfigsList = Lists.newArrayList();
        for (String configName : requestBody.keySet()) {
            SetConfigRequestBody configPara = requestBody.get(configName);
            String value = configPara.getValue();
            boolean persist = configPara.isPersist();
            if (value == null || configPara.getNodes() == null) {
                continue;
            }

            for (String node : configPara.getNodes()) {
                Pair<String, Integer> ipPort;
                try {
                    ipPort = SystemInfoService.validateHostAndPort(node);
                } catch (Exception e) {
                    Map<String, String> failed = Maps.newHashMap();
                    addFailedConfig(configName, configPara.getValue(), node, "node invalid", failed);
                    errNodes.add(failed);
                    continue;
                }
                boolean find = false;
                for (NodeConfigs nodeConfigs : nodeConfigsList) {
                    Pair<String, Integer> hostPort = nodeConfigs.getHostPort();
                    if (ipPort.first.equals(hostPort.first) && ipPort.second.equals(hostPort.second)) {
                        find = true;
                        nodeConfigs.addConfig(configName, value, persist);
                    }
                }
                if (!find) {
                    NodeConfigs newNodeConfigs = new NodeConfigs(ipPort.first, ipPort.second);
                    nodeConfigsList.add(newNodeConfigs);
                    newNodeConfigs.addConfig(configName, value, persist);
                }
            }
        }
        return nodeConfigsList;
    }

    private void checkNodeIsAlive(List<NodeConfigs> nodeConfigsList, List<Pair<String, Integer>> aliveNodes,
            List<Map<String, String>> failedNodes) {
        Iterator<NodeConfigs> it = nodeConfigsList.iterator();
        while (it.hasNext()) {
            NodeConfigs node = it.next();
            boolean isExist = false;
            for (Pair<String, Integer> aliveHostPort : aliveNodes) {
                if (aliveHostPort.first.equals(node.getHostPort().first) && aliveHostPort.second.equals(
                        node.getHostPort().second)) {
                    isExist = true;
                    break;
                }
            }
            if (!isExist) {
                addSetConfigErrNode(node.getConfigs(true), node.getHostPort(), "Node does not exist or is not alive",
                        failedNodes);
                addSetConfigErrNode(node.getConfigs(false), node.getHostPort(), "Node does not exist or is not alive",
                        failedNodes);
                it.remove();
            }
        }
    }

    private List<Map<String, String>> handleBeSetConfig(List<NodeConfigs> nodeConfigList, String authorization,
            List<Map<String, String>> failedTotal) {
        initHttpExecutor();

        int configNum = nodeConfigList.stream().mapToInt(e -> e.getConfigs(true).size() + e.getConfigs(false).size())
                .sum();
        MarkedCountDownLatch<String, Integer> beSetConfigCountDownSignal = new MarkedCountDownLatch<>(configNum);
        for (NodeConfigs nodeConfigs : nodeConfigList) {
            submitBeSetConfigTask(nodeConfigs, true, authorization, beSetConfigCountDownSignal, failedTotal);
            submitBeSetConfigTask(nodeConfigs, false, authorization, beSetConfigCountDownSignal, failedTotal);
        }
        try {
            beSetConfigCountDownSignal.await(HTTP_WAIT_TIME_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.warn("set be config exception:", e);
        } finally {
            List<Map.Entry<String, Integer>> leftNode = beSetConfigCountDownSignal.getLeftMarks();
            for (Map.Entry<String, Integer> failedNode : leftNode) {
                Map<String, String> failed = parseNodeConfig(failedNode.getKey());
                if (!failed.isEmpty()) {
                    failed.put("err_info", "Connection timeout");
                    failedTotal.add(failed);
                }
            }
        }
        return failedTotal;
    }

    private void submitBeSetConfigTask(NodeConfigs nodeConfigs, boolean isPersist, String authorization,
            MarkedCountDownLatch<String, Integer> beSetConfigCountDownSignal, List<Map<String, String>> failedTotal) {
        if (!nodeConfigs.getConfigs(isPersist).isEmpty()) {
            for (Map.Entry<String, String> entry : nodeConfigs.getConfigs(isPersist).entrySet()) {
                failedTotal.add(Maps.newHashMap());
                Pair<String, Integer> hostPort = nodeConfigs.getHostPort();
                beSetConfigCountDownSignal.addMark(
                        concatNodeConfig(hostPort.first, hostPort.second, entry.getKey(), entry.getValue()), -1);

                String url = concatBeSetConfigUrl(hostPort.first, hostPort.second, entry.getKey(), entry.getValue(),
                        isPersist);
                httpExecutor.submit(
                        new HttpSetConfigTask(url, hostPort, authorization, entry.getKey(), entry.getValue(),
                                beSetConfigCountDownSignal, failedTotal.get(failedTotal.size() - 1)));
            }
        }
    }

    private String concatBeSetConfigUrl(String host, Integer port, String configName, String configValue,
            boolean isPersist) {
        StringBuilder stringBuffer = new StringBuilder();
        stringBuffer.append("http://").append(host).append(":").append(port).append("/api/update_config").append("?")
                .append(configName).append("=").append(configValue);
        if (isPersist) {
            stringBuffer.append("&persist=true");
        }
        return stringBuffer.toString();
    }

    private String concatNodeConfig(String host, Integer port, String configName, String configValue) {
        return NetUtils
                .getHostPortInAccessibleFormat(host, port) + ":" + configName + ":" + configValue;
    }

    private Map<String, String> parseNodeConfig(String nodeConfig) {
        Map<String, String> map = Maps.newHashMap();
        String[] splitStrings = PATTERN.split(nodeConfig);
        if (splitStrings.length == 4) {
            addFailedConfig(splitStrings[2], splitStrings[3], splitStrings[0] + ":" + splitStrings[1], "", map);
        }
        return map;
    }

    private class HttpSetConfigTask implements Runnable {
        private String url;
        private Pair<String, Integer> hostPort;
        private String authorization;
        private String configName;
        private String configValue;
        private MarkedCountDownLatch<String, Integer> beSetConfigDoneSignal;
        private Map<String, String> failed;

        public HttpSetConfigTask(String url, Pair<String, Integer> hostPort, String authorization, String configName,
                String configValue, MarkedCountDownLatch<String, Integer> beSetConfigDoneSignal,
                Map<String, String> failed) {
            this.url = url;
            this.hostPort = hostPort;
            this.authorization = authorization;
            this.configName = configName;
            this.configValue = configValue;
            this.beSetConfigDoneSignal = beSetConfigDoneSignal;
            this.failed = failed;
        }

        @Override
        public void run() {
            try {
                String response = HttpUtils.doPost(url,
                        ImmutableMap.<String, String>builder().put(AUTHORIZATION, authorization).build(), null);
                JsonObject jsonObject = JsonParser.parseString(response).getAsJsonObject();
                String status = jsonObject.get("status").getAsString();
                if (!status.equals("OK")) {
                    addFailedConfig(configName, configValue, NetUtils
                            .getHostPortInAccessibleFormat(hostPort.first, hostPort.second),
                            jsonObject.get("msg").getAsString(), failed);
                }
                beSetConfigDoneSignal.markedCountDown(
                        concatNodeConfig(hostPort.first, hostPort.second, configName, configValue), -1);
            } catch (Exception e) {
                LOG.warn("set be:{} config:{} failed.", NetUtils
                        .getHostPortInAccessibleFormat(hostPort.first, hostPort.second),
                        configName + "=" + configValue, e);
                beSetConfigDoneSignal.countDown();
            }
        }
    }

    // Store persistent and non-persistent configuration information that needs to be modified on the node.
    public static class NodeConfigs {
        private Pair<String, Integer> hostPort;
        private Map<String, String> persistConfigs;
        private Map<String, String> nonPersistConfigs;

        public NodeConfigs(String host, Integer httpPort) {
            hostPort = Pair.of(host, httpPort);
            persistConfigs = Maps.newHashMap();
            nonPersistConfigs = Maps.newHashMap();
        }

        public Pair<String, Integer> getHostPort() {
            return hostPort;
        }

        public void addConfig(String name, String value, boolean persist) {
            if (persist) {
                persistConfigs.put(name, value);
            } else {
                nonPersistConfigs.put(name, value);
            }
        }

        public Map<String, String> getConfigs(boolean isPersist) {
            return isPersist ? persistConfigs : nonPersistConfigs;
        }

    }

    @Getter
    @Setter
    public static class ConfigInfoRequestBody {
        @JsonProperty("conf_name")
        public List<String> confNames;

        @JsonProperty("node")
        public List<String> nodes;
    }

    @Getter
    @Setter
    public static class SetConfigRequestBody {
        @JsonProperty("node")
        private List<String> nodes;

        private String value;

        private boolean persist;
    }
}