WorkloadSchedPolicyMgr.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.analysis.AlterWorkloadSchedPolicyStmt;
import org.apache.doris.analysis.CreateWorkloadSchedPolicyStmt;
import org.apache.doris.analysis.DropWorkloadSchedPolicyStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.thrift.TCompareOperator;
import org.apache.doris.thrift.TUserIdentity;
import org.apache.doris.thrift.TWorkloadActionType;
import org.apache.doris.thrift.TWorkloadMetricType;
import org.apache.doris.thrift.TopicInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class WorkloadSchedPolicyMgr extends MasterDaemon implements Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(WorkloadSchedPolicyMgr.class);
@SerializedName(value = "idToPolicy")
private Map<Long, WorkloadSchedPolicy> idToPolicy = Maps.newConcurrentMap();
private Map<String, WorkloadSchedPolicy> nameToPolicy = Maps.newHashMap();
private PolicyProcNode policyProcNode = new PolicyProcNode();
public WorkloadSchedPolicyMgr() {
super("workload-sched-thread", Config.workload_sched_policy_interval_ms);
}
public static final ImmutableList<String> WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES
= new ImmutableList.Builder<String>()
.add("Id").add("Name").add("Condition").add("Action").add("Priority").add("Enabled").add("Version")
.add("WorkloadGroup")
.build();
public static final ImmutableMap<WorkloadConditionOperator, TCompareOperator> OP_MAP
= new ImmutableMap.Builder<WorkloadConditionOperator, TCompareOperator>()
.put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL)
.put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER)
.put(WorkloadConditionOperator.GREATER_EQUAL, TCompareOperator.GREATER_EQUAL)
.put(WorkloadConditionOperator.LESS, TCompareOperator.LESS)
.put(WorkloadConditionOperator.LESS_EQUAl, TCompareOperator.LESS_EQUAL).build();
public static final ImmutableSet<WorkloadActionType> FE_ACTION_SET
= new ImmutableSet.Builder<WorkloadActionType>().add(WorkloadActionType.SET_SESSION_VARIABLE).build();
public static final ImmutableSet<WorkloadMetricType> FE_METRIC_SET
= new ImmutableSet.Builder<WorkloadMetricType>().add(WorkloadMetricType.USERNAME)
.build();
public static final ImmutableSet<WorkloadActionType> BE_ACTION_SET
= new ImmutableSet.Builder<WorkloadActionType>().add(WorkloadActionType.MOVE_QUERY_TO_GROUP)
.add(WorkloadActionType.CANCEL_QUERY).build();
public static final ImmutableSet<WorkloadMetricType> BE_METRIC_SET
= new ImmutableSet.Builder<WorkloadMetricType>().add(WorkloadMetricType.BE_SCAN_ROWS)
.add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME)
.add(WorkloadMetricType.QUERY_BE_MEMORY_BYTES).build();
// used for convert fe type to thrift type
public static final ImmutableMap<WorkloadMetricType, TWorkloadMetricType> METRIC_MAP
= new ImmutableMap.Builder<WorkloadMetricType, TWorkloadMetricType>()
.put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME)
.put(WorkloadMetricType.BE_SCAN_ROWS, TWorkloadMetricType.BE_SCAN_ROWS)
.put(WorkloadMetricType.BE_SCAN_BYTES, TWorkloadMetricType.BE_SCAN_BYTES)
.put(WorkloadMetricType.QUERY_BE_MEMORY_BYTES, TWorkloadMetricType.QUERY_BE_MEMORY_BYTES).build();
public static final ImmutableMap<WorkloadActionType, TWorkloadActionType> ACTION_MAP
= new ImmutableMap.Builder<WorkloadActionType, TWorkloadActionType>()
.put(WorkloadActionType.MOVE_QUERY_TO_GROUP, TWorkloadActionType.MOVE_QUERY_TO_GROUP)
.put(WorkloadActionType.CANCEL_QUERY, TWorkloadActionType.CANCEL_QUERY).build();
public static final Map<String, WorkloadMetricType> STRING_METRIC_MAP = new HashMap<>();
public static final Map<String, WorkloadActionType> STRING_ACTION_MAP = new HashMap<>();
static {
for (WorkloadMetricType metricType : FE_METRIC_SET) {
STRING_METRIC_MAP.put(metricType.toString(), metricType);
}
for (WorkloadMetricType metricType : BE_METRIC_SET) {
STRING_METRIC_MAP.put(metricType.toString(), metricType);
}
for (WorkloadActionType actionType : FE_ACTION_SET) {
STRING_ACTION_MAP.put(actionType.toString(), actionType);
}
for (WorkloadActionType actionType : BE_ACTION_SET) {
STRING_ACTION_MAP.put(actionType.toString(), actionType);
}
}
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public static Comparator<WorkloadSchedPolicy> policyComparator = new Comparator<WorkloadSchedPolicy>() {
@Override
public int compare(WorkloadSchedPolicy p1, WorkloadSchedPolicy p2) {
return p2.getPriority() - p1.getPriority();
}
};
@Override
protected void runAfterCatalogReady() {
try {
// todo(wb) add more query info source, not only comes from connectionmap
// 1 get query info map
Map<Integer, ConnectContext> connectMap = ExecuteEnv.getInstance().getScheduler()
.getConnectionMap();
List<WorkloadQueryInfo> queryInfoList = new ArrayList<>();
// a snapshot for connect context
Set<Integer> keySet = new HashSet<>();
keySet.addAll(connectMap.keySet());
for (Integer connectId : keySet) {
ConnectContext cctx = connectMap.get(connectId);
if (cctx == null || cctx.isKilled()) {
continue;
}
String username = cctx.getQualifiedUser();
WorkloadQueryInfo policyQueryInfo = new WorkloadQueryInfo();
policyQueryInfo.queryId = cctx.queryId() == null ? null : DebugUtil.printId(cctx.queryId());
policyQueryInfo.tUniqueId = cctx.queryId();
policyQueryInfo.context = cctx;
policyQueryInfo.metricMap = new HashMap<>();
policyQueryInfo.metricMap.put(WorkloadMetricType.USERNAME, username);
queryInfoList.add(policyQueryInfo);
}
// 2 exec policy
if (queryInfoList.size() > 0) {
execPolicy(queryInfoList);
}
} catch (Throwable t) {
LOG.error("[policy thread]error happens when exec policy");
}
}
public void createWorkloadSchedPolicy(String policyName, boolean isIfNotExists,
List<WorkloadConditionMeta> originConditions, List<WorkloadActionMeta> originActions,
Map<String, String> propMap) throws UserException {
// 1 create condition
List<WorkloadCondition> policyConditionList = new ArrayList<>();
for (WorkloadConditionMeta cm : originConditions) {
WorkloadCondition cond = WorkloadCondition.createWorkloadCondition(cm);
policyConditionList.add(cond);
}
boolean feCondition = checkPolicyCondition(policyConditionList);
// 2 create action
List<WorkloadAction> policyActionList = new ArrayList<>();
for (WorkloadActionMeta workloadActionMeta : originActions) {
// todo(wb) support move action
WorkloadAction ret = WorkloadAction.createWorkloadAction(workloadActionMeta);
policyActionList.add(ret);
}
boolean feAction = checkPolicyAction(policyActionList);
if (feAction != feCondition) {
throw new UserException("action and metric must run in FE together or run in BE together");
}
// 3 create policy
if (propMap == null) {
propMap = new HashMap<>();
}
List<Long> wgIdList = new ArrayList<>();
if (propMap.size() != 0) {
checkProperties(propMap, wgIdList);
}
writeLock();
try {
if (nameToPolicy.containsKey(policyName)) {
if (isIfNotExists) {
return;
} else {
throw new UserException("workload schedule policy " + policyName + " already exists ");
}
}
if (idToPolicy.size() >= Config.workload_max_policy_num) {
throw new UserException(
"workload scheduler policy num can not exceed " + Config.workload_max_policy_num);
}
long id = Env.getCurrentEnv().getNextId();
WorkloadSchedPolicy policy = new WorkloadSchedPolicy(id, policyName,
policyConditionList, policyActionList, propMap, wgIdList);
policy.setConditionMeta(originConditions);
policy.setActionMeta(originActions);
Env.getCurrentEnv().getEditLog().logCreateWorkloadSchedPolicy(policy);
idToPolicy.put(id, policy);
nameToPolicy.put(policyName, policy);
} finally {
writeUnlock();
}
}
public void createWorkloadSchedPolicy(CreateWorkloadSchedPolicyStmt createStmt) throws UserException {
String policyName = createStmt.getPolicyName();
List<WorkloadConditionMeta> originConditions = createStmt.getConditions();
List<WorkloadActionMeta> originActions = createStmt.getActions();
Map<String, String> propMap = createStmt.getProperties();
boolean isIfNotExists = createStmt.isIfNotExists();
createWorkloadSchedPolicy(policyName, isIfNotExists, originConditions, originActions, propMap);
}
private boolean checkPolicyCondition(List<WorkloadCondition> conditionList) throws UserException {
if (conditionList.size() > Config.workload_max_condition_num_in_policy) {
throw new UserException(
"condition num in a policy can not exceed " + Config.workload_max_condition_num_in_policy);
}
boolean containsFeMetric = false;
boolean containsBeMetric = false;
for (WorkloadCondition cond : conditionList) {
if (FE_METRIC_SET.contains(cond.getMetricType())) {
containsFeMetric = true;
}
if (BE_METRIC_SET.contains(cond.getMetricType())) {
containsBeMetric = true;
}
if (containsFeMetric && containsBeMetric) {
throw new UserException(
"one policy can not contains fe and be metric, FE metric list is " + FE_METRIC_SET
+ ", BE metric list is " + BE_METRIC_SET);
}
}
return containsFeMetric;
}
private boolean checkPolicyAction(List<WorkloadAction> actionList) throws UserException {
if (actionList.size() > Config.workload_max_action_num_in_policy) {
throw new UserException(
"action num in one policy can not exceed " + Config.workload_max_action_num_in_policy);
}
Set<WorkloadActionType> actionTypeSet = new HashSet<>();
Set<String> setSessionVarSet = new HashSet<>();
boolean containsFeAction = false;
boolean containsBeAction = false;
for (WorkloadAction action : actionList) {
if (FE_ACTION_SET.contains(action.getWorkloadActionType())) {
containsFeAction = true;
}
if (BE_ACTION_SET.contains(action.getWorkloadActionType())) {
containsBeAction = true;
}
if (containsFeAction && containsBeAction) {
throw new UserException(
"one policy can not contains fe and be action, FE action list is " + FE_ACTION_SET
+ ", BE action list is " + BE_ACTION_SET);
}
// set session var cmd can be duplicate, but args can not be duplicate
if (action.getWorkloadActionType().equals(WorkloadActionType.SET_SESSION_VARIABLE)) {
WorkloadActionSetSessionVar setAction = (WorkloadActionSetSessionVar) action;
if (!setSessionVarSet.add(setAction.getVarName())) {
throw new UserException(
"duplicate set_session_variable action args one policy, " + setAction.getVarName());
}
} else if (!actionTypeSet.add(action.getWorkloadActionType())) {
throw new UserException("duplicate action in one policy");
}
}
if (actionTypeSet.contains(WorkloadActionType.CANCEL_QUERY) && actionTypeSet.contains(
WorkloadActionType.MOVE_QUERY_TO_GROUP)) {
throw new UserException(String.format("%s and %s can not exist in one policy at same time",
WorkloadActionType.CANCEL_QUERY, WorkloadActionType.MOVE_QUERY_TO_GROUP));
}
return containsFeAction;
}
public void execPolicy(List<WorkloadQueryInfo> queryInfoList) {
// 1 get a snapshot of policy
Set<Long> policyIdSet = new HashSet<>();
readLock();
try {
for (Map.Entry<Long, WorkloadSchedPolicy> entry : idToPolicy.entrySet()) {
if (entry.getValue().isFePolicy()) {
policyIdSet.add(entry.getKey());
}
}
} finally {
readUnlock();
}
for (WorkloadQueryInfo queryInfo : queryInfoList) {
try {
// 1 check policy is match
Map<WorkloadActionType, Queue<WorkloadSchedPolicy>> matchedPolicyMap = Maps.newHashMap();
for (Long policyId : policyIdSet) {
WorkloadSchedPolicy policy = idToPolicy.get(policyId);
if (policy == null) {
continue;
}
if (policy.isEnabled() && policy.isMatch(queryInfo)) {
WorkloadActionType actionType = policy.getFirstActionType();
// add to priority queue
Queue<WorkloadSchedPolicy> queue = matchedPolicyMap.get(actionType);
if (queue == null) {
queue = new PriorityQueue<>(policyComparator);
matchedPolicyMap.put(actionType, queue);
}
queue.offer(policy);
}
}
if (matchedPolicyMap.size() == 0) {
continue;
}
// 2 pick higher priority policy when action conflicts
List<WorkloadSchedPolicy> pickedPolicyList = pickPolicy(matchedPolicyMap);
// 3 exec action
for (WorkloadSchedPolicy policy : pickedPolicyList) {
policy.execAction(queryInfo);
}
} catch (Throwable e) {
LOG.warn("exec policy with query {} failed ", queryInfo.queryId, e);
}
}
}
List<WorkloadSchedPolicy> pickPolicy(Map<WorkloadActionType, Queue<WorkloadSchedPolicy>> policyMap) {
// NOTE(wb) currently all action share the same comparator which use priority.
// But later we may design every action type's own comparator,
// such as if two move group action has the same priority but move to different group,
// then we may pick group by resource usage and query statistics.
// 1 only need one policy with move action which has the highest priority
WorkloadSchedPolicy policyWithMoveAction = null;
Queue<WorkloadSchedPolicy> moveActionQueue = policyMap.get(WorkloadActionType.MOVE_QUERY_TO_GROUP);
if (moveActionQueue != null) {
policyWithMoveAction = moveActionQueue.peek();
}
// 2 only need one policy with cancel action which has the highest priority
WorkloadSchedPolicy policyWithCancelQueryAction = null;
Queue<WorkloadSchedPolicy> canelQueryActionQueue = policyMap.get(WorkloadActionType.CANCEL_QUERY);
if (canelQueryActionQueue != null) {
policyWithCancelQueryAction = canelQueryActionQueue.peek();
}
// 3 compare policy with move action and cancel action
List<WorkloadSchedPolicy> ret = new ArrayList<>();
if (policyWithMoveAction != null && policyWithCancelQueryAction != null) {
if (policyWithMoveAction.getPriority() > policyWithCancelQueryAction.getPriority()) {
ret.add(policyWithMoveAction);
} else {
ret.add(policyWithCancelQueryAction);
}
} else {
if (policyWithCancelQueryAction != null) {
ret.add(policyWithCancelQueryAction);
} else if (policyWithMoveAction != null) {
ret.add(policyWithMoveAction);
}
}
// 4 add no-conflict policy
for (Map.Entry<WorkloadActionType, Queue<WorkloadSchedPolicy>> entry : policyMap.entrySet()) {
WorkloadActionType type = entry.getKey();
Queue<WorkloadSchedPolicy> policyQueue = entry.getValue();
if (!WorkloadActionType.CANCEL_QUERY.equals(type) && !WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(type)
&& policyQueue != null && policyQueue.size() > 0) {
WorkloadSchedPolicy pickedPolicy = policyQueue.peek();
ret.add(pickedPolicy);
}
}
Preconditions.checkArgument(ret.size() > 0, "should pick at least one policy");
return ret;
}
private void checkProperties(Map<String, String> properties, List<Long> wgIdList) throws UserException {
Set<String> allInputPropKeySet = new HashSet<>();
allInputPropKeySet.addAll(properties.keySet());
allInputPropKeySet.removeAll(WorkloadSchedPolicy.POLICY_PROPERTIES);
if (allInputPropKeySet.size() > 0) {
throw new UserException("illegal policy properties " + String.join(",", allInputPropKeySet));
}
String enabledStr = properties.get(WorkloadSchedPolicy.ENABLED);
if (enabledStr != null) {
if (!"true".equals(enabledStr) && !"false".equals(enabledStr)) {
throw new UserException("invalid enabled property value, it can only true or false with lower case");
}
}
String priorityStr = properties.get(WorkloadSchedPolicy.PRIORITY);
if (priorityStr != null) {
try {
Long prioLongVal = Long.parseLong(priorityStr);
if (prioLongVal < 0 || prioLongVal > 100) {
throw new UserException("policy's priority can only between 0 ~ 100");
}
} catch (NumberFormatException e) {
throw new UserException(
"invalid priority property value, it must be a number, input value=" + priorityStr);
}
}
String workloadGroupNameStr = properties.get(WorkloadSchedPolicy.WORKLOAD_GROUP);
if (workloadGroupNameStr != null && !workloadGroupNameStr.isEmpty()) {
String cg = Config.isCloudMode() ? Tag.VALUE_DEFAULT_COMPUTE_GROUP_NAME : Tag.VALUE_DEFAULT_TAG;
String wg = "";
String[] ss = workloadGroupNameStr.split("\\.");
if (ss.length == 1) {
wg = ss[0];
} else if (ss.length == 2) {
cg = ss[0];
wg = ss[1];
} else {
throw new UserException("invalid workload group format: " + workloadGroupNameStr);
}
ConnectContext tmpCtx = new ConnectContext();
tmpCtx.setComputeGroup(
Env.getCurrentEnv().getComputeGroupMgr().getComputeGroupByName(cg));
tmpCtx.getSessionVariable().setWorkloadGroup(wg);
tmpCtx.setCurrentUserIdentity(UserIdentity.ROOT);
Long wgId = Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(tmpCtx)
.get(0).getId();
wgIdList.add(wgId);
}
}
public void alterWorkloadSchedPolicy(AlterWorkloadSchedPolicyStmt alterStmt) throws UserException {
alterWorkloadSchedPolicy(alterStmt.getPolicyName(), alterStmt.getProperties());
}
public void alterWorkloadSchedPolicy(String policyName, Map<String, String> properties) throws UserException {
writeLock();
try {
WorkloadSchedPolicy policy = nameToPolicy.get(policyName);
if (policy == null) {
throw new UserException("can not find workload schedule policy " + policyName);
}
List<Long> wgIdList = new ArrayList<>();
checkProperties(properties, wgIdList);
policy.updatePropertyIfNotNull(properties, wgIdList);
policy.incrementVersion();
Env.getCurrentEnv().getEditLog().logAlterWorkloadSchedPolicy(policy);
} finally {
writeUnlock();
}
}
public void dropWorkloadSchedPolicy(DropWorkloadSchedPolicyStmt dropStmt) throws UserException {
dropWorkloadSchedPolicy(dropStmt.getPolicyName(), dropStmt.isIfExists());
}
public void dropWorkloadSchedPolicy(String policyName, boolean isExists) throws UserException {
writeLock();
try {
WorkloadSchedPolicy schedPolicy = nameToPolicy.get(policyName);
if (schedPolicy == null) {
if (isExists) {
return;
} else {
throw new UserException("workload schedule policy " + policyName + " not exists");
}
}
long id = schedPolicy.getId();
idToPolicy.remove(id);
nameToPolicy.remove(policyName);
Env.getCurrentEnv().getEditLog().dropWorkloadSchedPolicy(id);
} finally {
writeUnlock();
}
}
private void readLock() {
lock.readLock().lock();
}
private void readUnlock() {
lock.readLock().unlock();
}
private void writeLock() {
lock.writeLock().lock();
}
private void writeUnlock() {
lock.writeLock().unlock();
}
public List<TopicInfo> getPublishTopicInfoList() {
List<TopicInfo> topicInfoList = new ArrayList();
readLock();
try {
for (Map.Entry<Long, WorkloadSchedPolicy> entry : idToPolicy.entrySet()) {
if (entry.getValue().isFePolicy()) {
continue;
}
TopicInfo tInfo = entry.getValue().toTopicInfo();
if (tInfo != null) {
topicInfoList.add(tInfo);
}
}
} finally {
readUnlock();
}
return topicInfoList;
}
public void replayCreateWorkloadSchedPolicy(WorkloadSchedPolicy policy) {
insertWorkloadSchedPolicy(policy);
}
public void replayAlterWorkloadSchedPolicy(WorkloadSchedPolicy policy) {
insertWorkloadSchedPolicy(policy);
}
public void replayDropWorkloadSchedPolicy(long policyId) {
writeLock();
try {
WorkloadSchedPolicy policy = idToPolicy.get(policyId);
if (policy == null) {
return;
}
idToPolicy.remove(policyId);
nameToPolicy.remove(policy.getName());
} finally {
writeUnlock();
}
}
private void insertWorkloadSchedPolicy(WorkloadSchedPolicy policy) {
writeLock();
try {
idToPolicy.put(policy.getId(), policy);
nameToPolicy.put(policy.getName(), policy);
} finally {
writeUnlock();
}
}
public List<List<String>> getShowPolicyInfo() {
UserIdentity currentUserIdentity = ConnectContext.get().getCurrentUserIdentity();
return policyProcNode.fetchResult(currentUserIdentity).getRows();
}
public List<List<String>> getWorkloadSchedPolicyTvfInfo(TUserIdentity tcurrentUserIdentity) {
UserIdentity currentUserIdentity = UserIdentity.fromThrift(tcurrentUserIdentity);
return policyProcNode.fetchResult(currentUserIdentity).getRows();
}
public boolean checkWhetherGroupHasPolicy(long wgId) {
readLock();
try {
for (Map.Entry<Long, WorkloadSchedPolicy> entry : idToPolicy.entrySet()) {
if (entry.getValue().getWorkloadGroupIdList().contains(wgId)) {
return true;
}
}
} finally {
readUnlock();
}
return false;
}
public class PolicyProcNode {
public ProcResult fetchResult(UserIdentity currentUserIdentity) {
BaseProcResult result = new BaseProcResult();
result.setNames(WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES);
Map<Long, String> idToNameMap = Env.getCurrentEnv().getWorkloadGroupMgr().getIdToNameMap();
readLock();
try {
for (WorkloadSchedPolicy policy : idToPolicy.values()) {
if (!Env.getCurrentEnv().getAccessManager().checkWorkloadGroupPriv(currentUserIdentity,
policy.getName(), PrivPredicate.SHOW_WORKLOAD_GROUP)) {
continue;
}
List<String> row = new ArrayList<>();
String pName = policy.getName();
row.add(String.valueOf(policy.getId()));
row.add(pName);
List<WorkloadConditionMeta> conditionList = policy.getConditionMetaList();
StringBuilder cmStr = new StringBuilder();
for (WorkloadConditionMeta cm : conditionList) {
cmStr.append(cm.toString()).append(";");
}
String retStr = cmStr.toString().toLowerCase();
row.add(retStr.substring(0, retStr.length() - 1));
List<WorkloadActionMeta> actionList = policy.getActionMetaList();
StringBuilder actionStr = new StringBuilder();
for (WorkloadActionMeta am : actionList) {
actionStr.append(am.toString()).append(";");
}
String retStr2 = actionStr.toString().toLowerCase();
row.add(retStr2.substring(0, retStr2.length() - 1));
row.add(String.valueOf(policy.getPriority()));
row.add(String.valueOf(policy.isEnabled()));
row.add(String.valueOf(policy.getVersion()));
List<Long> wgIdList = policy.getWorkloadGroupIdList();
if (wgIdList.size() == 0) {
row.add("");
} else {
Long wgId = wgIdList.get(0);
String wgName = idToNameMap.get(wgId);
if (wgName == null) {
row.add("null");
} else {
row.add(wgName);
}
}
result.addRow(row);
}
} finally {
readUnlock();
}
return result;
}
}
public static WorkloadSchedPolicyMgr read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, WorkloadSchedPolicyMgr.class);
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
@Override
public void gsonPostProcess() throws IOException {
idToPolicy.forEach(
(id, schedPolicy) -> nameToPolicy.put(schedPolicy.getName(), schedPolicy));
}
}