IVMRefreshManager.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv.ivm;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVAnalyzeQueryInfo;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.qe.ConnectContext;
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* Minimal orchestration entry point for incremental refresh.
*/
public class IVMRefreshManager {
private static final Logger LOG = LogManager.getLogger(IVMRefreshManager.class);
private final IVMCapabilityChecker capabilityChecker;
private final IVMDeltaExecutor deltaExecutor;
public IVMRefreshManager() {
this(new IVMCapabilityChecker(), new IVMDeltaExecutor());
}
@VisibleForTesting
IVMRefreshManager(IVMCapabilityChecker capabilityChecker, IVMDeltaExecutor deltaExecutor) {
this.capabilityChecker = Objects.requireNonNull(capabilityChecker, "capabilityChecker can not be null");
this.deltaExecutor = Objects.requireNonNull(deltaExecutor, "deltaExecutor can not be null");
}
public IVMRefreshResult doRefresh(MTMV mtmv) {
Objects.requireNonNull(mtmv, "mtmv can not be null");
IVMRefreshResult precheckResult = precheck(mtmv);
if (!precheckResult.isSuccess()) {
LOG.warn("IVM precheck failed for mv={}, result={}", mtmv.getName(), precheckResult);
return precheckResult;
}
final IVMRefreshContext context;
try {
context = buildRefreshContext(mtmv);
} catch (Exception e) {
IVMRefreshResult result = IVMRefreshResult.fallback(
FallbackReason.SNAPSHOT_ALIGNMENT_UNSUPPORTED, e.getMessage());
LOG.warn("IVM context build failed for mv={}, result={}", mtmv.getName(), result);
return result;
}
return doRefreshInternal(context);
}
@VisibleForTesting
IVMRefreshResult precheck(MTMV mtmv) {
Objects.requireNonNull(mtmv, "mtmv can not be null");
if (mtmv.getIvmInfo().isBinlogBroken()) {
return IVMRefreshResult.fallback(FallbackReason.BINLOG_BROKEN,
"Stream binlog is marked as broken");
}
// return checkStreamSupport(mtmv);
return IVMRefreshResult.success();
}
@VisibleForTesting
IVMRefreshContext buildRefreshContext(MTMV mtmv) throws Exception {
ConnectContext connectContext = MTMVPlanUtil.createMTMVContext(mtmv,
MTMVPlanUtil.DISABLE_RULES_WHEN_RUN_MTMV_TASK);
MTMVRefreshContext mtmvRefreshContext = MTMVRefreshContext.buildContext(mtmv);
return new IVMRefreshContext(mtmv, connectContext, mtmvRefreshContext);
}
@VisibleForTesting
List<DeltaCommandBundle> analyzeDeltaCommandBundles(IVMRefreshContext context) throws Exception {
MTMVAnalyzeQueryInfo queryInfo = MTMVPlanUtil.analyzeQueryWithSql(
context.getMtmv(), context.getConnectContext(), true);
Plan normalizedPlan = queryInfo.getIvmNormalizedPlan();
if (normalizedPlan == null) {
return Collections.emptyList();
}
IvmDeltaRewriteContext rewriteCtx = new IvmDeltaRewriteContext(
context.getMtmv(), context.getConnectContext());
return new IvmDeltaRewriter().rewrite(normalizedPlan, rewriteCtx);
}
private IVMRefreshResult doRefreshInternal(IVMRefreshContext context) {
Objects.requireNonNull(context, "context can not be null");
// Run Nereids with IVM rewrite enabled — per-pattern delta rules write bundles to CascadesContext
List<DeltaCommandBundle> bundles;
try {
bundles = analyzeDeltaCommandBundles(context);
} catch (Exception e) {
IVMRefreshResult result = IVMRefreshResult.fallback(
FallbackReason.PLAN_PATTERN_UNSUPPORTED, e.getMessage());
LOG.warn("IVM plan analysis failed for mv={}, result={}", context.getMtmv().getName(), result);
return result;
}
if (bundles == null || bundles.isEmpty()) {
IVMRefreshResult result = IVMRefreshResult.fallback(
FallbackReason.PLAN_PATTERN_UNSUPPORTED, "No IVM delta rule matched the MV define plan");
LOG.warn("IVM no delta command bundles for mv={}, result={}", context.getMtmv().getName(), result);
return result;
}
IVMCapabilityResult capabilityResult = capabilityChecker.check(context, bundles);
Objects.requireNonNull(capabilityResult, "capabilityResult can not be null");
if (!capabilityResult.isIncremental()) {
IVMRefreshResult result = IVMRefreshResult.fallback(
capabilityResult.getFallbackReason(), capabilityResult.getDetailMessage());
LOG.warn("IVM capability check failed for mv={}, result={}", context.getMtmv().getName(), result);
return result;
}
try {
deltaExecutor.execute(context, bundles);
return IVMRefreshResult.success();
} catch (Exception e) {
IVMRefreshResult result = IVMRefreshResult.fallback(
FallbackReason.INCREMENTAL_EXECUTION_FAILED, e.getMessage());
LOG.warn("IVM execution failed for mv={}, result={}", context.getMtmv().getName(), result, e);
return result;
}
}
private IVMRefreshResult checkStreamSupport(MTMV mtmv) {
MTMVRelation relation = mtmv.getRelation();
if (relation == null) {
return IVMRefreshResult.fallback(FallbackReason.STREAM_UNSUPPORTED,
"No base table relation found for incremental refresh");
}
Set<BaseTableInfo> baseTables = relation.getBaseTablesOneLevelAndFromView();
if (baseTables == null || baseTables.isEmpty()) {
return IVMRefreshResult.fallback(FallbackReason.STREAM_UNSUPPORTED,
"No base tables found for incremental refresh");
}
Map<BaseTableInfo, IVMStreamRef> baseTableStreams = mtmv.getIvmInfo().getBaseTableStreams();
if (baseTableStreams == null || baseTableStreams.isEmpty()) {
return IVMRefreshResult.fallback(FallbackReason.STREAM_UNSUPPORTED,
"No stream bindings are registered for this materialized view");
}
for (BaseTableInfo baseTableInfo : baseTables) {
IVMStreamRef streamRef = baseTableStreams.get(baseTableInfo);
if (streamRef == null) {
return IVMRefreshResult.fallback(FallbackReason.STREAM_UNSUPPORTED,
"No stream binding found for base table: " + baseTableInfo);
}
if (streamRef.getStreamType() != StreamType.OLAP) {
return IVMRefreshResult.fallback(FallbackReason.STREAM_UNSUPPORTED,
"Only OLAP base table streams are supported for incremental refresh: " + baseTableInfo);
}
final TableIf table;
try {
table = MTMVUtil.getTable(baseTableInfo);
} catch (Exception e) {
return IVMRefreshResult.fallback(FallbackReason.STREAM_UNSUPPORTED,
"Failed to resolve base table metadata for incremental refresh: "
+ baseTableInfo + ", reason=" + e.getMessage());
}
if (!(table instanceof OlapTable)) {
return IVMRefreshResult.fallback(FallbackReason.STREAM_UNSUPPORTED,
"Only OLAP base tables are supported for incremental refresh: " + baseTableInfo);
}
}
return IVMRefreshResult.success();
}
}