IvmRefreshManager.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.mtmv.ivm;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.info.TableNameInfo;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVAnalyzeQueryInfo;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.qe.ConnectContext;

import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
 * Minimal orchestration entry point for incremental refresh.
 *
 * <p>A refresh runs {@link #precheck(MTMV)}, builds a refresh context, analyzes the MV query
 * into delta commands and executes them through {@link IvmDeltaExecutor}. Failures that happen
 * before any delta command is executed are reported as fallback results; failures during
 * execution are rethrown as {@link IvmException} because the MV may be partially modified.
 *
 * <p>The plan signature of the refresh in progress is kept in a plain mutable field, so a
 * single instance must not be used for concurrent refreshes.
 */
public class IvmRefreshManager {
    private static final Logger LOG = LogManager.getLogger(IvmRefreshManager.class);
    private final IvmDeltaExecutor deltaExecutor;
    // Signature computed by the most recent validatePlanSignature() call. It is attached to the
    // PLAN_SIGNATURE_MISMATCH fallback result and reset at the start of every doRefresh().
    private IvmPlanSignature currentPlanSignatureForFallback;

    /** Creates a manager that executes delta commands with a default {@link IvmDeltaExecutor}. */
    public IvmRefreshManager() {
        this(new IvmDeltaExecutor());
    }

    /**
     * Creates a manager with the given executor.
     *
     * @param deltaExecutor executor used to run delta commands, must not be null
     */
    @VisibleForTesting
    IvmRefreshManager(IvmDeltaExecutor deltaExecutor) {
        this.deltaExecutor = Objects.requireNonNull(deltaExecutor, "deltaExecutor can not be null");
    }

    /**
     * Runs one incremental refresh of the given MV.
     *
     * @param mtmv the materialized view to refresh, must not be null
     * @return a success result, or a fallback result when precheck, context build or plan
     *         analysis failed before any MV data was written
     * @throws IvmException if executing the delta commands fails
     */
    public IvmRefreshResult doRefresh(MTMV mtmv) {
        Objects.requireNonNull(mtmv, "mtmv can not be null");
        currentPlanSignatureForFallback = null;
        IvmRefreshResult precheckResult = precheck(mtmv);
        if (!precheckResult.isSuccess()) {
            LOG.warn("IVM precheck failed for mv={}, result={}", mtmv.getName(), precheckResult);
            return precheckResult;
        }
        final IvmRefreshContext context;
        try {
            context = buildRefreshContext(mtmv);
        } catch (Exception e) {
            // Exceptions without a message (e.g. a bare NullPointerException) would otherwise
            // produce a fallback result with a null detail; use the same non-null description
            // as the analysis path and keep the stack trace in the log.
            IvmRefreshResult result = IvmRefreshResult.fallback(
                    IvmFailureReason.SNAPSHOT_ALIGNMENT_UNSUPPORTED, describeFailure(e));
            LOG.warn("IVM context build failed for mv={}, result={}", mtmv.getName(), result, e);
            return result;
        }
        return doRefreshInternal(context);
    }

    /**
     * Checks the persisted IVM state of the MV before an incremental refresh is attempted.
     *
     * @param mtmv the materialized view to check, must not be null
     * @return a fallback result when a previous incremental refresh did not complete or the
     *         binlog is marked as broken, otherwise a success result
     */
    @VisibleForTesting
    IvmRefreshResult precheck(MTMV mtmv) {
        Objects.requireNonNull(mtmv, "mtmv can not be null");
        IvmInfo ivmInfo = mtmv.getIvmInfo();
        if (ivmInfo.isRunningIvmRefresh()) {
            return IvmRefreshResult.fallback(IvmFailureReason.PREVIOUS_RUN_INCOMPLETE,
                    "A previous incremental refresh did not complete; full refresh is required");
        }
        if (ivmInfo.isBinlogBroken()) {
            return IvmRefreshResult.fallback(IvmFailureReason.BINLOG_BROKEN,
                    "Stream binlog is marked as broken");
        }
        // NOTE(review): a checkStreamSupport(mtmv) call used to be commented out here; confirm
        // whether a stream-support check should be restored before returning success.
        return IvmRefreshResult.success();
    }

    /**
     * Builds the refresh context, including the connect context used to plan the MV query.
     *
     * @param mtmv the materialized view being refreshed
     * @return a new refresh context bound to a freshly created MTMV connect context
     * @throws Exception if the connect context can not be created
     */
    @VisibleForTesting
    IvmRefreshContext buildRefreshContext(MTMV mtmv) throws Exception {
        ConnectContext connectContext = MTMVPlanUtil.createMTMVContext(mtmv,
                MTMVPlanUtil.DISABLE_RULES_WHEN_RUN_MTMV_TASK);
        return new IvmRefreshContext(mtmv, connectContext);
    }

    /**
     * Analyzes the MV query, validates its plan signature and rewrites the normalized plan
     * into delta commands.
     *
     * @param context the refresh context built for this run
     * @return the delta commands to execute, or an empty list when analysis produced no
     *         normalized plan
     * @throws IvmException if the plan signature does not match the stored baseline
     * @throws Exception if analysis or rewriting fails
     */
    @VisibleForTesting
    List<Command> analyzeDeltaCommands(IvmRefreshContext context) throws Exception {
        MTMV mtmv = context.getMtmv();
        MTMVAnalyzeQueryInfo queryInfo = analyzeAndValidate(mtmv, context);
        IvmNormalizeResult normalizeResult = queryInfo.getIvmNormalizeResult();
        Plan normalizedPlan = queryInfo.getIvmNormalizedPlan();
        if (normalizedPlan == null) {
            return Collections.emptyList();
        }

        IvmRefreshContext rewriteCtx = new IvmRefreshContext(
                mtmv, context.getConnectContext(), normalizeResult);
        return new IvmDeltaRewriter().rewrite(normalizedPlan, rewriteCtx);
    }

    /**
     * Builds the IVM normalized plan and all dry-run delta plans for EXPLAIN REFRESH.
     * This method does not mutate persisted IVM state and intentionally includes
     * no-op streams so users can inspect every delta plan shape.
     *
     * @param mtmv the materialized view to explain, must not be null
     * @return the normalized plan together with the merged delta plan
     * @throws AnalysisException if analysis produced no normalized plan
     * @throws IvmException if the plan signature does not match the stored baseline
     * @throws Exception if context creation, analysis or rewriting fails
     */
    public IvmRefreshExplainResult explainRefresh(MTMV mtmv) throws Exception {
        Objects.requireNonNull(mtmv, "mtmv can not be null");
        IvmRefreshContext context = buildRefreshContext(mtmv);
        MTMVAnalyzeQueryInfo queryInfo = analyzeAndValidate(mtmv, context);
        IvmNormalizeResult normalizeResult = queryInfo.getIvmNormalizeResult();
        Plan normalizedPlan = queryInfo.getIvmNormalizedPlan();
        if (normalizedPlan == null) {
            throw new AnalysisException("IVM normalized plan is empty");
        }

        IvmRefreshContext rewriteCtx = new IvmRefreshContext(
                mtmv, context.getConnectContext(), normalizeResult);
        IvmDeltaRewriter rewriter = new IvmDeltaRewriter();
        Plan mergedDeltaPlan = rewriter.generateMergedDeltaPlan(normalizedPlan, rewriteCtx,
                scan -> rewriter.isExcludedTriggerTable(scan, mtmv.getExcludedTriggerTables()), true);
        return new IvmRefreshExplainResult(normalizedPlan, mergedDeltaPlan);
    }

    /**
     * Analyzes the MV query with the context's connect context and validates the resulting
     * plan signature against the stored baseline.
     */
    private MTMVAnalyzeQueryInfo analyzeAndValidate(MTMV mtmv, IvmRefreshContext context) throws Exception {
        MTMVAnalyzeQueryInfo queryInfo = MTMVPlanUtil.analyzeQueryWithSql(
                mtmv, context.getConnectContext(), true);
        validatePlanSignature(mtmv, queryInfo);
        return queryInfo;
    }

    /**
     * Compares the signature of the freshly analyzed plan with the signature stored in the
     * MV's {@link IvmInfo}. The current signature is remembered so that a mismatch fallback
     * result can carry it.
     *
     * @param mtmv the materialized view whose stored signature is checked
     * @param queryInfo analysis result that carries the current normalize result
     * @throws IvmException with {@link IvmFailureReason#PLAN_SIGNATURE_MISMATCH} when the
     *         current signature is missing or differs from the stored one
     */
    @VisibleForTesting
    void validatePlanSignature(MTMV mtmv, MTMVAnalyzeQueryInfo queryInfo) {
        IvmNormalizeResult normalizeResult = queryInfo.getIvmNormalizeResult();
        IvmPlanSignature currentSignature = normalizeResult == null ? null : normalizeResult.getPlanSignature();
        currentPlanSignatureForFallback = currentSignature;
        IvmInfo ivmInfo = mtmv.getIvmInfo();
        String storedSignature = ivmInfo.getPlanSignature();
        boolean signatureMatched = currentSignature != null
                && Objects.equals(storedSignature, currentSignature.getSha256());
        if (signatureMatched) {
            return;
        }
        String currentSha = currentSignature == null ? "null" : currentSignature.getSha256();
        LOG.info("IVM layout signature mismatch for mv={}, storedSignature={}, currentSignature={}, "
                        + "currentCanonicalLayout={}",
                mtmv.getName(), storedSignature, currentSha,
                currentSignature == null ? "null" : currentSignature.getCanonicalString());
        String detail = "IVM layout signature mismatch for mv=" + mtmv.getName()
                + ", storedSignature=" + storedSignature
                + ", currentSignature=" + currentSha
                + ". Run a full refresh to rebuild IVM layout baseline.";
        throw new IvmException(IvmFailureReason.PLAN_SIGNATURE_MISMATCH, detail);
    }

    /**
     * Analyzes and executes the delta commands for an already built context.
     *
     * @param context the refresh context, must not be null
     * @return a success result, or a fallback result when plan analysis failed
     * @throws IvmException if executing the delta commands fails
     */
    private IvmRefreshResult doRefreshInternal(IvmRefreshContext context) {
        Objects.requireNonNull(context, "context can not be null");
        MTMV mtmv = context.getMtmv();

        // Run Nereids with IVM rewrite enabled - per-pattern delta rules write bundles to CascadesContext
        List<Command> commands;
        try {
            commands = analyzeDeltaCommands(context);
        } catch (IvmException e) {
            // Analysis has not written MV data yet, so unsupported IVM patterns
            // can be represented as a fallback result for the task planner. Preserve
            // the typed failure reason so MTMVTask can decide whether ordinary partition
            // fallback is enough or a full layout-baseline rebuild is required.
            IvmPlanSignature currentSignature = e.getFailureReason() == IvmFailureReason.PLAN_SIGNATURE_MISMATCH
                    ? currentPlanSignatureForFallback : null;
            IvmRefreshResult result = IvmRefreshResult.fallback(
                    e.getFailureReason(), e.getMessage(), currentSignature);
            LOG.warn("IVM plan analysis failed for mv={}, result={}", mtmv.getName(), result, e);
            return result;
        } catch (Exception e) {
            // Unknown analysis errors are still pre-execution failures. Return a
            // fallback result instead of throwing so AUTO/INCREMENTAL FALLBACK
            // can try PARTITIONS/COMPLETE.
            IvmRefreshResult result = IvmRefreshResult.fallback(
                    IvmFailureReason.PLAN_PATTERN_UNSUPPORTED, describeFailure(e));
            LOG.warn("IVM plan analysis failed for mv={}, result={}", mtmv.getName(), result, e);
            return result;
        }

        if (commands == null || commands.isEmpty()) {
            // All base tables are up to date - no delta to apply. This is a success (no-op).
            LOG.info("IVM no delta commands for mv={} (all base tables up to date)", mtmv.getName());
            return IvmRefreshResult.success();
        }

        // Consume one ExprId from the analysis StatementContext to obtain the next safe start
        // value for execution. This prevents ExprId collisions between plan-embedded ExprIds
        // (allocated during analyzeDeltaCommands) and new ExprIds allocated during delta
        // command execution (in a fresh StatementContext). See: apache/doris#58494.
        // StatementContext may be null in unit-test paths where analyzeDeltaCommands is mocked
        // out; in that case start from 0 (safe because no real plan ExprIds exist).
        // This is done BEFORE the in-progress flag is persisted so that a failure in this
        // preparation step can not leave runningIvmRefresh=true without any MV modification.
        StatementContext analysisStmtCtx = context.getConnectContext().getStatementContext();
        int exprIdStart = analysisStmtCtx != null ? analysisStmtCtx.getNextExprId().asInt() : 0;

        // Mark incremental refresh in progress and persist BEFORE execution.
        // If FE crashes during execution, on restart the flag triggers full refresh.
        IvmInfo ivmInfo = mtmv.getIvmInfo();
        ivmInfo.setRunningIvmRefresh(true);
        persistIvmInfo(mtmv, ivmInfo);

        try {
            deltaExecutor.execute(context, commands, exprIdStart);
        } catch (Exception e) {
            // Leave runningIvmRefresh=true - the next task will detect this and
            // require full refresh recovery, which resets the flag on success.
            // Do not return a fallback result here: delta commands are executed
            // one by one and may already have partially modified the MV.
            String detail = describeFailure(e);
            LOG.warn("IVM execution failed for mv={}, detail={}", mtmv.getName(), detail, e);
            throw new IvmException(IvmFailureClassifier.classifyExecutionFailure(detail)
                    .orElse(IvmFailureReason.INCREMENTAL_EXECUTION_FAILED), detail);
        }

        // Clear the in-progress flag (and, once implemented, advance the stream offsets),
        // persisting everything in one editlog entry.
        advanceStreamOffsetAndClearFlag(mtmv);

        return IvmRefreshResult.success();
    }

    /**
     * Returns the exception message, or the exception class name followed by
     * {@code " (no message)"} when the exception carries no message.
     */
    private static String describeFailure(Exception e) {
        String message = e.getMessage();
        return message != null ? message : e.getClass().getName() + " (no message)";
    }

    /**
     * After successful delta execution, clears the runningIvmRefresh flag and persists the
     * IvmInfo via editlog. Stream offsets are not advanced yet.
     *
     * TODO: Implement stream offset advancement via OlapTableStream.unprotectedUpdateStreamUpdate()
     * once streams are auto-created in Phase 1.
     */
    private void advanceStreamOffsetAndClearFlag(MTMV mtmv) {
        IvmInfo ivmInfo = mtmv.getIvmInfo();
        // TODO: advance stream offsets for each base table
        ivmInfo.setRunningIvmRefresh(false);
        persistIvmInfo(mtmv, ivmInfo);
    }

    /**
     * Persists the IvmInfo via the AlterMTMV editlog mechanism.
     * Package-private so tests can override to avoid Env dependency.
     */
    @VisibleForTesting
    void persistIvmInfo(MTMV mtmv, IvmInfo ivmInfo) {
        writeIvmInfoEditLog(mtmv, ivmInfo);
    }

    /** Writes the given IvmInfo for the MV through {@code Env.alterMTMVIvmInfo}. */
    private static void writeIvmInfoEditLog(MTMV mtmv, IvmInfo ivmInfo) {
        TableNameInfo tableName = new TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName());
        Env.getCurrentEnv().alterMTMVIvmInfo(tableName, ivmInfo);
    }

    /**
     * Resets IVM state after a successful full (COMPLETE) refresh. Called from MTMVTask
     * when the partition-based refresh succeeds and a previous IVM run left
     * {@code runningIvmRefresh=true}. Currently this only clears the flag and persists the
     * IvmInfo; {@code capturedTsos} is accepted but not used yet.
     *
     * TODO: Implement stream offset reset to the pre-captured snapshot TSO via
     * OlapTableStream.unprotectedUpdateStreamUpdate() once streams are auto-created in Phase 1.
     *
     * @param mtmv the materialized view whose IVM state is reset
     * @param capturedTsos TSOs captured before the full refresh; reserved for the offset reset
     */
    public static void resetIvmStateAfterFullRefresh(MTMV mtmv,
            Map<BaseTableInfo, Long> capturedTsos) {
        IvmInfo ivmInfo = mtmv.getIvmInfo();
        clearRunningIvmRefreshAfterFullRefresh(ivmInfo);
        writeIvmInfoEditLog(mtmv, ivmInfo);
        LOG.info("IVM state reset after full refresh for mv={}", mtmv.getName());
    }

    /**
     * Clears the runningIvmRefresh flag of the MV after a full refresh and persists the
     * IvmInfo via editlog.
     *
     * @param mtmv the materialized view whose flag is cleared
     */
    public static void clearRunningIvmRefreshAfterFullRefresh(MTMV mtmv) {
        IvmInfo ivmInfo = mtmv.getIvmInfo();
        clearRunningIvmRefreshAfterFullRefresh(ivmInfo);
        writeIvmInfoEditLog(mtmv, ivmInfo);
        LOG.info("IVM running refresh flag cleared after full refresh for mv={}", mtmv.getName());
    }

    /**
     * Stores a new plan signature baseline in the MV's IvmInfo and persists it via editlog.
     *
     * @param mtmv the materialized view whose baseline is updated
     * @param planSignature the signature to store
     * @param canonicalString canonical layout text, used for logging only; may be null
     */
    public static void updatePlanSignatureAfterFullRefresh(MTMV mtmv, String planSignature,
            String canonicalString) {
        IvmInfo ivmInfo = mtmv.getIvmInfo();
        ivmInfo.setPlanSignature(planSignature);
        writeIvmInfoEditLog(mtmv, ivmInfo);
        LOG.info("IVM layout signature baseline updated after full refresh for mv={}, signature={}, "
                        + "canonicalLayout={}",
                mtmv.getName(), planSignature, canonicalString == null ? "null" : canonicalString);
    }

    /** Clears the runningIvmRefresh flag on the given IvmInfo without persisting it. */
    @VisibleForTesting
    static void clearRunningIvmRefreshAfterFullRefresh(IvmInfo ivmInfo) {
        ivmInfo.setRunningIvmRefresh(false);
    }

    // resetIvmStateAfterFullRefresh(IvmInfo, Map) removed - the stream offset reset is planned
    // to go through OlapTableStream.unprotectedUpdateStreamUpdate() in the public
    // resetIvmStateAfterFullRefresh(MTMV, Map) method (see the TODO on that method).

    /**
     * Captures the current visible TSO for each one-level base table of the MV's relation
     * that resolves to an {@link OlapTable}. Should be called BEFORE a full refresh executes,
     * so the captured values represent the snapshot that the refresh will read. If resolving
     * any table fails, a warning is logged and an empty map is returned.
     *
     * @param mtmv the materialized view whose base tables are inspected
     * @return a mutable map from base table to its visible TSO; empty when the MV has no
     *         relation or base tables, or when capturing failed for any table
     */
    public static Map<BaseTableInfo, Long> captureBaseTableTsos(MTMV mtmv) {
        Map<BaseTableInfo, Long> result = new HashMap<>();
        Set<BaseTableInfo> baseTables = getBaseTablesForIvmState(mtmv);
        if (baseTables == null || baseTables.isEmpty()) {
            return result;
        }
        for (BaseTableInfo tableInfo : baseTables) {
            try {
                TableIf table = MTMVUtil.getTable(tableInfo);
                if (table instanceof OlapTable) {
                    result.put(tableInfo, ((OlapTable) table).getVisibleTso());
                }
            } catch (Exception e) {
                // The trailing throwable keeps the stack trace in the log in addition to the message.
                LOG.warn("IVM: failed to capture TSO for table {} before full refresh: {}. "
                        + "IVM state reset will be skipped.", tableInfo, e.getMessage(), e);
                // Return a fresh mutable map so every return path has the same mutability;
                // a partially filled result must not be exposed.
                return new HashMap<>();
            }
        }
        return result;
    }

    /** Returns the one-level base tables of the MV's relation, or null when it has no relation. */
    private static Set<BaseTableInfo> getBaseTablesForIvmState(MTMV mtmv) {
        MTMVRelation relation = mtmv.getRelation();
        return relation == null ? null : relation.getBaseTablesOneLevel();
    }

}