SparkRepository.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2;
import org.apache.doris.DorisFE;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.thrift.TBrokerFileStatus;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
/*
 * SparkRepository represents the remote repository for the spark archives (the dpp jar and
 * spark-2x.zip) uploaded for spark load.
 * The repository is organized as follows:
 *
 * * __spark_repository__/
 *   * __archive_1_0_0/
 *     * __lib_990325d2c0d1d5e45bf675e54e44fb16_spark-dpp.jar
 *     * __lib_7670c29daf535efe3c9b923f778f61fc_spark-2x.zip
 *   * __archive_2_2_0/
 *     * __lib_64d5696f99c379af2bee28c1c84271d5_spark-dpp.jar
 *     * __lib_1bbb74bb6b264a270bc7fca3e964160f_spark-2x.zip
 *   * __archive_3_2_0/
 *     * ...
 */
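// A concrete path sketch (the remote root below is illustrative, not an actual default):
// if remoteRepositoryPath is "hdfs://nn:8020/user/doris/__spark_repository__" and the current
// dpp version is <version>, getRemoteArchivePath() resolves to
//   hdfs://nn:8020/user/doris/__spark_repository__/__archive_<version>
// and the uploaded dpp jar in that archive ends up named
//   __lib_<md5sum>_spark-dpp-<version>-jar-with-dependencies.jar (see assemblyFileName()).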
@Deprecated
public class SparkRepository {
private static final Logger LOG = LogManager.getLogger(SparkRepository.class);
public static final String REPOSITORY_DIR = "__spark_repository__";
public static final String PREFIX_ARCHIVE = "__archive_";
public static final String PREFIX_LIB = "__lib_";
public static final String SPARK_DPP_JAR = "spark-dpp-" + Config.spark_dpp_version + "-jar-with-dependencies.jar";
public static final String SPARK_DPP = "spark-dpp-" + Config.spark_dpp_version + "-jar-with-dependencies";
public static final String SPARK_2X = "spark-2x";
private static final String PATH_DELIMITER = "/";
private static final String FILE_NAME_SEPARATOR = "_";
private static final String DPP_RESOURCE_DIR = "/spark-dpp/";
private static final String SPARK_RESOURCE = "/jars/spark-2x.zip";
private String remoteRepositoryPath;
private BrokerDesc brokerDesc;
private String localDppPath;
private String localSpark2xPath;
// Version of the spark dpp program in this cluster
private String currentDppVersion;
    // Archive that the current dpp version points to
private SparkArchive currentArchive;
public SparkRepository(String remoteRepositoryPath, BrokerDesc brokerDesc) {
this.remoteRepositoryPath = remoteRepositoryPath;
this.brokerDesc = brokerDesc;
this.currentDppVersion = Config.spark_dpp_version;
this.currentArchive = new SparkArchive(getRemoteArchivePath(currentDppVersion), currentDppVersion);
this.localDppPath = DorisFE.DORIS_HOME_DIR + DPP_RESOURCE_DIR + SPARK_DPP_JAR;
if (!Strings.isNullOrEmpty(Config.spark_resource_path)) {
this.localSpark2xPath = Config.spark_resource_path;
} else {
this.localSpark2xPath = Config.spark_home_default_dir + SPARK_RESOURCE;
}
}
public void prepare() throws LoadException {
initRepository();
}
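    // A minimal usage sketch (hypothetical caller; the broker descriptor and remote root are
    // assumptions shown schematically, not values taken from this class):
    //
    //   BrokerDesc brokerDesc = ...; // broker used to access the remote storage
    //   SparkRepository repository = new SparkRepository(
    //           "hdfs://nn:8020/user/doris/__spark_repository__", brokerDesc);
    //   repository.prepare(); // uploads the local dpp jar and spark-2x.zip if needed
    //   SparkLibrary dppLibrary = repository.getCurrentArchive().getDppLibrary();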
private void initRepository() throws LoadException {
LOG.info("start to init remote repository. local dpp: {}", this.localDppPath);
boolean needUpload = false;
boolean needReplace = false;
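        // The labeled block below decides whether an upload is needed: upload a fresh archive
        // when none exists yet, and replace the whole archive when it is incomplete or when a
        // remote library's md5sum no longer matches the corresponding local file.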
CHECK: {
if (Strings.isNullOrEmpty(remoteRepositoryPath) || brokerDesc == null) {
break CHECK;
}
if (!checkCurrentArchiveExists()) {
needUpload = true;
break CHECK;
}
// init current archive
String remoteArchivePath = getRemoteArchivePath(currentDppVersion);
List<SparkLibrary> libraries = Lists.newArrayList();
getLibraries(remoteArchivePath, libraries);
if (libraries.size() != 2) {
needUpload = true;
needReplace = true;
break CHECK;
}
currentArchive.libraries.addAll(libraries);
for (SparkLibrary library : currentArchive.libraries) {
String localMd5sum = null;
switch (library.libType) {
case DPP:
localMd5sum = getMd5String(localDppPath);
break;
case SPARK2X:
localMd5sum = getMd5String(localSpark2xPath);
break;
default:
Preconditions.checkState(false, "wrong library type: " + library.libType);
break;
}
if (!localMd5sum.equals(library.md5sum)) {
needUpload = true;
needReplace = true;
break;
}
}
}
if (needUpload) {
uploadArchive(needReplace);
}
LOG.info("init spark repository success, current dppVersion={}, archive path={}, libraries size={}",
currentDppVersion, currentArchive.remotePath, currentArchive.libraries.size());
}
private boolean checkCurrentArchiveExists() {
boolean result = false;
Preconditions.checkNotNull(remoteRepositoryPath);
String remotePath = getRemoteArchivePath(currentDppVersion);
try {
result = BrokerUtil.checkPathExist(remotePath, brokerDesc);
LOG.info("check archive exists in repository, {}", result);
} catch (UserException e) {
LOG.warn("Failed to check remote archive exist, path={}, version={}", remotePath, currentDppVersion);
}
return result;
}
private void uploadArchive(boolean isReplace) throws LoadException {
try {
String remoteArchivePath = getRemoteArchivePath(currentDppVersion);
if (isReplace) {
BrokerUtil.deletePathWithBroker(remoteArchivePath, brokerDesc);
currentArchive.libraries.clear();
}
String srcFilePath = null;
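            // Each library is uploaded in two steps: first under a name with an empty md5sum
            // field, then renamed so that the final file name embeds the md5sum of the local file.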
// upload dpp
{ // CHECKSTYLE IGNORE THIS LINE
// 1. upload dpp
srcFilePath = localDppPath;
String fileName = getFileName(PATH_DELIMITER, srcFilePath);
String origFilePath = remoteArchivePath + PATH_DELIMITER
+ assemblyFileName(PREFIX_LIB, "", fileName, "");
upload(srcFilePath, origFilePath);
// 2. rename dpp
String md5sum = getMd5String(srcFilePath);
long size = getFileSize(srcFilePath);
String destFilePath = remoteArchivePath + PATH_DELIMITER
+ assemblyFileName(PREFIX_LIB, md5sum, fileName, "");
rename(origFilePath, destFilePath);
currentArchive.libraries.add(new SparkLibrary(destFilePath, md5sum, SparkLibrary.LibType.DPP, size));
} // CHECKSTYLE IGNORE THIS LINE
// upload spark2x
{ // CHECKSTYLE IGNORE THIS LINE
// 1. upload spark2x
srcFilePath = localSpark2xPath;
String origFilePath = remoteArchivePath + PATH_DELIMITER
+ assemblyFileName(PREFIX_LIB, "", SPARK_2X, ".zip");
upload(srcFilePath, origFilePath);
// 2. rename spark2x
String md5sum = getMd5String(srcFilePath);
long size = getFileSize(srcFilePath);
String destFilePath = remoteArchivePath + PATH_DELIMITER
+ assemblyFileName(PREFIX_LIB, md5sum, SPARK_2X, ".zip");
rename(origFilePath, destFilePath);
currentArchive.libraries.add(new SparkLibrary(
destFilePath, md5sum, SparkLibrary.LibType.SPARK2X, size));
} // CHECKSTYLE IGNORE THIS LINE
LOG.info("finished to upload archive to repository, currentDppVersion={}, path={}",
currentDppVersion, remoteArchivePath);
} catch (UserException e) {
throw new LoadException(e.getMessage());
}
}
private void getLibraries(String remoteArchivePath, List<SparkLibrary> libraries) throws LoadException {
List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
try {
BrokerUtil.parseFile(remoteArchivePath + "/*", brokerDesc, fileStatuses);
} catch (UserException e) {
throw new LoadException(e.getMessage());
}
for (TBrokerFileStatus fileStatus : fileStatuses) {
String fileName = getFileName(PATH_DELIMITER, fileStatus.path);
if (!fileName.startsWith(PREFIX_LIB)) {
continue;
}
            // fileName should look like:
// __lib_md5sum_spark-dpp-1.0.0-jar-with-dependencies.jar
// __lib_md5sum_spark-2x.zip
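            // For example, "__lib_7670c29daf535efe3c9b923f778f61fc_spark-2x.zip" (md5sum taken
            // from the layout comment above) is unwrapped to
            // "7670c29daf535efe3c9b923f778f61fc_spark-2x", which splits into the md5sum and the
            // library type "spark-2x".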
String[] libArg = unwrap(PREFIX_LIB, fileName).split(FILE_NAME_SEPARATOR);
if (libArg.length != 2) {
continue;
}
String md5sum = libArg[0];
if (Strings.isNullOrEmpty(md5sum)) {
continue;
}
String type = libArg[1];
SparkLibrary.LibType libType = null;
if (type.equals(SPARK_DPP)) {
libType = SparkLibrary.LibType.DPP;
} else if (type.equals(SPARK_2X)) {
libType = SparkLibrary.LibType.SPARK2X;
} else {
throw new LoadException("Invalid library type: " + type);
}
SparkLibrary remoteFile = new SparkLibrary(fileStatus.path, md5sum, libType, fileStatus.size);
libraries.add(remoteFile);
LOG.info("get Libraries from remote archive, archive path={}, library={}, md5sum={}, size={}",
remoteArchivePath, remoteFile.remotePath, remoteFile.md5sum, remoteFile.size);
}
}
public String getMd5String(String filePath) throws LoadException {
File file = new File(filePath);
String md5sum;
try (FileInputStream fis = new FileInputStream(file)) {
md5sum = DigestUtils.md5Hex(fis);
Preconditions.checkNotNull(md5sum);
if (LOG.isDebugEnabled()) {
LOG.debug("get md5sum from file {}, md5sum={}", filePath, md5sum);
}
return md5sum;
} catch (FileNotFoundException e) {
throw new LoadException("file " + filePath + " does not exist");
} catch (IOException e) {
throw new LoadException("failed to get md5sum from file " + filePath);
}
}
public long getFileSize(String filePath) throws LoadException {
File file = new File(filePath);
long size = file.length();
if (size <= 0) {
throw new LoadException("failed to get size from file " + filePath);
}
return size;
}
private void upload(String srcFilePath, String destFilePath) throws LoadException {
try {
BrokerUtil.writeFile(srcFilePath, destFilePath, brokerDesc);
LOG.info("finished to upload file, localPath={}, remotePath={}", srcFilePath, destFilePath);
} catch (UserException e) {
throw new LoadException("failed to upload lib to repository, srcPath=" + srcFilePath
+ " destPath=" + destFilePath + " message=" + e.getMessage());
}
}
private void rename(String origFilePath, String destFilePath) throws LoadException {
try {
BrokerUtil.rename(origFilePath, destFilePath, brokerDesc);
LOG.info("finished to rename file, originPath={}, destPath={}", origFilePath, destFilePath);
} catch (UserException e) {
throw new LoadException("failed to rename file from " + origFilePath + " to " + destFilePath
+ ", message=" + e.getMessage());
}
}
public SparkArchive getCurrentArchive() {
return currentArchive;
}
private static String getFileName(String delimiter, String path) {
return path.substring(path.lastIndexOf(delimiter) + 1);
}
// input: __lib_md5sum_spark-dpp-1.0.0-jar-with-dependencies.jar
// output: md5sum_spark-dpp-1.0.0-jar-with-dependencies
private static String unwrap(String prefix, String fileName) {
int pos = fileName.lastIndexOf(".");
return fileName.substring(prefix.length(), pos > 0 ? pos : fileName.length());
}
private static String joinPrefix(String prefix, String fileName) {
return prefix + fileName;
}
    // e.g.:
// __lib_979bfbcb9469888f1c5539e168d1925d_spark-2x.zip
public static String assemblyFileName(String prefix, String md5sum, String fileName, String suffix) {
return prefix + md5sum + FILE_NAME_SEPARATOR + fileName + suffix;
}
    // e.g.:
// .../__spark_repository__/__archive_1_0_0
private String getRemoteArchivePath(String version) {
return Joiner.on(PATH_DELIMITER).join(remoteRepositoryPath, joinPrefix(PREFIX_ARCHIVE, version));
}
    // Represents a remote directory that contains the uploaded libraries.
    // An archive is named as __archive_{dppVersion},
    // e.g. __archive_1_0_0/
    //          \ __lib_990325d2c0d1d5e45bf675e54e44fb16_spark-dpp.jar
    //          \ __lib_7670c29daf535efe3c9b923f778f61fc_spark-2x.zip
public static class SparkArchive {
public String remotePath;
public String version;
public List<SparkLibrary> libraries;
public SparkArchive(String remotePath, String version) {
this.remotePath = remotePath;
this.version = version;
this.libraries = Lists.newArrayList();
}
public SparkLibrary getDppLibrary() {
Optional<SparkLibrary> library = libraries.stream()
.filter(lib -> lib.libType == SparkLibrary.LibType.DPP).findFirst();
return library.orElse(null);
}
public SparkLibrary getSpark2xLibrary() {
Optional<SparkLibrary> library = libraries.stream()
.filter(lib -> lib.libType == SparkLibrary.LibType.SPARK2X).findFirst();
return library.orElse(null);
}
}
    // Represents an uploaded remote file stored in the archive.
    // A library is a dependency of the dpp program or the spark platform,
    // named as __lib_{md5sum}_{fileName}.{jar/zip},
    // e.g. __lib_990325d2c0d1d5e45bf675e54e44fb16_spark-dpp.jar
    //      __lib_7670c29daf535efe3c9b923f778f61fc_spark-2x.zip
public static class SparkLibrary {
public String remotePath;
public String md5sum;
public long size;
public LibType libType;
public enum LibType {
DPP, SPARK2X
}
public SparkLibrary(String remotePath, String md5sum, LibType libType, long size) {
this.remotePath = remotePath;
this.md5sum = md5sum;
this.libType = libType;
this.size = size;
}
}
}