// DefaultRemote.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.cloud.storage;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;

import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
/**
* Default implementation of {@link RemoteBase} use {@link S3Client}.
* If one object storage such as OSS can not use {@link S3Client} to access, please override these methods.
*/
public class DefaultRemote extends RemoteBase {
    private static final Logger LOG = LogManager.getLogger(DefaultRemote.class);

    // S3 caps a multipart upload at 10000 parts; also used as the upload pool queue capacity.
    private static final int MULTI_PART_UPLOAD_MAX_PART_NUM = 10000;

    // S3 DeleteObjects accepts at most 1000 keys per request.
    private static final int DELETE_MAX_BATCH_SIZE = 1000;

    // Shared pool for multipart part uploads, created lazily by initPool().
    // Must be volatile for the double-checked locking in initPool() to be safe.
    private static volatile ThreadPoolExecutor POOL = null;

    // Created lazily by initClient(), released by close().
    private S3Client s3Client;

    public DefaultRemote(ObjectInfo obj) {
        super(obj);
    }

    @Override
    public ListObjectsResult listObjects(String continuationToken) throws DdlException {
        return listObjectsInner(normalizePrefix(), continuationToken);
    }

    @Override
    public ListObjectsResult listObjects(String subPrefix, String continuationToken) throws DdlException {
        return listObjectsInner(normalizePrefix(subPrefix), continuationToken);
    }

    /**
     * Heads a single object. Returns a one-element result on success, an empty result
     * when the key does not exist, and throws {@link DdlException} on any other S3 error.
     */
    @Override
    public ListObjectsResult headObject(String subKey) throws DdlException {
        initClient();
        try {
            String key = normalizePrefix(subKey);
            HeadObjectRequest request = HeadObjectRequest.builder().bucket(obj.getBucket()).key(key)
                    .build();
            HeadObjectResponse response = s3Client.headObject(request);
            ObjectFile objectFile = new ObjectFile(key, getRelativePath(key), response.eTag(),
                    response.contentLength());
            return new ListObjectsResult(Lists.newArrayList(objectFile), false, null);
        } catch (NoSuchKeyException e) {
            // A missing key is an expected condition, reported as an empty listing.
            LOG.warn("NoSuchKey when head object for S3, subKey={}", subKey);
            return new ListObjectsResult(Lists.newArrayList(), false, null);
        } catch (SdkException e) {
            LOG.warn("Failed to head object for S3, subKey={}", subKey, e);
            throw new DdlException(
                    "Failed to head object for S3, subKey=" + subKey + " Error message=" + e.getMessage());
        }
    }

    @Override
    public Triple<String, String, String> getStsToken() throws DdlException {
        throw new DdlException("Get sts token is unsupported");
    }

    /**
     * Lists one page of objects under an already-normalized prefix.
     *
     * @param continuationToken token from the previous page, or null/empty for the first page
     */
    private ListObjectsResult listObjectsInner(String prefix, String continuationToken) throws DdlException {
        initClient();
        try {
            ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder().bucket(obj.getBucket())
                    .prefix(prefix);
            if (!StringUtils.isEmpty(continuationToken)) {
                requestBuilder.continuationToken(continuationToken);
            }
            ListObjectsV2Response response = s3Client.listObjectsV2(requestBuilder.build());
            List<ObjectFile> objectFiles = new ArrayList<>();
            for (S3Object c : response.contents()) {
                objectFiles.add(new ObjectFile(c.key(), getRelativePath(c.key()), c.eTag(), c.size()));
            }
            return new ListObjectsResult(objectFiles, response.isTruncated(), response.nextContinuationToken());
        } catch (SdkException e) {
            LOG.warn("Failed to list objects for S3, prefix {}", prefix, e);
            throw new DdlException("Failed to list objects for S3, Error message=" + e.getMessage());
        }
    }

    /**
     * Creates the {@link S3Client} on first use, with static credentials from the
     * {@code ObjectInfo} (session credentials when a token is present).
     * Not synchronized; assumes single-threaded use per instance like the original code.
     */
    private void initClient() {
        if (s3Client == null) {
            /*
             * https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#131-client-http-configuration
             * https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#133-client-override-configuration
             * There are several timeout configuration, please config if needed.
             */
            AwsCredentials credentials;
            if (obj.getToken() != null) {
                // Temporary STS credentials carry a session token.
                credentials = AwsSessionCredentials.create(obj.getAk(), obj.getSk(), obj.getToken());
            } else {
                credentials = AwsBasicCredentials.create(obj.getAk(), obj.getSk());
            }
            StaticCredentialsProvider scp = StaticCredentialsProvider.create(credentials);
            // NOTE(review): the endpoint is always accessed over plain http — confirm this is
            // intended (e.g. in-VPC endpoints) before exposing credentials over the wire.
            URI endpointUri = URI.create("http://" + obj.getEndpoint());
            s3Client = S3Client.builder().endpointOverride(endpointUri).credentialsProvider(scp)
                    .region(Region.of(obj.getRegion())).build();
        }
    }

    @Override
    public void close() {
        if (s3Client != null) {
            s3Client.close();
            s3Client = null;
        }
    }

    /**
     * Deletes the given keys in batches of at most {@link #DELETE_MAX_BATCH_SIZE}.
     * Throws a {@link DdlException} for the first per-key error reported by S3.
     */
    @Override
    public void deleteObjects(List<String> keys) throws DdlException {
        checkDeleteKeys(keys);
        initClient();
        try {
            // Iterate by batch start index. The previous size/1000+1 loop issued one extra,
            // EMPTY DeleteObjects request whenever keys.size() was an exact multiple of the
            // batch size, which S3 rejects as malformed.
            for (int start = 0; start < keys.size(); start += DELETE_MAX_BATCH_SIZE) {
                List<String> batch = keys.subList(start, Math.min(start + DELETE_MAX_BATCH_SIZE, keys.size()));
                List<ObjectIdentifier> toDelete = new ArrayList<>(batch.size());
                for (String key : batch) {
                    toDelete.add(ObjectIdentifier.builder().key(key).build());
                }
                DeleteObjectsRequest request = DeleteObjectsRequest.builder().bucket(obj.getBucket())
                        .delete(Delete.builder().objects(toDelete).build()).build();
                LOG.info("Delete objects for bucket={}, keys={}", obj.getBucket(), batch);
                DeleteObjectsResponse response = s3Client.deleteObjects(request);
                if (!response.errors().isEmpty()) {
                    S3Error error = response.errors().get(0);
                    throw new DdlException(
                            "Failed delete objects, bucket=" + obj.getBucket() + ", key=" + error.key()
                                    + ", error=" + error.message() + ", code=" + error.code());
                }
            }
        } catch (SdkException e) {
            LOG.warn("Failed to delete objects for S3", e);
            throw new DdlException("Failed to delete objects for S3, Error message=" + e.getMessage());
        }
    }

    /** Uploads a whole file with a single PUT. */
    @Override
    public void putObject(File file, String key) throws DdlException {
        initClient();
        try {
            PutObjectRequest.Builder requestBuilder = PutObjectRequest.builder().bucket(obj.getBucket())
                    .key(key);
            s3Client.putObject(requestBuilder.build(), RequestBody.fromFile(file));
            LOG.info("Put object for bucket={}, key={}", obj.getBucket(), key);
        } catch (SdkException e) {
            LOG.warn("Failed to put object for S3", e);
            throw new DdlException("Failed to put object for S3, Error message=" + e.getMessage());
        }
    }

    /**
     * Uploads a file with the S3 multipart API, falling back to a single PUT when the
     * file fits into one part. Parts are uploaded concurrently on the shared pool; each
     * part task opens its own stream positioned at the part offset, so concurrent tasks
     * never share (and corrupt) a single stream. On any failure the upload is aborted
     * and a {@link DdlException} is thrown.
     *
     * @param function optional callback invoked with the upload id before any part is
     *                 sent; returning {@code (false, reason)} aborts the upload
     */
    @Override
    public void multipartUploadObject(File file, String key, Function<String, Pair<Boolean, String>> function)
            throws DdlException {
        long fileSize = file.length();
        if (fileSize <= Config.multi_part_upload_part_size_in_bytes) {
            putObject(file, key);
            return;
        }
        initClient();
        initPool();
        // Create the multipart upload and remember its id for part uploads / abort.
        CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
                .bucket(obj.getBucket()).key(key).build();
        CreateMultipartUploadResponse multipartUpload = s3Client.createMultipartUpload(
                createMultipartUploadRequest);
        String uploadId = multipartUpload.uploadId();
        // Grow the part size when the configured size would exceed the 10000-part limit.
        long partSize = Config.multi_part_upload_part_size_in_bytes;
        if (partSize * MULTI_PART_UPLOAD_MAX_PART_NUM < fileSize) {
            partSize = (fileSize + MULTI_PART_UPLOAD_MAX_PART_NUM - 1) / MULTI_PART_UPLOAD_MAX_PART_NUM;
        }
        int totalPartNum = (int) (fileSize / partSize) + (fileSize % partSize == 0 ? 0 : 1);
        LOG.info("multipart upload file: {}, size: {}, part size: {}, total part num: {}, upload id: {}",
                file.getAbsolutePath(), fileSize, partSize, totalPartNum, uploadId);
        try {
            if (function != null) {
                Pair<Boolean, String> result = function.apply(uploadId);
                if (!result.first) {
                    String reason = result.second == null ? "" : result.second;
                    LOG.warn("Failed to multipart upload object, file: {}, key: {}, upload id: {}, reason: {}",
                            file.getAbsolutePath(), key, uploadId, reason);
                    throw new DdlException("Failed to multi part upload object, reason: " + reason);
                }
            }
            long start = System.currentTimeMillis();
            List<CompletedPart> parts = new ArrayList<>();
            CountDownLatch latch = new CountDownLatch(totalPartNum);
            AtomicBoolean failed = new AtomicBoolean(false);
            long offset = 0;
            for (int partNum = 1; partNum <= totalPartNum && !failed.get(); partNum++) {
                long curPartSize = Math.min(partSize, fileSize - offset);
                submitPartUpload(file, key, uploadId, partNum, offset, curPartSize, parts, failed, latch);
                offset += curPartSize;
            }
            // Wait until all parts are done, a part fails, or the overall deadline passes.
            while ((System.currentTimeMillis() - start) / 1000 < Config.multi_part_upload_max_seconds) {
                if (latch.await(10, TimeUnit.SECONDS) || failed.get()) {
                    break;
                }
            }
            if (failed.get() || parts.size() < totalPartNum) {
                throw new DdlException("Failed to multipart upload object for file: " + file.getAbsolutePath()
                        + ", key=" + key + ", upload id: " + uploadId + ", finished part num: " + parts.size()
                        + ", total part num: " + totalPartNum);
            }
            // S3 requires the completed part list in ascending part-number order.
            parts.sort(Comparator.comparingInt(CompletedPart::partNumber));
            CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder()
                    .bucket(obj.getBucket()).key(key).uploadId(uploadId)
                    .multipartUpload(CompletedMultipartUpload.builder().parts(parts).build()).build();
            CompleteMultipartUploadResponse completeMultipartUploadResponse = s3Client.completeMultipartUpload(
                    completeMultipartUploadRequest);
            LOG.info("Finish multipart upload file: {}, size: {}, key: {}, upload id: {}, etag: {}, cost {} ms",
                    file.getAbsolutePath(), fileSize, key, uploadId, completeMultipartUploadResponse.eTag(),
                    System.currentTimeMillis() - start);
        } catch (Exception e) {
            LOG.warn("Failed to multipart upload object for file: {}, size: {}, key: {}, upload id: {}",
                    file.getAbsolutePath(), fileSize, key, uploadId, e);
            // Abort quietly so an abort failure cannot mask the original error
            // (previously an SdkException from abort escaped as a non-DdlException).
            abortMultipartUploadQuietly(key, uploadId);
            throw new DdlException("Failed to multipart upload object, Error message=" + e.getMessage());
        }
    }

    /**
     * Submits one part upload to the pool. A failure flips {@code failed}; the latch is
     * always counted down so the waiter can make progress.
     */
    private void submitPartUpload(File file, String key, String uploadId, int partNum, long offset, long partSize,
            List<CompletedPart> parts, AtomicBoolean failed, CountDownLatch latch) {
        POOL.submit(() -> {
            try {
                if (failed.get()) {
                    // Another part already failed; skip the work but still count down.
                    return;
                }
                LOG.debug("start multipart upload part id: {} for file: {}, key, {}, upload id: {}",
                        partNum, file.getAbsolutePath(), key, uploadId);
                UploadPartRequest uploadPartRequest = UploadPartRequest.builder().bucket(obj.getBucket())
                        .key(key).uploadId(uploadId).partNumber(partNum).build();
                // Each part gets a private stream positioned at its own offset. The previous
                // code shared one FileInputStream across all pool threads, so concurrent
                // uploadPart calls interleaved their reads and corrupted the part bodies.
                try (FileInputStream partStream = FileUtils.openInputStream(file)) {
                    skipFully(partStream, offset);
                    UploadPartResponse uploadPartResponse = s3Client.uploadPart(uploadPartRequest,
                            RequestBody.fromInputStream(partStream, partSize));
                    synchronized (parts) {
                        parts.add(CompletedPart.builder().partNumber(partNum)
                                .eTag(uploadPartResponse.eTag()).build());
                    }
                }
                LOG.debug("finish multipart upload part id: {} for file: {}, key, {}, upload id: {}",
                        partNum, file.getAbsolutePath(), key, uploadId);
            } catch (Exception e) {
                LOG.warn("Failed multipart upload part id: {} for file: {}, key, {}, upload id: {}",
                        partNum, file.getAbsolutePath(), key, uploadId, e);
                failed.set(true);
            } finally {
                latch.countDown();
            }
        });
    }

    /** Advances {@code in} by exactly {@code offset} bytes; skip() may skip fewer than asked. */
    private static void skipFully(FileInputStream in, long offset) throws IOException {
        long remaining = offset;
        while (remaining > 0) {
            long skipped = in.skip(remaining);
            if (skipped <= 0) {
                throw new IOException("Failed to skip " + offset + " bytes, remaining " + remaining);
            }
            remaining -= skipped;
        }
    }

    /** Aborts a multipart upload, logging instead of throwing so the caller's error wins. */
    private void abortMultipartUploadQuietly(String key, String uploadId) {
        try {
            s3Client.abortMultipartUpload(
                    AbortMultipartUploadRequest.builder().uploadId(uploadId).bucket(obj.getBucket()).key(key)
                            .build());
        } catch (Exception e) {
            LOG.warn("Failed to abort multipart upload, key={}, upload id={}", key, uploadId, e);
        }
    }

    /** Downloads an object to the given local file path. */
    @Override
    public void getObject(String key, String file) throws DdlException {
        initClient();
        try {
            GetObjectRequest.Builder getObjectRequest = GetObjectRequest.builder()
                    .bucket(obj.getBucket()).key(key);
            s3Client.getObject(getObjectRequest.build(), Paths.get(file));
            LOG.info("Get object for bucket={}, key={}, file={}", obj.getBucket(), key, file);
        } catch (SdkException e) {
            LOG.warn("Failed to get object for S3", e);
            throw new DdlException("Failed to get object for S3, Error message=" + e.getMessage());
        }
    }

    /**
     * Lazily creates the shared multipart upload pool with double-checked locking
     * (POOL is volatile and assigned only after full configuration, which the previous
     * non-volatile version got wrong).
     * NOTE(review): the queue holds MULTI_PART_UPLOAD_MAX_PART_NUM tasks with
     * DiscardPolicy; concurrent large uploads could overflow it and have parts dropped
     * silently, which then surfaces as a timeout in multipartUploadObject — confirm
     * expected concurrency.
     */
    private void initPool() {
        if (POOL == null) {
            synchronized (DefaultRemote.class) {
                if (POOL == null) {
                    ThreadPoolExecutor pool = ThreadPoolManager.newDaemonThreadPool(
                            Config.multi_part_upload_pool_size,
                            Config.multi_part_upload_pool_size, 5, TimeUnit.SECONDS,
                            new LinkedBlockingQueue<Runnable>(MULTI_PART_UPLOAD_MAX_PART_NUM),
                            new ThreadPoolExecutor.DiscardPolicy(), "multi-part-upload", false);
                    pool.allowCoreThreadTimeOut(true);
                    POOL = pool; // publish only the fully configured pool
                }
            }
        }
    }
}