ConnectorExecuteAction.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.execute;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.info.PartitionNamesInfo;
import org.apache.doris.catalog.info.TableNameInfo;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.connector.api.Connector;
import org.apache.doris.connector.api.ConnectorColumn;
import org.apache.doris.connector.api.ConnectorMetadata;
import org.apache.doris.connector.api.ConnectorSession;
import org.apache.doris.connector.api.DorisConnectorException;
import org.apache.doris.connector.api.handle.ConnectorTableHandle;
import org.apache.doris.connector.api.procedure.ConnectorProcedureOps;
import org.apache.doris.connector.api.procedure.ConnectorProcedureResult;
import org.apache.doris.connector.api.procedure.ProcedureExecutionMode;
import org.apache.doris.connector.api.pushdown.ConnectorPredicate;
import org.apache.doris.datasource.ConnectorColumnConverter;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.PluginDrivenExternalCatalog;
import org.apache.doris.datasource.PluginDrivenExternalTable;
import org.apache.doris.datasource.UnboundExpressionToConnectorPredicateConverter;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.qe.CommonResultSet;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.ResultSetMetaData;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* Engine-side {@link ExecuteAction} adapter that routes {@code ALTER TABLE t EXECUTE proc(...)} on a
* {@link PluginDrivenExternalTable} to the connector's {@link ConnectorProcedureOps} (P6.4-T07).
*
* <p>The procedure-side analogue of the connector scan/write dispatch: it threads the catalog's
* {@link ConnectorSession} and the resolved {@link ConnectorTableHandle} into
* {@code getProcedureOps().execute(...)} (mirroring {@code PhysicalPlanTranslator.visitPhysicalConnectorTableSink}),
* then wraps the engine-neutral {@link ConnectorProcedureResult} back into a {@code ResultSet}.</p>
*
* <p><b>Engine/connector split (D-062 §2).</b> The engine keeps the command shell — this adapter performs
* the {@code ALTER} privilege check ({@link #validate}) and the single-row {@code CommonResultSet} wrapping
* ({@link #execute}); {@code ExecuteActionCommand} keeps the edit-log refresh. The connector owns the
* procedure body — per-argument validation (the {@code NamedArguments} framework is not reachable across the
* import gate), the underlying SDK call and the result schema/rows. The connector signals failures with an
* unchecked {@link DorisConnectorException}; this adapter converts it to a {@code UserException} so
* {@code ExecuteActionCommand.run()} re-wraps it with the legacy {@code "Failed to execute action:"} prefix.</p>
*
* <p><b>Dormant pre-cutover.</b> Iceberg tables are {@code IcebergExternalTable} (not
* {@code PluginDrivenExternalTable}) until they enter {@code SPI_READY_TYPES} at P6.6, so this adapter is
* never constructed pre-flip — live {@code ALTER TABLE EXECUTE} still routes to the legacy fe-core actions.</p>
*/
public class ConnectorExecuteAction implements ExecuteAction {
private final String actionType;
private final Map<String, String> properties;
private final Optional<PartitionNamesInfo> partitionNamesInfo;
private final Optional<Expression> whereCondition;
private final PluginDrivenExternalTable table;
public ConnectorExecuteAction(String actionType, Map<String, String> properties,
Optional<PartitionNamesInfo> partitionNamesInfo, Optional<Expression> whereCondition,
PluginDrivenExternalTable table) {
this.actionType = actionType;
this.properties = properties != null ? properties : Collections.emptyMap();
this.partitionNamesInfo = partitionNamesInfo;
this.whereCondition = whereCondition;
this.table = table;
}
@Override
public void validate(TableNameInfo tableNameInfo, UserIdentity currentUser) throws UserException {
// Engine keeps the ALTER privilege check (D-062 §2); per-argument validation is connector-owned and
// runs inside execute() (the NamedArguments framework is not reachable across the connector import gate).
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), tableNameInfo.getCtl(), tableNameInfo.getDb(),
tableNameInfo.getTbl(), PrivPredicate.ALTER)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "ALTER",
currentUser.getQualifiedUser(), ConnectContext.get().getRemoteIP(),
tableNameInfo.getTbl());
}
}
@Override
public ResultSet execute(TableIf ignored) throws UserException {
PluginDrivenExternalCatalog catalog = (PluginDrivenExternalCatalog) table.getCatalog();
Connector connector = catalog.getConnector();
ConnectorProcedureOps procedureOps = connector.getProcedureOps();
if (procedureOps == null) {
throw new DdlException("Connector '" + catalog.getName() + "' (type: " + catalog.getType()
+ ") does not support EXECUTE actions");
}
// The execution mode (the connector decides; no instanceof Iceberg, no procedure name hard-coded in the
// engine) gates BOTH the WHERE handling and the dispatch arm.
ProcedureExecutionMode mode = procedureOps.getExecutionMode(actionType);
// WHERE handling is mode-split. Only a DISTRIBUTED rewrite (rewrite_data_files) scopes its work by a
// WHERE; the eight pure-SDK SINGLE_CALL procedures reject any WHERE (fail-loud over silently dropping a
// user predicate). The DISTRIBUTED arm lowers the WHERE to a neutral ConnectorPredicate below.
if (whereCondition.isPresent() && mode != ProcedureExecutionMode.DISTRIBUTED) {
throw new DdlException("WHERE condition is not supported for this EXECUTE action");
}
// Resolve the shared connector prerequisites — both dispatch arms (single-call and distributed) need
// the session, the resolved table handle and the partition names.
ConnectorSession session = catalog.buildConnectorSession();
ConnectorMetadata metadata = connector.getMetadata(session);
ConnectorTableHandle tableHandle = metadata
.getTableHandle(session, table.getRemoteDbName(), table.getRemoteName())
.orElseThrow(() -> new AnalysisException("Table not found: " + table.getRemoteDbName()
+ "." + table.getRemoteName() + " in catalog " + catalog.getName()));
List<String> partitionNames = partitionNamesInfo
.map(PartitionNamesInfo::getPartitionNames).orElse(Collections.emptyList());
// A DISTRIBUTED procedure (rewrite_data_files) cannot be expressed by the single-row execute() contract,
// so it goes to the distributed rewrite driver. Lower a present WHERE to a neutral ConnectorPredicate
// here (engine half, no iceberg types); the converter is fail-loud, so an unrepresentable WHERE throws
// rather than silently widening the rewrite scope.
if (mode == ProcedureExecutionMode.DISTRIBUTED) {
ConnectorPredicate loweredWhere = whereCondition.isPresent()
? UnboundExpressionToConnectorPredicateConverter.convert(whereCondition.get(), table)
: null;
ConnectorRewriteDriver driver = new ConnectorRewriteDriver(ConnectContext.get(), table, catalog,
metadata, procedureOps, session, tableHandle, actionType, properties, partitionNames,
loweredWhere);
try {
ConnectorProcedureResult result = driver.run();
refreshTableCachesAfterMutation();
return wrapResult(result);
} catch (DorisConnectorException e) {
throw new UserException(e.getMessage(), e);
}
}
// SINGLE_CALL: a synchronous single-result procedure.
try {
ConnectorProcedureResult result = procedureOps.execute(
session, tableHandle, actionType, properties, null, partitionNames);
refreshTableCachesAfterMutation();
return wrapResult(result);
} catch (DorisConnectorException e) {
// Surface the connector's unchecked exception as a checked UserException so
// ExecuteActionCommand.run() catches it and re-wraps it with the legacy "Failed to execute action:"
// prefix. Use the plain UserException type the legacy action bodies threw (e.g.
// IcebergRollbackToSnapshotAction.executeAction), so getMessage() formats identically; the message is
// kept verbatim (the connector preserves the legacy text byte-for-byte — T08 byte-parity).
throw new UserException(e.getMessage(), e);
}
}
/**
* After a successful procedure commit, drop this FE's caches for the mutated table through the standard
* refresh-table path — exactly what a follower FE does when it replays the refresh-table journal that
* {@code ExecuteActionCommand} writes after this returns. {@code refreshTableInternal} clears BOTH the
* engine meta cache (keyed by the table's LOCAL names) and the connector's own per-table cache (keyed by
* the REMOTE names), resolving both from the {@link PluginDrivenExternalTable}. Without this, the FE that
* ran the procedure keeps serving stale connector metadata (the iceberg latest-snapshot cache, default TTL
* 24h) until expiry — a leader/follower split. Connector-agnostic: {@code refreshTableInternal}'s connector
* arm is a generic SPI call (no-op default).
*/
private void refreshTableCachesAfterMutation() {
Env.getCurrentEnv().getRefreshManager()
.refreshTableInternal((ExternalDatabase) table.getDatabase(), table, System.currentTimeMillis());
}
/**
* Wraps the engine-neutral {@link ConnectorProcedureResult} into a {@link CommonResultSet}, enforcing the
* legacy single-row contract (each row's width must equal the declared column count,
* {@code BaseExecuteAction:106-108}). Mirrors {@code BaseExecuteAction.execute}, which returns {@code null}
* when the metadata is absent OR the body row is {@code null}: the connector encodes a {@code null} body row
* as {@code (schema, emptyRows)} ({@code BaseIcebergAction.execute}), so an empty schema OR zero rows maps to
* a {@code null} ResultSet (the command sends nothing).
*/
private ResultSet wrapResult(ConnectorProcedureResult result) {
List<ConnectorColumn> resultSchema = result.getResultSchema();
if (resultSchema == null || resultSchema.isEmpty() || result.getRows().isEmpty()) {
return null;
}
List<Column> columns = ConnectorColumnConverter.convertColumns(resultSchema);
ResultSetMetaData metaData = new CommonResultSet.CommonResultSetMetaData(columns);
for (List<String> row : result.getRows()) {
Preconditions.checkState(columns.size() == row.size(),
"Result row size does not match metadata column count");
}
return new CommonResultSet(metaData, result.getRows());
}
@Override
public boolean isSupported(TableIf table) {
// The connector rejects unknown procedure names inside execute() with its own faithful message
// ("Unsupported <engine> procedure: ..."), so there is no engine-side support pre-filter here.
return true;
}
@Override
public String getDescription() {
return "Connector procedure: " + actionType;
}
@Override
public String getActionType() {
return actionType;
}
@Override
public Map<String, String> getProperties() {
return properties;
}
@Override
public Optional<PartitionNamesInfo> getPartitionNamesInfo() {
return partitionNamesInfo;
}
@Override
public Optional<Expression> getWhereCondition() {
return whereCondition;
}
}