PhysicalIcebergMergeSink.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.catalog.Column;
import org.apache.doris.connector.api.Connector;
import org.apache.doris.connector.api.ConnectorMetadata;
import org.apache.doris.connector.api.ConnectorSession;
import org.apache.doris.connector.api.handle.ConnectorTableHandle;
import org.apache.doris.connector.api.write.ConnectorWritePartitionField;
import org.apache.doris.connector.api.write.ConnectorWritePartitionSpec;
import org.apache.doris.connector.api.write.ConnectorWritePlanProvider;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.PluginDrivenExternalCatalog;
import org.apache.doris.datasource.PluginDrivenExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergMergeOperation;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.properties.DistributionSpecMerge;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.delete.DeleteCommandContext;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.Statistics;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
/**
* Physical Iceberg Merge Sink for UPDATE operations.
* This sink is responsible for writing position delete files and data files.
*/
public class PhysicalIcebergMergeSink<CHILD_TYPE extends Plan> extends PhysicalBaseExternalTableSink<CHILD_TYPE> {
private final DeleteCommandContext deleteContext;
/**
 * Creates a merge sink without explicit physical properties or statistics.
 *
 * <p>Delegates to the full constructor, passing {@link PhysicalProperties#GATHER} as the physical
 * properties and {@code null} as the statistics.</p>
 *
 * @param database database forwarded to the base external-table sink
 * @param targetTable table forwarded to the base external-table sink
 * @param cols columns forwarded to the base external-table sink
 * @param outputExprs output expressions forwarded to the base external-table sink
 * @param deleteContext delete command context kept by this sink; must not be {@code null}
 * @param groupExpression optional memo group expression
 * @param logicalProperties logical properties of this plan node
 * @param child the single child plan
 */
public PhysicalIcebergMergeSink(
        ExternalDatabase database,
        ExternalTable targetTable,
        List<Column> cols,
        List<NamedExpression> outputExprs,
        DeleteCommandContext deleteContext,
        Optional<GroupExpression> groupExpression,
        LogicalProperties logicalProperties,
        CHILD_TYPE child) {
    this(
            database,
            targetTable,
            cols,
            outputExprs,
            deleteContext,
            groupExpression,
            logicalProperties,
            PhysicalProperties.GATHER,
            null,
            child);
}
/**
 * Creates a merge sink with explicit physical properties and statistics.
 *
 * @param database database forwarded to the base external-table sink
 * @param targetTable table forwarded to the base external-table sink
 * @param cols columns forwarded to the base external-table sink
 * @param outputExprs output expressions forwarded to the base external-table sink
 * @param deleteContext delete command context kept by this sink; must not be {@code null}
 * @param groupExpression optional memo group expression
 * @param logicalProperties logical properties of this plan node
 * @param physicalProperties physical properties assigned to this plan node
 * @param statistics statistics assigned to this plan node (the short constructor passes {@code null})
 * @param child the single child plan
 * @throws NullPointerException if {@code deleteContext} is {@code null}
 */
public PhysicalIcebergMergeSink(
        ExternalDatabase database,
        ExternalTable targetTable,
        List<Column> cols,
        List<NamedExpression> outputExprs,
        DeleteCommandContext deleteContext,
        Optional<GroupExpression> groupExpression,
        LogicalProperties logicalProperties,
        PhysicalProperties physicalProperties,
        Statistics statistics,
        CHILD_TYPE child) {
    super(
            PlanType.PHYSICAL_ICEBERG_MERGE_SINK,
            database,
            targetTable,
            cols,
            outputExprs,
            groupExpression,
            logicalProperties,
            physicalProperties,
            statistics,
            child);
    // Fail fast at construction time rather than when the context is first read.
    this.deleteContext =
            Objects.requireNonNull(deleteContext, "deleteContext != null in PhysicalIcebergMergeSink");
}
/**
 * Returns the delete command context supplied at construction.
 *
 * @return the delete context; never {@code null} because the constructor rejects {@code null}
 */
public DeleteCommandContext getDeleteContext() {
    return this.deleteContext;
}
/**
 * Returns a copy of this sink whose child is the first element of {@code children}; every other
 * field (including physical properties and statistics) is carried over unchanged.
 */
@Override
public Plan withChildren(List<Plan> children) {
    Plan replacementChild = children.get(0);
    return new PhysicalIcebergMergeSink<>(
            database,
            targetTable,
            cols,
            outputExprs,
            deleteContext,
            groupExpression,
            getLogicalProperties(),
            physicalProperties,
            statistics,
            replacementChild);
}
/**
 * Dispatches to {@link PlanVisitor#visitPhysicalIcebergMergeSink} and returns its result.
 */
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
    R visited = visitor.visitPhysicalIcebergMergeSink(this, context);
    return visited;
}
/**
 * Returns a copy of this sink bound to the given group expression.
 *
 * <p>The copy is built through the short constructor, so its physical properties are
 * {@link PhysicalProperties#GATHER} and its statistics are {@code null} rather than this
 * instance's values.</p>
 */
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
    return new PhysicalIcebergMergeSink<>(
            database,
            targetTable,
            cols,
            outputExprs,
            deleteContext,
            groupExpression,
            getLogicalProperties(),
            child());
}
/**
 * Returns a copy of this sink with the given group expression, logical properties and child.
 *
 * <p>{@code logicalProperties} is unwrapped unconditionally, so it must be present; only the first
 * element of {@code children} is used. The copy is built through the short constructor, so its
 * physical properties are {@link PhysicalProperties#GATHER} and its statistics are {@code null}.</p>
 */
@Override
public Plan withGroupExprLogicalPropChildren(
        Optional<GroupExpression> groupExpression,
        Optional<LogicalProperties> logicalProperties,
        List<Plan> children) {
    LogicalProperties unwrappedProperties = logicalProperties.get();
    Plan replacementChild = children.get(0);
    return new PhysicalIcebergMergeSink<>(
            database,
            targetTable,
            cols,
            outputExprs,
            deleteContext,
            groupExpression,
            unwrappedProperties,
            replacementChild);
}
/**
 * Returns a copy of this sink carrying the given physical properties and statistics; all other
 * fields, including the current child, are carried over unchanged.
 */
@Override
public PhysicalPlan withPhysicalPropertiesAndStats(
        PhysicalProperties physicalProperties,
        Statistics statistics) {
    return new PhysicalIcebergMergeSink<>(
            database,
            targetTable,
            cols,
            outputExprs,
            deleteContext,
            groupExpression,
            getLogicalProperties(),
            physicalProperties,
            statistics,
            child());
}
/**
 * Two merge sinks are equal when they have exactly the same runtime class, the superclass
 * considers them equal, and their delete contexts are equal.
 */
@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    // Evaluated left to right, so super.equals is only reached for a non-null, same-class object.
    boolean comparable = o != null && getClass() == o.getClass() && super.equals(o);
    if (!comparable) {
        return false;
    }
    PhysicalIcebergMergeSink<?> other = (PhysicalIcebergMergeSink<?>) o;
    return Objects.equals(deleteContext, other.deleteContext);
}
/**
 * Combines the superclass hash with the delete context, consistent with {@link #equals(Object)}.
 */
@Override
public int hashCode() {
    int inheritedHash = super.hashCode();
    return Objects.hash(inheritedHash, deleteContext);
}
/**
 * Computes the physical properties (data distribution) this sink requires from its child.
 *
 * <p>The child's output slots are matched by name, case-insensitively, against
 * {@link IcebergMergeOperation#OPERATION_COLUMN} and {@link Column#ICEBERG_ROWID_COL}; the first
 * slot matching each name is used.</p>
 * <ul>
 *   <li>No {@link ConnectContext}, or Iceberg merge partitioning disabled in the session: hash on
 *       the row-id slot when it exists, otherwise {@link PhysicalProperties#GATHER}.</li>
 *   <li>Merge partitioning enabled but the row-id or operation slot is missing:
 *       {@link PhysicalProperties#GATHER}.</li>
 *   <li>Otherwise: a {@link DistributionSpecMerge} built from the operation slot, the row-id slot
 *       and whatever insert partitioning could be resolved. Inserts are marked random (and both
 *       partition lists emptied) when partition fields could not be resolved and either the
 *       partition columns could not be resolved or the spec has a non-identity transform.</li>
 * </ul>
 */
@Override
public PhysicalProperties getRequirePhysicalProperties() {
    List<Slot> childOutput = child().getOutput();
    Map<String, ExprId> exprIdBySlotName = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    ExprId operationId = null;
    ExprId rowId = null;
    for (Slot slot : childOutput) {
        String slotName = slot.getName();
        ExprId slotId = slot.getExprId();
        if (operationId == null && IcebergMergeOperation.OPERATION_COLUMN.equalsIgnoreCase(slotName)) {
            operationId = slotId;
        }
        if (rowId == null && Column.ICEBERG_ROWID_COL.equalsIgnoreCase(slotName)) {
            rowId = slotId;
        }
        exprIdBySlotName.put(slotName, slotId);
    }

    ConnectContext connectContext = ConnectContext.get();
    boolean mergePartitioningEnabled = connectContext != null
            && connectContext.getSessionVariable().isEnableIcebergMergePartitioning();
    if (!mergePartitioningEnabled) {
        return rowId == null
                ? PhysicalProperties.GATHER
                : PhysicalProperties.createHash(ImmutableList.of(rowId), ShuffleType.REQUIRE);
    }
    if (rowId == null || operationId == null) {
        return PhysicalProperties.GATHER;
    }

    List<ExprId> partitionExprIds = new ArrayList<>();
    List<DistributionSpecMerge.IcebergPartitionField> partitionFields = new ArrayList<>();
    List<Column> partitionColumns = targetTable.getPartitionColumns(Optional.empty());
    Map<String, ExprId> columnExprIds = buildColumnExprIdMap(childOutput, exprIdBySlotName);

    boolean exprsResolved = !partitionColumns.isEmpty()
            && buildInsertPartitionExprIds(partitionExprIds, partitionColumns, columnExprIds);
    InsertPartitionFieldResult fieldResult =
            getIcebergPartitioning(partitionFields, targetTable, columnExprIds);
    boolean fieldsResolved = fieldResult.success;
    // The spec id is only meaningful when the partition fields were fully resolved.
    Integer specId = fieldsResolved ? fieldResult.partitionSpecId : null;

    // Random inserts when fields are unresolved and we either have no resolved partition columns
    // or cannot honour a non-identity transform.
    boolean insertRandom = !fieldsResolved && (!exprsResolved || fieldResult.hasNonIdentity);
    if (insertRandom) {
        partitionExprIds.clear();
        partitionFields.clear();
    }

    DistributionSpecMerge mergeSpec = new DistributionSpecMerge(
            operationId,
            partitionExprIds,
            ImmutableList.of(rowId),
            insertRandom,
            partitionFields,
            specId);
    return new PhysicalProperties(mergeSpec);
}
/**
 * Appends, in order, the expr id mapped to each partition column's name.
 *
 * <p>If any column name has no mapping, {@code insertPartitionExprIds} is cleared and
 * {@code false} is returned.</p>
 *
 * @param insertPartitionExprIds output list that receives the resolved expr ids
 * @param partitionColumns partition columns to resolve
 * @param columnExprIdMap lookup from column name to expr id
 * @return {@code true} when the output list ends up with one entry per partition column
 */
private boolean buildInsertPartitionExprIds(
        List<ExprId> insertPartitionExprIds,
        List<Column> partitionColumns,
        Map<String, ExprId> columnExprIdMap) {
    for (Column partitionColumn : partitionColumns) {
        ExprId resolved = columnExprIdMap.get(partitionColumn.getName());
        if (resolved != null) {
            insertPartitionExprIds.add(resolved);
            continue;
        }
        // One unresolved column invalidates the whole list.
        insertPartitionExprIds.clear();
        return false;
    }
    return insertPartitionExprIds.size() == partitionColumns.size();
}
/**
 * Builds the column-name to expr-id lookup used for partition resolution.
 *
 * <p>When the number of data slots (see {@link #getDataSlots}) equals the number of visible
 * columns in {@code cols}, the two are paired by position into a new case-insensitive map.
 * Otherwise the name-based map passed in is returned as is.</p>
 *
 * @param outputSlots output slots of the child plan
 * @param nameToExprId fallback lookup keyed by slot name
 * @return the positional map, or {@code nameToExprId} when the counts differ
 */
private Map<String, ExprId> buildColumnExprIdMap(
        List<Slot> outputSlots,
        Map<String, ExprId> nameToExprId) {
    List<Column> shownColumns = new ArrayList<>();
    for (Column candidate : cols) {
        if (candidate.isVisible()) {
            shownColumns.add(candidate);
        }
    }
    List<Slot> payloadSlots = getDataSlots(outputSlots);
    int columnCount = shownColumns.size();
    if (payloadSlots.size() != columnCount) {
        // Positional pairing is impossible; fall back to matching by slot name.
        return nameToExprId;
    }
    Map<String, ExprId> positional = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    for (int idx = 0; idx < columnCount; idx++) {
        positional.put(shownColumns.get(idx).getName(), payloadSlots.get(idx).getExprId());
    }
    return positional;
}
/**
 * Returns the given slots, in order, minus those named (case-insensitively)
 * {@link IcebergMergeOperation#OPERATION_COLUMN} or {@link Column#ICEBERG_ROWID_COL}.
 *
 * @param outputSlots slots to filter
 * @return a new list containing the remaining slots
 */
private List<Slot> getDataSlots(List<Slot> outputSlots) {
    List<Slot> remaining = new ArrayList<>();
    for (Slot candidate : outputSlots) {
        String slotName = candidate.getName();
        boolean isMergeBookkeeping = IcebergMergeOperation.OPERATION_COLUMN.equalsIgnoreCase(slotName)
                || Column.ICEBERG_ROWID_COL.equalsIgnoreCase(slotName);
        if (!isMergeBookkeeping) {
            remaining.add(candidate);
        }
    }
    return remaining;
}
/**
 * Dual-mode partition-field resolution for the merge-write distribution. Pre-flip (legacy
 * {@link IcebergExternalTable}) walks the native iceberg {@code PartitionSpec} via
 * {@link #buildInsertPartitionFields} (byte-identical to legacy). Post-flip
 * ({@link PluginDrivenExternalTable}) asks the connector for its engine-neutral
 * {@link ConnectorWritePartitionSpec} and reconstructs the same result via
 * {@link #reconstructPartitionFields}, preserving the three legacy parities (hard-fail clear on an
 * unresolvable source column, the non-identity pre-pass over all fields, and the spec-id carry).
 *
 * <p>This sink is a legacy-exempt class, so the {@code instanceof IcebergExternalTable} pre-flip
 * branch is permitted here; the post-flip branch routes entirely through neutral connector SPI
 * (no {@code instanceof Iceberg*}, no native types).</p>
 *
 * <p>A table that is neither of the two supported types (including {@code null}) yields the
 * non-partitioned result {@code (false, false, null)} instead of a {@link ClassCastException},
 * so distribution derivation degrades to its fallback rather than failing.</p>
 *
 * @param insertPartitionFields output list that receives the resolved partition fields
 * @param table target table of the merge write
 * @param columnExprIdMap lookup from column name to expr id
 * @return the resolution result; never {@code null}
 */
private InsertPartitionFieldResult getIcebergPartitioning(
        List<DistributionSpecMerge.IcebergPartitionField> insertPartitionFields,
        ExternalTable table,
        Map<String, ExprId> columnExprIdMap) {
    if (table instanceof IcebergExternalTable) {
        return buildInsertPartitionFields(insertPartitionFields, (IcebergExternalTable) table, columnExprIdMap);
    }
    if (table instanceof PluginDrivenExternalTable) {
        return buildInsertPartitionFieldsFromConnector(
                insertPartitionFields, (PluginDrivenExternalTable) table, columnExprIdMap);
    }
    // Unknown table kind: report "unpartitioned" rather than throwing from an unchecked cast.
    return new InsertPartitionFieldResult(false, false, null);
}
/**
 * Legacy arm of {@link #getIcebergPartitioning}: resolves partition fields by walking the native
 * Iceberg {@code PartitionSpec} of the table.
 *
 * <p>Returns {@code (false, false, null)} when the Iceberg table or its spec is missing, or the
 * spec is not partitioned. Otherwise the spec id is always carried in the result, and
 * {@code hasNonIdentity} reflects whether any spec field has a non-identity transform. If the
 * schema is missing, or any field's source column cannot be found in the schema or in
 * {@code columnExprIdMap}, {@code insertPartitionFields} is left empty and {@code success} is
 * {@code false}.</p>
 *
 * @param insertPartitionFields output list that receives one entry per spec field on success
 * @param icebergTable legacy Iceberg table wrapper
 * @param columnExprIdMap lookup from column name to expr id
 * @return the resolution result; never {@code null}
 */
private InsertPartitionFieldResult buildInsertPartitionFields(
        List<DistributionSpecMerge.IcebergPartitionField> insertPartitionFields,
        IcebergExternalTable icebergTable,
        Map<String, ExprId> columnExprIdMap) {
    Table nativeTable = icebergTable.getIcebergTable();
    if (nativeTable == null) {
        return new InsertPartitionFieldResult(false, false, null);
    }
    PartitionSpec partitionSpec = nativeTable.spec();
    if (partitionSpec == null || !partitionSpec.isPartitioned()) {
        return new InsertPartitionFieldResult(false, false, null);
    }
    Schema tableSchema = nativeTable.schema();

    // Pre-pass over all fields, independent of whether their source columns resolve.
    boolean hasNonIdentity = partitionSpec.fields().stream()
            .anyMatch(specField -> !specField.transform().isIdentity());
    Integer specId = partitionSpec.specId();
    if (tableSchema == null) {
        return new InsertPartitionFieldResult(false, hasNonIdentity, specId);
    }

    for (PartitionField specField : partitionSpec.fields()) {
        Types.NestedField sourceColumn = tableSchema.findField(specField.sourceId());
        ExprId sourceExprId = sourceColumn == null ? null : columnExprIdMap.get(sourceColumn.name());
        if (sourceExprId == null) {
            // Either the source column is unknown to the schema or it is not bound to a slot.
            insertPartitionFields.clear();
            return new InsertPartitionFieldResult(false, hasNonIdentity, specId);
        }
        String transformText = specField.transform().toString();
        insertPartitionFields.add(new DistributionSpecMerge.IcebergPartitionField(
                transformText,
                sourceExprId,
                parseTransformParam(transformText),
                specField.name(),
                specField.sourceId()));
    }
    boolean resolvedAny = !insertPartitionFields.isEmpty();
    return new InsertPartitionFieldResult(resolvedAny, hasNonIdentity, specId);
}
/**
 * Post-flip arm of {@link #getIcebergPartitioning}: fetches the connector's engine-neutral
 * {@link ConnectorWritePartitionSpec} via the same canonical access path as
 * {@code PhysicalPlanTranslator.visitPhysicalConnectorTableSink}, then reconstructs the partition
 * fields with {@link #reconstructPartitionFields}.
 *
 * <p>A {@code null} write-plan provider or an unresolvable table handle degrades to the
 * non-partitioned result {@code (false, false, null)} (GATHER/random fallback) rather than an
 * exception, matching the legacy native walk, which only ever returns result objects from inside
 * the distribution derivation.</p>
 *
 * @param insertPartitionFields output list that receives the resolved partition fields
 * @param table plugin-driven target table; its catalog is cast to
 *     {@link PluginDrivenExternalCatalog}
 * @param columnExprIdMap lookup from column name to expr id
 * @return the resolution result; never {@code null}
 */
private InsertPartitionFieldResult buildInsertPartitionFieldsFromConnector(
        List<DistributionSpecMerge.IcebergPartitionField> insertPartitionFields,
        PluginDrivenExternalTable table,
        Map<String, ExprId> columnExprIdMap) {
    PluginDrivenExternalCatalog pluginCatalog = (PluginDrivenExternalCatalog) table.getCatalog();
    Connector pluginConnector = pluginCatalog.getConnector();
    ConnectorWritePlanProvider planProvider = pluginConnector.getWritePlanProvider();
    if (planProvider != null) {
        ConnectorSession connectorSession = pluginCatalog.buildConnectorSession();
        ConnectorMetadata connectorMetadata = pluginConnector.getMetadata(connectorSession);
        ConnectorTableHandle tableHandle = connectorMetadata.getTableHandle(
                connectorSession, table.getRemoteDbName(), table.getRemoteName()).orElse(null);
        if (tableHandle != null) {
            ConnectorWritePartitionSpec writeSpec =
                    planProvider.getWritePartitioning(connectorSession, tableHandle);
            return reconstructPartitionFields(insertPartitionFields, writeSpec, columnExprIdMap);
        }
    }
    // No write-plan provider or no table handle: treat the target as unpartitioned.
    return new InsertPartitionFieldResult(false, false, null);
}
/**
 * Reconstructs the legacy {@link InsertPartitionFieldResult} from a connector's engine-neutral
 * {@link ConnectorWritePartitionSpec}, byte-for-byte equivalent to the native walk in
 * {@link #buildInsertPartitionFields}. Pure (no native types, no I/O) so the three parities are
 * pinned deterministically:
 * <ul>
 *   <li><b>P1 hard-fail clear:</b> a field with a {@code null} source column name, or one whose name
 *       does not resolve to a bound expr id, clears the accumulated fields and returns
 *       {@code success=false}. This is short-circuited <em>before</em> constructing the field,
 *       since the {@link DistributionSpecMerge.IcebergPartitionField} ctor requires a non-null
 *       expr id;</li>
 *   <li><b>P2 non-identity pre-pass:</b> {@code hasNonIdentity} is computed over <em>all</em> fields
 *       from the transform string ({@code !"identity".equals}) independently of resolvability,
 *       matching legacy {@code field.transform().isIdentity()} (only {@code Identity.toString()} is
 *       {@code "identity"}); it gates the caller's random fallback;</li>
 *   <li><b>spec-id carry:</b> the spec id is returned whenever {@code spec} is non-null (success
 *       or hard-fail).</li>
 * </ul>
 * A {@code null} spec means the connector reported the target unpartitioned (mirroring legacy
 * {@code spec().isPartitioned()}), yielding {@code (false, false, null)}.
 *
 * @param insertPartitionFields output list that receives one entry per spec field on success
 * @param spec connector write partition spec, or {@code null} for an unpartitioned target
 * @param columnExprIdMap lookup from source column name to expr id
 * @return the resolution result; never {@code null}
 */
static InsertPartitionFieldResult reconstructPartitionFields(
        List<DistributionSpecMerge.IcebergPartitionField> insertPartitionFields,
        ConnectorWritePartitionSpec spec,
        Map<String, ExprId> columnExprIdMap) {
    if (spec == null) {
        return new InsertPartitionFieldResult(false, false, null);
    }
    List<ConnectorWritePartitionField> specFields = spec.getFields();

    // P2: decided from the transform strings alone, before any name resolution.
    boolean hasNonIdentity = specFields.stream()
            .anyMatch(specField -> !"identity".equals(specField.getTransform()));

    for (ConnectorWritePartitionField specField : specFields) {
        String sourceName = specField.getSourceColumnName();
        // Guard the lookup: a null name is a hard fail and must not reach the map.
        ExprId sourceExprId = sourceName == null ? null : columnExprIdMap.get(sourceName);
        if (sourceExprId == null) {
            // P1: drop everything accumulated so far.
            insertPartitionFields.clear();
            return new InsertPartitionFieldResult(false, hasNonIdentity, spec.getSpecId());
        }
        insertPartitionFields.add(new DistributionSpecMerge.IcebergPartitionField(
                specField.getTransform(),
                sourceExprId,
                specField.getTransformParam(),
                specField.getFieldName(),
                specField.getSourceId()));
    }
    boolean resolvedAny = !insertPartitionFields.isEmpty();
    return new InsertPartitionFieldResult(resolvedAny, hasNonIdentity, spec.getSpecId());
}
/**
 * Extracts the integer written between the first {@code '['} and the first {@code ']'} of a
 * transform string, e.g. {@code 16} from {@code "bucket[16]"}.
 *
 * @param transform transform string to inspect; must not be {@code null}
 * @return the parsed integer, or {@code null} when there is no {@code '['}, the first {@code ']'}
 *     does not come after it, or the enclosed text is not a valid integer
 */
private Integer parseTransformParam(String transform) {
    int open = transform.indexOf('[');
    int close = transform.indexOf(']');
    boolean bracketed = open >= 0 && close > open;
    if (!bracketed) {
        return null;
    }
    String enclosed = transform.substring(open + 1, close);
    try {
        return Integer.parseInt(enclosed);
    } catch (NumberFormatException e) {
        // Non-numeric bracket content simply means "no parameter".
        return null;
    }
}
/**
 * Immutable outcome of resolving the target table's partition fields for the merge-write
 * distribution.
 *
 * <p>Package-private (not private) so the same-package parity test can assert on the reconstructed
 * result of {@link PhysicalIcebergMergeSink#reconstructPartitionFields} directly, without driving
 * the full distribution.</p>
 */
static class InsertPartitionFieldResult {
    /** Whether the partition fields were resolved. */
    final boolean success;
    /** Whether a non-identity transform was seen among the spec's fields. */
    final boolean hasNonIdentity;
    /** Partition spec id reported by the resolver; may be {@code null}. */
    final Integer partitionSpecId;

    InsertPartitionFieldResult(
            boolean success,
            boolean hasNonIdentity,
            Integer partitionSpecId) {
        this.success = success;
        this.hasNonIdentity = hasNonIdentity;
        this.partitionSpecId = partitionSpecId;
    }
}
}