LakeSoulExternalTable.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.lakesoul;

import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ExternalAnalysisTask;
import org.apache.doris.thrift.TLakeSoulTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import com.google.common.collect.Lists;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class LakeSoulExternalTable extends ExternalTable {
    private static final Logger LOG = LogManager.getLogger(LakeSoulExternalTable.class);
    public static final int LAKESOUL_TIMESTAMP_SCALE_MS = 6;

    public final String tableId;

    public LakeSoulExternalTable(long id, String name, String remoteName, LakeSoulExternalCatalog catalog,
            LakeSoulExternalDatabase db) {
        super(id, name, remoteName, catalog, db, TableType.LAKESOUl_EXTERNAL_TABLE);
        TableInfo tableInfo = getLakeSoulTableInfo();
        if (tableInfo == null) {
            throw new RuntimeException(String.format("LakeSoul table %s.%s does not exist", dbName, name));
        }
        tableId = tableInfo.getTableId();
    }

    @Override
    public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
        makeSureInitialized();
        return new ExternalAnalysisTask(info);
    }

    private Type arrowFiledToDorisType(Field field) {
        ArrowType dt = field.getType();
        if (dt instanceof ArrowType.Bool) {
            return Type.BOOLEAN;
        } else if (dt instanceof ArrowType.Int) {
            ArrowType.Int type = (ArrowType.Int) dt;
            switch (type.getBitWidth()) {
                case 8:
                    return Type.TINYINT;
                case 16:
                    return Type.SMALLINT;
                case 32:
                    return Type.INT;
                case 64:
                    return Type.BIGINT;
                default:
                    throw new IllegalArgumentException("Invalid integer bit width: "
                            + type.getBitWidth()
                            + " for LakeSoul table: "
                            + getTableIdentifier());
            }
        } else if (dt instanceof ArrowType.FloatingPoint) {
            ArrowType.FloatingPoint type = (ArrowType.FloatingPoint) dt;
            switch (type.getPrecision()) {
                case SINGLE:
                    return Type.FLOAT;
                case DOUBLE:
                    return Type.DOUBLE;
                default:
                    throw new IllegalArgumentException("Invalid floating point precision: "
                            + type.getPrecision()
                            + " for LakeSoul table: "
                            + getTableIdentifier());
            }
        } else if (dt instanceof ArrowType.Utf8) {
            return Type.STRING;
        } else if (dt instanceof ArrowType.Decimal) {
            ArrowType.Decimal decimalType = (ArrowType.Decimal) dt;
            return ScalarType.createDecimalType(PrimitiveType.DECIMAL64, decimalType.getPrecision(),
                    decimalType.getScale());
        } else if (dt instanceof ArrowType.Date) {
            return ScalarType.createDateV2Type();
        } else if (dt instanceof ArrowType.Timestamp) {
            ArrowType.Timestamp tsType = (ArrowType.Timestamp) dt;
            int scale = LAKESOUL_TIMESTAMP_SCALE_MS;
            switch (tsType.getUnit()) {
                case SECOND:
                    scale = 0;
                    break;
                case MILLISECOND:
                    scale = 3;
                    break;
                case MICROSECOND:
                    scale = 6;
                    break;
                case NANOSECOND:
                    scale = 9;
                    break;
                default:
                    break;
            }
            return ScalarType.createDatetimeV2Type(scale);
        } else if (dt instanceof ArrowType.List) {
            List<Field> children = field.getChildren();
            Preconditions.checkArgument(children.size() == 1,
                    "Lists have one child Field. Found: %s", children.isEmpty() ? "none" : children);
            return ArrayType.create(arrowFiledToDorisType(children.get(0)), children.get(0).isNullable());
        } else if (dt instanceof ArrowType.Struct) {
            List<Field> children = field.getChildren();
            return new StructType(children.stream().map(this::arrowFiledToDorisType).collect(Collectors.toList()));
        }
        throw new IllegalArgumentException("Cannot transform type "
            + dt
            + " to doris type"
            + " for LakeSoul table "
            + getTableIdentifier());
    }

    @Override
    public TTableDescriptor toThrift() {
        List<Column> schema = getFullSchema();
        TLakeSoulTable tLakeSoulTable = new TLakeSoulTable();
        tLakeSoulTable.setDbName(dbName);
        tLakeSoulTable.setTableName(name);
        tLakeSoulTable.setProperties(new HashMap<>());
        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, schema.size(), 0,
                getName(), dbName);
        tTableDescriptor.setLakesoulTable(tLakeSoulTable);
        return tTableDescriptor;

    }

    @Override
    public Optional<SchemaCacheValue> initSchema() {
        TableInfo tableInfo = ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, name);
        String tableSchema = tableInfo.getTableSchema();
        DBUtil.TablePartitionKeys partitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
        Schema schema;
        LOG.info("tableSchema={}", tableSchema);
        try {
            schema = Schema.fromJSON(tableSchema);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.getFields().size());
        for (Field field : schema.getFields()) {
            boolean isKey =
                    partitionKeys.primaryKeys.contains(field.getName())
                    || partitionKeys.rangeKeys.contains(field.getName());
            tmpSchema.add(new Column(field.getName(), arrowFiledToDorisType(field),
                    isKey,
                    null, field.isNullable(),
                    field.getMetadata().getOrDefault("comment", null),
                    true, schema.getFields().indexOf(field)));
        }
        return Optional.of(new SchemaCacheValue(tmpSchema));
    }

    public TableInfo getLakeSoulTableInfo() {
        return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, name);
    }

    public List<PartitionInfo> listPartitionInfo() {
        return ((LakeSoulExternalCatalog) catalog).listPartitionInfo(tableId);
    }

    public String tablePath() {
        return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, name).getTablePath();
    }

    public Map<String, String> getHadoopProperties() {
        return catalog.getCatalogProperty().getHadoopProperties();
    }

    public String getTableIdentifier() {
        return dbName + "." + name;
    }
}