WorkloadGroupMgr.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.resource.workloadgroup;

import org.apache.doris.analysis.AlterWorkloadGroupStmt;
import org.apache.doris.analysis.CreateWorkloadGroupStmt;
import org.apache.doris.analysis.DropWorkloadGroupStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Pair;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.DropWorkloadGroupOperationLog;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.computegroup.ComputeGroupMgr;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TUserIdentity;
import org.apache.doris.thrift.TopicInfo;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPostProcessable {

    public static final String DEFAULT_GROUP_NAME = "normal";

    public static final Long DEFAULT_GROUP_ID = 1L;

    public static final ImmutableList<String> WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder<String>()
            .add("Id").add("Name").add(WorkloadGroup.CPU_SHARE).add(WorkloadGroup.MEMORY_LIMIT)
            .add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
            .add(WorkloadGroup.WRITE_BUFFER_RATIO)
            .add(WorkloadGroup.SLOT_MEMORY_POLICY)
            .add(WorkloadGroup.MAX_CONCURRENCY).add(WorkloadGroup.MAX_QUEUE_SIZE)
            .add(WorkloadGroup.QUEUE_TIMEOUT).add(WorkloadGroup.CPU_HARD_LIMIT)
            .add(WorkloadGroup.SCAN_THREAD_NUM).add(WorkloadGroup.MAX_REMOTE_SCAN_THREAD_NUM)
            .add(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM)
            .add(WorkloadGroup.MEMORY_LOW_WATERMARK).add(WorkloadGroup.MEMORY_HIGH_WATERMARK)
            .add(WorkloadGroup.COMPUTE_GROUP)
            .add(WorkloadGroup.READ_BYTES_PER_SECOND).add(WorkloadGroup.REMOTE_READ_BYTES_PER_SECOND)
            .add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM)
            .build();

    private static final Logger LOG = LogManager.getLogger(WorkloadGroupMgr.class);
    @SerializedName(value = "idToWorkloadGroup")
    private final Map<Long, WorkloadGroup> idToWorkloadGroup = Maps.newHashMap();
    private final Map<WorkloadGroupKey, WorkloadGroup> keyToWorkloadGroup = Maps.newHashMap();
    private final Map<Long, QueryQueue> idToQueryQueue = Maps.newHashMap();
    private final ResourceProcNode procNode = new ResourceProcNode();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public static final String EMPTY_COMPUTE_GROUP = "";

    @Override
    protected void runAfterCatalogReady() {
        try {
            resetQueryQueueProp();
        } catch (Throwable e) {
            LOG.warn("reset query queue failed, ", e);
        }
    }

    public void resetQueryQueueProp() {
        List<QueryQueue> newPropList = new ArrayList<>();
        Map<Long, QueryQueue> currentQueueCopyMap = new HashMap<>();
        readLock();
        try {
            for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
                WorkloadGroup wg = entry.getValue();
                QueryQueue tmpQ = new QueryQueue(wg.getId(), wg.getMaxConcurrency(),
                        wg.getMaxQueueSize(), wg.getQueueTimeout(), wg.getVersion());
                newPropList.add(tmpQ);
            }
            for (Map.Entry<Long, QueryQueue> entry : idToQueryQueue.entrySet()) {
                currentQueueCopyMap.put(entry.getKey(), entry.getValue());
            }
        } finally {
            readUnlock();
        }

        for (QueryQueue newPropQq : newPropList) {
            QueryQueue currentQueryQueue = currentQueueCopyMap.get(newPropQq.getWgId());
            if (currentQueryQueue == null) {
                continue;
            }
            if (newPropQq.getPropVersion() > currentQueryQueue.getPropVersion()) {
                currentQueryQueue.resetQueueProperty(newPropQq.getMaxConcurrency(), newPropQq.getMaxQueueSize(),
                        newPropQq.getQueueTimeout(), newPropQq.getPropVersion());
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug(currentQueryQueue.debugString()); // for test debug
            }
        }
    }

    public WorkloadGroupMgr() {
        super("workload-group-thread", Config.query_queue_update_interval_ms);
    }

    public static WorkloadGroupMgr read(DataInput in) throws IOException {
        String json = Text.readString(in);
        WorkloadGroupMgr ret = GsonUtils.GSON.fromJson(json, WorkloadGroupMgr.class);
        return ret;
    }

    private void readLock() {
        lock.readLock().lock();
    }

    private void readUnlock() {
        lock.readLock().unlock();
    }

    private void writeLock() {
        lock.writeLock().lock();
    }

    private void writeUnlock() {
        lock.writeLock().unlock();
    }

    private WorkloadGroup getWorkloadGroupByComputeGroupUnlock(WorkloadGroupKey wgKey)
            throws DdlException {
        WorkloadGroup wg = keyToWorkloadGroup.get(wgKey);
        if (wg == null) {
            throw new DdlException(
                    "Can not find workload group " + wgKey.getWorkloadGroupName() + " in compute group "
                            + wgKey.getComputeGroup() + ".");
        }
        return wg;
    }

    public List<TPipelineWorkloadGroup> getWorkloadGroup(ConnectContext context) throws UserException {
        String wgName = getWorkloadGroupNameAndCheckPriv(context);
        Set<String> cgNames = context.getComputeGroup().getNames();

        List<TPipelineWorkloadGroup> workloadGroups = Lists.newArrayList();
        readLock();
        try {
            for (String cgName : cgNames) {
                WorkloadGroup workloadGroup = getWorkloadGroupByComputeGroupUnlock(
                        WorkloadGroupKey.get(cgName, wgName));
                workloadGroups.add(workloadGroup.toThrift());
            }
            context.setWorkloadGroupName(wgName);
        } finally {
            readUnlock();
        }
        return workloadGroups;
    }

    public List<TopicInfo> getPublishTopicInfo() {
        List<TopicInfo> workloadGroups = new ArrayList();
        readLock();
        try {
            for (WorkloadGroup wg : idToWorkloadGroup.values()) {
                workloadGroups.add(wg.toTopicInfo());
            }
        } finally {
            readUnlock();
        }
        return workloadGroups;
    }

    public QueryQueue getWorkloadGroupQueryQueue(Set<Long> wgIdSet) throws UserException {
        writeLock();
        try {
            QueryQueue queryQueue = null;
            for (long wgId : wgIdSet) {
                WorkloadGroup wg = idToWorkloadGroup.get(wgId);
                if (wg == null) {
                    continue;
                }
                QueryQueue tmpQueue = idToQueryQueue.get(wg.getId());
                if (tmpQueue == null) {
                    tmpQueue = new QueryQueue(wg.getId(), wg.getMaxConcurrency(), wg.getMaxQueueSize(),
                            wg.getQueueTimeout(), wg.getVersion());
                    idToQueryQueue.put(wg.getId(), tmpQueue);
                    queryQueue = tmpQueue;
                    break;
                }
                if (queryQueue == null) {
                    queryQueue = tmpQueue;
                } else {
                    Pair<Integer, Integer> detail1 = queryQueue.getQueryQueueDetail();
                    Pair<Integer, Integer> detail2 = tmpQueue.getQueryQueueDetail();
                    if (detail2.first < detail1.first) {
                        queryQueue = tmpQueue;
                    }
                }
            }
            if (queryQueue == null) {
                throw new DdlException("Can not find query queue for workload group: " + wgIdSet);
            }
            return queryQueue;
        } finally {
            writeUnlock();
        }
    }

    public Map<String, List<String>> getWorkloadGroupQueryDetail() {
        Map<String, List<String>> ret = Maps.newHashMap();
        readLock();
        try {
            for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
                Long wgId = entry.getKey();
                WorkloadGroup wg = entry.getValue();
                QueryQueue qq = idToQueryQueue.get(wgId);
                List<String> valueList = new ArrayList<>(2);
                if (qq == null) {
                    valueList.add("0");
                    valueList.add("0");
                } else {
                    Pair<Integer, Integer> qdtail = qq.getQueryQueueDetail();
                    valueList.add(String.valueOf(qdtail.first));
                    valueList.add(String.valueOf(qdtail.second));
                }
                ret.put(wg.getWorkloadGroupKey().toString(), valueList);
            }
        } finally {
            readUnlock();
        }
        return ret;
    }

    private String getWorkloadGroupNameAndCheckPriv(ConnectContext context) throws AnalysisException {
        String groupName = context.getSessionVariable().getWorkloadGroup();
        if (Strings.isNullOrEmpty(groupName)) {
            groupName = Env.getCurrentEnv().getAuth().getWorkloadGroup(context.getQualifiedUser());
        }
        if (Strings.isNullOrEmpty(groupName)) {
            groupName = DEFAULT_GROUP_NAME;
        }
        if (!Env.getCurrentEnv().getAccessManager().checkWorkloadGroupPriv(context, groupName, PrivPredicate.USAGE)) {
            ErrorReport.reportAnalysisException(
                    "Access denied; you need (at least one of) the %s privilege(s) to use workload group '%s'. "
                            + "User: %s",
                    ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "USAGE/ADMIN", groupName,
                    context.getCurrentUserIdentity());
        }
        return groupName;
    }

    public void createWorkloadGroup(String computeGroup, WorkloadGroup workloadGroup, boolean isIfNotExists)
            throws DdlException {
        WorkloadGroupKey wgKey = WorkloadGroupKey.get(computeGroup, workloadGroup.getName());
        writeLock();
        try {
            if (keyToWorkloadGroup.containsKey(wgKey)) {
                if (isIfNotExists) {
                    return;
                }
                throw new DdlException(
                        "Compute group " + wgKey.getComputeGroup() + " already has workload group "
                                + wgKey.getWorkloadGroupName() + ".");
            }
            checkGlobalUnlock(workloadGroup, null);
            keyToWorkloadGroup.put(wgKey, workloadGroup);
            idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup);
            Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(workloadGroup);
        } finally {
            writeUnlock();
        }
        LOG.info("Create workload group {} for compute group {} success.", workloadGroup, computeGroup);
    }

    public void createWorkloadGroup(CreateWorkloadGroupStmt stmt) throws DdlException {
        throw new DdlException("Unsupported create statement");
    }

    // NOTE: used for checking sum value of 100%  for cpu_hard_limit and memory_limit
    //  when create/alter workload group with same tag.
    //  when oldWg is not null it means caller is an alter stmt.
    private void checkGlobalUnlock(WorkloadGroup newWg, WorkloadGroup oldWg) throws DdlException {
        String newWgCg = newWg.getComputeGroup();

        double sumOfAllMemLimit = 0;
        int wgNumOfCurrentCg = 0;
        boolean isAlterStmt = oldWg != null;
        boolean isCreateStmt = !isAlterStmt;

        // 1 get sum value of all wg which has same tag without current wg
        for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
            WorkloadGroup wg = entry.getValue();
            String curWgCg = wg.getComputeGroup();

            if (newWgCg.equals(entry.getValue().getComputeGroup())) {
                wgNumOfCurrentCg++;
            }

            if (isAlterStmt && entry.getKey() == oldWg.getId()) {
                continue;
            }

            if (!newWgCg.equals(curWgCg)) {
                continue;
            }

            if (wg.getMemoryLimitPercentWhenCalSum() > 0) {
                sumOfAllMemLimit += wg.getMemoryLimitPercentWhenCalSum();
            }
        }

        // 2 sum current wg value
        sumOfAllMemLimit += newWg.getMemoryLimitPercentWhenCalSum();

        // 3 check total sum
        if (sumOfAllMemLimit > 100.0 + 1e-6) {
            throw new DdlException(
                    "The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within compute group " + (
                            newWgCg)
                            + " can not be greater than 100.0%. current sum val:" + sumOfAllMemLimit);
        }

        // 4 check wg num
        if (isCreateStmt && wgNumOfCurrentCg >= Config.workload_group_max_num) {
            throw new DdlException(
                    "Workload group number in Compute Group " + newWgCg + "can not exceed "
                            + Config.workload_group_max_num);
        }
    }

    public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws DdlException {
        throw new DdlException("Unsupported alter statement");
    }

    public void alterWorkloadGroup(String computeGroup, String workloadGroupName, Map<String, String> properties)
            throws DdlException {
        if (properties.size() == 0) {
            throw new DdlException("Alter workload group should contain at least one property");
        }

        WorkloadGroup newWorkloadGroup;
        WorkloadGroupKey wgKey = WorkloadGroupKey.get(computeGroup, workloadGroupName);
        writeLock();
        try {
            WorkloadGroup currentWorkloadGroup = getWorkloadGroupByComputeGroupUnlock(wgKey);
            newWorkloadGroup = WorkloadGroup.copyAndUpdate(currentWorkloadGroup, properties);
            checkGlobalUnlock(newWorkloadGroup, currentWorkloadGroup);
            keyToWorkloadGroup.put(wgKey, newWorkloadGroup);
            idToWorkloadGroup.put(newWorkloadGroup.getId(), newWorkloadGroup);
            Env.getCurrentEnv().getEditLog().logAlterWorkloadGroup(newWorkloadGroup);
        } finally {
            writeUnlock();
        }
        LOG.info("Alter workload group {} for compute group {} success: {}", newWorkloadGroup, computeGroup);
    }

    public void dropWorkloadGroup(DropWorkloadGroupStmt stmt) throws DdlException {
        throw new DdlException("Unsupported drop statement.");
    }

    public void dropWorkloadGroup(String computeGroup, String workloadGroupName, boolean ifExists)
            throws DdlException {
        if (DEFAULT_GROUP_NAME.equals(workloadGroupName)) {
            throw new DdlException(
                    "Dropping workload group " + workloadGroupName + " is not allowed.");
        }

        // if a workload group exists in user property, it should not be dropped
        // user need to reset user property first
        Pair<Boolean, String> ret = Env.getCurrentEnv().getAuth().isWorkloadGroupInUse(workloadGroupName);
        if (ret.first) {
            throw new DdlException("Workload group " + workloadGroupName + " is set for user " + ret.second
                    + ", you can reset the user's property(eg, "
                    + "set property for " + ret.second + " 'default_workload_group'='xxx'; ), "
                    + "then you can drop the group.");
        }

        WorkloadGroupKey wgKey = WorkloadGroupKey.get(computeGroup, workloadGroupName);

        // A group with related policies should not be deleted.
        Long wgId = null;
        readLock();
        try {
            WorkloadGroup wg = keyToWorkloadGroup.get(wgKey);
            if (wg != null) {
                wgId = wg.getId();
            }
        } finally {
            readUnlock();
        }
        if (wgId != null) {
            boolean groupHasPolicy = Env.getCurrentEnv().getWorkloadSchedPolicyMgr()
                    .checkWhetherGroupHasPolicy(wgId.longValue());
            if (groupHasPolicy) {
                throw new DdlException(
                        "Workload group " + workloadGroupName + " can't be dropped, because it has related policy");
            }
        }

        writeLock();
        try {
            if (!keyToWorkloadGroup.containsKey(wgKey)) {
                if (ifExists) {
                    return;
                }
                throw new DdlException(
                        "Can not find workload group " + wgKey.getWorkloadGroupName() + " in compute group "
                                + wgKey.getComputeGroup() + ".");
            }
            WorkloadGroup workloadGroup = keyToWorkloadGroup.get(wgKey);
            keyToWorkloadGroup.remove(wgKey);
            idToWorkloadGroup.remove(workloadGroup.getId());
            idToQueryQueue.remove(workloadGroup.getId());
            Env.getCurrentEnv().getEditLog()
                    .logDropWorkloadGroup(new DropWorkloadGroupOperationLog(workloadGroup.getId()));
        } finally {
            writeUnlock();
        }
        LOG.info("Drop workload group success: {} for compute group {}", wgKey.getWorkloadGroupName(),
                wgKey.getComputeGroup());
    }

    private void insertWorkloadGroup(WorkloadGroup workloadGroup) {
        writeLock();
        try {
            LOG.info("[init_wg] replay before, id size: {}, name size: {}, id map: {}, name map: {}",
                    idToWorkloadGroup.size(), keyToWorkloadGroup.size(), idToWorkloadGroup, keyToWorkloadGroup);
            idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup);
            keyToWorkloadGroup.put(workloadGroup.getWorkloadGroupKey(), workloadGroup);
            LOG.info("[init_wg] replay after, id size: {}, name size: {}, id map: {}, name map: {}",
                    idToWorkloadGroup.size(), keyToWorkloadGroup.size(), idToWorkloadGroup, keyToWorkloadGroup);
        } finally {
            writeUnlock();
        }
    }

    public void replayCreateWorkloadGroup(WorkloadGroup workloadGroup) {
        insertWorkloadGroup(workloadGroup);
    }

    public void replayAlterWorkloadGroup(WorkloadGroup workloadGroup) {
        insertWorkloadGroup(workloadGroup);
    }

    public void replayDropWorkloadGroup(DropWorkloadGroupOperationLog operationLog) {
        long id = operationLog.getId();
        writeLock();
        try {
            if (!idToWorkloadGroup.containsKey(id)) {
                return;
            }
            WorkloadGroup workloadGroup = idToWorkloadGroup.get(id);
            keyToWorkloadGroup.remove(workloadGroup.getWorkloadGroupKey());
            idToWorkloadGroup.remove(id);
        } finally {
            writeUnlock();
        }
    }

    public List<List<String>> getResourcesInfo(PatternMatcher matcher) {
        UserIdentity currentUserIdentity = ConnectContext.get().getCurrentUserIdentity();
        List<List<String>> rows = procNode.fetchResult(currentUserIdentity).getRows();
        for (Iterator<List<String>> it = rows.iterator(); it.hasNext(); ) {
            List<String> row = it.next();
            if (matcher != null && !matcher.match(row.get(1))) {
                it.remove();
            }
        }
        return rows;
    }

    public List<List<String>> getResourcesInfo(TUserIdentity tCurrentUserIdentity) {
        UserIdentity currentUserIdentity = UserIdentity.fromThrift(tCurrentUserIdentity);
        return procNode.fetchResult(currentUserIdentity).getRows();
    }

    public Map<Long, String> getIdToNameMap() {
        Map<Long, String> ret = Maps.newHashMap();
        readLock();
        try {
            for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
                ret.put(entry.getKey(), entry.getValue().getName());
            }
            return ret;
        } finally {
            readUnlock();
        }
    }

    public String getWorkloadGroupNameById(Long id) {
        readLock();
        try {
            WorkloadGroup wg = idToWorkloadGroup.get(id);
            if (wg == null) {
                return null;
            }
            return wg.getName();
        } finally {
            readUnlock();
        }
    }

    // for ut
    public Map<WorkloadGroupKey, WorkloadGroup> getNameToWorkloadGroup() {
        return keyToWorkloadGroup;
    }

    // for ut
    public Map<Long, WorkloadGroup> getIdToWorkloadGroup() {
        return idToWorkloadGroup;
    }

    // for ut
    public Map<Long, QueryQueue> getIdToQueryQueue() {
        return idToQueryQueue;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        String json = GsonUtils.GSON.toJson(this);
        Text.writeString(out, json);
    }

    @Override
    public void gsonPostProcess() throws IOException {
        LOG.info("[init_wg] gson before, id size: {}, name size: {}, id map: {}, name map: {}",
                idToWorkloadGroup.size(), keyToWorkloadGroup.size(), idToWorkloadGroup, keyToWorkloadGroup);
        for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
            keyToWorkloadGroup.put(entry.getValue().getWorkloadGroupKey(), entry.getValue());
        }
        LOG.info("[init_wg] gson before, id size: {}, name size: {}, id map: {}, name map: {}",
                idToWorkloadGroup.size(), keyToWorkloadGroup.size(), idToWorkloadGroup, keyToWorkloadGroup);
    }

    public class ResourceProcNode {
        public ProcResult fetchResult(UserIdentity currentUserIdentity) {
            BaseProcResult result = new BaseProcResult();
            result.setNames(WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES);
            readLock();
            try {
                for (WorkloadGroup workloadGroup : idToWorkloadGroup.values()) {
                    if (!Env.getCurrentEnv().getAccessManager().checkWorkloadGroupPriv(currentUserIdentity,
                            workloadGroup.getName(), PrivPredicate.SHOW_WORKLOAD_GROUP)) {
                        continue;
                    }
                    workloadGroup.getProcNodeData(result, idToQueryQueue.get(workloadGroup.getId()));
                }
            } finally {
                readUnlock();
            }
            return result;
        }
    }


    public List<WorkloadGroup> getOldWorkloadGroup() {
        List<WorkloadGroup> oldWgList = Lists.newArrayList();
        readLock();
        try {
            for (Map.Entry<WorkloadGroupKey, WorkloadGroup> entry : keyToWorkloadGroup.entrySet()) {
                if (EMPTY_COMPUTE_GROUP.equals(entry.getKey().getComputeGroup())) {
                    oldWgList.add(entry.getValue());
                }
            }
        } finally {
            readUnlock();
        }
        return oldWgList;
    }

    public void checkWorkloadGroup() {
        try {
            Set<String> cgIds = Env.getCurrentEnv().getComputeGroupMgr().getAllComputeGroupIds();
            if (cgIds.size() == 0) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("no compute group can be found, skip check workload group.");
                }
                return;
            }
            createNormalWorkloadGroup(cgIds);
            dropTombstoneNormalWorkloadGroup(cgIds);
        } catch (Throwable e) {
            LOG.warn("check workload group failed, ", e);
        }
    }

    private void createNormalWorkloadGroup(Set<String> cgIds) {
        Set<String> cgWithoutNormalWg = Sets.newHashSet();
        readLock();
        try {
            for (String cgId : cgIds) {
                WorkloadGroupKey wgKey = WorkloadGroupKey.get(cgId, DEFAULT_GROUP_NAME);
                if (!keyToWorkloadGroup.containsKey(wgKey)) {
                    cgWithoutNormalWg.add(cgId);
                }
            }
        } finally {
            readUnlock();
        }

        if (cgWithoutNormalWg.size() != 0) {
            writeLock();
            try {
                for (String cgId : cgWithoutNormalWg) {
                    Map<String, String> properties = Maps.newHashMap();
                    properties.put(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT, "true");
                    properties.put(WorkloadGroup.COMPUTE_GROUP, cgId);
                    WorkloadGroup normalWg = new WorkloadGroup(Env.getCurrentEnv().getNextId(), DEFAULT_GROUP_NAME,
                            properties);
                    idToWorkloadGroup.put(normalWg.getId(), normalWg);
                    keyToWorkloadGroup.put(normalWg.getWorkloadGroupKey(), normalWg);
                    Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(normalWg);
                    LOG.info("[init_wg]Create default workload group {} for {} success.", normalWg, cgId);
                }
            } finally {
                writeUnlock();
            }
        }
    }

    // tombstone workload group means its compute group is dropped.
    // normal wg is managed by doris, so tombstone normal wg
    // should be dropped by doris too.
    private void dropTombstoneNormalWorkloadGroup(Set<String> currentCgIds) {
        // 1 find tombstone wg
        List<WorkloadGroup> tombstoneWgList = Lists.newArrayList();
        readLock();
        try {
            for (WorkloadGroup wg : idToWorkloadGroup.values()) {
                WorkloadGroupKey wgKey = wg.getWorkloadGroupKey();
                if (DEFAULT_GROUP_NAME.equals(wgKey.getWorkloadGroupName())
                        && !EMPTY_COMPUTE_GROUP.equals(wgKey.getComputeGroup())
                        && !currentCgIds.contains(wgKey.getComputeGroup())) {
                    tombstoneWgList.add(wg);
                }
            }
        } finally {
            readUnlock();
        }

        // 2 drop tombstone wg
        if (tombstoneWgList.size() != 0) {
            writeLock();
            try {
                for (WorkloadGroup wg : tombstoneWgList) {
                    keyToWorkloadGroup.remove(wg.getWorkloadGroupKey());
                    idToWorkloadGroup.remove(wg.getId());
                    idToQueryQueue.remove(wg.getId());
                    Env.getCurrentEnv().getEditLog()
                            .logDropWorkloadGroup(new DropWorkloadGroupOperationLog(wg.getId()));
                    LOG.info("Drop tombstone normal workload group {} success.", wg);
                }
            } finally {
                writeUnlock();
            }
        }
    }

    public void bindWorkloadGroupToComputeGroup(Set<String> cgSet, WorkloadGroup oldWg) {
        writeLock();
        try {
            WorkloadGroupKey oldKey = WorkloadGroupKey.get(EMPTY_COMPUTE_GROUP, oldWg.getName());
            // it means old compute group has been dropped, just return;
            if (!keyToWorkloadGroup.containsKey(oldKey)) {
                LOG.info("[init_wg]Old workload group {} has been dropped, skip it.", oldWg.getName());
                return;
            }
            // create new workload group for all compute group.
            for (String computeGroup : cgSet) {
                WorkloadGroupKey newKey = WorkloadGroupKey.get(computeGroup, oldWg.getName());
                if (keyToWorkloadGroup.containsKey(newKey)) {
                    LOG.info("[init_wg]Workload group {} already exists in compute group {}, skip it.",
                            oldWg.getName(), computeGroup);
                    continue;
                }
                Map<String, String> newProp = Maps.newHashMap();
                for (Map.Entry<String, String> entry : oldWg.getProperties().entrySet()) {
                    newProp.put(entry.getKey(), entry.getValue());
                }
                newProp.put(WorkloadGroup.COMPUTE_GROUP, computeGroup);
                WorkloadGroup newWg = new WorkloadGroup(Env.getCurrentEnv().getNextId(), oldWg.getName(),
                        newProp);
                keyToWorkloadGroup.put(newKey, newWg);
                idToWorkloadGroup.put(newWg.getId(), newWg);
                Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(newWg);
                LOG.info("[init_wg]Create workload group {} for compute group {} success.", oldWg.getName(),
                        computeGroup);
            }

            // drop old workload group
            keyToWorkloadGroup.remove(oldKey);
            idToWorkloadGroup.remove(oldWg.getId());
            idToQueryQueue.remove(oldWg.getId());
            Env.getCurrentEnv().getEditLog().logDropWorkloadGroup(new DropWorkloadGroupOperationLog(oldWg.getId()));
            LOG.info("[init_wg]Drop old workload group {} success.", oldWg);
        } catch (Throwable t) {
            LOG.error("[init_wg]Error happens when drop old workload group, {}, {}", cgSet, oldWg.getName(), t);
        } finally {
            writeUnlock();
        }
    }

    public boolean isWorkloadGroupExists(String wgName) {
        readLock();
        try {
            for (WorkloadGroup wg : idToWorkloadGroup.values()) {
                if (wg.getName().equals(wgName)) {
                    return true;
                }
            }
            return false;
        } finally {
            readUnlock();
        }
    }

    public boolean isWorkloadGroupExists(WorkloadGroupKey wgKey) {
        readLock();
        try {
            return keyToWorkloadGroup.containsKey(wgKey);
        } finally {
            readUnlock();
        }
    }

    public void createNormalWorkloadGroupForUT() {
        Map<String, String> properties = Maps.newHashMap();
        properties.put(WorkloadGroup.COMPUTE_GROUP, Tag.VALUE_DEFAULT_TAG);
        WorkloadGroup normalWg = new WorkloadGroup(DEFAULT_GROUP_ID, DEFAULT_GROUP_NAME,
                properties);
        idToWorkloadGroup.put(normalWg.getId(), normalWg);
        keyToWorkloadGroup.put(normalWg.getWorkloadGroupKey(), normalWg);
    }

    public void convertOldWgToNewWg() {
        try {
            List<WorkloadGroup> oldWgList = getOldWorkloadGroup();
            if (oldWgList.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[init_wg]There is no old workload group, just return.");
                }
                return;
            }

            ComputeGroupMgr cgMgr = Env.getCurrentEnv().getComputeGroupMgr();
            Set<String> idSet = cgMgr.getAllComputeGroupIds();
            if (idSet.size() == 0) {
                return;
            }

            readLock();
            try {
                LOG.info("[init_wg] bind wg before, id size: {}, name size: {}, id map: {}, name map: {}",
                        idToWorkloadGroup.size(), keyToWorkloadGroup.size(), idToWorkloadGroup, keyToWorkloadGroup);
            } finally {
                readUnlock();
            }

            for (WorkloadGroup wg : oldWgList) {
                bindWorkloadGroupToComputeGroup(idSet, wg);
            }

            readLock();
            try {
                LOG.info("[init_wg] bind wg after, id size: {}, name size: {}, id map: {}, name map: {}",
                        idToWorkloadGroup.size(), keyToWorkloadGroup.size(), idToWorkloadGroup, keyToWorkloadGroup);
            } finally {
                readUnlock();
            }
        } catch (Throwable t) {
            LOG.warn("[init_wg]error happens when convert old wg to new wg, ", t);
        }
    }

}