// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.hive;

import org.apache.doris.catalog.Column;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.statistics.CommonStatistics;
import org.apache.doris.fsv2.remote.BrokerFileSystem;
import org.apache.doris.fsv2.remote.RemoteFileSystem;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Hive utilities for creating and querying Hive tables.
 */
public final class HiveUtil {
public static final String COMPRESSION_KEY = "compression";
public static final Set<String> SUPPORTED_ORC_COMPRESSIONS = ImmutableSet.of("plain", "zlib", "snappy", "zstd");
public static final Set<String> SUPPORTED_PARQUET_COMPRESSIONS = ImmutableSet.of("plain", "snappy", "zstd");
public static final Set<String> SUPPORTED_TEXT_COMPRESSIONS =
ImmutableSet.of("plain", "gzip", "zstd", "bzip2", "lz4", "snappy");
private HiveUtil() {
}
    /**
     * Creates an {@link InputFormat} instance from its class name.
     *
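     * <p>A minimal usage sketch (illustrative):
     * <pre>{@code
     * InputFormat<?, ?> format = HiveUtil.getInputFormat(
     *         new JobConf(), "org.apache.hadoop.mapred.TextInputFormat", false);
     * }</pre>
     *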
     * @param jobConf the JobConf used to instantiate the input format
     * @param inputFormatName the input format class name
     * @param symlinkTarget if true and the class is SymlinkTextInputFormat, use the symlink target's input format
     * @return an instance of the input format
     * @throws UserException when the class cannot be found or instantiated
*/
public static InputFormat<?, ?> getInputFormat(JobConf jobConf,
String inputFormatName, boolean symlinkTarget) throws UserException {
try {
Class<? extends InputFormat<?, ?>> inputFormatClass = getInputFormatClass(jobConf, inputFormatName);
if (symlinkTarget && (inputFormatClass == SymlinkTextInputFormat.class)) {
// symlink targets are always TextInputFormat
inputFormatClass = TextInputFormat.class;
}
return ReflectionUtils.newInstance(inputFormatClass, jobConf);
} catch (ClassNotFoundException | RuntimeException e) {
throw new UserException("Unable to create input format " + inputFormatName, e);
}
}
@SuppressWarnings({"unchecked", "RedundantCast"})
private static Class<? extends InputFormat<?, ?>> getInputFormatClass(JobConf conf, String inputFormatName)
throws ClassNotFoundException {
// CDH uses different names for Parquet
if ("parquet.hive.DeprecatedParquetInputFormat".equals(inputFormatName)
|| "parquet.hive.MapredParquetInputFormat".equals(inputFormatName)) {
return MapredParquetInputFormat.class;
}
Class<?> clazz = conf.getClassByName(inputFormatName);
return (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
}
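
    /**
     * Returns whether files produced by the given input format can be split.
     * A BrokerFileSystem delegates the check to the broker; for all other file
     * systems, every supported Hive file format is treated as splittable.
     */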
public static boolean isSplittable(RemoteFileSystem remoteFileSystem, String inputFormat,
String location) throws UserException {
if (remoteFileSystem instanceof BrokerFileSystem) {
return ((BrokerFileSystem) remoteFileSystem).isSplittable(location, inputFormat);
}
        // All supported Hive input formats are splittable.
return HMSExternalTable.SUPPORTED_HIVE_FILE_FORMATS.contains(inputFormat);
}
// "c1=a/c2=b/c3=c" ---> List(["c1","a"], ["c2","b"], ["c3","c"])
// Similar to the `toPartitionValues` method, except that it adds the partition column name.
public static List<String[]> toPartitionColNameAndValues(String partitionName) {
String[] parts = partitionName.split("/");
List<String[]> result = new ArrayList<>(parts.length);
for (String part : parts) {
String[] kv = part.split("=");
            Preconditions.checkState(kv.length == 2, "Malformed partition name %s", part);
result.add(new String[] {
FileUtils.unescapePathName(kv[0]),
FileUtils.unescapePathName(kv[1])
});
}
return result;
}
// "c1=a/c2=b/c3=c" ---> List("a","b","c")
public static List<String> toPartitionValues(String partitionName) {
ImmutableList.Builder<String> resultBuilder = ImmutableList.builder();
int start = 0;
while (true) {
while (start < partitionName.length() && partitionName.charAt(start) != '=') {
start++;
}
start++;
int end = start;
while (end < partitionName.length() && partitionName.charAt(end) != '/') {
end++;
}
if (start > partitionName.length()) {
break;
}
            // Ref: common/src/java/org/apache/hadoop/hive/common/FileUtils.java
            // makePartName(List<String> partCols, List<String> vals, String defaultStr)
resultBuilder.add(FileUtils.unescapePathName(partitionName.substring(start, end)));
start = end + 1;
}
return resultBuilder.build();
}
// List("c1=a/c2=b/c3=c", "c1=a/c2=b/c3=d")
// |
// |
// v
// Map(
// key:"c1=a/c2=b/c3=c", value:Partition(values=List(a,b,c))
// key:"c1=a/c2=b/c3=d", value:Partition(values=List(a,b,d))
// )
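    // Partition names whose values have no matching Partition are silently dropped.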
public static Map<String, Partition> convertToNamePartitionMap(
List<String> partitionNames,
List<Partition> partitions) {
Map<String, List<String>> partitionNameToPartitionValues =
partitionNames
.stream()
.collect(Collectors.toMap(partitionName -> partitionName, HiveUtil::toPartitionValues));
Map<List<String>, Partition> partitionValuesToPartition =
partitions.stream()
.collect(Collectors.toMap(Partition::getValues, partition -> partition));
ImmutableMap.Builder<String, Partition> resultBuilder = ImmutableMap.builder();
for (Map.Entry<String, List<String>> entry : partitionNameToPartitionValues.entrySet()) {
Partition partition = partitionValuesToPartition.get(entry.getValue());
if (partition != null) {
resultBuilder.put(entry.getKey(), partition);
}
}
return resultBuilder.build();
}
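
    /**
     * Converts Doris-side {@link HiveTableMetadata} into a metastore {@link Table},
     * filling in the storage descriptor, partition keys and table properties.
     */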
public static Table toHiveTable(HiveTableMetadata hiveTable) {
        Objects.requireNonNull(hiveTable.getDbName(), "Hive database name must not be null");
        Objects.requireNonNull(hiveTable.getTableName(), "Hive table name must not be null");
Table table = new Table();
table.setDbName(hiveTable.getDbName());
table.setTableName(hiveTable.getTableName());
        // Hive stores create/access times in seconds since epoch, so convert from milliseconds.
        int createTime = (int) (System.currentTimeMillis() / 1000);
table.setCreateTime(createTime);
table.setLastAccessTime(createTime);
// table.setRetention(0);
Set<String> partitionSet = new HashSet<>(hiveTable.getPartitionKeys());
Pair<List<FieldSchema>, List<FieldSchema>> hiveSchema = toHiveSchema(hiveTable.getColumns(), partitionSet);
table.setSd(toHiveStorageDesc(hiveSchema.first, hiveTable.getBucketCols(), hiveTable.getNumBuckets(),
hiveTable.getFileFormat(), hiveTable.getLocation()));
table.setPartitionKeys(hiveSchema.second);
// table.setViewOriginalText(hiveTable.getViewSql());
// table.setViewExpandedText(hiveTable.getViewSql());
table.setTableType("MANAGED_TABLE");
Map<String, String> props = new HashMap<>(hiveTable.getProperties());
props.put(ExternalCatalog.DORIS_VERSION, ExternalCatalog.DORIS_VERSION_VALUE);
setCompressType(hiveTable, props);
        // set the Hive table comment via table properties
props.put("comment", hiveTable.getComment());
if (props.containsKey("owner")) {
table.setOwner(props.get("owner"));
}
HiveProperties.setTableProperties(table, props);
return table;
}
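
    // Translates the generic "compression" property into the format-specific key,
    // e.g. {"compression" -> "zstd"} on an ORC table becomes {"orc.compress" -> "zstd"}.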
private static void setCompressType(HiveTableMetadata hiveTable, Map<String, String> props) {
String fileFormat = hiveTable.getFileFormat();
String compression = props.get(COMPRESSION_KEY);
        // On HMS, the default ORC compression is zlib and the default Parquet compression is snappy.
if (fileFormat.equalsIgnoreCase("parquet")) {
if (StringUtils.isNotEmpty(compression) && !SUPPORTED_PARQUET_COMPRESSIONS.contains(compression)) {
throw new AnalysisException("Unsupported parquet compression type " + compression);
}
props.putIfAbsent("parquet.compression", StringUtils.isEmpty(compression) ? "snappy" : compression);
} else if (fileFormat.equalsIgnoreCase("orc")) {
if (StringUtils.isNotEmpty(compression) && !SUPPORTED_ORC_COMPRESSIONS.contains(compression)) {
throw new AnalysisException("Unsupported orc compression type " + compression);
}
props.putIfAbsent("orc.compress", StringUtils.isEmpty(compression) ? "zlib" : compression);
} else if (fileFormat.equalsIgnoreCase("text")) {
if (StringUtils.isNotEmpty(compression) && !SUPPORTED_TEXT_COMPRESSIONS.contains(compression)) {
throw new AnalysisException("Unsupported text compression type " + compression);
}
props.putIfAbsent("text.compression", StringUtils.isEmpty(compression)
? ConnectContext.get().getSessionVariable().hiveTextCompression() : compression);
} else {
throw new IllegalArgumentException("Compression is not supported on " + fileFormat);
}
        // remove the generic compression key now that it has been translated
props.remove(COMPRESSION_KEY);
}
private static StorageDescriptor toHiveStorageDesc(List<FieldSchema> columns,
List<String> bucketCols, int numBuckets, String fileFormat, Optional<String> location) {
StorageDescriptor sd = new StorageDescriptor();
sd.setCols(columns);
setFileFormat(fileFormat, sd);
location.ifPresent(sd::setLocation);
sd.setBucketCols(bucketCols);
sd.setNumBuckets(numBuckets);
Map<String, String> parameters = new HashMap<>();
parameters.put("tag", "doris external hive table");
sd.setParameters(parameters);
return sd;
}
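
    // Maps a user-facing file format name ("orc", "parquet", "text") to the
    // corresponding Hive input format, output format and SerDe class names.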
private static void setFileFormat(String fileFormat, StorageDescriptor sd) {
String inputFormat;
String outputFormat;
String serDe;
if (fileFormat.equalsIgnoreCase("orc")) {
inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
outputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
serDe = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
} else if (fileFormat.equalsIgnoreCase("parquet")) {
inputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
outputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
serDe = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
} else if (fileFormat.equalsIgnoreCase("text")) {
inputFormat = "org.apache.hadoop.mapred.TextInputFormat";
outputFormat = "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
serDe = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
} else {
throw new IllegalArgumentException("Creating table with an unsupported file format: " + fileFormat);
}
SerDeInfo serDeInfo = new SerDeInfo();
serDeInfo.setSerializationLib(serDe);
sd.setSerdeInfo(serDeInfo);
sd.setInputFormat(inputFormat);
sd.setOutputFormat(outputFormat);
}
private static Pair<List<FieldSchema>, List<FieldSchema>> toHiveSchema(List<Column> columns,
Set<String> partitionSet) {
List<FieldSchema> hiveCols = new ArrayList<>();
List<FieldSchema> hiveParts = new ArrayList<>();
for (Column column : columns) {
FieldSchema hiveFieldSchema = new FieldSchema();
            // TODO: add doc; only Doris types are supported here
hiveFieldSchema.setType(HiveMetaStoreClientHelper.dorisTypeToHiveType(column.getType()));
hiveFieldSchema.setName(column.getName());
hiveFieldSchema.setComment(column.getComment());
if (partitionSet.contains(column.getName())) {
hiveParts.add(hiveFieldSchema);
} else {
hiveCols.add(hiveFieldSchema);
}
}
return Pair.of(hiveCols, hiveParts);
}
public static Database toHiveDatabase(HiveDatabaseMetadata hiveDb) {
Database database = new Database();
database.setName(hiveDb.getDbName());
if (StringUtils.isNotEmpty(hiveDb.getLocationUri())) {
database.setLocationUri(hiveDb.getLocationUri());
}
Map<String, String> props = hiveDb.getProperties();
database.setParameters(props);
database.setDescription(hiveDb.getComment());
if (props.containsKey("owner")) {
database.setOwnerName(props.get("owner"));
database.setOwnerType(PrincipalType.USER);
}
return database;
}
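
    /**
     * Returns a copy of {@code parameters} with the file count, row count and
     * total size overwritten from {@code statistics}.
     */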
public static Map<String, String> updateStatisticsParameters(
Map<String, String> parameters,
CommonStatistics statistics) {
HashMap<String, String> result = new HashMap<>(parameters);
result.put(StatsSetupConst.NUM_FILES, String.valueOf(statistics.getFileCount()));
result.put(StatsSetupConst.ROW_COUNT, String.valueOf(statistics.getRowCount()));
result.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(statistics.getTotalFileBytes()));
// CDH 5.16 metastore ignores stats unless STATS_GENERATED_VIA_STATS_TASK is set
// https://github.com/cloudera/hive/blob/cdh5.16.2-release/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L227-L231
if (!parameters.containsKey("STATS_GENERATED_VIA_STATS_TASK")) {
result.put("STATS_GENERATED_VIA_STATS_TASK", "workaround for potential lack of HIVE-12730");
}
return result;
}
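
    // Builds partition statistics from metastore parameters; absent values default to -1 (unknown).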
public static HivePartitionStatistics toHivePartitionStatistics(Map<String, String> params) {
long rowCount = Long.parseLong(params.getOrDefault(StatsSetupConst.ROW_COUNT, "-1"));
long totalSize = Long.parseLong(params.getOrDefault(StatsSetupConst.TOTAL_SIZE, "-1"));
long numFiles = Long.parseLong(params.getOrDefault(StatsSetupConst.NUM_FILES, "-1"));
return HivePartitionStatistics.fromCommonStatistics(rowCount, numFiles, totalSize);
}
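
    // Converts a HivePartition together with its statistics into a metastore
    // Partition, merging the statistics into the partition parameters.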
public static Partition toMetastoreApiPartition(HivePartitionWithStatistics partitionWithStatistics) {
Partition partition =
toMetastoreApiPartition(partitionWithStatistics.getPartition());
partition.setParameters(updateStatisticsParameters(
partition.getParameters(), partitionWithStatistics.getStatistics().getCommonStatistics()));
return partition;
}
public static Partition toMetastoreApiPartition(HivePartition hivePartition) {
Partition result = new Partition();
result.setDbName(hivePartition.getTableInfo().getDbName());
result.setTableName(hivePartition.getTableInfo().getTbName());
result.setValues(hivePartition.getPartitionValues());
result.setSd(makeStorageDescriptorFromHivePartition(hivePartition));
result.setParameters(hivePartition.getParameters());
return result;
}
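
    // Builds a StorageDescriptor for the partition; an empty path is normalized
    // to a null location via Strings.emptyToNull.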
public static StorageDescriptor makeStorageDescriptorFromHivePartition(HivePartition partition) {
SerDeInfo serdeInfo = new SerDeInfo();
serdeInfo.setName(partition.getTableInfo().getTbName());
serdeInfo.setSerializationLib(partition.getSerde());
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation(Strings.emptyToNull(partition.getPath()));
sd.setCols(partition.getColumns());
sd.setSerdeInfo(serdeInfo);
sd.setInputFormat(partition.getInputFormat());
sd.setOutputFormat(partition.getOutputFormat());
sd.setParameters(ImmutableMap.of());
return sd;
}
}