QueryQueue.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.resource.workloadgroup;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.resource.AdmissionControl;
import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

// note(wb) refer java BlockingQueue, but support altering capacity
// todo(wb) add wait time to profile
public class QueryQueue {

    private static final Logger LOG = LogManager.getLogger(QueryQueue.class);
    // note(wb) used unfair by default, need more test later
    private final ReentrantLock queueLock = new ReentrantLock();
    // resource group property
    private int maxConcurrency;
    private int maxQueueSize;
    private int queueTimeout; // ms

    public static final String RUNNING_QUERY_NUM = "running_query_num";
    public static final String WAITING_QUERY_NUM = "waiting_query_num";

    private long wgId;

    private long propVersion;

    private PriorityQueue<QueueToken> waitingQueryQueue;
    private Queue<QueueToken> runningQueryQueue;

    Pair<Integer, Integer> getQueryQueueDetail() {
        try {
            queueLock.lock();
            return Pair.of(runningQueryQueue.size(), waitingQueryQueue.size());
        } finally {
            queueLock.unlock();
        }
    }

    long getPropVersion() {
        return propVersion;
    }

    long getWgId() {
        return wgId;
    }

    int getMaxConcurrency() {
        return maxConcurrency;
    }

    int getMaxQueueSize() {
        return maxQueueSize;
    }

    int getQueueTimeout() {
        return queueTimeout;
    }

    public QueryQueue(long wgId, int maxConcurrency, int maxQueueSize, int queueTimeout, long propVersion) {
        this.wgId = wgId;
        this.maxConcurrency = maxConcurrency;
        this.maxQueueSize = maxQueueSize;
        this.queueTimeout = queueTimeout;
        this.propVersion = propVersion;
        this.waitingQueryQueue = new PriorityQueue<QueueToken>();
        this.runningQueryQueue = new LinkedList<QueueToken>();
    }

    public String debugString() {
        return "wgId= " + wgId + ", version=" + this.propVersion + ",maxConcurrency=" + maxConcurrency
                + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + queueTimeout + ", currentRunningQueryNum="
                + runningQueryQueue.size() + ", currentWaitingQueryNum=" + waitingQueryQueue.size();
    }

    public int usedSlotCount() {
        int cnt = 0;
        for (Iterator iterator = runningQueryQueue.iterator(); iterator.hasNext();) {
            QueueToken queueToken = (QueueToken) iterator.next();
            cnt += queueToken.getQuerySlotCount();
        }
        return cnt;
    }

    public QueueToken getToken(int querySlotCount) throws UserException {
        if (maxConcurrency > 0 && (querySlotCount > maxConcurrency || querySlotCount < 1)) {
            throw new UserException("query slot count " + querySlotCount
                    + " should be smaller than workload group's max concurrency "
                    + maxConcurrency + " and > 0");
        }
        AdmissionControl admissionControl = Env.getCurrentEnv().getAdmissionControl();
        queueLock.lock();
        try {
            if (LOG.isDebugEnabled()) {
                LOG.info(this.debugString());
            }
            QueueToken queueToken = new QueueToken(queueTimeout, querySlotCount, this);

            boolean isReachMaxCon = runningQueryQueue.size() >= maxConcurrency;
            boolean hasFreeSlot = queueToken.getQuerySlotCount() <= maxConcurrency - usedSlotCount();
            boolean isResourceAvailable = admissionControl.checkResourceAvailable(queueToken);
            if (!isReachMaxCon && isResourceAvailable && hasFreeSlot) {
                runningQueryQueue.offer(queueToken);
                queueToken.complete();
                return queueToken;
            } else if (waitingQueryQueue.size() >= maxQueueSize) {
                throw new UserException("query waiting queue is full, queue capacity=" + maxQueueSize
                        + ", waiting num=" + waitingQueryQueue.size());
            } else {
                if (!hasFreeSlot) {
                    queueToken.setQueueMsg("NO_FREE_SLOT");
                }
                if (isReachMaxCon) {
                    queueToken.setQueueMsg("WAIT_IN_QUEUE");
                }
                queueToken.setTokenState(TokenState.ENQUEUE_SUCCESS);
                this.waitingQueryQueue.offer(queueToken);
                // if a query is added to wg's queue but not in AdmissionControl's
                // queue may be blocked by be memory later,
                // then we should put query to AdmissionControl in releaseAndNotify, it's too complicated.
                // To simplify the code logic, put all waiting query to AdmissionControl,
                // waiting query can be notified when query finish or memory is enough.
                admissionControl.addQueueToken(queueToken);
            }
            return queueToken;
        } finally {
            if (LOG.isDebugEnabled()) {
                LOG.info(this.debugString());
            }
            queueLock.unlock();
        }
    }

    public void notifyWaitQuery() {
        releaseAndNotify(null);
    }

    public void releaseAndNotify(QueueToken releaseToken) {
        AdmissionControl admissionControl = Env.getCurrentEnv().getAdmissionControl();
        queueLock.lock();
        try {
            if (releaseToken != null) {
                runningQueryQueue.remove(releaseToken);
                waitingQueryQueue.remove(releaseToken);
                admissionControl.removeQueueToken(releaseToken);
            }
            while (runningQueryQueue.size() < maxConcurrency) {
                QueueToken queueToken = waitingQueryQueue.peek();
                if (queueToken == null) {
                    break;
                }
                if (queueToken.getQuerySlotCount() > maxConcurrency - usedSlotCount()) {
                    break;
                }
                if (admissionControl.checkResourceAvailable(queueToken)) {
                    queueToken.complete();
                    runningQueryQueue.offer(queueToken);
                    waitingQueryQueue.remove();
                    admissionControl.removeQueueToken(queueToken);
                } else {
                    break;
                }
            }
        } finally {
            queueLock.unlock();
            if (LOG.isDebugEnabled()) {
                LOG.info(this.debugString());
            }
        }
    }

    public void resetQueueProperty(int maxConcurrency, int maxQueueSize, int queryWaitTimeout, long version) {
        queueLock.lock();
        try {
            this.maxConcurrency = maxConcurrency;
            this.maxQueueSize = maxQueueSize;
            this.queueTimeout = queryWaitTimeout;
            this.propVersion = version;
        } finally {
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.debugString());
            }
            queueLock.unlock();
        }
    }

}