RowLevelDmlCommand.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.connector.api.handle.ConnectorTransaction;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertExecutor;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.Callable;
/**
* Generic shell for row-level DML ({@code DELETE}/{@code UPDATE}/{@code MERGE INTO}) against external tables.
*
* <p>Owns the single live planner-drive loop that was triplicated across {@code IcebergDeleteCommand},
* {@code IcebergUpdateCommand} and {@code IcebergMergeCommand}: the per-operation points (mode check, plan
* synthesis, required sink, executor factory, label prefix, conflict-detection wiring, finalize) are routed
* through a {@link RowLevelDmlTransform} resolved from {@link RowLevelDmlRegistry}. The dispatching commands
* ({@code UpdateCommand}/{@code DeleteFromCommand}/{@code MergeIntoCommand}) delegate here once a transform is
* found, so the reverse {@code instanceof} dispatch is consolidated into the registry.</p>
*
* <p>This is intentionally a plain class, not a Nereids {@code Command}: it is invoked from within the
* dispatching commands' {@code run}/{@code getExplainPlan}, so it needs no visitor/plan-type/stmt-type of its
* own (those stay on the dispatching commands, preserving their per-op differences).</p>
*/
public class RowLevelDmlCommand {
private final RowLevelDmlTransform transform;
private final RowLevelDmlArgs args;
private final RowLevelDmlOp op;
public RowLevelDmlCommand(RowLevelDmlTransform transform, RowLevelDmlArgs args, RowLevelDmlOp op) {
this.transform = transform;
this.args = args;
this.op = op;
}
/**
* Execute the row-level DML. Mirrors legacy {@code IcebergDeleteCommand.run} /
* {@code IcebergUpdateCommand.executeMergePlan} / {@code IcebergMergeCommand.executeMergePlan} step-for-step;
* the four divergences (required sink, label prefix, executor + finalize, result) are parameterized by op.
*/
public void run(ConnectContext ctx, StmtExecutor stmtExecutor) throws Exception {
TableIf table = args.getTable();
transform.checkMode(table, op);
long previousTargetTableId = ctx.getSyntheticWriteColTargetTableId();
ctx.setSyntheticWriteColTargetTableId(table.getId());
try {
LogicalPlan plan = transform.synthesize(ctx, args, op);
executeWithExternalTableBatchModeDisabled(ctx, () -> {
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(plan, ctx.getStatementContext());
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
stmtExecutor.setPlanner(planner);
stmtExecutor.checkBlockRules();
PhysicalSink<?> physicalSink = transform.requirePhysicalSink(planner, op);
PlanFragment fragment = planner.getFragments().get(0);
DataSink dataSink = fragment.getSink();
boolean emptyInsert = childIsEmptyRelation(physicalSink);
String label = String.format(transform.labelPrefix(op) + "_%x_%x",
ctx.queryId().hi, ctx.queryId().lo);
BaseExternalTableInsertExecutor insertExecutor =
transform.newExecutor(ctx, table, label, planner, emptyInsert, op);
transform.setupConflictDetection(insertExecutor, planner.getAnalyzedPlan(), table, op);
if (insertExecutor.isEmptyInsert()) {
return null;
}
insertExecutor.beginTransaction();
applyWriteConstraintIfPresent(transform, insertExecutor, planner.getAnalyzedPlan(), table);
transform.finalizeSink(insertExecutor, op, fragment, dataSink, physicalSink);
insertExecutor.getCoordinator().setTxnId(insertExecutor.getTxnId());
stmtExecutor.setCoord(insertExecutor.getCoordinator());
insertExecutor.executeSingleInsert(stmtExecutor);
return null;
});
} finally {
ctx.setSyntheticWriteColTargetTableId(previousTargetTableId);
}
}
/** EXPLAIN path: synthesis only (no planner-drive loop, no transaction), mirroring legacy getExplainPlan. */
public Plan getExplainPlan(ConnectContext ctx) {
TableIf table = args.getTable();
transform.checkMode(table, op);
long previousTargetTableId = ctx.getSyntheticWriteColTargetTableId();
ctx.setSyntheticWriteColTargetTableId(table.getId());
try {
return transform.synthesize(ctx, args, op);
} finally {
ctx.setSyntheticWriteColTargetTableId(previousTargetTableId);
}
}
/**
* O5-2 new write-constraint path. Dormant until P6.6: only fires when the executor exposes an SPI
* {@link ConnectorTransaction}. Today iceberg DELETE/MERGE run on the legacy {@code IcebergTransaction}
* (the base {@code getConnectorTransactionOrNull()} returns {@code null}), so this is a no-op; the legacy
* 3-hop conflict-detection path ({@link RowLevelDmlTransform#setupConflictDetection}) remains the live one.
*/
@VisibleForTesting
static void applyWriteConstraintIfPresent(RowLevelDmlTransform transform,
BaseExternalTableInsertExecutor executor, Plan analyzedPlan, TableIf table) {
ConnectorTransaction connectorTx = executor.getConnectorTransactionOrNull();
if (connectorTx == null) {
return;
}
transform.extractWriteConstraint(analyzedPlan, table).ifPresent(connectorTx::applyWriteConstraint);
}
/**
* Run {@code action} with external-table batch mode disabled so the iceberg scan node yields all splits
* (needed by {@code IcebergRewritableDeletePlanner.collect}). Byte-identical to the per-command copies
* retained on the legacy {@code Iceberg*Command} classes until P6.7.
*/
static <T> T executeWithExternalTableBatchModeDisabled(ConnectContext ctx, Callable<T> action) throws Exception {
boolean previousEnableExternalTableBatchMode = ctx.getSessionVariable().enableExternalTableBatchMode;
ctx.getSessionVariable().enableExternalTableBatchMode = false;
try {
return action.call();
} finally {
ctx.getSessionVariable().enableExternalTableBatchMode = previousEnableExternalTableBatchMode;
}
}
private static boolean childIsEmptyRelation(PhysicalSink<?> sink) {
return sink.children() != null && sink.children().size() == 1
&& sink.child(0) instanceof PhysicalEmptyRelation;
}
}