StoragePolicy.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.policy;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.Resource.ReferenceType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.qe.ShowResultSetMetaData;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Save policy for storage migration.
 **/
@Data
public class StoragePolicy extends Policy {
    public static final String DEFAULT_STORAGE_POLICY_NAME = "default_storage_policy";

    public static boolean checkDefaultStoragePolicyValid(final String storagePolicyName, Optional<Policy> defaultPolicy)
            throws DdlException {
        if (!defaultPolicy.isPresent()) {
            return false;
        }

        if (storagePolicyName.equalsIgnoreCase(DEFAULT_STORAGE_POLICY_NAME) && (
                ((StoragePolicy) defaultPolicy.get()).getStorageResource() == null)) {
            throw new DdlException("Use default storage policy, but not give s3 info,"
                    + " please use alter resource to add default storage policy S3 info.");
        }
        return true;
    }

    public static final ShowResultSetMetaData STORAGE_META_DATA =
            ShowResultSetMetaData.builder()
                .addColumn(new Column("PolicyName", ScalarType.createVarchar(100)))
                .addColumn(new Column("Id", ScalarType.createVarchar(20)))
                .addColumn(new Column("Version", ScalarType.createVarchar(20)))
                .addColumn(new Column("Type", ScalarType.createVarchar(20)))
                .addColumn(new Column("StorageResource", ScalarType.createVarchar(20)))
                .addColumn(new Column("CooldownDatetime", ScalarType.createVarchar(20)))
                .addColumn(new Column("CooldownTtl", ScalarType.createVarchar(20)))
                .build();

    private static final Logger LOG = LogManager.getLogger(StoragePolicy.class);
    // required
    public static final String STORAGE_RESOURCE = "storage_resource";
    // optional
    public static final String COOLDOWN_DATETIME = "cooldown_datetime";
    public static final String COOLDOWN_TTL = "cooldown_ttl";

    // for ttl format
    private static final String TTL_WEEK = "week";
    private static final String TTL_DAY = "day";
    private static final String TTL_DAY_SIMPLE = "d";
    private static final String TTL_HOUR = "hour";
    private static final String TTL_HOUR_SIMPLE = "h";
    private static final long ONE_HOUR_S = 3600;
    private static final long ONE_DAY_S = 24 * ONE_HOUR_S;
    private static final long ONE_WEEK_S = 7 * ONE_DAY_S;

    @SerializedName(value = "storageResource")
    private String storageResource = null;

    @SerializedName(value = "cooldownTimestampMs")
    private long cooldownTimestampMs = -1;

    // time unit: seconds
    @SerializedName(value = "cooldownTtl")
    private long cooldownTtl = -1;

    // for Gson fromJson
    public StoragePolicy() {
        super(PolicyTypeEnum.STORAGE);
    }

    /**
     * Policy for Storage Migration.
     *
     * @param policyId policy id
     * @param policyName policy name
     * @param storageResource resource name for storage
     * @param cooldownTimestampMs cool down time
     * @param cooldownTtl seconds for cooldownTtl
     */
    public StoragePolicy(long policyId, final String policyName, final String storageResource,
            final long cooldownTimestampMs, long cooldownTtl) {
        super(policyId, PolicyTypeEnum.STORAGE, policyName);
        this.storageResource = storageResource;
        this.cooldownTimestampMs = cooldownTimestampMs;
        this.cooldownTtl = cooldownTtl;
    }

    /**
     * Policy for Storage Migration.
     *
     * @param policyId policy id
     * @param policyName policy name
     */
    public StoragePolicy(long policyId, final String policyName) {
        super(policyId, PolicyTypeEnum.STORAGE, policyName);
    }

    public static StoragePolicy ofCheck(String policyName) {
        StoragePolicy storagePolicy = new StoragePolicy();
        storagePolicy.policyName = policyName;
        return storagePolicy;
    }

    /**
     * Init props for storage policy.
     *
     * @param props properties for storage policy
     */
    public void init(final Map<String, String> props, boolean ifNotExists) throws AnalysisException {
        if (props == null) {
            throw new AnalysisException("properties config is required");
        }
        checkRequiredProperty(props, STORAGE_RESOURCE);
        this.storageResource = props.get(STORAGE_RESOURCE);
        boolean hasCooldownDatetime = props.containsKey(COOLDOWN_DATETIME);
        boolean hasCooldownTtl = props.containsKey(COOLDOWN_TTL);

        if (hasCooldownDatetime && hasCooldownTtl) {
            throw new AnalysisException(COOLDOWN_DATETIME + " and " + COOLDOWN_TTL + " can't be set together.");
        }
        if (!hasCooldownDatetime && !hasCooldownTtl) {
            throw new AnalysisException(COOLDOWN_DATETIME + " or " + COOLDOWN_TTL + " must be set");
        }
        if (hasCooldownDatetime) {
            try {
                this.cooldownTimestampMs = LocalDateTime
                        .parse(props.get(COOLDOWN_DATETIME), TimeUtils.getDatetimeFormatWithTimeZone())
                        .atZone(TimeUtils.getDorisZoneId()).toInstant().toEpochMilli();
            } catch (DateTimeParseException e) {
                throw new AnalysisException(String.format("cooldown_datetime format error: %s",
                        props.get(COOLDOWN_DATETIME)), e);
            }
            // ttl would be set as -1 when using datetime
            this.cooldownTtl = -1;
        }
        if (hasCooldownTtl) {
            // second
            // this.cooldownTtlMs = (getMsByCooldownTtl(props.get(COOLDOWN_TTL)) / 1000);
            this.cooldownTtl = getSecondsByCooldownTtl(props.get(COOLDOWN_TTL));
        }

        checkResourceIsExist(this.storageResource);
        if (!addResourceReference() && !ifNotExists) {
            throw new AnalysisException("this policy has been added to s3 or hdfs resource, policy has been created.");
        }
    }

    private static Resource checkResourceIsExist(final String storageResource) throws AnalysisException {
        Resource resource =
                Optional.ofNullable(Env.getCurrentEnv().getResourceMgr().getResource(storageResource))
                    .orElseThrow(() -> new AnalysisException("storage resource doesn't exist: " + storageResource));

        Map<String, String> properties = resource.getCopiedProperties();
        switch (resource.getType()) {
            case S3:
                if (!properties.containsKey(S3Properties.ROOT_PATH)) {
                    throw new AnalysisException(String.format(
                        "Missing [%s] in '%s' resource", S3Properties.ROOT_PATH, storageResource));
                }
                if (!properties.containsKey(S3Properties.BUCKET)) {
                    throw new AnalysisException(String.format(
                        "Missing [%s] in '%s' resource", S3Properties.BUCKET, storageResource));
                }
                break;
            case HDFS:
                if (!properties.containsKey(HdfsResource.HADOOP_FS_NAME)) {
                    throw new AnalysisException(String.format(
                        "Missing [%s] in '%s' resource", HdfsResource.HADOOP_FS_NAME, storageResource));
                }
                break;
            default:
                throw new AnalysisException(
                    "current storage policy just support resource type S3_COOLDOWN or HDFS_COOLDOWN");
        }
        return resource;
    }

    /**
     * Use for SHOW POLICY.
     **/
    public List<String> getShowInfo() throws AnalysisException {
        readLock();
        try {
            if (cooldownTimestampMs == -1) {
                return Lists.newArrayList(this.policyName, String.valueOf(this.id), String.valueOf(this.version),
                        this.type.name(), this.storageResource, "-1", String.valueOf(this.cooldownTtl));
            }
            return Lists.newArrayList(this.policyName, String.valueOf(this.id), String.valueOf(this.version),
                    this.type.name(), this.storageResource, TimeUtils.longToTimeString(this.cooldownTimestampMs),
                    String.valueOf(this.cooldownTtl));
        } finally {
            readUnlock();
        }
    }

    @Override
    public void gsonPostProcess() throws IOException {}

    @Override
    public StoragePolicy clone() {
        return new StoragePolicy(this.id, this.policyName, this.storageResource, this.cooldownTimestampMs,
                this.cooldownTtl);
    }

    @Override
    public boolean matchPolicy(Policy checkedPolicyCondition) {
        if (!(checkedPolicyCondition instanceof StoragePolicy)) {
            return false;
        }
        StoragePolicy storagePolicy = (StoragePolicy) checkedPolicyCondition;
        return (storagePolicy.getStorageResource() == null
                        || storagePolicy.getStorageResource().equals(this.storageResource))
                && checkMatched(storagePolicy.getType(), storagePolicy.getPolicyName());
    }

    @Override
    public boolean matchPolicy(DropPolicyLog checkedDropCondition) {
        return checkMatched(checkedDropCondition.getType(), checkedDropCondition.getPolicyName());
    }

    /**
     * check required key in properties.
     *
     * @param props properties for storage policy
     * @param propertyKey key for property
     * @throws AnalysisException exception for properties error
     */
    private void checkRequiredProperty(final Map<String, String> props, String propertyKey) throws AnalysisException {
        String value = props.get(propertyKey);

        if (Strings.isNullOrEmpty(value)) {
            throw new AnalysisException("Missing [" + propertyKey + "] in properties.");
        }
    }

    @Override
    public boolean isInvalid() {
        return false;
    }

    /**
     * Get milliseconds by cooldownTtl, 1week=604800000 1day=1d=86400000, 1hour=1h=3600000
     * @param cooldownTtl cooldown ttl
     * @return millisecond for cooldownTtl
     */
    public static long getSecondsByCooldownTtl(String cooldownTtl) throws AnalysisException {
        cooldownTtl = cooldownTtl.replace(TTL_DAY, TTL_DAY_SIMPLE).replace(TTL_HOUR, TTL_HOUR_SIMPLE);
        long cooldownTtlSeconds = 0;
        try {
            if (cooldownTtl.endsWith(TTL_DAY_SIMPLE)) {
                cooldownTtlSeconds = Long.parseLong(cooldownTtl.replace(TTL_DAY_SIMPLE, "").trim()) * ONE_DAY_S;
            } else if (cooldownTtl.endsWith(TTL_HOUR_SIMPLE)) {
                cooldownTtlSeconds = Long.parseLong(cooldownTtl.replace(TTL_HOUR_SIMPLE, "").trim()) * ONE_HOUR_S;
            } else if (cooldownTtl.endsWith(TTL_WEEK)) {
                cooldownTtlSeconds = Long.parseLong(cooldownTtl.replace(TTL_WEEK, "").trim()) * ONE_WEEK_S;
            } else {
                cooldownTtlSeconds = Long.parseLong(cooldownTtl.trim());
            }
        } catch (NumberFormatException e) {
            LOG.error("getSecByCooldownTtl failed.", e);
            throw new AnalysisException("getSecByCooldownTtl failed.", e);
        }
        if (cooldownTtlSeconds < 0) {
            LOG.error("cooldownTtl can't be less than 0");
            throw new AnalysisException("cooldownTtl can't be less than 0");
        }
        return cooldownTtlSeconds;
    }

    public void checkProperties(Map<String, String> properties) throws AnalysisException {
        // check properties
        Map<String, String> copiedProperties = Maps.newHashMap(properties);

        copiedProperties.remove(STORAGE_RESOURCE);
        copiedProperties.remove(COOLDOWN_DATETIME);
        copiedProperties.remove(COOLDOWN_TTL);

        if (!copiedProperties.isEmpty()) {
            throw new AnalysisException("Unknown Storage policy properties: " + copiedProperties);
        }
    }

    public void modifyProperties(Map<String, String> properties) throws DdlException, AnalysisException {
        this.toString();
        // check cooldown date time and ttl.
        long cooldownTtlMs = this.cooldownTtl;
        long cooldownTimestampMs = this.cooldownTimestampMs;
        if (properties.containsKey(COOLDOWN_TTL)) {
            if (properties.get(COOLDOWN_TTL).isEmpty()) {
                cooldownTtlMs = -1;
            } else {
                cooldownTtlMs = getSecondsByCooldownTtl(properties.get(COOLDOWN_TTL));
            }
        }
        if (properties.containsKey(COOLDOWN_DATETIME)) {
            if (properties.get(COOLDOWN_DATETIME).isEmpty()) {
                cooldownTimestampMs = -1;
            } else {
                try {
                    cooldownTimestampMs = LocalDateTime.parse(properties.get(COOLDOWN_DATETIME),
                            TimeUtils.getDatetimeFormatWithTimeZone()).atZone(TimeUtils.getDorisZoneId()).toInstant()
                            .toEpochMilli();
                } catch (DateTimeParseException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        if (cooldownTtlMs >= 0 && cooldownTimestampMs >= 0) {
            throw new AnalysisException(COOLDOWN_DATETIME + " and " + COOLDOWN_TTL + " can't be set together.");
        }
        if (cooldownTtlMs < 0 && cooldownTimestampMs < 0) {
            throw new AnalysisException(COOLDOWN_DATETIME + " or " + COOLDOWN_TTL + " must be set");
        }

        String storageResource = properties.get(STORAGE_RESOURCE);
        if (storageResource != null) {
            checkResourceIsExist(storageResource);
        }
        if (this.policyName.equalsIgnoreCase(DEFAULT_STORAGE_POLICY_NAME) && this.storageResource == null
                && storageResource == null) {
            throw new DdlException("first time set default storage policy, but not give storageResource");
        }
        // modify properties
        writeLock();
        this.cooldownTtl = cooldownTtlMs;
        this.cooldownTimestampMs = cooldownTimestampMs;
        if (storageResource != null) {
            this.storageResource = storageResource;
        }
        ++version;
        writeUnlock();
    }

    public boolean addResourceReference() {
        if (storageResource != null) {
            Resource resource = Env.getCurrentEnv().getResourceMgr().getResource(storageResource);
            if (resource != null) {
                return resource.addReference(policyName, ReferenceType.POLICY);
            }
        }
        return false;
    }

    public boolean removeResourceReference() {
        if (storageResource != null) {
            Resource resource = Env.getCurrentEnv().getResourceMgr().getResource(storageResource);
            if (resource != null) {
                return resource.removeReference(policyName, ReferenceType.POLICY);
            }
        }
        return false;
    }
}