// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.iceberg;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.BoolLiteral;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.DecimalLiteral;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FloatLiteral;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.Subquery;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.RangePartitionItem;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.thrift.TExprOpcode;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionsTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.expressions.And;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.Not;
import org.apache.iceberg.expressions.Or;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.Unbound;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Type.TypeID;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.StructProjection;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.time.DateTimeException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.Month;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Utility methods for working with Iceberg tables in Doris: predicate pushdown conversion,
 * type and schema mapping, partition handling, and metadata/snapshot access.
 */
public class IcebergUtils {
private static final Logger LOG = LogManager.getLogger(IcebergUtils.class);
    private static ThreadLocal<Integer> columnIdThreadLocal = ThreadLocal.withInitial(() -> 0);
// https://iceberg.apache.org/spec/#schemas-and-data-types
// All time and timestamp values are stored with microsecond precision
private static final int ICEBERG_DATETIME_SCALE_MS = 6;
private static final String PARQUET_NAME = "parquet";
private static final String ORC_NAME = "orc";
public static final String TOTAL_RECORDS = "total-records";
public static final String TOTAL_POSITION_DELETES = "total-position-deletes";
public static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes";
// nickname in flink and spark
public static final String WRITE_FORMAT = "write-format";
public static final String COMPRESSION_CODEC = "compression-codec";
// nickname in spark
public static final String SPARK_SQL_COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec";
public static final long UNKNOWN_SNAPSHOT_ID = -1; // means an empty table
public static final long NEWEST_SCHEMA_ID = -1;
public static final String YEAR = "year";
public static final String MONTH = "month";
public static final String DAY = "day";
public static final String HOUR = "hour";
public static final String IDENTITY = "identity";
public static final int PARTITION_DATA_ID_START = 1000; // org.apache.iceberg.PartitionSpec
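    /**
     * Convert a Doris predicate {@link Expr} into an Iceberg {@link Expression} usable for
     * scan planning and file pruning. Returns null when the predicate cannot be converted.
     *
     * <p>A minimal sketch of the mapping, using hypothetical columns "dt" and "id":
     * <pre>
     *   dt = '2024-01-01' AND id > 10
     *       => and(equal("dt", ...), greaterThan("id", 10))
     *   id IN (1, 2, 3)
     *       => in("id", [1, 2, 3])
     * </pre>
     * Dropping one side of an AND only widens the Iceberg scan (Doris still applies the full
     * predicate), so partial conversion is allowed there; for OR and NOT, the whole
     * expression is dropped unless every child converts.
     */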
public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
if (expr == null) {
return null;
}
Expression expression = null;
        if (expr instanceof BoolLiteral) {
            BoolLiteral boolLiteral = (BoolLiteral) expr;
            expression = boolLiteral.getValue() ? Expressions.alwaysTrue() : Expressions.alwaysFalse();
} else if (expr instanceof CompoundPredicate) {
CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
switch (compoundPredicate.getOp()) {
case AND: {
Expression left = convertToIcebergExpr(compoundPredicate.getChild(0), schema);
Expression right = convertToIcebergExpr(compoundPredicate.getChild(1), schema);
if (left != null && right != null) {
expression = Expressions.and(left, right);
                    } else if (left != null) {
                        // Only one side converts. The full predicate is still evaluated by
                        // Doris, so pushing down a single AND child merely widens the scan.
                        return left;
                    } else if (right != null) {
                        return right;
}
break;
}
case OR: {
Expression left = convertToIcebergExpr(compoundPredicate.getChild(0), schema);
Expression right = convertToIcebergExpr(compoundPredicate.getChild(1), schema);
                    // Both sides must convert: pushing down only one side of an OR
                    // would incorrectly narrow the scan result.
                    if (left != null && right != null) {
                        expression = Expressions.or(left, right);
                    }
break;
}
case NOT: {
Expression child = convertToIcebergExpr(compoundPredicate.getChild(0), schema);
if (child != null) {
expression = Expressions.not(child);
}
break;
}
default:
return null;
}
        } else if (expr instanceof BinaryPredicate) {
            TExprOpcode opCode = expr.getOpcode();
            switch (opCode) {
                case EQ:
                case NE:
                case GE:
                case GT:
                case LE:
                case LT:
                case EQ_FOR_NULL:
                    BinaryPredicate eq = (BinaryPredicate) expr;
                    SlotRef slotRef = convertDorisExprToSlotRef(eq.getChild(0));
                    LiteralExpr literalExpr = null;
                    if (slotRef == null && eq.getChild(0).isLiteral()) {
                        literalExpr = (LiteralExpr) eq.getChild(0);
                        slotRef = convertDorisExprToSlotRef(eq.getChild(1));
                        // The literal is on the left side, e.g. "5 > col": mirror the
                        // comparison so it is expressed with the column on the left ("col < 5").
                        switch (opCode) {
                            case GE:
                                opCode = TExprOpcode.LE;
                                break;
                            case GT:
                                opCode = TExprOpcode.LT;
                                break;
                            case LE:
                                opCode = TExprOpcode.GE;
                                break;
                            case LT:
                                opCode = TExprOpcode.GT;
                                break;
                            default:
                                // EQ, NE and EQ_FOR_NULL are symmetric.
                                break;
                        }
                    } else if (eq.getChild(1).isLiteral()) {
                        literalExpr = (LiteralExpr) eq.getChild(1);
                    }
                    if (slotRef == null || literalExpr == null) {
                        return null;
                    }
                    String colName = slotRef.getColumnName();
                    Types.NestedField nestedField = schema.caseInsensitiveFindField(colName);
                    if (nestedField == null) {
                        return null;
                    }
                    colName = nestedField.name();
                    Object value = extractDorisLiteral(nestedField.type(), literalExpr);
                    if (value == null) {
                        if (opCode == TExprOpcode.EQ_FOR_NULL && literalExpr instanceof NullLiteral) {
                            // "col <=> NULL" is equivalent to "col IS NULL".
                            expression = Expressions.isNull(colName);
                        } else {
                            return null;
                        }
                    } else {
                        switch (opCode) {
                            case EQ:
                            case EQ_FOR_NULL:
                                expression = Expressions.equal(colName, value);
                                break;
                            case NE:
                                expression = Expressions.not(Expressions.equal(colName, value));
                                break;
                            case GE:
                                expression = Expressions.greaterThanOrEqual(colName, value);
                                break;
                            case GT:
                                expression = Expressions.greaterThan(colName, value);
                                break;
                            case LE:
                                expression = Expressions.lessThanOrEqual(colName, value);
                                break;
                            case LT:
                                expression = Expressions.lessThan(colName, value);
                                break;
                            default:
                                return null;
                        }
                    }
                    break;
                default:
                    return null;
            }
        } else if (expr instanceof InPredicate) {
            // Only a simple literal list is supported, e.g. "a IN (1, 2, 3)";
            // subqueries cannot be pushed down.
            InPredicate inExpr = (InPredicate) expr;
            if (inExpr.contains(Subquery.class)) {
                return null;
            }
SlotRef slotRef = convertDorisExprToSlotRef(inExpr.getChild(0));
if (slotRef == null) {
return null;
}
String colName = slotRef.getColumnName();
            Types.NestedField nestedField = schema.caseInsensitiveFindField(colName);
            if (nestedField == null) {
                return null;
            }
            colName = nestedField.name();
List<Object> valueList = new ArrayList<>();
for (int i = 1; i < inExpr.getChildren().size(); ++i) {
if (!(inExpr.getChild(i) instanceof LiteralExpr)) {
return null;
}
LiteralExpr literalExpr = (LiteralExpr) inExpr.getChild(i);
Object value = extractDorisLiteral(nestedField.type(), literalExpr);
if (value == null) {
return null;
}
valueList.add(value);
}
            if (inExpr.isNotIn()) {
                expression = Expressions.notIn(colName, valueList);
            } else {
                expression = Expressions.in(colName, valueList);
            }
}
return checkConversion(expression, schema);
}
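    /**
     * Validate a converted expression by attempting to bind its unbound leaves to the
     * Iceberg schema. Leaves that fail to bind are discarded; for AND the surviving side is
     * kept, while a failed child invalidates an OR or NOT entirely, matching the safety
     * rules applied during conversion above.
     */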
private static Expression checkConversion(Expression expression, Schema schema) {
if (expression == null) {
return null;
}
switch (expression.op()) {
case AND: {
And andExpr = (And) expression;
Expression left = checkConversion(andExpr.left(), schema);
Expression right = checkConversion(andExpr.right(), schema);
if (left != null && right != null) {
return andExpr;
} else if (left != null) {
return left;
} else if (right != null) {
return right;
} else {
return null;
}
}
case OR: {
Or orExpr = (Or) expression;
Expression left = checkConversion(orExpr.left(), schema);
Expression right = checkConversion(orExpr.right(), schema);
if (left == null || right == null) {
return null;
} else {
return orExpr;
}
}
case NOT: {
Not notExpr = (Not) expression;
Expression child = checkConversion(notExpr.child(), schema);
if (child == null) {
return null;
} else {
return notExpr;
}
}
case TRUE:
case FALSE:
return expression;
default:
if (!(expression instanceof Unbound)) {
return null;
}
try {
((Unbound<?, ?>) expression).bind(schema.asStruct(), true);
return expression;
} catch (Exception e) {
LOG.debug("Failed to check expression: {}", e.getMessage());
return null;
}
}
}
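    /**
     * Extract the Java value of a Doris literal in the representation Iceberg's expression
     * API expects for the given Iceberg type. Returns null for unsupported combinations of
     * literal kind and target type, which makes the caller skip the pushdown.
     */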
public static Object extractDorisLiteral(org.apache.iceberg.types.Type icebergType, Expr expr) {
TypeID icebergTypeID = icebergType.typeId();
if (expr instanceof BoolLiteral) {
BoolLiteral boolLiteral = (BoolLiteral) expr;
switch (icebergTypeID) {
case BOOLEAN:
return boolLiteral.getValue();
case STRING:
return boolLiteral.getStringValue();
default:
return null;
}
} else if (expr instanceof DateLiteral) {
DateLiteral dateLiteral = (DateLiteral) expr;
switch (icebergTypeID) {
case STRING:
case DATE:
return dateLiteral.getStringValue();
case TIMESTAMP:
if (((Types.TimestampType) icebergType).shouldAdjustToUTC()) {
return dateLiteral.getUnixTimestampWithMicroseconds(TimeUtils.getTimeZone());
} else {
return dateLiteral.getUnixTimestampWithMicroseconds(TimeUtils.getUTCTimeZone());
}
default:
return null;
}
} else if (expr instanceof DecimalLiteral) {
DecimalLiteral decimalLiteral = (DecimalLiteral) expr;
switch (icebergTypeID) {
case DECIMAL:
return decimalLiteral.getValue();
case STRING:
return decimalLiteral.getStringValue();
case DOUBLE:
return decimalLiteral.getDoubleValue();
default:
return null;
}
} else if (expr instanceof FloatLiteral) {
FloatLiteral floatLiteral = (FloatLiteral) expr;
if (floatLiteral.getType() == Type.FLOAT) {
switch (icebergTypeID) {
case FLOAT:
case DOUBLE:
case DECIMAL:
return floatLiteral.getValue();
default:
return null;
}
} else {
switch (icebergTypeID) {
case DOUBLE:
case DECIMAL:
return floatLiteral.getValue();
default:
return null;
}
}
} else if (expr instanceof IntLiteral) {
IntLiteral intLiteral = (IntLiteral) expr;
Type type = intLiteral.getType();
if (type.isInteger32Type()) {
switch (icebergTypeID) {
case INTEGER:
case LONG:
case FLOAT:
case DOUBLE:
case DATE:
case DECIMAL:
return (int) intLiteral.getValue();
default:
return null;
}
} else {
// only PrimitiveType.BIGINT
switch (icebergTypeID) {
case INTEGER:
case LONG:
case FLOAT:
case DOUBLE:
case TIME:
case TIMESTAMP:
case DATE:
case DECIMAL:
return intLiteral.getValue();
default:
return null;
}
}
} else if (expr instanceof StringLiteral) {
String value = expr.getStringValue();
switch (icebergTypeID) {
case DATE:
case TIME:
case TIMESTAMP:
case STRING:
case UUID:
case DECIMAL:
return value;
case INTEGER:
try {
return Integer.parseInt(value);
} catch (Exception e) {
return null;
}
case LONG:
try {
return Long.parseLong(value);
} catch (Exception e) {
return null;
}
default:
return null;
}
}
return null;
}
    private static SlotRef convertDorisExprToSlotRef(Expr expr) {
        // Accept a bare column reference, or a column wrapped in a cast (e.g. an implicit cast).
        SlotRef slotRef = null;
if (expr instanceof SlotRef) {
slotRef = (SlotRef) expr;
} else if (expr instanceof CastExpr) {
if (expr.getChild(0) instanceof SlotRef) {
slotRef = (SlotRef) expr.getChild(0);
}
}
return slotRef;
}
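    /**
     * Build an Iceberg {@link PartitionSpec} from the partition clause of a Doris
     * CREATE TABLE statement. Identity columns map directly, and transform functions map to
     * the corresponding Iceberg transforms. A sketch with hypothetical columns "ts" and "id":
     * <pre>
     *   PARTITION BY (day(ts), bucket(16, id))
     *       => PartitionSpec.builderFor(schema).day("ts").bucket("id", 16).build()
     * </pre>
     * For bucket and truncate, the first SQL argument is the numeric parameter and the
     * second is the column.
     */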
public static PartitionSpec solveIcebergPartitionSpec(PartitionDesc partitionDesc, Schema schema)
throws UserException {
if (partitionDesc == null) {
return PartitionSpec.unpartitioned();
}
ArrayList<Expr> partitionExprs = partitionDesc.getPartitionExprs();
PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
for (Expr expr : partitionExprs) {
if (expr instanceof SlotRef) {
builder.identity(((SlotRef) expr).getColumnName());
} else if (expr instanceof FunctionCallExpr) {
String exprName = expr.getExprName();
List<Expr> params = ((FunctionCallExpr) expr).getParams().exprs();
switch (exprName.toLowerCase()) {
case "bucket":
builder.bucket(params.get(1).getExprName(), Integer.parseInt(params.get(0).getStringValue()));
break;
case "year":
case "years":
builder.year(params.get(0).getExprName());
break;
case "month":
case "months":
builder.month(params.get(0).getExprName());
break;
case "date":
case "day":
case "days":
builder.day(params.get(0).getExprName());
break;
case "date_hour":
case "hour":
case "hours":
builder.hour(params.get(0).getExprName());
break;
case "truncate":
builder.truncate(params.get(1).getExprName(), Integer.parseInt(params.get(0).getStringValue()));
break;
                    default:
                        throw new UserException("unsupported partition transform: " + exprName);
}
}
}
return builder.build();
}
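    /**
     * Map an Iceberg primitive type to the corresponding Doris type. Notable mappings:
     * string/binary/uuid all become STRING, fixed(n) becomes CHAR(n), timestamp becomes
     * DATETIMEV2 with microsecond precision (Iceberg stores timestamps in microseconds),
     * and TIME is unsupported.
     */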
private static Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
switch (primitive.typeId()) {
case BOOLEAN:
return Type.BOOLEAN;
case INTEGER:
return Type.INT;
case LONG:
return Type.BIGINT;
case FLOAT:
return Type.FLOAT;
case DOUBLE:
return Type.DOUBLE;
case STRING:
case BINARY:
case UUID:
return Type.STRING;
case FIXED:
Types.FixedType fixed = (Types.FixedType) primitive;
return ScalarType.createCharType(fixed.length());
case DECIMAL:
Types.DecimalType decimal = (Types.DecimalType) primitive;
return ScalarType.createDecimalV3Type(decimal.precision(), decimal.scale());
case DATE:
return ScalarType.createDateV2Type();
case TIMESTAMP:
return ScalarType.createDatetimeV2Type(ICEBERG_DATETIME_SCALE_MS);
case TIME:
return Type.UNSUPPORTED;
default:
throw new IllegalArgumentException("Cannot transform unknown type: " + primitive);
}
}
public static Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) {
if (type.isPrimitiveType()) {
return icebergPrimitiveTypeToDorisType((org.apache.iceberg.types.Type.PrimitiveType) type);
}
switch (type.typeId()) {
case LIST:
Types.ListType list = (Types.ListType) type;
return ArrayType.create(icebergTypeToDorisType(list.elementType()), true);
case MAP:
Types.MapType map = (Types.MapType) type;
return new MapType(
icebergTypeToDorisType(map.keyType()),
icebergTypeToDorisType(map.valueType())
);
case STRUCT:
Types.StructType struct = (Types.StructType) type;
ArrayList<StructField> nestedTypes = struct.fields().stream().map(
x -> new StructField(x.name(), icebergTypeToDorisType(x.type()))
).collect(Collectors.toCollection(ArrayList::new));
return new StructType(nestedTypes);
default:
throw new IllegalArgumentException("Cannot transform unknown type: " + type);
}
}
public static org.apache.iceberg.Table getIcebergTable(ExternalCatalog catalog, String dbName, String tblName) {
return getIcebergTableInternal(catalog, dbName, tblName, false);
}
public static org.apache.iceberg.Table getAndCloneTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) {
return getIcebergTableInternal(catalog, tableInfo.getDbName(), tableInfo.getTbName(), true);
}
public static org.apache.iceberg.Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) {
return Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache()
.getIcebergTable(catalog, tableInfo.getDbName(), tableInfo.getTbName());
}
private static org.apache.iceberg.Table getIcebergTableInternal(ExternalCatalog catalog, String dbName,
String tblName,
boolean isClone) {
IcebergMetadataCache metadataCache = Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache();
return isClone ? metadataCache.getAndCloneTable(catalog, dbName, tblName)
: metadataCache.getIcebergTable(catalog, dbName, tblName);
}
    /**
     * Get the Iceberg schema of the table and convert it to a list of Doris columns.
     * schemaId == NEWEST_SCHEMA_ID, or an empty table, selects the table's current schema.
     */
public static List<Column> getSchema(ExternalCatalog catalog, String dbName, String name, long schemaId) {
try {
return catalog.getPreExecutionAuthenticator().execute(() -> {
org.apache.iceberg.Table icebergTable = getIcebergTable(catalog, dbName, name);
Schema schema;
                // An empty table has no snapshot to resolve a historical schema from,
                // so fall back to the current schema.
                if (schemaId == NEWEST_SCHEMA_ID || icebergTable.currentSnapshot() == null) {
schema = icebergTable.schema();
} else {
schema = icebergTable.schemas().get((int) schemaId);
}
Preconditions.checkNotNull(schema,
"Schema for table " + catalog.getName() + "." + dbName + "." + name + " is null");
List<Types.NestedField> columns = schema.columns();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
for (Types.NestedField field : columns) {
tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
schema.caseInsensitiveFindField(field.name()).fieldId()));
}
return tmpSchema;
});
} catch (Exception e) {
throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
}
}
/**
* Estimate iceberg table row count.
* Get the row count by adding all task file recordCount.
*
* @return estimated row count
*/
    public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, String tbName) {
        // The table may be null when the Iceberg metadata cache has not been loaded yet.
        // That is acceptable: the resulting NPE is caught by the caller, which falls back to
        // the default value -1. Meanwhile, the access triggers the metadata cache to load
        // the table, so the row count becomes available on a later call.
Table icebergTable = Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache()
.getIcebergTable(catalog, dbName, tbName);
Snapshot snapshot = icebergTable.currentSnapshot();
if (snapshot == null) {
LOG.info("Iceberg table {}.{}.{} is empty, return -1.", catalog.getName(), dbName, tbName);
// empty table
return TableIf.UNKNOWN_ROW_COUNT;
}
Map<String, String> summary = snapshot.summary();
long rows = Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
LOG.info("Iceberg table {}.{}.{} row count in summary is {}", catalog.getName(), dbName, tbName, rows);
return rows;
}
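    /**
     * Resolve the file format of an Iceberg table. Precedence: the "write-format" property
     * written by Flink/Spark, then Iceberg's "write.format.default", then Parquet.
     */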
public static FileFormat getFileFormat(Table icebergTable) {
Map<String, String> properties = icebergTable.properties();
String fileFormatName;
if (properties.containsKey(WRITE_FORMAT)) {
fileFormatName = properties.get(WRITE_FORMAT);
} else {
fileFormatName = properties.getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, PARQUET_NAME);
}
        FileFormat fileFormat;
        String lowerFileFormatName = fileFormatName.toLowerCase();
        if (lowerFileFormatName.contains(ORC_NAME)) {
            fileFormat = FileFormat.ORC;
        } else if (lowerFileFormatName.contains(PARQUET_NAME)) {
fileFormat = FileFormat.PARQUET;
} else {
throw new RuntimeException("Unsupported input format type: " + fileFormatName);
}
return fileFormat;
}
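    /**
     * Resolve the compression codec used for writes. Precedence: "compression-codec", then
     * the Spark alias "spark.sql.iceberg.compression-codec", then the format-specific
     * Iceberg table property (with its Iceberg default).
     */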
public static String getFileCompress(Table table) {
Map<String, String> properties = table.properties();
if (properties.containsKey(COMPRESSION_CODEC)) {
return properties.get(COMPRESSION_CODEC);
} else if (properties.containsKey(SPARK_SQL_COMPRESSION_CODEC)) {
return properties.get(SPARK_SQL_COMPRESSION_CODEC);
}
FileFormat fileFormat = getFileFormat(table);
if (fileFormat == FileFormat.PARQUET) {
return properties.getOrDefault(
TableProperties.PARQUET_COMPRESSION, TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
} else if (fileFormat == FileFormat.ORC) {
return properties.getOrDefault(
TableProperties.ORC_COMPRESSION, TableProperties.ORC_COMPRESSION_DEFAULT);
}
throw new NotSupportedException("Unsupported file format: " + fileFormat);
}
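    /**
     * Resolve the directory where data files are written. Precedence: "write.data.path",
     * then the deprecated "write.folder-storage.path", then "{table location}/data".
     */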
public static String dataLocation(Table table) {
Map<String, String> properties = table.properties();
if (properties.containsKey(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)) {
throw new NotSupportedException(
"Table " + table.name() + " specifies " + properties
.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)
+ " as a location provider. "
+ "Writing to Iceberg tables with custom location provider is not supported.");
}
String dataLocation = properties.get(TableProperties.WRITE_DATA_LOCATION);
if (dataLocation == null) {
dataLocation = properties.get(TableProperties.WRITE_FOLDER_STORAGE_LOCATION);
if (dataLocation == null) {
dataLocation = String.format("%s/data", LocationUtil.stripTrailingSlash(table.location()));
}
}
return dataLocation;
}
    public static HiveCatalog createIcebergHiveCatalog(ExternalCatalog externalCatalog, String name) {
        HiveCatalog hiveCatalog = new HiveCatalog();
hiveCatalog.setConf(externalCatalog.getConfiguration());
Map<String, String> catalogProperties = externalCatalog.getProperties();
if (!catalogProperties.containsKey(HiveCatalog.LIST_ALL_TABLES)) {
// This configuration will display all tables (including non-Iceberg type tables),
// which can save the time of obtaining table objects.
// Later, type checks will be performed when loading the table.
catalogProperties.put(HiveCatalog.LIST_ALL_TABLES, "true");
}
String metastoreUris = catalogProperties.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
catalogProperties.put(CatalogProperties.URI, metastoreUris);
hiveCatalog.initialize(name, catalogProperties);
return hiveCatalog;
}
    // Retrieve the manifest files whose partition value ranges may match the given data filter.
public static CloseableIterable<ManifestFile> getMatchingManifest(
List<ManifestFile> dataManifests,
Map<Integer, PartitionSpec> specsById,
Expression dataFilter) {
LoadingCache<Integer, ManifestEvaluator> evalCache = Caffeine.newBuilder()
.build(
specId -> {
PartitionSpec spec = specsById.get(specId);
                            // Project the row filter into an inclusive partition-level
                            // filter for this spec, then evaluate it against each
                            // manifest's partition summaries (case sensitive).
                            return ManifestEvaluator.forPartitionFilter(
Expressions.and(
Expressions.alwaysTrue(),
Projections.inclusive(spec, true).project(dataFilter)),
spec,
true);
});
CloseableIterable<ManifestFile> matchingManifests = CloseableIterable.filter(
CloseableIterable.withNoopClose(dataManifests),
manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
matchingManifests =
CloseableIterable.filter(
matchingManifests,
manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles());
return matchingManifests;
}
    // Get the snapshot id requested by a "FOR VERSION AS OF" / "FOR TIME AS OF" clause.
public static long getQuerySpecSnapshot(Table table, TableSnapshot queryTableSnapshot) {
TableSnapshot.VersionType type = queryTableSnapshot.getType();
if (type == TableSnapshot.VersionType.VERSION) {
return queryTableSnapshot.getVersion();
} else {
long timestamp = TimeUtils.timeStringToLong(queryTableSnapshot.getTime(), TimeUtils.getTimeZone());
if (timestamp < 0) {
throw new DateTimeException("can't parse time: " + queryTableSnapshot.getTime());
}
return SnapshotUtil.snapshotIdAsOfTime(table, timestamp);
}
}
// read schema from external schema cache
public static IcebergSchemaCacheValue getSchemaCacheValue(
ExternalCatalog catalog, String dbName, String name, long schemaId) {
ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
new IcebergSchemaCacheKey(dbName, name, schemaId));
if (!schemaCacheValue.isPresent()) {
            throw new CacheException("failed to get schema for: %s.%s.%s.%s",
                    null, catalog.getName(), dbName, name, schemaId);
}
return (IcebergSchemaCacheValue) schemaCacheValue.get();
}
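    // Get the latest snapshot of the table; an empty table yields UNKNOWN_SNAPSHOT_ID
    // together with the current schema id.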
public static IcebergSnapshot getLastedIcebergSnapshot(ExternalCatalog catalog, String dbName, String tbName) {
Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
Snapshot snapshot = table.currentSnapshot();
long snapshotId = snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID : snapshot.snapshotId();
return new IcebergSnapshot(snapshotId, table.schema().schemaId());
}
public static IcebergPartitionInfo loadPartitionInfo(
ExternalCatalog catalog, String dbName, String tbName, long snapshotId) throws AnalysisException {
        // snapshotId == UNKNOWN_SNAPSHOT_ID means the table is empty and has no snapshot yet.
if (snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
return IcebergPartitionInfo.empty();
}
Table table = getIcebergTable(catalog, dbName, tbName);
List<IcebergPartition> icebergPartitions = loadIcebergPartition(table, snapshotId);
Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
List<Column> partitionColumns = IcebergUtils.getSchemaCacheValue(
catalog, dbName, tbName, table.snapshot(snapshotId).schemaId()).getPartitionColumns();
for (IcebergPartition partition : icebergPartitions) {
nameToPartition.put(partition.getPartitionName(), partition);
            // Only the first partition field is considered here; getPartitionRange supports
            // a single year/month/day/hour-transformed partition column.
            String transform = table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
Range<PartitionKey> partitionRange = getPartitionRange(
partition.getPartitionValues().get(0), transform, partitionColumns);
PartitionItem item = new RangePartitionItem(partitionRange);
nameToPartitionItem.put(partition.getPartitionName(), item);
}
Map<String, Set<String>> partitionNameMap = mergeOverlapPartitions(nameToPartitionItem);
return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, partitionNameMap);
}
private static List<IcebergPartition> loadIcebergPartition(Table table, long snapshotId) {
PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
.createMetadataTableInstance(table, MetadataTableType.PARTITIONS);
List<IcebergPartition> partitions = Lists.newArrayList();
try (CloseableIterable<FileScanTask> tasks = partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
for (FileScanTask task : tasks) {
CloseableIterable<StructLike> rows = task.asDataTask().rows();
for (StructLike row : rows) {
partitions.add(generateIcebergPartition(table, row));
}
}
} catch (IOException e) {
LOG.warn("Failed to get Iceberg table {} partition info.", table.name(), e);
}
return partitions;
}
private static IcebergPartition generateIcebergPartition(Table table, StructLike row) {
// row format :
// 0. partitionData,
// 1. spec_id,
// 2. record_count,
// 3. file_count,
// 4. total_data_file_size_in_bytes,
// 5. position_delete_record_count,
// 6. position_delete_file_count,
// 7. equality_delete_record_count,
// 8. equality_delete_file_count,
// 9. last_updated_at,
// 10. last_updated_snapshot_id
        Preconditions.checkState(!table.spec().fields().isEmpty(), table.name() + " is not a partitioned table.");
int specId = row.get(1, Integer.class);
PartitionSpec partitionSpec = table.specs().get(specId);
StructProjection partitionData = row.get(0, StructProjection.class);
StringBuilder sb = new StringBuilder();
List<String> partitionValues = Lists.newArrayList();
List<String> transforms = Lists.newArrayList();
for (int i = 0; i < partitionSpec.fields().size(); ++i) {
PartitionField partitionField = partitionSpec.fields().get(i);
Class<?> fieldClass = partitionSpec.javaClasses()[i];
int fieldId = partitionField.fieldId();
            // Iceberg partition field ids start at PARTITION_DATA_ID_START, so the index of
            // a field in partitionData is fieldId - PARTITION_DATA_ID_START.
            int index = fieldId - PARTITION_DATA_ID_START;
Object o = partitionData.get(index, fieldClass);
String fieldValue = o == null ? null : o.toString();
String fieldName = partitionField.name();
sb.append(fieldName);
sb.append("=");
sb.append(fieldValue);
sb.append("/");
partitionValues.add(fieldValue);
transforms.add(partitionField.transform().toString());
}
if (sb.length() > 0) {
sb.delete(sb.length() - 1, sb.length());
}
String partitionName = sb.toString();
long recordCount = row.get(2, Long.class);
long fileCount = row.get(3, Integer.class);
long fileSizeInBytes = row.get(4, Long.class);
long lastUpdateTime = row.get(9, Long.class);
long lastUpdateSnapShotId = row.get(10, Long.class);
return new IcebergPartition(partitionName, specId, recordCount, fileSizeInBytes, fileCount,
lastUpdateTime, lastUpdateSnapShotId, partitionValues, transforms);
}
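    /**
     * Convert an Iceberg partition value into a Doris range partition [lower, upper).
     * The value is the transform output counted from the Unix epoch; for example, with the
     * "day" transform the value 19723 means 19723 days after 1970-01-01, i.e. 2024-01-01,
     * which becomes the range [2024-01-01 00:00:00, 2024-01-02 00:00:00). A NULL partition
     * value is mapped to a minimal range starting at 0000-01-01.
     */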
@VisibleForTesting
public static Range<PartitionKey> getPartitionRange(String value, String transform, List<Column> partitionColumns)
throws AnalysisException {
// For NULL value, create a minimum partition for it.
if (value == null) {
PartitionKey nullLowKey = PartitionKey.createPartitionKey(
Lists.newArrayList(new PartitionValue("0000-01-01")), partitionColumns);
PartitionKey nullUpKey = nullLowKey.successor();
return Range.closedOpen(nullLowKey, nullUpKey);
}
LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
LocalDateTime target;
LocalDateTime lower;
LocalDateTime upper;
long longValue = Long.parseLong(value);
switch (transform) {
case HOUR:
target = epoch.plusHours(longValue);
lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(),
target.getHour(), 0, 0);
upper = lower.plusHours(1);
break;
case DAY:
target = epoch.plusDays(longValue);
lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(), 0, 0, 0);
upper = lower.plusDays(1);
break;
case MONTH:
target = epoch.plusMonths(longValue);
lower = LocalDateTime.of(target.getYear(), target.getMonth(), 1, 0, 0, 0);
upper = lower.plusMonths(1);
break;
case YEAR:
target = epoch.plusYears(longValue);
lower = LocalDateTime.of(target.getYear(), Month.JANUARY, 1, 0, 0, 0);
upper = lower.plusYears(1);
break;
default:
throw new RuntimeException("Unsupported transform " + transform);
}
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
Column c = partitionColumns.get(0);
        Preconditions.checkState(c.getDataType().isDateType(), "Only date-typed partition columns are supported");
if (c.getType().isDate() || c.getType().isDateV2()) {
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
}
PartitionValue lowerValue = new PartitionValue(lower.format(formatter));
PartitionValue upperValue = new PartitionValue(upper.format(formatter));
PartitionKey lowKey = PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue), partitionColumns);
PartitionKey upperKey = PartitionKey.createPartitionKey(Lists.newArrayList(upperValue), partitionColumns);
return Range.closedOpen(lowKey, upperKey);
}
    /**
     * Merge overlapping Iceberg partitions into one Doris partition. Overlaps can appear
     * after partition spec evolution, e.g. switching from month(ts) to day(ts): the month
     * partition encloses the day partitions, so the enclosed partitions are removed and
     * recorded under the enclosing partition's name in the returned map.
     */
@VisibleForTesting
public static Map<String, Set<String>> mergeOverlapPartitions(Map<String, PartitionItem> originPartitions) {
List<Map.Entry<String, PartitionItem>> entries = sortPartitionMap(originPartitions);
Map<String, Set<String>> map = Maps.newHashMap();
for (int i = 0; i < entries.size() - 1; i++) {
Range<PartitionKey> firstValue = entries.get(i).getValue().getItems();
String firstKey = entries.get(i).getKey();
Range<PartitionKey> secondValue = entries.get(i + 1).getValue().getItems();
String secondKey = entries.get(i + 1).getKey();
            // If the first entry encloses the second one, remove the second entry and record
            // it in the returned map, so we can track which Iceberg partitions are contained
            // by each Doris partition.
while (i < entries.size() && firstValue.encloses(secondValue)) {
originPartitions.remove(secondKey);
map.putIfAbsent(firstKey, Sets.newHashSet(firstKey));
String finalSecondKey = secondKey;
map.computeIfPresent(firstKey, (key, value) -> {
value.add(finalSecondKey);
return value;
});
i++;
if (i >= entries.size() - 1) {
break;
}
secondValue = entries.get(i + 1).getValue().getItems();
secondKey = entries.get(i + 1).getKey();
}
}
return map;
}
    /**
     * Sort the given map entries by PartitionItem Range(LOW, HIGH).
     * When comparing two ranges, the one with the smaller LOW value comes first.
     * If two ranges have the same LOW value, the one with the larger HIGH value comes first.
     *
     * For now, we only support year, month, day and hour transforms,
     * so it is impossible to have two partially intersecting partitions:
     * one range is either enclosed by the other or disjoint from it.
     *
     * For example, given these 4 ranges:
     * [10, 20), [30, 40), [0, 30), [10, 15)
     *
     * After sorting, they become:
     * [0, 30), [10, 20), [10, 15), [30, 40)
     */
@VisibleForTesting
public static List<Map.Entry<String, PartitionItem>> sortPartitionMap(Map<String, PartitionItem> originPartitions) {
List<Map.Entry<String, PartitionItem>> entries = new ArrayList<>(originPartitions.entrySet());
entries.sort(new RangeComparator());
return entries;
}
public static class RangeComparator implements Comparator<Map.Entry<String, PartitionItem>> {
@Override
public int compare(Map.Entry<String, PartitionItem> p1, Map.Entry<String, PartitionItem> p2) {
PartitionItem value1 = p1.getValue();
PartitionItem value2 = p2.getValue();
if (value1 instanceof RangePartitionItem && value2 instanceof RangePartitionItem) {
Range<PartitionKey> items1 = value1.getItems();
Range<PartitionKey> items2 = value2.getItems();
                // Treat a missing lower bound as the smallest key; if both ranges lack a
                // lower bound, consider them equal to keep the comparator consistent.
                if (!items1.hasLowerBound() && !items2.hasLowerBound()) {
                    return 0;
                }
                if (!items1.hasLowerBound()) {
                    return -1;
                }
                if (!items2.hasLowerBound()) {
                    return 1;
                }
PartitionKey upper1 = items1.upperEndpoint();
PartitionKey lower1 = items1.lowerEndpoint();
PartitionKey upper2 = items2.upperEndpoint();
PartitionKey lower2 = items2.lowerEndpoint();
int compareLow = lower1.compareTo(lower2);
return compareLow == 0 ? upper2.compareTo(upper1) : compareLow;
}
return 0;
}
}
public static IcebergSnapshotCacheValue getIcebergSnapshotCacheValue(
Optional<TableSnapshot> tableSnapshot,
ExternalCatalog catalog,
String dbName,
String tbName) {
IcebergSnapshotCacheValue snapshotCache = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
.getSnapshotCache(catalog, dbName, tbName);
if (tableSnapshot.isPresent()) {
            // If a snapshot is specified, use that snapshot and its corresponding
            // schema (not the latest schema).
Table icebergTable = getIcebergTable(catalog, dbName, tbName);
TableSnapshot snapshot = tableSnapshot.get();
long querySpecSnapshot = getQuerySpecSnapshot(icebergTable, snapshot);
return new IcebergSnapshotCacheValue(
IcebergPartitionInfo.empty(),
new IcebergSnapshot(querySpecSnapshot, icebergTable.snapshot(querySpecSnapshot).schemaId()));
} else {
// Otherwise, use the latest snapshot and the latest schema.
return snapshotCache;
}
}
    // Load the table schema and its partition columns via the Iceberg API,
    // for storage in the external schema cache.
public static Optional<SchemaCacheValue> loadSchemaCacheValue(
ExternalCatalog catalog, String dbName, String tbName, long schemaId) {
Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
List<Column> schema = IcebergUtils.getSchema(catalog, dbName, tbName, schemaId);
List<Column> tmpColumns = Lists.newArrayList();
PartitionSpec spec = table.spec();
for (PartitionField field : spec.fields()) {
Types.NestedField col = table.schema().findField(field.sourceId());
for (Column c : schema) {
if (c.getName().equalsIgnoreCase(col.name())) {
tmpColumns.add(c);
break;
}
}
}
return Optional.of(new IcebergSchemaCacheValue(schema, tmpColumns));
}
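    /**
     * Get the Doris schema of the table as seen by the current query: take the snapshot
     * pinned in the MVCC context if present (otherwise the latest snapshot), then read the
     * columns of that snapshot's schema id from the schema cache.
     */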
public static List<Column> getIcebergSchema(
TableIf tableIf,
ExternalCatalog catalog,
String dbName,
String tbName) {
Optional<MvccSnapshot> snapshotFromContext = MvccUtil.getSnapshotFromContext(tableIf);
IcebergSnapshotCacheValue cacheValue =
IcebergUtils.getOrFetchSnapshotCacheValue(snapshotFromContext, catalog, dbName, tbName);
return IcebergUtils.getSchemaCacheValue(
catalog, dbName, tbName, cacheValue.getSnapshot().getSchemaId())
.getSchema();
}
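    // Use the snapshot pinned in the MVCC context when present; otherwise fetch the
    // latest snapshot cache value.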
public static IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue(
Optional<MvccSnapshot> snapshot,
ExternalCatalog catalog,
String dbName,
String tbName) {
if (snapshot.isPresent()) {
return ((IcebergMvccSnapshot) snapshot.get()).getSnapshotCacheValue();
} else {
return IcebergUtils.getIcebergSnapshotCacheValue(Optional.empty(), catalog, dbName, tbName);
}
}
}