WriteConstraintExtractor.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.catalog.TableIf;
import org.apache.doris.connector.api.pushdown.ConnectorAnd;
import org.apache.doris.connector.api.pushdown.ConnectorExpression;
import org.apache.doris.connector.api.pushdown.ConnectorPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

/**
 * Engine-side (fe-core) production half of O5-2 (P6.3-T07b): extracts, from an analyzed DELETE/UPDATE/MERGE
 * plan, the conjuncts that reference <i>only the target table's own columns</i> and hands the connector a
 * neutral {@link ConnectorPredicate} for write-time optimistic conflict detection (via
 * {@code ConnectorTransaction.applyWriteConstraint}). It is the connector-agnostic generalization of legacy
 * {@code IcebergConflictDetectionFilterUtils}'s collection half: the {@code IcebergExternalTable} parameter
 * becomes a plain {@code long targetTableId}, the inline {@code $row_id}/metadata-column exclusion becomes an
 * injected {@link Predicate} (the row-level DML transform supplies the connector-specific predicate in T07c),
 * and the per-conjunct iceberg lowering moves to the connector — here each surviving conjunct is converted to
 * a neutral {@link ConnectorExpression} by {@link NereidsToConnectorExpressionConverter}.
 *
 * <p>Conjuncts the converter cannot represent are dropped: dropping a conjunct only ever <i>widens</i> the
 * resulting conflict-detection filter (more conservative, never missing a real concurrent-write conflict).
 * The wiring of the extracted predicate into {@code applyWriteConstraint} is done by the T07c command shell;
 * this class is independently unit-testable.</p>
 */
public final class WriteConstraintExtractor {

    private WriteConstraintExtractor() {
    }

    /**
     * Extracts the target-only write constraint from an analyzed plan.
     *
     * @param analyzedPlan the analyzed DELETE/UPDATE/MERGE plan (may be {@code null})
     * @param targetTableId the id of the target table whose own-column conjuncts are kept
     * @param exclusion a predicate marking slots to exclude (synthetic {@code $row_id} / metadata columns);
     *                  a conjunct referencing any excluded slot is dropped. May be {@code null} (no exclusion).
     * @return the neutral predicate over the target table's own columns, or empty when none survive
     */
    public static Optional<ConnectorPredicate> extract(Plan analyzedPlan, long targetTableId,
            Predicate<SlotReference> exclusion) {
        if (analyzedPlan == null) {
            return Optional.empty();
        }
        List<Expression> targetConjuncts = new ArrayList<>();
        collectTargetConjuncts(analyzedPlan, targetTableId, exclusion, targetConjuncts);
        if (targetConjuncts.isEmpty()) {
            return Optional.empty();
        }
        List<ConnectorExpression> converted = new ArrayList<>();
        for (Expression conjunct : targetConjuncts) {
            ConnectorExpression neutral = NereidsToConnectorExpressionConverter.convert(conjunct);
            if (neutral != null) {
                converted.add(neutral);
            }
        }
        if (converted.isEmpty()) {
            return Optional.empty();
        }
        ConnectorExpression combined = converted.size() == 1 ? converted.get(0) : new ConnectorAnd(converted);
        return Optional.of(new ConnectorPredicate(combined));
    }

    private static void collectTargetConjuncts(Plan plan, long targetTableId,
            Predicate<SlotReference> exclusion, List<Expression> output) {
        if (plan instanceof LogicalFilter) {
            LogicalFilter<?> filter = (LogicalFilter<?>) plan;
            for (Expression conjunct : filter.getConjuncts()) {
                if (isTargetOnlyPredicate(conjunct, targetTableId, exclusion)) {
                    output.add(conjunct);
                }
            }
        }
        for (Plan child : plan.children()) {
            collectTargetConjuncts(child, targetTableId, exclusion, output);
        }
    }

    private static boolean isTargetOnlyPredicate(Expression predicate, long targetTableId,
            Predicate<SlotReference> exclusion) {
        if (predicate == null) {
            return false;
        }
        Set<Slot> slots = predicate.getInputSlots();
        if (slots.isEmpty()) {
            return false;
        }
        for (Slot slot : slots) {
            if (!(slot instanceof SlotReference)) {
                return false;
            }
            SlotReference slotReference = (SlotReference) slot;
            if (exclusion != null && exclusion.test(slotReference)) {
                return false;
            }
            Optional<TableIf> table = slotReference.getOriginalTable();
            if (!table.isPresent() || table.get().getId() != targetTableId) {
                return false;
            }
        }
        return true;
    }
}