PluginDrivenTableSink.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.connector.api.ConnectorColumn;
import org.apache.doris.connector.api.ConnectorSession;
import org.apache.doris.connector.api.handle.ConnectorTableHandle;
import org.apache.doris.connector.api.handle.ConnectorWriteHandle;
import org.apache.doris.connector.api.handle.WriteOperation;
import org.apache.doris.connector.api.write.ConnectorSinkPlan;
import org.apache.doris.connector.api.write.ConnectorWritePlanProvider;
import org.apache.doris.datasource.PluginDrivenExternalTable;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.nereids.trees.plans.commands.insert.PluginDrivenInsertCommandContext;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TSortInfo;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* Generic data sink for plugin-driven external tables.
*
* <p>Extends {@link BaseExternalTableDataSink}. The connector supplies a
* {@link ConnectorWritePlanProvider} and builds its own opaque {@link TDataSink}
* via {@link ConnectorWritePlanProvider#planWrite}; the engine dispatches that
* sink to BE unchanged. This is the single, source-agnostic write path used by
* every write-capable connector (jdbc / maxcompute / iceberg). The connector-
* specific {@code T*TableSink} dialect lives entirely inside the connector.</p>
*/
public class PluginDrivenTableSink extends BaseExternalTableDataSink {
private final PluginDrivenExternalTable targetTable;
// Plan-provider mode (W5): the connector builds its own opaque TDataSink via planWrite().
private final ConnectorWritePlanProvider writePlanProvider;
private final ConnectorSession connectorSession;
private final ConnectorTableHandle tableHandle;
private final List<ConnectorColumn> connectorColumns;
// The engine-built BE sort instruction for a connector that declares write-sort columns (iceberg
// WRITE ORDERED BY); null when the target needs no write sort. The connector cannot build it (the
// bound output exprs live only here), so the translator resolves the connector's declared sort
// columns against the sink output and hands the TSortInfo here to thread onto the write handle.
private final TSortInfo writeSortInfo;
// The DML write operation this sink performs. A plain INSERT sink keeps the default INSERT (the
// connector promotes it to OVERWRITE from the handle's isOverwrite() flag); the row-level DML
// translator arms (DELETE / UPDATE / MERGE) pass the operation here so the connector's planWrite
// dispatches to the matching BE sink dialect (TIcebergDeleteSink / TIcebergMergeSink) instead of
// the INSERT TIcebergTableSink. Threaded onto the write handle so planWrite's buildWriteContext
// reads it via ConnectorWriteHandle.getWriteOperation().
private final WriteOperation writeOperation;
/**
* Plan-provider mode (W5): the connector supplies a {@link ConnectorWritePlanProvider}
* and builds its own opaque {@link TDataSink} via
* {@link ConnectorWritePlanProvider#planWrite}.
*/
public PluginDrivenTableSink(PluginDrivenExternalTable targetTable,
ConnectorWritePlanProvider writePlanProvider, ConnectorSession connectorSession,
ConnectorTableHandle tableHandle, List<ConnectorColumn> connectorColumns) {
this(targetTable, writePlanProvider, connectorSession, tableHandle, connectorColumns, null);
}
/**
* Plan-provider mode with an engine-built write {@link TSortInfo} threaded to the connector's write
* handle (for a connector that declares write-sort columns).
*/
public PluginDrivenTableSink(PluginDrivenExternalTable targetTable,
ConnectorWritePlanProvider writePlanProvider, ConnectorSession connectorSession,
ConnectorTableHandle tableHandle, List<ConnectorColumn> connectorColumns,
TSortInfo writeSortInfo) {
this(targetTable, writePlanProvider, connectorSession, tableHandle, connectorColumns,
writeSortInfo, WriteOperation.INSERT);
}
/**
* Plan-provider mode with the DML {@link WriteOperation} threaded to the connector's write handle, so
* the connector's {@code planWrite} dispatches to the matching BE sink dialect. The two shorter ctors
* default this to {@link WriteOperation#INSERT} (the byte-identical plain-INSERT path); the row-level
* DML translator arms use this ctor with {@code DELETE} / {@code UPDATE} / {@code MERGE}.
*/
public PluginDrivenTableSink(PluginDrivenExternalTable targetTable,
ConnectorWritePlanProvider writePlanProvider, ConnectorSession connectorSession,
ConnectorTableHandle tableHandle, List<ConnectorColumn> connectorColumns,
TSortInfo writeSortInfo, WriteOperation writeOperation) {
super();
this.targetTable = targetTable;
this.writePlanProvider = writePlanProvider;
this.connectorSession = connectorSession;
this.tableHandle = tableHandle;
this.connectorColumns = connectorColumns;
this.writeSortInfo = writeSortInfo;
this.writeOperation = writeOperation == null ? WriteOperation.INSERT : writeOperation;
}
/**
* The connector session this sink's write plan reads. The insert executor binds the
* connector transaction onto it (via {@link ConnectorSession#setCurrentTransaction})
* before {@code bindDataSink} runs, so the connector's {@code planWrite} sees the active
* transaction.
*/
public ConnectorSession getConnectorSession() {
return connectorSession;
}
@Override
protected Set<TFileFormatType> supportedFileFormatTypes() {
// Connector determines format through its own write plan; accept all
return EnumSet.allOf(TFileFormatType.class);
}
@Override
public String getExplainString(String prefix, TExplainLevel explainLevel) {
StringBuilder sb = new StringBuilder();
sb.append(prefix).append("PLUGIN-DRIVEN TABLE SINK\n");
if (explainLevel == TExplainLevel.BRIEF) {
return sb.toString();
}
sb.append(prefix).append(" WRITE: plan-provider\n");
sb.append(prefix).append(" TABLE: ").append(targetTable.getName()).append("\n");
// Let the connector surface its own write detail (e.g. jdbc INSERT SQL); the sink itself is
// source-agnostic. This runs before the write plan is bound (planWrite has not run yet for an
// EXPLAIN), so the connector derives the detail from the write handle.
ConnectorWriteHandle handle = new PluginDrivenWriteHandle(
tableHandle, connectorColumns, false, Collections.emptyMap(), null, Optional.empty(),
writeOperation);
writePlanProvider.appendExplainInfo(sb, prefix, connectorSession, handle);
return sb.toString();
}
/**
* Delegates sink construction to the connector, which returns its own opaque
* {@link TDataSink}; the engine dispatches it to BE unchanged. The
* {@link ConnectorWriteHandle} carries the bound target table handle and write columns.
*
* <p>Connector-specific write context (OVERWRITE flag, static partition spec) is read from
* the {@link PluginDrivenInsertCommandContext} and passed through to the connector.</p>
*/
@Override
public void bindDataSink(Optional<InsertCommandContext> insertCtx)
throws AnalysisException {
boolean overwrite = false;
Map<String, String> writeContext = Collections.emptyMap();
Optional<String> branchName = Optional.empty();
if (insertCtx.isPresent() && insertCtx.get() instanceof PluginDrivenInsertCommandContext) {
PluginDrivenInsertCommandContext ctx = (PluginDrivenInsertCommandContext) insertCtx.get();
overwrite = ctx.isOverwrite();
writeContext = ctx.getStaticPartitionSpec();
branchName = ctx.getBranchName();
}
ConnectorWriteHandle handle = new PluginDrivenWriteHandle(
tableHandle, connectorColumns, overwrite, writeContext, writeSortInfo, branchName,
writeOperation);
ConnectorSinkPlan sinkPlan = writePlanProvider.planWrite(connectorSession, handle);
this.tDataSink = sinkPlan.getDataSink();
}
/**
* Returns the target table.
*/
public PluginDrivenExternalTable getTargetTable() {
return targetTable;
}
/** Bound {@link ConnectorWriteHandle} passed to {@link ConnectorWritePlanProvider#planWrite}. */
private static final class PluginDrivenWriteHandle implements ConnectorWriteHandle {
private final ConnectorTableHandle tableHandle;
private final List<ConnectorColumn> columns;
private final boolean overwrite;
private final Map<String, String> writeContext;
private final TSortInfo sortInfo;
private final Optional<String> branchName;
private final WriteOperation writeOperation;
private PluginDrivenWriteHandle(ConnectorTableHandle tableHandle, List<ConnectorColumn> columns,
boolean overwrite, Map<String, String> writeContext, TSortInfo sortInfo,
Optional<String> branchName, WriteOperation writeOperation) {
this.tableHandle = tableHandle;
this.columns = columns;
this.overwrite = overwrite;
this.writeContext = writeContext;
this.sortInfo = sortInfo;
this.branchName = branchName == null ? Optional.empty() : branchName;
this.writeOperation = writeOperation == null ? WriteOperation.INSERT : writeOperation;
}
@Override
public TSortInfo getSortInfo() {
return sortInfo;
}
@Override
public Optional<String> getBranchName() {
return branchName;
}
@Override
public ConnectorTableHandle getTableHandle() {
return tableHandle;
}
@Override
public List<ConnectorColumn> getColumns() {
return columns;
}
@Override
public boolean isOverwrite() {
return overwrite;
}
@Override
public Map<String, String> getWriteContext() {
return writeContext;
}
@Override
public WriteOperation getWriteOperation() {
return writeOperation;
}
}
}