PluginDrivenInsertExecutor.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.connector.api.Connector;
import org.apache.doris.connector.api.ConnectorColumn;
import org.apache.doris.connector.api.ConnectorMetadata;
import org.apache.doris.connector.api.ConnectorSession;
import org.apache.doris.connector.api.ConnectorWriteOps;
import org.apache.doris.connector.api.handle.ConnectorInsertHandle;
import org.apache.doris.connector.api.handle.ConnectorTableHandle;
import org.apache.doris.connector.api.write.ConnectorWriteType;
import org.apache.doris.datasource.ConnectorColumnConverter;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.PluginDrivenExternalCatalog;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.transaction.TransactionType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* Insert executor for plugin-driven connector catalogs.
* Delegates the write lifecycle to the connector's ConnectorWriteOps SPI.
*/
public class PluginDrivenInsertExecutor extends BaseExternalTableInsertExecutor {
private static final Logger LOG = LogManager.getLogger(PluginDrivenInsertExecutor.class);
private transient ConnectorInsertHandle insertHandle;
private transient ConnectorSession connectorSession;
private transient ConnectorWriteOps writeOps;
private transient ConnectorWriteType resolvedWriteType;
/**
* constructor
*/
public PluginDrivenInsertExecutor(ConnectContext ctx, ExternalTable table,
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx,
boolean emptyInsert, long jobId) {
super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId);
}
@Override
protected void beforeExec() throws UserException {
PluginDrivenExternalCatalog catalog =
(PluginDrivenExternalCatalog) ((ExternalTable) table).getCatalog();
Connector connector = catalog.getConnector();
connectorSession = catalog.buildConnectorSession();
ConnectorMetadata metadata = connector.getMetadata(connectorSession);
writeOps = metadata;
if (!writeOps.supportsInsert()) {
throw new UserException("Connector does not support INSERT for table: "
+ table.getName());
}
// Get table handle using remote names (not local/mapped names)
ExternalTable extTable = (ExternalTable) table;
String remoteDbName = extTable.getRemoteDbName();
String remoteTableName = extTable.getRemoteName();
Optional<ConnectorTableHandle> tableHandle = metadata.getTableHandle(
connectorSession, remoteDbName, remoteTableName);
if (!tableHandle.isPresent()) {
throw new UserException("Table not found via connector: "
+ remoteDbName + "." + remoteTableName);
}
// Convert Doris columns to connector columns
List<ConnectorColumn> columns = toConnectorColumns(extTable.getBaseSchema(true));
// Resolve write type for transaction type decision
resolvedWriteType = writeOps.getWriteConfig(
connectorSession, tableHandle.get(), columns).getWriteType();
// Begin insert
insertHandle = writeOps.beginInsert(connectorSession, tableHandle.get(), columns);
LOG.info("Plugin-driven insert started for table {}.{}, txnId={}",
remoteDbName, remoteTableName, txnId);
}
@Override
protected void doBeforeCommit() throws UserException {
if (writeOps != null && insertHandle != null) {
writeOps.finishInsert(connectorSession, insertHandle, Collections.emptyList());
}
}
/**
* Post-commit refresh is best-effort for connector writes.
*
* <p>For JDBC_WRITE, the remote write is committed directly by BE via
* PreparedStatement ��� FE cannot roll it back. If the post-commit cache
* refresh fails (e.g., catalog dropped concurrently, edit log I/O error),
* reporting the INSERT as failed would mislead the user into retrying,
* causing duplicate data. The old JdbcInsertExecutor avoided this by
* not performing any post-commit work at all.</p>
*
* <p>We preserve that safety guarantee while still attempting the refresh
* so that cache stays fresh in the common case.</p>
*/
@Override
protected void doAfterCommit() throws DdlException {
try {
super.doAfterCommit();
} catch (Exception e) {
LOG.warn("Post-commit cache refresh failed for table {} (write type: {}). "
+ "Data was committed successfully; cache may be stale until next refresh.",
table.getName(), resolvedWriteType, e);
}
}
@Override
protected void onFail(Throwable t) {
// Abort the connector-level write before the Doris-level transaction rollback
if (writeOps != null && insertHandle != null) {
try {
writeOps.abortInsert(connectorSession, insertHandle);
} catch (Exception e) {
LOG.warn("Failed to abort connector insert for table {}: {}",
table.getName(), e.getMessage(), e);
}
}
super.onFail(t);
}
@Override
protected TransactionType transactionType() {
if (resolvedWriteType == ConnectorWriteType.JDBC_WRITE) {
return TransactionType.JDBC;
}
return TransactionType.HMS;
}
/**
* Converts a list of Doris {@link Column} to a list of {@link ConnectorColumn}.
* This is the reverse of {@link org.apache.doris.datasource.ConnectorColumnConverter#convertColumns}.
*/
private static List<ConnectorColumn> toConnectorColumns(List<Column> dorisColumns) {
return dorisColumns.stream()
.map(PluginDrivenInsertExecutor::toConnectorColumn)
.collect(Collectors.toList());
}
private static ConnectorColumn toConnectorColumn(Column col) {
return ConnectorColumnConverter.toConnectorColumn(col);
}
}