// PhysicalIcebergMergeSink.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.physical;

import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergMergeOperation;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.properties.DistributionSpecMerge;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.delete.DeleteCommandContext;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.Statistics;

import com.google.common.collect.ImmutableList;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;

/**
 * Physical Iceberg Merge Sink for UPDATE operations.
 * This sink is responsible for writing position delete files and data files.
 */
public class PhysicalIcebergMergeSink<CHILD_TYPE extends Plan> extends PhysicalBaseExternalTableSink<CHILD_TYPE> {
    // Context of the originating delete/update command; guaranteed non-null (checked in the constructor).
    private final DeleteCommandContext deleteContext;

    /**
     * Convenience constructor: delegates to the full constructor with
     * {@link PhysicalProperties#GATHER} as the physical properties and no statistics.
     */
    public PhysicalIcebergMergeSink(IcebergExternalDatabase database,
                                    IcebergExternalTable targetTable,
                                    List<Column> cols,
                                    List<NamedExpression> outputExprs,
                                    DeleteCommandContext deleteContext,
                                    Optional<GroupExpression> groupExpression,
                                    LogicalProperties logicalProperties,
                                    CHILD_TYPE child) {
        this(database, targetTable, cols, outputExprs, deleteContext, groupExpression, logicalProperties,
                PhysicalProperties.GATHER, null, child);
    }

    /**
     * Full constructor.
     *
     * @param deleteContext context of the delete/update command driving this sink; must not be null
     */
    public PhysicalIcebergMergeSink(IcebergExternalDatabase database,
                                    IcebergExternalTable targetTable,
                                    List<Column> cols,
                                    List<NamedExpression> outputExprs,
                                    DeleteCommandContext deleteContext,
                                    Optional<GroupExpression> groupExpression,
                                    LogicalProperties logicalProperties,
                                    PhysicalProperties physicalProperties,
                                    Statistics statistics,
                                    CHILD_TYPE child) {
        super(PlanType.PHYSICAL_ICEBERG_MERGE_SINK, database, targetTable, cols, outputExprs, groupExpression,
                logicalProperties, physicalProperties, statistics, child);
        this.deleteContext = Objects.requireNonNull(
                deleteContext, "deleteContext != null in PhysicalIcebergMergeSink");
    }

    public DeleteCommandContext getDeleteContext() {
        return deleteContext;
    }

    @Override
    public Plan withChildren(List<Plan> children) {
        // Rebuild with the new child while keeping physical properties and statistics.
        return new PhysicalIcebergMergeSink<>(
                (IcebergExternalDatabase) database, (IcebergExternalTable) targetTable,
                cols, outputExprs, deleteContext, groupExpression,
                getLogicalProperties(), physicalProperties, statistics, children.get(0));
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitPhysicalIcebergMergeSink(this, context);
    }

    @Override
    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
        // NOTE: uses the short constructor, so physical properties reset to GATHER and
        // statistics are dropped — consistent with withGroupExprLogicalPropChildren below.
        return new PhysicalIcebergMergeSink<>(
                (IcebergExternalDatabase) database, (IcebergExternalTable) targetTable, cols, outputExprs,
                deleteContext, groupExpression, getLogicalProperties(), child());
    }

    @Override
    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
                                                 Optional<LogicalProperties> logicalProperties, List<Plan> children) {
        return new PhysicalIcebergMergeSink<>(
                (IcebergExternalDatabase) database, (IcebergExternalTable) targetTable, cols, outputExprs,
                deleteContext, groupExpression, logicalProperties.get(), children.get(0));
    }

    @Override
    public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
        return new PhysicalIcebergMergeSink<>(
                (IcebergExternalDatabase) database, (IcebergExternalTable) targetTable, cols, outputExprs,
                deleteContext, groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        if (!super.equals(o)) {
            return false;
        }
        PhysicalIcebergMergeSink<?> that = (PhysicalIcebergMergeSink<?>) o;
        return Objects.equals(deleteContext, that.deleteContext);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), deleteContext);
    }

    /**
     * Computes the physical properties this sink requires from its child.
     *
     * <p>Scans the child's output for two pseudo-columns (matched case-insensitively):
     * the merge operation column ({@link IcebergMergeOperation#OPERATION_COLUMN}) and the
     * Iceberg row-id column ({@link Column#ICEBERG_ROWID_COL}).
     *
     * <p>Decision order:
     * <ol>
     *   <li>If merge partitioning is disabled in the session (or there is no ConnectContext),
     *       require a hash shuffle on the row-id when present, else GATHER.</li>
     *   <li>If either pseudo-column is missing, fall back to GATHER.</li>
     *   <li>Otherwise build a {@link DistributionSpecMerge} carrying the operation column,
     *       the insert-side partition expr ids / partition fields, and the row-id.
     *       When partition info cannot be fully resolved (or the spec has non-identity
     *       transforms but the fields could not be built), the insert side degrades to
     *       random distribution and the partition lists are emptied.</li>
     * </ol>
     */
    @Override
    public PhysicalProperties getRequirePhysicalProperties() {
        ExprId rowIdExprId = null;
        ExprId operationExprId = null;
        // Case-insensitive name -> ExprId fallback map over all child output slots.
        Map<String, ExprId> nameToExprId = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        List<Slot> outputSlots = child().getOutput();
        for (Slot slot : outputSlots) {
            String name = slot.getName();
            if (operationExprId == null && IcebergMergeOperation.OPERATION_COLUMN.equalsIgnoreCase(name)) {
                operationExprId = slot.getExprId();
            }
            if (rowIdExprId == null && Column.ICEBERG_ROWID_COL.equalsIgnoreCase(name)) {
                rowIdExprId = slot.getExprId();
            }
            nameToExprId.put(name, slot.getExprId());
        }

        ConnectContext ctx = ConnectContext.get();
        if (ctx == null || !ctx.getSessionVariable().isEnableIcebergMergePartitioning()) {
            // Merge partitioning disabled: just co-locate rows sharing a row-id, if we have one.
            if (rowIdExprId != null) {
                return PhysicalProperties.createHash(ImmutableList.of(rowIdExprId), ShuffleType.REQUIRE);
            }
            return PhysicalProperties.GATHER;
        }

        if (rowIdExprId == null || operationExprId == null) {
            // Both pseudo-columns are needed to route rows by operation; without them, gather.
            return PhysicalProperties.GATHER;
        }

        List<ExprId> insertPartitionExprIds = new ArrayList<>();
        List<DistributionSpecMerge.IcebergPartitionField> insertPartitionFields = new ArrayList<>();
        Integer partitionSpecId = null;
        List<Column> partitionColumns = ((IcebergExternalTable) targetTable).getPartitionColumns(Optional.empty());
        Map<String, ExprId> columnExprIdMap = buildColumnExprIdMap(outputSlots, nameToExprId);
        boolean insertExprsOk = false;
        if (!partitionColumns.isEmpty()) {
            insertExprsOk = buildInsertPartitionExprIds(insertPartitionExprIds, partitionColumns, columnExprIdMap);
        }
        InsertPartitionFieldResult fieldResult = buildInsertPartitionFields(
                insertPartitionFields, (IcebergExternalTable) targetTable, columnExprIdMap);
        boolean insertFieldsOk = fieldResult.success;
        boolean hasNonIdentity = fieldResult.hasNonIdentity;
        if (insertFieldsOk) {
            partitionSpecId = fieldResult.partitionSpecId;
        }

        // Random fallback when neither the column-based nor the field-based mapping succeeded,
        // or when non-identity transforms exist but the partition fields could not be built
        // (plain column hashing would mis-bucket transformed partitions).
        boolean insertRandom = !(insertExprsOk || insertFieldsOk);
        if (!insertFieldsOk && hasNonIdentity) {
            insertRandom = true;
            insertPartitionExprIds.clear();
        }
        if (insertRandom) {
            insertPartitionExprIds.clear();
            insertPartitionFields.clear();
        }

        return new PhysicalProperties(new DistributionSpecMerge(
                operationExprId,
                insertPartitionExprIds,
                ImmutableList.of(rowIdExprId),
                insertRandom,
                insertPartitionFields,
                partitionSpecId));
    }

    /**
     * Resolves each partition column to an ExprId via {@code columnExprIdMap}, appending to
     * {@code insertPartitionExprIds} (an out-parameter).
     *
     * @return true iff every partition column was resolved; on any miss the out-list is
     *         cleared and false is returned
     */
    private boolean buildInsertPartitionExprIds(List<ExprId> insertPartitionExprIds,
                                                List<Column> partitionColumns,
                                                Map<String, ExprId> columnExprIdMap) {
        for (Column column : partitionColumns) {
            ExprId exprId = columnExprIdMap.get(column.getName());
            if (exprId == null) {
                insertPartitionExprIds.clear();
                return false;
            }
            insertPartitionExprIds.add(exprId);
        }
        return insertPartitionExprIds.size() == partitionColumns.size();
    }

    /**
     * Builds a case-insensitive map from target-table column name to child ExprId.
     *
     * <p>Preferred path: when the number of data slots (child output minus the operation and
     * row-id pseudo-columns) equals the number of visible target columns, pair them
     * positionally — this is robust to slot renaming. Otherwise fall back to the
     * slot-name-based map built by the caller.
     */
    private Map<String, ExprId> buildColumnExprIdMap(List<Slot> outputSlots,
                                                     Map<String, ExprId> nameToExprId) {
        List<Column> visibleColumns = new ArrayList<>();
        for (Column column : cols) {
            if (column.isVisible()) {
                visibleColumns.add(column);
            }
        }
        List<Slot> dataSlots = getDataSlots(outputSlots);
        if (dataSlots.size() == visibleColumns.size()) {
            Map<String, ExprId> columnExprIdMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
            for (int i = 0; i < visibleColumns.size(); i++) {
                columnExprIdMap.put(visibleColumns.get(i).getName(), dataSlots.get(i).getExprId());
            }
            return columnExprIdMap;
        }
        return nameToExprId;
    }

    /**
     * Returns the child output slots with the merge-operation and Iceberg row-id
     * pseudo-columns filtered out (case-insensitive name match).
     */
    private List<Slot> getDataSlots(List<Slot> outputSlots) {
        List<Slot> dataSlots = new ArrayList<>();
        for (Slot slot : outputSlots) {
            String name = slot.getName();
            if (IcebergMergeOperation.OPERATION_COLUMN.equalsIgnoreCase(name)) {
                continue;
            }
            if (Column.ICEBERG_ROWID_COL.equalsIgnoreCase(name)) {
                continue;
            }
            dataSlots.add(slot);
        }
        return dataSlots;
    }

    /**
     * Builds one {@link DistributionSpecMerge.IcebergPartitionField} per field of the table's
     * current partition spec, appending to {@code insertPartitionFields} (an out-parameter).
     *
     * <p>Fails (success=false, out-list cleared where partially filled) when the table or its
     * schema is unavailable, the spec is unpartitioned, or any field's source column cannot be
     * resolved to an ExprId. Also reports whether the spec contains any non-identity transform
     * (e.g. bucket/truncate), which the caller uses to decide the random fallback.
     */
    private InsertPartitionFieldResult buildInsertPartitionFields(
            List<DistributionSpecMerge.IcebergPartitionField> insertPartitionFields,
            IcebergExternalTable icebergTable,
            Map<String, ExprId> columnExprIdMap) {
        Table table = icebergTable.getIcebergTable();
        if (table == null) {
            return new InsertPartitionFieldResult(false, false, null);
        }
        PartitionSpec spec = table.spec();
        if (spec == null || !spec.isPartitioned()) {
            return new InsertPartitionFieldResult(false, false, null);
        }
        Schema schema = table.schema();
        // Detect any non-identity transform up front; needed even on later failure paths.
        boolean hasNonIdentity = false;
        for (PartitionField field : spec.fields()) {
            if (!field.transform().isIdentity()) {
                hasNonIdentity = true;
                break;
            }
        }
        if (schema == null) {
            return new InsertPartitionFieldResult(false, hasNonIdentity, spec.specId());
        }
        for (PartitionField field : spec.fields()) {
            Types.NestedField sourceField = schema.findField(field.sourceId());
            if (sourceField == null) {
                insertPartitionFields.clear();
                return new InsertPartitionFieldResult(false, hasNonIdentity, spec.specId());
            }
            ExprId exprId = columnExprIdMap.get(sourceField.name());
            if (exprId == null) {
                insertPartitionFields.clear();
                return new InsertPartitionFieldResult(false, hasNonIdentity, spec.specId());
            }
            // Transform param (e.g. bucket width) is recovered from the transform's
            // string form, e.g. "bucket[16]" -> 16; null for param-less transforms.
            String transform = field.transform().toString();
            Integer param = parseTransformParam(transform);
            insertPartitionFields.add(new DistributionSpecMerge.IcebergPartitionField(
                    transform, exprId, param, field.name(), field.sourceId()));
        }
        if (insertPartitionFields.isEmpty()) {
            return new InsertPartitionFieldResult(false, hasNonIdentity, spec.specId());
        }
        return new InsertPartitionFieldResult(true, hasNonIdentity, spec.specId());
    }

    /**
     * Parses the bracketed integer parameter from a transform's string form,
     * e.g. "bucket[16]" returns 16; returns null when there is no well-formed
     * bracketed integer (identity, year, month, etc.).
     */
    private Integer parseTransformParam(String transform) {
        int start = transform.indexOf('[');
        int end = transform.indexOf(']');
        if (start < 0 || end <= start) {
            return null;
        }
        try {
            return Integer.parseInt(transform.substring(start + 1, end));
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Immutable result triple of {@link #buildInsertPartitionFields}. */
    private static class InsertPartitionFieldResult {
        // Whether every partition field was resolved to an ExprId.
        private final boolean success;
        // Whether the partition spec contains any non-identity transform.
        private final boolean hasNonIdentity;
        // The spec id of the partition spec inspected; null when no spec was available.
        private final Integer partitionSpecId;

        private InsertPartitionFieldResult(boolean success, boolean hasNonIdentity, Integer partitionSpecId) {
            this.success = success;
            this.hasNonIdentity = hasNonIdentity;
            this.partitionSpecId = partitionSpecId;
        }
    }
}