// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.common.UserException;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergTransaction;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFileContent;
import org.apache.doris.thrift.TIcebergCommitData;
import org.apache.doris.transaction.TransactionType;

import org.apache.iceberg.expressions.Expression;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Executor for Iceberg DELETE operations.
*
* DELETE is implemented by generating Position Delete files
* instead of rewriting data files.
*
* Flow:
* 1. Execute query to get rows matching WHERE clause (with $row_id column)
* 2. Collect $row_id information grouped by file
* 3. Write Position Delete files (file_path, pos)
* 4. Commit DeleteFiles to Iceberg table using RowDelta API
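 *
 * Illustrative example (table and predicate are hypothetical):
 *
 *   DELETE FROM iceberg_catalog.db.tbl WHERE id > 100;
 *
 * Rows matching the predicate are recorded as (file_path, pos) entries in
 * newly written Position Delete files; the matching data files themselves are
 * left untouched.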
*/
public class IcebergDeleteExecutor extends BaseExternalTableInsertExecutor {
private static final Logger LOG = LogManager.getLogger(IcebergDeleteExecutor.class);
private Optional<Expression> conflictDetectionFilter = Optional.empty();
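
    /**
     * Creates an executor for a single Iceberg DELETE statement.
     */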
public IcebergDeleteExecutor(ConnectContext ctx, IcebergExternalTable table,
String labelName, NereidsPlanner planner,
boolean emptyInsert, long jobId) {
// BaseExternalTableInsertExecutor requires Optional<InsertCommandContext>
// For DELETE operations, we pass Optional.empty().
super(ctx, table, labelName, planner, Optional.empty(), emptyInsert, jobId);
}
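
    /**
     * Finalizes the delete plan's sink by delegating to
     * {@link BaseExternalTableInsertExecutor#finalizeSink}, so a DELETE sink
     * is finalized the same way as an INSERT sink.
     */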
public void finalizeSinkForDelete(PlanFragment fragment, DataSink sink, PhysicalSink<?> physicalSink) {
super.finalizeSink(fragment, sink, physicalSink);
}
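
    /**
     * Registers the Iceberg expression used for conflict detection when the
     * delete is committed. For example (illustrative), a
     * {@code DELETE ... WHERE id > 100} might pass
     * {@code Expressions.greaterThan("id", 100)}.
     */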
public void setConflictDetectionFilter(Optional<Expression> filter) {
conflictDetectionFilter = filter == null ? Optional.empty() : filter;
}
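
    /**
     * Opens the delete phase on the Iceberg transaction and registers the
     * conflict detection filter, if any, before the plan executes.
     */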
@Override
protected void beforeExec() throws UserException {
IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId);
transaction.beginDelete((IcebergExternalTable) table);
if (conflictDetectionFilter.isPresent()) {
transaction.setConflictDetectionFilter(conflictDetectionFilter.get());
} else {
transaction.clearConflictDetectionFilter();
}
}
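
    /**
     * Collects the commit data reported by BE, records the number of deleted
     * rows, and finishes the delete by committing the Position Delete files
     * to the Iceberg table.
     */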
@Override
protected void doBeforeCommit() throws UserException {
IcebergExternalTable dorisTable = (IcebergExternalTable) table;
        // For Position Delete: collect $row_id information from the query result.
        // In the current implementation, the delete information is collected by BE
        // and sent to FE through the normal data flow (as part of the query result).
        //
        // TODO Phase 2: Implement a specialized collection mechanism:
        // 1. Parse the $row_id column from query result blocks
        // 2. Group by file_path
        // 3. Generate Position Delete files with (file_path, pos) pairs
        //
        // For now, the transaction collects commit data from BE through the
        // existing mechanism (TIcebergCommitData).
IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId);
        // TODO Phase 2: Process the $row_id commit data sent back by BE; this
        // will be implemented together with the data collection mechanism. The
        // commented call below sketches that step and matches the actual
        // helper signatures:
        // Map<String, RowIdGroup> fileToGroups = extractRowIdData();
        // for (RowIdGroup group : fileToGroups.values()) {
        //     writePositionDeleteFile(group.getFilePath(), group.getPositions(), group);
        // }
        LOG.info("Processing Position Delete for table: {}", dorisTable.getName());
this.loadedRows = transaction.getUpdateCnt();
// Finish delete and commit
        NameMapping nameMapping = new NameMapping(
                dorisTable.getCatalog().getId(),
                dorisTable.getDbName(),
                dorisTable.getName(),
                dorisTable.getRemoteDbName(),
                dorisTable.getRemoteName());
transaction.finishDelete(nameMapping);
}
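
    /**
     * Identifies this executor's transaction as an Iceberg transaction.
     */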
@Override
protected TransactionType transactionType() {
return TransactionType.ICEBERG;
}
/**
* Extract $row_id data from query results and group by file.
*
     * In Phase 2 this method is intended to parse the $row_id STRUCT column,
     * which contains:
* - file_path: STRING
* - row_position: BIGINT
* - partition_spec_id: INT
* - partition_data: STRING
*
     * Returns a map of file_path -> {@link RowIdGroup} describing the row
     * positions to delete in that file.
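     *
     * Illustrative result for a two-file table (paths are hypothetical):
     *   "s3://bucket/tbl/data/f1.parquet" -> RowIdGroup{positions=[3, 17]}
     *   "s3://bucket/tbl/data/f2.parquet" -> RowIdGroup{positions=[0]}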
*/
private Map<String, RowIdGroup> extractRowIdData() throws UserException {
Map<String, RowIdGroup> result = new HashMap<>();
// Get the commit data list which contains the delete information
// In Doris, the BE sends back TIcebergCommitData through the transaction
IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId);
List<TIcebergCommitData> commitDataList = transaction.getCommitDataList();
if (commitDataList == null || commitDataList.isEmpty()) {
LOG.info("No commit data from BE for delete operation");
return result;
}
// Process each commit data entry
// For Position Delete, BE should have collected $row_id information
for (TIcebergCommitData commitData : commitDataList) {
if (commitData.getFileContent() != TFileContent.POSITION_DELETES) {
continue;
}
// Extract file path and row positions from commit data
String filePath = commitData.getFilePath();
if (filePath == null || filePath.isEmpty()) {
LOG.warn("Empty file path in commit data, skipping");
continue;
}
// Get or create RowIdGroup for this file
result.computeIfAbsent(filePath, k -> {
                // Extract partition information; unset thrift fields fall back
                // to spec id 0 and an empty partition-data JSON string.
                int partitionSpecId = 0;
                String partitionDataJson = "";
                if (commitData.getPartitionSpecId() > 0) {
                    partitionSpecId = commitData.getPartitionSpecId();
                }
if (commitData.getPartitionDataJson() != null) {
partitionDataJson = commitData.getPartitionDataJson();
}
return new RowIdGroup(k, partitionSpecId, partitionDataJson);
});
            // Only the aggregate row count is available today; in the full
            // implementation BE should report the individual row positions.
long rowCount = commitData.getRowCount();
LOG.info("Extracted {} rows to delete from file: {}", rowCount, filePath);
}
return result;
}
/**
     * Logs and validates the Position Delete information for a specific data
     * file; it does not write the file itself.
     *
     * The actual delete file is a Parquet file with schema (file_path: STRING,
     * pos: BIGINT), where each row identifies one row to delete in the data
     * file. It is created by the Iceberg transaction mechanism (see the notes
     * in the method body).
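     *
     * Illustrative content of one such delete file (paths hypothetical):
     *   file_path                        | pos
     *   s3://bucket/tbl/data/f1.parquet  | 3
     *   s3://bucket/tbl/data/f1.parquet  | 17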
*/
private void writePositionDeleteFile(String dataFilePath, List<Long> positions,
RowIdGroup rowIdGroup) throws UserException {
// Position Delete files are created by the Iceberg transaction mechanism
// The commit data has already been collected in extractRowIdData()
//
// In Doris architecture:
// 1. BE generates delete file data during query execution
// 2. BE sends TIcebergCommitData to FE via transaction
// 3. FE collects these in IcebergTransaction.commitDataList
// 4. FE calls IcebergWriterHelper.convertToDeleteFiles() to create DeleteFile objects
// 5. FE commits via RowDelta API
//
// This method is primarily for logging and validation
if (positions == null || positions.isEmpty()) {
LOG.warn("No positions to delete for file: {}", dataFilePath);
return;
}
LOG.info("Position Delete file info:");
LOG.info(" Data file: {}", dataFilePath);
LOG.info(" Rows to delete: {}", positions.size());
LOG.info(" Partition spec ID: {}", rowIdGroup.partitionSpecId);
LOG.info(" Partition data: {}", rowIdGroup.partitionDataJson);
// The actual DeleteFile creation happens in:
// - IcebergWriterHelper.convertToDeleteFiles() (converts TIcebergCommitData -> DeleteFile)
// - IcebergTransaction.finishDelete() (commits via RowDelta)
//
// Both are already implemented and called in doBeforeCommit()
}
/**
* Helper class to group $row_id data by file.
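     *
     * Usage (illustrative):
     *   RowIdGroup group = new RowIdGroup(filePath, specId, partitionJson);
     *   group.addPosition(42L);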
*/
private static class RowIdGroup {
private final String filePath;
private final List<Long> positions;
private final int partitionSpecId;
private final String partitionDataJson;
public RowIdGroup(String filePath, int partitionSpecId, String partitionDataJson) {
this.filePath = filePath;
this.positions = new ArrayList<>();
this.partitionSpecId = partitionSpecId;
this.partitionDataJson = partitionDataJson;
}
public void addPosition(long position) {
positions.add(position);
}
public String getFilePath() {
return filePath;
}
public List<Long> getPositions() {
return positions;
}
public int getPartitionSpecId() {
return partitionSpecId;
}
public String getPartitionDataJson() {
return partitionDataJson;
}
}
}