MaxComputeExternalTable.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.maxcompute;

import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.thrift.TMCTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import com.aliyun.odps.OdpsType;
import com.aliyun.odps.Table;
import com.aliyun.odps.type.ArrayTypeInfo;
import com.aliyun.odps.type.CharTypeInfo;
import com.aliyun.odps.type.DecimalTypeInfo;
import com.aliyun.odps.type.MapTypeInfo;
import com.aliyun.odps.type.StructTypeInfo;
import com.aliyun.odps.type.TypeInfo;
import com.aliyun.odps.type.VarcharTypeInfo;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * MaxCompute external table.
 */
public class MaxComputeExternalTable extends ExternalTable {
    /**
     * Index from column name (data columns and partition columns) to the ODPS column definition.
     * It is replaced with a freshly built map on every {@link #initSchema()} call, so columns that
     * were dropped remotely do not linger after a schema reload. Empty until the schema is loaded.
     */
    private Map<String, com.aliyun.odps.Column> columnNameToOdpsColumn = new HashMap<>();

    public MaxComputeExternalTable(long id, String name, String remoteName, MaxComputeExternalCatalog catalog,
            MaxComputeExternalDatabase db) {
        super(id, name, remoteName, catalog, db, TableType.MAX_COMPUTE_EXTERNAL_TABLE);
    }

    @Override
    protected synchronized void makeSureInitialized() {
        super.makeSureInitialized();
        if (!objectCreated) {
            objectCreated = true;
        }
    }

    @Override
    public boolean supportInternalPartitionPruned() {
        return true;
    }

    @Override
    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
        return getPartitionColumns();
    }

    /**
     * Returns the Doris partition columns of this table.
     *
     * @return the partition columns from the schema cache, or an empty list if no schema is cached
     */
    public List<Column> getPartitionColumns() {
        makeSureInitialized();
        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
        return schemaCacheValue.map(value -> ((MaxComputeSchemaCacheValue) value).getPartitionColumns())
                .orElse(Collections.emptyList());
    }

    @Override
    public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
        if (getPartitionColumns().isEmpty()) {
            return Collections.emptyMap();
        }

        TablePartitionValues tablePartitionValues = getPartitionValues();
        Map<Long, PartitionItem> idToPartitionItem = tablePartitionValues.getIdToPartitionItem();
        Map<Long, String> idToNameMap = tablePartitionValues.getPartitionIdToNameMap();

        // Re-key the id-based view by partition name.
        Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMapWithExpectedSize(idToPartitionItem.size());
        for (Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) {
            nameToPartitionItem.put(idToNameMap.get(entry.getKey()), entry.getValue());
        }
        return nameToPartitionItem;
    }

    /**
     * Returns the partition values of this table from the MaxCompute metadata cache, loading them
     * from the cached schema on a cache miss.
     *
     * @return the cached partition values, or an empty {@link TablePartitionValues} if no schema is cached
     */
    private TablePartitionValues getPartitionValues() {
        makeSureInitialized();
        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
        if (!schemaCacheValue.isPresent()) {
            return new TablePartitionValues();
        }
        MaxComputeSchemaCacheValue mcSchemaCacheValue = (MaxComputeSchemaCacheValue) schemaCacheValue.get();
        Table odpsTable = mcSchemaCacheValue.getOdpsTable();
        String projectName = odpsTable.getProject();
        String tableName = odpsTable.getName();
        MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr()
                .getMaxComputeMetadataCache(catalog.getId());
        return metadataCache.getCachedPartitionValues(
                new MaxComputeCacheKey(projectName, tableName),
                key -> loadPartitionValues(mcSchemaCacheValue));
    }

    /**
     * Builds {@link TablePartitionValues} from the partition specs captured in the schema cache.
     *
     * @param schemaCacheValue schema cache entry holding partition specs, types and column names
     * @return partition values with one entry per spec (each passed to addPartitions with a 0L companion value)
     */
    private TablePartitionValues loadPartitionValues(MaxComputeSchemaCacheValue schemaCacheValue) {
        List<String> partitionSpecs = schemaCacheValue.getPartitionSpecs();
        List<Type> partitionTypes = schemaCacheValue.getPartitionTypes();
        List<String> partitionColumnNames = schemaCacheValue.getPartitionColumnNames();
        TablePartitionValues partitionValues = new TablePartitionValues();
        partitionValues.addPartitions(partitionSpecs,
                partitionSpecs.stream()
                        .map(p -> parsePartitionValues(partitionColumnNames, p))
                        .collect(Collectors.toList()),
                partitionTypes, Collections.nCopies(partitionSpecs.size(), 0L));
        return partitionValues;
    }

    /**
     * Parses all values of a partition path into a single list, in partition-column order.
     * MaxCompute partition values may contain the special characters {@code _$#.!@}
     * (see MaxCompute error code ODPS-0130071, "Invalid partition value").
     *
     * <p>The i-th {@code /}-separated fragment is matched with the i-th partition column. When a
     * fragment starts with {@code <column>=}, that prefix is stripped; otherwise the fragment is
     * taken as the value verbatim.
     *
     * @param partitionColumns partition column names, e.g. part1, part2, part3
     * @param partitionPath partition path such as {@code part1=123/part2=abc/part3=1bc}
     * @return all values of partitionPath
     * @throws RuntimeException if the number of fragments differs from the number of partition columns
     */
    private static List<String> parsePartitionValues(List<String> partitionColumns, String partitionPath) {
        String[] partitionFragments = partitionPath.split("/");
        if (partitionFragments.length != partitionColumns.size()) {
            throw new RuntimeException("Failed to parse partition values of path: " + partitionPath);
        }
        List<String> partitionValues = new ArrayList<>(partitionFragments.length);
        for (int i = 0; i < partitionFragments.length; i++) {
            String prefix = partitionColumns.get(i) + "=";
            if (partitionFragments[i].startsWith(prefix)) {
                partitionValues.add(partitionFragments[i].substring(prefix.length()));
            } else {
                partitionValues.add(partitionFragments[i]);
            }
        }
        return partitionValues;
    }

    /**
     * Returns the index from column name to ODPS column built by the last {@link #initSchema()} call.
     *
     * @return the name-to-ODPS-column map; empty if the schema has not been loaded yet
     */
    public Map<String, com.aliyun.odps.Column> getColumnNameToOdpsColumn() {
        return columnNameToOdpsColumn;
    }

    /**
     * Loads the table schema from MaxCompute. The data columns come first, followed by the
     * partition columns. This also rebuilds the column-name index and records the partition specs.
     *
     * @return the schema cache value for this table
     */
    @Override
    public Optional<SchemaCacheValue> initSchema() {
        // this method will be called at semantic parsing.
        makeSureInitialized();
        Table odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(dbName, name);
        List<com.aliyun.odps.Column> columns = odpsTable.getSchema().getColumns();
        List<com.aliyun.odps.Column> partitionColumns = odpsTable.getSchema().getPartitionColumns();

        // Build the name index in a fresh map instead of mutating the old one, so columns removed
        // from the remote table since the previous load are not left behind.
        Map<String, com.aliyun.odps.Column> nameToOdpsColumn = new HashMap<>();

        // Presized for data + partition columns, since partition columns are appended below.
        List<Column> schema = Lists.newArrayListWithCapacity(columns.size() + partitionColumns.size());
        for (com.aliyun.odps.Column field : columns) {
            nameToOdpsColumn.put(field.getName(), field);
            schema.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null,
                    field.isNullable(), field.getComment(), true, -1));
        }

        // Partition columns are kept in their declared order so that partitionColumnNames,
        // partitionTypes and partitionDorisColumns stay index-aligned. parsePartitionValues
        // relies on this: it matches the i-th spec fragment with the i-th partition column.
        List<String> partitionColumnNames = new ArrayList<>(partitionColumns.size());
        List<Type> partitionTypes = new ArrayList<>(partitionColumns.size());
        List<Column> partitionDorisColumns = new ArrayList<>(partitionColumns.size());
        for (com.aliyun.odps.Column partColumn : partitionColumns) {
            Type partitionType = mcTypeToDorisType(partColumn.getTypeInfo());
            Column dorisCol = new Column(partColumn.getName(), partitionType, true, null,
                    true, partColumn.getComment(), true, -1);

            nameToOdpsColumn.put(partColumn.getName(), partColumn);
            partitionColumnNames.add(partColumn.getName());
            partitionDorisColumns.add(dorisCol);
            partitionTypes.add(partitionType);
            schema.add(dorisCol);
        }
        columnNameToOdpsColumn = nameToOdpsColumn;

        List<String> partitionSpecs;
        if (partitionColumns.isEmpty()) {
            partitionSpecs = ImmutableList.of();
        } else {
            partitionSpecs = odpsTable.getPartitions().stream()
                    .map(e -> e.getPartitionSpec().toString(false, true))
                    .collect(Collectors.toList());
        }

        return Optional.of(new MaxComputeSchemaCacheValue(schema, odpsTable, partitionColumnNames,
                partitionSpecs, partitionDorisColumns, partitionTypes));
    }

    /**
     * Maps a MaxCompute (ODPS) type to the corresponding Doris type, recursing into
     * ARRAY, MAP and STRUCT element types.
     *
     * @param typeInfo the ODPS type description
     * @return the Doris type; {@link Type#UNSUPPORTED} for JSON, BINARY and INTERVAL types
     * @throws IllegalArgumentException if the ODPS type is not handled at all
     */
    private static Type mcTypeToDorisType(TypeInfo typeInfo) {
        OdpsType odpsType = typeInfo.getOdpsType();
        switch (odpsType) {
            case VOID: {
                return Type.NULL;
            }
            case BOOLEAN: {
                return Type.BOOLEAN;
            }
            case TINYINT: {
                return Type.TINYINT;
            }
            case SMALLINT: {
                return Type.SMALLINT;
            }
            case INT: {
                return Type.INT;
            }
            case BIGINT: {
                return Type.BIGINT;
            }
            case CHAR: {
                CharTypeInfo charType = (CharTypeInfo) typeInfo;
                return ScalarType.createChar(charType.getLength());
            }
            case STRING: {
                return ScalarType.createStringType();
            }
            case VARCHAR: {
                VarcharTypeInfo varcharType = (VarcharTypeInfo) typeInfo;
                return ScalarType.createVarchar(varcharType.getLength());
            }
            case JSON: {
                // JSON is not mapped to Type.JSONB yet.
                return Type.UNSUPPORTED;
            }
            case FLOAT: {
                return Type.FLOAT;
            }
            case DOUBLE: {
                return Type.DOUBLE;
            }
            case DECIMAL: {
                DecimalTypeInfo decimal = (DecimalTypeInfo) typeInfo;
                return ScalarType.createDecimalV3Type(decimal.getPrecision(), decimal.getScale());
            }
            case DATE: {
                return ScalarType.createDateV2Type();
            }
            case DATETIME: {
                // millisecond precision
                return ScalarType.createDatetimeV2Type(3);
            }
            case TIMESTAMP:
            case TIMESTAMP_NTZ: {
                // microsecond precision
                return ScalarType.createDatetimeV2Type(6);
            }
            case ARRAY: {
                ArrayTypeInfo arrayType = (ArrayTypeInfo) typeInfo;
                Type innerType = mcTypeToDorisType(arrayType.getElementTypeInfo());
                return ArrayType.create(innerType, true);
            }
            case MAP: {
                MapTypeInfo mapType = (MapTypeInfo) typeInfo;
                return new MapType(mcTypeToDorisType(mapType.getKeyTypeInfo()),
                        mcTypeToDorisType(mapType.getValueTypeInfo()));
            }
            case STRUCT: {
                ArrayList<StructField> fields = new ArrayList<>();
                StructTypeInfo structType = (StructTypeInfo) typeInfo;
                List<String> fieldNames = structType.getFieldNames();
                List<TypeInfo> fieldTypeInfos = structType.getFieldTypeInfos();
                for (int i = 0; i < structType.getFieldCount(); i++) {
                    Type innerType = mcTypeToDorisType(fieldTypeInfos.get(i));
                    fields.add(new StructField(fieldNames.get(i), innerType));
                }
                return new StructType(fields);
            }
            case BINARY:
            case INTERVAL_DAY_TIME:
            case INTERVAL_YEAR_MONTH:
                return Type.UNSUPPORTED;
            default:
                throw new IllegalArgumentException("Cannot transform unknown type: " + odpsType);
        }
    }

    /**
     * Builds the thrift descriptor for this table. It carries the credentials, endpoint, quota,
     * project and table name taken from the catalog and this table.
     *
     * @return the thrift table descriptor
     */
    @Override
    public TTableDescriptor toThrift() {
        // ak sk endpoint project  quota
        List<Column> schema = getFullSchema();
        TMCTable tMcTable = new TMCTable();
        MaxComputeExternalCatalog mcCatalog = ((MaxComputeExternalCatalog) catalog);

        tMcTable.setAccessKey(mcCatalog.getAccessKey());
        tMcTable.setSecretKey(mcCatalog.getSecretKey());
        tMcTable.setEndpoint(mcCatalog.getEndpoint());
        tMcTable.setQuota(mcCatalog.getQuota());
        // The MaxCompute project is exposed as the Doris database, so dbName is the project name.
        // It must not be overwritten afterwards.
        tMcTable.setProject(dbName);
        tMcTable.setTable(name);

        // Legacy fields filled with a placeholder
        // NOTE(review): presumably no longer read by the backend — confirm.
        tMcTable.setOdpsUrl("deprecated");
        tMcTable.setRegion("deprecated");
        tMcTable.setTunnelUrl("deprecated");

        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.MAX_COMPUTE_TABLE,
                schema.size(), 0, getName(), dbName);
        tTableDescriptor.setMcTable(tMcTable);
        return tTableDescriptor;
    }

    /**
     * Returns the ODPS table object captured in the schema cache.
     *
     * @return the cached ODPS table, or {@code null} if no schema is cached
     */
    public Table getOdpsTable() {
        makeSureInitialized();
        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
        return schemaCacheValue.map(value -> ((MaxComputeSchemaCacheValue) value).getOdpsTable())
                .orElse(null);
    }
}