QueryProfileAction.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.httpv2.rest.manager;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuthenticationException;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Status;
import org.apache.doris.common.proc.CurrentQueryStatementsProcNode;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.common.profile.ProfileManager;
import org.apache.doris.common.profile.ProfileManager.ProfileElement;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.rest.RestBaseController;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TStatusCode;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/*
 * Used to return query information and query profile.
 * base: /rest/v2/manager/query
 * 1. /query_info
 * 2. /sql/{query_id}
 * 3. /profile/{format}/{query_id}
 * 4. /trace_id/{trace_id}
 * 5. /current_queries
 * 6. /kill/{query_id}
 */
@RestController
@RequestMapping("/rest/v2/manager/query")
public class QueryProfileAction extends RestBaseController {
    private static final Logger LOG = LogManager.getLogger(QueryProfileAction.class);

    public static final String QUERY_ID = "Query ID";
    public static final String NODE = "FE节点";
    public static final String USER = "查询用户";
    public static final String DEFAULT_DB = "执行数据库";
    public static final String SQL_STATEMENT = "Sql";
    public static final String QUERY_TYPE = "查询类型";
    public static final String START_TIME = "开始时间";
    public static final String END_TIME = "结束时间";
    public static final String TOTAL = "执行时长";
    public static final String QUERY_STATE = "状态";

    private static final String QUERY_ID_PARA = "query_id";
    private static final String SEARCH_PARA = "search";
    private static final String IS_ALL_NODE_PARA = "is_all_node";
    private static final String FRAGMENT_ID = "fragment_id";
    private static final String INSTANCE_ID = "instance_id";

    private static final String FRONTEND = "Frontend";

    public static final ImmutableList<String> QUERY_TITLE_NAMES = new ImmutableList.Builder<String>().add(QUERY_ID)
            .add(NODE).add(USER).add(DEFAULT_DB).add(SQL_STATEMENT).add(QUERY_TYPE).add(START_TIME).add(END_TIME)
            .add(TOTAL).add(QUERY_STATE).build();

    // As an old http api, query_info's output schema column is established since 1.2,
    // so it can not be changed for compatibility.
    // Now query_info api get data from ProfileManager, its column name is different but has the same meaning with
    // QUERY_TITLE_NAMES.
    // We should keep PROFILE_TITLE_NAMES and QUERY_TITLE_NAMES has the same meaning, and use PROFILE_TITLE_NAMES
    //  to get data from profile manager.
    public static final ImmutableList<String> PROFILE_TITLE_NAMES = new ImmutableList.Builder<String>()
            .add(SummaryProfile.PROFILE_ID).add(NODE)
            .add(SummaryProfile.USER).add(SummaryProfile.DEFAULT_DB).add(SummaryProfile.SQL_STATEMENT)
            .add(SummaryProfile.TASK_TYPE).add(SummaryProfile.START_TIME).add(SummaryProfile.END_TIME)
            .add(SummaryProfile.TOTAL_TIME).add(SummaryProfile.TASK_STATE).build();

    private List<String> requestAllFe(String httpPath, Map<String, String> arguments, String authorization,
            HttpMethod method) {
        List<Pair<String, Integer>> frontends = HttpUtils.getFeList();
        ImmutableMap<String, String> header = ImmutableMap.<String, String>builder()
                .put(NodeAction.AUTHORIZATION, authorization).build();
        List<String> dataList = Lists.newArrayList();
        for (Pair<String, Integer> ipPort : frontends) {
            String url = HttpUtils.concatUrl(ipPort, httpPath, arguments);
            try {
                String data = null;
                if (method == HttpMethod.GET) {
                    data = HttpUtils.parseResponse(HttpUtils.doGet(url, header));
                } else if (method == HttpMethod.POST) {
                    data = HttpUtils.parseResponse(HttpUtils.doPost(url, header, null));
                }
                if (!Strings.isNullOrEmpty(data) && !data.equals("{}")) {
                    dataList.add(data);
                }
            } catch (Exception e) {
                LOG.warn("request url {} error", url, e);
            }
        }
        return dataList;
    }

    @RequestMapping(path = "/query_info", method = RequestMethod.GET)
    public Object queryInfo(HttpServletRequest request, HttpServletResponse response,
                            @RequestParam(value = QUERY_ID_PARA, required = false) String queryId,
                            @RequestParam(value = SEARCH_PARA, required = false) String search,
                            @RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true")
                                    boolean isAllNode) {
        executeCheckPassword(request, response);

        List<List<String>> queries = Lists.newArrayList();
        if (isAllNode) {
            // Get query information for all fe
            String httpPath = "/rest/v2/manager/query/query_info";
            Map<String, String> arguments = Maps.newHashMap();
            arguments.put(QUERY_ID_PARA, queryId);
            if (!Strings.isNullOrEmpty(search)) {
                try {
                    // search may contain special characters that need to be encoded.
                    search = URLEncoder.encode(search, StandardCharsets.UTF_8.toString());
                } catch (UnsupportedEncodingException ignored) {
                    // Encoding exception, ignore search parameter.
                }
            }
            arguments.put(SEARCH_PARA, search);
            arguments.put(IS_ALL_NODE_PARA, "false");

            List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION),
                    HttpMethod.GET);
            for (String data : dataList) {
                try {
                    NodeAction.NodeInfo nodeInfo = GsonUtils.GSON.fromJson(data, new TypeToken<NodeAction.NodeInfo>() {
                    }.getType());
                    queries.addAll(nodeInfo.getRows());
                } catch (Exception e) {
                    LOG.warn("parse query info error: {}", data, e);
                }
            }
            return ResponseEntityBuilder.ok(new NodeAction.NodeInfo(QUERY_TITLE_NAMES, queries));
        }

        Stream<List<String>> queryStream = ProfileManager.getInstance()
                .getQueryInfoByColumnNameList(PROFILE_TITLE_NAMES).stream()
                .filter(profile -> profile.get(5).equalsIgnoreCase("Query"));
        queryStream = filterQueriesByUserAndQueryId(queryStream, queryId);
        queries = queryStream.collect(Collectors.toList());

        // add node information
        for (List<String> query : queries) {
            query.set(1, NetUtils.getHostPortInAccessibleFormat(Env.getCurrentEnv().getSelfNode().getHost(),
                    Config.http_port));
        }

        if (!Strings.isNullOrEmpty(search)) {
            List<List<String>> tempQueries = Lists.newArrayList();
            for (List<String> query : queries) {
                for (String field : query) {
                    if (field.contains(search)) {
                        tempQueries.add(query);
                        break;
                    }
                }
            }
            queries = tempQueries;
        }

        return ResponseEntityBuilder.ok(new NodeAction.NodeInfo(QUERY_TITLE_NAMES, queries));
    }

    // Returns the sql for the specified query id.
    @RequestMapping(path = "/sql/{query_id}", method = RequestMethod.GET)
    public Object queryInfo(HttpServletRequest request, HttpServletResponse response,
                            @PathVariable("query_id") String queryId,
                            @RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true")
                                    boolean isAllNode) {
        executeCheckPassword(request, response);

        Map<String, String> querySql = Maps.newHashMap();
        if (isAllNode) {
            String httpPath = "/rest/v2/manager/query/sql/" + queryId;
            ImmutableMap<String, String> arguments = ImmutableMap.<String, String>builder()
                    .put(IS_ALL_NODE_PARA, "false").build();
            List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION),
                    HttpMethod.GET);
            if (!dataList.isEmpty()) {
                try {
                    String sql = JsonParser.parseString(dataList.get(0)).getAsJsonObject().get("sql").getAsString();
                    querySql.put("sql", sql);
                    return ResponseEntityBuilder.ok(querySql);
                } catch (Exception e) {
                    LOG.warn("parse sql error: {}", dataList.get(0), e);
                }
            }
        } else {
            Stream<List<String>> queryStream = ProfileManager.getInstance().getAllQueries().stream();
            queryStream = filterQueriesByUserAndQueryId(queryStream, queryId);
            List<List<String>> queries = queryStream.collect(Collectors.toList());
            if (!queries.isEmpty()) {
                querySql.put("sql", queries.get(0).get(3));
            }
        }
        return ResponseEntityBuilder.ok(querySql);
    }

    private Stream<List<String>> filterQueriesByUserAndQueryId(Stream<List<String>> queryStream, String queryId) {
        // filter by user
        // Only admin or root user can see all profile.
        // Common user can only review the query of their own.
        String user = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
        if (!user.equalsIgnoreCase(Auth.ADMIN_USER) && !user.equalsIgnoreCase(Auth.ROOT_USER)) {
            queryStream = queryStream.filter(q -> q.get(1).equals(user));
        }
        if (!Strings.isNullOrEmpty(queryId)) {
            queryStream = queryStream.filter(query -> query.get(0).equals(queryId));
        }
        return queryStream;
    }

    /**
     * Returns the text profile for the specified query id.
     * There are 3 formats:
     * 1. Text: return the entire profile of the specified query id
     * eg: {"profile": "text_xxx"}
     * <p>
     * 2. Json: return the profile in json.
     * Json format is mainly used for front-end UI drawing.
     * eg: {"profile" : "json_xxx"}
     */
    @RequestMapping(path = "/profile/{format}/{query_id}", method = RequestMethod.GET)
    public Object queryProfileText(HttpServletRequest request, HttpServletResponse response,
            @PathVariable("format") String format, @PathVariable("query_id") String queryId,
            @RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true") boolean isAllNode) {
        executeCheckPassword(request, response);

        if (!isAllNode) {
            try {
                checkAuthByUserAndQueryId(queryId);
            } catch (AuthenticationException e) {
                return ResponseEntityBuilder.badRequest(e.getMessage());
            }
        }

        if (format.equals("text")) {
            return getTextProfile(request, queryId, isAllNode);
        } else if (format.equals("json")) {
            return getJsonProfile(request, queryId, isAllNode);
        } else {
            return ResponseEntityBuilder.badRequest("Invalid profile format: " + format);
        }
    }

    /**
     * Get query id by trace id
     *
     * @param request
     * @param response
     * @param traceId
     * @param isAllNode
     * @return
     */
    @RequestMapping(path = "/trace_id/{trace_id}", method = RequestMethod.GET)
    public Object getQueryIdByTraceId(HttpServletRequest request, HttpServletResponse response,
            @PathVariable("trace_id") String traceId,
            @RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true") boolean isAllNode) {
        executeCheckPassword(request, response);

        if (isAllNode) {
            String httpPath = "/rest/v2/manager/query/trace_id/" + traceId;
            ImmutableMap<String, String> arguments =
                    ImmutableMap.<String, String>builder().put(IS_ALL_NODE_PARA, "false").build();
            List<Pair<String, Integer>> frontends = HttpUtils.getFeList();
            ImmutableMap<String, String> header = ImmutableMap.<String, String>builder()
                    .put(NodeAction.AUTHORIZATION, request.getHeader(NodeAction.AUTHORIZATION)).build();
            for (Pair<String, Integer> ipPort : frontends) {
                String url = HttpUtils.concatUrl(ipPort, httpPath, arguments);
                try {
                    String responseJson = HttpUtils.doGet(url, header);
                    int code = JsonParser.parseString(responseJson).getAsJsonObject().get("code").getAsInt();
                    if (code == HttpUtils.REQUEST_SUCCESS_CODE) {
                        return responseJson;
                    }
                } catch (Exception e) {
                    LOG.warn(e);
                }
            }
        } else {
            ExecuteEnv env = ExecuteEnv.getInstance();
            String queryId = env.getScheduler().getQueryIdByTraceId(traceId);
            if (Strings.isNullOrEmpty(queryId)) {
                return ResponseEntityBuilder.badRequest("Not found");
            }

            try {
                checkAuthByUserAndQueryId(queryId);
            } catch (AuthenticationException e) {
                return ResponseEntityBuilder.badRequest(e.getMessage());
            }

            return ResponseEntityBuilder.ok(queryId);
        }
        return ResponseEntityBuilder.badRequest("not found query id");
    }

    /**
     *  Query qError.
     */
    @RequestMapping(path = "/qerror/{id}", method = RequestMethod.GET)
    public ResponseEntity<String> getStats(@PathVariable(value = "id") String id) {
        ProfileElement profile = ProfileManager.getInstance().findProfileElementObject(id);
        if (profile == null) {
            return ResponseEntityBuilder.notFound(null);
        }
        StatsErrorEstimator statsErrorEstimator = profile.statsErrorEstimator;
        if (statsErrorEstimator == null) {
            return ResponseEntityBuilder.notFound(null);
        }
        return ResponseEntity.ok(GsonUtils.GSON.toJson(statsErrorEstimator));
    }

    @NotNull
    private ResponseEntity getTextProfile(HttpServletRequest request, String queryId, boolean isAllNode) {
        Map<String, String> profileMap = Maps.newHashMap();
        if (isAllNode) {
            return getProfileFromAllFrontends(request, "text", queryId, "", "");
        } else {
            String profile = ProfileManager.getInstance().getProfile(queryId);
            if (!Strings.isNullOrEmpty(profile)) {
                profileMap.put("profile", profile);
            }
        }
        return ResponseEntityBuilder.ok(profileMap);
    }

    @NotNull
    private ResponseEntity getJsonProfile(HttpServletRequest request, String queryId, boolean isAllNode) {
        Map<String, String> graph = Maps.newHashMap();
        if (isAllNode) {
            return getProfileFromAllFrontends(request, "json", queryId, null, null);
        } else {
            try {
                String brief = ProfileManager.getInstance().getProfileBrief(queryId);
                graph.put("profile", brief);
            } catch (Exception e) {
                LOG.warn("get profile graph error, queryId:{}", queryId, e);
            }
        }
        return ResponseEntityBuilder.ok(graph);
    }

    @NotNull
    private ResponseEntity getProfileFromAllFrontends(HttpServletRequest request, String format, String queryId,
            String fragmentId, String instanceId) {
        String httpPath = "/rest/v2/manager/query/profile/" + format + "/" + queryId;
        ImmutableMap.Builder<String, String> builder =
                ImmutableMap.<String, String>builder().put(IS_ALL_NODE_PARA, "false");
        if (!Strings.isNullOrEmpty(fragmentId)) {
            builder.put(FRAGMENT_ID, fragmentId);
        }
        if (!Strings.isNullOrEmpty(instanceId)) {
            builder.put(INSTANCE_ID, instanceId);
        }
        List<String> dataList = requestAllFe(httpPath, builder.build(), request.getHeader(NodeAction.AUTHORIZATION),
                HttpMethod.GET);
        Map<String, String> result = Maps.newHashMap();
        if (!dataList.isEmpty()) {
            try {
                String key = format.equals("graph") ? "graph" : "profile";
                String profile = JsonParser.parseString(dataList.get(0)).getAsJsonObject().get(key).getAsString();
                result.put(key, profile);
            } catch (Exception e) {
                return ResponseEntityBuilder.badRequest(e.getMessage());
            }
        }
        return ResponseEntityBuilder.ok(result);
    }

    private void checkAuthByUserAndQueryId(String queryId) throws AuthenticationException {
        String user = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
        if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
            ProfileManager.getInstance().checkAuthByUserAndQueryId(user, queryId);
        }
    }

    /**
     * return the result of CurrentQueryStatementsProcNode.
     *
     * @param request
     * @param response
     * @param isAllNode
     * @return
     */
    @RequestMapping(path = "/current_queries", method = RequestMethod.GET)
    public Object currentQueries(HttpServletRequest request, HttpServletResponse response,
            @RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true") boolean isAllNode) {
        executeCheckPassword(request, response);
        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);

        if (isAllNode) {
            // Get current queries from all FE
            String httpPath = "/rest/v2/manager/query/current_queries";
            Map<String, String> arguments = Maps.newHashMap();
            arguments.put(IS_ALL_NODE_PARA, "false");
            List<List<String>> queries = Lists.newArrayList();
            List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION),
                    HttpMethod.GET);
            for (String data : dataList) {
                try {
                    NodeAction.NodeInfo nodeInfo = GsonUtils.GSON.fromJson(data, new TypeToken<NodeAction.NodeInfo>() {
                    }.getType());
                    queries.addAll(nodeInfo.getRows());
                } catch (Exception e) {
                    LOG.warn("parse query info error: {}", data, e);
                }
            }
            List<String> titles = Lists.newArrayList(CurrentQueryStatementsProcNode.TITLE_NAMES);
            titles.add(0, FRONTEND);
            return ResponseEntityBuilder.ok(new NodeAction.NodeInfo(titles, queries));
        } else {
            try {
                CurrentQueryStatementsProcNode node = new CurrentQueryStatementsProcNode();
                ProcResult result = node.fetchResult();
                // add frontend info at first column.
                List<String> titles = Lists.newArrayList(CurrentQueryStatementsProcNode.TITLE_NAMES);
                titles.add(0, FRONTEND);
                List<List<String>> rows = result.getRows();
                String feIp = FrontendOptions.getLocalHostAddress();
                for (List<String> row : rows) {
                    row.add(0, feIp);
                }
                return ResponseEntityBuilder.ok(new NodeAction.NodeInfo(titles, rows));
            } catch (AnalysisException e) {
                return ResponseEntityBuilder.badRequest(e.getMessage());
            }
        }
    }

    /**
     * kill queries with specific query id
     *
     * @param request
     * @param response
     * @param queryId
     * @return
     */
    @RequestMapping(path = "/kill/{query_id}", method = RequestMethod.POST)
    public Object killQuery(HttpServletRequest request, HttpServletResponse response,
            @PathVariable("query_id") String queryId,
            @RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true") boolean isAllNode) {
        executeCheckPassword(request, response);
        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);

        if (isAllNode) {
            // Get current queries from all FE
            String httpPath = "/rest/v2/manager/query/kill/" + queryId;
            Map<String, String> arguments = Maps.newHashMap();
            arguments.put(IS_ALL_NODE_PARA, "false");
            requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION), HttpMethod.POST);
            return ResponseEntityBuilder.ok();
        }

        ExecuteEnv env = ExecuteEnv.getInstance();
        env.getScheduler().cancelQuery(queryId, new Status(TStatusCode.CANCELLED, "cancel query by rest api"));
        return ResponseEntityBuilder.ok();
    }
}