WorkloadSchedPolicy.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TWorkloadAction;
import org.apache.doris.thrift.TWorkloadActionType;
import org.apache.doris.thrift.TWorkloadCondition;
import org.apache.doris.thrift.TWorkloadMetricType;
import org.apache.doris.thrift.TWorkloadSchedPolicy;
import org.apache.doris.thrift.TopicInfo;
import com.esotericsoftware.minlog.Log;
import com.google.common.collect.ImmutableSet;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class WorkloadSchedPolicy implements Writable, GsonPostProcessable {
public static final String ENABLED = "enabled";
public static final String PRIORITY = "priority";
public static final String WORKLOAD_GROUP = "workload_group";
public static final ImmutableSet<String> POLICY_PROPERTIES = new ImmutableSet.Builder<String>()
.add(ENABLED).add(PRIORITY).add(WORKLOAD_GROUP).build();
@SerializedName(value = "id")
long id;
@SerializedName(value = "name")
String name;
@SerializedName(value = "version")
int version;
@SerializedName(value = "enabled")
private volatile boolean enabled;
@SerializedName(value = "priority")
private volatile int priority;
@SerializedName(value = "wgIdList")
private List<Long> workloadGroupIdList = new ArrayList<>();
@SerializedName(value = "conditionMetaList")
List<WorkloadConditionMeta> conditionMetaList;
// we regard action as a command, map's key is command, map's value is args, so it's a command list
@SerializedName(value = "actionMetaList")
List<WorkloadActionMeta> actionMetaList;
private List<WorkloadCondition> workloadConditionList;
private List<WorkloadAction> workloadActionList;
private Boolean isFePolicy = null;
// for ut
public WorkloadSchedPolicy() {
}
// for ut
public void setWorkloadConditionList(List<WorkloadCondition> workloadConditionList) {
this.workloadConditionList = workloadConditionList;
}
public WorkloadSchedPolicy(long id, String name, List<WorkloadCondition> workloadConditionList,
List<WorkloadAction> workloadActionList, Map<String, String> properties, List<Long> wgIdList) {
this.id = id;
this.name = name;
this.workloadConditionList = workloadConditionList;
this.workloadActionList = workloadActionList;
String enabledStr = properties.get(ENABLED);
this.enabled = enabledStr == null ? true : Boolean.parseBoolean(enabledStr);
String priorityStr = properties.get(PRIORITY);
this.priority = priorityStr == null ? 0 : Integer.parseInt(priorityStr);
this.workloadGroupIdList = wgIdList;
this.version = 0;
}
// return true, this means all conditions in policy can match queryInfo's data
// return false,
// 1 metric not match
// 2 condition value not match query info's value
public boolean isMatch(WorkloadQueryInfo queryInfo) {
for (WorkloadCondition condition : workloadConditionList) {
WorkloadMetricType metricType = condition.getMetricType();
String value = queryInfo.metricMap.get(metricType);
if (value == null) {
return false; // query info's metric must match all condition's metric
}
if (!condition.eval(value)) {
return false;
}
}
return true;
}
public boolean isEnabled() {
return enabled;
}
public int getPriority() {
return priority;
}
public void execAction(WorkloadQueryInfo queryInfo) {
for (WorkloadAction action : workloadActionList) {
action.exec(queryInfo);
}
}
// move > log, cancel > log
// move and cancel can not exist at same time
public WorkloadActionType getFirstActionType() {
WorkloadActionType retType = null;
for (WorkloadAction action : workloadActionList) {
WorkloadActionType currentActionType = action.getWorkloadActionType();
if (retType == null) {
retType = currentActionType;
continue;
}
if (currentActionType == WorkloadActionType.MOVE_QUERY_TO_GROUP
|| currentActionType == WorkloadActionType.CANCEL_QUERY) {
return currentActionType;
}
}
return retType;
}
public void updatePropertyIfNotNull(Map<String, String> property, List<Long> wgIdList) {
String enabledStr = property.get(ENABLED);
if (enabledStr != null) {
this.enabled = Boolean.parseBoolean(enabledStr);
}
String priorityStr = property.get(PRIORITY);
if (priorityStr != null) {
this.priority = Integer.parseInt(priorityStr);
}
String workloadGroupIdStr = property.get(WORKLOAD_GROUP);
// workloadGroupIdStr != null means user set workload group property,
// then we should overwrite policy's workloadGroupIdList
// if workloadGroupIdStr.length == 0, it means the policy should match all query.
if (workloadGroupIdStr != null) {
this.workloadGroupIdList = wgIdList;
}
}
void incrementVersion() {
this.version++;
}
public void setConditionMeta(List<WorkloadConditionMeta> conditionMeta) {
this.conditionMetaList = conditionMeta;
}
public void setActionMeta(List<WorkloadActionMeta> actionMeta) {
this.actionMetaList = actionMeta;
}
public long getId() {
return id;
}
public String getName() {
return name;
}
public long getVersion() {
return version;
}
public List<Long> getWorkloadGroupIdList() {
return this.workloadGroupIdList;
}
public List<WorkloadConditionMeta> getConditionMetaList() {
return conditionMetaList;
}
public List<WorkloadActionMeta> getActionMetaList() {
return actionMetaList;
}
// true, current policy can only run in FE;
// false, current policy can only run in BE
public boolean isFePolicy() {
if (isFePolicy == null) {
isFePolicy = false;
for (WorkloadAction action : workloadActionList) {
if (WorkloadSchedPolicyMgr.FE_ACTION_SET.contains(action.getWorkloadActionType())) {
isFePolicy = true;
break;
}
}
}
return isFePolicy;
}
public TopicInfo toTopicInfo() {
TWorkloadSchedPolicy tPolicy = new TWorkloadSchedPolicy();
tPolicy.setId(id);
tPolicy.setName(name);
tPolicy.setVersion(version);
tPolicy.setPriority(priority);
tPolicy.setEnabled(enabled);
tPolicy.setWgIdList(workloadGroupIdList);
List<TWorkloadCondition> condList = new ArrayList();
for (WorkloadConditionMeta cond : conditionMetaList) {
TWorkloadCondition tCond = new TWorkloadCondition();
TWorkloadMetricType metricType = WorkloadSchedPolicyMgr.METRIC_MAP.get(cond.metricName);
if (metricType == null) {
return null;
}
tCond.setMetricName(metricType);
tCond.setOp(WorkloadSchedPolicyMgr.OP_MAP.get(cond.op));
tCond.setValue(cond.value);
condList.add(tCond);
}
List<TWorkloadAction> actionList = new ArrayList();
for (WorkloadActionMeta action : actionMetaList) {
TWorkloadAction tAction = new TWorkloadAction();
TWorkloadActionType tActionType = WorkloadSchedPolicyMgr.ACTION_MAP.get(action.action);
if (tActionType == null) {
return null;
}
tAction.setAction(tActionType);
tAction.setActionArgs(action.actionArgs);
actionList.add(tAction);
}
tPolicy.setConditionList(condList);
tPolicy.setActionList(actionList);
TopicInfo topicInfo = new TopicInfo();
topicInfo.setWorkloadSchedPolicy(tPolicy);
return topicInfo;
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public static WorkloadSchedPolicy read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, WorkloadSchedPolicy.class);
}
@Override
public void gsonPostProcess() throws IOException {
List<WorkloadCondition> policyConditionList = new ArrayList<>();
for (WorkloadConditionMeta cm : conditionMetaList) {
try {
WorkloadCondition cond = WorkloadCondition.createWorkloadCondition(cm);
policyConditionList.add(cond);
} catch (UserException ue) {
Log.error("unexpected condition data error when replay log ", ue);
}
}
this.workloadConditionList = policyConditionList;
List<WorkloadAction> actionList = new ArrayList<>();
for (WorkloadActionMeta actionMeta : actionMetaList) {
try {
WorkloadAction ret = WorkloadAction.createWorkloadAction(actionMeta);
actionList.add(ret);
} catch (UserException ue) {
Log.error("unexpected action data error when replay log ", ue);
}
}
this.workloadActionList = actionList;
}
}