IcebergDeleteSink.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;

import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.credentials.VendedCredentialsFactory;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.nereids.trees.plans.commands.delete.DeleteCommandContext;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TIcebergDeleteSink;

import org.apache.iceberg.Table;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

/**
 * Planner sink for Iceberg DELETE operations.
 *
 * <p>Translates the target {@link IcebergExternalTable} and the {@link DeleteCommandContext} into a
 * {@link TIcebergDeleteSink} thrift structure, which is handed to the BE to write delete files.
 */
public class IcebergDeleteSink extends BaseExternalTableDataSink {

    // Write formats accepted for the target table. Kept unmodifiable because this single static
    // instance is returned by supportedFileFormatTypes() to every caller.
    private static final Set<TFileFormatType> SUPPORTED_TYPES = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList(TFileFormatType.FORMAT_PARQUET, TFileFormatType.FORMAT_ORC)));

    private final IcebergExternalTable targetTable;
    private final DeleteCommandContext deleteContext;

    // Storage properties (vended credentials from the Iceberg table, or the catalog's static
    // credentials) used both for the BE hadoop config and for resolving the output location.
    private final Map<StorageProperties.Type, StorageProperties> storagePropertiesMap;

    /**
     * Creates a delete sink for the given Iceberg table.
     *
     * @param targetTable   the Iceberg table rows are deleted from; must not be a view
     * @param deleteContext describes the delete files to produce; must not be {@code null}
     * @throws NullPointerException          if {@code targetTable} or {@code deleteContext} is null
     * @throws UnsupportedOperationException if {@code targetTable} is an Iceberg view
     */
    public IcebergDeleteSink(IcebergExternalTable targetTable, DeleteCommandContext deleteContext) {
        super();
        Objects.requireNonNull(targetTable, "targetTable");
        if (targetTable.isView()) {
            throw new UnsupportedOperationException("DELETE from iceberg view is not supported");
        }
        this.targetTable = targetTable;
        // Fail fast: both getExplainString() and bindDataSink() dereference the context.
        this.deleteContext = Objects.requireNonNull(deleteContext, "deleteContext");

        // NOTE(review): unchecked cast; assumes an IcebergExternalTable always lives in an
        // IcebergExternalCatalog.
        IcebergExternalCatalog catalog = (IcebergExternalCatalog) targetTable.getCatalog();
        this.storagePropertiesMap = VendedCredentialsFactory.getStoragePropertiesMapWithVendedCredentials(
                catalog.getCatalogProperty().getMetastoreProperties(),
                catalog.getCatalogProperty().getStoragePropertiesMap(),
                targetTable.getIcebergTable());
    }

    /**
     * Returns the file formats this sink accepts (Parquet and ORC).
     *
     * @return an unmodifiable set of supported formats
     */
    @Override
    protected Set<TFileFormatType> supportedFileFormatTypes() {
        return SUPPORTED_TYPES;
    }

    /**
     * Renders this sink for EXPLAIN output.
     *
     * <p>At {@link TExplainLevel#BRIEF} only the header line is printed. Other levels add the
     * delete file type.
     */
    @Override
    public String getExplainString(String prefix, TExplainLevel explainLevel) {
        StringBuilder strBuilder = new StringBuilder();
        strBuilder.append(prefix).append("ICEBERG DELETE SINK\n");
        if (explainLevel == TExplainLevel.BRIEF) {
            return strBuilder.toString();
        }
        strBuilder.append(prefix).append("  DeleteType: ")
                .append(deleteContext.getDeleteFileType()).append("\n");
        return strBuilder.toString();
    }

    /**
     * Builds the {@link TIcebergDeleteSink} for this DELETE and stores it in {@code tDataSink}.
     *
     * @param insertCtx not used by the delete sink; present to satisfy the base-class signature
     * @throws AnalysisException propagated from the base-class format/compression helpers
     *                           (presumably when the table's format or codec is unsupported)
     */
    @Override
    public void bindDataSink(Optional<InsertCommandContext> insertCtx)
            throws AnalysisException {
        Table icebergTable = targetTable.getIcebergTable();
        TIcebergDeleteSink tSink = new TIcebergDeleteSink();

        tSink.setDbName(targetTable.getDbName());
        tSink.setTbName(targetTable.getName());

        // Delete content type taken from the command context (the original author notes that
        // only POSITION_DELETES are produced here; confirm in DeleteCommandContext).
        tSink.setDeleteType(deleteContext.toTFileContent());

        // File format and compression follow the table's own write settings.
        tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable).name()));
        tSink.setCompressType(getTFileCompressType(IcebergUtils.getFileCompress(icebergTable)));

        // Flatten every storage backend's properties into a single hadoop config for the BE.
        // On duplicate keys, the entry iterated last wins.
        Map<String, String> props = new HashMap<>();
        for (StorageProperties storageProperties : storagePropertiesMap.values()) {
            props.putAll(storageProperties.getBackendConfigProperties());
        }
        tSink.setHadoopConfig(props);

        // The output path is the table's data location (IcebergUtils.dataLocation), not metadata/.
        String tableLocation = IcebergUtils.dataLocation(icebergTable);
        LocationPath locationPath = LocationPath.of(tableLocation, storagePropertiesMap);
        tSink.setOutputPath(locationPath.toStorageLocation().toString());
        tSink.setTableLocation(tableLocation);

        // Broker-backed storage additionally needs the catalog's broker addresses.
        TFileType fileType = locationPath.getTFileTypeForBE();
        tSink.setFileType(fileType);
        if (fileType == TFileType.FILE_BROKER) {
            tSink.setBrokerAddresses(getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
        }

        // Only the current partition spec id is sent. Per the original note, the per-row
        // partition data JSON is filled in by the BE from the actual data.
        if (icebergTable.spec().isPartitioned()) {
            tSink.setPartitionSpecId(icebergTable.spec().specId());
        }

        tDataSink = new TDataSink(TDataSinkType.ICEBERG_DELETE_SINK);
        tDataSink.setIcebergDeleteSink(tSink);
    }
}