IcebergTransaction.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/trinodb/trino/blob/438/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
// and modified by Doris

package org.apache.doris.datasource.iceberg;

import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.iceberg.helper.IcebergWriterHelper;
import org.apache.doris.nereids.trees.plans.commands.insert.IcebergInsertCommandContext;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.thrift.TFileContent;
import org.apache.doris.thrift.TIcebergCommitData;
import org.apache.doris.thrift.TUpdateMode;
import org.apache.doris.transaction.Transaction;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.types.Types;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

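/**
 * A single Doris-side write transaction against an Iceberg table. Commit data
 * reported by the backends is collected, converted into Iceberg data/delete
 * files, and published through one {@link org.apache.iceberg.Transaction}.
 *
 * <p>Typical INSERT lifecycle, sketched with assumed caller-side variables
 * ({@code ops}, {@code dorisTable}, {@code commitDataFromBackends} and
 * {@code nameMapping} are placeholders):
 * <pre>{@code
 * IcebergTransaction txn = new IcebergTransaction(ops);
 * txn.beginInsert(dorisTable, Optional.empty());
 * txn.updateIcebergCommitData(commitDataFromBackends);
 * txn.finishInsert(nameMapping);
 * txn.commit();
 * }</pre>
 */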
public class IcebergTransaction implements Transaction {

    private static final Logger LOG = LogManager.getLogger(IcebergTransaction.class);
    private static final String DELETE_ISOLATION_LEVEL = "delete_isolation_level";
    private static final String DELETE_ISOLATION_LEVEL_DEFAULT = "serializable";

    private final IcebergMetadataOps ops;
    private Table table;

    private org.apache.iceberg.Transaction transaction;
    private final List<TIcebergCommitData> commitDataList = Lists.newArrayList();
    private Optional<Expression> conflictDetectionFilter = Optional.empty();

    private IcebergInsertCommandContext insertCtx;
    private String branchName;
    private Long baseSnapshotId;

    // Rewrite operation support
    private final List<DataFile> filesToDelete = Lists.newArrayList();
    private final List<DataFile> filesToAdd = Lists.newArrayList();
    private boolean isRewriteMode = false;

    public IcebergTransaction(IcebergMetadataOps ops) {
        this.ops = ops;
    }

    public void updateIcebergCommitData(List<TIcebergCommitData> commitDataList) {
        synchronized (this) {
            this.commitDataList.addAll(commitDataList);
        }
    }

    public void setConflictDetectionFilter(Expression filter) {
        conflictDetectionFilter = Optional.ofNullable(filter);
    }

    public void clearConflictDetectionFilter() {
        conflictDetectionFilter = Optional.empty();
    }

    public List<TIcebergCommitData> getCommitDataList() {
        return commitDataList;
    }

    public void updateRewriteFiles(List<DataFile> filesToDelete) {
        // lock the list itself, matching the readers below
        // (getFilesToDeleteCount/getFilesToDeleteSize), which synchronize on filesToDelete
        synchronized (this.filesToDelete) {
            this.filesToDelete.addAll(filesToDelete);
        }
    }

    public void beginInsert(ExternalTable dorisTable, Optional<InsertCommandContext> ctx) throws UserException {
        ctx.ifPresent(c -> this.insertCtx = (IcebergInsertCommandContext) c);
        try {
            ops.getExecutionAuthenticator().execute(() -> {
                // create and start the iceberg transaction
                this.table = IcebergUtils.getIcebergTable(dorisTable);
                this.baseSnapshotId = null;
                // check branch
                if (insertCtx != null && insertCtx.getBranchName().isPresent()) {
                    this.branchName = insertCtx.getBranchName().get();
                    SnapshotRef branchRef = table.refs().get(branchName);
                    if (branchRef == null) {
                        throw new RuntimeException(branchName + " is not found in " + dorisTable.getName());
                    } else if (!branchRef.isBranch()) {
                        throw new RuntimeException(
                                branchName
                                        + " is a tag, not a branch. Tags cannot be targets for producing snapshots");
                    }
                }
                this.transaction = table.newTransaction();
            });
        } catch (Exception e) {
            throw new UserException("Failed to begin insert for iceberg table " + dorisTable.getName()
                    + " because: " + e.getMessage(), e);
        }
    }

    /**
     * Begin a transaction for data file rewrite operations.
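     *
     * <p>A minimal usage sketch, assuming {@code txn} is a freshly constructed
     * instance and {@code compactedFiles} / {@code commitData} are caller-side
     * inputs:
     * <pre>{@code
     * txn.beginRewrite(dorisTable);
     * txn.updateRewriteFiles(compactedFiles);   // old files replaced by the rewrite
     * txn.updateIcebergCommitData(commitData);  // newly written files from backends
     * txn.finishRewrite();
     * txn.commit();
     * }</pre>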
     */
    public void beginRewrite(ExternalTable dorisTable) throws UserException {
        // For rewrite operations, we work directly on the main table
        this.branchName = null;
        this.isRewriteMode = true;

        try {
            ops.getExecutionAuthenticator().execute(() -> {
                // create and start the iceberg transaction
                this.table = IcebergUtils.getIcebergTable(dorisTable);
                this.baseSnapshotId = null;

                // For rewrite operations, we work directly on the main table
                // No branch information needed
                this.transaction = table.newTransaction();
                LOG.info("Started rewrite transaction for table: {} (main table)",
                        dorisTable.getName());
                return null;
            });
        } catch (Exception e) {
            throw new UserException("Failed to begin rewrite for iceberg table " + dorisTable.getName()
                    + " because: " + e.getMessage(), e);
        }
    }

    /**
     * Finish rewrite operation by committing all file changes using the
     * RewriteFiles API.
     */
    public void finishRewrite() {
        // TODO: refactor IcebergTransaction to make code cleaner
        convertCommitDataListToDataFilesToAdd();

        if (LOG.isDebugEnabled()) {
            LOG.debug("Finishing rewrite with {} files to delete and {} files to add",
                    filesToDelete.size(), filesToAdd.size());
        }

        try {
            ops.getExecutionAuthenticator().execute(() -> {
                updateManifestAfterRewrite();
                return null;
            });
        } catch (Exception e) {
            LOG.error("Failed to finish rewrite transaction", e);
            throw new RuntimeException(e);
        }
    }

    private void convertCommitDataListToDataFilesToAdd() {
        if (commitDataList.isEmpty()) {
            LOG.debug("No commit data to convert for rewrite operation");
            return;
        }

        // Resolve the table's current partition spec and file format
        PartitionSpec spec = transaction.table().spec();
        FileFormat fileFormat = IcebergUtils.getFileFormat(transaction.table());

        // Convert commit data to DataFile objects using the same logic as insert
        WriteResult writeResult = IcebergWriterHelper.convertToWriterResult(fileFormat, spec, commitDataList);

        // Add the generated DataFiles to the filesToAdd list
        synchronized (filesToAdd) {
            filesToAdd.addAll(Arrays.asList(writeResult.dataFiles()));
        }

        LOG.info("Converted {} commit data entries to {} DataFiles for rewrite operation",
                commitDataList.size(), writeResult.dataFiles().length);
    }

    private void updateManifestAfterRewrite() {
        if (filesToDelete.isEmpty() && filesToAdd.isEmpty()) {
            LOG.info("No files to rewrite, skipping commit");
            return;
        }

        RewriteFiles rewriteFiles = transaction.newRewrite();

        // Plan manifest scans with the pre-authenticated thread pool
        rewriteFiles = rewriteFiles.scanManifestsWith(ops.getThreadPoolWithPreAuth());

        // Register the files being replaced
        for (DataFile dataFile : filesToDelete) {
            rewriteFiles.deleteFile(dataFile);
        }

        // Register the newly written replacement files
        for (DataFile dataFile : filesToAdd) {
            rewriteFiles.addFile(dataFile);
        }

        // Commit the rewrite operation
        rewriteFiles.commit();

        if (LOG.isDebugEnabled()) {
            LOG.debug("Rewrite committed with {} files deleted and {} files added",
                    filesToDelete.size(), filesToAdd.size());
        }
    }

    /**
     * Begin delete operation for Iceberg table
     */
    public void beginDelete(ExternalTable dorisTable) throws UserException {
        try {
            ops.getExecutionAuthenticator().execute(() -> {
                // create and start the iceberg transaction
                this.table = IcebergUtils.getIcebergTable(dorisTable);
                this.baseSnapshotId = getSnapshotIdIfPresent(table);
                // // Verify table format version (must be v2+ for delete support)
                // String formatVersionStr = table.properties().get("format-version");
                // int formatVersion = formatVersionStr != null ? Integer.parseInt(formatVersionStr) : 1;
                // if (formatVersion < 2) {
                //     throw new RuntimeException("Iceberg table DELETE requires format version >= 2. "
                //             + "Current format version: " + formatVersion);
                // }
                this.transaction = table.newTransaction();
                LOG.info("Started delete transaction for table: {}", dorisTable.getName());
            });
        } catch (Exception e) {
            throw new UserException("Failed to begin delete for iceberg table " + dorisTable.getName()
                    + " because: " + e.getMessage(), e);
        }
    }

    /**
     * Begin merge operation for Iceberg UPDATE (single scan RowDelta).
     */
    public void beginMerge(ExternalTable dorisTable) throws UserException {
        try {
            ops.getExecutionAuthenticator().execute(() -> {
                this.branchName = null;
                this.table = IcebergUtils.getIcebergTable(dorisTable);
                this.baseSnapshotId = getSnapshotIdIfPresent(table);
                this.transaction = table.newTransaction();
                LOG.info("Started merge transaction for table: {}", dorisTable.getName());
                return null;
            });
        } catch (Exception e) {
            throw new UserException("Failed to begin merge for iceberg table " + dorisTable.getName()
                    + " because: " + e.getMessage(), e);
        }
    }

    /**
     * Finish delete operation by committing DeleteFiles using the RowDelta API.
     */
    public void finishDelete(NameMapping nameMapping) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("iceberg table {} delete operation finished!", nameMapping.getFullLocalName());
        }
        try {
            ops.getExecutionAuthenticator().execute(() -> {
                updateManifestAfterDelete();
            });
        } catch (Exception e) {
            LOG.warn("Failed to finish delete for iceberg table {}.", nameMapping.getFullLocalName(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Finish merge operation by committing data and delete files using RowDelta.
     */
    public void finishMerge(NameMapping nameMapping) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("iceberg table {} merge operation finished!", nameMapping.getFullLocalName());
        }
        try {
            ops.getExecutionAuthenticator().execute(() -> {
                updateManifestAfterMerge();
            });
        } catch (Exception e) {
            LOG.warn("Failed to finish merge for iceberg table {}.", nameMapping.getFullLocalName(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Update manifest after delete operation using the RowDelta API.
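     *
     * <p>Sketch of the equivalent raw Iceberg sequence, with a single
     * hypothetical {@code positionDeleteFile}:
     * <pre>{@code
     * RowDelta delta = transaction.newRowDelta();
     * delta.addDeletes(positionDeleteFile);
     * delta.validateFromSnapshot(baseSnapshotId);
     * delta.validateDeletedFiles();
     * delta.commit();
     * }</pre>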
     */
    private void updateManifestAfterDelete() {
        FileFormat fileFormat = IcebergUtils.getFileFormat(transaction.table());

        if (commitDataList.isEmpty()) {
            LOG.info("No delete files to commit");
            return;
        }
        List<DeleteFile> deleteFiles = convertCommitDataToDeleteFiles(fileFormat, commitDataList);

        if (deleteFiles.isEmpty()) {
            LOG.info("No delete files generated from commit data");
            return;
        }

        // Create RowDelta operation
        RowDelta rowDelta = transaction.newRowDelta();
        applyRowDeltaValidations(rowDelta, transaction.table(), commitDataList,
                collectReferencedDataFiles(commitDataList));
        rowDelta.scanManifestsWith(ops.getThreadPoolWithPreAuth());

        // Add all delete files
        for (DeleteFile deleteFile : deleteFiles) {
            rowDelta.addDeletes(deleteFile);
        }

        // Commit the delete operation
        rowDelta.commit();

        LOG.info("Committed {} delete files", deleteFiles.size());
    }

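    /**
     * Groups commit data by partition spec id and converts each group with its
     * own spec, so position deletes written against an older spec are still
     * attributed correctly. Entries without a spec id are only legal for
     * unpartitioned tables and are assigned the current spec.
     */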
    private List<DeleteFile> convertCommitDataToDeleteFiles(FileFormat fileFormat,
            List<TIcebergCommitData> commitDataList) {
        if (commitDataList.isEmpty()) {
            return Collections.emptyList();
        }

        PartitionSpec currentSpec = transaction.table().spec();
        Map<Integer, PartitionSpec> specsById = transaction.table().specs();
        Map<Integer, List<TIcebergCommitData>> commitDataBySpecId = new HashMap<>();
        List<TIcebergCommitData> missingSpecId = new ArrayList<>();

        for (TIcebergCommitData commitData : commitDataList) {
            if (commitData.isSetPartitionSpecId()) {
                commitDataBySpecId.computeIfAbsent(commitData.getPartitionSpecId(), k -> new ArrayList<>())
                        .add(commitData);
            } else {
                missingSpecId.add(commitData);
            }
        }

        if (!missingSpecId.isEmpty()) {
            Preconditions.checkState(!currentSpec.isPartitioned(),
                    "Missing partition spec id for delete files in partitioned table %s",
                    transaction.table().name());
            commitDataBySpecId.computeIfAbsent(currentSpec.specId(), k -> new ArrayList<>())
                    .addAll(missingSpecId);
        }

        List<DeleteFile> deleteFiles = new ArrayList<>();
        for (Map.Entry<Integer, List<TIcebergCommitData>> entry : commitDataBySpecId.entrySet()) {
            int specId = entry.getKey();
            PartitionSpec spec = specsById.get(specId);
            Preconditions.checkState(spec != null,
                    "Unknown partition spec id %s for delete files in table %s",
                    specId, transaction.table().name());
            deleteFiles.addAll(IcebergWriterHelper.convertToDeleteFiles(fileFormat, spec, entry.getValue()));
        }

        return deleteFiles;
    }

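    /**
     * Commits a merge (UPDATE) as a single RowDelta: new data files carry the
     * updated rows, position-delete files mark the replaced rows, and both are
     * published together in one snapshot.
     */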
    private void updateManifestAfterMerge() {
        if (commitDataList.isEmpty()) {
            LOG.info("No commit data for merge operation");
            return;
        }

        FileFormat fileFormat = IcebergUtils.getFileFormat(transaction.table());
        PartitionSpec spec = transaction.table().spec();

        List<TIcebergCommitData> dataCommitData = new ArrayList<>();
        List<TIcebergCommitData> deleteCommitData = new ArrayList<>();

        for (TIcebergCommitData commitData : commitDataList) {
            if (commitData.isSetFileContent()
                    && commitData.getFileContent() == TFileContent.POSITION_DELETES) {
                deleteCommitData.add(commitData);
            } else {
                dataCommitData.add(commitData);
            }
        }

        List<DataFile> dataFiles = new ArrayList<>();
        if (!dataCommitData.isEmpty()) {
            WriteResult writeResult = IcebergWriterHelper.convertToWriterResult(
                    fileFormat, spec, dataCommitData);
            dataFiles.addAll(Arrays.asList(writeResult.dataFiles()));
        }

        List<DeleteFile> deleteFiles = convertCommitDataToDeleteFiles(fileFormat, deleteCommitData);

        if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
            LOG.info("No data or delete files generated from commit data");
            return;
        }

        RowDelta rowDelta = transaction.newRowDelta();
        applyRowDeltaValidations(rowDelta, transaction.table(), commitDataList,
                collectReferencedDataFiles(deleteCommitData));
        rowDelta.scanManifestsWith(ops.getThreadPoolWithPreAuth());

        for (DataFile dataFile : dataFiles) {
            rowDelta.addRows(dataFile);
        }
        for (DeleteFile deleteFile : deleteFiles) {
            rowDelta.addDeletes(deleteFile);
        }

        rowDelta.commit();
        LOG.info("Committed merge with {} data files and {} delete files",
                dataFiles.size(), deleteFiles.size());
    }

    public void finishInsert(NameMapping nameMapping) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("iceberg table {} insert operation finished!", nameMapping.getFullLocalName());
        }
        try {
            ops.getExecutionAuthenticator().execute(() -> {
                // decide the update mode and commit the pending data files
                TUpdateMode updateMode = TUpdateMode.APPEND;
                if (insertCtx != null) {
                    updateMode = insertCtx.isOverwrite()
                            ? TUpdateMode.OVERWRITE
                            : TUpdateMode.APPEND;
                }
                updateManifestAfterInsert(updateMode);
                return null;
            });
        } catch (Exception e) {
            LOG.warn("Failed to finish insert for iceberg table {}.", nameMapping.getFullLocalName(), e);
            throw new RuntimeException(e);
        }
    }

    private void updateManifestAfterInsert(TUpdateMode updateMode) {
        PartitionSpec spec = transaction.table().spec();
        FileFormat fileFormat = IcebergUtils.getFileFormat(transaction.table());

        List<WriteResult> pendingResults;
        if (commitDataList.isEmpty()) {
            pendingResults = Collections.emptyList();
        } else {
            // convert commitDataList to a WriteResult
            WriteResult writeResult = IcebergWriterHelper
                    .convertToWriterResult(fileFormat, spec, commitDataList);
            pendingResults = Lists.newArrayList(writeResult);
        }

        if (updateMode == TUpdateMode.APPEND) {
            commitAppendTxn(pendingResults);
        } else {
            // Check if this is a static partition overwrite
            if (insertCtx != null && insertCtx.isStaticPartitionOverwrite()) {
                commitStaticPartitionOverwrite(pendingResults);
            } else {
                commitReplaceTxn(pendingResults);
            }
        }
    }

    @Override
    public void commit() throws UserException {
        // commit the iceberg transaction
        transaction.commitTransaction();
    }

    @Override
    public void rollback() {
        if (isRewriteMode) {
            // Clear the collected files for rewrite mode
            synchronized (filesToDelete) {
                filesToDelete.clear();
            }
            synchronized (filesToAdd) {
                filesToAdd.clear();
            }
            LOG.info("Rewrite transaction rolled back");
        }
        // For insert mode, do nothing as original implementation
    }

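    /**
     * Returns the affected-row count to report back to the client: the number
     * of written data rows when any exist, otherwise the number of
     * position-delete rows (a pure DELETE produces no data rows).
     */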
    public long getUpdateCnt() {
        long dataRows = 0;
        long deleteRows = 0;
        for (TIcebergCommitData commitData : commitDataList) {
            if (commitData.isSetFileContent()
                    && commitData.getFileContent() == TFileContent.POSITION_DELETES) {
                deleteRows += commitData.getRowCount();
            } else {
                dataRows += commitData.getRowCount();
            }
        }
        return dataRows > 0 ? dataRows : deleteRows;
    }

    /**
     * Get the number of files that will be deleted in the rewrite operation.
     */
    public int getFilesToDeleteCount() {
        synchronized (filesToDelete) {
            return filesToDelete.size();
        }
    }

    /**
     * Get the number of files that will be added in the rewrite operation.
     */
    public int getFilesToAddCount() {
        synchronized (filesToAdd) {
            return filesToAdd.size();
        }
    }

    /**
     * Get the total size of files to be deleted in the rewrite operation.
     */
    public long getFilesToDeleteSize() {
        synchronized (filesToDelete) {
            return filesToDelete.stream().mapToLong(DataFile::fileSizeInBytes).sum();
        }
    }

    /**
     * Get the total size of files to be added in the rewrite operation.
     */
    public long getFilesToAddSize() {
        synchronized (filesToAdd) {
            return filesToAdd.stream().mapToLong(DataFile::fileSizeInBytes).sum();
        }
    }

    private void commitAppendTxn(List<WriteResult> pendingResults) {
        // commit append files.
        AppendFiles appendFiles = transaction.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
        if (branchName != null) {
            appendFiles = appendFiles.toBranch(branchName);
        }
        for (WriteResult result : pendingResults) {
            Preconditions.checkState(result.referencedDataFiles().length == 0,
                    "Should have no referenced data files for append.");
            Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
        }
        appendFiles.commit();
    }

    private Long getSnapshotIdIfPresent(Table icebergTable) {
        if (icebergTable == null || icebergTable.currentSnapshot() == null) {
            return null;
        }
        return icebergTable.currentSnapshot().snapshotId();
    }

    private void applyBaseSnapshotValidation(RowDelta rowDelta) {
        if (baseSnapshotId != null) {
            rowDelta.validateFromSnapshot(baseSnapshotId);
        }
    }

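    /**
     * Applies the standard RowDelta validation chain: validate from the
     * snapshot observed at begin time, narrow conflict detection to the
     * touched partitions when possible, reject conflicting data files under
     * serializable isolation, reject conflicting delete files, and verify
     * that every data file referenced by a position delete still exists.
     */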
    private void applyRowDeltaValidations(RowDelta rowDelta, Table icebergTable,
            List<TIcebergCommitData> commitDataList, List<String> referencedDataFiles) {
        applyBaseSnapshotValidation(rowDelta);
        applyConflictDetectionFilter(rowDelta, icebergTable, commitDataList);
        if (isSerializableIsolationLevel(icebergTable)) {
            rowDelta.validateNoConflictingDataFiles();
        }
        rowDelta.validateDeletedFiles();
        rowDelta.validateNoConflictingDeleteFiles();
        if (!referencedDataFiles.isEmpty()) {
            rowDelta.validateDataFilesExist(referencedDataFiles);
        }
    }

    private void applyConflictDetectionFilter(RowDelta rowDelta, Table icebergTable,
            List<TIcebergCommitData> commitDataList) {
        Optional<Expression> partitionFilter = buildConflictDetectionFilter(icebergTable, commitDataList);
        Optional<Expression> combined =
                combineConflictDetectionFilters(conflictDetectionFilter, partitionFilter);
        combined.ifPresent(rowDelta::conflictDetectionFilter);
    }

    private Optional<Expression> combineConflictDetectionFilters(Optional<Expression> queryFilter,
            Optional<Expression> partitionFilter) {
        if (queryFilter.isPresent() && partitionFilter.isPresent()) {
            return Optional.of(Expressions.and(queryFilter.get(), partitionFilter.get()));
        }
        return queryFilter.isPresent() ? queryFilter : partitionFilter;
    }

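    /**
     * Builds a conflict-detection filter covering exactly the partitions this
     * commit touches. Only fully identity-partitioned specs are supported; any
     * other case returns {@code Optional.empty()}, falling back to whole-table
     * validation. For commits touching hypothetical partitions {@code dt=1}
     * and {@code dt=2}, the result is {@code or(equal("dt", 1), equal("dt", 2))}.
     */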
    private Optional<Expression> buildConflictDetectionFilter(Table icebergTable,
            List<TIcebergCommitData> commitDataList) {
        if (icebergTable == null || commitDataList == null || commitDataList.isEmpty()) {
            return Optional.empty();
        }

        PartitionSpec spec = icebergTable.spec();
        if (!spec.isPartitioned()) {
            return Optional.empty();
        }
        if (!areAllIdentityPartitions(spec)) {
            return Optional.empty();
        }

        Schema schema = icebergTable.schema();
        int currentSpecId = spec.specId();

        Expression combined = null;
        for (TIcebergCommitData commitData : commitDataList) {
            if (commitData.isSetPartitionSpecId()
                    && commitData.getPartitionSpecId() != currentSpecId) {
                return Optional.empty();
            }
            if (!commitData.isSetPartitionSpecId() && spec.isPartitioned()) {
                return Optional.empty();
            }

            Expression partitionExpr = buildIdentityPartitionExpression(spec, schema, commitData);
            if (partitionExpr == null) {
                return Optional.empty();
            }
            combined = combined == null ? partitionExpr : Expressions.or(combined, partitionExpr);
        }
        return combined == null ? Optional.empty() : Optional.of(combined);
    }

    private boolean areAllIdentityPartitions(PartitionSpec spec) {
        for (PartitionField field : spec.fields()) {
            if (!field.transform().isIdentity()) {
                return false;
            }
        }
        return true;
    }

    private Expression buildIdentityPartitionExpression(PartitionSpec spec, Schema schema,
            TIcebergCommitData commitData) {
        List<String> partitionValues = commitData.getPartitionValues();
        if (partitionValues != null && !partitionValues.isEmpty()) {
            return buildIdentityPartitionExpression(spec, schema, partitionValues);
        }
        if (commitData.getPartitionDataJson() != null && !commitData.getPartitionDataJson().isEmpty()) {
            PartitionData partitionData = IcebergUtils.parsePartitionDataFromJson(
                    commitData.getPartitionDataJson(), spec);
            if (partitionData != null) {
                return buildIdentityPartitionExpression(spec, schema, partitionData);
            }
        }
        return null;
    }

    private Expression buildIdentityPartitionExpression(PartitionSpec spec, Schema schema,
            List<String> partitionValues) {
        if (partitionValues.isEmpty() || partitionValues.size() != spec.fields().size()) {
            return null;
        }
        Expression expression = null;
        List<PartitionField> fields = spec.fields();
        for (int i = 0; i < fields.size(); i++) {
            PartitionField field = fields.get(i);
            Types.NestedField sourceField = schema.findField(field.sourceId());
            if (sourceField == null) {
                return null;
            }
            String valueStr = partitionValues.get(i);
            // treat the literal string "null" as a null partition value
            if (valueStr == null || "null".equals(valueStr)) {
                valueStr = null;
            }
            Object value = IcebergUtils.parsePartitionValueFromString(valueStr, sourceField.type());
            Expression predicate = value == null
                    ? Expressions.isNull(sourceField.name())
                    : Expressions.equal(sourceField.name(), value);
            expression = expression == null ? predicate : Expressions.and(expression, predicate);
        }
        return expression;
    }

    private Expression buildIdentityPartitionExpression(PartitionSpec spec, Schema schema,
            PartitionData partitionData) {
        if (partitionData == null || partitionData.size() != spec.fields().size()) {
            return null;
        }
        Expression expression = null;
        List<PartitionField> fields = spec.fields();
        for (int i = 0; i < fields.size(); i++) {
            PartitionField field = fields.get(i);
            Types.NestedField sourceField = schema.findField(field.sourceId());
            if (sourceField == null) {
                return null;
            }
            Object value = partitionData.get(i);
            Expression predicate = value == null
                    ? Expressions.isNull(sourceField.name())
                    : Expressions.equal(sourceField.name(), value);
            expression = expression == null ? predicate : Expressions.and(expression, predicate);
        }
        return expression;
    }

    private boolean isSerializableIsolationLevel(Table icebergTable) {
        if (icebergTable == null) {
            return true;
        }
        String level = icebergTable.properties()
                .getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT);
        return "serializable".equalsIgnoreCase(level);
    }

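    /**
     * Collects the data file paths referenced by position-delete commit data,
     * so the RowDelta commit can verify via validateDataFilesExist() that none
     * of them was removed by a concurrent operation.
     */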
    private List<String> collectReferencedDataFiles(List<TIcebergCommitData> commitDataList) {
        if (commitDataList == null || commitDataList.isEmpty()) {
            return Collections.emptyList();
        }

        List<String> referencedDataFiles = new ArrayList<>();
        for (TIcebergCommitData commitData : commitDataList) {
            if (commitData.isSetFileContent()
                    && commitData.getFileContent() != TFileContent.POSITION_DELETES) {
                continue;
            }
            if (commitData.isSetReferencedDataFiles()) {
                for (String dataFile : commitData.getReferencedDataFiles()) {
                    if (dataFile != null && !dataFile.isEmpty()) {
                        referencedDataFiles.add(dataFile);
                    }
                }
            }
            if (commitData.isSetReferencedDataFilePath()
                    && commitData.getReferencedDataFilePath() != null
                    && !commitData.getReferencedDataFilePath().isEmpty()) {
                referencedDataFiles.add(commitData.getReferencedDataFilePath());
            }
        }
        return referencedDataFiles;
    }

    private void commitReplaceTxn(List<WriteResult> pendingResults) {
        if (pendingResults.isEmpty()) {
            // e.g. insert overwrite table `dst_tb` select * from `empty_tb`
            // 1. If dst_tb is partitioned, no partition is replaced, so return directly.
            // 2. If dst_tb is unpartitioned, all of its existing data files are
            //    deleted, emptying the table.
            if (!transaction.table().spec().isPartitioned()) {
                OverwriteFiles overwriteFiles = transaction.newOverwrite();
                if (branchName != null) {
                    overwriteFiles = overwriteFiles.toBranch(branchName);
                }
                overwriteFiles = overwriteFiles.scanManifestsWith(ops.getThreadPoolWithPreAuth());
                try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
                    OverwriteFiles finalOverwriteFiles = overwriteFiles;
                    fileScanTasks.forEach(f -> finalOverwriteFiles.deleteFile(f.file()));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                overwriteFiles.commit();
            }
            return;
        }

        // commit replace partitions
        ReplacePartitions appendPartitionOp = transaction.newReplacePartitions();
        if (branchName != null) {
            appendPartitionOp = appendPartitionOp.toBranch(branchName);
        }
        appendPartitionOp = appendPartitionOp.scanManifestsWith(ops.getThreadPoolWithPreAuth());
        for (WriteResult result : pendingResults) {
            Preconditions.checkState(result.referencedDataFiles().length == 0,
                    "Should have no referenced data files.");
            Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile);
        }
        appendPartitionOp.commit();
    }

    /**
     * Commit a static partition overwrite operation.
     * Uses OverwriteFiles.overwriteByRowFilter() to overwrite only the specified partitions.
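     *
     * <p>Sketch of the underlying Iceberg calls, with a hypothetical
     * {@code dt} partition filter and an assumed {@code newDataFiles} list:
     * <pre>{@code
     * OverwriteFiles overwrite = transaction.newOverwrite()
     *         .overwriteByRowFilter(Expressions.equal("dt", "2024-01-01"));
     * newDataFiles.forEach(overwrite::addFile);
     * overwrite.commit();
     * }</pre>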
     */
    private void commitStaticPartitionOverwrite(List<WriteResult> pendingResults) {
        Table icebergTable = transaction.table();
        PartitionSpec spec = icebergTable.spec();
        Schema schema = icebergTable.schema();

        // Build partition filter expression from static partition values
        Expression partitionFilter = buildPartitionFilter(
                insertCtx.getStaticPartitionValues(), spec, schema);

        // Create OverwriteFiles operation
        OverwriteFiles overwriteFiles = transaction.newOverwrite();
        if (branchName != null) {
            overwriteFiles = overwriteFiles.toBranch(branchName);
        }
        overwriteFiles = overwriteFiles.scanManifestsWith(ops.getThreadPoolWithPreAuth());

        // Set partition filter to overwrite only matching partitions
        overwriteFiles = overwriteFiles.overwriteByRowFilter(partitionFilter);

        // Add new data files
        for (WriteResult result : pendingResults) {
            Preconditions.checkState(result.referencedDataFiles().length == 0,
                    "Should have no referenced data files for static partition overwrite.");
            Arrays.stream(result.dataFiles()).forEach(overwriteFiles::addFile);
        }

        // Commit the overwrite operation
        overwriteFiles.commit();
    }

    /**
     * Build partition filter expression from static partition key-value pairs
     *
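     * <p>For example (hypothetical values), a static partition map
     * {@code {"dt" -> "2024-01-01", "region" -> "us"}} on an identity-partitioned
     * table yields {@code and(equal("dt", "2024-01-01"), equal("region", "us"))}.
     *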
     * @param staticPartitions Map of partition column name to partition value (as String)
     * @param spec PartitionSpec of the table
     * @param schema Schema of the table
     * @return Iceberg Expression for partition filtering
     */
    private Expression buildPartitionFilter(
            Map<String, String> staticPartitions,
            PartitionSpec spec,
            Schema schema) {
        if (staticPartitions == null || staticPartitions.isEmpty()) {
            return Expressions.alwaysTrue();
        }

        List<Expression> predicates = new ArrayList<>();

        for (PartitionField field : spec.fields()) {
            String partitionColName = field.name();
            if (staticPartitions.containsKey(partitionColName)) {
                String partitionValueStr = staticPartitions.get(partitionColName);

                // Get source field to determine the type
                Types.NestedField sourceField = schema.findField(field.sourceId());
                if (sourceField == null) {
                    throw new RuntimeException(String.format("Source field not found for partition field: %s",
                        partitionColName));
                }

                // Convert partition value string to appropriate type
                Object partitionValue = IcebergUtils.parsePartitionValueFromString(
                        partitionValueStr, sourceField.type());

                // Build equality expression using source field name (not partition field name)
                // For identity partitions, Iceberg requires the source column name in expressions
                String sourceColName = sourceField.name();
                Expression eqExpr;
                if (partitionValue == null) {
                    eqExpr = Expressions.isNull(sourceColName);
                } else {
                    eqExpr = Expressions.equal(sourceColName, partitionValue);
                }
                predicates.add(eqExpr);
            }
        }

        if (predicates.isEmpty()) {
            return Expressions.alwaysTrue();
        }

        // Combine all predicates with AND
        Expression result = predicates.get(0);
        for (int i = 1; i < predicates.size(); i++) {
            result = Expressions.and(result, predicates.get(i));
        }
        return result;
    }
}