// KillUtils.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands.utils;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.FEOpExecutor;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.system.Frontend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

/**
 * Utility class for killing queries and connections.
 *
 * <p>All methods are static. A kill request can target either a connection id
 * (looked up on the current FE only) or a query id (looked up on the current FE
 * first and, if not found there, forwarded to the other alive FEs).
 */
public class KillUtils {
    private static final Logger LOG = LogManager.getLogger(KillUtils.class);

    /**
     * Kill a query by query id or connection id.
     *
     * <p>Dispatch rules:
     * <ul>
     *   <li>{@code killConnection == true}: kill the connection identified by {@code connectionId}.</li>
     *   <li>{@code killConnection == false} and {@code queryId} is non-empty: kill the query by query id.</li>
     *   <li>{@code killConnection == false} and {@code queryId} is null/empty: kill the running query
     *       of the connection identified by {@code connectionId}.</li>
     * </ul>
     *
     * @param ctx the current connect context
     * @param killConnection true if kill connection, false if only kill query
     * @param queryId the query id to kill
     * @param connectionId the connection id to kill
     * @param stmt the origin kill statement, which may need to be forwarded to other FE
     * @throws UserException if the target cannot be found or the current user is not allowed to kill it
     * @throws IllegalStateException if a connection id is required but {@code connectionId} is negative
     */
    public static void kill(ConnectContext ctx, boolean killConnection, String queryId, int connectionId,
            OriginStatement stmt) throws UserException {
        if (killConnection) {
            // kill connection connection_id
            // kill connection_id
            Preconditions.checkState(connectionId >= 0, connectionId);
            killByConnectionId(ctx, true, connectionId);
        } else {
            if (!Strings.isNullOrEmpty(queryId)) {
                // kill query "query_id"
                killQueryByQueryId(ctx, queryId, stmt);
            } else {
                // kill query connection_id
                Preconditions.checkState(connectionId >= 0, connectionId);
                killByConnectionId(ctx, false, connectionId);
            }
        }
    }

    /**
     * Kill a query by query id.
     *
     * <p>The query is first looked up on the current FE. If it is not found and this request was
     * itself forwarded from another FE ({@code ctx.isProxy()}), an error is reported so that the
     * originating FE can keep searching. Otherwise the statement is forwarded to every other alive
     * FE in turn until one of them reports success.
     *
     * @param ctx the current connect context
     * @param queryId the query id to kill
     * @param stmt the origin kill statement, which may need to be forwarded to other FE
     * @throws UserException if the query is not found on any FE, if the current user is not allowed
     *         to kill it, or if forwarding the statement to another FE fails with an exception
     */
    @VisibleForTesting
    public static void killQueryByQueryId(ConnectContext ctx, String queryId, OriginStatement stmt)
            throws UserException {
        // 1. First, try to find the query in the current FE and kill it
        if (killByQueryIdOnCurrentNode(ctx, queryId)) {
            return;
        }

        if (ctx.isProxy()) {
            // The query is not found in the current FE, and the command is forwarded from other FE.
            // return error to let the proxy FE to handle it.
            if (LOG.isDebugEnabled()) {
                LOG.debug("kill query '{}' in proxy mode but not found", queryId);
            }
            // NOTE(review): the code below relies on reportDdlException always throwing, so that a
            // forwarded request never forwards again - confirm against ErrorReport.
            ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_QUERY, queryId);
        }

        // 2. Query not found in current FE, try to kill the query in other FE.
        List<String> errMsgs = Lists.newArrayList();
        for (Frontend fe : Env.getCurrentEnv().getFrontends(null /* all */)) {
            // Skip dead FEs and the current FE itself.
            // NOTE(review): "itself" is detected by host only, so another FE deployed on the same
            // host as this one would be skipped as well - confirm this is acceptable.
            if (!fe.isAlive() || fe.getHost().equals(Env.getCurrentEnv().getSelfNode().getHost())) {
                continue;
            }

            TNetworkAddress feAddr = new TNetworkAddress(fe.getHost(), fe.getRpcPort());
            FEOpExecutor executor = new FEOpExecutor(feAddr, stmt, ConnectContext.get(), false);
            if (LOG.isDebugEnabled()) {
                LOG.debug("try kill query '{}' to FE: {}", queryId, feAddr);
            }
            try {
                executor.execute();
            } catch (Exception e) {
                // NOTE(review): an exception from a single FE aborts the whole search; the remaining
                // FEs are not tried.
                throw new DdlException(e.getMessage(), e);
            }
            if (executor.getStatusCode() != TStatusCode.OK.getValue()) {
                // The query is not found in this FE, continue to find in other FEs
                // and save error msg
                errMsgs.add(String.format("failed to apply to fe %s:%s, error message: %s",
                        fe.getHost(), fe.getRpcPort(), executor.getErrMsg()));
            } else {
                // Find query in other FE, just return
                ctx.getState().setOk();
                return;
            }
        }

        // 3. Query not found in any FE: report it as unknown.
        // No BE-side cancel is attempted here; the collected per-FE errors are only logged.
        if (LOG.isDebugEnabled()) {
            LOG.debug("not found query '{}' in any FE. Messages: {}", queryId, errMsgs);
        }
        ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_QUERY, queryId);
    }

    /**
     * Kill a query by query id on the current FE.
     *
     * <p>Only the query is killed ({@code kill(false)}); the owning connection is kept.
     *
     * @param ctx the current connect context
     * @param queryId the query id to kill
     * @return true if the query is killed, false if not found
     * @throws DdlException if the query belongs to another user and the current user has no ADMIN privilege
     */
    @VisibleForTesting
    public static boolean killByQueryIdOnCurrentNode(ConnectContext ctx, String queryId) throws DdlException {
        ConnectContext killCtx = ExecuteEnv.getInstance().getScheduler().getContextWithQueryId(queryId);
        if (LOG.isDebugEnabled()) {
            LOG.debug("kill query '{}' on current node", queryId);
        }
        if (killCtx == null) {
            return false;
        }
        // NOTE(review): ERR_KILL_DENIED_ERROR receives a String here but an int in
        // killByConnectionId - confirm the error message format accepts both.
        checkKillPrivilege(ctx, killCtx, queryId);
        killCtx.kill(false);
        ctx.getState().setOk();
        return true;
    }

    /**
     * Kill a connection, or only its running query, by connection id.
     *
     * <p>If the target is the current connection itself, the context is just marked as killed via
     * {@code setKilled()}, regardless of {@code killConnection}, and no privilege check is performed.
     *
     * @param ctx the current connect context
     * @param killConnection true if kill connection, false if only kill query; passed through to
     *        the target context's {@code kill}
     * @param connectionId the connection id to kill
     * @throws DdlException if no connection with this id exists on the current FE, or if it belongs
     *         to another user and the current user has no ADMIN privilege
     */
    @VisibleForTesting
    public static void killByConnectionId(ConnectContext ctx, boolean killConnection, int connectionId)
            throws DdlException {
        ConnectContext killCtx = ctx.getConnectScheduler().getContext(connectionId);
        if (killCtx == null) {
            // NOTE(review): relies on reportDdlException always throwing; otherwise killCtx would be
            // dereferenced below while null.
            ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_THREAD, connectionId);
        }
        if (ctx == killCtx) {
            // Suicide
            ctx.setKilled();
        } else {
            checkKillPrivilege(ctx, killCtx, connectionId);
            killCtx.kill(killConnection);
        }
        ctx.getState().setOk();
    }

    /**
     * Check that the current user may kill the target context.
     * Only user itself and user with admin priv can kill a query or connection.
     *
     * @param ctx the current connect context, i.e. the one issuing the kill
     * @param killCtx the connect context that is about to be killed
     * @param target the query id or connection id, used as the argument of the error message
     * @throws DdlException if the two contexts belong to different users and the current user has
     *         no global ADMIN privilege
     */
    private static void checkKillPrivilege(ConnectContext ctx, ConnectContext killCtx, Object target)
            throws DdlException {
        boolean sameUser = killCtx.getQualifiedUser().equals(ctx.getQualifiedUser());
        // The privilege lookup is only evaluated when the users differ.
        if (!sameUser && !Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ctx, PrivPredicate.ADMIN)) {
            ErrorReport.reportDdlException(ErrorCode.ERR_KILL_DENIED_ERROR, target);
        }
    }
}