PluginDrivenInsertExecutor.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.connector.api.Connector;
import org.apache.doris.connector.api.ConnectorSession;
import org.apache.doris.connector.api.ConnectorWriteOps;
import org.apache.doris.connector.api.handle.ConnectorTransaction;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.PluginDrivenExternalCatalog;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PluginDrivenTableSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.transaction.PluginDrivenTransactionManager;
import org.apache.doris.transaction.TransactionType;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Optional;

/**
 * Insert executor for plugin-driven connector catalogs.
 *
 * <p>Delegates the write lifecycle to the connector's {@link ConnectorWriteOps} SPI through a
 * single transaction model: {@link #beginTransaction()} opens a {@link ConnectorTransaction}
 * and registers it globally; {@link #finalizeSink} binds it onto the sink's session so the
 * connector's {@code planWrite} sees it; BE feeds commit fragments back through the transaction;
 * {@code onComplete} commits / {@code onFail} rolls back via the transaction manager. Connectors
 * whose writes are auto-committed by BE (e.g. jdbc) return a degenerate no-op transaction.</p>
 */
public class PluginDrivenInsertExecutor extends BaseExternalTableInsertExecutor {

    private static final Logger LOG = LogManager.getLogger(PluginDrivenInsertExecutor.class);

    // Connector session and write-ops handle, built lazily (and always together) by
    // ensureConnectorSetup(). Either both are null or both are non-null.
    private transient ConnectorSession connectorSession;
    private transient ConnectorWriteOps writeOps;
    // The connector transaction for this write: opened in beginTransaction(), bound onto the
    // sink's session in finalizeSink(), and committed / rolled back via the transaction manager
    // in onComplete() / onFail(). Null only on the empty-insert path, which skips beginTransaction.
    private transient ConnectorTransaction connectorTx;

    /**
     * Creates the executor. Every argument is forwarded unchanged to the
     * {@link BaseExternalTableInsertExecutor} constructor; no connector resources are touched
     * here (the connector session is built lazily in {@link #beginTransaction()}).
     */
    public PluginDrivenInsertExecutor(ConnectContext ctx, ExternalTable table,
                                      String labelName, NereidsPlanner planner,
                                      Optional<InsertCommandContext> insertCtx,
                                      boolean emptyInsert, long jobId) {
        super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId);
    }

    /**
     * Opens the connector transaction for this write and registers it with the transaction
     * manager, storing the returned id in {@code txnId}.
     */
    @Override
    public void beginTransaction() {
        ensureConnectorSetup();
        // Single transaction model: every plugin-driven write opens a ConnectorTransaction and
        // registers it globally, so the BE block-allocation RPC and commit-data feedback can look
        // it up by id. Connectors whose writes are auto-committed by BE (jdbc) return a degenerate
        // no-op transaction; maxcompute returns a real one. The connector-specific write session is
        // created later by planWrite (reached through finalizeSink -> bindDataSink).
        connectorTx = writeOps.beginTransaction(connectorSession);
        txnId = ((PluginDrivenTransactionManager) transactionManager).begin(connectorTx);
    }

    /**
     * Returns the connector transaction opened by {@link #beginTransaction()}, or {@code null}
     * when no transaction has been opened (e.g. the empty-insert path).
     */
    @Override
    public ConnectorTransaction getConnectorTransactionOrNull() {
        return connectorTx;
    }

    /**
     * Binds the connector transaction onto the sink's connector session (when there is one) and
     * then delegates to the base finalize.
     */
    @Override
    protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
        // Bind the connector transaction onto the SINK's session BEFORE super.finalizeSink ->
        // bindDataSink -> planWrite, which reads it via ConnectorSession.getCurrentTransaction().
        // Only plan-provider sinks (e.g. maxcompute) carry a connector session; config-bag sinks
        // (jdbc) have none and build their TDataSink without the transaction, so skip binding for
        // them (getConnectorSession() == null) -- otherwise this would NPE.
        if (connectorTx != null && sink instanceof PluginDrivenTableSink) {
            ConnectorSession sinkSession = ((PluginDrivenTableSink) sink).getConnectorSession();
            if (sinkSession != null) {
                sinkSession.setCurrentTransaction(connectorTx);
            }
        }
        super.finalizeSink(fragment, sink, physicalSink);
    }

    /**
     * Public finalize entry for the row-level DML shell ({@code RowLevelDmlCommand} via
     * {@code IcebergRowLevelDmlTransform.finalizeSink}), which lives outside this package and so cannot reach
     * the {@code protected} {@link #finalizeSink}. Mirrors the legacy
     * {@code IcebergDeleteExecutor.finalizeSinkForDelete} public entry, but with NO rewritable-delete overlay:
     * the connector's {@code planWrite} supplies {@code rewritable_delete_file_sets} via the write handle (the
     * scan-time stash), so the base finalize (bind tx &rarr; {@code bindDataSink} &rarr; {@code planWrite}) is
     * the single, complete finalize for a row-level DELETE/MERGE write. {@code executeSingleInsert} does not
     * finalize, so this is the one and only finalize call on the row-level DML path.
     */
    public void finalizeRowLevelDmlSink(PlanFragment fragment, DataSink sink, PhysicalSink<?> physicalSink) {
        finalizeSink(fragment, sink, physicalSink);
    }

    /**
     * Intentionally a no-op for plugin-driven writes.
     */
    @Override
    protected void beforeExec() throws UserException {
        // Single transaction model: the connector write session is created by planWrite
        // (in finalizeSink). There is no per-statement handle to open here.
    }

    /**
     * Backfills {@code loadedRows} from the connector transaction when the transaction reports
     * a non-negative update count; otherwise leaves {@code loadedRows} untouched.
     */
    @Override
    protected void doBeforeCommit() throws UserException {
        if (connectorTx != null) {
            // BE reports the affected-row count either through the connector transaction's
            // commit-data (e.g. maxcompute TMCCommitData.row_count) or through the coordinator's
            // DPP_NORMAL_ALL load counter (e.g. jdbc). getUpdateCnt() == -1 means "no count from
            // the transaction; keep the coordinator counter" (NoOpConnectorTransaction); a value
            // >= 0 is authoritative and backfills loadedRows, which AbstractInsertExecutor otherwise
            // leaves at 0 for the transaction model. Mirrors legacy MCInsertExecutor.doBeforeCommit.
            long cnt = connectorTx.getUpdateCnt();
            if (cnt >= 0) {
                loadedRows = cnt;
            }
        }
    }

    /**
     * Post-commit refresh is best-effort for ALL connector write paths.
     *
     * <p>By the time this runs, the remote write is already durably committed and FE cannot roll
     * it back: for jdbc the BE commits directly via PreparedStatement; for the connector-transaction
     * path (maxcompute) the write session is committed by the transaction manager in onComplete,
     * before this step. {@code super.doAfterCommit()} only refreshes FE-side metadata cache and
     * writes an external-table refresh edit log (a cache-invalidation hint to followers); it never
     * touches the already-committed remote data.</p>
     *
     * <p>If that refresh fails (e.g., catalog dropped concurrently, edit log I/O error), reporting
     * the INSERT as failed would mislead the user into retrying and writing duplicate data. The
     * worst case of swallowing is transient cache staleness, which self-heals on the next refresh /
     * TTL. This intentionally diverges from legacy MCInsertExecutor (see deviations-log DV-018),
     * preserving the safer swallow-and-warn behavior of the old JdbcInsertExecutor.</p>
     */
    @Override
    protected void doAfterCommit() throws DdlException {
        try {
            super.doAfterCommit();
        } catch (Exception e) {
            // Deliberate swallow-and-warn (see the Javadoc above): the exception is logged with
            // its stack trace but never propagated to the caller.
            LOG.warn("Post-commit cache refresh failed for table {}. "
                    + "Data was committed successfully; cache may be stale until next refresh.",
                    table.getName(), e);
        }
    }

    /**
     * Maps the connector transaction's profile label onto a {@link TransactionType} by exact
     * enum-constant name.
     *
     * @return the matching type, or {@link TransactionType#UNKNOWN} when no transaction was
     *         opened or the label matches no constant
     */
    @Override
    protected TransactionType transactionType() {
        if (connectorTx == null) {
            // empty-insert path skips beginTransaction; no transaction was opened.
            return TransactionType.UNKNOWN;
        }
        // The connector tags its transaction with a profile label (e.g. "JDBC" / "MAXCOMPUTE");
        // map it to the profiling TransactionType. Unknown labels fall back to UNKNOWN.
        // A manual scan is used rather than TransactionType.valueOf(label), which would throw
        // for an unrecognized or null label instead of falling back.
        String label = connectorTx.profileLabel();
        for (TransactionType type : TransactionType.values()) {
            if (type.name().equals(label)) {
                return type;
            }
        }
        return TransactionType.UNKNOWN;
    }

    /**
     * Lazily builds the connector session and write-ops handle for this insert. Called from
     * {@link #beginTransaction()} before opening the connector transaction. Idempotent.
     *
     * <p>Both fields are published only after both values were obtained successfully, so a
     * failure while building either one leaves this executor un-initialized (and a later call
     * retries) instead of leaving a session without a write-ops handle.</p>
     */
    private void ensureConnectorSetup() {
        if (connectorSession != null && writeOps != null) {
            return;
        }
        PluginDrivenExternalCatalog catalog =
                (PluginDrivenExternalCatalog) ((ExternalTable) table).getCatalog();
        Connector connector = catalog.getConnector();
        // Build into locals first: if getMetadata throws, no field has been touched yet.
        ConnectorSession session = catalog.buildConnectorSession();
        ConnectorWriteOps ops = connector.getMetadata(session);
        connectorSession = session;
        writeOps = ops;
    }
}