IvmRefreshManager.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv.ivm;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.info.TableNameInfo;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVAnalyzeQueryInfo;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.qe.ConnectContext;
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* Minimal orchestration entry point for incremental refresh.
*/
public class IvmRefreshManager {
// Logger shared by all instances and by the static helpers of this class.
private static final Logger LOG = LogManager.getLogger(IvmRefreshManager.class);
// Executes the delta commands produced by plan analysis; never null (enforced in the constructor).
private final IvmDeltaExecutor deltaExecutor;
/**
 * Creates a manager backed by a newly constructed {@link IvmDeltaExecutor}.
 */
public IvmRefreshManager() {
this(new IvmDeltaExecutor());
}
/**
 * Creates a manager that runs delta commands through the supplied executor.
 * Package-private so tests can inject their own executor.
 *
 * @param deltaExecutor executor used to run the delta commands; must not be null
 * @throws NullPointerException if {@code deltaExecutor} is null
 */
@VisibleForTesting
IvmRefreshManager(IvmDeltaExecutor deltaExecutor) {
this.deltaExecutor = Objects.requireNonNull(deltaExecutor, "deltaExecutor can not be null");
}
/**
 * Runs one incremental refresh attempt for the given materialized view.
 *
 * <p>The attempt runs {@link #precheck(MTMV)} first, then {@link #buildRefreshContext(MTMV)},
 * and finally hands the context to the internal analyze/execute step. A failed precheck and
 * a failure while building the context are both reported as a result object rather than thrown.
 *
 * @param mtmv the materialized view to refresh; must not be null
 * @return the precheck result when the precheck does not succeed; a
 *         {@code SNAPSHOT_ALIGNMENT_UNSUPPORTED} fallback when the context cannot be built;
 *         otherwise the result of the internal refresh step
 * @throws NullPointerException if {@code mtmv} is null
 */
public IvmRefreshResult doRefresh(MTMV mtmv) {
    Objects.requireNonNull(mtmv, "mtmv can not be null");
    IvmRefreshResult precheckResult = precheck(mtmv);
    if (!precheckResult.isSuccess()) {
        LOG.warn("IVM precheck failed for mv={}, result={}", mtmv.getName(), precheckResult);
        return precheckResult;
    }
    final IvmRefreshContext context;
    try {
        context = buildRefreshContext(mtmv);
    } catch (Exception e) {
        // Throwable#getMessage() may be null (e.g. a bare NullPointerException). Substitute
        // the exception class name so the fallback detail is never null, exactly as the
        // analysis and execution failure paths of this class already do.
        String detail = e.getMessage() != null ? e.getMessage()
                : e.getClass().getName() + " (no message)";
        IvmRefreshResult result = IvmRefreshResult.fallback(
                IvmFailureReason.SNAPSHOT_ALIGNMENT_UNSUPPORTED, detail);
        // The trailing throwable argument makes the logger record the stack trace.
        LOG.warn("IVM context build failed for mv={}, result={}", mtmv.getName(), result, e);
        return result;
    }
    return doRefreshInternal(context);
}
/**
 * Decides, from the persisted IVM state alone, whether an incremental refresh may be attempted.
 *
 * @param mtmv the materialized view to inspect; must not be null
 * @return a {@code PREVIOUS_RUN_INCOMPLETE} fallback when the running flag is still set,
 *         a {@code BINLOG_BROKEN} fallback when the binlog is marked broken, success otherwise
 * @throws NullPointerException if {@code mtmv} is null
 */
@VisibleForTesting
IvmRefreshResult precheck(MTMV mtmv) {
    Objects.requireNonNull(mtmv, "mtmv can not be null");

    final boolean previousRunIncomplete = mtmv.getIvmInfo().isRunningIvmRefresh();
    if (previousRunIncomplete) {
        return IvmRefreshResult.fallback(
                IvmFailureReason.PREVIOUS_RUN_INCOMPLETE,
                "A previous incremental refresh did not complete; full refresh is required");
    }

    final boolean binlogBroken = mtmv.getIvmInfo().isBinlogBroken();
    if (binlogBroken) {
        return IvmRefreshResult.fallback(
                IvmFailureReason.BINLOG_BROKEN,
                "Stream binlog is marked as broken");
    }

    // The stream-binding validation in checkStreamSupport(mtmv) is intentionally not invoked
    // here at the moment; only the two state flags above gate the incremental path.
    return IvmRefreshResult.success();
}
/**
 * Assembles the per-run context: a connect context created for the materialized view
 * (with the MTMV-task rule exclusions) plus the MTMV refresh context.
 *
 * @param mtmv the materialized view being refreshed
 * @return a new context bundling the view with both freshly built contexts
 * @throws Exception if either underlying context cannot be built
 */
@VisibleForTesting
IvmRefreshContext buildRefreshContext(MTMV mtmv) throws Exception {
    final ConnectContext mvConnectContext =
            MTMVPlanUtil.createMTMVContext(mtmv, MTMVPlanUtil.DISABLE_RULES_WHEN_RUN_MTMV_TASK);
    final MTMVRefreshContext mvRefreshContext = MTMVRefreshContext.buildContext(mtmv);
    return new IvmRefreshContext(mtmv, mvConnectContext, mvRefreshContext);
}
/**
 * Analyzes the materialized view's query and rewrites its normalized plan into delta commands.
 *
 * @param context the refresh context carrying the view and its connect context
 * @return the commands produced by the delta rewriter, or an empty list when the analysis
 *         yields no normalized plan
 * @throws Exception if analysis, TSO lookup or rewriting fails
 */
@VisibleForTesting
List<Command> analyzeDeltaCommands(IvmRefreshContext context) throws Exception {
    final MTMV mv = context.getMtmv();
    final MTMVAnalyzeQueryInfo analyzed =
            MTMVPlanUtil.analyzeQueryWithSql(mv, context.getConnectContext(), true);
    final IvmNormalizeResult normalization = analyzed.getIvmNormalizeResult();
    final Plan plan = analyzed.getIvmNormalizedPlan();

    if (plan != null) {
        // The stream map may still be empty on the first refresh, so set it up first.
        ensureBaseTableStreamsInitialized(mv);

        // Record each base table's latest TSO, then hand the streams to the rewrite context.
        final Map<BaseTableInfo, IvmStreamRef> streams = mv.getIvmInfo().getBaseTableStreams();
        populateLatestTso(streams);

        final IvmDeltaRewriteContext rewriteContext = new IvmDeltaRewriteContext(
                mv, context.getConnectContext(), normalization, streams);
        return new IvmDeltaRewriter().rewrite(plan, rewriteContext);
    }

    return Collections.emptyList();
}
/**
 * Stores the current visible TSO of every base table into its stream reference via
 * {@link IvmStreamRef#setLatestTso}. A null map is ignored.
 *
 * <p>Any problem (table cannot be resolved, table is not an {@link OlapTable}, TSO cannot
 * be read or stored) is raised as an {@link AnalysisException}, so the caller falls back to
 * a full refresh instead of continuing with stale TSO values.
 *
 * @param baseTableStreams stream references keyed by base table; may be null
 */
@VisibleForTesting
void populateLatestTso(Map<BaseTableInfo, IvmStreamRef> baseTableStreams) {
    if (baseTableStreams == null) {
        return;
    }
    baseTableStreams.forEach((baseTable, streamRef) -> {
        final TableIf resolved;
        try {
            resolved = MTMVUtil.getTable(baseTable);
        } catch (Exception cause) {
            throw new AnalysisException("IVM: failed to resolve base table: " + baseTable, cause);
        }

        if (!(resolved instanceof OlapTable)) {
            throw new AnalysisException("IVM: base table is not OlapTable: " + baseTable);
        }
        final OlapTable olapTable = (OlapTable) resolved;

        try {
            streamRef.setLatestTso(olapTable.getVisibleTso());
        } catch (Exception cause) {
            throw new AnalysisException("IVM: failed to get visible TSO for table: " + baseTable, cause);
        }
    });
}
/**
 * Populates the view's base-table stream map when it is still null or empty, which is the
 * case on the first incremental refresh. One default-constructed {@link IvmStreamRef} is
 * registered per base table reported by the view's relation metadata. Nothing happens when
 * the map already has entries, or when the relation or its base-table set is missing or empty.
 *
 * <p>Note: the base tables come from getBaseTablesOneLevel(). When view support is added,
 * this should align with getBaseTablesOneLevelAndFromView() and also backfill missing
 * entries in partially populated maps.
 *
 * @param mtmv the materialized view whose IVM info is initialized in place
 */
@VisibleForTesting
void ensureBaseTableStreamsInitialized(MTMV mtmv) {
    final IvmInfo info = mtmv.getIvmInfo();
    final Map<BaseTableInfo, IvmStreamRef> existing = info.getBaseTableStreams();
    final boolean alreadyPopulated = existing != null && !existing.isEmpty();
    if (alreadyPopulated) {
        return;
    }

    final MTMVRelation relation = mtmv.getRelation();
    final Set<BaseTableInfo> oneLevelTables =
            relation == null ? null : relation.getBaseTablesOneLevel();
    if (oneLevelTables == null || oneLevelTables.isEmpty()) {
        return;
    }

    final Map<BaseTableInfo, IvmStreamRef> fresh = new HashMap<>();
    oneLevelTables.forEach(baseTable -> fresh.put(baseTable, new IvmStreamRef()));
    info.setBaseTableStreams(fresh);
    LOG.info("IVM initialized baseTableStreams for mv={} with {} base tables",
            mtmv.getName(), fresh.size());
}
/**
 * Analyzes, executes and commits one incremental refresh for an already-built context.
 *
 * <p>Steps, in order: (1) derive the delta commands; (2) if there are none, report success
 * without touching any state; (3) set and persist the running flag; (4) execute the
 * commands; (5) on success advance the consumed TSOs, clear the flag and persist again.
 * Analysis and execution failures are converted into fallback results; on an execution
 * failure the running flag is deliberately left set.
 *
 * @param context the refresh context; must not be null
 * @return success, or a fallback result describing why the incremental path was abandoned
 * @throws NullPointerException if {@code context} is null
 */
private IvmRefreshResult doRefreshInternal(IvmRefreshContext context) {
Objects.requireNonNull(context, "context can not be null");
MTMV mtmv = context.getMtmv();
// Run Nereids with IVM rewrite enabled - per-pattern delta rules write bundles to CascadesContext
List<Command> commands;
try {
commands = analyzeDeltaCommands(context);
} catch (IvmException e) {
// An IvmException carries its own failure reason; forward it unchanged.
IvmRefreshResult result = IvmRefreshResult.fallback(e.getFailureReason(), e.getMessage());
LOG.warn("IVM plan analysis failed for mv={}, result={}", mtmv.getName(), result, e);
return result;
} catch (Exception e) {
// Any other analysis failure is reported as an unsupported plan pattern. The message
// may be null, so fall back to the exception class name for the detail text.
String detail = e.getMessage() != null ? e.getMessage()
: e.getClass().getName() + " (no message)";
IvmRefreshResult result = IvmRefreshResult.fallback(
IvmFailureReason.PLAN_PATTERN_UNSUPPORTED, detail);
LOG.warn("IVM plan analysis failed for mv={}, result={}", mtmv.getName(), result, e);
return result;
}
if (commands == null || commands.isEmpty()) {
// All base tables are up to date - no delta to apply. This is a success (no-op).
LOG.info("IVM no delta commands for mv={} (all base tables up to date)", mtmv.getName());
return IvmRefreshResult.success();
}
// Mark incremental refresh in progress and persist BEFORE execution.
// If FE crashes during execution, on restart the flag triggers full refresh.
IvmInfo ivmInfo = mtmv.getIvmInfo();
ivmInfo.setRunningIvmRefresh(true);
persistIvmInfo(mtmv, ivmInfo);
// Consume one ExprId from the analysis StatementContext to obtain the next safe start
// value for execution. This prevents ExprId collisions between plan-embedded ExprIds
// (allocated during analyzeDeltaCommands) and new ExprIds allocated during
// command execution (in a fresh StatementContext). See: apache/doris#58494.
// StatementContext may be null in unit-test paths where analyzeDeltaCommands is mocked out;
// in that case start from 0 (safe because no real plan ExprIds exist).
StatementContext analysisStmtCtx =
context.getConnectContext().getStatementContext();
int exprIdStart = analysisStmtCtx != null
? analysisStmtCtx.getNextExprId().asInt() : 0;
try {
deltaExecutor.execute(context, commands, exprIdStart);
} catch (Exception e) {
// Leave runningIvmRefresh=true - the next task will detect this and
// fall back to full refresh, which resets the flag on success.
String detail = e.getMessage() != null ? e.getMessage()
: e.getClass().getName() + " (no message)";
// The next two branches recognize expected fallback conditions by a marker substring in
// the exception message; they are logged at INFO level and without the stack trace.
if (detail.contains("IVM: deleted row may be current")) {
IvmRefreshResult result = IvmRefreshResult.fallback(
IvmFailureReason.MIN_MAX_BOUNDARY_HIT, detail);
LOG.info("IVM MIN/MAX boundary hit for mv={}, falling back to COMPLETE refresh, result={}",
mtmv.getName(), result);
return result;
}
if (detail.contains("IVM fallback: delete on non-deterministic row_id")) {
IvmRefreshResult result = IvmRefreshResult.fallback(
IvmFailureReason.NON_DETERMINISTIC_ROW_ID, detail);
LOG.info("IVM non-deterministic row_id for mv={}, falling back to COMPLETE refresh, result={}",
mtmv.getName(), result);
return result;
}
// Anything else is an unexpected execution failure; log it with the stack trace.
IvmRefreshResult result = IvmRefreshResult.fallback(
IvmFailureReason.INCREMENTAL_EXECUTION_FAILED, detail);
LOG.warn("IVM execution failed for mv={}, result={}", mtmv.getName(), result, e);
return result;
}
// Advance consumedTso to latestTso for all base tables and clear the flag,
// persisting everything in one editlog entry.
advanceConsumedTsoAndClearFlag(mtmv);
return IvmRefreshResult.success();
}
/**
 * Commits a successful incremental run: moves each stream's consumedTso up to its latestTso,
 * clears the runningIvmRefresh flag, and persists the updated info with a single
 * {@link #persistIvmInfo(MTMV, IvmInfo)} call.
 *
 * <p>A stream whose latestTso is below its consumedTso is left untouched, so consumedTso
 * never moves backwards. A null stream map is tolerated, as it is elsewhere in this class,
 * so that the flag is still cleared and persisted instead of the method failing with a
 * NullPointerException after the delta has already been applied.
 *
 * @param mtmv the materialized view whose IVM info is updated in place and persisted
 */
private void advanceConsumedTsoAndClearFlag(MTMV mtmv) {
    IvmInfo ivmInfo = mtmv.getIvmInfo();
    Map<BaseTableInfo, IvmStreamRef> streams = ivmInfo.getBaseTableStreams();
    if (streams != null) {
        for (IvmStreamRef ref : streams.values()) {
            if (ref.getLatestTso() >= ref.getConsumedTso()) {
                ref.setConsumedTso(ref.getLatestTso());
            }
        }
    }
    ivmInfo.setRunningIvmRefresh(false);
    persistIvmInfo(mtmv, ivmInfo);
}
/**
 * Writes the given IvmInfo for the materialized view through
 * {@code Env.alterMTMVIvmInfo}, addressing the view by its qualified database name and
 * its own name. Package-private so tests can override it and avoid the Env dependency.
 *
 * @param mtmv the materialized view the info belongs to
 * @param ivmInfo the IVM state to persist
 */
@VisibleForTesting
void persistIvmInfo(MTMV mtmv, IvmInfo ivmInfo) {
    final String dbName = mtmv.getQualifiedDbName();
    final String mvName = mtmv.getName();
    final TableNameInfo target = new TableNameInfo(dbName, mvName);
    final Env env = Env.getCurrentEnv();
    env.alterMTMVIvmInfo(target, ivmInfo);
}
/**
 * Resets IVM state after a successful full (COMPLETE) refresh. Intended to be called from
 * MTMVTask when the partition-based refresh succeeds and a previous IVM run left
 * {@code runningIvmRefresh=true}.
 *
 * <p>For every registered stream that has a captured TSO, both consumedTso and latestTso
 * are set to that value, so the next incremental refresh starts from the snapshot the full
 * refresh read. Streams without a captured value are left as they are. The running flag is
 * always cleared and the info is always persisted, even when no TSO could be applied.
 *
 * @param mtmv the materialized view
 * @param capturedTsos TSO values captured before the full refresh executed,
 *        keyed by BaseTableInfo; may be null
 */
public static void resetIvmStateAfterFullRefresh(MTMV mtmv,
        Map<BaseTableInfo, Long> capturedTsos) {
    final IvmInfo info = mtmv.getIvmInfo();
    final Map<BaseTableInfo, IvmStreamRef> registered = info.getBaseTableStreams();

    if (registered != null && capturedTsos != null) {
        registered.forEach((baseTable, streamRef) -> {
            final Long snapshotTso = capturedTsos.get(baseTable);
            if (snapshotTso == null) {
                return;
            }
            streamRef.setConsumedTso(snapshotTso);
            streamRef.setLatestTso(snapshotTso);
        });
    }

    info.setRunningIvmRefresh(false);
    final TableNameInfo target = new TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName());
    Env.getCurrentEnv().alterMTMVIvmInfo(target, info);
    LOG.info("IVM state reset after full refresh for mv={}", mtmv.getName());
}
/**
 * Snapshots the current visible TSO of each base table that has a registered stream.
 * Should be called BEFORE a full refresh executes, so the captured values represent the
 * snapshot that the refresh will read.
 *
 * <p>Tables that do not resolve to an {@link OlapTable} are skipped without an entry. If
 * resolving a table or reading its TSO throws, a warning is logged and an empty map is
 * returned. The caller should therefore compare the result size with the number of streams
 * and skip the consumedTso reset when the result is incomplete.
 *
 * @param mtmv the materialized view whose registered streams are inspected
 * @return captured TSOs keyed by base table; empty when no streams are registered or on failure
 */
public static Map<BaseTableInfo, Long> captureBaseTableTsos(MTMV mtmv) {
    final Map<BaseTableInfo, Long> snapshot = new HashMap<>();
    final Map<BaseTableInfo, IvmStreamRef> registered = mtmv.getIvmInfo().getBaseTableStreams();
    if (registered != null) {
        for (BaseTableInfo baseTable : registered.keySet()) {
            try {
                final TableIf resolved = MTMVUtil.getTable(baseTable);
                if (!(resolved instanceof OlapTable)) {
                    continue;
                }
                snapshot.put(baseTable, ((OlapTable) resolved).getVisibleTso());
            } catch (Exception failure) {
                LOG.warn("IVM: failed to capture TSO for table {} before full refresh: {}. "
                        + "IVM state reset will be skipped.", baseTable, failure.getMessage());
                return Collections.emptyMap();
            }
        }
    }
    return snapshot;
}
/**
 * Verifies that every base table of the view (one level, including those reached through
 * views) has a registered stream binding and resolves to an {@link OlapTable}.
 *
 * <p>Every failed check yields a {@code STREAM_UNSUPPORTED} fallback whose detail names the
 * first problem found. This method is not currently called from {@code precheck}.
 *
 * @param mtmv the materialized view to validate
 * @return success when all base tables pass, otherwise a {@code STREAM_UNSUPPORTED} fallback
 */
private IvmRefreshResult checkStreamSupport(MTMV mtmv) {
    final IvmFailureReason unsupported = IvmFailureReason.STREAM_UNSUPPORTED;

    final MTMVRelation relation = mtmv.getRelation();
    if (relation == null) {
        return IvmRefreshResult.fallback(unsupported,
                "No base table relation found for incremental refresh");
    }

    final Set<BaseTableInfo> requiredTables = relation.getBaseTablesOneLevelAndFromView();
    if (requiredTables == null || requiredTables.isEmpty()) {
        return IvmRefreshResult.fallback(unsupported,
                "No base tables found for incremental refresh");
    }

    final Map<BaseTableInfo, IvmStreamRef> bindings = mtmv.getIvmInfo().getBaseTableStreams();
    if (bindings == null || bindings.isEmpty()) {
        return IvmRefreshResult.fallback(unsupported,
                "No stream bindings are registered for this materialized view");
    }

    for (BaseTableInfo required : requiredTables) {
        if (bindings.get(required) == null) {
            return IvmRefreshResult.fallback(unsupported,
                    "No stream binding found for base table: " + required);
        }

        final TableIf resolved;
        try {
            resolved = MTMVUtil.getTable(required);
        } catch (Exception failure) {
            return IvmRefreshResult.fallback(unsupported,
                    "Failed to resolve base table metadata for incremental refresh: "
                            + required + ", reason=" + failure.getMessage());
        }

        final boolean isOlap = resolved instanceof OlapTable;
        if (!isOlap) {
            return IvmRefreshResult.fallback(unsupported,
                    "Only OLAP base tables are supported for incremental refresh: " + required);
        }
    }
    return IvmRefreshResult.success();
}
}