// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.backup;

import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.Status.ErrCode;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fsv2.FileSystemFactory;
import org.apache.doris.fsv2.PersistentFileSystem;
import org.apache.doris.fsv2.remote.BrokerFileSystem;
import org.apache.doris.fsv2.remote.RemoteFile;
import org.apache.doris.fsv2.remote.RemoteFileSystem;
import org.apache.doris.fsv2.remote.S3FileSystem;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.system.Backend;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

/*
 * Repository represents a remote storage to back up to or restore from.
 * File organization in the repository is:
 *
 * * __palo_repository_repo_name/
 *   * __repo_info
 *   * __ss_my_ss1/
 *     * __meta__DJdwnfiu92n
 *     * __info_2018-01-01-08-00-00.OWdn90ndwpu
 *     * __info_2018-01-02-08-00-00.Dnvdio298da
 *     * __info_2018-01-03-08-00-00.O79adbneJWk
 *     * __ss_content/
 *       * __db_10001/
 *         * __tbl_10010/
 *         * __tbl_10020/
 *           * __part_10021/
 *           * __part_10031/
 *             * __idx_10041/
 *             * __idx_10020/
 *               * __10022/
 *               * __10023/
 *                 * __10023_seg1.dat.NUlniklnwDN67
 *                 * __10023_seg2.dat.DNW231dnklawd
 *                 * __10023.hdr.dnmwDDWI92dDko
 */
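// A minimal usage sketch (the ids, names and fs objects below are illustrative, not defined in this file):
//   Repository repo = new Repository(10001L, "repo1", false, "s3://bucket/backup", remoteFs, legacyFs);
//   Status st = repo.initRepository();
//   if (st.ok() && repo.ping()) {
//       // the repo is reachable and ready for backup/restore jobs
//   }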
public class Repository implements Writable, GsonPostProcessable {
public static final String PREFIX_REPO = "__palo_repository_";
public static final String PREFIX_SNAPSHOT_DIR = "__ss_";
public static final String PREFIX_DB = "__db_";
public static final String PREFIX_TBL = "__tbl_";
public static final String PREFIX_PART = "__part_";
public static final String PREFIX_IDX = "__idx_";
public static final String PREFIX_COMMON = "__";
public static final String PREFIX_JOB_INFO = "__info_";
public static final String SUFFIX_TMP_FILE = "part";
public static final String FILE_REPO_INFO = "__repo_info";
public static final String FILE_META_INFO = "__meta";
public static final String DIR_SNAPSHOT_CONTENT = "__ss_content";
public static final String KEEP_ON_LOCAL_REPO_NAME = "__keep_on_local__";
public static final long KEEP_ON_LOCAL_REPO_ID = -1;
private static final Logger LOG = LogManager.getLogger(Repository.class);
private static final String PATH_DELIMITER = "/";
private static final String CHECKSUM_SEPARATOR = ".";
@SerializedName("id")
private long id;
@SerializedName("n")
private String name;
private String errMsg;
@SerializedName("ct")
private long createTime;
    // If true, users cannot back up data to this repo.
@SerializedName("iro")
private boolean isReadOnly;
// BOS location should start with "bos://your_bucket_name/"
// and the specified bucket should exist.
@SerializedName("lo")
private String location;
@SerializedName("fs")
private org.apache.doris.fs.PersistentFileSystem oldfs;
// Temporary field: currently still using the legacy fs config (oldfs).
// This field can be removed once the new fs configuration is fully enabled.
private PersistentFileSystem fileSystem;
public org.apache.doris.fs.PersistentFileSystem getOldfs() {
return oldfs;
}
private Repository() {
// for persist
}
public Repository(long id, String name, boolean isReadOnly, String location, RemoteFileSystem fileSystem,
org.apache.doris.fs.PersistentFileSystem oldFs) {
this.id = id;
this.name = name;
this.isReadOnly = isReadOnly;
this.location = location;
this.fileSystem = fileSystem;
this.oldfs = oldFs;
this.createTime = System.currentTimeMillis();
}
// join job info file name with timestamp
// eg: __info_2018-01-01-08-00-00
private static String jobInfoFileNameWithTimestamp(long createTime) {
if (createTime == -1) {
return PREFIX_JOB_INFO;
} else {
return PREFIX_JOB_INFO
+ TimeUtils.longToTimeString(createTime, TimeUtils.getDatetimeFormatWithHyphenWithTimeZone());
}
}
    // join the name with the specified prefix
private static String joinPrefix(String prefix, Object name) {
return prefix + name;
}
    // strip the specified prefix from the name
private static String disjoinPrefix(String prefix, String nameWithPrefix) {
return nameWithPrefix.substring(prefix.length());
}
private static String assembleFileNameWithSuffix(String filePath, String md5sum) {
return filePath + CHECKSUM_SEPARATOR + md5sum;
}
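    // Split a checksum-suffixed file name into (file name, md5sum).
    // Returns null if there is no separator or the suffix is not a 32-char md5sum.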
public static Pair<String, String> decodeFileNameWithChecksum(String fileNameWithChecksum) {
int index = fileNameWithChecksum.lastIndexOf(CHECKSUM_SEPARATOR);
if (index == -1) {
return null;
}
String fileName = fileNameWithChecksum.substring(0, index);
String md5sum = fileNameWithChecksum.substring(index + CHECKSUM_SEPARATOR.length());
if (md5sum.length() != 32) {
return null;
}
return Pair.of(fileName, md5sum);
}
// in: /path/to/orig_file
// out: /path/to/orig_file.BUWDnl831e4nldsf
public static String replaceFileNameWithChecksumFileName(String origPath, String fileNameWithChecksum) {
return origPath.substring(0, origPath.lastIndexOf(PATH_DELIMITER) + 1) + fileNameWithChecksum;
}
public static Repository read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) {
Repository repo = new Repository();
repo.readFields(in);
return repo;
} else {
return GsonUtils.GSON.fromJson(Text.readString(in), Repository.class);
}
}
    // TODO: why do we only support altering S3 properties?
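    // Merge semantics: keep every existing non-credential property, drop the stored credentials,
    // then take the new credentials (access/secret key, session token) from `properties`.
    // The merged result is written back into `properties` for the caller to apply.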
public Status alterRepositoryS3Properties(Map<String, String> properties) {
if (this.fileSystem instanceof S3FileSystem) {
Map<String, String> oldProperties = new HashMap<>(this.getRemoteFileSystem().getProperties());
oldProperties.remove(S3Properties.ACCESS_KEY);
oldProperties.remove(S3Properties.SECRET_KEY);
oldProperties.remove(S3Properties.SESSION_TOKEN);
oldProperties.remove(S3Properties.Env.ACCESS_KEY);
oldProperties.remove(S3Properties.Env.SECRET_KEY);
oldProperties.remove(S3Properties.Env.TOKEN);
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (Objects.equals(entry.getKey(), S3Properties.ACCESS_KEY)
|| Objects.equals(entry.getKey(), S3Properties.Env.ACCESS_KEY)) {
oldProperties.putIfAbsent(S3Properties.ACCESS_KEY, entry.getValue());
}
if (Objects.equals(entry.getKey(), S3Properties.SECRET_KEY)
|| Objects.equals(entry.getKey(), S3Properties.Env.SECRET_KEY)) {
oldProperties.putIfAbsent(S3Properties.SECRET_KEY, entry.getValue());
}
if (Objects.equals(entry.getKey(), S3Properties.SESSION_TOKEN)
|| Objects.equals(entry.getKey(), S3Properties.Env.TOKEN)) {
oldProperties.putIfAbsent(S3Properties.SESSION_TOKEN, entry.getValue());
}
}
properties.clear();
properties.putAll(oldProperties);
return Status.OK;
} else {
return new Status(ErrCode.COMMON_ERROR, "Only support alter s3 repository");
}
}
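    // Rebuild the file systems after GSON deserialization: the storage type is persisted inside
    // the legacy fs properties, so extract it, recreate the legacy fs, and, for non-broker
    // repos, create the new fs from the same properties.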
@Override
public void gsonPostProcess() {
StorageBackend.StorageType type = StorageBackend.StorageType.BROKER;
if (this.oldfs.properties.containsKey(org.apache.doris.fs.PersistentFileSystem.STORAGE_TYPE)) {
type = StorageBackend.StorageType.valueOf(
this.oldfs.properties.get(org.apache.doris.fs.PersistentFileSystem.STORAGE_TYPE));
this.oldfs.properties.remove(org.apache.doris.fs.PersistentFileSystem.STORAGE_TYPE);
}
this.oldfs = org.apache.doris.fs.FileSystemFactory.get(this.oldfs.getName(),
type,
this.oldfs.getProperties());
if (!type.equals(StorageBackend.StorageType.BROKER)) {
StorageProperties storageProperties = StorageProperties.createPrimary(this.oldfs.properties);
this.fileSystem = FileSystemFactory.get(storageProperties);
}
}
public long getId() {
return id;
}
public String getName() {
return name;
}
public boolean isReadOnly() {
return isReadOnly;
}
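    // Return the repo location, normalized by the new fs's storage properties when available.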
public String getLocation() {
if (null == fileSystem) {
return location;
}
try {
if (null == fileSystem.getStorageProperties()) {
return location;
} else {
return fileSystem.getStorageProperties().validateAndNormalizeUri(location);
}
} catch (UserException e) {
throw new RuntimeException(e);
}
}
public String getErrorMsg() {
return errMsg;
}
public PersistentFileSystem getRemoteFileSystem() {
return fileSystem;
}
public long getCreateTime() {
return createTime;
}
// create repository dir and repo info file
public Status initRepository() {
if (FeConstants.runningUnitTest) {
return Status.OK;
}
String repoInfoFilePath = assembleRepoInfoFilePath();
        // check if the repo already exists in the remote storage
List<RemoteFile> remoteFiles = Lists.newArrayList();
Status st = fileSystem.globList(repoInfoFilePath, remoteFiles);
if (!st.ok()) {
return st;
}
if (remoteFiles.size() == 1) {
RemoteFile remoteFile = remoteFiles.get(0);
if (!remoteFile.isFile()) {
return new Status(ErrCode.COMMON_ERROR, "the existing repo info is not a file");
}
            // repo exists; download and parse the repo info file
String localFilePath = BackupHandler.BACKUP_ROOT_DIR + "/tmp_info_" + allocLocalFileSuffix();
try {
st = fileSystem.downloadWithFileSize(repoInfoFilePath, localFilePath, remoteFile.getSize());
if (!st.ok()) {
return st;
}
byte[] bytes = Files.readAllBytes(Paths.get(localFilePath));
String json = new String(bytes, StandardCharsets.UTF_8);
JSONObject root = (JSONObject) JSONValue.parse(json);
if (name.compareTo((String) root.get("name")) != 0) {
return new Status(ErrCode.COMMON_ERROR,
"Invalid repository __repo_info, expected repo '" + name + "', but get name '"
+ (String) root.get("name") + "' from " + repoInfoFilePath);
}
name = (String) root.get("name");
createTime = TimeUtils.timeStringToLong((String) root.get("create_time"));
if (createTime == -1) {
return new Status(ErrCode.COMMON_ERROR,
"failed to parse create time of repository: " + root.get("create_time"));
}
return Status.OK;
} catch (IOException e) {
return new Status(ErrCode.COMMON_ERROR, "failed to read repo info file: " + e.getMessage());
} finally {
File localFile = new File(localFilePath);
localFile.delete();
}
} else if (remoteFiles.size() > 1) {
return new Status(ErrCode.COMMON_ERROR,
"Invalid repository dir. expected one repo info file. get more: " + remoteFiles);
} else {
            // repo does not exist yet, create the repo info file
JSONObject root = new JSONObject();
root.put("name", name);
root.put("create_time", TimeUtils.longToTimeString(createTime));
String repoInfoContent = root.toString();
return fileSystem.directUpload(repoInfoContent, repoInfoFilePath);
}
}
// eg: location/__palo_repository_repo_name/__repo_info
public String assembleRepoInfoFilePath() {
return Joiner.on(PATH_DELIMITER).join(getLocation(),
joinPrefix(PREFIX_REPO, name),
FILE_REPO_INFO);
}
    // eg: location/__palo_repository_repo_name/__ss_my_sp1/__meta
public String assembleMetaInfoFilePath(String label) {
return Joiner.on(PATH_DELIMITER).join(getLocation(), joinPrefix(PREFIX_REPO, name),
joinPrefix(PREFIX_SNAPSHOT_DIR, label),
FILE_META_INFO);
}
    // eg: location/__palo_repository_repo_name/__ss_my_sp1/__info_2018-01-01-08-00-00
public String assembleJobInfoFilePath(String label, long createTime) {
return Joiner.on(PATH_DELIMITER).join(getLocation(), joinPrefix(PREFIX_REPO, name),
joinPrefix(PREFIX_SNAPSHOT_DIR, label),
jobInfoFileNameWithTimestamp(createTime));
}
// eg:
// __palo_repository_repo_name/__ss_my_ss1/__ss_content/__db_10001/__tbl_10020/__part_10031/__idx_10020/__10022/
public String getRepoTabletPathBySnapshotInfo(String label, SnapshotInfo info) {
String path = Joiner.on(PATH_DELIMITER).join(getLocation(), joinPrefix(PREFIX_REPO, name),
joinPrefix(PREFIX_SNAPSHOT_DIR, label),
DIR_SNAPSHOT_CONTENT,
joinPrefix(PREFIX_DB, info.getDbId()),
joinPrefix(PREFIX_TBL, info.getTblId()),
joinPrefix(PREFIX_PART, info.getPartitionId()),
joinPrefix(PREFIX_IDX, info.getIndexId()),
joinPrefix(PREFIX_COMMON, info.getTabletId()));
try {
// we need to normalize the path to avoid double "/" in path, or else some client such as S3 sdk can not
// handle it correctly.
return new URI(path).normalize().toString();
} catch (URISyntaxException e) {
LOG.warn("failed to normalize path: {}", path, e);
return null;
}
}
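    // eg: location/__palo_repository_repo_name/__ss_label/__ss_content/childPath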
public String getRepoPath(String label, String childPath) {
String path = Joiner.on(PATH_DELIMITER).join(getLocation(), joinPrefix(PREFIX_REPO, name),
joinPrefix(PREFIX_SNAPSHOT_DIR, label),
DIR_SNAPSHOT_CONTENT,
childPath);
try {
URI uri = new URI(path);
return uri.normalize().toString();
} catch (Exception e) {
LOG.warn("Invalid path: " + path, e);
return null;
}
}
    // Check if this repo is available.
    // If we fail to connect to this repo, set errMsg and return false.
public boolean ping() {
if (FeConstants.runningUnitTest) {
return true;
}
// for s3 sdk, the headObject() method does not support list "dir",
// so we check FILE_REPO_INFO instead.
String path = location + "/" + joinPrefix(PREFIX_REPO, name) + "/" + FILE_REPO_INFO;
try {
URI checkUri = new URI(path);
Status st = fileSystem.exists(checkUri.normalize().toString());
if (!st.ok()) {
errMsg = TimeUtils.longToTimeString(System.currentTimeMillis()) + ": " + st.getErrMsg();
return false;
}
// clear err msg
errMsg = null;
return true;
} catch (URISyntaxException e) {
errMsg = TimeUtils.longToTimeString(System.currentTimeMillis())
+ ": Invalid path. " + path + ", error: " + e.getMessage();
return false;
}
}
// Visit the repository, and list all existing snapshot names
public Status listSnapshots(List<String> snapshotNames) {
// list with prefix:
// eg. __palo_repository_repo_name/__ss_*
String listPath = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name), PREFIX_SNAPSHOT_DIR)
+ "*";
List<RemoteFile> result = Lists.newArrayList();
Status st = fileSystem.globList(listPath, result);
if (!st.ok()) {
return st;
}
for (RemoteFile remoteFile : result) {
if (remoteFile.isFile()) {
if (LOG.isDebugEnabled()) {
LOG.debug("get snapshot path{} which is not a dir", remoteFile);
}
continue;
}
snapshotNames.add(disjoinPrefix(PREFIX_SNAPSHOT_DIR, remoteFile.getName()));
}
return Status.OK;
}
    // Not implemented yet; always returns false.
public boolean prepareSnapshotInfo() {
return false;
}
// create remote tablet snapshot path
// eg:
// /location/__palo_repository_repo_name/__ss_my_ss1/__ss_content/
// __db_10001/__tbl_10020/__part_10031/__idx_10032/__10023/__3481721
public String assembleRemoteSnapshotPath(String label, SnapshotInfo info) {
String path = Joiner.on(PATH_DELIMITER).join(location,
joinPrefix(PREFIX_REPO, name),
joinPrefix(PREFIX_SNAPSHOT_DIR, label),
DIR_SNAPSHOT_CONTENT,
joinPrefix(PREFIX_DB, info.getDbId()),
joinPrefix(PREFIX_TBL, info.getTblId()),
joinPrefix(PREFIX_PART, info.getPartitionId()),
joinPrefix(PREFIX_IDX, info.getIndexId()),
joinPrefix(PREFIX_COMMON, info.getTabletId()),
joinPrefix(PREFIX_COMMON, info.getSchemaHash()));
if (LOG.isDebugEnabled()) {
LOG.debug("get remote tablet snapshot path: {}", path);
}
return path;
}
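    // Download the job info file of the given snapshot and parse it into a BackupJobInfo.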
public Status getSnapshotInfoFile(String label, String backupTimestamp, List<BackupJobInfo> infos) {
String remoteInfoFilePath = assembleJobInfoFilePath(label, -1) + backupTimestamp;
File localInfoFile = new File(BackupHandler.BACKUP_ROOT_DIR + PATH_DELIMITER
+ "info_" + allocLocalFileSuffix());
try {
Status st = download(remoteInfoFilePath, localInfoFile.getPath());
if (!st.ok()) {
return st;
}
BackupJobInfo jobInfo = BackupJobInfo.fromFile(localInfoFile.getAbsolutePath());
infos.add(jobInfo);
} catch (IOException e) {
            return new Status(ErrCode.COMMON_ERROR, "Failed to create job info from file: "
                    + localInfoFile.getName() + ". msg: " + e.getMessage());
} finally {
localInfoFile.delete();
}
return Status.OK;
}
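    // Download the meta file of the given snapshot and parse it into a BackupMeta.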
public Status getSnapshotMetaFile(String label, List<BackupMeta> backupMetas, int metaVersion) {
String remoteMetaFilePath = assembleMetaInfoFilePath(label);
File localMetaFile = new File(BackupHandler.BACKUP_ROOT_DIR + PATH_DELIMITER
+ "meta_" + allocLocalFileSuffix());
try {
Status st = download(remoteMetaFilePath, localMetaFile.getAbsolutePath());
if (!st.ok()) {
return st;
}
// read file to backupMeta
BackupMeta backupMeta = BackupMeta.fromFile(localMetaFile.getAbsolutePath(), metaVersion);
backupMetas.add(backupMeta);
} catch (IOException e) {
LOG.warn("failed to read backup meta from file", e);
            return new Status(ErrCode.COMMON_ERROR, "Failed to create backup meta from file: "
                    + localMetaFile.getAbsolutePath() + ", msg: " + e.getMessage());
} catch (IllegalArgumentException e) {
LOG.warn("failed to set meta version", e);
return new Status(ErrCode.COMMON_ERROR, e.getMessage());
} finally {
localMetaFile.delete();
}
return Status.OK;
}
    // upload the local file to the specified remote file, appending the md5 checksum to the remote file name
    // remoteFilePath should be a FULL path
public Status upload(String localFilePath, String remoteFilePath) {
// Preconditions.checkArgument(remoteFilePath.startsWith(location), remoteFilePath);
        // get md5sum of the local file
File file = new File(localFilePath);
String md5sum;
try (FileInputStream fis = new FileInputStream(file)) {
md5sum = DigestUtils.md5Hex(fis);
} catch (FileNotFoundException e) {
return new Status(ErrCode.NOT_FOUND, "file " + localFilePath + " does not exist");
} catch (IOException e) {
return new Status(ErrCode.COMMON_ERROR, "failed to get md5sum of file: " + localFilePath);
}
Preconditions.checkState(!Strings.isNullOrEmpty(md5sum));
String finalRemotePath = assembleFileNameWithSuffix(remoteFilePath, md5sum);
Status st = Status.OK;
if (fileSystem instanceof BrokerFileSystem) {
            // this may be a retry, so we should first delete the remote file
String tmpRemotePath = assembleFileNameWithSuffix(remoteFilePath, SUFFIX_TMP_FILE);
if (LOG.isDebugEnabled()) {
LOG.debug("get md5sum of file: {}. tmp remote path: {}. final remote path: {}",
localFilePath, tmpRemotePath, finalRemotePath);
}
st = fileSystem.delete(tmpRemotePath);
if (!st.ok()) {
return st;
}
st = fileSystem.delete(finalRemotePath);
if (!st.ok()) {
return st;
}
// upload tmp file
st = fileSystem.upload(localFilePath, tmpRemotePath);
if (!st.ok()) {
return st;
}
            // rename the tmp file to the checksum-named file
st = fileSystem.rename(tmpRemotePath, finalRemotePath);
if (!st.ok()) {
return st;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("get md5sum of file: {}. final remote path: {}", localFilePath, finalRemotePath);
}
st = fileSystem.delete(finalRemotePath);
if (!st.ok()) {
return st;
}
// upload final file
st = fileSystem.upload(localFilePath, finalRemotePath);
if (!st.ok()) {
return st;
}
}
LOG.info("finished to upload local file {} to remote file: {}", localFilePath, finalRemotePath);
return st;
}
    // remoteFilePath must be a file (not a dir) and must not contain the checksum suffix
public Status download(String remoteFilePath, String localFilePath) {
        // 0. list to get the full name (with checksum)
List<RemoteFile> remoteFiles = Lists.newArrayList();
Status status = fileSystem.globList(remoteFilePath + "*", remoteFiles);
if (!status.ok()) {
return status;
}
if (remoteFiles.size() != 1) {
return new Status(ErrCode.COMMON_ERROR,
"Expected one file with path: " + remoteFilePath + ". get: " + remoteFiles.size());
}
if (!remoteFiles.get(0).isFile()) {
            return new Status(ErrCode.COMMON_ERROR,
                    "Expected a file with path: " + remoteFilePath + ", but got a dir");
}
String remoteFilePathWithChecksum = replaceFileNameWithChecksumFileName(remoteFilePath,
remoteFiles.get(0).getName());
if (LOG.isDebugEnabled()) {
LOG.debug("get download filename with checksum: " + remoteFilePathWithChecksum);
}
// 1. get checksum from remote file name
Pair<String, String> pair = decodeFileNameWithChecksum(remoteFilePathWithChecksum);
if (pair == null) {
return new Status(ErrCode.COMMON_ERROR,
"file name should contains checksum: " + remoteFilePathWithChecksum);
}
if (!remoteFilePath.endsWith(pair.first)) {
return new Status(ErrCode.COMMON_ERROR, "File does not exist: " + remoteFilePath);
}
String md5sum = pair.second;
// 2. download
status = fileSystem.downloadWithFileSize(remoteFilePathWithChecksum, localFilePath,
remoteFiles.get(0).getSize());
if (!status.ok()) {
return status;
}
// 3. verify checksum
String localMd5sum = null;
try (FileInputStream fis = new FileInputStream(localFilePath)) {
localMd5sum = DigestUtils.md5Hex(fis);
} catch (FileNotFoundException e) {
return new Status(ErrCode.NOT_FOUND, "file " + localFilePath + " does not exist");
} catch (IOException e) {
return new Status(ErrCode.COMMON_ERROR, "failed to get md5sum of file: " + localFilePath);
}
if (!localMd5sum.equals(md5sum)) {
return new Status(ErrCode.BAD_FILE,
"md5sum does not equal. local: " + localMd5sum + ", remote: " + md5sum);
}
return Status.OK;
}
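    // Find a broker address usable by the given backend.
    // Non-broker storage backends get a placeholder address instead.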
public Status getBrokerAddress(Long beId, Env env, List<FsBroker> brokerAddrs) {
// get backend
Backend be = Env.getCurrentSystemInfo().getBackend(beId);
if (be == null) {
return new Status(ErrCode.COMMON_ERROR, "backend " + beId + " is missing. "
+ "failed to send upload snapshot task");
}
        // only the Broker storage backend needs a broker addr; other types return a placeholder one
if (fileSystem.getStorageType() != StorageBackend.StorageType.BROKER) {
brokerAddrs.add(new FsBroker("127.0.0.1", 0));
return Status.OK;
}
// get proper broker for this backend
FsBroker brokerAddr = null;
try {
brokerAddr = env.getBrokerMgr().getBroker(fileSystem.getName(), be.getHost());
} catch (AnalysisException e) {
return new Status(ErrCode.COMMON_ERROR, "failed to get address of broker "
                    + fileSystem.getName() + " when trying to send upload snapshot task: "
+ e.getMessage());
}
if (brokerAddr == null) {
return new Status(ErrCode.COMMON_ERROR, "failed to get address of broker "
                    + fileSystem.getName() + " when trying to send upload snapshot task");
}
brokerAddrs.add(brokerAddr);
return Status.OK;
}
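    // Assemble one summary row: id, name, create time, read-only flag, location,
    // broker name ("-" for non-broker repos), storage type and last error message.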
public List<String> getInfo() {
List<String> info = Lists.newArrayList();
info.add(String.valueOf(id));
info.add(name);
info.add(TimeUtils.longToTimeString(createTime));
info.add(String.valueOf(isReadOnly));
info.add(location);
info.add(fileSystem.getStorageType() != StorageBackend.StorageType.BROKER ? "-" : fileSystem.getName());
info.add(fileSystem.getStorageType().name());
info.add(errMsg == null ? FeConstants.null_string : errMsg);
return info;
}
public List<List<String>> getSnapshotInfos(String snapshotName, String timestamp)
throws AnalysisException {
List<List<String>> snapshotInfos = Lists.newArrayList();
if (Strings.isNullOrEmpty(snapshotName)) {
// get all snapshot infos
List<String> snapshotNames = Lists.newArrayList();
Status status = listSnapshots(snapshotNames);
if (!status.ok()) {
throw new AnalysisException(
"Failed to list snapshot in repo: " + name + ", err: " + status.getErrMsg());
}
for (String ssName : snapshotNames) {
List<String> info = getSnapshotInfo(ssName, null /* get all timestamp */);
snapshotInfos.add(info);
}
} else {
// get specified snapshot info
List<String> info = getSnapshotInfo(snapshotName, timestamp);
snapshotInfos.add(info);
}
return snapshotInfos;
}
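    // Reassemble a CREATE REPOSITORY statement from the persisted fields, eg:
    // CREATE READ ONLY REPOSITORY repo1 WITH S3 ON LOCATION "s3://bucket/backup" PROPERTIES (...)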
public String getCreateStatement() {
StringBuilder stmtBuilder = new StringBuilder();
stmtBuilder.append("CREATE ");
if (this.isReadOnly) {
stmtBuilder.append("READ ONLY ");
}
stmtBuilder.append("REPOSITORY ");
stmtBuilder.append(this.name);
stmtBuilder.append(" \nWITH ");
StorageBackend.StorageType storageType = this.fileSystem.getStorageType();
if (storageType == StorageBackend.StorageType.S3) {
stmtBuilder.append(" S3 ");
} else if (storageType == StorageBackend.StorageType.HDFS) {
stmtBuilder.append(" HDFS ");
} else if (storageType == StorageBackend.StorageType.BROKER) {
stmtBuilder.append(" BROKER ");
stmtBuilder.append(this.fileSystem.getName());
} else {
// should never reach here
throw new UnsupportedOperationException(storageType.toString() + " backend is not implemented");
}
stmtBuilder.append(" \nON LOCATION \"");
stmtBuilder.append(this.location);
stmtBuilder.append("\"");
stmtBuilder.append("\nPROPERTIES\n(");
        Map<String, String> properties = new HashMap<>();
properties.putAll(this.getRemoteFileSystem().getProperties());
stmtBuilder.append(new PrintableMap<>(properties, " = ", true, true, true));
stmtBuilder.append("\n)");
return stmtBuilder.toString();
}
private List<String> getSnapshotInfo(String snapshotName, String timestamp) {
List<String> info = Lists.newArrayList();
if (Strings.isNullOrEmpty(timestamp)) {
// get all timestamp
// path eg: /location/__palo_repository_repo_name/__ss_my_snap/__info_*
String infoFilePath = assembleJobInfoFilePath(snapshotName, -1);
if (LOG.isDebugEnabled()) {
LOG.debug("assemble infoFilePath: {}, snapshot: {}", infoFilePath, snapshotName);
}
List<RemoteFile> results = Lists.newArrayList();
Status st = fileSystem.globList(infoFilePath + "*", results);
if (!st.ok()) {
info.add(snapshotName);
info.add(FeConstants.null_string);
info.add("ERROR: Failed to get info: " + st.getErrMsg());
} else {
List<String> tmp = Lists.newArrayList();
for (RemoteFile file : results) {
// __info_2018-04-18-20-11-00.Jdwnd9312sfdn1294343
Pair<String, String> pureFileName = decodeFileNameWithChecksum(file.getName());
if (pureFileName == null) {
// maybe: __info_2018-04-18-20-11-00.part
tmp.add("Invalid: " + file.getName());
continue;
}
tmp.add(disjoinPrefix(PREFIX_JOB_INFO, pureFileName.first));
}
if (!tmp.isEmpty()) {
info.add(snapshotName);
info.add(Joiner.on("\n").join(tmp));
info.add("OK");
} else {
info.add(snapshotName);
info.add(FeConstants.null_string);
info.add("ERROR: No info file found");
}
}
} else {
            // get the specified timestamp. Different repos might have snapshots with the same timestamp.
String localFilePath = BackupHandler.BACKUP_ROOT_DIR + "/"
+ Repository.PREFIX_JOB_INFO + allocLocalFileSuffix();
try {
String remoteInfoFilePath = assembleJobInfoFilePath(snapshotName, -1) + timestamp;
Status st = download(remoteInfoFilePath, localFilePath);
if (!st.ok()) {
info.add(snapshotName);
info.add(timestamp);
info.add(FeConstants.null_string);
info.add(FeConstants.null_string);
info.add("Failed to get info: " + st.getErrMsg());
} else {
try {
BackupJobInfo jobInfo = BackupJobInfo.fromFile(localFilePath);
info.add(snapshotName);
info.add(timestamp);
info.add(jobInfo.dbName);
info.add(jobInfo.getBrief());
info.add("OK");
} catch (IOException e) {
info.add(snapshotName);
info.add(timestamp);
info.add(FeConstants.null_string);
info.add(FeConstants.null_string);
info.add("Failed to read info from local file: " + e.getMessage());
}
}
} finally {
// delete tmp local file
File localFile = new File(localFilePath);
if (localFile.exists()) {
localFile.delete();
}
}
}
return info;
}
    // Allocate a unique suffix for temporary local files.
private String allocLocalFileSuffix() {
return System.currentTimeMillis() + UUID.randomUUID().toString().replace("-", "_");
}
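    // Since FeMetaVersion.VERSION_137 the repository is persisted as GSON json (see write()/read());
    // readFields() remains only to replay journals written by older versions.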
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
@Deprecated
public void readFields(DataInput in) throws IOException {
id = in.readLong();
name = Text.readString(in);
isReadOnly = in.readBoolean();
location = Text.readString(in);
oldfs = org.apache.doris.fs.PersistentFileSystem.read(in);
try {
fileSystem = FileSystemFactory.get(oldfs.getStorageType(), oldfs.getProperties());
} catch (UserException e) {
            // should this exception be ignored? For now, re-throw it as an IOException
throw new IOException("Failed to create file system: " + e.getMessage());
}
createTime = in.readLong();
}
}