// IvmDeltaRewriter.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv.ivm;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
/**
 * Turns a normalized materialized-view plan into delta INSERT commands that write into the MV.
 *
 * <p>Accepted plan shape, once any leading {@link LogicalResultSink} nodes have been removed:
 * zero or more stacked {@link LogicalProject} nodes sitting on a single {@link LogicalOlapScan}.
 * Typical examples:
 * <ul>
 *   <li>SCAN_ONLY: ResultSink → Project → OlapScan</li>
 *   <li>PROJECT_SCAN: ResultSink → Project → Project → OlapScan</li>
 * </ul>
 * Any other node type encountered on the way down is rejected with an {@link AnalysisException}.
 */
public class IvmDeltaRewriter {

    /**
     * Produces the delta command bundles for the given normalized plan.
     *
     * <p>The result is always a single-element list: one bundle pairing the scanned base table
     * with an INSERT command that feeds the (sink-less) plan into the MV.
     *
     * @param normalizedPlan the normalized MV plan, optionally wrapped in result sinks
     * @param ctx rewrite context supplying the target MTMV
     * @return a list holding exactly one {@code DeltaCommandBundle}
     * @throws AnalysisException if the plan contains a node other than Project / OlapScan
     */
    public List<DeltaCommandBundle> rewrite(Plan normalizedPlan, IvmDeltaRewriteContext ctx) {
        Plan sinklessPlan = stripResultSink(normalizedPlan);
        // Shape validation runs before anything is constructed, so unsupported plans fail first.
        LogicalOlapScan baseScan = validateAndExtractScan(sinklessPlan);
        // NOTE(review): the meaning of the 0L argument is not visible here; confirm against BaseTableInfo.
        BaseTableInfo scannedTable = new BaseTableInfo(baseScan.getTable(), 0L);
        DeltaCommandBundle bundle =
                new DeltaCommandBundle(scannedTable, buildInsertCommand(sinklessPlan, ctx));
        return Collections.singletonList(bundle);
    }

    /**
     * Peels off every {@link LogicalResultSink} at the top of the plan.
     *
     * @param plan the plan to unwrap
     * @return the first descendant that is not a result sink (the input itself if it is not one)
     */
    private Plan stripResultSink(Plan plan) {
        Plan current = plan;
        while (current instanceof LogicalResultSink) {
            LogicalResultSink<?> sink = (LogicalResultSink<?>) current;
            current = sink.child();
        }
        return current;
    }

    /**
     * Descends through a chain of {@link LogicalProject} nodes and returns the scan beneath them.
     *
     * @param plan the sink-less plan to inspect
     * @return the {@link LogicalOlapScan} at the bottom of the project chain
     * @throws AnalysisException naming the first node that is neither a project nor an OLAP scan
     */
    private LogicalOlapScan validateAndExtractScan(Plan plan) {
        Plan node = plan;
        // Skip over any number of projections; only the node below them decides validity.
        while (node instanceof LogicalProject) {
            node = ((LogicalProject<?>) node).child();
        }
        if (!(node instanceof LogicalOlapScan)) {
            throw new AnalysisException(
                    "IVM delta rewrite does not yet support: " + node.getClass().getSimpleName());
        }
        return (LogicalOlapScan) node;
    }

    /**
     * Wraps the query plan in an unbound table sink addressed at the MV and returns the
     * corresponding INSERT command.
     *
     * @param queryPlan the validated, sink-less query plan; cast to {@link LogicalPlan}
     * @param ctx rewrite context supplying the target MTMV
     * @return an {@link InsertIntoTableCommand} over the constructed sink
     */
    private Command buildInsertCommand(Plan queryPlan, IvmDeltaRewriteContext ctx) {
        MTMV targetMv = ctx.getMtmv();
        // Three-part name: internal catalog, the MV's qualified database name, the MV name.
        List<String> qualifiedMvName = ImmutableList.of(
                InternalCatalog.INTERNAL_CATALOG_NAME,
                targetMv.getQualifiedDbName(),
                targetMv.getName());
        // NOTE(review): the positional booleans / empty lists / empty Optionals below are passed
        // as-is; their meaning is defined by the UnboundTableSink constructor, not visible here.
        UnboundTableSink<LogicalPlan> mvSink = new UnboundTableSink<>(
                qualifiedMvName,
                targetMv.getInsertedColumnNames(),
                ImmutableList.of(),
                false,
                ImmutableList.of(),
                false,
                TPartialUpdateNewRowPolicy.APPEND,
                DMLCommandType.INSERT,
                Optional.empty(),
                Optional.empty(),
                (LogicalPlan) queryPlan);
        return new InsertIntoTableCommand(mvSink, Optional.empty(), Optional.empty(), Optional.empty());
    }
}