S3FileSystem.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.fs.remote;

import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.obj.S3ObjStorage;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;

import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Map;
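
/**
 * An {@link ObjFileSystem} implementation for S3-compatible object storage.
 * Object operations are delegated to {@link S3ObjStorage}, while glob listing
 * may fall back to a Hadoop S3A {@link FileSystem} created lazily in
 * {@link #nativeFileSystem(String)}.
 */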
public class S3FileSystem extends ObjFileSystem {
    private static final Logger LOG = LogManager.getLogger(S3FileSystem.class);

    private HadoopAuthenticator authenticator = null;

public S3FileSystem(Map<String, String> properties) {
super(StorageBackend.StorageType.S3.name(), StorageBackend.StorageType.S3, new S3ObjStorage(properties));
initFsProperties();
    }

@VisibleForTesting
public S3FileSystem(S3ObjStorage storage) {
super(StorageBackend.StorageType.S3.name(), StorageBackend.StorageType.S3, storage);
initFsProperties();
}
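
    // Merge the properties held by the underlying S3ObjStorage into this
    // FileSystem's property map so callers see a single view of them.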
private void initFsProperties() {
this.properties.putAll(((S3ObjStorage) objStorage).getProperties());
}
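
    /**
     * Returns the backing Hadoop FileSystem, creating it lazily with
     * double-checked locking so concurrent callers share one instance.
     * S3 does not support Kerberos, so the FileSystem is created under
     * simple authentication.
     */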
@Override
protected FileSystem nativeFileSystem(String remotePath) throws UserException {
        // TODO: extract a common method so this initialization logic can be reused
if (closed.get()) {
throw new UserException("FileSystem is closed.");
}
if (dfsFileSystem == null) {
synchronized (this) {
if (closed.get()) {
throw new UserException("FileSystem is closed.");
}
if (dfsFileSystem == null) {
Configuration conf = DFSFileSystem.getHdfsConf(ifNotSetFallbackToSimpleAuth());
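                    // Enable AWS Signature Version 4 in the AWS SDK; some regions
                    // and S3-compatible services reject the older signing scheme.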
System.setProperty("com.amazonaws.services.s3.enableV4", "true");
                    // the entry value in properties may be null, and Configuration.set()
                    // rejects null keys and values, so filter them out first
PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
.filter(entry -> entry.getKey() != null && entry.getValue() != null)
.forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
                    // S3 does not support Kerberos authentication,
                    // so we fall back to simple authentication here
AuthenticationConfig authConfig = AuthenticationConfig.getSimpleAuthenticationConfig(conf);
HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig);
try {
dfsFileSystem = authenticator.doAs(() -> {
try {
return FileSystem.get(new Path(remotePath).toUri(), conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
this.authenticator = authenticator;
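                        // Register with the phantom-reference manager so the native
                        // FileSystem can be cleaned up after this object is GC'ed.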
RemoteFSPhantomManager.registerPhantomReference(this);
} catch (Exception e) {
throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e);
}
}
}
}
return dfsFileSystem;
}

    // The broker file-pattern glob is too complex to reimplement here,
    // so we delegate to Hadoop's globStatus directly.
private Status globListImplV1(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
try {
FileSystem s3AFileSystem = nativeFileSystem(remotePath);
Path pathPattern = new Path(remotePath);
FileStatus[] files = s3AFileSystem.globStatus(pathPattern);
if (files == null) {
return Status.OK;
}
for (FileStatus fileStatus : files) {
RemoteFile remoteFile = new RemoteFile(
fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(),
!fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(),
fileStatus.getBlockSize(), fileStatus.getModificationTime());
result.add(remoteFile);
}
if (LOG.isDebugEnabled()) {
LOG.debug("remotePath:{}, result:{}", remotePath, result);
}
} catch (FileNotFoundException e) {
LOG.info("file not found: " + e.getMessage());
return new Status(Status.ErrCode.NOT_FOUND, "file not found: " + e.getMessage());
} catch (Exception e) {
if (e.getCause() instanceof AmazonS3Exception) {
// process minio error msg
AmazonS3Exception ea = (AmazonS3Exception) e.getCause();
Map<String, String> callbackHeaders = ea.getHttpHeaders();
if (callbackHeaders != null && !callbackHeaders.isEmpty()) {
String minioErrMsg = callbackHeaders.get("X-Minio-Error-Desc");
if (minioErrMsg != null) {
return new Status(Status.ErrCode.COMMON_ERROR, "Minio request error: " + minioErrMsg);
}
}
}
LOG.error("errors while get file status ", e);
return new Status(Status.ErrCode.COMMON_ERROR, "errors while get file status " + e.getMessage());
}
return Status.OK;
}
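
    // V2 bypasses Hadoop and delegates listing to S3ObjStorage, which supports
    // AWS assumed roles with an external id (see globList below).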
private Status globListImplV2(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
return ((S3ObjStorage) objStorage).globList(remotePath, result, fileNameOnly);
}
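
    /**
     * Lists files matching the glob pattern in remotePath. When a role ARN is
     * configured, the Hadoop-based V1 path cannot honor the assumed role's
     * external id (HADOOP-19201), so listing is routed to V2 instead.
     */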
@Override
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
if (!Strings.isNullOrEmpty(properties.get(S3Properties.ROLE_ARN))
|| !Strings.isNullOrEmpty(properties.get(S3Properties.Env.ROLE_ARN))) {
            // https://issues.apache.org/jira/browse/HADOOP-19201
            // The hadoop version we currently use (3.3.6) does not support AWS
            // assumed roles with an external id, so we wrote globListImplV2 to
            // support them.
            LOG.info("aws role arn mode, use globListImplV2");
return globListImplV2(remotePath, result, fileNameOnly);
}
return globListImplV1(remotePath, result, fileNameOnly);
    }

@VisibleForTesting
public HadoopAuthenticator getAuthenticator() {
return authenticator;
}
}