PluginDrivenInsertExecutor.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.catalog.Column;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.connector.api.Connector;
import org.apache.doris.connector.api.ConnectorColumn;
import org.apache.doris.connector.api.ConnectorMetadata;
import org.apache.doris.connector.api.ConnectorSession;
import org.apache.doris.connector.api.ConnectorWriteOps;
import org.apache.doris.connector.api.handle.ConnectorInsertHandle;
import org.apache.doris.connector.api.handle.ConnectorTableHandle;
import org.apache.doris.connector.api.handle.ConnectorTransaction;
import org.apache.doris.connector.api.write.ConnectorWriteType;
import org.apache.doris.datasource.ConnectorColumnConverter;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.PluginDrivenExternalCatalog;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PluginDrivenTableSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.transaction.PluginDrivenTransactionManager;
import org.apache.doris.transaction.TransactionType;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Insert executor for plugin-driven connector catalogs.
 * Delegates the write lifecycle to the connector's ConnectorWriteOps SPI.
 */
public class PluginDrivenInsertExecutor extends BaseExternalTableInsertExecutor {

    private static final Logger LOG = LogManager.getLogger(PluginDrivenInsertExecutor.class);

    private transient ConnectorInsertHandle insertHandle;
    private transient ConnectorSession connectorSession;
    private transient ConnectorWriteOps writeOps;
    private transient ConnectorWriteType resolvedWriteType;
    // Non-null only for the SPI transaction model (e.g. maxcompute): opened in beginTransaction(),
    // bound onto the sink's session in finalizeSink(), and committed via the transaction manager
    // in onComplete(). Null for the JDBC / auto-commit insert-handle path.
    private transient ConnectorTransaction connectorTx;

    /**
     * constructor
     */
    public PluginDrivenInsertExecutor(ConnectContext ctx, ExternalTable table,
                                      String labelName, NereidsPlanner planner,
                                      Optional<InsertCommandContext> insertCtx,
                                      boolean emptyInsert, long jobId) {
        super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId);
    }

    @Override
    public void beginTransaction() {
        ensureConnectorSetup();
        if (writeOps.usesConnectorTransaction()) {
            // SPI transaction model (e.g. maxcompute): open a connector transaction and let the
            // plugin-driven transaction manager register it globally, so the BE block-allocation
            // RPC and commit-data feedback can look it up by id. The ODPS write session that backs
            // it is created later by planWrite (reached through finalizeSink -> bindDataSink).
            connectorTx = writeOps.beginTransaction(connectorSession);
            txnId = ((PluginDrivenTransactionManager) transactionManager).begin(connectorTx);
        } else {
            // JDBC / auto-commit handle model: allocate a no-op engine txn id.
            super.beginTransaction();
        }
    }

    @Override
    protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
        // Transaction model: bind the connector transaction onto the SINK's session BEFORE
        // super.finalizeSink -> bindDataSink -> planWrite, which reads it via
        // ConnectorSession.getCurrentTransaction() (fail-loud if absent). The sink carries its own
        // ConnectorSession built at translate time; the txn is shared with it by reference.
        if (connectorTx != null && sink instanceof PluginDrivenTableSink) {
            ((PluginDrivenTableSink) sink).getConnectorSession().setCurrentTransaction(connectorTx);
        }
        super.finalizeSink(fragment, sink, physicalSink);
    }

    @Override
    protected void beforeExec() throws UserException {
        if (connectorTx != null) {
            // Transaction model: the write session was already created by planWrite (in
            // finalizeSink). There is no per-statement insert handle to open here.
            return;
        }
        // JDBC / auto-commit handle model.
        ensureConnectorSetup();
        if (!writeOps.supportsInsert()) {
            throw new UserException("Connector does not support INSERT for table: "
                    + table.getName());
        }

        // Get table handle using remote names (not local/mapped names)
        ExternalTable extTable = (ExternalTable) table;
        String remoteDbName = extTable.getRemoteDbName();
        String remoteTableName = extTable.getRemoteName();
        Optional<ConnectorTableHandle> tableHandle = ((ConnectorMetadata) writeOps).getTableHandle(
                connectorSession, remoteDbName, remoteTableName);
        if (!tableHandle.isPresent()) {
            throw new UserException("Table not found via connector: "
                    + remoteDbName + "." + remoteTableName);
        }

        // Convert Doris columns to connector columns
        List<ConnectorColumn> columns = toConnectorColumns(extTable.getBaseSchema(true));

        // Resolve write type for transaction type decision
        resolvedWriteType = writeOps.getWriteConfig(
                connectorSession, tableHandle.get(), columns).getWriteType();

        // Begin insert
        insertHandle = writeOps.beginInsert(connectorSession, tableHandle.get(), columns);
        LOG.info("Plugin-driven insert started for table {}.{}, txnId={}",
                remoteDbName, remoteTableName, txnId);
    }

    @Override
    protected void doBeforeCommit() throws UserException {
        if (writeOps != null && insertHandle != null) {
            writeOps.finishInsert(connectorSession, insertHandle, Collections.emptyList());
        }
        if (connectorTx != null) {
            // SPI transaction model (e.g. maxcompute): the BE sink reports row counts via the
            // connector transaction's commit-data (TMCCommitData.row_count), NOT via the
            // coordinator's DPP_NORMAL_ALL load counter, so AbstractInsertExecutor leaves
            // loadedRows at 0. Backfill it here, mirroring legacy MCInsertExecutor.doBeforeCommit
            // (loadedRows = transaction.getUpdateCnt()); without it the client / SHOW INSERT RESULT
            // / audit log report "affected rows: 0" even though data was written. The commit itself
            // happens via the transaction manager (onComplete), so no finishInsert is needed here.
            // This branch is mutually exclusive with the insert-handle branch above (the transaction
            // model never opens a per-statement insert handle).
            loadedRows = connectorTx.getUpdateCnt();
        }
    }

    /**
     * Post-commit refresh is best-effort for ALL connector write paths — both the
     * JDBC / auto-commit handle model and the SPI connector-transaction model
     * (e.g. maxcompute).
     *
     * <p>By the time this runs, the remote write is already durably committed and
     * FE cannot roll it back: for JDBC_WRITE the BE commits directly via
     * PreparedStatement; for the connector-transaction path (maxcompute) the ODPS
     * write session is committed by the transaction manager in onComplete, before
     * this step. {@code super.doAfterCommit()} only refreshes FE-side metadata
     * cache and writes an external-table refresh edit log (a cache-invalidation
     * hint to followers); it never touches the already-committed remote data.</p>
     *
     * <p>If that refresh fails (e.g., catalog dropped concurrently, edit log I/O
     * error), reporting the INSERT as failed would mislead the user into retrying
     * and writing duplicate data. The worst case of swallowing is transient cache
     * staleness, which self-heals on the next refresh / TTL.</p>
     *
     * <p>This intentionally diverges from legacy MCInsertExecutor, which does not
     * override doAfterCommit so a refresh failure propagates and the INSERT is
     * reported FAILED (see deviations-log DV-018). We preserve the safer
     * swallow-and-warn behavior — matching the old JdbcInsertExecutor, which did
     * no post-commit work at all — while still attempting the refresh so the cache
     * stays fresh in the common case.</p>
     */
    @Override
    protected void doAfterCommit() throws DdlException {
        try {
            super.doAfterCommit();
        } catch (Exception e) {
            LOG.warn("Post-commit cache refresh failed for table {} (write type: {}). "
                    + "Data was committed successfully; cache may be stale until next refresh.",
                    table.getName(), resolvedWriteType, e);
        }
    }

    @Override
    protected void onFail(Throwable t) {
        // Abort the connector-level write before the Doris-level transaction rollback
        if (writeOps != null && insertHandle != null) {
            try {
                writeOps.abortInsert(connectorSession, insertHandle);
            } catch (Exception e) {
                LOG.warn("Failed to abort connector insert for table {}: {}",
                        table.getName(), e.getMessage(), e);
            }
        }
        super.onFail(t);
    }

    @Override
    protected TransactionType transactionType() {
        if (connectorTx != null) {
            // SPI transaction model. maxcompute is currently the sole adopter; this value is
            // profiling-only. Revisit when a second transaction-model connector arrives.
            return TransactionType.MAXCOMPUTE;
        }
        if (resolvedWriteType == ConnectorWriteType.JDBC_WRITE) {
            return TransactionType.JDBC;
        }
        return TransactionType.HMS;
    }

    /**
     * Lazily builds the connector session and write-ops handle for this insert. Idempotent so
     * both {@link #beginTransaction()} and {@link #beforeExec()} can call it: the empty-insert
     * path skips beginTransaction, so beforeExec must still be able to set up on its own.
     */
    private void ensureConnectorSetup() {
        if (connectorSession != null) {
            return;
        }
        PluginDrivenExternalCatalog catalog =
                (PluginDrivenExternalCatalog) ((ExternalTable) table).getCatalog();
        Connector connector = catalog.getConnector();
        connectorSession = catalog.buildConnectorSession();
        writeOps = connector.getMetadata(connectorSession);
    }

    /**
     * Converts a list of Doris {@link Column} to a list of {@link ConnectorColumn}.
     * This is the reverse of {@link org.apache.doris.datasource.ConnectorColumnConverter#convertColumns}.
     */
    private static List<ConnectorColumn> toConnectorColumns(List<Column> dorisColumns) {
        return dorisColumns.stream()
                .map(PluginDrivenInsertExecutor::toConnectorColumn)
                .collect(Collectors.toList());
    }

    private static ConnectorColumn toConnectorColumn(Column col) {
        return ConnectorColumnConverter.toConnectorColumn(col);
    }
}