IcebergUtils.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg;

import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.BoolLiteral;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.DecimalLiteral;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FloatLiteral;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.Subquery;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.UserException;
import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.thrift.TExprOpcode;

import com.google.common.collect.Lists;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.expressions.And;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Not;
import org.apache.iceberg.expressions.Or;
import org.apache.iceberg.expressions.Unbound;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.types.Type.TypeID;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.LocationUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

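/**
 * Static helpers for Doris's Iceberg integration: converting Doris predicates into Iceberg expressions,
 * mapping Iceberg types to Doris types, looking up Iceberg tables, and resolving write settings
 * (file format, compression codec, data location).
 */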
public class IcebergUtils {
    private static final Logger LOG = LogManager.getLogger(IcebergUtils.class);
    // Per-thread counter whose initial value is 0.
    // NOTE(review): not referenced anywhere in this class; confirm whether it is still needed.
    private static ThreadLocal<Integer> columnIdThreadLocal = ThreadLocal.withInitial(() -> 0);
    // https://iceberg.apache.org/spec/#schemas-and-data-types
    // Iceberg stores every time/timestamp value with microsecond precision, hence a DATETIMEV2 scale of 6.
    // (Despite the "_MS" suffix, the unit is microseconds.)
    private static final int ICEBERG_DATETIME_SCALE_MS = 6;
    // Lower-case markers searched for in the configured write format name.
    private static final String PARQUET_NAME = "parquet";
    private static final String ORC_NAME = "orc";

    // Keys of an Iceberg snapshot summary (Snapshot#summary()).
    public static final String TOTAL_RECORDS = "total-records";
    public static final String TOTAL_POSITION_DELETES = "total-position-deletes";
    public static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes";

    // Table-property aliases understood by Flink and Spark.
    public static final String WRITE_FORMAT = "write-format";
    public static final String COMPRESSION_CODEC = "compression-codec";

    // Table-property alias understood by Spark only.
    public static final String SPARK_SQL_COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec";

    /**
     * Converts a Doris predicate into an Iceberg {@link Expression} usable for scan planning
     * (file and partition pruning).
     *
     * <p>Conversion is best-effort. An AND keeps whichever side converted, because dropping a conjunct
     * only widens the filter. An OR is kept only when both sides convert. A NOT is kept only when its
     * operand converts completely: negating a filter that was widened by dropping a conjunct would
     * narrow it and could prune rows the query still needs.
     *
     * @param expr   the Doris expression; may be {@code null}
     * @param schema the Iceberg schema used to resolve column names (case-insensitively) and types
     * @return the converted expression, or {@code null} if nothing could be converted
     */
    public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
        return convertToIcebergExpr(expr, schema, true);
    }

    /**
     * Recursive worker for {@link #convertToIcebergExpr(Expr, Schema)}.
     *
     * @param allowPartial whether an AND with one unconvertible side may be reduced to its other side.
     *                     That widens the filter, so it is only sound when not underneath a NOT.
     */
    private static Expression convertToIcebergExpr(Expr expr, Schema schema, boolean allowPartial) {
        if (expr == null) {
            return null;
        }

        Expression expression = null;
        if (expr instanceof BoolLiteral) {
            expression = ((BoolLiteral) expr).getValue() ? Expressions.alwaysTrue() : Expressions.alwaysFalse();
        } else if (expr instanceof CompoundPredicate) {
            CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
            switch (compoundPredicate.getOp()) {
                case AND: {
                    Expression left = convertToIcebergExpr(
                            compoundPredicate.getChild(0), schema, allowPartial);
                    Expression right = convertToIcebergExpr(
                            compoundPredicate.getChild(1), schema, allowPartial);
                    if (left != null && right != null) {
                        expression = Expressions.and(left, right);
                    } else if (!allowPartial) {
                        // Beneath a NOT, keeping only one conjunct would make the negation too strict.
                        return null;
                    } else if (left != null) {
                        return left;
                    } else if (right != null) {
                        return right;
                    }
                    break;
                }
                case OR: {
                    Expression left = convertToIcebergExpr(
                            compoundPredicate.getChild(0), schema, allowPartial);
                    Expression right = convertToIcebergExpr(
                            compoundPredicate.getChild(1), schema, allowPartial);
                    if (left != null && right != null) {
                        expression = Expressions.or(left, right);
                    }
                    break;
                }
                case NOT: {
                    // The operand must be converted exactly; see allowPartial.
                    Expression child = convertToIcebergExpr(compoundPredicate.getChild(0), schema, false);
                    if (child != null) {
                        expression = Expressions.not(child);
                    }
                    break;
                }
                default:
                    return null;
            }
        } else if (expr instanceof BinaryPredicate) {
            expression = convertBinaryPredicate((BinaryPredicate) expr, schema);
        } else if (expr instanceof InPredicate) {
            expression = convertInPredicate((InPredicate) expr, schema);
        }

        return checkConversion(expression, schema);
    }

    /**
     * Converts a comparison between a (possibly CAST-wrapped) column and a literal, in either operand
     * order. Returns {@code null} for unsupported operators, operands that are not a column and a
     * literal, columns absent from {@code schema}, or literals that cannot be coerced to the column type.
     */
    private static Expression convertBinaryPredicate(BinaryPredicate predicate, Schema schema) {
        TExprOpcode opCode = predicate.getOpcode();
        switch (opCode) {
            case EQ:
            case NE:
            case GE:
            case GT:
            case LE:
            case LT:
            case EQ_FOR_NULL:
                break;
            default:
                return null;
        }

        SlotRef slotRef = convertDorisExprToSlotRef(predicate.getChild(0));
        LiteralExpr literalExpr = null;
        if (slotRef == null && predicate.getChild(0).isLiteral()) {
            // "literal op column": swap the operands and mirror the operator, so "5 < col" becomes "col > 5".
            literalExpr = (LiteralExpr) predicate.getChild(0);
            slotRef = convertDorisExprToSlotRef(predicate.getChild(1));
            opCode = mirrorComparison(opCode);
        } else if (predicate.getChild(1).isLiteral()) {
            literalExpr = (LiteralExpr) predicate.getChild(1);
        }
        if (slotRef == null || literalExpr == null) {
            return null;
        }

        Types.NestedField nestedField = schema.caseInsensitiveFindField(slotRef.getColumnName());
        if (nestedField == null) {
            // Column not present in this Iceberg schema: skip the predicate instead of failing.
            return null;
        }
        String colName = nestedField.name();
        Object value = extractDorisLiteral(nestedField.type(), literalExpr);
        if (value == null) {
            // A NULL literal can only be pushed down as the null-safe "col <=> NULL".
            if (opCode == TExprOpcode.EQ_FOR_NULL && literalExpr instanceof NullLiteral) {
                return Expressions.isNull(colName);
            }
            return null;
        }
        switch (opCode) {
            case EQ:
            case EQ_FOR_NULL:
                return Expressions.equal(colName, value);
            case NE:
                return Expressions.not(Expressions.equal(colName, value));
            case GE:
                return Expressions.greaterThanOrEqual(colName, value);
            case GT:
                return Expressions.greaterThan(colName, value);
            case LE:
                return Expressions.lessThanOrEqual(colName, value);
            case LT:
                return Expressions.lessThan(colName, value);
            default:
                return null;
        }
    }

    /**
     * Returns the operator that keeps a comparison equivalent when its operands are swapped
     * ({@code a < b} is {@code b > a}); symmetric operators are returned unchanged.
     */
    private static TExprOpcode mirrorComparison(TExprOpcode opCode) {
        switch (opCode) {
            case GT:
                return TExprOpcode.LT;
            case GE:
                return TExprOpcode.LE;
            case LT:
                return TExprOpcode.GT;
            case LE:
                return TExprOpcode.GE;
            default:
                return opCode;
        }
    }

    /**
     * Converts {@code col [NOT] IN (literal, ...)}. Returns {@code null} when the predicate contains a
     * subquery or a non-literal list element, the left operand is not a column of {@code schema}, or any
     * literal cannot be coerced to the column type.
     */
    private static Expression convertInPredicate(InPredicate inExpr, Schema schema) {
        if (inExpr.contains(Subquery.class)) {
            return null;
        }
        SlotRef slotRef = convertDorisExprToSlotRef(inExpr.getChild(0));
        if (slotRef == null) {
            return null;
        }
        Types.NestedField nestedField = schema.caseInsensitiveFindField(slotRef.getColumnName());
        if (nestedField == null) {
            return null;
        }
        List<Object> valueList = new ArrayList<>();
        for (int i = 1; i < inExpr.getChildren().size(); ++i) {
            Expr child = inExpr.getChild(i);
            if (!(child instanceof LiteralExpr)) {
                return null;
            }
            Object value = extractDorisLiteral(nestedField.type(), child);
            if (value == null) {
                return null;
            }
            valueList.add(value);
        }
        String colName = nestedField.name();
        return inExpr.isNotIn() ? Expressions.notIn(colName, valueList) : Expressions.in(colName, valueList);
    }

    /**
     * Verifies that an Iceberg expression can be bound against {@code schema}.
     *
     * <p>A leaf is accepted only if it is an {@link Unbound} expression that binds case-insensitively
     * to the schema's struct. An AND yields whichever operands are accepted (itself if both, one side
     * if only one). An OR is accepted only if neither operand is rejected, and a NOT only if its operand
     * is not rejected. The constants TRUE and FALSE are always accepted.
     *
     * @return {@code expression}, one side of it for a partially valid AND, or {@code null} if rejected
     */
    private static Expression checkConversion(Expression expression, Schema schema) {
        if (expression == null) {
            return null;
        }
        Expression.Operation operation = expression.op();
        if (operation == Expression.Operation.AND) {
            And conjunction = (And) expression;
            Expression checkedLeft = checkConversion(conjunction.left(), schema);
            Expression checkedRight = checkConversion(conjunction.right(), schema);
            if (checkedLeft == null) {
                return checkedRight;
            }
            return checkedRight == null ? checkedLeft : conjunction;
        }
        if (operation == Expression.Operation.OR) {
            Or disjunction = (Or) expression;
            // Both operands are always checked, regardless of the first outcome.
            boolean leftAccepted = checkConversion(disjunction.left(), schema) != null;
            boolean rightAccepted = checkConversion(disjunction.right(), schema) != null;
            return leftAccepted && rightAccepted ? disjunction : null;
        }
        if (operation == Expression.Operation.NOT) {
            Not negation = (Not) expression;
            return checkConversion(negation.child(), schema) == null ? null : negation;
        }
        if (operation == Expression.Operation.TRUE || operation == Expression.Operation.FALSE) {
            return expression;
        }
        if (!(expression instanceof Unbound)) {
            return null;
        }
        try {
            ((Unbound<?, ?>) expression).bind(schema.asStruct(), true);
        } catch (Exception e) {
            LOG.debug("Failed to check expression: {}", e.getMessage());
            return null;
        }
        return expression;
    }

    /**
     * Coerces a Doris literal into a Java value that Iceberg can compare against a column of
     * {@code icebergType}.
     *
     * <p>Only these (literal kind, Iceberg type) pairs are supported; every other pairing, and every
     * other literal kind (including NULL), yields {@code null}:
     * <ul>
     *   <li>bool: BOOLEAN (the boolean) or STRING (its string form)</li>
     *   <li>date: STRING or DATE (its string form); TIMESTAMP (epoch microseconds, computed in
     *       {@code TimeUtils.getTimeZone()} when the column adjusts to UTC, otherwise in UTC)</li>
     *   <li>decimal: DECIMAL (its value), STRING (its string form), DOUBLE (its double value)</li>
     *   <li>float: DOUBLE or DECIMAL; additionally FLOAT when the literal itself is typed FLOAT</li>
     *   <li>integer: INTEGER, LONG, FLOAT, DOUBLE, DATE, DECIMAL (as an {@code Integer} for 32-bit
     *       literal types, otherwise a {@code Long}); TIME and TIMESTAMP only for non-32-bit literals</li>
     *   <li>string: DATE, TIME, TIMESTAMP, STRING, UUID, DECIMAL (the raw string); INTEGER and LONG
     *       (parsed, or {@code null} if unparsable)</li>
     * </ul>
     */
    public static Object extractDorisLiteral(org.apache.iceberg.types.Type icebergType, Expr expr) {
        TypeID targetType = icebergType.typeId();

        if (expr instanceof BoolLiteral) {
            BoolLiteral boolLiteral = (BoolLiteral) expr;
            if (targetType == TypeID.BOOLEAN) {
                return boolLiteral.getValue();
            }
            return targetType == TypeID.STRING ? boolLiteral.getStringValue() : null;
        }

        if (expr instanceof DateLiteral) {
            DateLiteral dateLiteral = (DateLiteral) expr;
            if (targetType == TypeID.STRING || targetType == TypeID.DATE) {
                return dateLiteral.getStringValue();
            }
            if (targetType != TypeID.TIMESTAMP) {
                return null;
            }
            boolean adjustToUtc = ((Types.TimestampType) icebergType).shouldAdjustToUTC();
            if (adjustToUtc) {
                return dateLiteral.getUnixTimestampWithMicroseconds(TimeUtils.getTimeZone());
            }
            return dateLiteral.getUnixTimestampWithMicroseconds(TimeUtils.getUTCTimeZone());
        }

        if (expr instanceof DecimalLiteral) {
            DecimalLiteral decimalLiteral = (DecimalLiteral) expr;
            switch (targetType) {
                case DECIMAL:
                    return decimalLiteral.getValue();
                case STRING:
                    return decimalLiteral.getStringValue();
                case DOUBLE:
                    return decimalLiteral.getDoubleValue();
                default:
                    return null;
            }
        }

        if (expr instanceof FloatLiteral) {
            FloatLiteral floatLiteral = (FloatLiteral) expr;
            boolean singlePrecision = floatLiteral.getType() == Type.FLOAT;
            boolean accepted = targetType == TypeID.DOUBLE
                    || targetType == TypeID.DECIMAL
                    || (singlePrecision && targetType == TypeID.FLOAT);
            if (!accepted) {
                return null;
            }
            return floatLiteral.getValue();
        }

        if (expr instanceof IntLiteral) {
            IntLiteral intLiteral = (IntLiteral) expr;
            boolean fitsInInt = intLiteral.getType().isInteger32Type();
            switch (targetType) {
                case TIME:
                case TIMESTAMP:
                    // Time-based targets accept only BIGINT literals.
                    if (fitsInInt) {
                        return null;
                    }
                    return intLiteral.getValue();
                case INTEGER:
                case LONG:
                case FLOAT:
                case DOUBLE:
                case DATE:
                case DECIMAL:
                    if (fitsInInt) {
                        return (int) intLiteral.getValue();
                    }
                    return intLiteral.getValue();
                default:
                    return null;
            }
        }

        if (expr instanceof StringLiteral) {
            String text = expr.getStringValue();
            switch (targetType) {
                case DATE:
                case TIME:
                case TIMESTAMP:
                case STRING:
                case UUID:
                case DECIMAL:
                    return text;
                case INTEGER:
                    try {
                        return Integer.parseInt(text);
                    } catch (Exception ignored) {
                        // Not a valid int: the predicate cannot be pushed down.
                        return null;
                    }
                case LONG:
                    try {
                        return Long.parseLong(text);
                    } catch (Exception ignored) {
                        // Not a valid long: the predicate cannot be pushed down.
                        return null;
                    }
                default:
                    return null;
            }
        }

        return null;
    }

    /**
     * Returns the column reference behind {@code expr}. This is the expression itself if it is a
     * {@link SlotRef}, or the operand of a CAST whose direct child is a {@code SlotRef}. Otherwise it
     * returns {@code null}.
     */
    private static SlotRef convertDorisExprToSlotRef(Expr expr) {
        if (expr instanceof SlotRef) {
            return (SlotRef) expr;
        }
        boolean castOfColumn = expr instanceof CastExpr && expr.getChild(0) instanceof SlotRef;
        return castOfColumn ? (SlotRef) expr.getChild(0) : null;
    }

    /**
     * Builds an Iceberg {@link PartitionSpec} from a Doris partition clause.
     *
     * <p>Each partition expression must be either a bare column (identity transform) or one of the
     * transform calls {@code bucket(N, col)}, {@code truncate(W, col)}, {@code year|years(col)},
     * {@code month|months(col)}, {@code date|day|days(col)} or {@code date_hour|hour|hours(col)}.
     * Transform names are matched case-insensitively.
     *
     * @param partitionDesc the partition clause, or {@code null} for an unpartitioned table
     * @param schema        the Iceberg schema the partition columns are resolved against
     * @return the partition spec; {@link PartitionSpec#unpartitioned()} when {@code partitionDesc} is null
     * @throws UserException if an expression is neither a column nor a supported transform, a transform
     *                       has too few arguments, or a bucket count / truncate width is not an integer
     */
    public static PartitionSpec solveIcebergPartitionSpec(PartitionDesc partitionDesc, Schema schema)
            throws UserException {
        if (partitionDesc == null) {
            return PartitionSpec.unpartitioned();
        }

        PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
        for (Expr expr : partitionDesc.getPartitionExprs()) {
            if (expr instanceof SlotRef) {
                builder.identity(((SlotRef) expr).getColumnName());
                continue;
            }
            if (!(expr instanceof FunctionCallExpr)) {
                // Reject rather than silently leave the expression out of the spec.
                throw new UserException("unsupported partition expression " + expr.toSql());
            }
            String exprName = expr.getExprName();
            List<Expr> params = ((FunctionCallExpr) expr).getParams().exprs();
            // Locale.ROOT keeps the match independent of the JVM's default locale.
            switch (exprName.toLowerCase(Locale.ROOT)) {
                case "bucket":
                    requireTransformArgs(exprName, params, 2);
                    builder.bucket(params.get(1).getExprName(), parseTransformWidth(exprName, params.get(0)));
                    break;
                case "year":
                case "years":
                    requireTransformArgs(exprName, params, 1);
                    builder.year(params.get(0).getExprName());
                    break;
                case "month":
                case "months":
                    requireTransformArgs(exprName, params, 1);
                    builder.month(params.get(0).getExprName());
                    break;
                case "date":
                case "day":
                case "days":
                    requireTransformArgs(exprName, params, 1);
                    builder.day(params.get(0).getExprName());
                    break;
                case "date_hour":
                case "hour":
                case "hours":
                    requireTransformArgs(exprName, params, 1);
                    builder.hour(params.get(0).getExprName());
                    break;
                case "truncate":
                    requireTransformArgs(exprName, params, 2);
                    builder.truncate(params.get(1).getExprName(), parseTransformWidth(exprName, params.get(0)));
                    break;
                default:
                    throw new UserException("unsupported partition for " + exprName);
            }
        }
        return builder.build();
    }

    /** Throws a {@link UserException} if a partition transform got fewer than {@code minArgs} arguments. */
    private static void requireTransformArgs(String transform, List<Expr> params, int minArgs)
            throws UserException {
        int actual = params == null ? 0 : params.size();
        if (actual < minArgs) {
            throw new UserException("partition transform " + transform + " requires " + minArgs
                    + " argument(s), but got " + actual);
        }
    }

    /** Parses the integer argument (bucket count or truncate width) of a partition transform. */
    private static int parseTransformWidth(String transform, Expr arg) throws UserException {
        String text = arg.getStringValue();
        try {
            return Integer.parseInt(text);
        } catch (NumberFormatException e) {
            throw new UserException("invalid integer argument '" + text + "' for partition transform "
                    + transform, e);
        }
    }

    /**
     * Maps an Iceberg primitive type to a Doris scalar type.
     *
     * <p>STRING, BINARY and UUID all become Doris STRING. FIXED(n) becomes CHAR(n), and DECIMAL keeps
     * its precision and scale as DECIMALV3. DATE becomes DATEV2 and TIMESTAMP becomes DATETIMEV2 with
     * scale {@code ICEBERG_DATETIME_SCALE_MS}. TIME is reported as {@link Type#UNSUPPORTED}.
     *
     * @throws IllegalArgumentException for any other primitive type
     */
    private static Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
        TypeID typeId = primitive.typeId();
        switch (typeId) {
            case STRING:
            case BINARY:
            case UUID:
                return Type.STRING;
            case BOOLEAN:
                return Type.BOOLEAN;
            case INTEGER:
                return Type.INT;
            case LONG:
                return Type.BIGINT;
            case FLOAT:
                return Type.FLOAT;
            case DOUBLE:
                return Type.DOUBLE;
            case FIXED:
                return ScalarType.createCharType(((Types.FixedType) primitive).length());
            case DECIMAL: {
                Types.DecimalType decimalType = (Types.DecimalType) primitive;
                return ScalarType.createDecimalV3Type(decimalType.precision(), decimalType.scale());
            }
            case DATE:
                return ScalarType.createDateV2Type();
            case TIMESTAMP:
                return ScalarType.createDatetimeV2Type(ICEBERG_DATETIME_SCALE_MS);
            case TIME:
                return Type.UNSUPPORTED;
            default:
                throw new IllegalArgumentException("Cannot transform unknown type: " + primitive);
        }
    }

    /**
     * Maps an Iceberg type to the corresponding Doris type, recursing into nested types.
     *
     * <p>LIST becomes a Doris array of the converted element type, MAP a Doris map of the converted key
     * and value types, and STRUCT a Doris struct with the same field names in the same order.
     *
     * @throws IllegalArgumentException for a type with no Doris counterpart
     */
    public static Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) {
        if (type.isPrimitiveType()) {
            return icebergPrimitiveTypeToDorisType((org.apache.iceberg.types.Type.PrimitiveType) type);
        }
        TypeID typeId = type.typeId();
        if (typeId == TypeID.LIST) {
            Type elementType = icebergTypeToDorisType(((Types.ListType) type).elementType());
            return ArrayType.create(elementType, true);
        }
        if (typeId == TypeID.MAP) {
            Types.MapType mapType = (Types.MapType) type;
            Type keyType = icebergTypeToDorisType(mapType.keyType());
            Type valueType = icebergTypeToDorisType(mapType.valueType());
            return new MapType(keyType, valueType);
        }
        if (typeId == TypeID.STRUCT) {
            ArrayList<StructField> structFields = ((Types.StructType) type).fields().stream()
                    .map(field -> new StructField(field.name(), icebergTypeToDorisType(field.type())))
                    .collect(Collectors.toCollection(ArrayList::new));
            return new StructType(structFields);
        }
        throw new IllegalArgumentException("Cannot transform unknown type: " + type);
    }


    /** Returns Iceberg table {@code dbName.tblName} of {@code catalog}, obtained through the Iceberg metadata cache. */
    public static org.apache.iceberg.Table getIcebergTable(ExternalCatalog catalog, String dbName, String tblName) {
        return getIcebergTableInternal(catalog, dbName, tblName, false);
    }

    /** Returns the table described by {@code tableInfo} via {@code IcebergMetadataCache#getAndCloneTable}. */
    public static org.apache.iceberg.Table getAndCloneTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) {
        return getIcebergTableInternal(catalog, tableInfo.getDbName(), tableInfo.getTbName(), true);
    }

    /**
     * Returns the table described by {@code tableInfo}.
     * NOTE(review): despite its name this performs the same metadata-cache lookup as getIcebergTable.
     */
    public static org.apache.iceberg.Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) {
        return getIcebergTable(catalog, tableInfo.getDbName(), tableInfo.getTbName());
    }

    /** Fetches a table from the Iceberg metadata cache, using the cache's clone path when {@code isClone}. */
    private static org.apache.iceberg.Table getIcebergTableInternal(ExternalCatalog catalog, String dbName,
            String tblName, boolean isClone) {
        IcebergMetadataCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache();
        if (isClone) {
            return cache.getAndCloneTable(catalog, dbName, tblName);
        }
        return cache.getIcebergTable(catalog, dbName, tblName);
    }

    /**
     * Loads an Iceberg table and converts its top-level columns into Doris {@link Column}s.
     *
     * <p>Runs under the catalog's pre-execution authenticator. Column names are lower-cased with
     * {@link Locale#ROOT}, and each column carries its Iceberg field id.
     *
     * @throws RuntimeException wrapping any failure, with the root-cause message as its message
     */
    public static List<Column> getSchema(ExternalCatalog catalog, String dbName, String name) {
        try {
            return catalog.getPreExecutionAuthenticator().execute(() -> {
                org.apache.iceberg.Table icebergTable = getIcebergTable(catalog, dbName, name);
                Schema schema = icebergTable.schema();
                List<Types.NestedField> columns = schema.columns();
                List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
                for (Types.NestedField field : columns) {
                    // Use the field's own id. Re-resolving it by case-insensitive name costs a lookup
                    // per column and relies on column names being unique ignoring case.
                    tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
                            IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
                            field.fieldId()));
                }
                return tmpSchema;
            });
        } catch (Exception e) {
            throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
        }
    }


    /**
     * Estimates the row count of an Iceberg table from its current snapshot's summary.
     *
     * <p>The estimate is {@code total-records - total-position-deletes}. Equality deletes are not
     * subtracted, so tables that have them may be overestimated. A missing {@code total-position-deletes}
     * entry is treated as zero.
     *
     * @return the estimated row count, or {@link TableIf#UNKNOWN_ROW_COUNT} if the table has no current
     *         snapshot or its summary lacks {@code total-records}
     */
    public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, String tbName) {
        // The table may be null when the Iceberg metadata cache has not loaded it yet. The resulting NPE is
        // expected to be caught by the caller, which then falls back to the default value -1. The lookup
        // also triggers the cache to load the table, so it should be available next time.
        Table icebergTable = Env.getCurrentEnv()
                .getExtMetaCacheMgr()
                .getIcebergMetadataCache()
                .getIcebergTable(catalog, dbName, tbName);
        Snapshot snapshot = icebergTable.currentSnapshot();
        if (snapshot == null) {
            LOG.info("Iceberg table {}.{}.{} is empty, return -1.", catalog.getName(), dbName, tbName);
            // empty table
            return TableIf.UNKNOWN_ROW_COUNT;
        }
        Map<String, String> summary = snapshot.summary();
        String totalRecords = summary.get(TOTAL_RECORDS);
        if (totalRecords == null) {
            // Without this key Long.parseLong(null) would throw; report "unknown" instead.
            LOG.info("Iceberg table {}.{}.{} has no {} in its snapshot summary, return -1.",
                    catalog.getName(), dbName, tbName, TOTAL_RECORDS);
            return TableIf.UNKNOWN_ROW_COUNT;
        }
        long rows = Long.parseLong(totalRecords)
                - Long.parseLong(summary.getOrDefault(TOTAL_POSITION_DELETES, "0"));
        LOG.info("Iceberg table {}.{}.{} row count in summary is {}", catalog.getName(), dbName, tbName, rows);
        return rows;
    }


    /**
     * Resolves the data file format used when writing to {@code icebergTable}.
     *
     * <p>The Flink/Spark alias {@code write-format} takes precedence over
     * {@link TableProperties#DEFAULT_FILE_FORMAT}; if neither is set, Parquet is assumed. The name is
     * matched case-insensitively by substring.
     *
     * @throws RuntimeException if the configured format is neither ORC nor Parquet
     */
    public static FileFormat getFileFormat(Table icebergTable) {
        Map<String, String> properties = icebergTable.properties();
        String fileFormatName = properties.get(WRITE_FORMAT);
        if (fileFormatName == null) {
            fileFormatName = properties.getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, PARQUET_NAME);
        }
        // Locale.ROOT: format detection must not depend on the JVM's default locale.
        String normalizedName = fileFormatName.toLowerCase(Locale.ROOT);
        if (normalizedName.contains(ORC_NAME)) {
            return FileFormat.ORC;
        }
        if (normalizedName.contains(PARQUET_NAME)) {
            return FileFormat.PARQUET;
        }
        throw new RuntimeException("Unsupported input format type: " + fileFormatName);
    }


    /**
     * Resolves the compression codec for files written to {@code table}.
     *
     * <p>Lookup order:
     * <ol>
     *   <li>the Flink/Spark alias {@code compression-codec};</li>
     *   <li>the Spark alias {@code spark.sql.iceberg.compression-codec};</li>
     *   <li>the Iceberg compression property for the table's file format, falling back to Iceberg's
     *       default codec for that format.</li>
     * </ol>
     *
     * @throws NotSupportedException if the file format is neither Parquet nor ORC
     */
    public static String getFileCompress(Table table) {
        Map<String, String> props = table.properties();
        for (String alias : new String[] {COMPRESSION_CODEC, SPARK_SQL_COMPRESSION_CODEC}) {
            if (props.containsKey(alias)) {
                return props.get(alias);
            }
        }
        FileFormat format = getFileFormat(table);
        switch (format) {
            case PARQUET:
                return props.getOrDefault(TableProperties.PARQUET_COMPRESSION,
                        TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
            case ORC:
                return props.getOrDefault(TableProperties.ORC_COMPRESSION, TableProperties.ORC_COMPRESSION_DEFAULT);
            default:
                throw new NotSupportedException("Unsupported file format: " + format);
        }
    }

    /**
     * Returns the directory where new data files for {@code table} are written.
     *
     * <p>Resolution order is {@link TableProperties#WRITE_DATA_LOCATION}, then
     * {@link TableProperties#WRITE_FOLDER_STORAGE_LOCATION}, then {@code <table location>/data}
     * (with any trailing slash stripped from the table location).
     *
     * @throws NotSupportedException if the table configures a custom location provider
     */
    public static String dataLocation(Table table) {
        Map<String, String> props = table.properties();
        if (props.containsKey(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)) {
            String providerImpl = props.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL);
            throw new NotSupportedException("Table " + table.name() + " specifies " + providerImpl
                    + " as a location provider. "
                    + "Writing to Iceberg tables with custom location provider is not supported.");
        }
        String explicitLocation = props.get(TableProperties.WRITE_DATA_LOCATION);
        if (explicitLocation != null) {
            return explicitLocation;
        }
        String folderLocation = props.get(TableProperties.WRITE_FOLDER_STORAGE_LOCATION);
        if (folderLocation != null) {
            return folderLocation;
        }
        return String.format("%s/data", LocationUtil.stripTrailingSlash(table.location()));
    }

    /**
     * Creates and initializes an Iceberg {@link HiveCatalog} for {@code externalCatalog}.
     *
     * <p>The Hive catalog receives the external catalog's configuration and a copy of its properties.
     * In that copy, {@link CatalogProperties#URI} is set to the value of
     * {@link HMSProperties#HIVE_METASTORE_URIS}, or to an empty string if that property is unset.
     *
     * @param externalCatalog the Doris catalog providing configuration and properties
     * @param name            the catalog name passed to {@link HiveCatalog#initialize}
     * @return the initialized Hive catalog
     */
    public static HiveCatalog createIcebergHiveCatalog(ExternalCatalog externalCatalog, String name) {
        HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
        hiveCatalog.setConf(externalCatalog.getConfiguration());

        // Copy first. The map returned by getProperties() may be the catalog's own (or an unmodifiable)
        // property map, and adding the derived URI to it would leak into the catalog or throw.
        Map<String, String> catalogProperties = new java.util.HashMap<>(externalCatalog.getProperties());
        String metastoreUris = catalogProperties.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
        catalogProperties.put(CatalogProperties.URI, metastoreUris);

        hiveCatalog.initialize(name, catalogProperties);
        return hiveCatalog;
    }
}