CreateMTMVCommand.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.info.TableNameInfo;
import org.apache.doris.catalog.stream.BaseTableStream.StreamScanType;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mtmv.ivm.IvmUtil;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateStreamInfo;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
 * create multi table materialized view
 */
public class CreateMTMVCommand extends Command implements ForwardWithSync {

    public static final Logger LOG = LogManager.getLogger(CreateMTMVCommand.class);
    private final CreateMTMVInfo createMTMVInfo;

    /**
     * constructor
     */
    public CreateMTMVCommand(CreateMTMVInfo createMTMVInfo) {
        super(PlanType.CREATE_MTMV_COMMAND);
        this.createMTMVInfo = Objects.requireNonNull(createMTMVInfo, "require CreateMTMVInfo object");
    }

    @Override
    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
        createMTMVInfo.analyze(ctx);
        Env.getCurrentEnv().createTable(this.createMTMVInfo);
        List<String> createdStreamNames = new ArrayList<>();
        try {
            createIvmStreams(ctx, createdStreamNames);
        } catch (Exception e) {
            // Rollback: drop any streams we already created, then force-drop the MTMV
            dropStreamsForce(createdStreamNames);
            dropMtmvForce();
            throw e;
        }
    }

    /**
     * If the MTMV enables IVM (explicit INCREMENTAL refresh), automatically
     * creates a stream for each base table so the delta rewrite path can
     * read binlog data through the existing stream scan infrastructure.
     */
    private void createIvmStreams(ConnectContext ctx,
            List<String> createdStreamNames) throws Exception {
        if (!createMTMVInfo.isEnableIvm()) {
            return;
        }
        MTMVRelation relation = createMTMVInfo.getRelation();
        if (relation == null) {
            LOG.warn("IVM: no relation found for MTMV {}, skip stream creation",
                    createMTMVInfo.getTableName());
            return;
        }
        Set<BaseTableInfo> baseTables = relation.getBaseTables();
        if (baseTables == null || baseTables.isEmpty()) {
            return;
        }
        String mvDbName = createMTMVInfo.getDbName();
        String mvName = createMTMVInfo.getTableName();
        Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(mvDbName);
        MTMV mtmv = (MTMV) db.getTableOrAnalysisException(mvName);
        Set<TableNameInfo> excluded = mtmv.getExcludedTriggerTables();
        for (BaseTableInfo baseTableInfo : baseTables) {
            // Skip excluded trigger tables ��� they don't participate in incremental refresh
            if (excluded != null && MTMVPartitionUtil.isTableExcluded(excluded,
                    new TableNameInfo(baseTableInfo.getCtlName(),
                            baseTableInfo.getDbName(), baseTableInfo.getTableName()))) {
                LOG.info("IVM: skipping stream creation for excluded trigger table {}",
                        baseTableInfo.getTableName());
                continue;
            }
            TableIf table = MTMVUtil.getTable(baseTableInfo);
            createTableStream(ctx, db, mtmv, table, baseTableInfo.getDbName(), createdStreamNames);
        }
    }

    /**
     * Creates a stream for a single base table of an IVM MTMV.
     * Drops any existing stream first, then creates a fresh one.
     *
     * @param ctx connection context
     * @param mvDb database containing the MTMV
     * @param mtmv the MTMV
     * @param baseTable the base table to create stream for
     * @param baseTableDbName the database name of the base table
     * @param createdStreamNames output list to collect created stream names (for rollback)
     */
    static void createTableStream(ConnectContext ctx, Database mvDb, MTMV mtmv, TableIf baseTable,
            String baseTableDbName, List<String> createdStreamNames) throws Exception {
        String mvDbName = mvDb.getFullName();
        long mvId = mtmv.getId();
        String streamName = IvmUtil.streamName(mvId, baseTable.getName());
        TableNameInfo streamTableName = new TableNameInfo(
                InternalCatalog.INTERNAL_CATALOG_NAME, mvDbName, streamName);
        TableNameInfo baseTableName = new TableNameInfo(
                InternalCatalog.INTERNAL_CATALOG_NAME, baseTableDbName, baseTable.getName());
        // Drop old stream if exists, so validation always runs on the fresh stream
        TableIf oldStream = mvDb.getTableNullable(streamName);
        if (oldStream != null) {
            Env.getCurrentInternalCatalog().dropTableWithoutCheck(
                    mvDb, (Table) oldStream, false, true /* forceDrop */);
        }
        Map<String, String> streamProps = new HashMap<>();
        streamProps.put(PropertyAnalyzer.PROPERTIES_STREAM_SHOW_INITIAL_ROWS, "true");
        if (baseTable instanceof OlapTable && ((OlapTable) baseTable).isUniqKeyMergeOnWrite()) {
            streamProps.put(PropertyAnalyzer.PROPERTIES_STREAM_TYPE,
                    StreamScanType.MIN_DELTA.name().toLowerCase());
        }
        CreateStreamInfo streamInfo = new CreateStreamInfo(
                false /* ifNotExists */, false /* orReplace */,
                streamTableName, baseTableName,
                streamProps, "" /* comment */);
        streamInfo.validate(ctx);
        Env.getCurrentEnv().getInternalCatalog().createTableStream(
                new CreateStreamCommand(streamInfo));
        if (createdStreamNames != null) {
            createdStreamNames.add(streamName);
        }
        LOG.info("IVM: auto-created stream {} for MTMV {} base table {}",
                streamName, mvId, baseTable.getName());
    }

    private void dropStreamsForce(List<String> streamNames) {
        String mvDbName = createMTMVInfo.getDbName();
        Database db = null;
        try {
            db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(mvDbName);
        } catch (Exception ignored) {
            return;
        }
        for (String streamName : streamNames) {
            try {
                TableIf t = db.getTableOrAnalysisException(streamName);
                Env.getCurrentInternalCatalog().dropTableWithoutCheck(
                        db, (Table) t, false, true /* forceDrop */);
            } catch (Exception ignored) {
                LOG.warn("IVM: failed to force-drop stream {} during rollback, ignored", streamName);
            }
        }
    }

    private void dropMtmvForce() {
        try {
            Database db = Env.getCurrentInternalCatalog()
                    .getDbOrAnalysisException(createMTMVInfo.getDbName());
            TableIf t = db.getTableOrAnalysisException(
                    createMTMVInfo.getTableName());
            Env.getCurrentInternalCatalog().dropTableWithoutCheck(
                    db, (Table) t, false, true /* forceDrop */);
        } catch (Exception ignored) {
            LOG.warn("IVM: failed to force-drop MTMV {} during rollback, ignored",
                    createMTMVInfo.getTableName());
        }
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitCreateMTMVCommand(this, context);
    }

    @Override
    public StmtType stmtType() {
        return StmtType.CREATE;
    }

    public CreateMTMVInfo getCreateMTMVInfo() {
        return createMTMVInfo;
    }

}