QeProcessorImpl.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.ExecutionProfile;
import org.apache.doris.common.profile.ProfileManager;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryProfile;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

public final class QeProcessorImpl implements QeProcessor {

    private static final Logger LOG = LogManager.getLogger(QeProcessorImpl.class);
    private Map<TUniqueId, QueryInfo> coordinatorMap;

    private Map<TUniqueId, Integer> queryToInstancesNum;
    private Map<String, AtomicInteger> userToInstancesCount;
    private ExecutorService writeProfileExecutor;

    public static final QeProcessor INSTANCE;

    static {
        INSTANCE = new QeProcessorImpl();
    }

    private QeProcessorImpl() {
        coordinatorMap = new ConcurrentHashMap<>();
        queryToInstancesNum = new ConcurrentHashMap<>();
        userToInstancesCount = new ConcurrentHashMap<>();
        // write profile to ProfileManager when query is running.
        writeProfileExecutor = ThreadPoolManager.newDaemonProfileThreadPool(3, 100,
                "profile-write-pool", true);
    }

    private Status processQueryProfile(TQueryProfile profile, TNetworkAddress address, boolean isDone) {
        ExecutionProfile executionProfile = ProfileManager.getInstance().getExecutionProfile(profile.query_id);
        if (executionProfile == null) {
            // When auto_profile_threshold_ms is not -1, this branch will be very common.
            // So this log is set to debug level.
            if (LOG.isDebugEnabled()) {
                LOG.debug("Could not find execution profile, query {} be {}",
                        DebugUtil.printId(profile.query_id), address.toString());
            }
            return new Status(TStatusCode.NOT_FOUND, "Could not find execution profile with query id "
                    + DebugUtil.printId(profile.query_id));
        }

        // Update profile may cost a lot of time, use a separate pool to deal with it.
        try {
            writeProfileExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    executionProfile.updateProfile(profile, address, isDone);
                }
            });
        } catch (Exception e) {
            LOG.warn("Failed to submit profile write task, query {} be {}",
                                DebugUtil.printId(profile.query_id), address.toString());
            return new Status(TStatusCode.INTERNAL_ERROR, "Failed to submit profile write task");
        }

        return Status.OK;
    }

    @Override
    public Coordinator getCoordinator(TUniqueId queryId) {
        QueryInfo queryInfo = coordinatorMap.get(queryId);
        if (queryInfo != null) {
            return queryInfo.getCoord();
        }
        return null;
    }

    @Override
    public List<Coordinator> getAllCoordinators() {
        List<Coordinator> res = new ArrayList<>();

        for (QueryInfo co : coordinatorMap.values()) {
            res.add(co.coord);
        }
        return res;
    }

    @Override
    public void registerQuery(TUniqueId queryId, QueryInfo info) throws UserException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("register query id = " + DebugUtil.printId(queryId) + ", job: " + info.getCoord().getJobId());
        }
        final QueryInfo result = coordinatorMap.putIfAbsent(queryId, info);
        if (result != null) {
            throw new UserException("queryId " + queryId + " already exists");
        }
        // Should add the execution profile to profile manager, BE will report the profile to FE and FE
        // will update it in ProfileManager
        if (info.coord.getQueryOptions().enable_profile) {
            ProfileManager.getInstance().addExecutionProfile(info.getCoord().getExecutionProfile());
        }
    }

    @Override
    public void registerInstances(TUniqueId queryId, Integer instancesNum) throws UserException {
        if (!coordinatorMap.containsKey(queryId)) {
            throw new UserException("query not exists in coordinatorMap:" + DebugUtil.printId(queryId));
        }
        QueryInfo queryInfo = coordinatorMap.get(queryId);
        if (queryInfo.getConnectContext() != null
                && !Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
        ) {
            String user = queryInfo.getConnectContext().getQualifiedUser();
            long maxQueryInstances = queryInfo.getConnectContext().getEnv().getAuth().getMaxQueryInstances(user);
            if (maxQueryInstances <= 0) {
                maxQueryInstances = Config.default_max_query_instances;
            }
            if (maxQueryInstances > 0) {
                AtomicInteger currentCount = userToInstancesCount
                        .computeIfAbsent(user, ignored -> new AtomicInteger(0));
                // Many query can reach here.
                if (instancesNum + currentCount.get() > maxQueryInstances) {
                    throw new UserException("reach max_query_instances " + maxQueryInstances);
                }
            }
            queryToInstancesNum.put(queryId, instancesNum);
            userToInstancesCount.computeIfAbsent(user, ignored -> new AtomicInteger(0)).addAndGet(instancesNum);
            MetricRepo.USER_COUNTER_QUERY_INSTANCE_BEGIN.getOrAdd(user).increase(instancesNum.longValue());
        }
    }

    public Map<String, Integer> getInstancesNumPerUser() {
        return Maps.transformEntries(userToInstancesCount, (ignored, value) -> value != null ? value.get() : 0);
    }

    @Override
    public void unregisterQuery(TUniqueId queryId) {
        QueryInfo queryInfo = coordinatorMap.remove(queryId);
        if (queryInfo != null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Deregister query id {}", DebugUtil.printId(queryId));
            }

            // Here we shuold use query option instead of ConnectContext,
            // because for the coordinator of load task, it does not have ConnectContext.
            if (queryInfo.getCoord().getQueryOptions().enable_profile) {
                ProfileManager.getInstance().markExecutionProfileFinished(queryId);
            }

            if (queryInfo.getConnectContext() != null
                    && !Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
            ) {
                Integer num = queryToInstancesNum.remove(queryId);
                if (num != null) {
                    String user = queryInfo.getConnectContext().getQualifiedUser();
                    AtomicInteger instancesNum = userToInstancesCount.get(user);
                    if (instancesNum == null) {
                        LOG.warn("WTF?? query {} in queryToInstancesNum but not in userToInstancesCount",
                                DebugUtil.printId(queryId)
                        );
                    } else {
                        instancesNum.addAndGet(-num);
                    }
                }
            }
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("not found query {} when unregisterQuery", DebugUtil.printId(queryId));
            }
        }

        // commit hive tranaction if needed
        Env.getCurrentHiveTransactionMgr().deregister(DebugUtil.printId(queryId));
    }

    @Override
    public Map<String, QueryStatisticsItem> getQueryStatistics() {
        final Map<String, QueryStatisticsItem> querySet = Maps.newHashMap();
        for (Map.Entry<TUniqueId, QueryInfo> entry : coordinatorMap.entrySet()) {
            final QueryInfo info = entry.getValue();
            final ConnectContext context = info.getConnectContext();
            if (info.sql == null || context == null) {
                continue;
            }
            final String queryIdStr = DebugUtil.printId(info.getConnectContext().queryId());
            final QueryStatisticsItem item = new QueryStatisticsItem.Builder().queryId(queryIdStr)
                    .queryStartTime(info.getStartExecTime()).sql(info.getSql()).user(context.getQualifiedUser())
                    .connId(String.valueOf(context.getConnectionId())).db(context.getDatabase())
                    .catalog(context.getDefaultCatalog())
                    .fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos())
                    .profile(info.getCoord().getExecutionProfile().getRoot())
                    .isReportSucc(context.getSessionVariable().enableProfile()).build();
            querySet.put(queryIdStr, item);
        }
        return querySet;
    }

    @Override
    public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr) {
        if (params.isSetQueryProfile()) {
            // Why not return response when process new profile failed?
            // First of all, we will do a refactor for report exec status in the future.
            // In that refactor, we will combine the report of exec status with query profile in a single rpc.
            // If we return error response in this pr, we will have problem when doing cluster upgrading.
            // For example, FE will return directly if it receives profile, but BE actually report exec status
            // with profile in a single rpc, this will make FE ignore the exec status and may lead to bug in query
            // like insert into select.
            if (params.isSetBackendId() && params.isSetDone()) {
                LOG.info("Receive profile {} report from {}, isDone {}, fragments {}",
                        DebugUtil.printId(params.getQueryProfile().getQueryId()), beAddr.toString(),
                        params.isDone(), params.getQueryProfile().fragment_id_to_profile.size());

                Backend backend = Env.getCurrentSystemInfo().getBackend(params.getBackendId());
                if (backend == null) {
                    LOG.warn("Invalid report profile req, backend {} not found, query id: {}",
                            params.getBackendId(), DebugUtil.printId(params.getQueryProfile().getQueryId()));
                } else {
                    boolean isDone = params.isDone();
                    // the process status is ignored by design.
                    // actually be does not care the process status of profile on fe.
                    processQueryProfile(params.getQueryProfile(), backend.getHeartbeatAddress(), isDone);
                }
            } else {
                LOG.warn("Invalid report profile req, this is a logical error, BE must set backendId and isDone"
                            + " at same time, query id: {}", DebugUtil.printId(params.query_id));
            }
        }

        final TReportExecStatusResult result = new TReportExecStatusResult();

        if (params.isSetReportWorkloadRuntimeStatus()) {
            Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().updateBeQueryStats(params.report_workload_runtime_status);
            if (!params.isSetQueryId()) {
                result.setStatus(new TStatus(TStatusCode.OK));
                return result;
            }
        }

        final QueryInfo info = coordinatorMap.get(params.query_id);
        result.setStatus(new TStatus(TStatusCode.OK));
        if (info == null) {
            // Currently, the execution of query is splited from the exec status process.
            // So, it is very likely that when exec status arrived on FE asynchronously, coordinator
            // has been removed from coordinatorMap.
            return result;
        }
        try {
            info.getCoord().updateFragmentExecStatus(params);
        } catch (Exception e) {
            LOG.warn("Exception during handle report, response: {}, query: {}, instance: {}", result.toString(),
                    DebugUtil.printId(params.query_id), DebugUtil.printId(params.fragment_instance_id), e);
            return result;
        }
        result.setStatus(new TStatus(TStatusCode.OK));
        return result;
    }

    @Override
    public String getCurrentQueryByQueryId(TUniqueId queryId) {
        QueryInfo info = coordinatorMap.get(queryId);
        if (info != null && info.sql != null) {
            return info.sql;
        }
        return "";
    }

    public Map<String, QueryInfo> getQueryInfoMap() {
        Map<String, QueryInfo> retQueryInfoMap = Maps.newHashMap();
        Set<TUniqueId> queryIdSet = coordinatorMap.keySet();
        for (TUniqueId qid : queryIdSet) {
            QueryInfo queryInfo = coordinatorMap.get(qid);
            if (queryInfo != null) {
                retQueryInfoMap.put(DebugUtil.printId(qid), queryInfo);
            }
        }
        return retQueryInfoMap;
    }

    public static final class QueryInfo {
        private final ConnectContext connectContext;
        private final Coordinator coord;
        private final String sql;
        private long registerTimeMs = 0L;

        // from Export, Pull load, Insert
        public QueryInfo(Coordinator coord) {
            this(null, null, coord);
        }

        // from query
        public QueryInfo(ConnectContext connectContext, String sql, Coordinator coord) {
            this.connectContext = connectContext;
            this.coord = coord;
            this.sql = sql;
            this.registerTimeMs = System.currentTimeMillis();
        }

        public ConnectContext getConnectContext() {
            return connectContext;
        }

        public Coordinator getCoord() {
            return coord;
        }

        public String getSql() {
            return sql;
        }

        public long getStartExecTime() {
            if (coord.getQueueToken() != null) {
                return coord.getQueueToken().getQueueEndTime();
            }
            return registerTimeMs;
        }

        public long getQueueStartTime() {
            if (coord.getQueueToken() != null) {
                return coord.getQueueToken().getQueueStartTime();
            }
            return -1;
        }

        public long getQueueEndTime() {
            if (coord.getQueueToken() != null) {
                return coord.getQueueToken().getQueueEndTime();
            }
            return -1;
        }

        public String getQueueStatus() {
            if (coord.getQueueToken() != null) {
                return coord.getQueueToken().getQueueMsg();
            }
            return "";
        }
    }
}