MTMVRefreshPartitionSnapshot.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.mtmv;

import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;

import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;

public class MTMVRefreshPartitionSnapshot {
    private static final Logger LOG = LogManager.getLogger(MTMV.class);
    @SerializedName("p")
    private Map<String, MTMVSnapshotIf> partitions;
    // old version only persist table id, we need `BaseTableInfo`, `tables` only for compatible old version
    @SerializedName("t")
    @Deprecated
    private Map<Long, MTMVSnapshotIf> tables;
    @SerializedName("ti")
    private Map<BaseTableInfo, MTMVSnapshotIf> tablesInfo;

    public MTMVRefreshPartitionSnapshot() {
        this.partitions = Maps.newConcurrentMap();
        this.tables = Maps.newConcurrentMap();
        this.tablesInfo = Maps.newConcurrentMap();
    }

    public Map<String, MTMVSnapshotIf> getPartitions() {
        return partitions;
    }

    public MTMVSnapshotIf getTableSnapshot(BaseTableInfo table) {
        if (tablesInfo.containsKey(table)) {
            return tablesInfo.get(table);
        }
        // for compatible old version
        return tables.get(table.getTableId());
    }

    public void addTableSnapshot(BaseTableInfo baseTableInfo, MTMVSnapshotIf tableSnapshot) {
        tablesInfo.put(baseTableInfo, tableSnapshot);
        // for compatible old version
        tables.put(baseTableInfo.getTableId(), tableSnapshot);
    }

    @Override
    public String toString() {
        return "MTMVRefreshPartitionSnapshot{"
                + "partitions=" + partitions
                + ", tablesInfo=" + tablesInfo
                + '}';
    }

    public void compatible(MTMV mtmv) {
        try {
            // snapshot add partitionId resolve problem of insert overwrite
            compatiblePartitions(mtmv);
        } catch (Throwable e) {
            LOG.warn("MTMV compatiblePartitions failed, mtmv: {}", mtmv.getName(), e);
        }
        try {
            // change table id to BaseTableInfo
            compatibleTables(mtmv);
        } catch (Throwable e) {
            LOG.warn("MTMV compatibleTables failed, mtmv: {}", mtmv.getName(), e);
        }

        try {
            // snapshot add tableId resolve problem of recreate table
            compatibleTablesSnapshot();
        } catch (Throwable e) {
            LOG.warn("MTMV compatibleTables failed, mtmv: {}", mtmv.getName(), e);
        }
    }

    private void compatiblePartitions(MTMV mtmv) throws AnalysisException {
        MTMVRelatedTableIf relatedTableIf = mtmv.getMvPartitionInfo().getRelatedTable();
        // Only olapTable has historical data issues that require compatibility
        if (!(relatedTableIf instanceof OlapTable)) {
            return;
        }
        if (!checkHasDataWithoutPartitionId()) {
            return;
        }
        OlapTable relatedTable = (OlapTable) relatedTableIf;
        for (Entry<String, MTMVSnapshotIf> entry : partitions.entrySet()) {
            MTMVVersionSnapshot versionSnapshot = (MTMVVersionSnapshot) entry.getValue();
            if (versionSnapshot.getId() == 0) {
                Partition partition = relatedTable.getPartition(entry.getKey());
                if (partition != null) {
                    (versionSnapshot).setId(partition.getId());
                }
            }
        }
    }

    private boolean checkHasDataWithoutPartitionId() {
        for (MTMVSnapshotIf snapshot : partitions.values()) {
            if (snapshot instanceof MTMVVersionSnapshot && ((MTMVVersionSnapshot) snapshot).getId() == 0) {
                return true;
            }
        }
        return false;
    }

    private void compatibleTablesSnapshot() {
        if (!checkHasDataWithoutTableId()) {
            return;
        }
        for (Entry<BaseTableInfo, MTMVSnapshotIf> entry : tablesInfo.entrySet()) {
            MTMVVersionSnapshot versionSnapshot = (MTMVVersionSnapshot) entry.getValue();
            if (versionSnapshot.getId() == 0) {
                try {
                    TableIf table = MTMVUtil.getTable(entry.getKey());
                    versionSnapshot.setId(table.getId());
                } catch (AnalysisException e) {
                    LOG.warn("MTMV compatibleTablesSnapshot failed, can not get table by: {}", entry.getKey());
                }
            }
        }
    }

    private boolean checkHasDataWithoutTableId() {
        for (MTMVSnapshotIf snapshot : tablesInfo.values()) {
            if (snapshot instanceof MTMVVersionSnapshot && ((MTMVVersionSnapshot) snapshot).getId() == 0) {
                return true;
            }
        }
        return false;
    }

    private void compatibleTables(MTMV mtmv) {
        if (tables.size() == tablesInfo.size()) {
            return;
        }
        MTMVRelation relation = mtmv.getRelation();
        if (relation == null || CollectionUtils.isEmpty(relation.getBaseTablesOneLevel())) {
            return;
        }
        for (Entry<Long, MTMVSnapshotIf> entry : tables.entrySet()) {
            Optional<BaseTableInfo> tableInfo = getByTableId(entry.getKey(),
                    relation.getBaseTablesOneLevel());
            if (tableInfo.isPresent()) {
                tablesInfo.put(tableInfo.get(), entry.getValue());
            } else {
                LOG.warn("MTMV compatibleTables failed, tableId: {}, relationTables: {}", entry.getKey(),
                        relation.getBaseTablesOneLevel());
            }
        }
    }

    private Optional<BaseTableInfo> getByTableId(Long tableId, Set<BaseTableInfo> baseTables) {
        for (BaseTableInfo info : baseTables) {
            if (info.getTableId() == tableId) {
                return Optional.of(info);
            }
        }
        return Optional.empty();
    }
}