// EngineMtmvSupport.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.metacache;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
import org.apache.doris.datasource.hive.HiveEngineCache;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hudi.source.HudiEngineCache;
import org.apache.doris.datasource.iceberg.IcebergEngineCache;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergPartitionInfo;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.datasource.paimon.PaimonEngineCache;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.datasource.paimon.PaimonPartitionInfo;
import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
import org.apache.doris.mtmv.MTMVSnapshotIdSnapshot;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVTimestampSnapshot;

import com.google.common.collect.BiMap;
import com.google.common.collect.Maps;
import org.apache.paimon.partition.Partition;

import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Unified MTMV metadata bridge based on EngineMetaCache.
 */
public final class EngineMtmvSupport {
    /** Private constructor: this class only exposes static helpers and is not meant to be instantiated. */
    private EngineMtmvSupport() {
    }

    /**
     * Returns a fresh {@code HashMap} holding the same name-to-partition-item entries that
     * {@link #getPartitionItems(ExternalTable, Optional)} produces for the given table.
     *
     * @param table external table whose partition items are requested
     * @param snapshot optional MVCC snapshot, forwarded unchanged
     * @return a new map containing a shallow copy of the partition items
     */
    public static Map<String, PartitionItem> getAndCopyPartitionItems(ExternalTable table,
            Optional<MvccSnapshot> snapshot) {
        Map<String, PartitionItem> resolvedItems = getPartitionItems(table, snapshot);
        // Copy so that callers can mutate the result without touching the map handed out by the lookup.
        return Maps.newHashMap(resolvedItems);
    }

    /**
     * Looks up the partition info of {@code table} in its engine cache and exposes it as a
     * map from partition name to partition item.
     *
     * @param table external table whose partition items are requested
     * @param snapshot optional MVCC snapshot, forwarded unchanged to the engine cache
     * @return partition items keyed by partition name
     */
    public static Map<String, PartitionItem> getPartitionItems(ExternalTable table, Optional<MvccSnapshot> snapshot) {
        EngineMetaCache engineCache = getEngineMetaCache(table);
        return partitionItems(engineCache.getPartitionInfo(table, snapshot), table);
    }

    /**
     * Resolves the MTMV snapshot of a single partition of an external table.
     *
     * <p>The partition info and the table snapshot are both fetched from the table's engine cache;
     * their runtime types select the conversion:
     * <ul>
     *   <li>Iceberg: the partition's latest snapshot id, or the table snapshot id when the partition
     *       id is not positive.</li>
     *   <li>Paimon: the partition's last file creation time.</li>
     *   <li>Hive: the last modified time of the partition loaded through the Hive metastore cache.</li>
     *   <li>Hudi: the partition's last modified time, or the table snapshot timestamp when the
     *       partition time is missing or not positive.</li>
     * </ul>
     *
     * @param table external table owning the partition
     * @param partitionName name of the partition to resolve
     * @param snapshot optional MVCC snapshot, forwarded unchanged to the engine cache
     * @return the snapshot describing the partition's current version
     * @throws AnalysisException if the partition cannot be found, or if the partition info / snapshot
     *         types returned by the engine cache have no supported conversion
     */
    public static MTMVSnapshotIf getPartitionSnapshot(ExternalTable table, String partitionName,
            Optional<MvccSnapshot> snapshot) throws AnalysisException {
        EngineMetaCache cache = getEngineMetaCache(table);
        EnginePartitionInfo partitionInfo = cache.getPartitionInfo(table, snapshot);
        EngineSnapshot engineSnapshot = cache.getSnapshot(table, snapshot);

        if (partitionInfo instanceof IcebergEngineCache.IcebergPartition
                && engineSnapshot instanceof IcebergEngineCache.IcebergSnapshotMeta) {
            return icebergPartitionSnapshot((IcebergEngineCache.IcebergPartition) partitionInfo,
                    (IcebergEngineCache.IcebergSnapshotMeta) engineSnapshot, partitionName);
        }
        if (partitionInfo instanceof PaimonEngineCache.PaimonPartition) {
            return paimonPartitionSnapshot((PaimonEngineCache.PaimonPartition) partitionInfo, partitionName);
        }
        if (partitionInfo instanceof HiveEngineCache.HivePartition && cache instanceof HiveEngineCache) {
            return hivePartitionSnapshot(table, (HiveEngineCache) cache,
                    (HiveEngineCache.HivePartition) partitionInfo, partitionName);
        }
        if (partitionInfo instanceof HudiEngineCache.HudiPartition
                && engineSnapshot instanceof HudiEngineCache.HudiSnapshotMeta) {
            return hudiPartitionSnapshot((HudiEngineCache.HudiPartition) partitionInfo,
                    (HudiEngineCache.HudiSnapshotMeta) engineSnapshot, partitionName);
        }

        // instanceof is false for null, so null values also land here; the type names are rendered
        // null-safely so the intended AnalysisException is not replaced by a NullPointerException.
        throw new AnalysisException("Unsupported partition snapshot conversion for table "
                + table.getNameWithFullQualifiers() + ", partitionInfo=" + simpleTypeName(partitionInfo)
                + ", snapshot=" + simpleTypeName(engineSnapshot));
    }

    /** Iceberg: use the partition's latest snapshot id, falling back to the table snapshot id. */
    private static MTMVSnapshotIf icebergPartitionSnapshot(IcebergEngineCache.IcebergPartition partition,
            IcebergEngineCache.IcebergSnapshotMeta snapshotMeta, String partitionName) throws AnalysisException {
        IcebergPartitionInfo icebergPartitionInfo = partition.getPartitionInfo();
        long latestSnapshotId = icebergPartitionInfo.getLatestSnapshotId(partitionName);
        if (latestSnapshotId > 0) {
            return new MTMVSnapshotIdSnapshot(latestSnapshotId);
        }
        // No usable per-partition snapshot id: fall back to the table-level snapshot id.
        long tableSnapshotId = snapshotMeta.getSnapshot().getSnapshotId();
        if (tableSnapshotId <= 0) {
            throw new AnalysisException("can not find partition: " + partitionName
                    + ", and table snapshot ID is also invalid");
        }
        return new MTMVSnapshotIdSnapshot(tableSnapshotId);
    }

    /** Paimon: use the last file creation time of the named partition. */
    private static MTMVSnapshotIf paimonPartitionSnapshot(PaimonEngineCache.PaimonPartition partition,
            String partitionName) throws AnalysisException {
        Map<String, Partition> nameToPartition = partition.getPartitionInfo().getNameToPartition();
        Partition paimonPartition = nameToPartition.get(partitionName);
        if (paimonPartition == null) {
            throw new AnalysisException("can not find partition: " + partitionName);
        }
        return new MTMVTimestampSnapshot(paimonPartition.lastFileCreationTime());
    }

    /** Hive: map name -> id -> partition values, then read the partition's last modified time. */
    private static MTMVSnapshotIf hivePartitionSnapshot(ExternalTable table, HiveEngineCache hiveCache,
            HiveEngineCache.HivePartition partition, String partitionName) throws AnalysisException {
        HiveMetaStoreCache.HivePartitionValues hivePartitionValues = partition.getPartitionValues();
        Long partitionId = hivePartitionValues.getPartitionNameToIdMap().get(partitionName);
        if (partitionId == null) {
            throw new AnalysisException("can not find partition: " + partitionName);
        }
        List<String> partitionValues = hivePartitionValues.getPartitionValuesMap().get(partitionId);
        if (partitionValues == null) {
            throw new AnalysisException("can not find partition values for partition id: " + partitionId);
        }
        HivePartition hivePartition = hiveCache.getMetaStoreCache().getHivePartition(table, partitionValues);
        if (hivePartition == null) {
            throw new AnalysisException("can not find partition: " + partitionName);
        }
        return new MTMVTimestampSnapshot(hivePartition.getLastModifiedTime());
    }

    /** Hudi: use the partition's last modified time, falling back to the table snapshot timestamp. */
    private static MTMVSnapshotIf hudiPartitionSnapshot(HudiEngineCache.HudiPartition partition,
            HudiEngineCache.HudiSnapshotMeta snapshotMeta, String partitionName) throws AnalysisException {
        TablePartitionValues hudiPartitionValues = partition.getPartitionValues();
        if (!hudiPartitionValues.getPartitionNameToIdMap().containsKey(partitionName)) {
            throw new AnalysisException("can not find partition: " + partitionName);
        }
        Map<String, Long> nameToLastModified = hudiPartitionValues.getPartitionNameToLastModifiedMap();
        // Read the boxed value explicitly: a missing map, a missing entry and a null entry are all
        // treated as "no partition timestamp" instead of risking an unboxing NullPointerException.
        Long lastModified = nameToLastModified == null ? null : nameToLastModified.get(partitionName);
        long partitionTimestamp = lastModified == null ? 0L : lastModified;
        if (partitionTimestamp <= 0) {
            partitionTimestamp = snapshotMeta.getTimestamp();
        }
        return new MTMVTimestampSnapshot(partitionTimestamp);
    }

    /** Returns the simple class name of {@code value}, or the text {@code "null"} when it is null. */
    private static String simpleTypeName(Object value) {
        return value == null ? "null" : value.getClass().getSimpleName();
    }

    /**
     * Resolves the table-level MTMV snapshot of an external table.
     *
     * <p>The snapshot is fetched from the table's engine cache and converted by runtime type:
     * Iceberg and Paimon yield a snapshot-id snapshot, Hive yields a max-timestamp snapshot built
     * from the snapshot's partition name and timestamp, and Hudi yields a timestamp snapshot.
     *
     * @param table external table to inspect
     * @param snapshot optional MVCC snapshot, forwarded unchanged to the engine cache
     * @return the snapshot describing the table's current version
     * @throws AnalysisException if the snapshot returned by the engine cache has no supported conversion
     */
    public static MTMVSnapshotIf getTableSnapshot(ExternalTable table, Optional<MvccSnapshot> snapshot)
            throws AnalysisException {
        EngineSnapshot engineSnapshot = getEngineMetaCache(table).getSnapshot(table, snapshot);
        if (engineSnapshot instanceof IcebergEngineCache.IcebergSnapshotMeta) {
            IcebergEngineCache.IcebergSnapshotMeta icebergMeta =
                    (IcebergEngineCache.IcebergSnapshotMeta) engineSnapshot;
            return new MTMVSnapshotIdSnapshot(icebergMeta.getSnapshot().getSnapshotId());
        }
        if (engineSnapshot instanceof PaimonEngineCache.PaimonSnapshotMeta) {
            PaimonEngineCache.PaimonSnapshotMeta paimonMeta =
                    (PaimonEngineCache.PaimonSnapshotMeta) engineSnapshot;
            return new MTMVSnapshotIdSnapshot(paimonMeta.getSnapshot().getSnapshotId());
        }
        if (engineSnapshot instanceof HiveEngineCache.HiveSnapshotMeta) {
            HiveEngineCache.HiveSnapshotMeta hiveMeta = (HiveEngineCache.HiveSnapshotMeta) engineSnapshot;
            return new MTMVMaxTimestampSnapshot(hiveMeta.getPartitionName(), hiveMeta.getTimestamp());
        }
        if (engineSnapshot instanceof HudiEngineCache.HudiSnapshotMeta) {
            HudiEngineCache.HudiSnapshotMeta hudiMeta = (HudiEngineCache.HudiSnapshotMeta) engineSnapshot;
            return new MTMVTimestampSnapshot(hudiMeta.getTimestamp());
        }
        // instanceof is false for null, so a null snapshot also ends up here; render the type
        // null-safely so the AnalysisException is not replaced by a NullPointerException.
        String snapshotType = engineSnapshot == null ? "null" : engineSnapshot.getClass().getSimpleName();
        throw new AnalysisException("Unsupported table snapshot conversion for table "
                + table.getNameWithFullQualifiers() + ", snapshot=" + snapshotType);
    }

    /**
     * Returns the name-to-partition map held by the Paimon partition info of {@code table}.
     *
     * @param table external table expected to resolve to Paimon partition info
     * @param snapshot optional MVCC snapshot, forwarded unchanged
     * @return Paimon partitions keyed by partition name
     * @throws AnalysisException if the table's partition info is not Paimon partition info
     */
    public static Map<String, Partition> getPaimonPartitionSnapshot(ExternalTable table,
            Optional<MvccSnapshot> snapshot)
            throws AnalysisException {
        PaimonPartitionInfo paimonPartitionInfo = getPaimonPartitionInfo(table, snapshot);
        return paimonPartitionInfo.getNameToPartition();
    }

    /**
     * Picks the engine cache that serves {@code table}, based on the table's runtime type and,
     * for HMS tables, on its DLA type (ICEBERG, HUDI or HIVE).
     *
     * @param table external table to resolve a cache for
     * @return the engine cache obtained from the external meta cache manager for the table's catalog
     * @throws IllegalArgumentException if no engine cache is mapped to the table's type
     */
    private static EngineMetaCache getEngineMetaCache(ExternalTable table) {
        ExternalMetaCacheMgr cacheMgr = Env.getCurrentEnv().getExtMetaCacheMgr();

        // Native lake-format tables map directly to their engine cache.
        if (table instanceof PaimonExternalTable) {
            return cacheMgr.getPaimonEngineCache(table.getCatalog());
        }
        if (table instanceof IcebergExternalTable) {
            return cacheMgr.getIcebergEngineCache(table.getCatalog());
        }

        // HMS tables are dispatched by the data-lake format they carry.
        if (table instanceof HMSExternalTable) {
            HMSExternalTable hmsTable = (HMSExternalTable) table;
            DLAType hmsFormat = hmsTable.getDlaType();
            switch (hmsFormat) {
                case ICEBERG:
                    return cacheMgr.getIcebergEngineCache(table.getCatalog());
                case HUDI:
                    return cacheMgr.getHudiEngineCache(table.getCatalog());
                case HIVE:
                    return cacheMgr.getHiveEngineCache(table.getCatalog());
                default:
                    // Any other DLA type continues with the remaining checks below.
                    break;
            }
        }

        if (table instanceof MaxComputeExternalTable) {
            return cacheMgr.getMaxComputeEngineCache(table.getCatalog());
        }

        throw new IllegalArgumentException("Unsupported MTMV engine cache table type: "
                + table.getClass().getSimpleName() + ", table=" + table.getNameWithFullQualifiers());
    }

    /**
     * Converts engine-specific partition info into a map from partition name to partition item.
     *
     * <p>Iceberg and Paimon partition info already expose a name-keyed map, which is returned as is.
     * Hive and Hudi partition values are keyed by partition id, so a new name-keyed map is built
     * from their id-to-item and id-to-name mappings.
     *
     * @param partitionInfo partition info obtained from the engine cache
     * @param table table the partition info belongs to; only used in the error message
     * @return partition items keyed by partition name
     * @throws IllegalArgumentException if the partition info type has no supported conversion
     */
    private static Map<String, PartitionItem> partitionItems(EnginePartitionInfo partitionInfo, ExternalTable table) {
        if (partitionInfo instanceof IcebergEngineCache.IcebergPartition) {
            return ((IcebergEngineCache.IcebergPartition) partitionInfo).getPartitionInfo().getNameToPartitionItem();
        }
        if (partitionInfo instanceof PaimonEngineCache.PaimonPartition) {
            return ((PaimonEngineCache.PaimonPartition) partitionInfo).getPartitionInfo().getNameToPartitionItem();
        }
        if (partitionInfo instanceof HiveEngineCache.HivePartition) {
            HiveMetaStoreCache.HivePartitionValues hivePartitionValues =
                    ((HiveEngineCache.HivePartition) partitionInfo).getPartitionValues();
            Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem();
            // Hive only stores name -> id; the BiMap inverse view gives the id -> name direction.
            BiMap<Long, String> idToName = hivePartitionValues.getPartitionNameToIdMap().inverse();
            return rekeyByPartitionName(idToPartitionItem, idToName);
        }
        if (partitionInfo instanceof HudiEngineCache.HudiPartition) {
            TablePartitionValues hudiPartitionValues =
                    ((HudiEngineCache.HudiPartition) partitionInfo).getPartitionValues();
            Map<Long, PartitionItem> idToPartitionItem = hudiPartitionValues.getIdToPartitionItem();
            Map<Long, String> idToName = hudiPartitionValues.getPartitionIdToNameMap();
            return rekeyByPartitionName(idToPartitionItem, idToName);
        }
        // instanceof is false for null, so null partition info also ends up here; render the type
        // null-safely so the IllegalArgumentException is not replaced by a NullPointerException.
        String infoType = partitionInfo == null ? "null" : partitionInfo.getClass().getSimpleName();
        throw new IllegalArgumentException("Unsupported partition info type for table "
                + table.getNameWithFullQualifiers() + ": " + infoType);
    }

    /** Builds a new map holding every partition item under the name registered for its partition id. */
    private static Map<String, PartitionItem> rekeyByPartitionName(Map<Long, PartitionItem> idToPartitionItem,
            Map<Long, String> idToName) {
        Map<String, PartitionItem> nameToPartitionItem =
                Maps.newHashMapWithExpectedSize(idToPartitionItem.size());
        for (Map.Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) {
            // NOTE(review): an id without a registered name is stored under a null key, as before;
            // confirm whether the two mappings are guaranteed to cover the same ids.
            nameToPartitionItem.put(idToName.get(entry.getKey()), entry.getValue());
        }
        return nameToPartitionItem;
    }

    /**
     * Fetches the partition info of {@code table} from its engine cache and unwraps it as Paimon
     * partition info.
     *
     * @param table external table expected to resolve to Paimon partition info
     * @param snapshot optional MVCC snapshot, forwarded unchanged to the engine cache
     * @return the Paimon partition info held by the cached partition entry
     * @throws AnalysisException if the engine cache returned anything other than Paimon partition info
     */
    private static PaimonPartitionInfo getPaimonPartitionInfo(ExternalTable table, Optional<MvccSnapshot> snapshot)
            throws AnalysisException {
        EnginePartitionInfo partitionInfo = getEngineMetaCache(table).getPartitionInfo(table, snapshot);
        if (!(partitionInfo instanceof PaimonEngineCache.PaimonPartition)) {
            // Covers null as well; render the type null-safely so the AnalysisException is not
            // replaced by a NullPointerException.
            String actualType = partitionInfo == null ? "null" : partitionInfo.getClass().getSimpleName();
            throw new AnalysisException("Expected paimon partition info for table "
                    + table.getNameWithFullQualifiers() + ", but got " + actualType);
        }
        return ((PaimonEngineCache.PaimonPartition) partitionInfo).getPartitionInfo();
    }
}