// IcebergDeleteCommand.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.iceberg.IcebergConflictDetectionFilterUtils;
import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergMergeOperation;
import org.apache.doris.datasource.iceberg.IcebergRowId;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.Unbound;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.delete.DeleteCommandContext;
import org.apache.doris.nereids.trees.plans.commands.insert.IcebergDeleteExecutor;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergDeleteSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergDeleteSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
/**
* DELETE command for Iceberg tables.
*
* This command converts DELETE operations to INSERT operations that generate
* position DeleteFile entries instead of data files.
*
* Example:
* DELETE FROM iceberg_table WHERE id = 1
*
* This will:
* 1. Scan rows matching the WHERE condition
* 2. Generate DeleteFile containing the matching rows
* 3. Commit the DeleteFile to Iceberg table using RowDelta API
*/
public class IcebergDeleteCommand extends Command implements ForwardWithSync, Explainable {
// Name parts of the DELETE target; resolved to a qualified table name via RelationUtil.getQualifierName.
protected final List<String> nameParts;
// Alias supplied for the target table; only stored by this class.
protected final String tableAlias;
// Flag supplied by the caller; only stored by this class.
// NOTE(review): presumably marks `partitions` as temporary partitions — confirm against the parser.
protected final boolean isTempPart;
// Partition names supplied by the caller; only stored by this class.
protected final List<String> partitions;
// Logical plan of the DELETE query; it is projected and wrapped with the Iceberg delete sink before planning.
protected final LogicalPlan logicalQuery;
// Delete context handed to LogicalIcebergDeleteSink; never null (the constructor substitutes a default).
protected final DeleteCommandContext deleteCtx;
/**
 * Creates a DELETE command for an Iceberg table.
 *
 * @param nameParts name parts of the target table; copied with {@code Utils.copyRequiredList}
 * @param tableAlias alias of the target table, stored as given
 * @param isTempPart flag stored as given
 * @param partitions partition names; copied with {@code Utils.copyRequiredList}
 * @param logicalQuery logical plan of the DELETE query
 * @param deleteCtx delete context; when {@code null} a new {@code DeleteCommandContext} is used
 */
public IcebergDeleteCommand(
        List<String> nameParts,
        String tableAlias,
        boolean isTempPart,
        List<String> partitions,
        LogicalPlan logicalQuery,
        DeleteCommandContext deleteCtx) {
    super(PlanType.DELETE_COMMAND);
    this.nameParts = Utils.copyRequiredList(nameParts);
    this.tableAlias = tableAlias;
    this.isTempPart = isTempPart;
    this.partitions = Utils.copyRequiredList(partitions);
    this.logicalQuery = logicalQuery;
    // Fall back to a default context so later users never see null.
    if (deleteCtx == null) {
        this.deleteCtx = new DeleteCommandContext();
    } else {
        this.deleteCtx = deleteCtx;
    }
}
/**
 * Plans and executes the DELETE statement.
 *
 * The target table is resolved and must be an {@code IcebergExternalTable}. While planning and
 * executing, the connect context's "need Iceberg row id" flag is forced to {@code true}; the previous
 * value is restored in a {@code finally} block. If the delete executor reports an empty insert, the
 * method returns without beginning a transaction.
 *
 * @param ctx connect context of the current session
 * @param executor statement executor that receives the planner and the coordinator
 * @throws AnalysisException if the target table is not an Iceberg table
 * @throws Exception propagated from planning or execution
 */
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
    // Resolve the target table and reject anything that is not an Iceberg external table.
    List<String> tableQualifier = RelationUtil.getQualifierName(ctx, nameParts);
    TableIf targetTable = RelationUtil.getTable(tableQualifier, ctx.getEnv(), Optional.empty());
    if (!(targetTable instanceof IcebergExternalTable)) {
        throw new AnalysisException("DELETE command can only be used on Iceberg tables. "
                + "Table " + Util.getTempTableDisplayName(targetTable.getName()) + " is not an Iceberg table.");
    }
    IcebergExternalTable icebergTable = (IcebergExternalTable) targetTable;

    // NOTE(review): no format-version (v2+) check is enforced here; an earlier draft that read the
    // "format-version" table property was commented out — confirm whether it is validated elsewhere.

    // Remember the caller's flag so it can be restored no matter how planning/execution ends.
    final boolean savedNeedRowId = ctx.needIcebergRowId();
    ctx.setNeedIcebergRowId(true);
    try {
        // Wrap the query with the Iceberg delete sink.
        LogicalPlan sinkPlan = completeQueryPlan(ctx, logicalQuery, icebergTable);

        // Run the planner over the wrapped plan.
        NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
        LogicalPlanAdapter adapter = new LogicalPlanAdapter(sinkPlan, ctx.getStatementContext());
        planner.plan(adapter, ctx.getSessionVariable().toThrift());

        // Hand the planner to the executor before the block-rule check.
        executor.setPlanner(planner);
        executor.checkBlockRules();

        Optional<org.apache.iceberg.expressions.Expression> conflictDetectionFilter =
                IcebergConflictDetectionFilterUtils.buildConflictDetectionFilter(
                        planner.getAnalyzedPlan(), icebergTable);

        PhysicalSink<?> deleteSink = getPhysicalSink(planner);
        PlanFragment rootFragment = planner.getFragments().get(0);
        DataSink fragmentSink = rootFragment.getSink();
        boolean nothingToDelete = childIsEmptyRelation(deleteSink);
        String label = String.format("iceberg_delete_%x_%x", ctx.queryId().hi, ctx.queryId().lo);

        IcebergDeleteExecutor deleteExecutor = new IcebergDeleteExecutor(
                ctx, icebergTable, label, planner, nothingToDelete, -1L);
        deleteExecutor.setConflictDetectionFilter(conflictDetectionFilter);
        if (deleteExecutor.isEmptyInsert()) {
            // Nothing to execute; skip the transaction entirely.
            return;
        }

        deleteExecutor.beginTransaction();
        deleteExecutor.finalizeSinkForDelete(rootFragment, fragmentSink, deleteSink);
        deleteExecutor.getCoordinator().setTxnId(deleteExecutor.getTxnId());
        executor.setCoord(deleteExecutor.getCoordinator());
        deleteExecutor.executeSingleInsert(executor);
    } finally {
        ctx.setNeedIcebergRowId(savedNeedRowId);
    }
}
/**
 * Builds the full DELETE plan: the position-delete projection wrapped in a
 * {@code LogicalIcebergDeleteSink}.
 *
 * The sink's output expressions are the child's output slots when the plan is fully bound;
 * otherwise they are the project list if the child is a {@code LogicalProject}, or an empty list.
 *
 * @param ctx connect context, forwarded to {@code buildPositionDeletePlan}
 * @param logicalQuery logical plan of the DELETE query
 * @param icebergTable target Iceberg table
 * @return the delete sink plan with the projection as its child
 */
private LogicalPlan completeQueryPlan(ConnectContext ctx, LogicalPlan logicalQuery,
        IcebergExternalTable icebergTable) {
    LogicalPlan deleteSource = buildPositionDeletePlan(ctx, logicalQuery, icebergTable);

    List<NamedExpression> sinkOutputs;
    if (hasUnboundPlan(deleteSource)) {
        // Output slots are not available on an unbound plan; use the projection list instead.
        if (deleteSource instanceof LogicalProject) {
            sinkOutputs = ((LogicalProject<?>) deleteSource).getProjects();
        } else {
            sinkOutputs = ImmutableList.of();
        }
    } else {
        sinkOutputs = new ArrayList<>(deleteSource.getOutput());
    }

    return new LogicalIcebergDeleteSink<>(
            (IcebergExternalDatabase) icebergTable.getDatabase(),
            icebergTable,
            icebergTable.getBaseSchema(true),
            sinkOutputs,
            deleteCtx,
            Optional.empty(),
            Optional.empty(),
            deleteSource);
}
/**
 * Builds the projection that feeds the position-delete sink.
 *
 * The resulting {@code LogicalProject} has exactly two items, in this order:
 * 1. the operation column: a tinyint literal {@code IcebergMergeOperation.DELETE_OPERATION_NUMBER}
 *    aliased as {@code IcebergMergeOperation.OPERATION_COLUMN};
 * 2. the row id column {@code Column.ICEBERG_ROWID_COL}: the bound slot when the plan is bound
 *    and exposes one, otherwise an {@code UnboundSlot} of that name.
 *
 * @param ctx connect context (not used by this method)
 * @param logicalQuery logical plan of the DELETE query
 * @param icebergTable target Iceberg table (not used by this method)
 * @return projection of (operation, row id) over the possibly row-id-injected query
 */
private LogicalPlan buildPositionDeletePlan(ConnectContext ctx, LogicalPlan logicalQuery,
        IcebergExternalTable icebergTable) {
    // Step 1: let bound Iceberg scans expose the row id column.
    // NOTE(review): the original comment on this step was mis-encoded; it mentioned getFullSchema()
    // and the table Schema — confirm the intended remark.
    LogicalPlan source = injectRowIdColumn(logicalQuery);

    // Step 2: look up the bound row id slot; an unbound plan has no usable output yet.
    Optional<Slot> boundRowId = hasUnboundPlan(source)
            ? Optional.empty()
            : findRowIdSlot(source.getOutput());

    NamedExpression operationItem = new UnboundAlias(
            new TinyIntLiteral(IcebergMergeOperation.DELETE_OPERATION_NUMBER),
            IcebergMergeOperation.OPERATION_COLUMN);

    NamedExpression rowIdItem;
    if (boundRowId.isPresent()) {
        rowIdItem = boundRowId.get();
    } else {
        // Leave the reference unbound so it is resolved by name later.
        rowIdItem = new UnboundSlot(Column.ICEBERG_ROWID_COL);
    }

    return new LogicalProject<>(ImmutableList.of(operationItem, rowIdItem), source);
}
/**
 * Finds a physical sink in the planner's physical plan and checks that it is an Iceberg delete sink.
 *
 * @param planner planner that has already produced a physical plan
 * @return the sink found in the physical plan
 * @throws AnalysisException if no sink exists or the sink is not a {@code PhysicalIcebergDeleteSink}
 */
private PhysicalSink<?> getPhysicalSink(NereidsPlanner planner) {
    PhysicalSink<?> candidate = planner.getPhysicalPlan()
            .<PhysicalSink<?>>collect(PhysicalSink.class::isInstance)
            .stream()
            .findAny()
            .orElseThrow(() -> new AnalysisException("DELETE command must contain target table"));
    if (candidate instanceof PhysicalIcebergDeleteSink) {
        return candidate;
    }
    throw new AnalysisException("DELETE plan must use Iceberg delete sink");
}
/**
 * Tells whether the sink has exactly one child and that child is a {@code PhysicalEmptyRelation}.
 *
 * @param sink physical sink to inspect
 * @return {@code true} only for a single empty-relation child
 */
private boolean childIsEmptyRelation(PhysicalSink<?> sink) {
    if (sink.children() == null || sink.children().size() != 1) {
        return false;
    }
    return sink.child(0) instanceof PhysicalEmptyRelation;
}
/**
 * Adds the Iceberg row id column to the plan's Iceberg scans and to the projects above them.
 *
 * A plan that still contains unbound nodes or expressions is returned untouched; otherwise the
 * plan is rewritten with {@code IcebergRowIdInjector}.
 *
 * @param plan logical plan of the DELETE query
 * @return the original plan when unbound, otherwise the rewritten plan
 */
private LogicalPlan injectRowIdColumn(LogicalPlan plan) {
    return hasUnboundPlan(plan)
            ? plan
            : (LogicalPlan) plan.accept(new IcebergRowIdInjector(), null);
}
/**
 * Tells whether any node of the plan tree is {@code Unbound} or carries an unbound expression.
 *
 * @param plan root of the plan tree to inspect
 * @return {@code true} if at least one node is unbound or has an unbound expression
 */
private static boolean hasUnboundPlan(Plan plan) {
    return plan.anyMatch(node -> {
        if (node instanceof Unbound) {
            return true;
        }
        return ((Plan) node).hasUnboundExpression();
    });
}
/**
 * Plan rewriter that appends the Iceberg row id slot to Iceberg file scans and carries it
 * through the projects above them.
 */
private static class IcebergRowIdInjector extends DefaultPlanRewriter<Void> {
    /**
     * Appends a row id slot to the output of an Iceberg scan that does not already have one.
     * Scans of other table types, and scans that already expose the slot, are returned as is.
     */
    @Override
    public Plan visitLogicalFileScan(LogicalFileScan scan, Void context) {
        boolean icebergScan = scan.getTable() instanceof IcebergExternalTable;
        if (!icebergScan || hasRowIdSlot(scan.getOutput())) {
            return scan;
        }
        IcebergExternalTable icebergTable = (IcebergExternalTable) scan.getTable();
        Column hiddenRowId = getRowIdColumn(icebergTable);
        SlotReference injectedSlot = SlotReference.fromColumn(
                StatementScopeIdGenerator.newExprId(), icebergTable, hiddenRowId, scan.getQualifier());
        List<Slot> extendedOutput = new ArrayList<>(scan.getOutput());
        extendedOutput.add(injectedSlot);
        return scan.withCachedOutput(extendedOutput);
    }

    /**
     * Rewrites the children first, then appends the child's row id slot to the project list
     * when the child exposes one and the project does not already forward it.
     */
    @Override
    public Plan visitLogicalProject(LogicalProject<? extends Plan> project, Void context) {
        LogicalProject<? extends Plan> rewritten =
                (LogicalProject<? extends Plan>) visitChildren(this, project, context);
        Optional<Slot> childRowId = findRowIdSlot(rewritten.child().getOutput());
        if (childRowId.isPresent() && !hasRowIdProject(rewritten.getProjects())) {
            List<NamedExpression> extendedProjects = new ArrayList<>(rewritten.getProjects());
            extendedProjects.add((NamedExpression) childRowId.get());
            return rewritten.withProjects(extendedProjects);
        }
        return rewritten;
    }
}
/**
 * Tells whether the slot list contains the Iceberg row id slot.
 *
 * @param slots slots to search
 * @return {@code true} if {@code findRowIdSlot} finds a match
 */
private static boolean hasRowIdSlot(List<Slot> slots) {
    Optional<Slot> rowId = findRowIdSlot(slots);
    return rowId.isPresent();
}
/**
 * Returns the first slot whose name equals {@code Column.ICEBERG_ROWID_COL}, ignoring case.
 *
 * @param slots slots to search, in order
 * @return the first matching slot, or an empty {@code Optional} when none matches
 */
private static Optional<Slot> findRowIdSlot(List<Slot> slots) {
    return slots.stream()
            .filter(candidate -> Column.ICEBERG_ROWID_COL.equalsIgnoreCase(candidate.getName()))
            .findFirst();
}
/**
 * Tells whether the project list already forwards the Iceberg row id as a plain slot.
 *
 * Only items that are {@code Slot} instances are considered; their names are compared with
 * {@code Column.ICEBERG_ROWID_COL}, ignoring case.
 *
 * @param projects project items to inspect
 * @return {@code true} if a slot with the row id name is present
 */
private static boolean hasRowIdProject(List<NamedExpression> projects) {
    return projects.stream()
            .filter(Slot.class::isInstance)
            .anyMatch(item -> Column.ICEBERG_ROWID_COL.equalsIgnoreCase(((Slot) item).getName()));
}
/**
 * Looks up the row id column of the table.
 *
 * The table's full schema is searched for a column named {@code Column.ICEBERG_ROWID_COL}
 * (case-insensitive). When the schema is {@code null} or has no such column, a column created by
 * {@code IcebergRowId.createHiddenColumn()} is returned instead.
 *
 * @param table Iceberg table whose schema is searched
 * @return the schema's row id column, or a newly created hidden column
 */
private static Column getRowIdColumn(IcebergExternalTable table) {
    List<Column> schemaColumns = table.getFullSchema();
    if (schemaColumns == null) {
        return IcebergRowId.createHiddenColumn();
    }
    return schemaColumns.stream()
            .filter(candidate -> Column.ICEBERG_ROWID_COL.equalsIgnoreCase(candidate.getName()))
            .findFirst()
            .orElseGet(() -> IcebergRowId.createHiddenColumn());
}
/**
 * Builds the plan shown by EXPLAIN: the DELETE query wrapped with the Iceberg delete sink.
 *
 * The connect context's "need Iceberg row id" flag is set to {@code true} while the plan is built
 * and restored to its previous value afterwards.
 *
 * @param ctx connect context of the current session
 * @return the delete sink plan
 * @throws AnalysisException if the target table is not an {@code IcebergExternalTable}
 */
@Override
public Plan getExplainPlan(ConnectContext ctx) {
    TableIf target = RelationUtil.getTable(
            RelationUtil.getQualifierName(ctx, nameParts), ctx.getEnv(), Optional.empty());
    if (!(target instanceof IcebergExternalTable)) {
        throw new AnalysisException("Table must be IcebergExternalTable in DELETE command");
    }
    final boolean savedNeedRowId = ctx.needIcebergRowId();
    ctx.setNeedIcebergRowId(true);
    try {
        return completeQueryPlan(ctx, logicalQuery, (IcebergExternalTable) target);
    } finally {
        // Restore the caller's flag even if plan construction throws.
        ctx.setNeedIcebergRowId(savedNeedRowId);
    }
}
/**
 * Dispatches this command to {@code visitor.visitCommand}.
 *
 * @param visitor plan visitor to call
 * @param context visitor context passed through unchanged
 * @return whatever the visitor returns
 */
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
    return visitor.visitCommand(this, context);
}
/**
 * Reports the statement type of this command.
 *
 * @return always {@code StmtType.DELETE}
 */
@Override
public StmtType stmtType() {
    return StmtType.DELETE;
}
/**
 * Returns the delete context of this command.
 *
 * @return the context given at construction, or the default one created when {@code null} was passed
 */
public DeleteCommandContext getDeleteCtx() {
    return this.deleteCtx;
}
}