HdfsStorageVault.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;

import org.apache.doris.backup.Status;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * HDFS storage vault
* <p>
* Syntax:
* CREATE STORAGE VAULT "remote_hdfs"
* PROPERTIES
* (
* "type" = "hdfs",
* "fs.defaultFS" = "hdfs://10.220.147.151:8020",
* "path_prefix" = "/path/to/data",
* "hadoop.username" = "root"
* );
*/
public class HdfsStorageVault extends StorageVault {
private static final Logger LOG = LogManager.getLogger(HdfsStorageVault.class);
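
    // Common Hadoop/HDFS configuration keys and URI scheme prefixes.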
public static final String HADOOP_FS_PREFIX = "dfs.";
    public static final String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
    public static final String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
    public static final String DSF_NAMESERVICES = "dfs.nameservices";
public static final String HDFS_PREFIX = "hdfs:";
public static final String HDFS_FILE_PREFIX = "hdfs://";
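
    /**
     * Property keys accepted in the PROPERTIES clause of CREATE/ALTER STORAGE VAULT for HDFS vaults.
     */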
    public static class PropertyKey {
        public static final String HADOOP_FS_NAME = "fs.defaultFS";
        public static final String VAULT_PATH_PREFIX = "path_prefix";
        public static final String HADOOP_USER_NAME = AuthenticationConfig.HADOOP_USER_NAME;
        public static final String HADOOP_SECURITY_AUTHENTICATION =
                CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
        public static final String HADOOP_KERBEROS_KEYTAB = AuthenticationConfig.HADOOP_KERBEROS_KEYTAB;
        public static final String HADOOP_KERBEROS_PRINCIPAL = AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL;
    }
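
    /**
     * Properties that cannot be changed by ALTER STORAGE VAULT, since they determine where the
     * vault's data lives.
     */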
public static final HashSet<String> FORBID_ALTER_PROPERTIES = new HashSet<>(Arrays.asList(
PropertyKey.VAULT_PATH_PREFIX,
PropertyKey.HADOOP_FS_NAME
));
    /**
     * Property keys that are consumed by Doris itself and should not be passed into the
     * HDFS client configuration, such as `type`, `path_prefix`, etc.
     */
private static final Set<String> NON_HDFS_CONF_PROPERTY_KEYS =
ImmutableSet.of(StorageVault.PropertyKey.TYPE, PropertyKey.VAULT_PATH_PREFIX, S3Properties.VALIDITY_CHECK)
.stream().map(String::toLowerCase)
.collect(ImmutableSet.toImmutableSet());
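
    // User-provided vault properties, serialized with the vault metadata via Gson.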
@SerializedName(value = "properties")
private Map<String, String> properties;
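
    /**
     * Creates an HDFS storage vault with an empty property map; properties are applied afterwards
     * through {@link #modifyProperties(ImmutableMap)}.
     */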
public HdfsStorageVault(String name, boolean ifNotExists, boolean setAsDefault) {
super(name, StorageVault.StorageVaultType.HDFS, ifNotExists, setAsDefault);
properties = Maps.newHashMap();
}
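
    /**
     * Merges each entry of {@code newProperties} into the current properties via
     * {@code replaceIfEffectiveValue}, then validates connectivity with the resulting configuration.
     */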
@Override
public void modifyProperties(ImmutableMap<String, String> newProperties) throws DdlException {
for (Map.Entry<String, String> kv : newProperties.entrySet()) {
replaceIfEffectiveValue(this.properties, kv.getKey(), kv.getValue());
}
checkConnectivity(this.properties);
}
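
    /** Returns a defensive copy of the vault properties. */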
@Override
public Map<String, String> getCopiedProperties() {
return Maps.newHashMap(properties);
}
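
    /**
     * Verifies that the HDFS cluster described by {@code newProperties} is reachable by creating,
     * checking, and deleting a temporary directory under {@code fs.defaultFS}/{@code path_prefix}.
     * The check is skipped when the validity-check property is explicitly set to "false".
     *
     * @throws DdlException if any of the file system operations fails
     */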
public static void checkConnectivity(Map<String, String> newProperties) throws DdlException {
if (newProperties.containsKey(S3Properties.VALIDITY_CHECK)
&& newProperties.get(S3Properties.VALIDITY_CHECK).equalsIgnoreCase("false")) {
return;
}
String hadoopFsName = null;
String pathPrefix = null;
for (Map.Entry<String, String> property : newProperties.entrySet()) {
if (property.getKey().equalsIgnoreCase(PropertyKey.HADOOP_FS_NAME)) {
hadoopFsName = property.getValue();
} else if (property.getKey().equalsIgnoreCase(PropertyKey.VAULT_PATH_PREFIX)) {
pathPrefix = property.getValue();
}
}
Preconditions.checkArgument(
!Strings.isNullOrEmpty(hadoopFsName), "%s is null or empty", PropertyKey.HADOOP_FS_NAME);
Preconditions.checkArgument(
!Strings.isNullOrEmpty(pathPrefix), "%s is null or empty", PropertyKey.VAULT_PATH_PREFIX);
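        // Probe the cluster: create a uniquely named directory, verify that it exists, then delete it.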
try (DFSFileSystem dfsFileSystem = new DFSFileSystem(newProperties)) {
            long timestamp = System.currentTimeMillis();
            String remotePath = hadoopFsName + "/" + pathPrefix + "/doris-check-connectivity" + timestamp;
Status st = dfsFileSystem.makeDir(remotePath);
if (st != Status.OK) {
throw new DdlException(
"checkConnectivity(makeDir) failed, status: " + st + ", properties: " + new PrintableMap<>(
newProperties, "=", true, false, true, false));
}
st = dfsFileSystem.exists(remotePath);
if (st != Status.OK) {
throw new DdlException(
"checkConnectivity(exist) failed, status: " + st + ", properties: " + new PrintableMap<>(
newProperties, "=", true, false, true, false));
}
            st = dfsFileSystem.delete(remotePath);
            if (st != Status.OK) {
                throw new DdlException(
                        "checkConnectivity(delete) failed, status: " + st + ", properties: " + new PrintableMap<>(
                                newProperties, "=", true, false, true, false));
            }
} catch (IOException e) {
LOG.warn("checkConnectivity failed, properties:{}", new PrintableMap<>(
newProperties, "=", true, false, true, false), e);
throw new DdlException("checkConnectivity failed, properties: " + new PrintableMap<>(
newProperties, "=", true, false, true, false), e);
}
}
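
    /**
     * Converts the user-supplied vault properties into a {@link Cloud.HdfsVaultInfo}. Well-known keys
     * (fs name, path prefix, Hadoop user, kerberos principal/keytab) are mapped onto dedicated fields;
     * S3-specific keys are rejected, keys consumed by Doris itself are skipped, and everything else is
     * passed through as a raw HDFS configuration entry.
     */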
public static Cloud.HdfsVaultInfo generateHdfsParam(Map<String, String> properties) {
Cloud.HdfsVaultInfo.Builder hdfsVaultInfoBuilder =
Cloud.HdfsVaultInfo.newBuilder();
Cloud.HdfsBuildConf.Builder hdfsConfBuilder = Cloud.HdfsBuildConf.newBuilder();
Set<String> lowerCaseKeys = properties.keySet().stream().map(String::toLowerCase)
.collect(Collectors.toSet());
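        // Dispatch each property: well-known keys fill dedicated proto fields, the rest become raw conf KV pairs.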
for (Map.Entry<String, String> property : properties.entrySet()) {
if (property.getKey().equalsIgnoreCase(PropertyKey.HADOOP_FS_NAME)) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(property.getValue()),
"%s is null or empty", property.getKey());
hdfsConfBuilder.setFsName(property.getValue());
} else if (property.getKey().equalsIgnoreCase(PropertyKey.VAULT_PATH_PREFIX)) {
hdfsVaultInfoBuilder.setPrefix(property.getValue());
} else if (property.getKey().equalsIgnoreCase(PropertyKey.HADOOP_USER_NAME)) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(property.getValue()),
"%s is null or empty", property.getKey());
hdfsConfBuilder.setUser(property.getValue());
} else if (property.getKey()
.equalsIgnoreCase(PropertyKey.HADOOP_SECURITY_AUTHENTICATION)) {
Preconditions.checkArgument(lowerCaseKeys.contains(PropertyKey.HADOOP_KERBEROS_PRINCIPAL),
"%s is required for kerberos", PropertyKey.HADOOP_KERBEROS_PRINCIPAL);
Preconditions.checkArgument(lowerCaseKeys.contains(PropertyKey.HADOOP_KERBEROS_KEYTAB),
"%s is required for kerberos", PropertyKey.HADOOP_KERBEROS_KEYTAB);
} else if (property.getKey().equalsIgnoreCase(PropertyKey.HADOOP_KERBEROS_PRINCIPAL)) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(property.getValue()),
"%s is null or empty", property.getKey());
hdfsConfBuilder.setHdfsKerberosPrincipal(property.getValue());
} else if (property.getKey().equalsIgnoreCase(PropertyKey.HADOOP_KERBEROS_KEYTAB)) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(property.getValue()),
"%s is null or empty", property.getKey());
hdfsConfBuilder.setHdfsKerberosKeytab(property.getValue());
} else if (property.getKey().equalsIgnoreCase(StorageVault.PropertyKey.VAULT_NAME)) {
continue;
} else {
                // Reject S3-specific properties that may have been copy-pasted from a CREATE S3 VAULT statement.
Preconditions.checkArgument(
!property.getKey().toLowerCase().contains(S3StorageVault.PropertyKey.REGION),
"Invalid argument %s", property.getKey());
Preconditions.checkArgument(
!property.getKey().toLowerCase().contains(S3StorageVault.PropertyKey.ENDPOINT),
"Invalid argument %s", property.getKey());
Preconditions.checkArgument(
!property.getKey().toLowerCase().contains(S3StorageVault.PropertyKey.ROOT_PATH),
"Invalid argument %s", property.getKey());
Preconditions.checkArgument(
!property.getKey().toLowerCase().contains(S3StorageVault.PropertyKey.PROVIDER),
"Invalid argument %s", property.getKey());
Preconditions.checkArgument(
!property.getKey().toLowerCase().contains(S3StorageVault.PropertyKey.BUCKET),
"Invalid argument %s", property.getKey());
if (!NON_HDFS_CONF_PROPERTY_KEYS.contains(property.getKey().toLowerCase())) {
Cloud.HdfsBuildConf.HdfsConfKVPair.Builder conf = Cloud.HdfsBuildConf.HdfsConfKVPair.newBuilder();
conf.setKey(property.getKey());
conf.setValue(property.getValue());
hdfsConfBuilder.addHdfsConfs(conf.build());
}
}
}
return hdfsVaultInfoBuilder.setBuildConf(hdfsConfBuilder.build()).build();
}
}