WorkloadGroup.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.resource.workloadgroup;

import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TWgSlotMemoryPolicy;
import org.apache.doris.thrift.TWorkloadGroupInfo;
import org.apache.doris.thrift.TopicInfo;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class WorkloadGroup implements Writable, GsonPostProcessable {
    private static final Logger LOG = LogManager.getLogger(WorkloadGroup.class);

    // Property keys of a workload group.
    // Only the keys listed in ALL_PROPERTIES_NAME pass checkProperties(); the other keys declared
    // here (CPU_SHARE, CPU_HARD_LIMIT, MEMORY_LIMIT, ENABLE_MEMORY_OVERCOMMIT, TAG and the two
    // SPILL_THRESHOLD_* keys) are rejected there as unsupported.
    public static final String CPU_SHARE = "cpu_share";

    public static final String CPU_HARD_LIMIT = "cpu_hard_limit";

    // Integer percent in [1,100]; a trailing '%' is accepted on input and stripped before storing.
    public static final String MAX_CPU_PERCENT = "max_cpu_percent";

    // Integer percent in [0,100]; must not exceed MAX_CPU_PERCENT.
    public static final String MIN_CPU_PERCENT = "min_cpu_percent";

    // Not in ALL_PROPERTIES_NAME, but still read by gsonPostProcess() from deserialized groups.
    public static final String MEMORY_LIMIT = "memory_limit";

    // Integer percent in [1,100]; a trailing '%' is accepted on input and stripped before storing.
    public static final String MAX_MEMORY_PERCENT = "max_memory_percent";

    // Integer percent in [0,100]; must not exceed MAX_MEMORY_PERCENT.
    public static final String MIN_MEMORY_PERCENT = "min_memory_percent";

    // Not in ALL_PROPERTIES_NAME, but still read by gsonPostProcess() from deserialized groups.
    public static final String ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit";

    // Queue related keys; each must be an integer >= 0.
    public static final String MAX_CONCURRENCY = "max_concurrency";

    public static final String MAX_QUEUE_SIZE = "max_queue_size";

    public static final String QUEUE_TIMEOUT = "queue_timeout";

    // Thread number keys; each must be -1 or a positive integer.
    public static final String SCAN_THREAD_NUM = "scan_thread_num";

    // The max/min remote scan thread numbers must be set together, with max >= min.
    public static final String MAX_REMOTE_SCAN_THREAD_NUM = "max_remote_scan_thread_num";

    public static final String MIN_REMOTE_SCAN_THREAD_NUM = "min_remote_scan_thread_num";

    // Integer percent in [1,100]; the low watermark must not be greater than the high watermark.
    public static final String MEMORY_LOW_WATERMARK = "memory_low_watermark";

    public static final String MEMORY_HIGH_WATERMARK = "memory_high_watermark";

    // One of the values in AVAILABLE_SLOT_MEMORY_POLICY_VALUES.
    public static final String SLOT_MEMORY_POLICY = "slot_memory_policy";

    public static final String TAG = "tag";

    // Byte rate keys; each must be -1 or a positive long.
    public static final String READ_BYTES_PER_SECOND = "read_bytes_per_second";

    public static final String REMOTE_READ_BYTES_PER_SECOND = "remote_read_bytes_per_second";

    // Deprecated: use MEMORY_LOW_WATERMARK and MEMORY_HIGH_WATERMARK instead.
    public static final String SPILL_THRESHOLD_LOW_WATERMARK = "spill_threshold_low_watermark";
    public static final String SPILL_THRESHOLD_HIGH_WATERMARK = "spill_threshold_high_watermark";

    // Stored as-is; getComputeGroup() falls back to WorkloadGroupMgr.EMPTY_COMPUTE_GROUP when empty.
    public static final String COMPUTE_GROUP = "compute_group";

    // NOTE(wb): no property is required; the default values of some properties are set in BE.
    // This set is the whitelist used by checkProperties(): any property name that is not listed
    // here is rejected with "Property ... is not supported, maybe it is deprecated."
    private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>()
            .add(MAX_CPU_PERCENT).add(MIN_CPU_PERCENT)
            .add(MAX_MEMORY_PERCENT).add(MIN_MEMORY_PERCENT)
            .add(MAX_CONCURRENCY).add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT)
            .add(SCAN_THREAD_NUM).add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
            .add(MEMORY_LOW_WATERMARK).add(MEMORY_HIGH_WATERMARK)
            .add(COMPUTE_GROUP).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND)
            .add(SLOT_MEMORY_POLICY).build();


    // Value the constructor stores for MAX_CPU_PERCENT when the property is missing.
    public static final int MAX_CPU_PERCENT_DEFAULT_VALUE = 100;
    // Kept as a String, unlike the other defaults which are numbers: per the original note, the
    // memory limit is a string value ending with % in BE. Used by the constructor when
    // MAX_MEMORY_PERCENT is missing and by gsonPostProcess() when the old memory limit is "-1".
    public static final String MAX_MEMORY_PERCENT_DEFAULT_VALUE = "100";
    // Watermark defaults, used by both checkProperties() and the constructor when the keys are missing.
    public static final int MEMORY_LOW_WATERMARK_DEFAULT_VALUE = 75;
    public static final int MEMORY_HIGH_WATERMARK_DEFAULT_VALUE = 85;

    // Display defaults: getProcNodeData() shows the value from this map when a property has no
    // (or an empty) stored value. Keys without an entry here yield null in that case.
    private static final Map<String, String> ALL_PROPERTIES_DEFAULT_VALUE_MAP = Maps.newHashMap();

    static {
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MAX_CONCURRENCY, String.valueOf(Integer.MAX_VALUE));
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MAX_QUEUE_SIZE, "0");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(QUEUE_TIMEOUT, "0");
        // -1 is the "not set" marker that checkProperties() accepts for the thread number keys.
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(SCAN_THREAD_NUM, "-1");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MAX_REMOTE_SCAN_THREAD_NUM, "-1");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MIN_REMOTE_SCAN_THREAD_NUM, "-1");
        // These already carry the '%' suffix because getProcNodeData() adds them to the row unchanged.
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_LOW_WATERMARK, "75%");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_HIGH_WATERMARK, "85%");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(COMPUTE_GROUP, "");
        // -1 is the "not set" marker that checkProperties() accepts for the byte rate keys.
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(READ_BYTES_PER_SECOND, "-1");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(REMOTE_READ_BYTES_PER_SECOND, "-1");
    }

    // Value the constructor stores for SLOT_MEMORY_POLICY when the property is missing.
    public static final String SLOT_MEMORY_POLICY_DEFAULT_VALUE = "none";
    // Lower-case policy names accepted by checkProperties(); each one is also handled by
    // findSlotPolicyValueByString().
    // NOTE(review): the set is public and mutable; callers are presumably expected not to modify it.
    public static final HashSet<String> AVAILABLE_SLOT_MEMORY_POLICY_VALUES = new HashSet<String>() {{
            add("none");
            add("fixed");
            add("dynamic");
        }};

    // Identifier of the group; create() takes it from Env.getCurrentEnv().getNextId().
    @SerializedName(value = "id")
    private long id;

    // Name of the group; combined with the compute group in getWorkloadGroupKey().
    @SerializedName(value = "name")
    private String name;

    // Property key -> value. The constructor stores percent-like values without the '%' suffix.
    @SerializedName(value = "properties")
    private Map<String, String> properties;

    // Starts at 0 for a new group; copyAndUpdate() creates the altered group with version + 1.
    @SerializedName(value = "version")
    private long version;

    // Has no @SerializedName annotation. Not assigned in the visible constructor code;
    // presumably set up by initQueryQueue() - TODO confirm.
    private QueryQueue queryQueue;

    /**
     * Builds a workload group whose version is 0 by delegating to the four-argument constructor.
     *
     * @param id id of the group
     * @param name name of the group
     * @param properties property map; see the four-argument constructor for how it is used
     */
    WorkloadGroup(long id, String name, Map<String, String> properties) {
        this(id, name, properties, 0L);
    }

    private WorkloadGroup(long id, String name, Map<String, String> properties, long version) {
        this.id = id;
        this.name = name;
        this.properties = properties;
        this.version = version;

        if (properties.containsKey(SLOT_MEMORY_POLICY)) {
            String slotPolicy = properties.get(SLOT_MEMORY_POLICY);
            this.properties.put(SLOT_MEMORY_POLICY, slotPolicy);
        } else {
            this.properties.put(SLOT_MEMORY_POLICY, SLOT_MEMORY_POLICY_DEFAULT_VALUE);
        }

        if (properties.containsKey(MAX_CPU_PERCENT)) {
            String cpuHardLimitStr = properties.get(MAX_CPU_PERCENT);
            if (cpuHardLimitStr.endsWith("%")) {
                cpuHardLimitStr = cpuHardLimitStr.substring(0, cpuHardLimitStr.length() - 1);
            }
            this.properties.put(MAX_CPU_PERCENT, cpuHardLimitStr);
        } else {
            this.properties.put(MAX_CPU_PERCENT, MAX_CPU_PERCENT_DEFAULT_VALUE + "");
        }

        if (properties.containsKey(MIN_CPU_PERCENT)) {
            String cpuHardLimitStr = properties.get(MIN_CPU_PERCENT);
            if (cpuHardLimitStr.endsWith("%")) {
                cpuHardLimitStr = cpuHardLimitStr.substring(0, cpuHardLimitStr.length() - 1);
            }
            this.properties.put(MIN_CPU_PERCENT, cpuHardLimitStr);
        } else {
            this.properties.put(MIN_CPU_PERCENT, "0");
        }

        if (properties.containsKey(MAX_MEMORY_PERCENT)) {
            String memHardLimitStr = properties.get(MAX_MEMORY_PERCENT);
            if (memHardLimitStr.endsWith("%")) {
                memHardLimitStr = memHardLimitStr.substring(0, memHardLimitStr.length() - 1);
            }
            this.properties.put(MAX_MEMORY_PERCENT, memHardLimitStr);
        } else {
            this.properties.put(MAX_MEMORY_PERCENT, MAX_MEMORY_PERCENT_DEFAULT_VALUE);
        }

        if (properties.containsKey(MIN_MEMORY_PERCENT)) {
            String minMemLimitStr = properties.get(MIN_MEMORY_PERCENT);
            if (minMemLimitStr.endsWith("%")) {
                minMemLimitStr = minMemLimitStr.substring(0, minMemLimitStr.length() - 1);
            }
            this.properties.put(MIN_MEMORY_PERCENT, minMemLimitStr);
        } else {
            this.properties.put(MIN_MEMORY_PERCENT, "0");
        }

        if (properties.containsKey(MEMORY_LOW_WATERMARK)) {
            String lowWatermarkStr = properties.get(MEMORY_LOW_WATERMARK);
            if (lowWatermarkStr.endsWith("%")) {
                lowWatermarkStr = lowWatermarkStr.substring(0, lowWatermarkStr.length() - 1);
            }
            this.properties.put(MEMORY_LOW_WATERMARK, lowWatermarkStr);
        } else {
            this.properties.put(MEMORY_LOW_WATERMARK, MEMORY_LOW_WATERMARK_DEFAULT_VALUE + "");
        }
        if (properties.containsKey(MEMORY_HIGH_WATERMARK)) {
            String highWatermarkStr = properties.get(MEMORY_HIGH_WATERMARK);
            if (highWatermarkStr.endsWith("%")) {
                highWatermarkStr = highWatermarkStr.substring(0, highWatermarkStr.length() - 1);
            }
            this.properties.put(MEMORY_HIGH_WATERMARK, highWatermarkStr);
        } else {
            this.properties.put(MEMORY_HIGH_WATERMARK, MEMORY_HIGH_WATERMARK_DEFAULT_VALUE + "");
        }
        initQueryQueue();
    }

    // new resource group
    /**
     * Validates the properties and creates a new workload group with a freshly allocated id.
     *
     * @param name name of the new group
     * @param properties property map; it is validated by checkProperties() and then handed to the
     *         constructor as-is
     * @return the new workload group
     * @throws DdlException if checkProperties() rejects the properties
     */
    public static WorkloadGroup create(String name, Map<String, String> properties) throws DdlException {
        checkProperties(properties);
        long newId = Env.getCurrentEnv().getNextId();
        return new WorkloadGroup(newId, name, properties);
    }

    // alter resource group
    /**
     * Creates the altered version of a workload group.
     *
     * <p>The current properties are copied, overlaid with the updated ones, validated, and used
     * to build a group with the same id and name and with the version increased by one.
     *
     * @param currentWorkloadGroup group to alter; it is not modified
     * @param updateProperties properties to add or overwrite
     * @return the new workload group
     * @throws DdlException if the merged properties are rejected by checkProperties()
     */
    public static WorkloadGroup copyAndUpdate(WorkloadGroup currentWorkloadGroup, Map<String, String> updateProperties)
            throws DdlException {
        Map<String, String> mergedProperties = new HashMap<>(currentWorkloadGroup.getProperties());
        mergedProperties.putAll(updateProperties);
        checkProperties(mergedProperties);

        return new WorkloadGroup(
                currentWorkloadGroup.getId(), currentWorkloadGroup.getName(),
                mergedProperties, currentWorkloadGroup.getVersion() + 1);
    }

    /**
     * Validates the properties of a workload group.
     *
     * <p>The checks run in this order, and the first violation is reported:
     * <ol>
     *   <li>every property name must be listed in ALL_PROPERTIES_NAME;</li>
     *   <li>min/max cpu percent are integers in [0,100] / [1,100] with max &gt;= min;</li>
     *   <li>max/min memory percent are integers in [1,100] / [0,100] with max &gt;= min;</li>
     *   <li>the slot memory policy is one of none, fixed, dynamic (case-insensitive);</li>
     *   <li>the scan thread numbers are -1 or positive; the max and min remote scan thread numbers
     *       must be set together with max &gt;= min;</li>
     *   <li>max concurrency, max queue size and queue timeout are integers &gt;= 0;</li>
     *   <li>the memory watermarks are integers in [1,100] with low &lt;= high;</li>
     *   <li>the read byte rates are -1 or a positive long.</li>
     * </ol>
     *
     * <p>Side effect: when one of the four cpu/memory percent keys is missing, its default
     * ("0" for the minimum, "100" for the maximum) is inserted into the given map.
     *
     * @param properties mutable property map to validate
     * @throws DdlException if any property name or value is not allowed
     */
    private static void checkProperties(Map<String, String> properties) throws DdlException {
        for (String propertyName : properties.keySet()) {
            if (!ALL_PROPERTIES_NAME.contains(propertyName)) {
                throw new DdlException("Property " + propertyName + " is not supported, maybe it is deprecated.");
            }
        }

        int minCpuPercent = checkPercentOrDefault(properties, MIN_CPU_PERCENT, 0, 0);
        int maxCpuPercent = checkPercentOrDefault(properties, MAX_CPU_PERCENT, 1, 100);
        if (maxCpuPercent < minCpuPercent) {
            throw new DdlException(MAX_CPU_PERCENT + "'s value " + maxCpuPercent
                    + " has to be great or equal than "
                    + MIN_CPU_PERCENT + " " + minCpuPercent);
        }

        int maxMemPercent = checkPercentOrDefault(properties, MAX_MEMORY_PERCENT, 1, 100);
        int minMemPercent = checkPercentOrDefault(properties, MIN_MEMORY_PERCENT, 0, 0);
        if (maxMemPercent < minMemPercent) {
            throw new DdlException(MAX_MEMORY_PERCENT + "'s value " + maxMemPercent
                    + " has to be great or equal than "
                    + MIN_MEMORY_PERCENT + " " + minMemPercent);
        }

        if (properties.containsKey(SLOT_MEMORY_POLICY)
                && !isKnownSlotMemoryPolicy(properties.get(SLOT_MEMORY_POLICY))) {
            throw new DdlException("The value of '" + SLOT_MEMORY_POLICY
                    + "' must be one of none, fixed, dynamic.");
        }

        checkThreadNum(properties, SCAN_THREAD_NUM);
        int maxRemoteScanNum = checkThreadNum(properties, MAX_REMOTE_SCAN_THREAD_NUM);
        int minRemoteScanNum = checkThreadNum(properties, MIN_REMOTE_SCAN_THREAD_NUM);
        // -1 means "not set": either both remote scan thread numbers are set or neither is.
        if ((maxRemoteScanNum == -1) != (minRemoteScanNum == -1)) {
            throw new DdlException(MAX_REMOTE_SCAN_THREAD_NUM + " and " + MIN_REMOTE_SCAN_THREAD_NUM
                    + " must be specified simultaneously");
        } else if (maxRemoteScanNum < minRemoteScanNum) {
            throw new DdlException(MAX_REMOTE_SCAN_THREAD_NUM + " must bigger or equal " + MIN_REMOTE_SCAN_THREAD_NUM);
        }

        // check queue property
        checkNonNegativeInt(properties, MAX_CONCURRENCY);
        checkNonNegativeInt(properties, MAX_QUEUE_SIZE);
        checkNonNegativeInt(properties, QUEUE_TIMEOUT);

        int lowWaterMark = checkWatermark(properties, MEMORY_LOW_WATERMARK, MEMORY_LOW_WATERMARK_DEFAULT_VALUE);
        int highWaterMark = checkWatermark(properties, MEMORY_HIGH_WATERMARK, MEMORY_HIGH_WATERMARK_DEFAULT_VALUE);
        if (lowWaterMark > highWaterMark) {
            throw new DdlException(MEMORY_HIGH_WATERMARK + "(" + highWaterMark + ") should bigger than "
                    + MEMORY_LOW_WATERMARK + "(" + lowWaterMark + ")");
        }

        checkBytesPerSecond(properties, READ_BYTES_PER_SECOND);
        checkBytesPerSecond(properties, REMOTE_READ_BYTES_PER_SECOND);
    }

    // Returns the value without one trailing '%'; null is passed through so that parsing rejects it.
    private static String stripTrailingPercent(String value) {
        if (value != null && value.endsWith("%")) {
            return value.substring(0, value.length() - 1);
        }
        return value;
    }

    // Parses a percent property limited to [lowerBound,100]; a missing key is filled with defaultValue.
    private static int checkPercentOrDefault(Map<String, String> properties, String key, int lowerBound,
            int defaultValue) throws DdlException {
        if (!properties.containsKey(key)) {
            properties.put(key, String.valueOf(defaultValue));
            return defaultValue;
        }
        String inputValue = properties.get(key);
        String message = "The allowed " + key + " value has to be in [" + lowerBound
                + ",100], but input value is " + inputValue;
        int percent;
        try {
            percent = Integer.parseInt(stripTrailingPercent(inputValue));
        } catch (NumberFormatException e) {
            throw new DdlException(message);
        }
        if (percent < lowerBound || percent > 100) {
            throw new DdlException(message);
        }
        return percent;
    }

    // Compares without a locale, exactly like findSlotPolicyValueByString(), so that every value
    // accepted here can be converted later; toLowerCase() would depend on the default locale.
    private static boolean isKnownSlotMemoryPolicy(String value) {
        for (String policy : AVAILABLE_SLOT_MEMORY_POLICY_VALUES) {
            if (policy.equalsIgnoreCase(value)) {
                return true;
            }
        }
        return false;
    }

    // Checks that a thread number property is -1 or positive; returns -1 when the key is missing.
    private static int checkThreadNum(Map<String, String> properties, String key) throws DdlException {
        if (!properties.containsKey(key)) {
            return -1;
        }
        String value = properties.get(key);
        String message = "The allowed " + key + " value is -1 or a positive integer. but input value is " + value;
        int threadNum;
        try {
            threadNum = Integer.parseInt(value);
        } catch (NumberFormatException e) {
            throw new DdlException(message);
        }
        if (threadNum <= 0 && threadNum != -1) {
            throw new DdlException(message);
        }
        return threadNum;
    }

    // Checks that the property, when present, is an integer greater than or equal to 0.
    private static void checkNonNegativeInt(Map<String, String> properties, String key) throws DdlException {
        if (!properties.containsKey(key)) {
            return;
        }
        String inputValue = properties.get(key);
        String message = "The allowed " + key
                + " value is an integer greater than or equal to 0, but input value is "
                + inputValue;
        int parsed;
        try {
            parsed = Integer.parseInt(inputValue);
        } catch (NumberFormatException e) {
            throw new DdlException(message);
        }
        if (parsed < 0) {
            throw new DdlException(message);
        }
    }

    // Parses a watermark limited to [1,100]; returns defaultValue when the key is missing.
    // The error message reports the value without its '%' suffix.
    private static int checkWatermark(Map<String, String> properties, String key, int defaultValue)
            throws DdlException {
        if (!properties.containsKey(key)) {
            return defaultValue;
        }
        String value = stripTrailingPercent(properties.get(key));
        String message = "The allowed " + key
                + " value is an integer value between 1 and 100, but input value is "
                + value;
        int watermark;
        try {
            watermark = Integer.parseInt(value);
        } catch (NumberFormatException e) {
            throw new DdlException(message);
        }
        if (watermark < 1 || watermark > 100) {
            throw new DdlException(message);
        }
        return watermark;
    }

    // Checks that the property, when present, is -1 or a positive long.
    private static void checkBytesPerSecond(Map<String, String> properties, String key) throws DdlException {
        if (!properties.containsKey(key)) {
            return;
        }
        String readBytesVal = properties.get(key);
        String message = "The allowed " + key
                + " value should be -1 or an positive integer, but input value is "
                + readBytesVal;
        long bytesPerSecond;
        try {
            bytesPerSecond = Long.parseLong(readBytesVal);
        } catch (NumberFormatException e) {
            throw new DdlException(message);
        }
        if (bytesPerSecond <= 0 && bytesPerSecond != -1) {
            throw new DdlException(message);
        }
    }

    /** Returns the id of this workload group. */
    public long getId() {
        return this.id;
    }

    /** Returns the name of this workload group. */
    public String getName() {
        return this.name;
    }

    /** Returns the query queue held by this workload group. */
    public QueryQueue getQueryQueue() {
        return this.queryQueue;
    }

    /** Returns the key obtained from WorkloadGroupKey.get() for this group's compute group and name. */
    public WorkloadGroupKey getWorkloadGroupKey() {
        String computeGroup = getComputeGroup();
        String groupName = getName();
        return WorkloadGroupKey.get(computeGroup, groupName);
    }

    /** Returns the property map itself, not a copy; changes to it affect this workload group. */
    public Map<String, String> getProperties() {
        return this.properties;
    }

    /** Returns the version of this workload group. */
    public long getVersion() {
        return this.version;
    }

    /**
     * Returns the min cpu percent property as an int; a trailing '%' is ignored.
     *
     * @throws NullPointerException if the property is not set
     * @throws NumberFormatException if the stored value is not an integer
     */
    public int getMinCpuPercent() {
        String raw = properties.get(MIN_CPU_PERCENT);
        String digits = raw.endsWith("%") ? raw.substring(0, raw.length() - 1) : raw;
        return Integer.parseInt(digits);
    }

    /**
     * Returns the max cpu percent property as an int; a trailing '%' is ignored.
     *
     * @throws NullPointerException if the property is not set
     * @throws NumberFormatException if the stored value is not an integer
     */
    public int getMaxCpuPercent() {
        String raw = properties.get(MAX_CPU_PERCENT);
        String digits = raw.endsWith("%") ? raw.substring(0, raw.length() - 1) : raw;
        return Integer.parseInt(digits);
    }

    /**
     * Returns the min memory percent property as an int; a trailing '%' is ignored.
     *
     * @throws NullPointerException if the property is not set
     * @throws NumberFormatException if the stored value is not an integer
     */
    public int getMinMemoryPercent() {
        String raw = properties.get(MIN_MEMORY_PERCENT);
        String digits = raw.endsWith("%") ? raw.substring(0, raw.length() - 1) : raw;
        return Integer.parseInt(digits);
    }

    /**
     * Returns the max memory percent property as an int; a trailing '%' is ignored.
     *
     * @throws NullPointerException if the property is not set
     * @throws NumberFormatException if the stored value is not an integer
     */
    public int getMaxMemoryPercent() {
        String raw = properties.get(MAX_MEMORY_PERCENT);
        String digits = raw.endsWith("%") ? raw.substring(0, raw.length() - 1) : raw;
        return Integer.parseInt(digits);
    }

    /**
     * Removes leading and trailing whitespace from every property value, in place.
     *
     * <p>Does nothing when the property map is null. Entries whose value is null are left
     * unchanged instead of failing with a NullPointerException.
     */
    public void trimProperties() {
        if (properties == null) {
            return;
        }
        // Updating through the entry changes the value without a second lookup per key.
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            String value = entry.getValue();
            if (value != null) {
                entry.setValue(value.trim());
            }
        }
    }

    /**
     * Appends one row describing this workload group to the given proc result.
     *
     * <p>The first two cells are the id and the name. Every further cell follows the title at
     * the same index in WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES:
     * <ul>
     *   <li>the running / waiting query number comes from the query queue detail ("0" when the
     *       detail is null);</li>
     *   <li>the compute group is shown by name when running in cloud mode and the name lookup
     *       succeeds, otherwise the stored value is shown;</li>
     *   <li>any other title is treated as a property key: an empty value is replaced by the
     *       entry of ALL_PROPERTIES_DEFAULT_VALUE_MAP, and percent-like values get a '%' suffix.</li>
     * </ul>
     *
     * @param result proc result that receives the row
     */
    public void getProcNodeData(BaseProcResult result) {
        List<String> cells = new ArrayList<>();
        cells.add(String.valueOf(id));
        cells.add(name);
        Pair<Integer, Integer> queueDetail = queryQueue.getQueryQueueDetail();
        // id and name are already in the row, so start with the third title
        for (int col = 2; col < WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size(); col++) {
            String title = WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(col);
            String cell;
            if (QueryQueue.RUNNING_QUERY_NUM.equals(title)) {
                cell = queueDetail != null ? String.valueOf(queueDetail.first) : "0";
            } else if (QueryQueue.WAITING_QUERY_NUM.equals(title)) {
                cell = queueDetail != null ? String.valueOf(queueDetail.second) : "0";
            } else if (COMPUTE_GROUP.equals(title)) {
                cell = properties.get(title);
                if (!StringUtils.isEmpty(cell) && Config.isCloudMode()) {
                    // Best effort: keep the stored value when the name cannot be resolved.
                    try {
                        CloudSystemInfoService cloudInfo = (CloudSystemInfoService) Env.getCurrentSystemInfo();
                        String cgName = cloudInfo.getClusterNameByClusterId(cell);
                        if (!StringUtils.isEmpty(cgName)) {
                            cell = cgName;
                        }
                    } catch (Throwable t) {
                        LOG.debug("get compute group failed, ", t);
                    }
                }
            } else {
                String stored = properties.get(title);
                boolean percentLike = MIN_CPU_PERCENT.equals(title)
                        || MAX_CPU_PERCENT.equals(title)
                        || MIN_MEMORY_PERCENT.equals(title)
                        || MAX_MEMORY_PERCENT.equals(title)
                        || MEMORY_LOW_WATERMARK.equals(title)
                        || MEMORY_HIGH_WATERMARK.equals(title);
                if (StringUtils.isEmpty(stored)) {
                    cell = ALL_PROPERTIES_DEFAULT_VALUE_MAP.get(title);
                } else if (percentLike) {
                    cell = stored + "%";
                } else {
                    cell = stored;
                }
            }
            cells.add(cell);
        }
        result.addRow(cells);
    }

    /**
     * Returns the compute group property, or WorkloadGroupMgr.EMPTY_COMPUTE_GROUP when the
     * property is null or empty.
     */
    public String getComputeGroup() {
        String computeGroup = properties.get(COMPUTE_GROUP);
        if (StringUtils.isEmpty(computeGroup)) {
            return WorkloadGroupMgr.EMPTY_COMPUTE_GROUP;
        }
        return computeGroup;
    }

    /** Returns the JSON produced by GsonUtils.GSON for this workload group. */
    @Override
    public String toString() {
        String json = GsonUtils.GSON.toJson(this);
        return json;
    }

    /** Returns a new TPipelineWorkloadGroup that carries only the id of this workload group. */
    public TPipelineWorkloadGroup toThrift() {
        TPipelineWorkloadGroup tWorkloadGroup = new TPipelineWorkloadGroup();
        return tWorkloadGroup.setId(id);
    }

    /**
     * Maps a slot memory policy name to its thrift enum value, ignoring case.
     *
     * @param slotPolicy policy name: none, fixed or dynamic; must not be null
     * @return the matching TWgSlotMemoryPolicy value
     * @throws RuntimeException if the name is none of the three known policies
     */
    public static TWgSlotMemoryPolicy findSlotPolicyValueByString(String slotPolicy) {
        if (slotPolicy.equalsIgnoreCase("none")) {
            return TWgSlotMemoryPolicy.NONE;
        }
        if (slotPolicy.equalsIgnoreCase("fixed")) {
            return TWgSlotMemoryPolicy.FIXED;
        }
        if (slotPolicy.equalsIgnoreCase("dynamic")) {
            return TWgSlotMemoryPolicy.DYNAMIC;
        }
        throw new RuntimeException("Could not find policy using " + slotPolicy);
    }

    /**
     * Converts this workload group into a TopicInfo that wraps a TWorkloadGroupInfo.
     *
     * <p>Id, name and version are always set. Every other thrift field is set only when the
     * matching property is present (non-null); the compute group additionally has to be
     * non-empty and is written to the tag field.
     *
     * @return the topic info for this workload group
     * @throws NumberFormatException if a numeric property holds a value that cannot be parsed
     */
    public TopicInfo toTopicInfo() {
        TWorkloadGroupInfo info = new TWorkloadGroupInfo();
        info.setId(this.id);

        info.setName(name);
        info.setVersion(version);

        // The percent getters read the same properties again and drop a trailing '%'.
        if (properties.get(MIN_CPU_PERCENT) != null) {
            info.setMinCpuPercent(this.getMinCpuPercent());
        }
        if (properties.get(MAX_CPU_PERCENT) != null) {
            info.setMaxCpuPercent(this.getMaxCpuPercent());
        }
        if (properties.get(MAX_MEMORY_PERCENT) != null) {
            info.setMaxMemoryPercent(this.getMaxMemoryPercent());
        }
        if (properties.get(MIN_MEMORY_PERCENT) != null) {
            info.setMinMemoryPercent(this.getMinMemoryPercent());
        }

        String slotPolicy = properties.get(SLOT_MEMORY_POLICY);
        if (slotPolicy != null) {
            info.setSlotMemoryPolicy(findSlotPolicyValueByString(slotPolicy));
        }

        String scanThreads = properties.get(SCAN_THREAD_NUM);
        if (scanThreads != null) {
            info.setScanThreadNum(Integer.parseInt(scanThreads));
        }

        String maxRemoteScanThreads = properties.get(MAX_REMOTE_SCAN_THREAD_NUM);
        if (maxRemoteScanThreads != null) {
            info.setMaxRemoteScanThreadNum(Integer.parseInt(maxRemoteScanThreads));
        }

        String minRemoteScanThreads = properties.get(MIN_REMOTE_SCAN_THREAD_NUM);
        if (minRemoteScanThreads != null) {
            info.setMinRemoteScanThreadNum(Integer.parseInt(minRemoteScanThreads));
        }

        String lowWatermark = properties.get(MEMORY_LOW_WATERMARK);
        if (lowWatermark != null) {
            info.setMemoryLowWatermark(Integer.parseInt(lowWatermark));
        }

        String highWatermark = properties.get(MEMORY_HIGH_WATERMARK);
        if (highWatermark != null) {
            info.setMemoryHighWatermark(Integer.parseInt(highWatermark));
        }

        String readBytes = properties.get(READ_BYTES_PER_SECOND);
        if (readBytes != null) {
            info.setReadBytesPerSecond(Long.parseLong(readBytes));
        }

        String remoteReadBytes = properties.get(REMOTE_READ_BYTES_PER_SECOND);
        if (remoteReadBytes != null) {
            info.setRemoteReadBytesPerSecond(Long.parseLong(remoteReadBytes));
        }

        String computeGroup = properties.get(COMPUTE_GROUP);
        if (!StringUtils.isEmpty(computeGroup)) {
            info.setTag(computeGroup);
        }

        String maxConcurrency = properties.get(MAX_CONCURRENCY);
        if (maxConcurrency != null) {
            info.setTotalQuerySlotCount(Integer.parseInt(maxConcurrency));
        }

        TopicInfo topic = new TopicInfo();
        topic.setWorkloadGroupInfo(info);
        return topic;
    }

    /**
     * Serializes this workload group as a JSON string (via the shared
     * {@code GsonUtils.GSON} instance) and writes that string to {@code out}.
     *
     * @param out destination the JSON text is written to
     * @throws IOException declared by the interface; propagated from the underlying write
     */
    @Override
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, GsonUtils.GSON.toJson(this));
    }

    /**
     * Reads one JSON string from {@code in} and rebuilds a {@link WorkloadGroup} from it
     * with the shared {@code GsonUtils.GSON} instance.
     * This API is used in journal deserialize API.
     *
     * @param in source the JSON text is read from
     * @return the workload group produced by Gson for that JSON text
     * @throws IOException if reading the string from {@code in} fails
     */
    public static WorkloadGroup read(DataInput in) throws IOException {
        final String serialized = Text.readString(in);
        return GsonUtils.GSON.fromJson(serialized, WorkloadGroup.class);
    }

    /**
     * Normalizes {@code properties} after this object has been created from JSON.
     * When GsonUtils.GSON from json is called, this API is called.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>translate a legacy MEMORY_LIMIT entry into MAX_MEMORY_PERCENT;</li>
     *   <li>fill in MAX_MEMORY_PERCENT / MIN_MEMORY_PERCENT when absent;</li>
     *   <li>translate a legacy CPU_HARD_LIMIT entry into MAX_CPU_PERCENT;</li>
     *   <li>fill in MAX_CPU_PERCENT / MIN_CPU_PERCENT and both memory watermarks when absent;</li>
     *   <li>drop the legacy keys and build the query queue.</li>
     * </ol>
     *
     * @throws IOException declared by the interface; nothing in this body throws it directly
     */
    @Override
    public void gsonPostProcess() throws IOException {
        // Do not use -1 as default value, use 100% instead.
        if (properties.containsKey(MEMORY_LIMIT)) {
            final String legacyMemLimit = properties.get(MEMORY_LIMIT);
            String maxMemPercent;
            if (legacyMemLimit.equals("-1")) {
                // -1 meant "use all memory"; it can be mapped to 100% now because the sum of
                // all groups' memory is allowed to exceed 100%.
                maxMemPercent = MAX_MEMORY_PERCENT_DEFAULT_VALUE;
            } else if (legacyMemLimit.endsWith("%")) {
                // Keep only the numeric part, dropping the trailing '%'.
                maxMemPercent = legacyMemLimit.substring(0, legacyMemLimit.length() - 1);
            } else {
                maxMemPercent = legacyMemLimit;
            }

            // During upgrade, overcommit == true means memory usage is not controlled in BE,
            // so max memory percent is reset to 100% directly. A missing flag counts as true.
            final String overcommitFlag = properties.get(ENABLE_MEMORY_OVERCOMMIT);
            final boolean overcommitEnabled = overcommitFlag == null || Boolean.parseBoolean(overcommitFlag);
            if (overcommitEnabled) {
                maxMemPercent = MAX_MEMORY_PERCENT_DEFAULT_VALUE;
            }
            properties.put(MAX_MEMORY_PERCENT, maxMemPercent);
        }

        if (!properties.containsKey(MAX_MEMORY_PERCENT)) {
            properties.put(MAX_MEMORY_PERCENT, MAX_MEMORY_PERCENT_DEFAULT_VALUE);
        }
        if (!properties.containsKey(MIN_MEMORY_PERCENT)) {
            properties.put(MIN_MEMORY_PERCENT, "0");
        }

        // Legacy CPU hard limit: -1 maps to the default percent, "N%" maps to "N".
        if (properties.containsKey(CPU_HARD_LIMIT)) {
            final String legacyCpuLimit = properties.get(CPU_HARD_LIMIT);
            final String maxCpuPercent = legacyCpuLimit.equals("-1")
                    ? String.valueOf(MAX_CPU_PERCENT_DEFAULT_VALUE)
                    : (legacyCpuLimit.endsWith("%")
                            ? legacyCpuLimit.substring(0, legacyCpuLimit.length() - 1)
                            : legacyCpuLimit);
            properties.put(MAX_CPU_PERCENT, maxCpuPercent);
        }

        if (!properties.containsKey(MAX_CPU_PERCENT)) {
            properties.put(MAX_CPU_PERCENT, String.valueOf(MAX_CPU_PERCENT_DEFAULT_VALUE));
        }
        if (!properties.containsKey(MIN_CPU_PERCENT)) {
            properties.put(MIN_CPU_PERCENT, "0");
        }
        if (!properties.containsKey(MEMORY_LOW_WATERMARK)) {
            properties.put(MEMORY_LOW_WATERMARK, String.valueOf(MEMORY_LOW_WATERMARK_DEFAULT_VALUE));
        }
        if (!properties.containsKey(MEMORY_HIGH_WATERMARK)) {
            properties.put(MEMORY_HIGH_WATERMARK, String.valueOf(MEMORY_HIGH_WATERMARK_DEFAULT_VALUE));
        }

        // Legacy keys are dropped once their values have been carried over above.
        final String[] legacyKeys = {
            CPU_SHARE,
            CPU_HARD_LIMIT,
            ENABLE_MEMORY_OVERCOMMIT,
            MEMORY_LIMIT,
            SPILL_THRESHOLD_HIGH_WATERMARK,
            SPILL_THRESHOLD_LOW_WATERMARK,
        };
        for (String legacyKey : legacyKeys) {
            properties.remove(legacyKey);
        }

        // The from-json path creates the workload group by reflection, but the workload
        // group's constructor needs to create other objects (such as the queue), so the
        // queue is initialized here after the workload group has been created from json.
        initQueryQueue();
    }

    /**
     * Builds {@code queryQueue} from the MAX_CONCURRENCY, MAX_QUEUE_SIZE and QUEUE_TIMEOUT
     * properties. A key that is missing falls back to its default (Integer.MAX_VALUE, 0 and 0
     * respectively), and that default is also written back into {@code properties}.
     */
    private void initQueryQueue() {
        final int concurrencyLimit;
        if (!properties.containsKey(MAX_CONCURRENCY)) {
            concurrencyLimit = Integer.MAX_VALUE;
            properties.put(MAX_CONCURRENCY, String.valueOf(concurrencyLimit));
        } else {
            concurrencyLimit = Integer.parseInt(properties.get(MAX_CONCURRENCY));
        }

        final int queueCapacity;
        if (!properties.containsKey(MAX_QUEUE_SIZE)) {
            queueCapacity = 0;
            properties.put(MAX_QUEUE_SIZE, String.valueOf(queueCapacity));
        } else {
            queueCapacity = Integer.parseInt(properties.get(MAX_QUEUE_SIZE));
        }

        final int timeout;
        if (!properties.containsKey(QUEUE_TIMEOUT)) {
            timeout = 0;
            properties.put(QUEUE_TIMEOUT, String.valueOf(timeout));
        } else {
            timeout = Integer.parseInt(properties.get(QUEUE_TIMEOUT));
        }

        queryQueue = new QueryQueue(id, concurrencyLimit, queueCapacity, timeout);
    }
}