QueueToken.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadgroup;
import org.apache.doris.common.UserException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
// used to mark QueryQueue offer result
// if offer failed, then need to cancel query
// and return failed reason to user client
public class QueueToken implements Comparable<QueueToken> {
private static final Logger LOG = LogManager.getLogger(QueueToken.class);
@Override
public int compareTo(QueueToken other) {
return Long.compare(this.tokenId, other.getTokenId());
}
public enum TokenState {
ENQUEUE_SUCCESS,
READY_TO_RUN
}
static AtomicLong tokenIdGenerator = new AtomicLong(0);
private long tokenId = 0;
private volatile TokenState tokenState;
private long queueWaitTimeout = 0;
private long queueStartTime = -1;
private long queueEndTime = -1;
private final int querySlotCount;
private volatile String queueMsg = "";
QueryQueue queryQueue = null;
// Object is just a placeholder, it's meaningless now
private CompletableFuture<Object> future;
public QueueToken(long queueWaitTimeout, int querySlotCount, QueryQueue queryQueue) {
this.tokenId = tokenIdGenerator.addAndGet(1);
this.queueWaitTimeout = queueWaitTimeout;
this.queueStartTime = System.currentTimeMillis();
this.querySlotCount = querySlotCount;
this.queryQueue = queryQueue;
this.future = new CompletableFuture<>();
}
public void setQueueMsg(String msg) {
this.queueMsg = msg;
}
public void setTokenState(TokenState tokenState) {
this.tokenState = tokenState;
}
public String getQueueMsg() {
return queueMsg;
}
public void get(String queryId, int queryTimeout) throws UserException {
if (isReadyToRun()) {
return;
}
long waitTimeout = queueWaitTimeout > 0 ? Math.min(queueWaitTimeout, queryTimeout) : queryTimeout;
waitTimeout = waitTimeout <= 0 ? 4096 : waitTimeout;
try {
future.get(waitTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new UserException("query queue timeout, timeout: " + waitTimeout + " ms ");
} catch (CancellationException e) {
throw new UserException("query is cancelled");
} catch (Throwable t) {
String errMsg = String.format("error happens when query {} queue", queryId);
LOG.error(errMsg, t);
throw new RuntimeException(errMsg, t);
}
}
public void complete() {
this.queueEndTime = System.currentTimeMillis();
this.tokenState = TokenState.READY_TO_RUN;
this.setQueueMsg("RUNNING");
future.complete(null);
}
public void notifyWaitQuery() {
this.queryQueue.notifyWaitQuery();
}
public void cancel() {
future.cancel(true);
}
public long getQueueStartTime() {
return queueStartTime;
}
public long getQueueEndTime() {
return queueEndTime;
}
public boolean isReadyToRun() {
return tokenState == TokenState.READY_TO_RUN;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
QueueToken other = (QueueToken) obj;
return tokenId == other.tokenId;
}
public long getTokenId() {
return tokenId;
}
public int getQuerySlotCount() {
return querySlotCount;
}
}