// WorkloadGroup.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.resource.workloadgroup;

import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TWgSlotMemoryPolicy;
import org.apache.doris.thrift.TWorkloadGroupInfo;
import org.apache.doris.thrift.TopicInfo;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
 * A workload group as seen by the frontend: an id, a name, a version and a
 * string-to-string property map, plus a few numeric fields derived from that map.
 *
 * <p>Instances are built through {@link #create} (new group) or {@link #copyAndUpdate}
 * (altered group, version incremented by one); both validate the properties first.
 * The object is persisted as JSON via {@link #write} / {@link #read}.
 */
public class WorkloadGroup implements Writable, GsonPostProcessable {
    private static final Logger LOG = LogManager.getLogger(WorkloadGroup.class);

    public static final String CPU_SHARE = "cpu_share";

    public static final String CPU_HARD_LIMIT = "cpu_hard_limit";

    public static final String MEMORY_LIMIT = "memory_limit";

    public static final String ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit";

    public static final String WRITE_BUFFER_RATIO = "write_buffer_ratio";

    public static final String MAX_CONCURRENCY = "max_concurrency";

    public static final String MAX_QUEUE_SIZE = "max_queue_size";

    public static final String QUEUE_TIMEOUT = "queue_timeout";

    public static final String SCAN_THREAD_NUM = "scan_thread_num";

    public static final String MAX_REMOTE_SCAN_THREAD_NUM = "max_remote_scan_thread_num";

    public static final String MIN_REMOTE_SCAN_THREAD_NUM = "min_remote_scan_thread_num";

    public static final String MEMORY_LOW_WATERMARK = "memory_low_watermark";

    public static final String MEMORY_HIGH_WATERMARK = "memory_high_watermark";

    public static final String SLOT_MEMORY_POLICY = "slot_memory_policy";

    public static final String TAG = "tag";

    public static final String READ_BYTES_PER_SECOND = "read_bytes_per_second";

    public static final String REMOTE_READ_BYTES_PER_SECOND = "remote_read_bytes_per_second";

    // Deprecated: use MEMORY_LOW_WATERMARK and MEMORY_HIGH_WATERMARK instead.
    public static final String SPILL_THRESHOLD_LOW_WATERMARK = "spill_threshold_low_watermark";
    public static final String SPILL_THRESHOLD_HIGH_WATERMARK = "spill_threshold_high_watermark";

    public static final String COMPUTE_GROUP = "compute_group";

    // NOTE(wb): no property is mandatory; for some of them the effective default is applied on the BE side.
    // The original note listed these BE defaults:
    // cpu_share=1024, memory_limit=0% (0 means no limit), enable_memory_overcommit=true
    // NOTE(review): the FE display defaults in ALL_PROPERTIES_DEFAULT_VALUE_MAP use -1 for the first two;
    // confirm which values are current.
    private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>()
            .add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
            .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
            .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
            .add(MEMORY_LOW_WATERMARK).add(MEMORY_HIGH_WATERMARK)
            .add(COMPUTE_GROUP).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND)
            .add(WRITE_BUFFER_RATIO).add(SLOT_MEMORY_POLICY).build();

    // Maps each deprecated property key to the key that replaces it; applied in read().
    private static final ImmutableMap<String, String> DEPRECATED_PROPERTIES_NAME =
            new ImmutableMap.Builder<String, String>()
                    .put(SPILL_THRESHOLD_LOW_WATERMARK, MEMORY_LOW_WATERMARK)
                    .put(SPILL_THRESHOLD_HIGH_WATERMARK, MEMORY_HIGH_WATERMARK).build();

    public static final int MEMORY_LOW_WATERMARK_DEFAULT_VALUE = 75;
    public static final int MEMORY_HIGH_WATERMARK_DEFAULT_VALUE = 85;

    // Declared before the static initializer below so it can reference them without a forward reference.
    public static final int WRITE_BUFFER_RATIO_DEFAULT_VALUE = 20;
    public static final String SLOT_MEMORY_POLICY_DEFAULT_VALUE = "none";

    // Values shown by getProcNodeData() when a property is missing or empty in the property map.
    private static final Map<String, String> ALL_PROPERTIES_DEFAULT_VALUE_MAP = Maps.newHashMap();

    static {
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(CPU_SHARE, "-1");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(CPU_HARD_LIMIT, "-1");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_LIMIT, "-1");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(ENABLE_MEMORY_OVERCOMMIT, "true");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MAX_CONCURRENCY, String.valueOf(Integer.MAX_VALUE));
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MAX_QUEUE_SIZE, "0");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(QUEUE_TIMEOUT, "0");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(SCAN_THREAD_NUM, "-1");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MAX_REMOTE_SCAN_THREAD_NUM, "-1");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MIN_REMOTE_SCAN_THREAD_NUM, "-1");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_LOW_WATERMARK, "75%");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_HIGH_WATERMARK, "85%");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(COMPUTE_GROUP, "");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(READ_BYTES_PER_SECOND, "-1");
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(REMOTE_READ_BYTES_PER_SECOND, "-1");
        // These two are supported properties as well; without an entry getProcNodeData() would emit null
        // for a group whose property map lacks them.
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(WRITE_BUFFER_RATIO,
                String.valueOf(WRITE_BUFFER_RATIO_DEFAULT_VALUE));
        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(SLOT_MEMORY_POLICY, SLOT_MEMORY_POLICY_DEFAULT_VALUE);
    }

    // Accepted (lower-case) values of SLOT_MEMORY_POLICY; input is matched case-insensitively.
    public static final HashSet<String> AVAILABLE_SLOT_MEMORY_POLICY_VALUES =
            new HashSet<>(ImmutableSet.of("none", "fixed", "dynamic"));

    @SerializedName(value = "id")
    private long id;

    @SerializedName(value = "name")
    private String name;

    @SerializedName(value = "properties")
    private Map<String, String> properties;

    // The version must be bumped after every alter of the workload group (see copyAndUpdate).
    @SerializedName(value = "version")
    private long version;

    // The fields below are derived from `properties`; they are recomputed by the constructor
    // and by gsonPostProcess().
    private double memoryLimitPercent = 0;
    private int maxConcurrency = Integer.MAX_VALUE;
    private int maxQueueSize = 0;
    private int queueTimeout = 0;

    private int cpuHardLimit = 0;

    /**
     * Creates a workload group with version 0.
     *
     * @param id         group id
     * @param name       group name
     * @param properties property map; stored by reference and modified in place (see the other constructor)
     */
    WorkloadGroup(long id, String name, Map<String, String> properties) {
        this(id, name, properties, 0);
    }

    /**
     * Stores the given state and normalises {@code properties} in place: trailing '%' signs are stripped
     * from the percentage-style properties, {@code enable_memory_overcommit} is lower-cased, and defaults
     * are inserted for {@code write_buffer_ratio}, {@code slot_memory_policy} and the queue properties.
     *
     * <p>No validation happens here; callers in this class run {@link #checkProperties} first.
     * The supplied map is kept by reference, so it must be mutable and the caller will observe the changes.
     */
    private WorkloadGroup(long id, String name, Map<String, String> properties, long version) {
        this.id = id;
        this.name = name;
        this.properties = properties;
        this.version = version;
        if (properties.containsKey(MEMORY_LIMIT)) {
            setMemLimitPercent(properties);
        }

        if (properties.containsKey(WRITE_BUFFER_RATIO)) {
            properties.put(WRITE_BUFFER_RATIO, stripPercentSign(properties.get(WRITE_BUFFER_RATIO)));
        } else {
            properties.put(WRITE_BUFFER_RATIO, String.valueOf(WRITE_BUFFER_RATIO_DEFAULT_VALUE));
        }

        // A user-supplied policy is kept exactly as given; only a missing one gets the default.
        if (!properties.containsKey(SLOT_MEMORY_POLICY)) {
            properties.put(SLOT_MEMORY_POLICY, SLOT_MEMORY_POLICY_DEFAULT_VALUE);
        }

        if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) {
            // Locale.ROOT keeps the result independent of the JVM's default locale.
            properties.put(ENABLE_MEMORY_OVERCOMMIT,
                    properties.get(ENABLE_MEMORY_OVERCOMMIT).toLowerCase(Locale.ROOT));
        }
        if (properties.containsKey(CPU_HARD_LIMIT)) {
            String cpuHardLimitStr = stripPercentSign(properties.get(CPU_HARD_LIMIT));
            this.cpuHardLimit = Integer.parseInt(cpuHardLimitStr);
            properties.put(CPU_HARD_LIMIT, cpuHardLimitStr);
        }
        if (properties.containsKey(MEMORY_LOW_WATERMARK)) {
            properties.put(MEMORY_LOW_WATERMARK, stripPercentSign(properties.get(MEMORY_LOW_WATERMARK)));
        }
        if (properties.containsKey(MEMORY_HIGH_WATERMARK)) {
            properties.put(MEMORY_HIGH_WATERMARK, stripPercentSign(properties.get(MEMORY_HIGH_WATERMARK)));
        }
        resetQueueProperty(properties);
    }

    /**
     * Loads the three queue settings from {@code properties} into the matching fields; a missing setting
     * is reset to its default (max_concurrency = Integer.MAX_VALUE, the other two = 0) and that default
     * is written back into the map.
     */
    private void resetQueueProperty(Map<String, String> properties) {
        if (properties.containsKey(MAX_CONCURRENCY)) {
            this.maxConcurrency = Integer.parseInt(properties.get(MAX_CONCURRENCY));
        } else {
            this.maxConcurrency = Integer.MAX_VALUE;
            properties.put(MAX_CONCURRENCY, String.valueOf(this.maxConcurrency));
        }
        if (properties.containsKey(MAX_QUEUE_SIZE)) {
            this.maxQueueSize = Integer.parseInt(properties.get(MAX_QUEUE_SIZE));
        } else {
            this.maxQueueSize = 0;
            properties.put(MAX_QUEUE_SIZE, String.valueOf(maxQueueSize));
        }
        if (properties.containsKey(QUEUE_TIMEOUT)) {
            this.queueTimeout = Integer.parseInt(properties.get(QUEUE_TIMEOUT));
        } else {
            this.queueTimeout = 0;
            properties.put(QUEUE_TIMEOUT, String.valueOf(queueTimeout));
        }
    }

    /**
     * Validates {@code properties} and builds a new workload group with a freshly allocated id.
     *
     * @param name       name of the new group
     * @param properties user-supplied properties; the map is retained and normalised in place
     * @return the new group, at version 0
     * @throws DdlException if a property is unknown or has an invalid value
     */
    public static WorkloadGroup create(String name, Map<String, String> properties) throws DdlException {
        checkProperties(properties);
        return new WorkloadGroup(Env.getCurrentEnv().getNextId(), name, properties);
    }

    /**
     * Builds the altered successor of {@code currentWorkloadGroup}: same id and name, the current
     * properties overlaid with {@code updateProperties}, and the version incremented by one.
     * The current group and its property map are left untouched.
     *
     * @throws DdlException if the merged properties fail validation
     */
    public static WorkloadGroup copyAndUpdate(WorkloadGroup currentWorkloadGroup, Map<String, String> updateProperties)
            throws DdlException {
        Map<String, String> newProperties = new HashMap<>(currentWorkloadGroup.getProperties());
        newProperties.putAll(updateProperties);

        checkProperties(newProperties);
        return new WorkloadGroup(
                currentWorkloadGroup.getId(), currentWorkloadGroup.getName(),
                newProperties, currentWorkloadGroup.getVersion() + 1);
    }

    /**
     * Validates every property in the map. Unknown keys are rejected first; the individual checks then
     * run in a fixed order, so with several invalid values the reported error is deterministic.
     *
     * @throws DdlException describing the first violation found
     */
    private static void checkProperties(Map<String, String> properties) throws DdlException {
        for (String propertyName : properties.keySet()) {
            if (!ALL_PROPERTIES_NAME.contains(propertyName)) {
                throw new DdlException("Property " + propertyName + " is not supported.");
            }
        }

        if (properties.containsKey(CPU_SHARE)) {
            parseMinusOneOrPositiveInt(properties.get(CPU_SHARE),
                    "The allowed " + CPU_SHARE + " value is -1 or a positive integer, but input value is ");
        }
        checkCpuHardLimit(properties);
        checkMemoryLimit(properties);
        checkWriteBufferRatio(properties);

        if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) {
            String inputValue = properties.get(ENABLE_MEMORY_OVERCOMMIT);
            String value = inputValue.toLowerCase(Locale.ROOT);
            if (!("true".equals(value) || "false".equals(value))) {
                throw new DdlException(
                        "The value of '" + ENABLE_MEMORY_OVERCOMMIT + "' must be true or false. but input value is "
                                + inputValue);
            }
        }

        if (properties.containsKey(SLOT_MEMORY_POLICY)) {
            // Locale.ROOT: with the default locale, e.g. Turkish, "FIXED" would lower-case to a string
            // containing a dotless i and be rejected although it is a valid policy.
            String value = properties.get(SLOT_MEMORY_POLICY).toLowerCase(Locale.ROOT);
            if (!AVAILABLE_SLOT_MEMORY_POLICY_VALUES.contains(value)) {
                throw new DdlException("The value of '" + SLOT_MEMORY_POLICY
                        + "' must be one of none, fixed, dynamic.");
            }
        }

        if (properties.containsKey(SCAN_THREAD_NUM)) {
            parseMinusOneOrPositiveInt(properties.get(SCAN_THREAD_NUM),
                    "The allowed " + SCAN_THREAD_NUM + " value is -1 or a positive integer. but input value is ");
        }
        checkRemoteScanThreadNums(properties);

        // check queue property
        checkNonNegativeInt(properties, MAX_CONCURRENCY);
        checkNonNegativeInt(properties, MAX_QUEUE_SIZE);
        checkNonNegativeInt(properties, QUEUE_TIMEOUT);

        int lowWaterMark = parseWatermark(properties, MEMORY_LOW_WATERMARK, MEMORY_LOW_WATERMARK_DEFAULT_VALUE);
        int highWaterMark = parseWatermark(properties, MEMORY_HIGH_WATERMARK, MEMORY_HIGH_WATERMARK_DEFAULT_VALUE);
        if (lowWaterMark > highWaterMark) {
            throw new DdlException(MEMORY_HIGH_WATERMARK + "(" + highWaterMark + ") should bigger than "
                    + MEMORY_LOW_WATERMARK + "(" + lowWaterMark + ")");
        }

        checkMinusOneOrPositiveLong(properties, READ_BYTES_PER_SECOND);
        checkMinusOneOrPositiveLong(properties, REMOTE_READ_BYTES_PER_SECOND);
    }

    /** Returns {@code value} without its trailing '%', or {@code value} itself if it has none. */
    private static String stripPercentSign(String value) {
        return value.endsWith("%") ? value.substring(0, value.length() - 1) : value;
    }

    /**
     * Parses {@code value} as an int that must be -1 or strictly positive.
     *
     * @param errorPrefix message text placed in front of the offending value
     * @return the parsed value
     * @throws DdlException with {@code errorPrefix + value} if the value is not an int or out of range
     */
    private static int parseMinusOneOrPositiveInt(String value, String errorPrefix) throws DdlException {
        try {
            int parsed = Integer.parseInt(value);
            if (parsed > 0 || parsed == -1) {
                return parsed;
            }
        } catch (NumberFormatException ignored) {
            // not an int at all: reported through the same error as an out-of-range value
        }
        throw new DdlException(errorPrefix + value);
    }

    /** Checks that {@code key}, when present, holds an int greater than or equal to 0. */
    private static void checkNonNegativeInt(Map<String, String> properties, String key) throws DdlException {
        if (!properties.containsKey(key)) {
            return;
        }
        String inputValue = properties.get(key);
        try {
            if (Integer.parseInt(inputValue) >= 0) {
                return;
            }
        } catch (NumberFormatException ignored) {
            // not an int at all: reported through the same error as a negative value
        }
        throw new DdlException(
                "The allowed " + key
                        + " value is an integer greater than or equal to 0, but input value is "
                        + inputValue);
    }

    /** Checks that {@code key}, when present, holds a long that is -1 or strictly positive. */
    private static void checkMinusOneOrPositiveLong(Map<String, String> properties, String key)
            throws DdlException {
        if (!properties.containsKey(key)) {
            return;
        }
        String readBytesVal = properties.get(key);
        try {
            long longVal = Long.parseLong(readBytesVal);
            if (longVal > 0 || longVal == -1) {
                return;
            }
        } catch (NumberFormatException ignored) {
            // not a long at all: reported through the same error as an out-of-range value
        }
        throw new DdlException("The allowed " + key
                + " value should be -1 or an positive integer, but input value is " + readBytesVal);
    }

    /**
     * Checks {@code cpu_hard_limit}: either an int in [1, 100], optionally followed by '%',
     * or the bare value -1 ("-1%" is rejected).
     */
    private static void checkCpuHardLimit(Map<String, String> properties) throws DdlException {
        if (!properties.containsKey(CPU_HARD_LIMIT)) {
            return;
        }
        String inputValue = properties.get(CPU_HARD_LIMIT);
        boolean endWithSign = inputValue.endsWith("%");
        try {
            int intVal = Integer.parseInt(stripPercentSign(inputValue));
            boolean inRange = intVal >= 1 && intVal <= 100;
            if (inRange || (intVal == -1 && !endWithSign)) {
                return;
            }
        } catch (NumberFormatException ignored) {
            // not an int at all: reported through the same error as an out-of-range value
        }
        throw new DdlException(
                "The allowed " + WorkloadGroup.CPU_HARD_LIMIT
                        + " value is -1 or a positive integer between 1 and 100, but input value is "
                        + inputValue);
    }

    /**
     * Checks {@code memory_limit}: either the literal "-1" or a finite floating point number
     * greater than 0 followed by '%'.
     */
    private static void checkMemoryLimit(Map<String, String> properties) throws DdlException {
        if (!properties.containsKey(MEMORY_LIMIT)) {
            return;
        }
        String memoryLimitStr = properties.get(MEMORY_LIMIT);
        if ("-1".equals(memoryLimitStr)) {
            return;
        }
        if (!memoryLimitStr.endsWith("%")) {
            throw new DdlException(
                    MEMORY_LIMIT + " requires a percentage value which ends with a '%' or -1, but input value is "
                            + memoryLimitStr);
        }
        try {
            double memLimitD = Double.parseDouble(stripPercentSign(memoryLimitStr));
            // Written as !(x > 0) so that NaN, which compares false to everything, is rejected too;
            // Double.parseDouble also accepts "Infinity", which is not a usable percentage either.
            if (!(memLimitD > 0) || Double.isInfinite(memLimitD)) {
                throw new NumberFormatException();
            }
        } catch (NumberFormatException e) {
            throw new DdlException("The allowed " + MEMORY_LIMIT
                    + " value is a positive floating point number, but input value is " + memoryLimitStr);
        }
    }

    /** Checks {@code write_buffer_ratio}: an int greater than or equal to 0, optionally followed by '%'. */
    private static void checkWriteBufferRatio(Map<String, String> properties) throws DdlException {
        if (!properties.containsKey(WRITE_BUFFER_RATIO)) {
            return;
        }
        String inputValue = properties.get(WRITE_BUFFER_RATIO);
        String errMsg = WRITE_BUFFER_RATIO + " " + inputValue + " requires a positive int number.";
        try {
            if (Integer.parseInt(stripPercentSign(inputValue)) < 0) {
                throw new DdlException(errMsg);
            }
        } catch (NumberFormatException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(errMsg, e);
            }
            throw new DdlException(errMsg);
        }
    }

    /**
     * Checks the remote scan thread bounds: each must be -1 or a positive int, both must be set
     * (to something other than -1) or unset together, and max must not be smaller than min.
     */
    private static void checkRemoteScanThreadNums(Map<String, String> properties) throws DdlException {
        int maxRemoteScanNum = -1;
        if (properties.containsKey(MAX_REMOTE_SCAN_THREAD_NUM)) {
            maxRemoteScanNum = parseMinusOneOrPositiveInt(properties.get(MAX_REMOTE_SCAN_THREAD_NUM),
                    "The allowed " + MAX_REMOTE_SCAN_THREAD_NUM
                            + " value is -1 or a positive integer. but input value is ");
        }

        int minRemoteScanNum = -1;
        if (properties.containsKey(MIN_REMOTE_SCAN_THREAD_NUM)) {
            minRemoteScanNum = parseMinusOneOrPositiveInt(properties.get(MIN_REMOTE_SCAN_THREAD_NUM),
                    "The allowed " + MIN_REMOTE_SCAN_THREAD_NUM
                            + " value is -1 or a positive integer. but input value is ");
        }

        // Exactly one of the two being -1 (unset) is an error.
        if ((maxRemoteScanNum == -1) != (minRemoteScanNum == -1)) {
            throw new DdlException(MAX_REMOTE_SCAN_THREAD_NUM + " and " + MIN_REMOTE_SCAN_THREAD_NUM
                    + " must be specified simultaneously");
        } else if (maxRemoteScanNum < minRemoteScanNum) {
            throw new DdlException(MAX_REMOTE_SCAN_THREAD_NUM + " must bigger or equal " + MIN_REMOTE_SCAN_THREAD_NUM);
        }
    }

    /**
     * Reads a memory watermark: an int in [1, 100], optionally followed by '%'.
     *
     * @return the parsed value, or {@code defaultValue} when {@code key} is absent
     * @throws DdlException if the value is not an int or lies outside [1, 100]
     */
    private static int parseWatermark(Map<String, String> properties, String key, int defaultValue)
            throws DdlException {
        if (!properties.containsKey(key)) {
            return defaultValue;
        }
        String stripped = stripPercentSign(properties.get(key));
        try {
            int intValue = Integer.parseInt(stripped);
            if (intValue >= 1 && intValue <= 100) {
                return intValue;
            }
        } catch (NumberFormatException ignored) {
            // not an int at all: reported through the same error as an out-of-range value
        }
        throw new DdlException(
                "The allowed " + key
                        + " value is an integer value between 1 and 100, but input value is "
                        + stripped);
    }

    public long getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    /** Returns the key built from this group's compute group and name. */
    public WorkloadGroupKey getWorkloadGroupKey() {
        return WorkloadGroupKey.get(this.getComputeGroup(), this.getName());
    }

    /** Returns the live property map of this group (not a copy). */
    public Map<String, String> getProperties() {
        return properties;
    }

    public long getVersion() {
        return version;
    }

    public int getMaxConcurrency() {
        return maxConcurrency;
    }

    public int getMaxQueueSize() {
        return maxQueueSize;
    }

    public int getQueueTimeout() {
        return queueTimeout;
    }

    /**
     * Appends one row describing this group to {@code result}. The row starts with id and name; the
     * remaining columns follow {@code WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES} from index 2.
     *
     * @param result receiver of the row
     * @param qq     query queue of this group, or null, in which case both query counters are shown as "0"
     */
    public void getProcNodeData(BaseProcResult result, QueryQueue qq) {
        List<String> row = new ArrayList<>();
        row.add(String.valueOf(id));
        row.add(name);
        Pair<Integer, Integer> queryQueueDetail = qq != null ? qq.getQueryQueueDetail() : null;
        // start at 2: id and name have already been added above
        for (int i = 2; i < WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size(); i++) {
            String key = WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i);
            if (QueryQueue.RUNNING_QUERY_NUM.equals(key)) {
                row.add(queryQueueDetail == null ? "0" : String.valueOf(queryQueueDetail.first));
            } else if (QueryQueue.WAITING_QUERY_NUM.equals(key)) {
                row.add(queryQueueDetail == null ? "0" : String.valueOf(queryQueueDetail.second));
            } else if (COMPUTE_GROUP.equals(key)) {
                String val = properties.get(key);
                if (!StringUtils.isEmpty(val) && Config.isCloudMode()) {
                    // In cloud mode the stored value is looked up as a cluster id and shown by name.
                    // The lookup is best-effort: on any failure the stored value is shown unchanged.
                    try {
                        String cgName = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getClusterNameByClusterId(
                                val);
                        if (!StringUtils.isEmpty(cgName)) {
                            val = cgName;
                        }
                    } catch (Throwable t) {
                        LOG.debug("get compute group failed, ", t);
                    }
                }
                row.add(val);
            } else {
                String val = properties.get(key);
                if (StringUtils.isEmpty(val)) {
                    row.add(ALL_PROPERTIES_DEFAULT_VALUE_MAP.get(key));
                } else if ((CPU_HARD_LIMIT.equals(key) && !"-1".equals(val))
                        || MEMORY_LOW_WATERMARK.equals(key)
                        || MEMORY_HIGH_WATERMARK.equals(key)) {
                    // The constructor stores these without the '%' sign; add it back for display.
                    row.add(val + "%");
                } else {
                    row.add(val);
                }
            }
        }
        result.addRow(row);
    }

    /** Returns the memory limit percentage for summing purposes: -1 (no limit set) counts as 0. */
    public double getMemoryLimitPercentWhenCalSum() {
        return memoryLimitPercent == -1 ? 0 : memoryLimitPercent;
    }

    /** Returns the parsed {@code memory_limit} percentage, or -1 if the property is "-1". */
    public double getMemoryLimitPercent() {
        return memoryLimitPercent;
    }

    /** Returns the {@code compute_group} property, or {@code WorkloadGroupMgr.EMPTY_COMPUTE_GROUP} if empty. */
    public String getComputeGroup() {
        String ret = properties.get(COMPUTE_GROUP);
        return StringUtils.isEmpty(ret) ? WorkloadGroupMgr.EMPTY_COMPUTE_GROUP : ret;
    }

    @Override
    public String toString() {
        return GsonUtils.GSON.toJson(this);
    }

    /** Returns a thrift workload group carrying only this group's id. */
    public TPipelineWorkloadGroup toThrift() {
        return new TPipelineWorkloadGroup().setId(id);
    }

    /**
     * Maps a slot memory policy name to its thrift enum, ignoring case.
     *
     * @param slotPolicy one of "none", "fixed", "dynamic"; must not be null
     * @throws RuntimeException if the name matches none of the known policies
     */
    public static TWgSlotMemoryPolicy findSlotPolicyValueByString(String slotPolicy) {
        if (slotPolicy.equalsIgnoreCase("none")) {
            return TWgSlotMemoryPolicy.NONE;
        } else if (slotPolicy.equalsIgnoreCase("fixed")) {
            return TWgSlotMemoryPolicy.FIXED;
        } else if (slotPolicy.equalsIgnoreCase("dynamic")) {
            return TWgSlotMemoryPolicy.DYNAMIC;
        } else {
            throw new RuntimeException("Could not find policy using " + slotPolicy);
        }
    }

    /**
     * Converts this group into a {@link TopicInfo} wrapping a {@link TWorkloadGroupInfo}.
     * Id, name and version are always set; every other field is set only when the corresponding
     * property is present in the property map (the compute group only when it is non-empty).
     */
    public TopicInfo toTopicInfo() {
        TWorkloadGroupInfo tWorkloadGroupInfo = new TWorkloadGroupInfo();
        tWorkloadGroupInfo.setId(this.id);

        tWorkloadGroupInfo.setName(name);
        tWorkloadGroupInfo.setVersion(version);

        String cpuShareStr = properties.get(CPU_SHARE);
        if (cpuShareStr != null) {
            tWorkloadGroupInfo.setCpuShare(Long.parseLong(cpuShareStr));
        }

        String cpuHardLimitStr = properties.get(CPU_HARD_LIMIT);
        if (cpuHardLimitStr != null) {
            tWorkloadGroupInfo.setCpuHardLimit(Integer.parseInt(cpuHardLimitStr));
        }

        String memLimitStr = properties.get(MEMORY_LIMIT);
        if (memLimitStr != null) {
            tWorkloadGroupInfo.setMemLimit(memLimitStr);
        }
        String writeBufferRatioStr = properties.get(WRITE_BUFFER_RATIO);
        if (writeBufferRatioStr != null) {
            tWorkloadGroupInfo.setWriteBufferRatio(Integer.parseInt(writeBufferRatioStr));
        }
        String slotMemoryPolicyStr = properties.get(SLOT_MEMORY_POLICY);
        if (slotMemoryPolicyStr != null) {
            tWorkloadGroupInfo.setSlotMemoryPolicy(findSlotPolicyValueByString(slotMemoryPolicyStr));
        }
        String memOvercommitStr = properties.get(ENABLE_MEMORY_OVERCOMMIT);
        if (memOvercommitStr != null) {
            tWorkloadGroupInfo.setEnableMemoryOvercommit(Boolean.parseBoolean(memOvercommitStr));
        }

        String scanThreadNumStr = properties.get(SCAN_THREAD_NUM);
        if (scanThreadNumStr != null) {
            tWorkloadGroupInfo.setScanThreadNum(Integer.parseInt(scanThreadNumStr));
        }

        String maxRemoteScanThreadNumStr = properties.get(MAX_REMOTE_SCAN_THREAD_NUM);
        if (maxRemoteScanThreadNumStr != null) {
            tWorkloadGroupInfo.setMaxRemoteScanThreadNum(Integer.parseInt(maxRemoteScanThreadNumStr));
        }

        String minRemoteScanThreadNumStr = properties.get(MIN_REMOTE_SCAN_THREAD_NUM);
        if (minRemoteScanThreadNumStr != null) {
            tWorkloadGroupInfo.setMinRemoteScanThreadNum(Integer.parseInt(minRemoteScanThreadNumStr));
        }

        String memoryLowWatermarkStr = properties.get(MEMORY_LOW_WATERMARK);
        if (memoryLowWatermarkStr != null) {
            tWorkloadGroupInfo.setMemoryLowWatermark(Integer.parseInt(memoryLowWatermarkStr));
        }

        String memoryHighWatermarkStr = properties.get(MEMORY_HIGH_WATERMARK);
        if (memoryHighWatermarkStr != null) {
            tWorkloadGroupInfo.setMemoryHighWatermark(Integer.parseInt(memoryHighWatermarkStr));
        }

        String readBytesPerSecStr = properties.get(READ_BYTES_PER_SECOND);
        if (readBytesPerSecStr != null) {
            tWorkloadGroupInfo.setReadBytesPerSecond(Long.parseLong(readBytesPerSecStr));
        }

        String remoteReadBytesPerSecStr = properties.get(REMOTE_READ_BYTES_PER_SECOND);
        if (remoteReadBytesPerSecStr != null) {
            tWorkloadGroupInfo.setRemoteReadBytesPerSecond(Long.parseLong(remoteReadBytesPerSecStr));
        }

        // The compute group is transported in the thrift "tag" field.
        String cgStr = properties.get(COMPUTE_GROUP);
        if (!StringUtils.isEmpty(cgStr)) {
            tWorkloadGroupInfo.setTag(cgStr);
        }

        // max_concurrency is transported as the total query slot count.
        String totalQuerySlotCountStr = properties.get(MAX_CONCURRENCY);
        if (totalQuerySlotCountStr != null) {
            tWorkloadGroupInfo.setTotalQuerySlotCount(Integer.parseInt(totalQuerySlotCountStr));
        }

        TopicInfo topicInfo = new TopicInfo();
        topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
        return topicInfo;
    }

    /** Writes this group to {@code out} as a single JSON string. */
    @Override
    public void write(DataOutput out) throws IOException {
        String json = GsonUtils.GSON.toJson(this);
        Text.writeString(out, json);
    }

    /**
     * Reads a group previously written by {@link #write} and migrates deprecated property keys:
     * "spill_threshold_low_watermark" and "spill_threshold_high_watermark" are renamed to
     * "memory_low_watermark" and "memory_high_watermark", keeping their values.
     */
    public static WorkloadGroup read(DataInput in) throws IOException {
        String json = Text.readString(in);
        WorkloadGroup workloadGroup = GsonUtils.GSON.fromJson(json, WorkloadGroup.class);
        for (Map.Entry<String, String> renamed : DEPRECATED_PROPERTIES_NAME.entrySet()) {
            String deprecatedKey = renamed.getKey();
            if (workloadGroup.properties.containsKey(deprecatedKey)) {
                String value = workloadGroup.properties.remove(deprecatedKey);
                workloadGroup.properties.put(renamed.getValue(), value);
            }
        }
        return workloadGroup;
    }

    /**
     * Sets {@code memoryLimitPercent} from the {@code memory_limit} entry of {@code props}: -1 for the
     * literal "-1", otherwise the number in front of the last character (expected to be '%').
     * The entry must be present.
     */
    void setMemLimitPercent(Map<String, String> props) {
        String memoryLimitString = props.get(MEMORY_LIMIT);
        this.memoryLimitPercent = "-1".equals(memoryLimitString) ? -1
                : Double.parseDouble(memoryLimitString.substring(0, memoryLimitString.length() - 1));
    }

    /**
     * Recomputes the fields derived from {@code properties} (memory limit percent, cpu hard limit and
     * the queue settings); presumably invoked by the Gson layer once the object has been deserialized.
     */
    @Override
    public void gsonPostProcess() throws IOException {
        if (properties.containsKey(MEMORY_LIMIT)) {
            setMemLimitPercent(properties);
        }

        if (properties.containsKey(CPU_HARD_LIMIT)) {
            this.cpuHardLimit = Integer.parseInt(properties.get(CPU_HARD_LIMIT));
        }

        this.resetQueueProperty(this.properties);
    }
}