IcebergRowLevelDmlTransform.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.connector.api.ConnectorMetadata;
import org.apache.doris.connector.api.ConnectorSession;
import org.apache.doris.connector.api.DorisConnectorException;
import org.apache.doris.connector.api.handle.ConnectorTableHandle;
import org.apache.doris.connector.api.handle.WriteOperation;
import org.apache.doris.connector.api.pushdown.ConnectorPredicate;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.PluginDrivenExternalCatalog;
import org.apache.doris.datasource.PluginDrivenExternalTable;
import org.apache.doris.datasource.WriteConstraintExtractor;
import org.apache.doris.datasource.iceberg.IcebergConflictDetectionFilterUtils;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergMetadataColumn;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertExecutor;
import org.apache.doris.nereids.trees.plans.commands.insert.IcebergDeleteExecutor;
import org.apache.doris.nereids.trees.plans.commands.insert.IcebergMergeExecutor;
import org.apache.doris.nereids.trees.plans.commands.insert.PluginDrivenInsertExecutor;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergDeleteSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergMergeSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
/**
* Iceberg {@link RowLevelDmlTransform}: routes {@code DELETE}/{@code UPDATE}/{@code MERGE INTO} on iceberg
* tables through the generic {@link RowLevelDmlCommand} shell.
*
* <p>Per the T07c "delegated synthesis" decision, the iceberg plan-synthesis algebra is <b>not</b> relocated:
* {@link #synthesize} constructs the corresponding {@code Iceberg*Command} (same package) and calls its
* (now package-visible) synthesis method, so the synthesized {@code LogicalIceberg{Delete,Merge}Sink} tree is
* byte-identical to legacy. The per-executor-only bits (conflict-filter stash, finalize) are routed here via
* {@code instanceof}-free op switches; the O5-2 exclusion predicate mirrors legacy
* {@code IcebergConflictDetectionFilterUtils} (note the {@code equalsIgnoreCase} vs {@code equals} asymmetry).</p>
*/
public class IcebergRowLevelDmlTransform implements RowLevelDmlTransform {
/**
* Slots excluded from the O5-2 target-only write constraint: the synthetic {@code $row_id} column and
* iceberg metadata columns. Mirrors legacy {@code IcebergConflictDetectionFilterUtils.isTargetOnlyPredicate}
* exactly ��� keep the {@code equalsIgnoreCase} (rowid) vs {@code equals} (metadata) asymmetry.
*/
private static final Predicate<SlotReference> ICEBERG_EXCLUSION =
slot -> Column.ICEBERG_ROWID_COL.equalsIgnoreCase(slot.getName())
|| IcebergMetadataColumn.isMetadataColumn(slot.getName());
@Override
public boolean handles(TableIf table) {
return table instanceof IcebergExternalTable
|| (table instanceof PluginDrivenExternalTable
&& pluginConnectorSupportsRowLevelDml((PluginDrivenExternalTable) table));
}
/**
* A plugin-driven (SPI connector) table is routed through the iceberg row-level DML synthesis only if
* its connector declares row-level DML support ({@code supportsDelete()} or {@code supportsMerge()}).
* Mirrors the connector-capability probe in
* {@code InsertOverwriteTableCommand.pluginConnectorSupportsInsertOverwrite}.
*
* <p>This gate is op-agnostic by design: {@code RowLevelDmlRegistry.find} carries no operation, so it
* admits "supports any row-level DML"; per-op validity (e.g. UPDATE against a delete-only connector) is
* enforced later in {@link #checkMode}.</p>
*
* <p>Dormant until the C5 cutover: today only the iceberg connector declares these capabilities (every
* other SPI connector inherits the {@code ConnectorWriteOps} default {@code false}), and iceberg is not
* yet in {@code SPI_READY_TYPES} ��� so no live table presents as a {@link PluginDrivenExternalTable} here.</p>
*/
private static boolean pluginConnectorSupportsRowLevelDml(PluginDrivenExternalTable table) {
PluginDrivenExternalCatalog catalog = (PluginDrivenExternalCatalog) table.getCatalog();
Set<WriteOperation> ops = catalog.getConnector().supportedWriteOperations();
return ops.contains(WriteOperation.DELETE) || ops.contains(WriteOperation.MERGE);
}
@Override
public void checkMode(TableIf table, RowLevelDmlOp op) {
if (table instanceof PluginDrivenExternalTable) {
checkPluginMode((PluginDrivenExternalTable) table, op);
return;
}
IcebergExternalTable icebergTable = (IcebergExternalTable) table;
switch (op) {
case DELETE:
IcebergDmlCommandUtils.checkDeleteMode(icebergTable);
break;
case UPDATE:
IcebergDmlCommandUtils.checkUpdateMode(icebergTable);
break;
default:
IcebergDmlCommandUtils.checkMergeMode(icebergTable);
break;
}
}
/**
* Post-cutover {@link #checkMode}: route the copy-on-write rejection through the connector's neutral
* {@code validateRowLevelDmlMode} SPI, so the iceberg property knowledge and the message stay in the
* connector. A connector {@link DorisConnectorException} is surfaced as the analysis-time
* {@link AnalysisException} the legacy native path threw, preserving the user-facing message and the
* exception type.
*
* <p>Dormant until the C5 cutover: today no live table presents as a {@link PluginDrivenExternalTable}
* here (iceberg is not yet in {@code SPI_READY_TYPES}), so the legacy {@code IcebergDmlCommandUtils} arm
* still runs.</p>
*/
private static void checkPluginMode(PluginDrivenExternalTable table, RowLevelDmlOp op) {
PluginDrivenExternalCatalog catalog = (PluginDrivenExternalCatalog) table.getCatalog();
ConnectorSession session = catalog.buildConnectorSession();
ConnectorMetadata metadata = catalog.getConnector().getMetadata(session);
ConnectorTableHandle handle = metadata.getTableHandle(
session, table.getRemoteDbName(), table.getRemoteName())
.orElseThrow(() -> new AnalysisException("Table not found: "
+ table.getRemoteDbName() + "." + table.getRemoteName()
+ " in catalog " + catalog.getName()));
try {
metadata.validateRowLevelDmlMode(session, handle, toWriteOperation(op));
} catch (DorisConnectorException e) {
throw new AnalysisException(e.getMessage(), e);
}
}
private static WriteOperation toWriteOperation(RowLevelDmlOp op) {
switch (op) {
case DELETE:
return WriteOperation.DELETE;
case UPDATE:
return WriteOperation.UPDATE;
default:
return WriteOperation.MERGE;
}
}
@Override
public LogicalPlan synthesize(ConnectContext ctx, RowLevelDmlArgs args, RowLevelDmlOp op) {
ExternalTable icebergTable = (ExternalTable) args.getTable();
switch (op) {
case DELETE:
return new IcebergDeleteCommand(args.getNameParts(), args.getTableAlias(), args.isTempPart(),
args.getPartitions(), args.getLogicalQuery(), args.getDeleteCtx())
.completeQueryPlan(ctx, args.getLogicalQuery(), icebergTable);
case UPDATE:
return new IcebergUpdateCommand(args.getNameParts(), args.getTableAlias(), args.getAssignments(),
args.getLogicalQuery(), args.getDeleteCtx())
.buildMergePlan(ctx, args.getLogicalQuery(), args.getAssignments(), icebergTable);
default:
return new IcebergMergeCommand(args.getTargetNameParts(), args.getTargetAlias(), args.getCte(),
args.getSource(), args.getOnClause(), args.getMatchedClauses(), args.getNotMatchedClauses())
.buildMergePlan(ctx, icebergTable);
}
}
@Override
public BaseExternalTableInsertExecutor newExecutor(ConnectContext ctx, TableIf table, String label,
NereidsPlanner planner, boolean emptyInsert, RowLevelDmlOp op) {
if (table instanceof PluginDrivenExternalTable) {
// Post-flip: the connector-driven executor opens an SPI ConnectorTransaction (non-null), which
// activates the neutral O5-2 conflict path in RowLevelDmlCommand.applyWriteConstraintIfPresent. The
// op rides the sink's WriteOperation (set by the translator), so one executor serves DELETE/MERGE;
// no InsertCommandContext is needed for a row-level write (mirrors IcebergDeleteExecutor's empty ctx).
return new PluginDrivenInsertExecutor(ctx, (PluginDrivenExternalTable) table, label, planner,
Optional.empty(), emptyInsert, -1L);
}
IcebergExternalTable icebergTable = (IcebergExternalTable) table;
if (op == RowLevelDmlOp.DELETE) {
return new IcebergDeleteExecutor(ctx, icebergTable, label, planner, emptyInsert, -1L);
}
return new IcebergMergeExecutor(ctx, icebergTable, label, planner, emptyInsert, -1L);
}
@Override
public PhysicalSink<?> requirePhysicalSink(NereidsPlanner planner, RowLevelDmlOp op) {
Optional<PhysicalSink<?>> plan = planner.getPhysicalPlan()
.<PhysicalSink<?>>collect(PhysicalSink.class::isInstance).stream().findAny();
switch (op) {
case DELETE:
if (!plan.isPresent()) {
throw new AnalysisException("DELETE command must contain target table");
}
if (!(plan.get() instanceof PhysicalIcebergDeleteSink)) {
throw new AnalysisException("DELETE plan must use Iceberg delete sink");
}
return plan.get();
case UPDATE:
if (!plan.isPresent()) {
throw new AnalysisException("UPDATE command must contain target table");
}
if (!(plan.get() instanceof PhysicalIcebergMergeSink)) {
throw new AnalysisException("UPDATE merge plan must use Iceberg merge sink");
}
return plan.get();
default:
if (!plan.isPresent()) {
throw new AnalysisException("MERGE INTO command must contain target table");
}
if (!(plan.get() instanceof PhysicalIcebergMergeSink)) {
throw new AnalysisException("MERGE INTO plan must use Iceberg merge sink");
}
return plan.get();
}
}
@Override
public String labelPrefix(RowLevelDmlOp op) {
switch (op) {
case DELETE:
return "iceberg_delete";
case UPDATE:
return "iceberg_update_merge";
default:
return "iceberg_merge_into";
}
}
@Override
public void setupConflictDetection(BaseExternalTableInsertExecutor executor, Plan analyzedPlan, TableIf table,
RowLevelDmlOp op) {
if (table instanceof PluginDrivenExternalTable) {
// Post-flip: the conflict filter is supplied through the neutral SPI path
// (RowLevelDmlCommand.applyWriteConstraintIfPresent -> extractWriteConstraint ->
// ConnectorTransaction.applyWriteConstraint), converted to a native iceberg Expression lazily at
// commit. The legacy native 3-hop below is skipped: it casts to the legacy Iceberg{Delete,Merge}-
// Executor, which the plugin arm (a PluginDrivenInsertExecutor) is not. Running ONLY the SPI path
// avoids double-filtering; the SPI converter is byte-verified equivalent to legacy, the residual
// divergence only widening the filter -> at worst a harmless extra OCC retry (see [DEC-S5]).
return;
}
Optional<org.apache.iceberg.expressions.Expression> conflictFilter =
IcebergConflictDetectionFilterUtils.buildConflictDetectionFilter(
analyzedPlan, (IcebergExternalTable) table);
if (op == RowLevelDmlOp.DELETE) {
((IcebergDeleteExecutor) executor).setConflictDetectionFilter(conflictFilter);
} else {
((IcebergMergeExecutor) executor).setConflictDetectionFilter(conflictFilter);
}
}
@Override
public void finalizeSink(BaseExternalTableInsertExecutor executor, RowLevelDmlOp op, PlanFragment fragment,
DataSink sink, PhysicalSink<?> physicalSink) {
if (executor instanceof PluginDrivenInsertExecutor) {
// Post-flip: finalize through the connector's single transaction model (bind tx -> bindDataSink ->
// planWrite), which supplies rewritable_delete_file_sets itself via the scan-time stash. NO manual
// overlay (the legacy arms below append it on top of super.finalizeSink) -> exactly one finalize,
// no double-overlay.
((PluginDrivenInsertExecutor) executor).finalizeRowLevelDmlSink(fragment, sink, physicalSink);
return;
}
if (op == RowLevelDmlOp.DELETE) {
((IcebergDeleteExecutor) executor).finalizeSinkForDelete(fragment, sink, physicalSink);
} else {
((IcebergMergeExecutor) executor).finalizeSinkForMerge(fragment, sink, physicalSink);
}
}
@Override
public Optional<ConnectorPredicate> extractWriteConstraint(Plan analyzedPlan, TableIf table) {
return WriteConstraintExtractor.extract(analyzedPlan, table.getId(), ICEBERG_EXCLUSION);
}
}