Repository.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.backup;

import org.apache.doris.backup.Status.ErrCode;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DatasourcePrintableMap;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.storage.BrokerProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.filesystem.spi.DorisInputFile;
import org.apache.doris.filesystem.spi.DorisOutputFile;
import org.apache.doris.filesystem.spi.FileEntry;
import org.apache.doris.filesystem.spi.FileIterator;
import org.apache.doris.filesystem.spi.Location;
import org.apache.doris.foundation.fs.FsStorageType;
import org.apache.doris.fs.FileSystemDescriptor;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.PersistentFileSystem;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/*
 * Repository represents a remote storage location that backups are written to and restored from.
 * Files in a repository are organized as follows:
 *
 * * __palo_repository_repo_name/
 *   * __repo_info
 *   * __ss_my_ss1/
 *     * __meta.DJdwnfiu92n
 *     * __info_2018-01-01-08-00-00.OWdn90ndwpu
 *     * __info_2018-01-02-08-00-00.Dnvdio298da
 *     * __info_2018-01-03-08-00-00.O79adbneJWk
 *     * __ss_content/
 *       * __db_10001/
 *         * __tbl_10010/
 *         * __tbl_10020/
 *           * __part_10021/
 *           * __part_10031/
 *             * __idx_10041/
 *             * __idx_10020/
 *               * __10022/
 *               * __10023/
 *                 * __10023_seg1.dat.NUlniklnwDN67
 *                 * __10023_seg2.dat.DNW231dnklawd
 *                 * __10023.hdr.dnmwDDWI92dDko
 */
public class Repository implements Writable, GsonPostProcessable {
    // ---- Name fragments of the remote layout described in the class comment ----
    // Prefix of the repository root directory: __palo_repository_<repo name>
    public static final String PREFIX_REPO = "__palo_repository_";
    // Prefix of a snapshot directory: __ss_<label>
    public static final String PREFIX_SNAPSHOT_DIR = "__ss_";
    // Prefixes of the per-database / table / partition / index directories below __ss_content
    public static final String PREFIX_DB = "__db_";
    public static final String PREFIX_TBL = "__tbl_";
    public static final String PREFIX_PART = "__part_";
    public static final String PREFIX_IDX = "__idx_";
    // Generic prefix, used in this file for the tablet id and schema hash path segments
    public static final String PREFIX_COMMON = "__";
    // Prefix of a job info file name; a timestamp is appended by jobInfoFileNameWithTimestamp()
    public static final String PREFIX_JOB_INFO = "__info_";
    // Suffix of the temporary remote file that upload() writes before renaming (broker storage only)
    public static final String SUFFIX_TMP_FILE = "part";
    // Name of the file holding the repository name and create time
    public static final String FILE_REPO_INFO = "__repo_info";
    // Base name of the snapshot meta file (a checksum suffix is appended on upload)
    public static final String FILE_META_INFO = "__meta";
    // Directory holding the snapshot data below a snapshot directory
    public static final String DIR_SNAPSHOT_CONTENT = "__ss_content";
    // NOTE(review): not referenced in the visible part of this file; presumably identifies a
    // pseudo repository for backups kept on local disk - confirm against the callers.
    public static final String KEEP_ON_LOCAL_REPO_NAME = "__keep_on_local__";
    public static final long KEEP_ON_LOCAL_REPO_ID = -1;
    private static final Logger LOG = LogManager.getLogger(Repository.class);
    // Separator between remote path segments
    private static final String PATH_DELIMITER = "/";
    // Separator between a remote file name and its md5 checksum suffix
    private static final String CHECKSUM_SEPARATOR = ".";

    // Repository id, as passed to the constructor.
    @SerializedName("id")
    private long id;
    // Repository name; also part of the remote root directory name (PREFIX_REPO + name).
    @SerializedName("n")
    private String name;
    // Message of the last failed ping(); reset to null by a successful ping().
    // Carries no @SerializedName annotation, unlike the persisted fields around it.
    private String errMsg;
    // Creation time in epoch milliseconds. Set to "now" by the constructor and replaced by the
    // value stored in the remote __repo_info file when initRepository() finds an existing one.
    @SerializedName("ct")
    private long createTime;

    // If True, user can not backup data to this repo.
    @SerializedName("iro")
    private boolean isReadOnly;

    // BOS location should start with "bos://your_bucket_name/"
    // and the specified bucket should exist.
    // This is the location as given at creation time; getLocation() returns it normalized
    // for non-broker storage.
    @SerializedName("lo")
    private String location;

    /** Legacy field: kept for backward-compatible deserialization of old metadata. */
    @Deprecated
    @SerializedName("fs")
    private PersistentFileSystem legacyFileSystem;

    /** New field: lightweight descriptor used for new metadata serialization. */
    @SerializedName("fs_descriptor")
    private FileSystemDescriptor fileSystemDescriptor;

    /** SPI filesystem for I/O operations; transient — rebuilt in {@link #gsonPostProcess()}.
     *  Null for BROKER repositories, which resolve a live broker endpoint per I/O call.
     *  Also stays null when initialization failed (the failure is only logged). */
    private transient org.apache.doris.filesystem.spi.FileSystem spiFs;

    /**
     * Returns the descriptor of the filesystem backing this repository.
     *
     * @return the descriptor; may be null if none was set or deserialized
     */
    public FileSystemDescriptor getFileSystemDescriptor() {
        return this.fileSystemDescriptor;
    }

    /** No-arg constructor used only when the object is rebuilt from persisted metadata. */
    private Repository() {
        // intentionally empty: fields are populated by deserialization
    }

    /**
     * Creates a repository and records "now" as its creation time.
     *
     * <p>For non-broker storage the shared SPI filesystem is created eagerly; a failure to do so
     * is only logged, leaving the shared instance unset.
     *
     * @param id repository id
     * @param name repository name
     * @param isReadOnly whether the repository is read-only
     * @param location remote root location
     * @param storageProperties storage configuration the filesystem descriptor is derived from
     */
    public Repository(long id, String name, boolean isReadOnly, String location,
            StorageProperties storageProperties) {
        this.id = id;
        this.name = name;
        this.isReadOnly = isReadOnly;
        this.location = location;

        // Only broker storage carries a name; every other storage type uses the empty string.
        String brokerName = "";
        if (storageProperties instanceof BrokerProperties) {
            brokerName = ((BrokerProperties) storageProperties).getBrokerName();
        }
        this.fileSystemDescriptor = FileSystemDescriptor.fromStorageProperties(storageProperties, brokerName);
        this.createTime = System.currentTimeMillis();

        // Broker repositories resolve a live endpoint per I/O call, so there is nothing to create here.
        if (fileSystemDescriptor.getStorageType() == FsStorageType.BROKER) {
            return;
        }
        try {
            this.spiFs = FileSystemFactory.getFileSystem(storageProperties);
        } catch (IOException e) {
            LOG.warn("Failed to initialize SPI filesystem for new repository {}: {}", name, e.getMessage());
        }
    }


    /**
     * Builds the job info file name for the given creation time, eg: __info_2018-01-01-08-00-00.
     *
     * @param createTime creation time, or -1 to get only the bare "__info_" prefix
     * @return the job info file name (without checksum suffix)
     */
    private static String jobInfoFileNameWithTimestamp(long createTime) {
        if (createTime != -1) {
            String timestamp = TimeUtils.longToTimeString(createTime,
                    TimeUtils.getDatetimeFormatWithHyphenWithTimeZone());
            return PREFIX_JOB_INFO + timestamp;
        }
        return PREFIX_JOB_INFO;
    }

    /**
     * Prepends {@code prefix} to the string form of {@code name}.
     *
     * @param prefix prefix to put in front
     * @param name value to append; converted with the usual string conversion
     * @return the concatenation of both
     */
    private static String joinPrefix(String prefix, Object name) {
        StringBuilder joined = new StringBuilder();
        joined.append(prefix);
        joined.append(name);
        return joined.toString();
    }

    /**
     * Strips the prefix from a prefixed name by dropping its first {@code prefix.length()} characters.
     * The name is not checked to actually start with the prefix.
     *
     * @param prefix prefix whose length determines how many characters are dropped
     * @param nameWithPrefix the prefixed name
     * @return the remainder of {@code nameWithPrefix}
     */
    private static String disjoinPrefix(String prefix, String nameWithPrefix) {
        int prefixLength = prefix.length();
        return nameWithPrefix.substring(prefixLength);
    }

    /**
     * Appends a suffix (typically the md5 checksum) to a file path, separated by the checksum separator.
     *
     * @param filePath path of the file
     * @param md5sum suffix to append
     * @return {@code filePath}, the separator and {@code md5sum} concatenated
     */
    private static String assembleFileNameWithSuffix(String filePath, String md5sum) {
        return String.join(CHECKSUM_SEPARATOR, filePath, md5sum);
    }

    /**
     * Splits a file name of the form {@code <name>.<md5sum>} at its last checksum separator.
     *
     * @param fileNameWithChecksum file name (or path) carrying a checksum suffix
     * @return a pair of (name without checksum, checksum), or null if there is no separator
     *         or the part after it is not exactly 32 characters long
     */
    public static Pair<String, String> decodeFileNameWithChecksum(String fileNameWithChecksum) {
        int separatorPos = fileNameWithChecksum.lastIndexOf(CHECKSUM_SEPARATOR);
        if (separatorPos < 0) {
            return null;
        }

        String checksum = fileNameWithChecksum.substring(separatorPos + CHECKSUM_SEPARATOR.length());
        // an md5 hex digest is always 32 characters long
        if (checksum.length() != 32) {
            return null;
        }

        String plainName = fileNameWithChecksum.substring(0, separatorPos);
        return Pair.of(plainName, checksum);
    }

    /**
     * Replaces the last segment of {@code origPath} with {@code fileNameWithChecksum}.
     *
     * <pre>
     * in:  /path/to/orig_file
     * out: /path/to/orig_file.BUWDnl831e4nldsf
     * </pre>
     *
     * @param origPath original path; everything up to and including the last delimiter is kept
     * @param fileNameWithChecksum file name to put after the kept directory part
     * @return the directory part of {@code origPath} followed by {@code fileNameWithChecksum}
     */
    public static String replaceFileNameWithChecksumFileName(String origPath, String fileNameWithChecksum) {
        // index just past the last delimiter; 0 when the path has no delimiter at all
        int dirEnd = origPath.lastIndexOf(PATH_DELIMITER) + 1;
        String dirPart = origPath.substring(0, dirEnd);
        return dirPart + fileNameWithChecksum;
    }

    /**
     * Reads a repository from its persisted JSON form.
     *
     * @param in input to read the JSON text from
     * @return the deserialized repository
     * @throws IOException if reading from {@code in} fails
     */
    public static Repository read(DataInput in) throws IOException {
        String json = Text.readString(in);
        return GsonUtils.GSON.fromJson(json, Repository.class);
    }

    /**
     * Rebuilds the transient state after deserialization.
     *
     * <p>Takes the filesystem properties from the new descriptor when present; otherwise migrates
     * the legacy filesystem field into a descriptor. With neither present nothing is done. For
     * non-broker storage the shared SPI filesystem is then created; a failure is only logged.
     */
    @Override
    public void gsonPostProcess() {
        final Map<String, String> props;
        if (fileSystemDescriptor == null) {
            if (legacyFileSystem == null) {
                return;
            }
            props = legacyFileSystem.properties;
            // Migrate to the descriptor so that the next write uses the new format.
            fileSystemDescriptor = FileSystemDescriptor.fromPersistentFileSystem(legacyFileSystem);
        } else {
            props = fileSystemDescriptor.getProperties();
        }

        // Broker repositories resolve a live endpoint per I/O call, so there is nothing to create here.
        if (fileSystemDescriptor.getStorageType() == FsStorageType.BROKER) {
            return;
        }
        try {
            this.spiFs = FileSystemFactory.getFileSystem(StorageProperties.createPrimary(props));
        } catch (IOException | RuntimeException e) {
            LOG.warn("Failed to initialize SPI filesystem for repository {}: {}", name, e.getMessage());
        }
    }

    /** Returns the id of this repository. */
    public long getId() {
        return this.id;
    }

    /** Returns the name of this repository. */
    public String getName() {
        return this.name;
    }

    /** Returns whether this repository is read-only, i.e. can not be used as a backup target. */
    public boolean isReadOnly() {
        return this.isReadOnly;
    }

    /**
     * Returns the root location of this repository.
     *
     * <p>Without a descriptor, or for broker storage, the location is returned as stored. Otherwise
     * it is validated and normalized through the storage properties built from the descriptor.
     *
     * @return the (possibly normalized) location
     * @throws RuntimeException wrapping the {@link UserException} raised when normalization fails
     */
    public String getLocation() {
        boolean needsNormalization = fileSystemDescriptor != null
                && fileSystemDescriptor.getStorageType() != FsStorageType.BROKER;
        if (!needsNormalization) {
            return location;
        }
        try {
            StorageProperties props = StorageProperties.createPrimary(fileSystemDescriptor.getProperties());
            return props.validateAndNormalizeUri(location);
        } catch (UserException e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns the message recorded by the last failed {@code ping()}, or null if there is none. */
    public String getErrorMsg() {
        return this.errMsg;
    }

    /**
     * Acquires an SPI FileSystem for I/O operations.
     * <ul>
     *   <li>Non-broker: returns the shared {@link #spiFs} instance (do not close it).</li>
     *   <li>Broker: resolves a live broker endpoint via BrokerMgr and creates a per-call
     *       instance that <b>must</b> be closed by calling {@link #releaseSpiFs}.</li>
     * </ul>
     *
     * @return the filesystem to use; never null
     * @throws IOException if the repository has no filesystem descriptor, if the shared instance of
     *         a non-broker repository was never initialized, or if no broker endpoint can be resolved
     */
    private org.apache.doris.filesystem.spi.FileSystem acquireSpiFs() throws IOException {
        if (spiFs != null) {
            return spiFs;
        }
        if (fileSystemDescriptor == null) {
            throw new IOException("No filesystem descriptor available for repository " + name);
        }
        if (fileSystemDescriptor.getStorageType() != FsStorageType.BROKER) {
            // The shared instance is created eagerly and a failure there is only logged. Report that
            // state directly instead of falling through to a broker lookup that cannot succeed.
            throw new IOException("Filesystem of repository " + name + " is not initialized, storage type: "
                    + fileSystemDescriptor.getStorageType());
        }

        // Broker: resolve live endpoint and create a per-call filesystem
        String brokerName = fileSystemDescriptor.getName();
        try {
            BrokerProperties bp = BrokerProperties.of(brokerName, fileSystemDescriptor.getProperties());
            FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getBroker(brokerName, "127.0.0.1");
            if (broker == null) {
                throw new IOException("No broker '" + brokerName + "' available for repository " + name);
            }
            String clientId = NetUtils.getHostPortInAccessibleFormat(
                    FrontendOptions.getLocalHostAddress(), Config.edit_log_port);
            return FileSystemFactory.getBrokerFileSystem(broker.host, broker.port, clientId,
                    bp.getBrokerParams());
        } catch (AnalysisException e) {
            throw new IOException("Failed to acquire broker filesystem for repository " + name
                    + ": " + e.getMessage(), e);
        }
    }

    /**
     * Releases an SPI FileSystem acquired via {@link #acquireSpiFs}.
     * The shared non-broker instance is left open; any other instance (a broker per-call
     * filesystem) is closed, and a failure to close it is only logged.
     *
     * @param fs the filesystem previously returned by {@link #acquireSpiFs}
     */
    private void releaseSpiFs(org.apache.doris.filesystem.spi.FileSystem fs) {
        if (fs == spiFs) {
            return;
        }
        try {
            fs.close();
        } catch (IOException e) {
            LOG.warn("Failed to close broker filesystem for repository {}", name, e);
        }
    }

    /**
     * Uploads a local file to a remote path via the SPI filesystem.
     *
     * @param fs filesystem to write to
     * @param localFilePath path of the local file to read
     * @param remotePath remote path to create
     * @throws IOException if the local file can not be read or the remote file can not be written
     */
    private static void spiUploadFile(org.apache.doris.filesystem.spi.FileSystem fs,
            String localFilePath, String remotePath) throws IOException {
        Location target = Location.of(remotePath);
        DorisOutputFile remoteFile = fs.newOutputFile(target);
        try (java.io.InputStream localIn = Files.newInputStream(Paths.get(localFilePath));
                java.io.OutputStream remoteOut = remoteFile.create()) {
            copyStream(localIn, remoteOut);
        }
    }

    /**
     * Copies all bytes from {@code in} to {@code out} (Java-8-compatible alternative to transferTo).
     * Neither stream is closed.
     *
     * @param in stream to read until end of stream
     * @param out stream receiving the bytes
     * @throws IOException if reading or writing fails
     */
    private static void copyStream(java.io.InputStream in, java.io.OutputStream out) throws IOException {
        final byte[] chunk = new byte[8192];
        for (int count = in.read(chunk); count >= 0; count = in.read(chunk)) {
            out.write(chunk, 0, count);
        }
    }

    /** Returns the creation time of this repository in epoch milliseconds. */
    public long getCreateTime() {
        return this.createTime;
    }

    /**
     * Initializes the repository on the remote storage.
     *
     * <p>If the repo info file already exists it is downloaded and checked: the stored name must
     * equal this repository's name, and the stored create time replaces {@link #createTime}.
     * Otherwise a new repo info file holding the name and create time is written.
     *
     * <p>A malformed repo info file (not a JSON object, or a missing/invalid "name" or
     * "create_time") is reported as an error status rather than raising an unchecked exception,
     * and {@link #createTime} is only changed once the stored value was parsed successfully.
     *
     * @return {@code Status.OK} on success (always when running unit tests), otherwise a
     *         COMMON_ERROR status describing the failure
     */
    public Status initRepository() {
        if (FeConstants.runningUnitTest) {
            return Status.OK;
        }

        String repoInfoFilePath = assembleRepoInfoFilePath();
        org.apache.doris.filesystem.spi.FileSystem fs;
        try {
            fs = acquireSpiFs();
        } catch (IOException e) {
            return new Status(ErrCode.COMMON_ERROR, "Failed to acquire filesystem: " + e.getMessage());
        }
        try {
            if (fs.exists(Location.of(repoInfoFilePath))) {
                // Repo info file exists: download and parse it
                String localFilePath = BackupHandler.BACKUP_ROOT_DIR + "/tmp_info_" + allocLocalFileSuffix();
                try {
                    DorisInputFile inputFile = fs.newInputFile(Location.of(repoInfoFilePath));
                    try (java.io.InputStream in = inputFile.newStream();
                            java.io.OutputStream localOut = Files.newOutputStream(Paths.get(localFilePath))) {
                        copyStream(in, localOut);
                    }
                    byte[] bytes = Files.readAllBytes(Paths.get(localFilePath));
                    String json = new String(bytes, StandardCharsets.UTF_8);
                    // JSONValue.parse reports malformed input by returning null, and valid JSON
                    // need not be an object, so check before casting.
                    Object parsed = JSONValue.parse(json);
                    if (!(parsed instanceof JSONObject)) {
                        return new Status(ErrCode.COMMON_ERROR,
                                "Invalid repository __repo_info, failed to parse content of " + repoInfoFilePath);
                    }
                    JSONObject root = (JSONObject) parsed;
                    Object remoteName = root.get("name");
                    if (!name.equals(remoteName)) {
                        return new Status(ErrCode.COMMON_ERROR,
                                "Invalid repository __repo_info, expected repo '" + name + "', but get name '"
                                        + remoteName + "' from " + repoInfoFilePath);
                    }
                    Object remoteCreateTime = root.get("create_time");
                    long parsedCreateTime = -1;
                    if (remoteCreateTime instanceof String) {
                        parsedCreateTime = TimeUtils.timeStringToLong((String) remoteCreateTime);
                    }
                    if (parsedCreateTime == -1) {
                        return new Status(ErrCode.COMMON_ERROR,
                                "failed to parse create time of repository: " + remoteCreateTime);
                    }
                    // Adopt the stored create time only after it has been validated.
                    createTime = parsedCreateTime;
                    return Status.OK;
                } catch (IOException e) {
                    return new Status(ErrCode.COMMON_ERROR, "failed to read repo info file: " + e.getMessage());
                } finally {
                    // best-effort cleanup of the temporary local copy
                    new File(localFilePath).delete();
                }
            } else {
                // Repo doesn't exist yet: create the info file
                JSONObject root = new JSONObject();
                root.put("name", name);
                root.put("create_time", TimeUtils.longToTimeString(createTime));
                String repoInfoContent = root.toString();
                DorisOutputFile outputFile = fs.newOutputFile(Location.of(repoInfoFilePath));
                try (java.io.OutputStream out = outputFile.create()) {
                    out.write(repoInfoContent.getBytes(StandardCharsets.UTF_8));
                }
                return Status.OK;
            }
        } catch (IOException e) {
            return new Status(ErrCode.COMMON_ERROR, "Failed to init repository: " + e.getMessage());
        } finally {
            releaseSpiFs(fs);
        }
    }

    /**
     * Builds the remote path of the repo info file,
     * eg: location/__palo_repository_repo_name/__repo_info
     *
     * @return the path of the repo info file below the (normalized) repository location
     */
    public String assembleRepoInfoFilePath() {
        String root = getLocation();
        String repoDir = joinPrefix(PREFIX_REPO, name);
        return Joiner.on(PATH_DELIMITER).join(root, repoDir, FILE_REPO_INFO);
    }

    /**
     * Builds the remote path (without checksum suffix) of the meta file of a snapshot,
     * eg: location/__palo_repository_repo_name/__ss_my_sp1/__meta
     *
     * @param label snapshot label
     * @return the meta file path below the (normalized) repository location
     */
    public String assembleMetaInfoFilePath(String label) {
        String root = getLocation();
        String repoDir = joinPrefix(PREFIX_REPO, name);
        String snapshotDir = joinPrefix(PREFIX_SNAPSHOT_DIR, label);
        return Joiner.on(PATH_DELIMITER).join(root, repoDir, snapshotDir, FILE_META_INFO);
    }

    /**
     * Builds the remote path (without checksum suffix) of a job info file of a snapshot,
     * eg: location/__palo_repository_repo_name/__ss_my_sp1/__info_2018-01-01-08-00-00
     *
     * @param label snapshot label
     * @param createTime creation time of the job, or -1 to end the path with the bare "__info_" prefix
     * @return the job info file path below the (normalized) repository location
     */
    public String assembleJobInfoFilePath(String label, long createTime) {
        String root = getLocation();
        String repoDir = joinPrefix(PREFIX_REPO, name);
        String snapshotDir = joinPrefix(PREFIX_SNAPSHOT_DIR, label);
        String infoFileName = jobInfoFileNameWithTimestamp(createTime);
        return Joiner.on(PATH_DELIMITER).join(root, repoDir, snapshotDir, infoFileName);
    }

    /**
     * Builds the normalized remote directory of a tablet inside a snapshot, eg:
     * __palo_repository_repo_name/__ss_my_ss1/__ss_content/__db_10001/__tbl_10020/__part_10031/__idx_10020/__10022/
     *
     * @param label snapshot label
     * @param info snapshot info supplying the db, table, partition, index and tablet ids
     * @return the normalized path, or null if the assembled path is not a valid URI
     */
    public String getRepoTabletPathBySnapshotInfo(String label, SnapshotInfo info) {
        Joiner slashJoiner = Joiner.on(PATH_DELIMITER);
        String path = slashJoiner.join(
                getLocation(),
                joinPrefix(PREFIX_REPO, name),
                joinPrefix(PREFIX_SNAPSHOT_DIR, label),
                DIR_SNAPSHOT_CONTENT,
                joinPrefix(PREFIX_DB, info.getDbId()),
                joinPrefix(PREFIX_TBL, info.getTblId()),
                joinPrefix(PREFIX_PART, info.getPartitionId()),
                joinPrefix(PREFIX_IDX, info.getIndexId()),
                joinPrefix(PREFIX_COMMON, info.getTabletId()));

        URI parsed;
        try {
            parsed = new URI(path);
        } catch (URISyntaxException e) {
            LOG.warn("failed to normalize path: {}", path, e);
            return null;
        }
        // Normalizing avoids a double "/" in the path, which some clients such as the S3 sdk
        // can not handle correctly.
        return parsed.normalize().toString();
    }

    /**
     * Builds the normalized remote path of {@code childPath} below the content directory of a snapshot.
     *
     * @param label snapshot label
     * @param childPath path relative to the snapshot content directory
     * @return the normalized path, or null if the assembled path can not be parsed or normalized
     */
    public String getRepoPath(String label, String childPath) {
        String repoDir = Joiner.on(PATH_DELIMITER).join(getLocation(), joinPrefix(PREFIX_REPO, name));
        String path = Joiner.on(PATH_DELIMITER).join(repoDir,
                joinPrefix(PREFIX_SNAPSHOT_DIR, label),
                DIR_SNAPSHOT_CONTENT,
                childPath);
        try {
            return new URI(path).normalize().toString();
        } catch (Exception e) {
            LOG.warn("Invalid path: " + path, e);
            return null;
        }
    }

    /**
     * Checks whether this repository is reachable by testing that its repo info file exists.
     * The outcome is recorded in {@link #errMsg}: a timestamped message on failure, null on success.
     *
     * <p>NOTE(review): the checked path is built from the raw {@code location} field rather than
     * {@code getLocation()} - confirm that this is intended for storages whose URI gets normalized.
     *
     * @return true if the repo info file exists (always true when running unit tests), false otherwise
     */
    public boolean ping() {
        if (FeConstants.runningUnitTest) {
            return true;
        }
        // The S3 sdk's headObject() does not support a "dir", so the repo info file is probed
        // instead of the repository directory.
        String path = location + "/" + joinPrefix(PREFIX_REPO, name) + "/" + FILE_REPO_INFO;
        try {
            URI checkUri = new URI(path);
            org.apache.doris.filesystem.spi.FileSystem fs = acquireSpiFs();
            try {
                Location probe = Location.of(checkUri.normalize().toString());
                if (fs.exists(probe)) {
                    errMsg = null;
                    return true;
                }
                errMsg = TimeUtils.longToTimeString(System.currentTimeMillis())
                        + ": path does not exist: " + path;
                return false;
            } finally {
                releaseSpiFs(fs);
            }
        } catch (URISyntaxException e) {
            errMsg = TimeUtils.longToTimeString(System.currentTimeMillis())
                    + ": Invalid path. " + path + ", error: " + e.getMessage();
            return false;
        } catch (IOException e) {
            errMsg = TimeUtils.longToTimeString(System.currentTimeMillis()) + ": " + e.getMessage();
            return false;
        }
    }

    /**
     * Visits the repository and collects the names of all existing snapshots.
     *
     * <p>Lists {@code <location>/__palo_repository_<name>/__ss_*} and, for every directory found,
     * adds its name without the "__ss_" prefix to {@code snapshotNames}. Non-directory entries are
     * skipped. A trailing "/" on an entry's URI is tolerated, and an entry whose name does not
     * start with the snapshot prefix is skipped instead of being truncated blindly.
     *
     * @param snapshotNames output list receiving the snapshot names
     * @return {@code Status.OK} on success, otherwise a COMMON_ERROR status describing the failure
     */
    public Status listSnapshots(List<String> snapshotNames) {
        // list with prefix:
        // eg. __palo_repository_repo_name/__ss_*
        String listPath = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name), PREFIX_SNAPSHOT_DIR)
                + "*";
        org.apache.doris.filesystem.spi.FileSystem fs;
        try {
            fs = acquireSpiFs();
        } catch (IOException e) {
            return new Status(ErrCode.COMMON_ERROR, "Failed to acquire filesystem: " + e.getMessage());
        }
        try (FileIterator it = fs.list(Location.of(listPath))) {
            while (it.hasNext()) {
                FileEntry entry = it.next();
                if (!entry.isDirectory()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("get snapshot path {} which is not a dir", entry.location());
                    }
                    continue;
                }
                String uri = entry.location().uri();
                // Ignore trailing delimiters so that "…/__ss_label/" still yields "__ss_label".
                int end = uri.length();
                while (end > 0 && uri.charAt(end - 1) == '/') {
                    end--;
                }
                String entryName = uri.substring(uri.lastIndexOf('/', end - 1) + 1, end);
                if (!entryName.startsWith(PREFIX_SNAPSHOT_DIR)) {
                    LOG.warn("skip unexpected entry {} when listing snapshots of repository {}",
                            entry.location(), name);
                    continue;
                }
                snapshotNames.add(disjoinPrefix(PREFIX_SNAPSHOT_DIR, entryName));
            }
            return Status.OK;
        } catch (IOException e) {
            return new Status(ErrCode.COMMON_ERROR, "Failed to list snapshots: " + e.getMessage());
        } finally {
            releaseSpiFs(fs);
        }
    }

    /**
     * Placeholder with no implementation.
     *
     * @return always false
     */
    public boolean prepareSnapshotInfo() {
        final boolean prepared = false;
        return prepared;
    }

    /**
     * Builds the remote snapshot path of a tablet, eg:
     * /location/__palo_repository_repo_name/__ss_my_ss1/__ss_content/
     * __db_10001/__tbl_10020/__part_10031/__idx_10032/__10023/__3481721
     *
     * <p>The path is based on the raw {@code location} field and is not normalized.
     *
     * @param label snapshot label
     * @param info snapshot info supplying the db, table, partition, index and tablet ids and the schema hash
     * @return the assembled path
     */
    public String assembleRemoteSnapshotPath(String label, SnapshotInfo info) {
        Joiner slashJoiner = Joiner.on(PATH_DELIMITER);
        String remotePath = slashJoiner.join(
                location,
                joinPrefix(PREFIX_REPO, name),
                joinPrefix(PREFIX_SNAPSHOT_DIR, label),
                DIR_SNAPSHOT_CONTENT,
                joinPrefix(PREFIX_DB, info.getDbId()),
                joinPrefix(PREFIX_TBL, info.getTblId()),
                joinPrefix(PREFIX_PART, info.getPartitionId()),
                joinPrefix(PREFIX_IDX, info.getIndexId()),
                joinPrefix(PREFIX_COMMON, info.getTabletId()),
                joinPrefix(PREFIX_COMMON, info.getSchemaHash()));
        if (LOG.isDebugEnabled()) {
            LOG.debug("get remote tablet snapshot path: {}", remotePath);
        }
        return remotePath;
    }

    /**
     * Downloads the job info file of a snapshot and adds the parsed job info to {@code infos}.
     * The temporary local copy is deleted afterwards in any case.
     *
     * @param label snapshot label
     * @param backupTimestamp timestamp part of the job info file name
     * @param infos output list receiving the parsed job info
     * @return {@code Status.OK} on success, the download status if the download failed, or a
     *         COMMON_ERROR status if the downloaded file can not be read
     */
    public Status getSnapshotInfoFile(String label, String backupTimestamp, List<BackupJobInfo> infos) {
        // createTime -1 yields a path ending in the bare "__info_" prefix; the timestamp completes it.
        String remotePath = assembleJobInfoFilePath(label, -1) + backupTimestamp;
        String localPath = BackupHandler.BACKUP_ROOT_DIR + PATH_DELIMITER + "info_" + allocLocalFileSuffix();
        File localCopy = new File(localPath);
        try {
            Status downloadStatus = download(remotePath, localCopy.getPath());
            if (downloadStatus.ok()) {
                infos.add(BackupJobInfo.fromFile(localCopy.getAbsolutePath()));
                return Status.OK;
            }
            return downloadStatus;
        } catch (IOException e) {
            return new Status(ErrCode.COMMON_ERROR, "Failed to create job info from file: "
                    + localCopy.getName() + ". msg: " + e.getMessage());
        } finally {
            localCopy.delete();
        }
    }

    /**
     * Downloads the meta file of a snapshot and adds the parsed backup meta to {@code backupMetas}.
     * The temporary local copy is deleted afterwards in any case.
     *
     * @param label snapshot label
     * @param backupMetas output list receiving the parsed backup meta
     * @param metaVersion meta version passed on when reading the file
     * @return {@code Status.OK} on success, the download status if the download failed, or a
     *         COMMON_ERROR status if the file can not be read or the meta version is rejected
     */
    public Status getSnapshotMetaFile(String label, List<BackupMeta> backupMetas, int metaVersion) {
        String remotePath = assembleMetaInfoFilePath(label);
        String localPath = BackupHandler.BACKUP_ROOT_DIR + PATH_DELIMITER + "meta_" + allocLocalFileSuffix();
        File localCopy = new File(localPath);

        try {
            Status downloadStatus = download(remotePath, localCopy.getAbsolutePath());
            if (downloadStatus.ok()) {
                // read file to backupMeta
                backupMetas.add(BackupMeta.fromFile(localCopy.getAbsolutePath(), metaVersion));
                return Status.OK;
            }
            return downloadStatus;
        } catch (IOException e) {
            LOG.warn("failed to read backup meta from file", e);
            return new Status(ErrCode.COMMON_ERROR, "Failed create backup meta from file: "
                    + localCopy.getAbsolutePath() + ", msg: " + e.getMessage());
        } catch (IllegalArgumentException e) {
            LOG.warn("failed to set meta version", e);
            return new Status(ErrCode.COMMON_ERROR, e.getMessage());
        } finally {
            localCopy.delete();
        }
    }

    /**
     * Uploads a local file to the remote storage, appending the file's md5 checksum to the remote name.
     *
     * <p>For broker storage the file is first written to a temporary remote file and then renamed;
     * for all other storage types it is written to the final path directly. An existing remote file
     * at the target path is deleted first.
     *
     * @param localFilePath path of the local file
     * @param remoteFilePath FULL remote path, without checksum suffix
     * @return {@code Status.OK} on success, NOT_FOUND if the local file does not exist, otherwise
     *         a COMMON_ERROR status describing the failure
     */
    public Status upload(String localFilePath, String remoteFilePath) {
        // The md5 of the local content becomes the suffix of the remote file name.
        String checksum;
        try (FileInputStream localIn = new FileInputStream(new File(localFilePath))) {
            checksum = DigestUtils.md5Hex(localIn);
        } catch (FileNotFoundException e) {
            return new Status(ErrCode.NOT_FOUND, "file " + localFilePath + " does not exist");
        } catch (IOException e) {
            return new Status(ErrCode.COMMON_ERROR, "failed to get md5sum of file: " + localFilePath);
        }
        Preconditions.checkState(!Strings.isNullOrEmpty(checksum));
        String finalRemotePath = assembleFileNameWithSuffix(remoteFilePath, checksum);

        org.apache.doris.filesystem.spi.FileSystem fs;
        try {
            fs = acquireSpiFs();
        } catch (IOException e) {
            return new Status(ErrCode.COMMON_ERROR, "Failed to acquire filesystem: " + e.getMessage());
        }
        try {
            boolean viaBroker = fileSystemDescriptor.getStorageType() == FsStorageType.BROKER;
            if (!viaBroker) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("upload: local={}, final={}", localFilePath, finalRemotePath);
                }
                fs.delete(Location.of(finalRemotePath), false);
                spiUploadFile(fs, localFilePath, finalRemotePath);
            } else {
                // Broker doesn't support atomic overwrite: write a temp file, then rename it.
                String tmpRemotePath = assembleFileNameWithSuffix(remoteFilePath, SUFFIX_TMP_FILE);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("upload with temp file: local={}, tmp={}, final={}",
                            localFilePath, tmpRemotePath, finalRemotePath);
                }
                fs.delete(Location.of(tmpRemotePath), false);
                fs.delete(Location.of(finalRemotePath), false);
                spiUploadFile(fs, localFilePath, tmpRemotePath);
                fs.rename(Location.of(tmpRemotePath), Location.of(finalRemotePath));
            }
        } catch (IOException e) {
            return new Status(ErrCode.COMMON_ERROR, "Failed to upload " + localFilePath + ": " + e.getMessage());
        } finally {
            releaseSpiFs(fs);
        }

        LOG.info("finished to upload local file {} to remote file: {}", localFilePath, finalRemotePath);
        return Status.OK;
    }

    /**
     * Downloads a remote file to a local path and verifies its md5 checksum.
     *
     * <p>The remote file is stored with a checksum suffix that the caller does not know, so the
     * full name is discovered by listing {@code remoteFilePath + "*"}; exactly one non-directory
     * match is expected. The checksum taken from that name is compared with the md5 of the
     * downloaded content.
     *
     * <p>The local file is not removed here when the download or the verification fails.
     *
     * @param remoteFilePath remote path of a file (not a dir), without the checksum suffix
     * @param localFilePath local path the content is written to
     * @return {@code Status.OK} on success; BAD_FILE on checksum mismatch; NOT_FOUND when a
     *         FileNotFoundException is raised; otherwise a COMMON_ERROR status
     */
    public Status download(String remoteFilePath, String localFilePath) {
        org.apache.doris.filesystem.spi.FileSystem fs;
        try {
            fs = acquireSpiFs();
        } catch (IOException e) {
            return new Status(ErrCode.COMMON_ERROR, "Failed to acquire filesystem: " + e.getMessage());
        }
        String md5sum;
        try {
            // 0. list to get to full name (with checksum)
            List<FileEntry> remoteFiles = fs.listFiles(Location.of(remoteFilePath + "*"));
            if (remoteFiles.size() != 1) {
                return new Status(ErrCode.COMMON_ERROR,
                        "Expected one file with path: " + remoteFilePath + ". get: " + remoteFiles.size());
            }
            if (remoteFiles.get(0).isDirectory()) {
                return new Status(ErrCode.COMMON_ERROR,
                        "Expected file with path: " + remoteFilePath + ". but get dir");
            }

            // Only the last segment of the listed URI is used; the directory part is taken from
            // the path the caller passed in.
            String remoteFileFullUri = remoteFiles.get(0).location().uri();
            String remoteFileName = remoteFileFullUri.substring(remoteFileFullUri.lastIndexOf('/') + 1);
            String remoteFilePathWithChecksum = replaceFileNameWithChecksumFileName(remoteFilePath, remoteFileName);
            if (LOG.isDebugEnabled()) {
                LOG.debug("get download filename with checksum: {}", remoteFilePathWithChecksum);
            }

            // 1. get checksum from remote file name
            Pair<String, String> pair = decodeFileNameWithChecksum(remoteFilePathWithChecksum);
            if (pair == null) {
                return new Status(ErrCode.COMMON_ERROR,
                        "file name should contain checksum: " + remoteFilePathWithChecksum);
            }
            // The wildcard may have matched a longer name; the name without checksum has to be
            // the requested file itself.
            if (!remoteFilePath.endsWith(pair.first)) {
                return new Status(ErrCode.COMMON_ERROR, "File does not exist: " + remoteFilePath);
            }
            md5sum = pair.second;

            // 2. download
            DorisInputFile inputFile = fs.newInputFile(Location.of(remoteFilePathWithChecksum));
            try (java.io.InputStream in = inputFile.newStream();
                    java.io.OutputStream out = Files.newOutputStream(Paths.get(localFilePath))) {
                copyStream(in, out);
            }

            // 3. verify checksum
            String localMd5sum;
            try (FileInputStream fis = new FileInputStream(localFilePath)) {
                localMd5sum = DigestUtils.md5Hex(fis);
            }
            if (!localMd5sum.equals(md5sum)) {
                return new Status(ErrCode.BAD_FILE,
                        "md5sum does not equal. local: " + localMd5sum + ", remote: " + md5sum);
            }
            return Status.OK;
        } catch (FileNotFoundException e) {
            // NOTE(review): the message names the local file, but this catch covers every call in
            // the try block, so the exception may also originate from the remote side - confirm.
            return new Status(ErrCode.NOT_FOUND, "file " + localFilePath + " does not exist");
        } catch (IOException e) {
            return new Status(ErrCode.COMMON_ERROR, "Failed to download file: " + e.getMessage());
        } finally {
            releaseSpiFs(fs);
        }
    }

    /**
     * Resolves the broker address a backend should use for this repository and adds it to
     * {@code brokerAddrs}. For non-broker storage a fake address ("127.0.0.1", port 0) is added.
     *
     * @param beId id of the backend
     * @param env environment whose broker manager is used for the lookup
     * @param brokerAddrs output list receiving the resolved (or fake) address
     * @return {@code Status.OK} on success, or a COMMON_ERROR status if the backend is missing or
     *         no broker address can be resolved
     */
    public Status getBrokerAddress(Long beId, Env env, List<FsBroker> brokerAddrs) {
        Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
        if (backend == null) {
            return new Status(ErrCode.COMMON_ERROR, "backend " + beId + " is missing. "
                    + "failed to send upload snapshot task");
        }

        // Only broker storage needs a real broker address; every other type gets a fake one.
        if (fileSystemDescriptor.getStorageType() != FsStorageType.BROKER) {
            brokerAddrs.add(new FsBroker("127.0.0.1", 0));
            return Status.OK;
        }

        // Pick a proper broker for this backend.
        final String brokerName = fileSystemDescriptor.getName();
        final FsBroker resolved;
        try {
            resolved = env.getBrokerMgr().getBroker(brokerName, backend.getHost());
        } catch (AnalysisException e) {
            return new Status(ErrCode.COMMON_ERROR, "failed to get address of broker "
                    + brokerName + " when try to send upload snapshot task: "
                    + e.getMessage());
        }
        if (resolved == null) {
            return new Status(ErrCode.COMMON_ERROR, "failed to get address of broker "
                    + brokerName + " when try to send upload snapshot task");
        }
        brokerAddrs.add(resolved);
        return Status.OK;
    }

    /**
     * Returns this repository's attributes as a row of strings, in this order:
     * id, name, create time, read-only flag, location, broker name ("-" unless the
     * storage type is BROKER), storage type name, and error message
     * ({@code FeConstants.null_string} when there is none).
     */
    public List<String> getInfo() {
        List<String> row = Lists.newArrayList();
        row.add(String.valueOf(id));
        row.add(name);
        row.add(TimeUtils.longToTimeString(createTime));
        row.add(String.valueOf(isReadOnly));
        row.add(location);

        // The broker name is only meaningful for BROKER-backed repositories.
        boolean brokerBacked = fileSystemDescriptor.getStorageType() == FsStorageType.BROKER;
        String brokerColumn = brokerBacked ? fileSystemDescriptor.getName() : "-";
        row.add(brokerColumn);
        row.add(fileSystemDescriptor.getStorageType().name());

        String errorColumn = (errMsg != null) ? errMsg : FeConstants.null_string;
        row.add(errorColumn);
        return row;
    }

    /**
     * Collects info rows for snapshots in this repository.
     *
     * <p>With a non-empty {@code snapshotName}, a single row for that snapshot (and the
     * given {@code timestamp}) is returned. Otherwise all snapshots are listed and one
     * row per snapshot is returned, each covering all of its timestamps.
     *
     * @param snapshotName snapshot to describe, or null/empty for every snapshot
     * @param timestamp timestamp to describe; only used when {@code snapshotName} is given
     * @return one row per snapshot
     * @throws AnalysisException if listing the snapshots of this repository fails
     */
    public List<List<String>> getSnapshotInfos(String snapshotName, String timestamp)
            throws AnalysisException {
        List<List<String>> rows = Lists.newArrayList();

        // A specific snapshot was requested: describe just that one.
        if (!Strings.isNullOrEmpty(snapshotName)) {
            rows.add(getSnapshotInfo(snapshotName, timestamp));
            return rows;
        }

        // No snapshot named: enumerate everything in the repository.
        List<String> allNames = Lists.newArrayList();
        Status listStatus = listSnapshots(allNames);
        if (!listStatus.ok()) {
            throw new AnalysisException(
                    "Failed to list snapshot in repo: " + name + ", err: " + listStatus.getErrMsg());
        }
        for (String each : allNames) {
            // A null timestamp asks for all timestamps of the snapshot.
            rows.add(getSnapshotInfo(each, null));
        }
        return rows;
    }

    /**
     * Renders a {@code CREATE [READ ONLY] REPOSITORY} statement for this repository.
     *
     * <p>The statement contains the repository name, the storage backend (S3, HDFS, or
     * BROKER followed by the broker name), the location, and a PROPERTIES clause built
     * from the file system descriptor's properties.
     *
     * @return the statement text
     * @throws UnsupportedOperationException if the storage type is none of S3, HDFS, BROKER
     */
    public String getCreateStatement() {
        StringBuilder stmtBuilder = new StringBuilder();
        stmtBuilder.append("CREATE ");
        if (this.isReadOnly) {
            stmtBuilder.append("READ ONLY ");
        }
        stmtBuilder.append("REPOSITORY ");
        stmtBuilder.append(this.name);
        stmtBuilder.append(" \nWITH ");
        FsStorageType storageType = fileSystemDescriptor.getStorageType();
        if (storageType == FsStorageType.S3) {
            stmtBuilder.append(" S3 ");
        } else if (storageType == FsStorageType.HDFS) {
            stmtBuilder.append(" HDFS ");
        } else if (storageType == FsStorageType.BROKER) {
            stmtBuilder.append(" BROKER ");
            stmtBuilder.append(fileSystemDescriptor.getName());
        } else {
            // should never reach here
            throw new UnsupportedOperationException(storageType.toString() + " backend is not implemented");
        }
        stmtBuilder.append(" \nON LOCATION \"");
        stmtBuilder.append(this.location);
        stmtBuilder.append("\"");

        stmtBuilder.append("\nPROPERTIES\n(");
        // Work on a typed copy so the descriptor's own map is never handed to the printer.
        Map<String, String> properties = new HashMap<>(fileSystemDescriptor.getProperties());
        // NOTE(review): the three boolean flags are defined by DatasourcePrintableMap;
        // confirm their meaning there before changing them.
        stmtBuilder.append(new DatasourcePrintableMap<>(properties, " = ", true, true, true));
        stmtBuilder.append("\n)");
        return stmtBuilder.toString();
    }

    /**
     * Builds one info row for a snapshot in this repository.
     *
     * <p>When {@code timestamp} is null or empty, the job info files of the snapshot are
     * listed and the row has three columns: snapshot name, the newline-joined timestamps
     * (or {@code FeConstants.null_string}), and a status text.
     *
     * <p>Otherwise the job info file for that timestamp is downloaded to a temporary local
     * file and parsed, and the row has five columns: snapshot name, timestamp, database
     * name, brief, and a status text. The temporary file is removed afterwards.
     *
     * <p>Failures are reported in the status column rather than thrown.
     *
     * @param snapshotName name of the snapshot to describe
     * @param timestamp timestamp to describe, or null/empty to list all timestamps
     * @return the info row
     */
    private List<String> getSnapshotInfo(String snapshotName, String timestamp) {
        List<String> info = Lists.newArrayList();
        // Every row starts with the snapshot name, whatever the outcome.
        info.add(snapshotName);

        if (Strings.isNullOrEmpty(timestamp)) {
            // get all timestamps
            // path eg: /location/__palo_repository_repo_name/__ss_my_snap/__info_*
            String infoFilePath = assembleJobInfoFilePath(snapshotName, -1);
            if (LOG.isDebugEnabled()) {
                LOG.debug("assemble infoFilePath: {}, snapshot: {}", infoFilePath, snapshotName);
            }
            try {
                org.apache.doris.filesystem.spi.FileSystem fs = acquireSpiFs();
                List<FileEntry> results;
                try {
                    results = fs.listFiles(Location.of(infoFilePath + "*"));
                } finally {
                    releaseSpiFs(fs);
                }
                List<String> tmp = Lists.newArrayList();
                for (FileEntry entry : results) {
                    // __info_2018-04-18-20-11-00.Jdwnd9312sfdn1294343
                    String uri = entry.location().uri();
                    String fileName = uri.substring(uri.lastIndexOf('/') + 1);
                    Pair<String, String> pureFileName = decodeFileNameWithChecksum(fileName);
                    if (pureFileName == null) {
                        // maybe: __info_2018-04-18-20-11-00.part
                        tmp.add("Invalid: " + fileName);
                        continue;
                    }
                    tmp.add(disjoinPrefix(PREFIX_JOB_INFO, pureFileName.first));
                }
                if (!tmp.isEmpty()) {
                    info.add(Joiner.on("\n").join(tmp));
                    info.add("OK");
                } else {
                    info.add(FeConstants.null_string);
                    info.add("ERROR: No info file found");
                }
            } catch (IOException e) {
                info.add(FeConstants.null_string);
                info.add("ERROR: Failed to get info: " + e.getMessage());
            }
            return info;
        }

        // get specified timestamp, different repos might have snapshots with same timestamp.
        info.add(timestamp);
        String localFilePath = BackupHandler.BACKUP_ROOT_DIR + "/"
                + Repository.PREFIX_JOB_INFO + allocLocalFileSuffix();
        try {
            String remoteInfoFilePath = assembleJobInfoFilePath(snapshotName, -1) + timestamp;
            Status st = download(remoteInfoFilePath, localFilePath);
            if (!st.ok()) {
                info.add(FeConstants.null_string);
                info.add(FeConstants.null_string);
                info.add("Failed to get info: " + st.getErrMsg());
            } else {
                try {
                    BackupJobInfo jobInfo = BackupJobInfo.fromFile(localFilePath);
                    // Read everything before appending so a failure cannot leave a
                    // half-filled row behind.
                    String dbName = jobInfo.dbName;
                    String brief = jobInfo.getBrief();
                    info.add(dbName);
                    info.add(brief);
                    info.add("OK");
                } catch (IOException e) {
                    info.add(FeConstants.null_string);
                    info.add(FeConstants.null_string);
                    info.add("Failed to read info from local file: " + e.getMessage());
                }
            }
        } finally {
            // delete tmp local file; this is best-effort, but a failure should be visible
            File localFile = new File(localFilePath);
            if (localFile.exists() && !localFile.delete()) {
                LOG.warn("failed to delete tmp local file: {}", localFilePath);
            }
        }

        return info;
    }

    // Builds a suffix for temporary local file names: the current time in milliseconds
    // followed by a random UUID whose '-' separators are replaced with '_'.
    private String allocLocalFileSuffix() {
        long nowMillis = System.currentTimeMillis();
        String randomPart = UUID.randomUUID().toString().replace('-', '_');
        return nowMillis + randomPart;
    }

    /**
     * Writes this repository to {@code out} as a single string holding its Gson JSON form.
     *
     * @param out destination to write to
     * @throws IOException if writing the string fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        String json = GsonUtils.GSON.toJson(this);
        Text.writeString(out, json);
    }
}