HiveMetaStoreClientHelper.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hive;

import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.BoolLiteral;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.DecimalLiteral;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FloatLiteral;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.thrift.TExprOpcode;

import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * Helper class for HiveMetaStoreClient
 */
public class HiveMetaStoreClientHelper {
    private static final Logger LOG = LogManager.getLogger(HiveMetaStoreClientHelper.class);

    public static final String COMMENT = "comment";

    private static final Pattern digitPattern = Pattern.compile("(\\d+)");

    public static final String HIVE_JSON_SERDE = "org.apache.hive.hcatalog.data.JsonSerDe";
    public static final String LEGACY_HIVE_JSON_SERDE = "org.apache.hadoop.hive.serde2.JsonSerDe";
    public static final String OPENX_JSON_SERDE = "org.openx.data.jsonserde.JsonSerDe";

    public enum HiveFileFormat {
        TEXT_FILE(0, "text"),
        PARQUET(1, "parquet"),
        ORC(2, "orc");

        private int index;
        private String desc;

        HiveFileFormat(int index, String desc) {
            this.index = index;
            this.desc = desc;
        }

        public int getIndex() {
            return index;
        }

        public String getDesc() {
            return desc;
        }

        /**
         * Convert a Hive table's InputFormat class name to a supported file format descriptor.
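         *
         * <p>Illustrative examples (standard Hive input format class names):
         * <pre>{@code
         * getFormat("org.apache.hadoop.mapred.TextInputFormat");                       // "text"
         * getFormat("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");  // "parquet"
         * getFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");                // "orc"
         * }</pre>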
         * @param input InputFormat class name of the Hive table
         * @return the matched format descriptor: "text", "parquet" or "orc"
         * @throws DdlException if the input format is not supported
         */
        public static String getFormat(String input) throws DdlException {
            String formatDesc = "";
            String lowerCaseInput = input.toLowerCase();
            for (HiveFileFormat format : HiveFileFormat.values()) {
                if (lowerCaseInput.contains(format.getDesc())) {
                    formatDesc = format.getDesc();
                    break;
                }
            }
            if (Strings.isNullOrEmpty(formatDesc)) {
                LOG.warn("Not supported Hive file format [{}].", input);
                throw new DdlException("Not supported Hive file format " + input);
            }
            return formatDesc;
        }
    }

    /**
     * Convert Doris conjuncts into a Hive partition filter expression, keeping only predicates on
     * partition columns. Predicates that cannot be converted are dropped; if none remain, an
     * always-true expression ("1 = 1") is returned so that all partitions are listed.
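     *
     * <p>A minimal sketch of the intended behavior (assuming {@code dt} is a partition column and
     * {@code id} is not):
     * <pre>{@code
     * // conjuncts:     dt = '2023-10-01', id > 5
     * // partitionKeys: ["dt"]
     * // result is equivalent to the Hive filter: dt = '2023-10-01'
     * // (the predicate on id is dropped because id is not a partition column)
     * }</pre>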
     * @param conjuncts Doris conjuncts to convert
     * @param partitionKeys partition column names of the Hive table
     * @param tblName Hive table name
     * @return the combined Hive partition filter expression
     * @throws DdlException if building the Hive expression fails
     */
    public static ExprNodeGenericFuncDesc convertToHivePartitionExpr(List<Expr> conjuncts,
            List<String> partitionKeys, String tblName) throws DdlException {
        List<ExprNodeDesc> hivePredicates = new ArrayList<>();

        for (Expr conjunct : conjuncts) {
            ExprNodeGenericFuncDesc hiveExpr = HiveMetaStoreClientHelper.convertToHivePartitionExpr(
                    conjunct, partitionKeys, tblName).getFuncDesc();
            if (hiveExpr != null) {
                hivePredicates.add(hiveExpr);
            }
        }
        int count = hivePredicates.size();
        // combine all predicates with `and`;
        // a compound expr requires at least 2 arguments
        if (count >= 2) {
            return HiveMetaStoreClientHelper.getCompoundExpr(hivePredicates, "and");
        } else if (count == 1) {
            // only one predicate
            return (ExprNodeGenericFuncDesc) hivePredicates.get(0);
        } else {
            return genAlwaysTrueExpr(tblName);
        }
    }

    private static ExprNodeGenericFuncDesc genAlwaysTrueExpr(String tblName) throws DdlException {
        // no usable predicate: make a dummy predicate "1 = 1" so that all partitions are returned
        HiveMetaStoreClientHelper.ExprBuilder exprBuilder =
                new HiveMetaStoreClientHelper.ExprBuilder(tblName);
        return exprBuilder.val(TypeInfoFactory.intTypeInfo, 1)
                .val(TypeInfoFactory.intTypeInfo, 1)
                .pred("=", 2).build();
    }

    private static class ExprNodeGenericFuncDescContext {
        private static final ExprNodeGenericFuncDescContext BAD_CONTEXT = new ExprNodeGenericFuncDescContext();

        private ExprNodeGenericFuncDesc funcDesc = null;
        private boolean eligible = false;

        public ExprNodeGenericFuncDescContext(ExprNodeGenericFuncDesc funcDesc) {
            this.funcDesc = funcDesc;
            this.eligible = true;
        }

        private ExprNodeGenericFuncDescContext() {
        }

        /**
         * Check eligibility before using this expr as a child of a compound predicate ({@code and} / {@code or}).
         */
        public boolean isEligible() {
            return eligible;
        }

        public ExprNodeGenericFuncDesc getFuncDesc() {
            return funcDesc;
        }
    }

    private static ExprNodeGenericFuncDescContext convertToHivePartitionExpr(Expr dorisExpr,
            List<String> partitionKeys, String tblName) throws DdlException {
        if (dorisExpr == null) {
            return ExprNodeGenericFuncDescContext.BAD_CONTEXT;
        }

        if (dorisExpr instanceof CompoundPredicate) {
            CompoundPredicate compoundPredicate = (CompoundPredicate) dorisExpr;
            ExprNodeGenericFuncDescContext left = convertToHivePartitionExpr(
                    compoundPredicate.getChild(0), partitionKeys, tblName);
            ExprNodeGenericFuncDescContext right = convertToHivePartitionExpr(
                    compoundPredicate.getChild(1), partitionKeys, tblName);

            switch (compoundPredicate.getOp()) {
                case AND: {
                    if (left.isEligible() && right.isEligible()) {
                        List<ExprNodeDesc> andArgs = new ArrayList<>();
                        andArgs.add(left.getFuncDesc());
                        andArgs.add(right.getFuncDesc());
                        return new ExprNodeGenericFuncDescContext(getCompoundExpr(andArgs, "and"));
                    } else if (left.isEligible()) {
                        return left;
                    } else if (right.isEligible()) {
                        return right;
                    } else {
                        return ExprNodeGenericFuncDescContext.BAD_CONTEXT;
                    }
                }
                case OR: {
                    if (left.isEligible() && right.isEligible()) {
                        List<ExprNodeDesc> orArgs = new ArrayList<>();
                        orArgs.add(left.getFuncDesc());
                        orArgs.add(right.getFuncDesc());
                        return new ExprNodeGenericFuncDescContext(getCompoundExpr(orArgs, "or"));
                    } else {
                        // For OR, both children must be convertible: if either side references a
                        // non-partition column or an unsupported op, the whole OR predicate cannot be
                        // used for pruning, so return BAD_CONTEXT and let it be treated as always true.
                        return ExprNodeGenericFuncDescContext.BAD_CONTEXT;
                    }
                }
                default:
                    // TODO: support NOT predicate for CompoundPredicate
                    return ExprNodeGenericFuncDescContext.BAD_CONTEXT;
            }
        }
        return binaryExprDesc(dorisExpr, partitionKeys, tblName);
    }

    private static ExprNodeGenericFuncDescContext binaryExprDesc(Expr dorisExpr,
            List<String> partitionKeys, String tblName) throws DdlException {
        TExprOpcode opcode = dorisExpr.getOpcode();
        switch (opcode) {
            case EQ:
            case NE:
            case GE:
            case GT:
            case LE:
            case LT:
            case EQ_FOR_NULL:
                BinaryPredicate eq = (BinaryPredicate) dorisExpr;
                // Make sure the col slot is always first
                SlotRef slotRef = convertDorisExprToSlotRef(eq.getChild(0));
                LiteralExpr literalExpr = convertDorisExprToLiteralExpr(eq.getChild(1));
                if (slotRef == null || literalExpr == null) {
                    return ExprNodeGenericFuncDescContext.BAD_CONTEXT;
                }
                String colName = slotRef.getColumnName();
                // check whether colName is a partition column
                if (!partitionKeys.contains(colName)) {
                    return ExprNodeGenericFuncDescContext.BAD_CONTEXT;
                }
                PrimitiveType dorisPrimitiveType = slotRef.getType().getPrimitiveType();
                PrimitiveTypeInfo hivePrimitiveType = convertToHiveColType(dorisPrimitiveType);
                Object value = extractDorisLiteral(literalExpr);
                if (value == null) {
                    if (opcode == TExprOpcode.EQ_FOR_NULL && literalExpr instanceof NullLiteral) {
                        return genExprDesc(tblName, hivePrimitiveType, colName, "NULL", "=");
                    } else {
                        return ExprNodeGenericFuncDescContext.BAD_CONTEXT;
                    }
                }
                switch (opcode) {
                    case EQ:
                    case EQ_FOR_NULL:
                        return genExprDesc(tblName, hivePrimitiveType, colName, value, "=");
                    case NE:
                        return genExprDesc(tblName, hivePrimitiveType, colName, value, "!=");
                    case GE:
                        return genExprDesc(tblName, hivePrimitiveType, colName, value, ">=");
                    case GT:
                        return genExprDesc(tblName, hivePrimitiveType, colName, value, ">");
                    case LE:
                        return genExprDesc(tblName, hivePrimitiveType, colName, value, "<=");
                    case LT:
                        return genExprDesc(tblName, hivePrimitiveType, colName, value, "<");
                    default:
                        return ExprNodeGenericFuncDescContext.BAD_CONTEXT;
                }
            default:
                // TODO: support in predicate
                return ExprNodeGenericFuncDescContext.BAD_CONTEXT;
        }
    }

    private static ExprNodeGenericFuncDescContext genExprDesc(
            String tblName,
            PrimitiveTypeInfo hivePrimitiveType,
            String colName,
            Object value,
            String op) throws DdlException {
        ExprBuilder exprBuilder = new ExprBuilder(tblName);
        exprBuilder.col(hivePrimitiveType, colName).val(hivePrimitiveType, value);
        return new ExprNodeGenericFuncDescContext(exprBuilder.pred(op, 2).build());
    }

    public static ExprNodeGenericFuncDesc getCompoundExpr(List<ExprNodeDesc> args, String op) throws DdlException {
        ExprNodeGenericFuncDesc compoundExpr;
        try {
            compoundExpr = ExprNodeGenericFuncDesc.newInstance(
                    FunctionRegistry.getFunctionInfo(op).getGenericUDF(), args);
        } catch (SemanticException e) {
            LOG.warn("Convert to Hive expr failed: {}", e.getMessage());
            throw new DdlException("Convert to Hive expr failed. Error: " + e.getMessage());
        }
        return compoundExpr;
    }

    public static SlotRef convertDorisExprToSlotRef(Expr expr) {
        SlotRef slotRef = null;
        if (expr instanceof SlotRef) {
            slotRef = (SlotRef) expr;
        } else if (expr instanceof CastExpr) {
            if (expr.getChild(0) instanceof SlotRef) {
                slotRef = (SlotRef) expr.getChild(0);
            }
        }
        return slotRef;
    }

    public static LiteralExpr convertDorisExprToLiteralExpr(Expr expr) {
        LiteralExpr literalExpr = null;
        if (expr instanceof LiteralExpr) {
            literalExpr = (LiteralExpr) expr;
        } else if (expr instanceof CastExpr) {
            if (expr.getChild(0) instanceof LiteralExpr) {
                literalExpr = (LiteralExpr) expr.getChild(0);
            }
        }
        return literalExpr;
    }

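    /**
     * Extract the Java value carried by a Doris literal expr (e.g. Boolean for BoolLiteral, epoch milliseconds
     * for DateLiteral, String for StringLiteral, and numeric values for the numeric literals). Returns null for
     * non-literals and unsupported literal types.
     */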
    public static Object extractDorisLiteral(Expr expr) {
        if (!expr.isLiteral()) {
            return null;
        }
        if (expr instanceof BoolLiteral) {
            BoolLiteral boolLiteral = (BoolLiteral) expr;
            return boolLiteral.getValue();
        } else if (expr instanceof DateLiteral) {
            DateLiteral dateLiteral = (DateLiteral) expr;
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")
                    .withZone(ZoneId.systemDefault());
            // Zero-pad each datetime field; otherwise single-digit months/days/times (e.g. 2023-1-5)
            // would produce a string that cannot be parsed with the 14-digit "yyyyMMddHHmmss" pattern.
            String dateStr = String.format("%04d%02d%02d%02d%02d%02d",
                    dateLiteral.getYear(), dateLiteral.getMonth(), dateLiteral.getDay(),
                    dateLiteral.getHour(), dateLiteral.getMinute(), dateLiteral.getSecond());
            Date date;
            try {
                date = Date.from(
                        LocalDateTime.parse(dateStr, formatter).atZone(ZoneId.systemDefault()).toInstant());
            } catch (DateTimeParseException e) {
                return null;
            }
            return date.getTime();
        } else if (expr instanceof DecimalLiteral) {
            DecimalLiteral decimalLiteral = (DecimalLiteral) expr;
            return decimalLiteral.getValue();
        } else if (expr instanceof FloatLiteral) {
            FloatLiteral floatLiteral = (FloatLiteral) expr;
            return floatLiteral.getValue();
        } else if (expr instanceof IntLiteral) {
            IntLiteral intLiteral = (IntLiteral) expr;
            return intLiteral.getValue();
        } else if (expr instanceof StringLiteral) {
            StringLiteral stringLiteral = (StringLiteral) expr;
            return stringLiteral.getStringValue();
        }
        return null;
    }

    /**
     * Convert from Doris column type to Hive column type
     * @param dorisType Doris primitive type
     * @return Hive primitive type info
     * @throws DdlException if the Doris type has no Hive counterpart
     */
    private static PrimitiveTypeInfo convertToHiveColType(PrimitiveType dorisType) throws DdlException {
        switch (dorisType) {
            case BOOLEAN:
                return TypeInfoFactory.booleanTypeInfo;
            case TINYINT:
                return TypeInfoFactory.byteTypeInfo;
            case SMALLINT:
                return TypeInfoFactory.shortTypeInfo;
            case INT:
                return TypeInfoFactory.intTypeInfo;
            case BIGINT:
                return TypeInfoFactory.longTypeInfo;
            case FLOAT:
                return TypeInfoFactory.floatTypeInfo;
            case DOUBLE:
                return TypeInfoFactory.doubleTypeInfo;
            case DECIMAL32:
            case DECIMAL64:
            case DECIMAL128:
            case DECIMALV2:
                return TypeInfoFactory.decimalTypeInfo;
            case DATE:
            case DATEV2:
                return TypeInfoFactory.dateTypeInfo;
            case DATETIME:
            case DATETIMEV2:
                return TypeInfoFactory.timestampTypeInfo;
            case CHAR:
                return TypeInfoFactory.charTypeInfo;
            case VARCHAR:
            case STRING:
                return TypeInfoFactory.varcharTypeInfo;
            default:
                throw new DdlException("Unsupported column type: " + dorisType);
        }
    }

    /**
     * Helper class for building a Hive expression.
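     *
     * <p>Operands are pushed onto a queue and functions consume them, so expressions are built in postfix
     * order. A minimal usage sketch (type info chosen for illustration):
     * <pre>{@code
     * // builds the Hive expression: dt = '2023-10-01'
     * ExprNodeGenericFuncDesc expr = new ExprBuilder("my_table")
     *         .col(TypeInfoFactory.stringTypeInfo, "dt")
     *         .val(TypeInfoFactory.stringTypeInfo, "2023-10-01")
     *         .pred("=", 2)
     *         .build();
     * }</pre>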
     */
    public static class ExprBuilder {
        private final String tblName;
        private final Deque<ExprNodeDesc> queue = new LinkedList<>();

        public ExprBuilder(String tblName) {
            this.tblName = tblName;
        }

        public ExprNodeGenericFuncDesc build() throws DdlException {
            if (queue.size() != 1) {
                throw new DdlException("Build Hive expression failed: expected exactly one root expr, got "
                        + queue.size());
            }
            return (ExprNodeGenericFuncDesc) queue.pollFirst();
        }

        public ExprBuilder pred(String name, int args) throws DdlException {
            return fn(name, TypeInfoFactory.booleanTypeInfo, args);
        }

        private ExprBuilder fn(String name, TypeInfo ti, int args) throws DdlException {
            List<ExprNodeDesc> children = new ArrayList<>();
            for (int i = 0; i < args; ++i) {
                children.add(queue.pollFirst());
            }
            try {
                queue.offerLast(new ExprNodeGenericFuncDesc(ti,
                        FunctionRegistry.getFunctionInfo(name).getGenericUDF(), children));
            } catch (SemanticException e) {
                LOG.warn("Build Hive expression failed: semantic analyze exception: {}", e.getMessage());
                throw new DdlException("Build Hive expression Failed. Error: " + e.getMessage());
            }
            return this;
        }

        public ExprBuilder col(TypeInfo ti, String col) {
            queue.offerLast(new ExprNodeColumnDesc(ti, col, tblName, true));
            return this;
        }

        public ExprBuilder val(TypeInfo ti, Object val) {
            queue.offerLast(new ExprNodeConstantDesc(ti, val));
            return this;
        }
    }

    /**
     * A nested column type lists its inner columns separated by commas. An inner column may itself be a
     * nested type, so we cannot simply split on commas. Instead, match the angle brackets (and parentheses)
     * and only split on commas at the top level, handling inner columns recursively.
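     *
     * <p>For example, {@code findNextNestedField("int,struct<a:int,b:string>,string")} returns 3, the index
     * of the first top-level comma; the commas inside {@code struct<...>} are skipped.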
     */
    private static int findNextNestedField(String commaSplitFields) {
        int numLess = 0;
        int numBracket = 0;
        for (int i = 0; i < commaSplitFields.length(); i++) {
            char c = commaSplitFields.charAt(i);
            if (c == '<') {
                numLess++;
            } else if (c == '>') {
                numLess--;
            } else if (c == '(') {
                numBracket++;
            } else if (c == ')') {
                numBracket--;
            } else if (c == ',' && numLess == 0 && numBracket == 0) {
                return i;
            }
        }
        return commaSplitFields.length();
    }

    /**
     * Convert a Doris type to the corresponding Hive type string.
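     *
     * <p>Illustrative mappings: a Doris {@code ARRAY<INT>} becomes {@code array<int>}, {@code MAP<STRING, INT>}
     * becomes {@code map<string,int>}, and {@code DECIMALV3(10, 2)} becomes {@code decimal(10,2)}.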
     */
    public static String dorisTypeToHiveType(Type dorisType) {
        if (dorisType.isScalarType()) {
            PrimitiveType primitiveType = dorisType.getPrimitiveType();
            switch (primitiveType) {
                case BOOLEAN:
                    return "boolean";
                case TINYINT:
                    return "tinyint";
                case SMALLINT:
                    return "smallint";
                case INT:
                    return "int";
                case BIGINT:
                    return "bigint";
                case DATEV2:
                case DATE:
                    return "date";
                case DATETIMEV2:
                case DATETIME:
                    return "timestamp";
                case FLOAT:
                    return "float";
                case DOUBLE:
                    return "double";
                case CHAR: {
                    ScalarType scalarType = (ScalarType) dorisType;
                    return "char(" + scalarType.getLength() + ")";
                }
                case VARCHAR:
                case STRING:
                    return "string";
                case DECIMAL32:
                case DECIMAL64:
                case DECIMAL128:
                case DECIMAL256:
                case DECIMALV2: {
                    StringBuilder decimalType = new StringBuilder();
                    decimalType.append("decimal");
                    ScalarType scalarType = (ScalarType) dorisType;
                    int precision = scalarType.getScalarPrecision();
                    if (precision == 0) {
                        precision = ScalarType.DEFAULT_PRECISION;
                    }
                    // decimal(precision, scale)
                    int scale = scalarType.getScalarScale();
                    decimalType.append("(");
                    decimalType.append(precision);
                    decimalType.append(",");
                    decimalType.append(scale);
                    decimalType.append(")");
                    return decimalType.toString();
                }
                default:
                    throw new HMSClientException("Unsupported primitive type conversion of " + dorisType.toSql());
            }
        } else if (dorisType.isArrayType()) {
            ArrayType dorisArray = (ArrayType) dorisType;
            Type itemType = dorisArray.getItemType();
            return "array<" + dorisTypeToHiveType(itemType) + ">";
        } else if (dorisType.isMapType()) {
            MapType dorisMap = (MapType) dorisType;
            Type keyType = dorisMap.getKeyType();
            Type valueType = dorisMap.getValueType();
            return "map<"
                    + dorisTypeToHiveType(keyType)
                    + ","
                    + dorisTypeToHiveType(valueType)
                    + ">";
        } else if (dorisType.isStructType()) {
            StructType dorisStruct = (StructType) dorisType;
            StringBuilder structType = new StringBuilder();
            structType.append("struct<");
            ArrayList<StructField> fields = dorisStruct.getFields();
            for (int i = 0; i < fields.size(); i++) {
                StructField field = fields.get(i);
                structType.append(field.getName());
                structType.append(":");
                structType.append(dorisTypeToHiveType(field.getType()));
                if (i != fields.size() - 1) {
                    structType.append(",");
                }
            }
            structType.append(">");
            return structType.toString();
        }
        throw new HMSClientException("Unsupported type conversion of " + dorisType.toSql());
    }

    /**
     * Convert a Hive type string to the corresponding Doris type, using the largest datetime scale (6).
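     *
     * <p>Nested types are resolved recursively; for example, {@code "array<struct<a:int,b:string>>"} maps to
     * a Doris {@code ARRAY<STRUCT<a:INT, b:STRING>>}, and {@code "decimal(10,2)"} maps to
     * {@code DECIMALV3(10, 2)}. Unrecognized types map to {@code Type.UNSUPPORTED}.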
     */
    public static Type hiveTypeToDorisType(String hiveType) {
        // use the largest scale as default time scale.
        return hiveTypeToDorisType(hiveType, 6);
    }

    /**
     * Convert hive type to doris type with timescale.
     */
    public static Type hiveTypeToDorisType(String hiveType, int timeScale) {
        String lowerCaseType = hiveType.toLowerCase();
        switch (lowerCaseType) {
            case "boolean":
                return Type.BOOLEAN;
            case "tinyint":
                return Type.TINYINT;
            case "smallint":
                return Type.SMALLINT;
            case "int":
                return Type.INT;
            case "bigint":
                return Type.BIGINT;
            case "date":
                return ScalarType.createDateV2Type();
            case "timestamp":
                return ScalarType.createDatetimeV2Type(timeScale);
            case "float":
                return Type.FLOAT;
            case "double":
                return Type.DOUBLE;
            case "string":
            case "binary":
                return ScalarType.createStringType();
            default:
                break;
        }
        // resolve schema like array<int>
        if (lowerCaseType.startsWith("array")) {
            if (lowerCaseType.indexOf("<") == 5 && lowerCaseType.lastIndexOf(">") == lowerCaseType.length() - 1) {
                Type innerType = hiveTypeToDorisType(lowerCaseType.substring(6, lowerCaseType.length() - 1));
                return ArrayType.create(innerType, true);
            }
        }
        // resolve schema like map<string,int>
        if (lowerCaseType.startsWith("map")) {
            if (lowerCaseType.indexOf("<") == 3 && lowerCaseType.lastIndexOf(">") == lowerCaseType.length() - 1) {
                String keyValue = lowerCaseType.substring(4, lowerCaseType.length() - 1);
                int index = findNextNestedField(keyValue);
                if (index != keyValue.length() && index != 0) {
                    return new MapType(hiveTypeToDorisType(keyValue.substring(0, index)),
                            hiveTypeToDorisType(keyValue.substring(index + 1)));
                }
            }
        }
        // resolve schema like struct<col1:string,col2:int>
        if (lowerCaseType.startsWith("struct")) {
            if (lowerCaseType.indexOf("<") == 6 && lowerCaseType.lastIndexOf(">") == lowerCaseType.length() - 1) {
                String listFields = lowerCaseType.substring(7, lowerCaseType.length() - 1);
                ArrayList<StructField> fields = new ArrayList<>();
                while (listFields.length() > 0) {
                    int index = findNextNestedField(listFields);
                    int pivot = listFields.indexOf(':');
                    if (pivot > 0 && pivot < listFields.length() - 1) {
                        fields.add(new StructField(listFields.substring(0, pivot),
                                hiveTypeToDorisType(listFields.substring(pivot + 1, index))));
                        listFields = listFields.substring(Math.min(index + 1, listFields.length()));
                    } else {
                        break;
                    }
                }
                if (listFields.isEmpty()) {
                    return new StructType(fields);
                }
            }
        }
        if (lowerCaseType.startsWith("char")) {
            Matcher match = digitPattern.matcher(lowerCaseType);
            if (match.find()) {
                return ScalarType.createType(PrimitiveType.CHAR, Integer.parseInt(match.group(1)), 0, 0);
            }
            return ScalarType.createType(PrimitiveType.CHAR);
        }
        if (lowerCaseType.startsWith("varchar")) {
            Matcher match = digitPattern.matcher(lowerCaseType);
            if (match.find()) {
                return ScalarType.createType(PrimitiveType.VARCHAR, Integer.parseInt(match.group(1)), 0, 0);
            }
            return ScalarType.createType(PrimitiveType.VARCHAR);
        }
        if (lowerCaseType.startsWith("decimal")) {
            Matcher match = digitPattern.matcher(lowerCaseType);
            int precision = ScalarType.DEFAULT_PRECISION;
            int scale = ScalarType.DEFAULT_SCALE;
            if (match.find()) {
                precision = Integer.parseInt(match.group(1));
            }
            if (match.find()) {
                scale = Integer.parseInt(match.group(1));
            }
            return ScalarType.createDecimalV3Type(precision, scale);
        }
        return Type.UNSUPPORTED;
    }

    public static String showCreateTable(HMSExternalTable hmsTable) {
        // Always use the latest schema
        HMSExternalCatalog catalog = (HMSExternalCatalog) hmsTable.getCatalog();
        Table remoteTable = catalog.getClient().getTable(hmsTable.getDbName(), hmsTable.getRemoteName());
        StringBuilder output = new StringBuilder();
        if (remoteTable.isSetViewOriginalText() || remoteTable.isSetViewExpandedText()) {
            output.append(String.format("CREATE VIEW `%s` AS ", remoteTable.getTableName()));
            if (remoteTable.getViewExpandedText() != null) {
                output.append(remoteTable.getViewExpandedText());
            } else {
                output.append(remoteTable.getViewOriginalText());
            }
        } else {
            output.append(String.format("CREATE TABLE `%s`(\n", remoteTable.getTableName()));
            Iterator<FieldSchema> fields = remoteTable.getSd().getCols().iterator();
            while (fields.hasNext()) {
                FieldSchema field = fields.next();
                output.append(String.format("  `%s` %s", field.getName(), field.getType()));
                if (field.getComment() != null) {
                    output.append(String.format(" COMMENT '%s'", field.getComment()));
                }
                if (fields.hasNext()) {
                    output.append(",\n");
                }
            }
            output.append(")\n");
            if (remoteTable.getParameters().containsKey(COMMENT)) {
                output.append(String.format("COMMENT '%s'", remoteTable.getParameters().get(COMMENT))).append("\n");
            }
            if (remoteTable.getPartitionKeys().size() > 0) {
                output.append("PARTITIONED BY (\n")
                        .append(remoteTable.getPartitionKeys().stream().map(
                                        partition ->
                                                String.format(" `%s` %s", partition.getName(), partition.getType()))
                                .collect(Collectors.joining(",\n")))
                        .append(")\n");
            }
            StorageDescriptor descriptor = remoteTable.getSd();
            List<String> bucketCols = descriptor.getBucketCols();
            if (bucketCols != null && bucketCols.size() > 0) {
                output.append("CLUSTERED BY (\n")
                        .append(bucketCols.stream().map(
                                bucketCol -> "  " + bucketCol).collect(Collectors.joining(",\n")))
                        .append(")\n")
                        .append(String.format("INTO %d BUCKETS\n", descriptor.getNumBuckets()));
            }
            if (descriptor.getSerdeInfo().isSetSerializationLib()) {
                output.append("ROW FORMAT SERDE\n")
                        .append(String.format("  '%s'\n", descriptor.getSerdeInfo().getSerializationLib()));
            }
            if (descriptor.getSerdeInfo().isSetParameters()) {
                output.append("WITH SERDEPROPERTIES (\n")
                        .append(descriptor.getSerdeInfo().getParameters().entrySet().stream()
                        .map(entry -> String.format("  '%s' = '%s'", entry.getKey(), entry.getValue()))
                        .collect(Collectors.joining(",\n")))
                        .append(")\n");
            }
            if (descriptor.isSetInputFormat()) {
                output.append("STORED AS INPUTFORMAT\n")
                        .append(String.format("  '%s'\n", descriptor.getInputFormat()));
            }
            if (descriptor.isSetOutputFormat()) {
                output.append("OUTPUTFORMAT\n")
                        .append(String.format("  '%s'\n", descriptor.getOutputFormat()));
            }
            if (descriptor.isSetLocation()) {
                output.append("LOCATION\n")
                        .append(String.format("  '%s'\n", descriptor.getLocation()));
            }
            if (remoteTable.isSetParameters()) {
                output.append("TBLPROPERTIES (\n");
                Map<String, String> parameters = Maps.newHashMap();
                // Copy the parameters to a new Map to keep them unchanged.
                parameters.putAll(remoteTable.getParameters());
                if (parameters.containsKey(COMMENT)) {
                    // The comment is also stored in the remote table parameters.
                    // It has already been shown in the COMMENT clause above, so remove it here.
                    parameters.remove(COMMENT);
                }
                Iterator<Map.Entry<String, String>> params = parameters.entrySet().iterator();
                while (params.hasNext()) {
                    Map.Entry<String, String> param = params.next();
                    output.append(String.format("  '%s'='%s'", param.getKey(), param.getValue()));
                    if (params.hasNext()) {
                        output.append(",\n");
                    }
                }
                output.append(")");
            }
        }
        return output.toString();
    }

    public static InternalSchema getHudiTableSchema(HMSExternalTable table, boolean[] enableSchemaEvolution,
            String timestamp) {
        HoodieTableMetaClient metaClient = table.getHudiClient();
        TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);

        // Reload the active timeline before resolving the schema.
        // When Hudi resolves the schema in `getTableAvroSchema`, it reads the commit file referenced by the
        // `metaClient`. Since the `metaClient` is obtained from a cache, that commit file may be stale and may
        // already have been deleted by the Hudi clean task, which would cause an error.
        // Reloading the active timeline ensures we read the latest commit files.
        metaClient.reloadActiveTimeline();

        Option<InternalSchema> internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata(timestamp);

        if (internalSchemaOption.isPresent()) {
            enableSchemaEvolution[0] = true;
            return internalSchemaOption.get();
        } else {
            try {
                // Schema evolution is not enabled (hoodie.schema.on.read.enable = false).
                enableSchemaEvolution[0] = false;
                // AvroInternalSchemaConverter.convert() will generate field ids.
                return AvroInternalSchemaConverter.convert(schemaUtil.getTableAvroSchema(true));
            } catch (Exception e) {
                throw new RuntimeException("Cannot get hudi table schema.", e);
            }
        }
    }

    public static <T> T ugiDoAs(Configuration conf, PrivilegedExceptionAction<T> action) {
        // if the hive config is not ready, fall back to the hadoop kerberos config to log in
        AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(conf);
        HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig);
        try {
            return hadoopAuthenticator.doAs(action);
        } catch (IOException e) {
            LOG.warn("HiveMetaStoreClientHelper ugiDoAs failed.", e);
            throw new RuntimeException(e);
        }
    }

    public static Configuration getConfiguration(HMSExternalTable table) {
        return table.getCatalog().getConfiguration();
    }

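    /**
     * Look up a serde property, preferring the table-level parameter over the storage descriptor's serde
     * parameter. For example, {@code getSerdeProperty(table, "field.delim")} returns the table parameter if
     * set, otherwise the serde parameter, otherwise {@code Optional.empty()}.
     */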
    public static Optional<String> getSerdeProperty(Table table, String key) {
        String valueFromSd = table.getSd().getSerdeInfo().getParameters().get(key);
        String valueFromTbl = table.getParameters().get(key);
        return firstNonNullable(valueFromTbl, valueFromSd);
    }

    private static Optional<String> firstNonNullable(String... values) {
        for (String value : values) {
            if (!Strings.isNullOrEmpty(value)) {
                return Optional.of(value);
            }
        }
        return Optional.empty();
    }

    public static String firstPresentOrDefault(String defaultValue, Optional<String>... values) {
        for (Optional<String> value : values) {
            if (value.isPresent()) {
                return value.get();
            }
        }
        return defaultValue;
    }

    /**
     * Return a single-character string derived from the given numeric string.
     *
     * <p>If the string parses as a byte, the character whose code point equals that byte value
     * (normalized to 0-255) is returned; otherwise the first character of the string is returned.
     * Returns null for a null or empty input.
     *
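     * <p>For example, {@code getByte("1")} returns the character with code point 1,
     * {@code getByte("-1")} returns the character with code point 255, and
     * {@code getByte("|")} returns {@code "|"}.
     *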
     * @param altValue the string containing a number
     */
    public static String getByte(String altValue) {
        if (altValue != null && altValue.length() > 0) {
            try {
                return Character.toString((char) ((Byte.parseByte(altValue) + 256) % 256));
            } catch (NumberFormatException e) {
                return altValue.substring(0, 1);
            }
        }
        return null;
    }
}