PolicyMgr.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.policy;

import org.apache.doris.analysis.AlterPolicyStmt;
import org.apache.doris.analysis.DropPolicyStmt;
import org.apache.doris.analysis.ShowPolicyStmt;
import org.apache.doris.analysis.ShowStoragePolicyUsingStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.Table;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.PushStoragePolicyTask;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

/**
 * Manages policies (row and storage) and caches them in memory.
 **/
public class PolicyMgr implements Writable {
    private static final Logger LOG = LogManager.getLogger(PolicyMgr.class);

    // Read/write lock taken by the methods below around accesses to typeToPolicyMap and tablePolicies.
    // Constructed with fair = true, i.e. the fair ordering policy of ReentrantReadWriteLock.
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    // Every known policy, grouped by policy type. This is the only field carrying @SerializedName.
    @SerializedName(value = "typeToPolicyMap")
    private Map<PolicyTypeEnum, List<Policy>> typeToPolicyMap = Maps.newConcurrentMap();

    // ctlName -> dbName -> tableName -> List<RowPolicy>
    // Lookup index over the row policies; it is filled by addTablePolicies() and rebuilt from
    // typeToPolicyMap by updateTablePolicies() when the manager is loaded through read().
    // NOTE(review): has no @SerializedName, so presumably it is not part of the persisted JSON - confirm.
    private Map<String, Map<String, Map<String, List<RowPolicy>>>> tablePolicies = Maps.newConcurrentMap();

    /** Acquires the write side of {@link #lock}, blocking until it is granted. */
    private void writeLock() {
        final ReentrantReadWriteLock.WriteLock exclusive = lock.writeLock();
        exclusive.lock();
    }

    /** Releases one hold of the write side of {@link #lock}. */
    private void writeUnlock() {
        final ReentrantReadWriteLock.WriteLock exclusive = lock.writeLock();
        exclusive.unlock();
    }

    /** Acquires the read side of {@link #lock}, blocking until it is granted. */
    private void readLock() {
        final ReentrantReadWriteLock.ReadLock shared = lock.readLock();
        shared.lock();
    }

    /** Releases one hold of the read side of {@link #lock}. */
    private void readUnlock() {
        final ReentrantReadWriteLock.ReadLock shared = lock.readLock();
        shared.unlock();
    }

    /**
     * Makes sure the default storage policy exists.
     *
     * <p>Under the write lock: if a storage policy named
     * {@code StoragePolicy.DEFAULT_STORAGE_POLICY_NAME} is already registered the method returns
     * immediately; otherwise a new one is created with a fresh id, registered, and written to the
     * edit log. The success message is only logged when a policy was actually created.
     **/
    public void createDefaultStoragePolicy() {
        writeLock();
        try {
            boolean alreadyRegistered =
                    findPolicy(StoragePolicy.DEFAULT_STORAGE_POLICY_NAME, PolicyTypeEnum.STORAGE).isPresent();
            if (alreadyRegistered) {
                // Nothing to do: the default storage policy is already there.
                return;
            }
            StoragePolicy created = new StoragePolicy(Env.getCurrentEnv().getNextId(),
                    StoragePolicy.DEFAULT_STORAGE_POLICY_NAME);
            unprotectedAdd(created);
            Env.getCurrentEnv().getEditLog().logCreatePolicy(created);
        } finally {
            writeUnlock();
        }
        LOG.info("Create default storage success.");
    }

    /**
     * Registers a new policy and writes it to the edit log.
     *
     * @param policy policy to register
     * @param isIfNotExists when true, an already existing policy makes the call a silent no-op
     * @throws UserException (a {@link DdlException}) if the policy already exists and
     *         {@code isIfNotExists} is false
     */
    public void createPolicy(Policy policy, boolean isIfNotExists) throws UserException {
        writeLock();
        try {
            boolean duplicated = false;
            if (policy.getType() == PolicyTypeEnum.STORAGE) {
                // A storage policy name stays globally unique until the user renames it, so comparing
                // names alone is enough to detect a duplicate. Without this, two storage policies with
                // the same name but different resource names would slip through (github #25025).
                for (Policy existing : getPoliciesByType(PolicyTypeEnum.STORAGE)) {
                    if (existing.getPolicyName().equals(policy.getPolicyName())) {
                        duplicated = true;
                        break;
                    }
                }
            }
            if (!duplicated) {
                duplicated = existPolicy(policy);
            }
            if (duplicated) {
                if (!isIfNotExists) {
                    throw new DdlException("the policy " + policy.getPolicyName() + " already create");
                }
                return;
            }
            unprotectedAdd(policy);
            Env.getCurrentEnv().getEditLog().logCreatePolicy(policy);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Create policy through http api.
     *
     * <p>Behaves exactly like {@link #createPolicy(Policy, boolean)} without "if not exists"
     * semantics, so it delegates instead of duplicating the duplicate-detection logic.
     *
     * @param policy policy to register
     * @throws UserException (a {@link DdlException}) if a matching policy already exists
     **/
    public void addPolicy(Policy policy) throws UserException {
        createPolicy(policy, false);
    }

    /**
     * Drops the policy described by {@code dropPolicyLog} and writes the drop to the edit log.
     *
     * <p>A storage policy is first checked against every partition of every OLAP table in the
     * internal catalog and cannot be dropped while one of them still references it.
     *
     * @param dropPolicyLog description of the policy to drop
     * @param ifExists when true, a missing policy makes the call a silent no-op
     * @throws DdlException if a storage policy is still used by a table, or if the policy does not
     *         exist and {@code ifExists} is false
     */
    public void dropPolicy(DropPolicyLog dropPolicyLog, boolean ifExists) throws DdlException, AnalysisException {
        if (dropPolicyLog.getType() == PolicyTypeEnum.STORAGE) {
            ensureStoragePolicyUnused(dropPolicyLog.getPolicyName());
        }
        writeLock();
        try {
            if (!existPolicy(dropPolicyLog)) {
                if (ifExists) {
                    return;
                }
                throw new DdlException("the policy " + dropPolicyLog.getPolicyName() + " not exist");
            }
            unprotectedDrop(dropPolicyLog);
            Env.getCurrentEnv().getEditLog().logDropPolicy(dropPolicyLog);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Throws if any partition of any OLAP table in the internal catalog uses the given storage policy.
     */
    private static void ensureStoragePolicyUnused(String droppedPolicyName) throws DdlException {
        for (Database db : Env.getCurrentEnv().getInternalCatalog().getDbs()) {
            for (Table table : db.getTables()) {
                if (!(table instanceof OlapTable)) {
                    continue;
                }
                OlapTable olapTable = (OlapTable) table;
                PartitionInfo partitionInfo = olapTable.getPartitionInfo();
                for (Long partitionId : olapTable.getPartitionIds()) {
                    String policyName = partitionInfo.getDataProperty(partitionId).getStoragePolicy();
                    // A partition without a storage policy must not break the scan with an NPE.
                    if (policyName != null && policyName.equals(droppedPolicyName)) {
                        throw new DdlException("the policy " + policyName + " is used by table: "
                            + table.getDisplayName());
                    }
                }
            }
        }
    }

    /**
     * Drops a policy as requested by a DROP POLICY statement.
     *
     * @param stmt statement naming the policy; its "if exists" flag is honoured
     **/
    public void dropPolicy(DropPolicyStmt stmt) throws DdlException, AnalysisException {
        final DropPolicyLog request = DropPolicyLog.fromDropStmt(stmt);
        final boolean ifExists = stmt.isIfExists();
        dropPolicy(request, ifExists);
    }

    /**
     * Tells whether a registered policy of the same type matches the given one.
     *
     * @param checkedPolicy policy condition to check
     * @return true if at least one registered policy matches
     */
    public boolean existPolicy(Policy checkedPolicy) {
        readLock();
        try {
            for (Policy candidate : getPoliciesByType(checkedPolicy.getType())) {
                if (candidate.matchPolicy(checkedPolicy)) {
                    return true;
                }
            }
            return false;
        } finally {
            readUnlock();
        }
    }

    /**
     * Tells whether a registered policy matches the given drop request.
     *
     * @param checkedDropPolicy policy log condition to check
     * @return true if at least one registered policy of that type matches
     */
    private boolean existPolicy(DropPolicyLog checkedDropPolicy) {
        readLock();
        try {
            for (Policy candidate : getPoliciesByType(checkedDropPolicy.getType())) {
                if (candidate.matchPolicy(checkedDropPolicy)) {
                    return true;
                }
            }
            return false;
        } finally {
            readUnlock();
        }
    }

    /**
     * Looks up the first registered policy that matches the given one.
     *
     * @param checkedPolicy condition to get policy
     * @return the matching policy from typeToPolicyMap, or null if there is none
     */
    public Policy getPolicy(Policy checkedPolicy) {
        readLock();
        try {
            return getPoliciesByType(checkedPolicy.getType()).stream()
                    .filter(candidate -> candidate.matchPolicy(checkedPolicy))
                    .findFirst()
                    .orElse(null);
        } finally {
            readUnlock();
        }
    }

    /**
     * Returns an immutable snapshot of the policies of one type, taken under the read lock.
     *
     * @param policyType type of the policies to copy
     * @return immutable copy of the current policy list (empty if none are registered)
     */
    public List<Policy> getCopiedPoliciesByType(PolicyTypeEnum policyType) {
        readLock();
        try {
            final List<Policy> live = getPoliciesByType(policyType);
            return ImmutableList.copyOf(live);
        } finally {
            readUnlock();
        }
    }

    /**
     * Returns the live list of policies registered for a type. No locking is done here.
     *
     * <p>When the map is missing or holds nothing for the type, a fresh empty list is returned;
     * that list is not stored in the map, callers that add to it must put it back themselves.
     */
    private List<Policy> getPoliciesByType(PolicyTypeEnum policyType) {
        final List<Policy> fallback = new ArrayList<>();
        if (typeToPolicyMap != null) {
            return typeToPolicyMap.getOrDefault(policyType, fallback);
        }
        return fallback;
    }

    /**
     * Replays a "create policy" edit-log entry.
     *
     * <p>Row policies that carry no catalog name are first completed from their db/table ids
     * (compatibility path); if the database or table can no longer be found the entry is ignored.
     * The policy is then registered under the write lock, so that readers iterating the policy
     * lists under the read lock never observe a concurrent modification.
     *
     * @param policy policy read from the edit log
     */
    public void replayCreate(Policy policy) {
        // for compatible
        if (policy instanceof RowPolicy) {
            RowPolicy rowPolicy = (RowPolicy) policy;
            if (StringUtils.isEmpty(rowPolicy.getCtlName())) {
                Optional<Database> db = Env.getCurrentEnv().getInternalCatalog().getDb(rowPolicy.getDbId());
                if (!db.isPresent()) {
                    LOG.warn("db may be dropped,ignore CreatePolicyLog. dbId:" + rowPolicy.getDbId());
                    return;
                }
                Optional<Table> table = db.get().getTable(rowPolicy.getTableId());
                if (!table.isPresent()) {
                    LOG.warn("table may be dropped,ignore CreatePolicyLog. tableId:" + rowPolicy.getTableId());
                    return;
                }
                rowPolicy.setCtlName(InternalCatalog.INTERNAL_CATALOG_NAME);
                rowPolicy.setDbName(db.get().getName());
                rowPolicy.setTableName(table.get().getName());
            }
        }
        // unprotectedAdd mutates the shared policy lists and therefore needs the write lock.
        writeLock();
        try {
            unprotectedAdd(policy);
        } finally {
            writeUnlock();
        }
        if (policy instanceof StoragePolicy) {
            ((StoragePolicy) policy).addResourceReference();
        }
        LOG.info("replay create policy: {}", policy);
    }

    /**
     * Registers a policy without taking any lock; a null policy is ignored.
     *
     * <p>The policy is appended to the list of its type (the list is put back into the map because
     * it may have been freshly created), and row policies are additionally indexed per table.
     */
    private void unprotectedAdd(Policy policy) {
        if (policy != null) {
            List<Policy> sameTypePolicies = getPoliciesByType(policy.getType());
            sameTypePolicies.add(policy);
            typeToPolicyMap.put(policy.getType(), sameTypePolicies);
            if (policy.getType() == PolicyTypeEnum.ROW) {
                addTablePolicies((RowPolicy) policy);
            }
        }
    }

    /**
     * Replays a "drop policy" edit-log entry.
     *
     * <p>Row-policy entries that carry no catalog name are first completed from their db/table ids
     * (compatibility path); if the database or table can no longer be found the entry is ignored.
     * The removal itself runs under the write lock because it mutates the shared policy lists.
     *
     * @param log drop request read from the edit log
     */
    public void replayDrop(DropPolicyLog log) {
        // for compatible
        if (log.getType() == PolicyTypeEnum.ROW && StringUtils.isEmpty(log.getCtlName())) {
            Optional<Database> db = Env.getCurrentEnv().getInternalCatalog().getDb(log.getDbId());
            if (!db.isPresent()) {
                LOG.warn("db may be dropped,ignore DropPolicyLog. dbId:" + log.getDbId());
                return;
            }
            Optional<Table> table = db.get().getTable(log.getTableId());
            if (!table.isPresent()) {
                LOG.warn("table may be dropped,ignore DropPolicyLog. tableId:" + log.getTableId());
                return;
            }
            log.setCtlName(InternalCatalog.INTERNAL_CATALOG_NAME);
            log.setDbName(db.get().getName());
            log.setTableName(table.get().getName());
        }
        writeLock();
        try {
            unprotectedDrop(log);
        } finally {
            writeUnlock();
        }
        LOG.info("replay drop policy log: {}", log);
    }

    /**
     * Replays an "alter storage policy" edit-log entry by replacing every registered policy of the
     * same type and name with the logged one.
     *
     * <p>The replacement runs under the write lock because it mutates the shared policy list.
     *
     * @param log the altered storage policy read from the edit log
     */
    public void replayStoragePolicyAlter(StoragePolicy log) {
        writeLock();
        try {
            List<Policy> policies = getPoliciesByType(log.getType());
            policies.removeIf(policy -> log.getPolicyName().equals(policy.getPolicyName()));
            policies.add(log);
            typeToPolicyMap.put(log.getType(), policies);
        } finally {
            writeUnlock();
        }
        LOG.info("replay alter policy log: {}", log);
    }

    /**
     * Removes every policy matching the drop request, without taking any lock.
     *
     * <p>While removing, a storage policy releases its resource reference (and the drop is logged),
     * and a row policy is removed from the per-table index.
     */
    private void unprotectedDrop(DropPolicyLog log) {
        List<Policy> sameTypePolicies = getPoliciesByType(log.getType());
        sameTypePolicies.removeIf(candidate -> {
            if (!candidate.matchPolicy(log)) {
                return false;
            }
            if (candidate instanceof StoragePolicy) {
                StoragePolicy dropped = (StoragePolicy) candidate;
                dropped.removeResourceReference();
                LOG.info("the policy {} with id {} resource {} has been dropped",
                        dropped.getPolicyName(), dropped.getId(), dropped.getStorageResource());
            }
            if (candidate instanceof RowPolicy) {
                dropTablePolicies((RowPolicy) candidate);
            }
            return true;
        });
        typeToPolicyMap.put(log.getType(), sameTypePolicies);
    }

    /**
     * Collects the row policies of one table that apply to a user, either because the policy is
     * bound to that user or because it is bound to one of the user's roles.
     *
     * @param ctlName catalog name
     * @param dbName database name
     * @param tableName table name
     * @param user identity whose policies are requested
     * @return the applicable row policies; empty if the table has no row policy at all
     */
    public List<RowPolicy> getUserPolicies(String ctlName, String dbName, String tableName, UserIdentity user) {
        List<RowPolicy> applicable = Lists.newArrayList();
        // Cheap check before resolving roles: most tables carry no row policy at all.
        boolean tableIsIndexed = tablePolicies.containsKey(ctlName)
                && tablePolicies.get(ctlName).containsKey(dbName)
                && tablePolicies.get(ctlName).get(dbName).containsKey(tableName);
        if (!tableIsIndexed) {
            return applicable;
        }
        Set<String> roles = Env.getCurrentEnv().getAccessManager().getAuth().getRolesByUserWithLdap(user).stream()
                .map(role -> ClusterNamespace.getNameFromFullName(role.getRoleName())).collect(Collectors.toSet());
        readLock();
        try {
            // Check again under the lock before dereferencing the nested maps.
            boolean stillIndexed = tablePolicies.containsKey(ctlName)
                    && tablePolicies.get(ctlName).containsKey(dbName)
                    && tablePolicies.get(ctlName).get(dbName).containsKey(tableName);
            if (!stillIndexed) {
                return applicable;
            }
            for (RowPolicy candidate : tablePolicies.get(ctlName).get(dbName).get(tableName)) {
                boolean boundToUser = candidate.getUser() != null
                        && candidate.getUser().getQualifiedUser().equals(user.getQualifiedUser());
                boolean boundToRole = !boundToUser
                        && !StringUtils.isEmpty(candidate.getRoleName())
                        && roles.contains(candidate.getRoleName());
                if (boundToUser || boundToRole) {
                    applicable.add(candidate);
                }
            }
            return applicable;
        } finally {
            readUnlock();
        }
    }

    /**
     * Builds the SHOW result for the policies of one type that match a template policy.
     *
     * <p>Invalid policies and storage policies without a storage resource are left out.
     *
     * @param finalCheckedPolicy template the registered policies are matched against
     * @param type policy type to list; also selects the result metadata
     */
    private ShowResultSet getShowPolicy(Policy finalCheckedPolicy, PolicyTypeEnum type) throws AnalysisException {
        List<List<String>> rows = Lists.newArrayList();
        readLock();
        try {
            List<Policy> matching = new ArrayList<>();
            for (Policy candidate : getPoliciesByType(type)) {
                if (candidate.matchPolicy(finalCheckedPolicy)) {
                    matching.add(candidate);
                }
            }
            for (Policy policy : matching) {
                // default storage policy not init.
                boolean uninitializedStoragePolicy = policy instanceof StoragePolicy
                        && ((StoragePolicy) policy).getStorageResource() == null;
                if (!policy.isInvalid() && !uninitializedStoragePolicy) {
                    rows.add(policy.getShowInfo());
                }
            }
            return type == PolicyTypeEnum.STORAGE
                    ? new ShowResultSet(StoragePolicy.STORAGE_META_DATA, rows)
                    : new ShowResultSet(RowPolicy.ROW_META_DATA, rows);
        } finally {
            readUnlock();
        }
    }



    /**
     * Shows the row policies, optionally narrowed to a user and/or a role.
     *
     * @param user user to filter by; ignored when null
     * @param role role name to filter by; ignored when null or empty
     **/
    public ShowResultSet showRowPolicy(UserIdentity user, String role) throws AnalysisException {
        RowPolicy template = new RowPolicy();
        if (user != null) {
            template.setUser(user);
        }
        if (StringUtils.isNotEmpty(role)) {
            template.setRoleName(role);
        }
        return getShowPolicy(template, PolicyTypeEnum.ROW);
    }

    /**
     * Shows the storage policies, matched against an empty {@link StoragePolicy} template.
     **/
    public ShowResultSet showStoragePolicy() throws AnalysisException {
        return getShowPolicy(new StoragePolicy(), PolicyTypeEnum.STORAGE);
    }

    /**
     * Shows policies as requested by a SHOW POLICY statement.
     *
     * <p>For the storage type an empty storage policy is used as template; for every other type a
     * row policy template is built from the statement's user and role name.
     **/
    public ShowResultSet showPolicy(ShowPolicyStmt showStmt) throws AnalysisException {
        final Policy template;
        switch (showStmt.getType()) {
            case STORAGE:
                template = new StoragePolicy();
                break;
            case ROW:
            default:
                RowPolicy rowTemplate = new RowPolicy();
                if (showStmt.getUser() != null) {
                    rowTemplate.setUser(showStmt.getUser());
                }
                if (!StringUtils.isEmpty(showStmt.getRoleName())) {
                    rowTemplate.setRoleName(showStmt.getRoleName());
                }
                template = rowTemplate;
                break;
        }
        return getShowPolicy(template, showStmt.getType());
    }

    /**
     * Return objects which is using the storage policy.
     *
     * <p>Every OLAP table of the internal catalog is inspected. Each returned row is
     * {@code [policyName, dbName, tableName, partitions]}, where {@code partitions} is
     * {@code "ALL"} when every partition of the table uses the policy and a comma separated list
     * of partition names otherwise.
     *
     * @param targetPolicyName storage policy to report; null or empty reports every storage policy
     * @return one row per (policy, table) pair that is in use
     **/
    public List<List<String>> showStoragePolicyUsing(String targetPolicyName) throws AnalysisException {
        List<List<String>> rows = Lists.newArrayList();
        boolean showAllPolicies = Strings.isNullOrEmpty(targetPolicyName);

        readLock();
        try {
            for (Database db : Env.getCurrentEnv().getInternalCatalog().getDbs()) {
                for (Table table : db.getTables()) {
                    if (!(table instanceof OlapTable)) {
                        continue;
                    }
                    OlapTable olapTable = (OlapTable) table;
                    if (showAllPolicies) {
                        appendUsageOfAllPolicies(db, olapTable, rows);
                    } else {
                        appendUsageOfPolicy(targetPolicyName, db, olapTable, rows);
                    }
                }
            }
            return rows;
        } finally {
            readUnlock();
        }
    }

    /**
     * Appends one row per storage policy used by the table; partitions without a policy are skipped.
     */
    private static void appendUsageOfAllPolicies(Database db, OlapTable olapTable, List<List<String>> rows) {
        // classify a table's all partitions by storage policy
        Map<String, List<String>> policyToPartitionsMap = new HashMap<>();
        PartitionInfo partitionInfo = olapTable.getPartitionInfo();
        for (Long partitionId : olapTable.getPartitionIds()) {
            String policyName = partitionInfo.getDataProperty(partitionId).getStoragePolicy();
            if (StringUtils.isEmpty(policyName)) {
                continue;
            }
            policyToPartitionsMap.computeIfAbsent(policyName, key -> new ArrayList<>())
                    .add(olapTable.getPartition(partitionId).getName());
        }

        // output, all partitions with same storage policy in a table will be shown in one line
        boolean singlePolicy = policyToPartitionsMap.size() == 1;
        for (Map.Entry<String, List<String>> entry : policyToPartitionsMap.entrySet()) {
            List<String> partitionNames = entry.getValue();
            boolean coversWholeTable = singlePolicy && partitionNames.size() == olapTable.getPartitionNum();
            String shownPartitions = coversWholeTable ? "ALL" : String.join(",", partitionNames);
            rows.add(Arrays.asList(entry.getKey(), db.getName(), olapTable.getName(), shownPartitions));
        }
    }

    /**
     * Appends a row for the table if at least one of its partitions uses the given storage policy.
     */
    private static void appendUsageOfPolicy(String targetPolicyName, Database db, OlapTable olapTable,
            List<List<String>> rows) {
        List<String> matchedPartitions = new ArrayList<>();
        PartitionInfo partitionInfo = olapTable.getPartitionInfo();
        for (Long partitionId : olapTable.getPartitionIds()) {
            String policyName = partitionInfo.getDataProperty(partitionId).getStoragePolicy();
            // targetPolicyName is non-empty here; comparing from its side tolerates a null policyName.
            if (targetPolicyName.equals(policyName)) {
                matchedPartitions.add(olapTable.getPartition(partitionId).getName());
            }
        }
        if (matchedPartitions.isEmpty()) {
            return;
        }

        String shownPartitions = matchedPartitions.size() < olapTable.getPartitionNum()
                ? String.join(",", matchedPartitions)
                : "ALL";
        rows.add(Arrays.asList(targetPolicyName, db.getName(), olapTable.getName(), shownPartitions));
    }

    /**
     * Shows the objects using a storage policy, as requested by a statement.
     *
     * <p>The rows come from {@link #showStoragePolicyUsing(String)}; the metadata comes from the
     * statement.
     **/
    public ShowResultSet showStoragePolicyUsing(ShowStoragePolicyUsingStmt showStmt) throws AnalysisException {
        final List<List<String>> usageRows = showStoragePolicyUsing(showStmt.getPolicyName());
        return new ShowResultSet(showStmt.getMetaData(), usageRows);
    }

    /**
     * Adds a row policy to the per-table index, creating the index entries on demand.
     * If the policy is bound to a user, that user identity is flagged as analyzed first.
     */
    private void addTablePolicies(RowPolicy policy) {
        UserIdentity boundUser = policy.getUser();
        if (boundUser != null) {
            boundUser.setIsAnalyzed();
        }
        getOrCreateTblPolicies(policy.getCtlName(), policy.getDbName(), policy.getTableName()).add(policy);
    }

    /**
     * Removes every policy matching the given row policy from the per-table index.
     * The index entry for the table is kept (and created if it did not exist).
     */
    private void dropTablePolicies(RowPolicy policy) {
        List<RowPolicy> indexed = getOrCreateTblPolicies(policy.getCtlName(), policy.getDbName(),
                policy.getTableName());
        indexed.removeIf(candidate -> candidate.matchPolicy(policy));
    }

    /** Returns the row-policy list of a table in the index, inserting an empty list if absent. */
    private List<RowPolicy> getOrCreateTblPolicies(String ctlName, String dbName, String tableName) {
        return getOrCreateDbPolicyMap(ctlName, dbName)
                .computeIfAbsent(tableName, key -> Lists.newArrayList());
    }

    /** Returns the table -> policies map of a database in the index, inserting an empty map if absent. */
    private Map<String, List<RowPolicy>> getOrCreateDbPolicyMap(String ctlName, String dbName) {
        return getOrCreateCtlPolicyMap(ctlName)
                .computeIfAbsent(dbName, key -> Maps.newConcurrentMap());
    }

    /** Returns the db -> table -> policies map of a catalog in the index, inserting an empty map if absent. */
    private Map<String, Map<String, List<RowPolicy>>> getOrCreateCtlPolicyMap(String ctlName) {
        return tablePolicies.computeIfAbsent(ctlName, key -> Maps.newConcurrentMap());
    }

    /**
     * Compatibility pass run after loading: completes row policies that carry no catalog name.
     *
     * <p>Such policies get the internal catalog name plus the database and table names resolved
     * from their ids. Policies whose database or table can no longer be found are dropped from the
     * list. Runs under the write lock because it replaces the row-policy list in typeToPolicyMap.
     */
    private void compatible() {
        writeLock();
        try {
            // Same null tolerance as getPoliciesByType().
            if (typeToPolicyMap == null || !typeToPolicyMap.containsKey(PolicyTypeEnum.ROW)) {
                return;
            }
            List<Policy> allPolicies = typeToPolicyMap.get(PolicyTypeEnum.ROW);
            List<Policy> compatiblePolicies = Lists.newArrayList();
            for (Policy policy : allPolicies) {
                RowPolicy rowPolicy = (RowPolicy) policy;
                if (StringUtils.isEmpty(rowPolicy.getCtlName())) {
                    Optional<Database> db = Env.getCurrentEnv().getInternalCatalog().getDb(rowPolicy.getDbId());
                    if (!db.isPresent()) {
                        LOG.warn("db may be dropped,ignore row policy. dbId:" + rowPolicy.getDbId());
                        continue;
                    }
                    Optional<Table> table = db.get().getTable(rowPolicy.getTableId());
                    if (!table.isPresent()) {
                        LOG.warn("table may be dropped,ignore row policy. tableId:" + rowPolicy.getTableId());
                        continue;
                    }
                    rowPolicy.setCtlName(InternalCatalog.INTERNAL_CATALOG_NAME);
                    rowPolicy.setDbName(db.get().getName());
                    rowPolicy.setTableName(table.get().getName());
                }
                compatiblePolicies.add(rowPolicy);
            }
            typeToPolicyMap.put(PolicyTypeEnum.ROW, compatiblePolicies);
        } finally {
            writeUnlock();
        }
    }

    /**
     * The merge policy cache needs to be regenerated after the update.
     *
     * <p>Indexes every registered row policy per table. Runs under the write lock because
     * addTablePolicies() mutates the tablePolicies index.
     **/
    private void updateTablePolicies() {
        writeLock();
        try {
            // Same null tolerance as getPoliciesByType().
            if (typeToPolicyMap == null || !typeToPolicyMap.containsKey(PolicyTypeEnum.ROW)) {
                return;
            }
            List<Policy> allPolicies = typeToPolicyMap.get(PolicyTypeEnum.ROW);
            for (Policy policy : allPolicies) {
                addTablePolicies((RowPolicy) policy);
            }
        } finally {
            writeUnlock();
        }
    }

    /** Serializes this manager to JSON with {@code GsonUtils.GSON} and writes it as one string. */
    @Override
    public void write(DataOutput out) throws IOException {
        final String json = GsonUtils.GSON.toJson(this);
        Text.writeString(out, json);
    }

    /**
     * Reads a policy manager from its JSON form.
     *
     * <p>After deserialization the compatibility pass is run and the per-table row-policy index is
     * rebuilt, in that order.
     **/
    public static PolicyMgr read(DataInput in) throws IOException {
        final PolicyMgr loaded = GsonUtils.GSON.fromJson(Text.readString(in), PolicyMgr.class);
        // for compatible
        loaded.compatible();
        // update merge policy cache and userPolicySet
        loaded.updateTablePolicies();
        return loaded;
    }

    /**
     * Finds a policy by policy name and type.
     *
     * @param policyName name to look for
     * @param policyType type whose policies are searched
     * @return the policy with that name, or an empty Optional if there is none
     **/
    public Optional<Policy> findPolicy(final String policyName, PolicyTypeEnum policyType) {
        readLock();
        try {
            for (Policy candidate : getPoliciesByType(policyType)) {
                if (candidate.getPolicyName().equals(policyName)) {
                    return Optional.of(candidate);
                }
            }
            return Optional.empty();
        } finally {
            readUnlock();
        }
    }

    /**
     * Alters a storage policy by policyName and properties.
     *
     * <p>The modified policy is written to the edit log and then pushed to every backend through
     * one {@link PushStoragePolicyTask} per backend.
     *
     * @param storagePolicyName name of the storage policy to alter
     * @param properties properties to apply to the policy
     * @throws DdlException if the name belongs to a row policy, or no storage policy has that name
     **/
    public void alterPolicy(String storagePolicyName, Map<String, String> properties)
                throws DdlException, AnalysisException {
        boolean isRowPolicy = findPolicy(storagePolicyName, PolicyTypeEnum.ROW).isPresent();
        if (isRowPolicy) {
            throw new DdlException("Current not support alter row policy");
        }

        Optional<Policy> found = findPolicy(storagePolicyName, PolicyTypeEnum.STORAGE);
        if (!found.isPresent()) {
            throw new DdlException("Storage policy(" + storagePolicyName + ") dose not exist.");
        }
        StoragePolicy storagePolicy = (StoragePolicy) found.get();
        storagePolicy.modifyProperties(properties);

        // log alter
        Env.getCurrentEnv().getEditLog().logAlterStoragePolicy(storagePolicy);

        // push the altered policy to all backends
        AgentBatchTask pushBatch = new AgentBatchTask();
        for (long backendId : Env.getCurrentSystemInfo().getAllBackendsByAllCluster().keySet()) {
            pushBatch.addTask(new PushStoragePolicyTask(backendId,
                    Collections.singletonList(storagePolicy), Collections.emptyList(), Collections.emptyList()));
        }
        AgentTaskExecutor.submit(pushBatch);
        LOG.info("Alter storage policy success. policy: {}", storagePolicy);
    }

    /**
     * Alters a policy as requested by an ALTER POLICY statement.
     *
     * @param stmt statement carrying the policy name and the properties to apply
     **/
    public void alterPolicy(AlterPolicyStmt stmt) throws DdlException, AnalysisException {
        final String policyName = stmt.getPolicyName();
        final Map<String, String> newProperties = stmt.getProperties();
        alterPolicy(policyName, newProperties);
    }

    /**
     * Checks that a storage policy with the given name exists.
     *
     * <p>A null or empty name is accepted without any check. Otherwise the policy must be
     * registered, and the result of looking up the default storage policy is handed to
     * {@code StoragePolicy.checkDefaultStoragePolicyValid} for further validation.
     *
     * @param storagePolicyName name to check
     * @throws DdlException if no storage policy has that name, or the validation fails
     **/
    public void checkStoragePolicyExist(String storagePolicyName) throws DdlException {
        if (Strings.isNullOrEmpty(storagePolicyName)) {
            return;
        }
        readLock();
        try {
            List<Policy> storagePolicies =
                    Env.getCurrentEnv().getPolicyMgr().getPoliciesByType(PolicyTypeEnum.STORAGE);
            boolean registered = storagePolicies.stream()
                    .anyMatch(policy -> policy.getPolicyName().equals(storagePolicyName));
            if (!registered) {
                throw new DdlException("Storage policy does not exist. name: " + storagePolicyName);
            }
            Optional<Policy> defaultPolicy = Optional.empty();
            for (Policy policy : storagePolicies) {
                if (policy.getPolicyName().equals(StoragePolicy.DEFAULT_STORAGE_POLICY_NAME)) {
                    defaultPolicy = Optional.of(policy);
                    break;
                }
            }
            StoragePolicy.checkDefaultStoragePolicyValid(storagePolicyName, defaultPolicy);
        } finally {
            readUnlock();
        }
    }

    /**
     * Tells whether two storage policies are backed by the same storage resource.
     *
     * @param policyName name of the first storage policy
     * @param anotherPolicyName name of the second storage policy
     * @return true only if both policies exist and their storage resources are equal; false if
     *         either policy is missing or the second policy has no storage resource yet (a storage
     *         policy may have a null resource, e.g. the default policy before it is initialised)
     */
    public boolean checkStoragePolicyIfSameResource(String policyName, String anotherPolicyName) {
        Optional<Policy> policy = findPolicy(policyName, PolicyTypeEnum.STORAGE);
        Optional<Policy> policy1 = findPolicy(anotherPolicyName, PolicyTypeEnum.STORAGE);
        if (policy1.isPresent() && policy.isPresent()) {
            StoragePolicy storagePolicy = (StoragePolicy) policy.get();
            StoragePolicy storagePolicy1 = (StoragePolicy) policy1.get();
            // Guard against a policy without a resource instead of failing with an NPE.
            return storagePolicy1.getStorageResource() != null
                    && storagePolicy1.getStorageResource().equals(storagePolicy.getStorageResource());
        }
        return false;
    }
}