StorageVaultMgr.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.analysis.CreateStorageVaultStmt;
import org.apache.doris.analysis.SetDefaultStorageVaultStmt;
import org.apache.doris.catalog.StorageVault.StorageVaultType;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.proto.Cloud.AlterObjStoreInfoRequest.Operation;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.nereids.trees.plans.commands.CreateStorageVaultCommand;
import org.apache.doris.proto.InternalService.PAlterVaultSyncRequest;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StorageVaultMgr {
    private static final Logger LOG = LogManager.getLogger(StorageVaultMgr.class);
    private static final ExecutorService ALTER_BE_SYNC_THREAD_POOL = Executors.newFixedThreadPool(1);
    private final SystemInfoService systemInfoService;
    // <VaultName, VaultId>
    private Pair<String, String> defaultVaultInfo;
    private Map<String, String> vaultNameToVaultId = new HashMap<>();
    private MonitoredReentrantReadWriteLock rwLock = new MonitoredReentrantReadWriteLock();

    public StorageVaultMgr(SystemInfoService systemInfoService) {
        this.systemInfoService = systemInfoService;
    }

    public void createStorageVaultResource(CreateStorageVaultStmt stmt) throws Exception {
        switch (stmt.getStorageVaultType()) {
            case HDFS:
                createHdfsVault(StorageVault.fromStmt(stmt));
                break;
            case S3:
                createS3Vault(StorageVault.fromStmt(stmt));
                break;
            case UNKNOWN:
            default:
                throw new DdlException("Only support S3, HDFS storage vault.");
        }
        // Make BE eagerly fetch the storage vault info from Meta Service
        ALTER_BE_SYNC_THREAD_POOL.execute(() -> alterSyncVaultTask());
    }

    public void createStorageVaultResource(CreateStorageVaultCommand command) throws Exception {
        switch (command.getVaultType()) {
            case HDFS:
                createHdfsVault(StorageVault.fromCommand(command));
                break;
            case S3:
                createS3Vault(StorageVault.fromCommand(command));
                break;
            case UNKNOWN:
            default:
                throw new DdlException("Only support S3, HDFS storage vault.");
        }
        // Make BE eagerly fetch the storage vault info from Meta Service
        ALTER_BE_SYNC_THREAD_POOL.execute(() -> alterSyncVaultTask());
    }

    public void refreshVaultMap(Map<String, String> vaultMap, Pair<String, String> defaultVault) {
        try {
            rwLock.writeLock().lock();
            vaultNameToVaultId = vaultMap;
            defaultVaultInfo = defaultVault;
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    public String getVaultIdByName(String vaultName) {
        try {
            rwLock.readLock().lock();
            return vaultNameToVaultId.getOrDefault(vaultName, "");
        } finally {
            rwLock.readLock().unlock();
        }
    }

    public String getVaultNameById(String vaultId) {
        try {
            rwLock.readLock().lock();
            for (Map.Entry<String, String> entry : vaultNameToVaultId.entrySet()) {
                if (entry.getValue().equals(vaultId)) {
                    return entry.getKey();
                }
            }
            return "";
        } finally {
            rwLock.readLock().unlock();
        }
    }

    private void addStorageVaultToCache(String vaultName, String vaultId, boolean defaultVault) {
        try {
            rwLock.writeLock().lock();
            vaultNameToVaultId.put(vaultName, vaultId);
            if (defaultVault) {
                defaultVaultInfo = Pair.of(vaultName, vaultId);
            }
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    private void updateStorageVaultCache(String oldVaultName, String newVaultName, String vaultId) {
        try {
            rwLock.writeLock().lock();
            String cachedVaultId = vaultNameToVaultId.get(oldVaultName);
            vaultNameToVaultId.remove(oldVaultName);
            Preconditions.checkArgument(!Strings.isNullOrEmpty(cachedVaultId),
                    "Cached vault id %s is null or empty", cachedVaultId);
            Preconditions.checkArgument(cachedVaultId.equals(vaultId),
                    "Cached vault id not equal to remote storage. %s vs %s", cachedVaultId, vaultId);
            vaultNameToVaultId.put(newVaultName, vaultId);
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    private void updateDefaultStorageVaultCache(Pair<String, String> newDefaultVaultInfo) {
        try {
            rwLock.writeLock().lock();
            defaultVaultInfo = newDefaultVaultInfo;
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    private Cloud.StorageVaultPB.Builder buildAlterS3VaultRequest(Map<String, String> properties, String name)
            throws Exception {
        Cloud.ObjectStoreInfoPB.Builder objBuilder = S3Properties.getObjStoreInfoPB(properties);
        Cloud.StorageVaultPB.Builder alterObjVaultBuilder = Cloud.StorageVaultPB.newBuilder();
        alterObjVaultBuilder.setName(name);
        alterObjVaultBuilder.setObjInfo(objBuilder.build());
        if (properties.containsKey(StorageVault.PropertyKey.VAULT_NAME)) {
            alterObjVaultBuilder.setAlterName(properties.get(StorageVault.PropertyKey.VAULT_NAME));
        }
        return alterObjVaultBuilder;
    }

    private Cloud.StorageVaultPB.Builder buildAlterHdfsVaultRequest(Map<String, String> properties, String name)
            throws Exception {
        Cloud.HdfsVaultInfo hdfsInfos = HdfsStorageVault.generateHdfsParam(properties);
        Cloud.StorageVaultPB.Builder alterHdfsInfoBuilder = Cloud.StorageVaultPB.newBuilder();
        alterHdfsInfoBuilder.setName(name);
        alterHdfsInfoBuilder.setHdfsInfo(hdfsInfos);
        if (properties.containsKey(StorageVault.PropertyKey.VAULT_NAME)) {
            alterHdfsInfoBuilder.setAlterName(properties.get(StorageVault.PropertyKey.VAULT_NAME));
        }
        return alterHdfsInfoBuilder;
    }

    private Cloud.StorageVaultPB.Builder buildAlterStorageVaultRequest(StorageVaultType type,
            Map<String, String> properties, String name) throws Exception {
        Cloud.StorageVaultPB.Builder builder;
        if (type == StorageVaultType.S3) {
            builder = buildAlterS3VaultRequest(properties, name);
        } else if (type == StorageVaultType.HDFS) {
            builder = buildAlterHdfsVaultRequest(properties, name);
        } else {
            throw new DdlException("Unknown storage vault type");
        }
        return builder;
    }

    private Cloud.StorageVaultPB.Builder buildAlterStorageVaultRequest(StorageVault vault) throws Exception {
        Cloud.StorageVaultPB.Builder builder = buildAlterStorageVaultRequest(vault.getType(),
                vault.getCopiedProperties(), vault.getName());
        Cloud.StorageVaultPB.PathFormat.Builder pathBuilder = Cloud.StorageVaultPB.PathFormat.newBuilder();
        pathBuilder.setShardNum(vault.getNumShard());
        pathBuilder.setPathVersion(vault.getPathVersion());
        builder.setPathFormat(pathBuilder);
        return builder;
    }

    public void alterStorageVault(StorageVaultType type, Map<String, String> properties, String name) throws Exception {
        if (type == StorageVaultType.UNKNOWN) {
            throw new DdlException("Unknown storage vault type");
        }
        try {
            Cloud.AlterObjStoreInfoRequest.Builder request = Cloud.AlterObjStoreInfoRequest.newBuilder();
            if (type == StorageVaultType.S3) {
                properties.keySet().stream()
                        .filter(key -> !S3StorageVault.ALLOW_ALTER_PROPERTIES.contains(key))
                        .findAny()
                        .ifPresent(key -> {
                            throw new IllegalArgumentException("Alter property " + key + " is not allowed.");
                        });
                request.setOp(Operation.ALTER_S3_VAULT);
            } else if (type == StorageVaultType.HDFS) {
                properties.keySet().stream()
                        .filter(key -> HdfsStorageVault.FORBID_ALTER_PROPERTIES.contains(key)
                                || key.toLowerCase().contains(S3Properties.S3_PREFIX)
                                || key.toLowerCase().contains(S3Properties.PROVIDER))
                        .findAny()
                        .ifPresent(key -> {
                            throw new IllegalArgumentException("Alter property " + key + " is not allowed.");
                        });
                request.setOp(Operation.ALTER_HDFS_VAULT);
            }
            Cloud.StorageVaultPB.Builder vaultBuilder = buildAlterStorageVaultRequest(type, properties, name);
            request.setVault(vaultBuilder);
            Cloud.AlterObjStoreInfoResponse response =
                    MetaServiceProxy.getInstance().alterStorageVault(request.build());
            if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
                LOG.warn("failed to alter storage vault response: {} ", response);
                throw new DdlException(response.getStatus().getMsg());
            }

            if (request.hasVault() && request.getVault().hasAlterName()) {
                updateStorageVaultCache(name, request.getVault().getAlterName(), response.getStorageVaultId());
                LOG.info("Succeed to alter storage vault, old name:{} new name: {} id:{}", name,
                        request.getVault().getAlterName(), response.getStorageVaultId());
            }

            // Make BE eagerly fetch the storage vault info from Meta Service
            ALTER_BE_SYNC_THREAD_POOL.execute(() -> alterSyncVaultTask());
        } catch (RpcException e) {
            LOG.warn("failed to alter storage vault due to RpcException: {}", e);
            throw new DdlException(e.getMessage());
        }
    }

    @VisibleForTesting
    public void setDefaultStorageVault(SetDefaultStorageVaultStmt stmt) throws DdlException {
        setDefaultStorageVault(stmt.getStorageVaultName());
    }

    public void setDefaultStorageVault(String vaultName) throws DdlException {
        Cloud.AlterObjStoreInfoRequest.Builder builder = Cloud.AlterObjStoreInfoRequest.newBuilder();
        Cloud.StorageVaultPB.Builder vaultBuilder = Cloud.StorageVaultPB.newBuilder();
        vaultBuilder.setName(vaultName);
        builder.setVault(vaultBuilder.build());
        builder.setOp(Operation.SET_DEFAULT_VAULT);
        String vaultId;
        LOG.info("try to set vault {} as default vault", vaultName);
        try {
            Cloud.AlterObjStoreInfoResponse resp =
                    MetaServiceProxy.getInstance().alterStorageVault(builder.build());
            if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
                LOG.warn("failed to set default storage vault response: {}, vault name {}",
                        resp, vaultName);
                throw new DdlException(resp.getStatus().getMsg());
            }
            vaultId = resp.getStorageVaultId();
        } catch (RpcException e) {
            LOG.warn("failed to set default storage vault due to RpcException: {}, vault name {}",
                    e, vaultName);
            throw new DdlException(e.getMessage());
        }
        LOG.info("succeed to set {} as default vault, vault id {}", vaultName, vaultId);
        updateDefaultStorageVaultCache(Pair.of(vaultName, vaultId));
    }

    public void unsetDefaultStorageVault() throws DdlException {
        Cloud.AlterObjStoreInfoRequest.Builder builder = Cloud.AlterObjStoreInfoRequest.newBuilder();
        builder.setOp(Operation.UNSET_DEFAULT_VAULT);
        try {
            Cloud.AlterObjStoreInfoResponse resp =
                    MetaServiceProxy.getInstance().alterStorageVault(builder.build());
            if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
                LOG.warn("failed to unset default storage vault");
                throw new DdlException(resp.getStatus().getMsg());
            }
        } catch (RpcException e) {
            LOG.warn("failed to unset default storage vault");
            throw new DdlException(e.getMessage());
        }
        updateDefaultStorageVaultCache(null);
    }

    public Pair<String, String> getDefaultStorageVault() {
        try {
            rwLock.readLock().lock();
            return defaultVaultInfo;
        } finally {
            rwLock.readLock().unlock();
        }
    }

    @VisibleForTesting
    public void createHdfsVault(StorageVault vault) throws Exception {
        Cloud.StorageVaultPB.Builder alterHdfsInfoBuilder = buildAlterStorageVaultRequest(vault);
        Cloud.AlterObjStoreInfoRequest.Builder requestBuilder
                = Cloud.AlterObjStoreInfoRequest.newBuilder();
        requestBuilder.setOp(Cloud.AlterObjStoreInfoRequest.Operation.ADD_HDFS_INFO);
        requestBuilder.setVault(alterHdfsInfoBuilder.build());
        requestBuilder.setSetAsDefaultStorageVault(vault.setAsDefault());
        try {
            Cloud.AlterObjStoreInfoResponse response =
                    MetaServiceProxy.getInstance().alterStorageVault(requestBuilder.build());
            if (response.getStatus().getCode() == Cloud.MetaServiceCode.ALREADY_EXISTED
                    && vault.ifNotExists()) {
                LOG.info("Hdfs vault {} already existed", vault.getName());
                return;
            }
            if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
                LOG.warn("failed to create hdfs storage vault, vault name {}, response: {} ",
                        vault.getName(), response);
                throw new DdlException(response.getStatus().getMsg());
            }

            LOG.info("Succeed to create hdfs vault {}, id {}, origin default vault replaced {}",
                    vault.getName(), response.getStorageVaultId(),
                    response.getDefaultStorageVaultReplaced());
            addStorageVaultToCache(vault.getName(), response.getStorageVaultId(), vault.setAsDefault());
        } catch (RpcException e) {
            LOG.warn("failed to alter storage vault due to RpcException: {}", e);
            throw new DdlException(e.getMessage());
        }
    }

    private void alterSyncVaultTask() {
        List<Backend> bes;
        try {
            // get system all backends
            bes = systemInfoService.getAllBackendsByAllCluster().values().asList();
        } catch (UserException e) {
            LOG.warn("failed to get current cluster backends: {}", e);
            return;
        }
        bes.forEach(backend -> {
            TNetworkAddress address = backend.getBrpcAddress();
            try {
                BackendServiceProxy.getInstance().alterVaultSync(address, PAlterVaultSyncRequest.newBuilder().build());
            } catch (RpcException e) {
                LOG.warn("failed to alter sync vault");
            }
        });
    }

    public void createS3Vault(StorageVault vault) throws Exception {
        Cloud.StorageVaultPB.Builder s3StorageVaultBuilder = buildAlterStorageVaultRequest(vault);
        Cloud.AlterObjStoreInfoRequest.Builder requestBuilder
                = Cloud.AlterObjStoreInfoRequest.newBuilder();
        requestBuilder.setOp(Cloud.AlterObjStoreInfoRequest.Operation.ADD_S3_VAULT);
        requestBuilder.setVault(s3StorageVaultBuilder);
        requestBuilder.setSetAsDefaultStorageVault(vault.setAsDefault());
        try {
            Cloud.AlterObjStoreInfoResponse response =
                    MetaServiceProxy.getInstance().alterStorageVault(requestBuilder.build());
            if (response.getStatus().getCode() == Cloud.MetaServiceCode.ALREADY_EXISTED
                    && vault.ifNotExists()) {
                LOG.info("S3 vault {} already existed", vault.getName());
                return;
            }
            if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
                LOG.warn("failed to alter storage vault response: {} ", response);
                throw new DdlException(response.getStatus().getMsg());
            }

            LOG.info("Succeed to create s3 vault {}, id {}, origin default vault replaced {}",
                    vault.getName(), response.getStorageVaultId(), response.getDefaultStorageVaultReplaced());
            addStorageVaultToCache(vault.getName(), response.getStorageVaultId(), vault.setAsDefault());
        } catch (RpcException e) {
            LOG.warn("failed to alter storage vault due to RpcException: {}", e);
            throw new DdlException(e.getMessage());
        }
    }
}