WorkloadGroupMgr.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadgroup;
import org.apache.doris.analysis.AlterWorkloadGroupStmt;
import org.apache.doris.analysis.CreateWorkloadGroupStmt;
import org.apache.doris.analysis.DropWorkloadGroupStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Pair;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.DropWorkloadGroupOperationLog;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TUserIdentity;
import org.apache.doris.thrift.TopicInfo;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPostProcessable {
public static final String DEFAULT_GROUP_NAME = "normal";
public static final Long DEFAULT_GROUP_ID = 1L;
public static final ImmutableList<String> WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder<String>()
.add("Id").add("Name").add(WorkloadGroup.CPU_SHARE).add(WorkloadGroup.MEMORY_LIMIT)
.add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
.add(WorkloadGroup.MAX_CONCURRENCY).add(WorkloadGroup.MAX_QUEUE_SIZE)
.add(WorkloadGroup.QUEUE_TIMEOUT).add(WorkloadGroup.CPU_HARD_LIMIT)
.add(WorkloadGroup.SCAN_THREAD_NUM).add(WorkloadGroup.MAX_REMOTE_SCAN_THREAD_NUM)
.add(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM)
.add(WorkloadGroup.SPILL_THRESHOLD_LOW_WATERMARK).add(WorkloadGroup.SPILL_THRESHOLD_HIGH_WATERMARK)
.add(WorkloadGroup.TAG)
.add(WorkloadGroup.READ_BYTES_PER_SECOND).add(WorkloadGroup.REMOTE_READ_BYTES_PER_SECOND)
.add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM)
.build();
private static final Logger LOG = LogManager.getLogger(WorkloadGroupMgr.class);
@SerializedName(value = "idToWorkloadGroup")
private final Map<Long, WorkloadGroup> idToWorkloadGroup = Maps.newHashMap();
private final Map<String, WorkloadGroup> nameToWorkloadGroup = Maps.newHashMap();
private final Map<Long, QueryQueue> idToQueryQueue = Maps.newHashMap();
private final ResourceProcNode procNode = new ResourceProcNode();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@Override
protected void runAfterCatalogReady() {
try {
resetQueryQueueProp();
} catch (Throwable e) {
LOG.warn("reset query queue failed ", e);
}
}
public void resetQueryQueueProp() {
List<QueryQueue> newPropList = new ArrayList<>();
Map<Long, QueryQueue> currentQueueCopyMap = new HashMap<>();
readLock();
try {
for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
WorkloadGroup wg = entry.getValue();
QueryQueue tmpQ = new QueryQueue(wg.getId(), wg.getMaxConcurrency(),
wg.getMaxQueueSize(), wg.getQueueTimeout(), wg.getVersion());
newPropList.add(tmpQ);
}
for (Map.Entry<Long, QueryQueue> entry : idToQueryQueue.entrySet()) {
currentQueueCopyMap.put(entry.getKey(), entry.getValue());
}
} finally {
readUnlock();
}
for (QueryQueue newPropQq : newPropList) {
QueryQueue currentQueryQueue = currentQueueCopyMap.get(newPropQq.getWgId());
if (currentQueryQueue == null) {
continue;
}
if (newPropQq.getPropVersion() > currentQueryQueue.getPropVersion()) {
currentQueryQueue.resetQueueProperty(newPropQq.getMaxConcurrency(), newPropQq.getMaxQueueSize(),
newPropQq.getQueueTimeout(), newPropQq.getPropVersion());
}
if (LOG.isDebugEnabled()) {
LOG.debug(currentQueryQueue.debugString()); // for test debug
}
}
}
public WorkloadGroupMgr() {
super("workload-group-thread", Config.query_queue_update_interval_ms);
// if no fe image exist, we should append internal group here.
appendInternalWorkloadGroup();
}
public static WorkloadGroupMgr read(DataInput in) throws IOException {
String json = Text.readString(in);
WorkloadGroupMgr ret = GsonUtils.GSON.fromJson(json, WorkloadGroupMgr.class);
ret.appendInternalWorkloadGroup();
return ret;
}
public void appendInternalWorkloadGroup() {
Map<String, String> properties = Maps.newHashMap();
properties.put(WorkloadGroup.CPU_SHARE, "1024");
properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
properties.put(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT, "true");
WorkloadGroup defaultValWg = new WorkloadGroup(DEFAULT_GROUP_ID.longValue(), DEFAULT_GROUP_NAME,
properties);
// when doris version is 2.0, user create a normal group with id 12345
// when doris upgrade from 2.0 to 2.1.2, Doris may create a workload id with 1
// then doris could contain two normal workload group with id 12345 and 1
// so we should check duplicate workload group when Fe starts
// and remove invalid workload group.
// case 1: no images exist or has an image but has no normal wg,
// insert a normal group with id 1 and default value directly.
// case 2: image exits and has a normal group, then do nothing.
Set<Long> invalidNormalWg = new HashSet<>();
for (WorkloadGroup curWg : idToWorkloadGroup.values()) {
if (DEFAULT_GROUP_NAME.equals(curWg.getName()) && DEFAULT_GROUP_ID.longValue() != curWg.getId()) {
invalidNormalWg.add(curWg.getId());
}
}
for (Long wgId : invalidNormalWg) {
idToWorkloadGroup.remove(wgId);
}
WorkloadGroup curNormalWg = idToWorkloadGroup.get(DEFAULT_GROUP_ID);
if (curNormalWg == null) {
curNormalWg = defaultValWg;
idToWorkloadGroup.put(curNormalWg.getId(), curNormalWg);
}
nameToWorkloadGroup.put(curNormalWg.getName(), curNormalWg);
}
private void readLock() {
lock.readLock().lock();
}
private void readUnlock() {
lock.readLock().unlock();
}
private void writeLock() {
lock.writeLock().lock();
}
private void writeUnlock() {
lock.writeLock().unlock();
}
public List<TPipelineWorkloadGroup> getWorkloadGroup(ConnectContext context) throws UserException {
String groupName = getWorkloadGroupNameAndCheckPriv(context);
List<TPipelineWorkloadGroup> workloadGroups = Lists.newArrayList();
readLock();
try {
WorkloadGroup workloadGroup = nameToWorkloadGroup.get(groupName);
if (workloadGroup == null) {
throw new UserException("Workload group " + groupName + " does not exist");
}
workloadGroups.add(workloadGroup.toThrift());
context.setWorkloadGroupName(groupName);
} finally {
readUnlock();
}
return workloadGroups;
}
public long getWorkloadGroup(UserIdentity currentUser, String groupName) throws UserException {
Long workloadId = getWorkloadGroupIdByName(groupName);
if (workloadId == null) {
throw new UserException("Workload group " + groupName + " does not exist");
}
if (!Env.getCurrentEnv().getAccessManager()
.checkWorkloadGroupPriv(currentUser, groupName, PrivPredicate.USAGE)) {
ErrorReport.reportAnalysisException(
"Access denied; you need (at least one of) the %s privilege(s) to use workload group '%s'.",
ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "USAGE/ADMIN", groupName);
}
return workloadId.longValue();
}
public List<TPipelineWorkloadGroup> getTWorkloadGroupById(long wgId) {
List<TPipelineWorkloadGroup> tWorkloadGroups = Lists.newArrayList();
readLock();
try {
WorkloadGroup wg = idToWorkloadGroup.get(wgId);
if (wg != null) {
tWorkloadGroups.add(wg.toThrift());
}
} finally {
readUnlock();
}
return tWorkloadGroups;
}
public List<TPipelineWorkloadGroup> getWorkloadGroupByUser(UserIdentity user, boolean checkAuth)
throws UserException {
String groupName = Env.getCurrentEnv().getAuth().getWorkloadGroup(user.getQualifiedUser());
List<TPipelineWorkloadGroup> ret = new ArrayList<>();
WorkloadGroup wg = null;
readLock();
try {
if (groupName == null || groupName.isEmpty()) {
wg = nameToWorkloadGroup.get(DEFAULT_GROUP_NAME);
if (wg == null) {
throw new RuntimeException("can not find normal workload group for user " + user);
}
} else {
wg = nameToWorkloadGroup.get(groupName);
if (wg == null) {
throw new UserException(
"can not find workload group " + groupName + " for user " + user);
}
}
if (checkAuth && !Env.getCurrentEnv().getAccessManager()
.checkWorkloadGroupPriv(user, wg.getName(), PrivPredicate.USAGE)) {
ErrorReport.reportAnalysisException(
"Access denied; you need (at least one of) the %s privilege(s) to use workload group '%s'."
+ " used id=(%s)",
ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "USAGE/ADMIN", wg.getName(), user.toString());
}
ret.add(wg.toThrift());
} finally {
readUnlock();
}
return ret;
}
public List<TopicInfo> getPublishTopicInfo() {
List<TopicInfo> workloadGroups = new ArrayList();
readLock();
try {
for (WorkloadGroup wg : idToWorkloadGroup.values()) {
workloadGroups.add(wg.toTopicInfo());
}
} finally {
readUnlock();
}
return workloadGroups;
}
public QueryQueue getWorkloadGroupQueryQueue(ConnectContext context) throws UserException {
String groupName = getWorkloadGroupNameAndCheckPriv(context);
writeLock();
try {
WorkloadGroup wg = nameToWorkloadGroup.get(groupName);
if (wg == null) {
throw new UserException("Workload group " + groupName + " does not exist");
}
QueryQueue queryQueue = idToQueryQueue.get(wg.getId());
if (queryQueue == null) {
queryQueue = new QueryQueue(wg.getId(), wg.getMaxConcurrency(), wg.getMaxQueueSize(),
wg.getQueueTimeout(), wg.getVersion());
idToQueryQueue.put(wg.getId(), queryQueue);
}
return queryQueue;
} finally {
writeUnlock();
}
}
public Map<String, List<String>> getWorkloadGroupQueryDetail() {
Map<String, List<String>> ret = Maps.newHashMap();
readLock();
try {
for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
Long wgId = entry.getKey();
WorkloadGroup wg = entry.getValue();
QueryQueue qq = idToQueryQueue.get(wgId);
List<String> valueList = new ArrayList<>(2);
if (qq == null) {
valueList.add("0");
valueList.add("0");
} else {
Pair<Integer, Integer> qdtail = qq.getQueryQueueDetail();
valueList.add(String.valueOf(qdtail.first));
valueList.add(String.valueOf(qdtail.second));
}
ret.put(wg.getName(), valueList);
}
} finally {
readUnlock();
}
return ret;
}
private String getWorkloadGroupNameAndCheckPriv(ConnectContext context) throws AnalysisException {
String groupName = context.getSessionVariable().getWorkloadGroup();
if (Strings.isNullOrEmpty(groupName)) {
groupName = Env.getCurrentEnv().getAuth().getWorkloadGroup(context.getQualifiedUser());
}
if (Strings.isNullOrEmpty(groupName)) {
groupName = DEFAULT_GROUP_NAME;
}
if (!Env.getCurrentEnv().getAccessManager().checkWorkloadGroupPriv(context, groupName, PrivPredicate.USAGE)) {
ErrorReport.reportAnalysisException(
"Access denied; you need (at least one of) the %s privilege(s) to use workload group '%s'.",
ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "USAGE/ADMIN", groupName);
}
return groupName;
}
public void createWorkloadGroup(CreateWorkloadGroupStmt stmt) throws DdlException {
WorkloadGroup workloadGroup = WorkloadGroup.create(stmt.getWorkloadGroupName(), stmt.getProperties());
String workloadGroupName = workloadGroup.getName();
writeLock();
try {
if (nameToWorkloadGroup.containsKey(workloadGroupName)) {
if (stmt.isIfNotExists()) {
return;
}
throw new DdlException("workload group " + workloadGroupName + " already exist");
}
if (idToWorkloadGroup.size() >= Config.workload_group_max_num) {
throw new DdlException(
"workload group number can not be exceed " + Config.workload_group_max_num);
}
checkGlobalUnlock(workloadGroup, null);
nameToWorkloadGroup.put(workloadGroupName, workloadGroup);
idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup);
Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(workloadGroup);
} finally {
writeUnlock();
}
LOG.info("Create workload group success: {}", workloadGroup);
}
// NOTE: used for checking sum value of 100% for cpu_hard_limit and memory_limit
// when create/alter workload group with same tag.
// when oldWg is null it means caller is an alter stmt.
private void checkGlobalUnlock(WorkloadGroup newWg, WorkloadGroup oldWg) throws DdlException {
Optional<Set<String>> newWgTag = newWg.getTag();
Set<String> newWgTagSet = null;
if (newWgTag.isPresent()) {
newWgTagSet = newWgTag.get();
} else {
newWgTagSet = new HashSet<>();
newWgTagSet.add(null);
}
for (String newWgOneTag : newWgTagSet) {
double sumOfAllMemLimit = 0;
// 1 get sum value of all wg which has same tag without current wg
for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
WorkloadGroup wg = entry.getValue();
Optional<Set<String>> wgTag = wg.getTag();
if (oldWg != null && entry.getKey() == oldWg.getId()) {
continue;
}
if (newWgOneTag == null) {
if (wgTag.isPresent()) {
continue;
}
} else if (!wgTag.isPresent() || (!wgTag.get().contains(newWgOneTag))) {
continue;
}
if (wg.getMemoryLimitPercentWhenCalSum() > 0) {
sumOfAllMemLimit += wg.getMemoryLimitPercentWhenCalSum();
}
}
// 2 sum current wg value
sumOfAllMemLimit += newWg.getMemoryLimitPercentWhenCalSum();
// 3 check total sum
if (Config.enable_wg_memory_sum_limit && sumOfAllMemLimit > 100.0 + 1e-6) {
throw new DdlException(
"The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within tag " + (
newWgTag.isPresent() ? newWgTag.get() : "")
+ " cannot be greater than 100.0%. current sum val:" + sumOfAllMemLimit);
}
}
}
public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws DdlException {
String workloadGroupName = stmt.getWorkloadGroupName();
Map<String, String> properties = stmt.getProperties();
if (properties.size() == 0) {
throw new DdlException("alter workload group should contain at least one property");
}
WorkloadGroup newWorkloadGroup;
writeLock();
try {
if (!nameToWorkloadGroup.containsKey(workloadGroupName)) {
throw new DdlException("workload group(" + workloadGroupName + ") does not exist.");
}
WorkloadGroup currentWorkloadGroup = nameToWorkloadGroup.get(workloadGroupName);
newWorkloadGroup = WorkloadGroup.copyAndUpdate(currentWorkloadGroup, properties);
checkGlobalUnlock(newWorkloadGroup, currentWorkloadGroup);
nameToWorkloadGroup.put(workloadGroupName, newWorkloadGroup);
idToWorkloadGroup.put(newWorkloadGroup.getId(), newWorkloadGroup);
// NOTE: used for regression test query queue
if (Config.enable_alter_queue_prop_sync) {
resetQueryQueueProp();
}
Env.getCurrentEnv().getEditLog().logAlterWorkloadGroup(newWorkloadGroup);
} finally {
writeUnlock();
}
LOG.info("Alter resource success: {}", newWorkloadGroup);
}
public void dropWorkloadGroup(DropWorkloadGroupStmt stmt) throws DdlException {
String workloadGroupName = stmt.getWorkloadGroupName();
if (DEFAULT_GROUP_NAME.equals(workloadGroupName)) {
throw new DdlException("Dropping default workload group " + workloadGroupName + " is not allowed");
}
// if a workload group exists in user property, it should not be dropped
// user need to reset user property first
Pair<Boolean, String> ret = Env.getCurrentEnv().getAuth().isWorkloadGroupInUse(workloadGroupName);
if (ret.first) {
throw new DdlException("workload group " + workloadGroupName + " is set for user " + ret.second
+ ", you can reset the user's property(eg, "
+ "set property for " + ret.second + " 'default_workload_group'='xxx'; ), "
+ "then you can drop the group.");
}
// A group with related policies should not be deleted.
Long wgId = getWorkloadGroupIdByName(workloadGroupName);
if (wgId != null) {
boolean groupHasPolicy = Env.getCurrentEnv().getWorkloadSchedPolicyMgr()
.checkWhetherGroupHasPolicy(wgId.longValue());
if (groupHasPolicy) {
throw new DdlException(
"workload group " + workloadGroupName + " can't be dropped, because it has related policy");
}
}
writeLock();
try {
if (!nameToWorkloadGroup.containsKey(workloadGroupName)) {
if (stmt.isIfExists()) {
return;
}
throw new DdlException("workload group " + workloadGroupName + " does not exist");
}
WorkloadGroup workloadGroup = nameToWorkloadGroup.get(workloadGroupName);
long groupId = workloadGroup.getId();
idToWorkloadGroup.remove(groupId);
nameToWorkloadGroup.remove(workloadGroupName);
idToQueryQueue.remove(groupId);
Env.getCurrentEnv().getEditLog().logDropWorkloadGroup(new DropWorkloadGroupOperationLog(groupId));
} finally {
writeUnlock();
}
LOG.info("Drop workload group success: {}", workloadGroupName);
}
private void insertWorkloadGroup(WorkloadGroup workloadGroup) {
writeLock();
try {
// when wg named normal but id is not DEFAULT_GROUP_ID,
// then we should abort it to avoid duplicate normal group
if (DEFAULT_GROUP_NAME.equals(workloadGroup.getName())
&& DEFAULT_GROUP_ID.longValue() != workloadGroup.getId()) {
return;
}
idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup);
nameToWorkloadGroup.put(workloadGroup.getName(), workloadGroup);
} finally {
writeUnlock();
}
}
public boolean isWorkloadGroupExists(String workloadGroupName) {
readLock();
try {
return nameToWorkloadGroup.containsKey(workloadGroupName);
} finally {
readUnlock();
}
}
public void replayCreateWorkloadGroup(WorkloadGroup workloadGroup) {
insertWorkloadGroup(workloadGroup);
}
public void replayAlterWorkloadGroup(WorkloadGroup workloadGroup) {
insertWorkloadGroup(workloadGroup);
}
public void replayDropWorkloadGroup(DropWorkloadGroupOperationLog operationLog) {
long id = operationLog.getId();
writeLock();
try {
if (!idToWorkloadGroup.containsKey(id)) {
return;
}
WorkloadGroup workloadGroup = idToWorkloadGroup.get(id);
nameToWorkloadGroup.remove(workloadGroup.getName());
idToWorkloadGroup.remove(id);
} finally {
writeUnlock();
}
}
public List<List<String>> getResourcesInfo(PatternMatcher matcher) {
UserIdentity currentUserIdentity = ConnectContext.get().getCurrentUserIdentity();
List<List<String>> rows = procNode.fetchResult(currentUserIdentity).getRows();
for (Iterator<List<String>> it = rows.iterator(); it.hasNext(); ) {
List<String> row = it.next();
if (matcher != null && !matcher.match(row.get(1))) {
it.remove();
}
}
return rows;
}
public List<List<String>> getResourcesInfo(TUserIdentity tCurrentUserIdentity) {
UserIdentity currentUserIdentity = UserIdentity.fromThrift(tCurrentUserIdentity);
return procNode.fetchResult(currentUserIdentity).getRows();
}
public Long getWorkloadGroupIdByName(String name) {
readLock();
try {
WorkloadGroup wg = nameToWorkloadGroup.get(name);
if (wg == null) {
return null;
}
return wg.getId();
} finally {
readUnlock();
}
}
public Map<Long, String> getIdToNameMap() {
Map<Long, String> ret = Maps.newHashMap();
readLock();
try {
for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
ret.put(entry.getKey(), entry.getValue().getName());
}
return ret;
} finally {
readUnlock();
}
}
public String getWorkloadGroupNameById(Long id) {
readLock();
try {
WorkloadGroup wg = idToWorkloadGroup.get(id);
if (wg == null) {
return null;
}
return wg.getName();
} finally {
readUnlock();
}
}
// for ut
public Map<String, WorkloadGroup> getNameToWorkloadGroup() {
return nameToWorkloadGroup;
}
// for ut
public Map<Long, WorkloadGroup> getIdToWorkloadGroup() {
return idToWorkloadGroup;
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
@Override
public void gsonPostProcess() throws IOException {
idToWorkloadGroup.forEach(
(id, workloadGroup) -> nameToWorkloadGroup.put(workloadGroup.getName(), workloadGroup));
}
public class ResourceProcNode {
public ProcResult fetchResult(UserIdentity currentUserIdentity) {
BaseProcResult result = new BaseProcResult();
result.setNames(WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES);
readLock();
try {
for (WorkloadGroup workloadGroup : idToWorkloadGroup.values()) {
if (!Env.getCurrentEnv().getAccessManager().checkWorkloadGroupPriv(currentUserIdentity,
workloadGroup.getName(), PrivPredicate.SHOW_WORKLOAD_GROUP)) {
continue;
}
workloadGroup.getProcNodeData(result, idToQueryQueue.get(workloadGroup.getId()));
}
} finally {
readUnlock();
}
return result;
}
}
}