PluginDrivenExternalCatalog.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.connector.ConnectorFactory;
import org.apache.doris.connector.ConnectorSessionBuilder;
import org.apache.doris.connector.DefaultConnectorContext;
import org.apache.doris.connector.DefaultConnectorValidationContext;
import org.apache.doris.connector.api.Connector;
import org.apache.doris.connector.api.ConnectorMetadata;
import org.apache.doris.connector.api.ConnectorSession;
import org.apache.doris.connector.api.ConnectorTestResult;
import org.apache.doris.connector.api.DorisConnectorException;
import org.apache.doris.connector.api.ddl.ConnectorCreateTableRequest;
import org.apache.doris.connector.api.handle.ConnectorTableHandle;
import org.apache.doris.connector.ddl.CreateTableInfoToConnectorRequestConverter;
import org.apache.doris.datasource.property.metastore.MetastoreProperties;
import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
import org.apache.doris.persist.CreateDbInfo;
import org.apache.doris.persist.DropDbInfo;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.transaction.PluginDrivenTransactionManager;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/**
 * An {@link ExternalCatalog} backed by a Connector SPI plugin.
 *
 * <p>This adapter bridges the connector SPI ({@link Connector}) with the existing
 * ExternalCatalog hierarchy. Metadata operations are delegated to the connector's
 * {@link org.apache.doris.connector.api.ConnectorMetadata} implementation.</p>
 *
 * <p>When created via {@link CatalogFactory}, the Connector instance is provided
 * directly. After GSON deserialization (FE restart), the Connector is recreated
 * from catalog properties during {@link #initLocalObjectsImpl()}.</p>
 */
public class PluginDrivenExternalCatalog extends ExternalCatalog {

    private static final Logger LOG = LogManager.getLogger(PluginDrivenExternalCatalog.class);

    // Volatile for cross-thread visibility; all mutations happen under synchronized(this)
    // via makeSureInitialized() -> initLocalObjectsImpl(), or resetToUninitialized() -> onClose().
    private transient volatile Connector connector;

    /**
     * No-arg constructor for GSON deserialization.
     *
     * <p>Assigns nothing itself, so the transient {@code connector} field is not set here;
     * {@code initLocalObjectsImpl()} builds it from the catalog properties.</p>
     */
    public PluginDrivenExternalCatalog() {
    }

    /**
     * Creates a plugin-driven catalog with an already-created Connector.
     *
     * @param catalogId unique catalog id
     * @param name catalog name
     * @param resource optional resource name
     * @param props catalog properties
     * @param comment catalog comment
     * @param connector the SPI connector instance
     */
    public PluginDrivenExternalCatalog(long catalogId, String name, String resource,
            Map<String, String> props, String comment, Connector connector) {
        // Every plugin-driven catalog is registered under the PLUGIN log type; the concrete
        // catalog type (e.g. "es", "jdbc") is read from the properties by getType().
        super(catalogId, name, InitCatalogLog.Type.PLUGIN, comment);
        this.catalogProperty = new CatalogProperty(resource, props);
        // Kept until initLocalObjectsImpl() runs, which rebuilds the connector from properties.
        this.connector = connector;
    }

    /**
     * Rebuilds the connector from the catalog properties, then installs the transaction
     * manager and the pre-execution authenticator.
     *
     * <p>A freshly built connector replaces whatever instance was held before, and the
     * replaced instance is closed. If no connector can be built, a previously held one is
     * kept as-is.</p>
     *
     * @throws RuntimeException if no connector is available once the rebuild attempt is done
     */
    @Override
    protected void initLocalObjectsImpl() {
        // The instance handed over by CatalogFactory was built with a lightweight context that
        // carries no auth (the catalog did not exist yet). Rebuilding here gives the connector
        // the proper engine context, including the catalog's execution authenticator for
        // Kerberos/secured HMS.
        final Connector previous = connector;
        final Connector rebuilt = createConnectorFromProperties();
        if (rebuilt != null) {
            connector = rebuilt;
            // Release the replaced instance (e.g. the one injected during checkWhenCreating)
            // so its connection pool and classloader reference do not linger.
            final boolean replacedDistinctInstance = previous != null && previous != rebuilt;
            if (replacedDistinctInstance) {
                try {
                    previous.close();
                } catch (IOException closeFailure) {
                    LOG.warn("Failed to close old connector during re-initialization for catalog {}",
                            name, closeFailure);
                }
            }
        }
        if (connector == null) {
            throw new RuntimeException("No ConnectorProvider found for plugin-driven catalog: "
                    + name + ", type: " + getType()
                    + ". Ensure the connector plugin is installed.");
        }
        transactionManager = new PluginDrivenTransactionManager();
        initPreExecutionAuthenticator();
    }

    /**
     * Installs the execution authenticator for this catalog, at most once.
     *
     * <p>If the catalog exposes metastore properties, their authenticator is used. Otherwise,
     * or if resolving the metastore properties fails, the parent implementation is used as a
     * fallback.</p>
     */
    @Override
    protected synchronized void initPreExecutionAuthenticator() {
        if (executionAuthenticator != null) {
            // Already initialized; keep the existing authenticator.
            return;
        }
        try {
            MetastoreProperties msp = catalogProperty.getMetastoreProperties();
            if (msp != null) {
                executionAuthenticator = msp.getExecutionAuthenticator();
                return;
            }
        } catch (Exception e) {
            // Not all catalog types have metastore properties (e.g., JDBC, ES), so a failure
            // here is expected for them and must not abort initialization. Record it at debug
            // level so a genuinely broken metastore/auth configuration stays diagnosable
            // instead of disappearing silently behind the fallback.
            LOG.debug("Could not resolve metastore execution authenticator for catalog {}; "
                    + "falling back to the default authenticator", name, e);
        }
        super.initPreExecutionAuthenticator();
    }

    /**
     * Creates a new Connector from catalog properties. Extracted as a protected method
     * so tests can override without depending on the static ConnectorFactory registry.
     */
    protected Connector createConnectorFromProperties() {
        // getType() falls back to logType when "type" is absent from the properties. That
        // covers image deserialization of old resource-backed catalogs, whose properties never
        // contained "type" (it was derived from the Resource object).
        final String resolvedType = getType();
        final Map<String, String> connectorProps = catalogProperty.getProperties();
        // The context hands the connector this catalog's name, id, and a supplier for its
        // execution authenticator.
        final DefaultConnectorContext engineContext =
                new DefaultConnectorContext(name, id, this::getExecutionAuthenticator);
        return ConnectorFactory.createConnector(resolvedType, connectorProps, engineContext);
    }

    /**
     * Validates the catalog properties: first the parent's checks, then the connector-specific
     * validation for this catalog type, and finally the optional {@code function_rules} value.
     *
     * @throws DdlException if any validation step rejects the properties; a rejection by the
     *         connector validation keeps the original exception as its cause
     */
    @Override
    public void checkProperties() throws DdlException {
        super.checkProperties();
        String catalogType = getType();
        try {
            ConnectorFactory.validateProperties(catalogType, catalogProperty.getProperties());
        } catch (IllegalArgumentException e) {
            // Keep the cause so the connector's stack trace is not lost when the failure is
            // surfaced as a DDL error.
            throw new DdlException(e.getMessage(), e);
        }
        // Validate function_rules JSON if present (shared across all connector types).
        String functionRules = catalogProperty.getOrDefault("function_rules", null);
        ExternalFunctionRules.check(functionRules);
    }

    /**
     * Runs the creation-time checks: the connector's own pre-creation validation, followed by
     * the connectivity tests unless they are disabled.
     *
     * @throws DdlException if the pre-creation validation fails or the connectivity test does
     *         not succeed
     */
    @Override
    public void checkWhenCreating() throws DdlException {
        // Type-specific pre-creation validation lives in the connector
        // (e.g., JDBC driver security, checksum computation).
        DefaultConnectorValidationContext validation =
                new DefaultConnectorValidationContext(getId(), catalogProperty);
        try {
            connector.preCreateValidation(validation);
        } catch (DdlException ddlFailure) {
            throw ddlFailure;
        } catch (Exception otherFailure) {
            throw new DdlException(otherFailure.getMessage(), otherFailure);
        }

        // An explicit catalog property wins; otherwise the connector supplies the default.
        String connectorDefault = String.valueOf(connector.defaultTestConnection());
        String testConnectionFlag =
                catalogProperty.getOrDefault(ExternalCatalog.TEST_CONNECTION, connectorDefault);
        if (!Boolean.parseBoolean(testConnectionFlag)) {
            return;
        }

        // Delegate FE-to-external connectivity testing to the connector SPI.
        ConnectorSession session = buildConnectorSession();
        ConnectorTestResult outcome = connector.testConnection(session);
        if (!outcome.isSuccess()) {
            throw new DdlException("Connectivity test failed for catalog '"
                    + name + "': " + outcome.getMessage());
        }
        LOG.info("Connectivity test passed for plugin-driven catalog '{}': {}", name, outcome);

        // Execute any BE-to-external connectivity test the connector registered.
        validation.executePendingBeTests();
    }

    /**
     * Handles catalog property updates. Delegates to the parent which resets
     * caches, sets objectCreated=false, and calls onClose() to release the
     * current connector. The next makeSureInitialized() call will trigger
     * initLocalObjectsImpl() which creates a new connector with the updated
     * properties and proper engine context (auth, etc.).
     *
     * <p>This follows the same lifecycle pattern as all other ExternalCatalog
     * subclasses: reset, then lazy re-initialization on next access.</p>
     */
    @Override
    public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
        // Pure delegation: this override adds no plugin-specific work of its own.
        super.notifyPropertiesUpdated(updatedProps);
    }

    /**
     * Lists database names by asking the connector's metadata, using a session built for the
     * current thread.
     *
     * @return the database names reported by the connector
     */
    @Override
    protected List<String> listDatabaseNames() {
        final ConnectorSession session = buildConnectorSession();
        final ConnectorMetadata metadata = connector.getMetadata(session);
        return metadata.listDatabaseNames(session);
    }

    /**
     * Lists the table names of one database by asking the connector's metadata.
     *
     * @param ctx session context; not consulted by this implementation
     * @param dbName database name handed to the connector as-is
     * @return the table names reported by the connector
     */
    @Override
    protected List<String> listTableNamesFromRemote(SessionContext ctx, String dbName) {
        final ConnectorSession session = buildConnectorSession();
        final ConnectorMetadata metadata = connector.getMetadata(session);
        return metadata.listTableNames(session, dbName);
    }

    /**
     * Checks whether a table exists by resolving its handle through the connector.
     *
     * @param ctx session context; not consulted by this implementation
     * @param dbName database name handed to the connector as-is
     * @param tblName table name handed to the connector as-is
     * @return {@code true} if the connector returns a handle for the table
     */
    @Override
    public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
        final ConnectorSession session = buildConnectorSession();
        final Optional<ConnectorTableHandle> handle =
                connector.getMetadata(session).getTableHandle(session, dbName, tblName);
        return handle.isPresent();
    }

    /**
     * Returns the actual catalog type (e.g., "es", "jdbc") as stored in the catalog
     * properties, rather than the internal "plugin" logType.
     *
     * @return the value of the type property, or the parent's type when that property is absent
     */
    @Override
    public String getType() {
        final String parentType = super.getType();
        return catalogProperty.getOrDefault(CatalogMgr.CATALOG_TYPE_PROP, parentType);
    }

    /**
     * Returns the underlying SPI connector. Ensures the catalog is initialized first.
     *
     * @return the connector held by this catalog after {@code makeSureInitialized()} returns
     */
    public Connector getConnector() {
        makeSureInitialized();
        return connector;
    }

    /**
     * Routes {@code CREATE TABLE} through the SPI's
     * {@code ConnectorTableOps.createTable(session, request)} instead of the
     * legacy {@code metadataOps} path used by other {@link ExternalCatalog}
     * subclasses.
     *
     * <p>Connectors that have not overridden the new SPI default fall through
     * to the SPI's "CREATE TABLE not supported" exception, which is wrapped
     * here as a {@link DdlException} to match the existing caller contract.</p>
     *
     * <p>The SPI {@code createTable} is {@code void} and this override has no
     * {@code metadataOps}, so it mirrors legacy
     * {@code MaxComputeMetadataOps.createTableImpl}: when the table already exists
     * and {@code IF NOT EXISTS} was given it returns {@code true} and skips the
     * connector create + edit log + cache reset (so a {@code CREATE TABLE IF NOT
     * EXISTS ... AS SELECT} short-circuits per the {@code Env.createTable} contract
     * instead of INSERTing into the existing table); otherwise it creates the table,
     * writes the edit log, resets the cache, and returns {@code false}.</p>
     */
    @Override
    public boolean createTable(CreateTableInfo createTableInfo) throws UserException {
        makeSureInitialized();
        final String localDbName = createTableInfo.getDbName();
        final String tableName = createTableInfo.getTableName();

        // The connector must be addressed with the REMOTE (ODPS) db name, mirroring legacy
        // MaxComputeMetadataOps.createTableImpl (db.getRemoteName()). For name-mapped catalogs
        // (lower_case_meta_names / meta_names_mapping) the local display name differs from the
        // remote one, and using the local name would hit the wrong remote schema. The table
        // name is intentionally NOT remote-resolved (legacy parity: the table does not exist
        // yet, so it has no local->remote mapping).
        final ExternalDatabase<? extends ExternalTable> db = getDbNullable(localDbName);
        if (db == null) {
            throw new DdlException("Failed to get database: '" + localDbName
                    + "' in catalog: " + getName());
        }
        final String remoteDbName = db.getRemoteName();

        final ConnectorSession session = buildConnectorSession();
        final ConnectorMetadata metadata = connector.getMetadata(session);

        // Probe the remote side (connector) first, then the local FE cache, as legacy
        // MaxComputeMetadataOps.createTableImpl did. With IF NOT EXISTS a hit short-circuits
        // (Env.createTable contract: return true when the table already exists), so a
        // "CREATE TABLE IF NOT EXISTS ... AS SELECT" does NOT go on to INSERT into the
        // pre-existing table.
        final boolean existsRemotely =
                metadata.getTableHandle(session, remoteDbName, tableName).isPresent();
        final boolean exists = existsRemotely || db.getTableNullable(tableName) != null;
        if (exists && createTableInfo.isIfNotExists()) {
            LOG.info("create table[{}.{}.{}] which already exists; skipping (IF NOT EXISTS)",
                    getName(), localDbName, tableName);
            return true;
        }

        // An existing table without IF NOT EXISTS is not special-cased: the connector's
        // createTable reports "already exists", which is surfaced below as a DdlException.
        final ConnectorCreateTableRequest request =
                CreateTableInfoToConnectorRequestConverter.convert(createTableInfo, remoteDbName);
        try {
            metadata.createTable(session, request);
        } catch (DorisConnectorException connectorFailure) {
            throw new DdlException(connectorFailure.getMessage(), connectorFailure);
        }

        // Edit log and cache invalidation deliberately use the LOCAL db/table names for
        // follower-replay consistency; only the connector-bound name is remote-resolved.
        final org.apache.doris.persist.CreateTableInfo persistInfo =
                new org.apache.doris.persist.CreateTableInfo(getName(), localDbName, tableName);
        Env.getCurrentEnv().getEditLog().logCreateTable(persistInfo);

        // Drop the FE-side table-name cache so the new table is visible on this FE right away.
        // The legacy metadataOps path did this in afterCreateTable(); with no metadataOps here,
        // this override has to do it itself.
        getDbForReplay(localDbName).ifPresent(cachedDb -> cachedDb.resetMetaCacheNames());
        LOG.info("finished to create table {}.{}.{}", getName(), localDbName, tableName);
        return false;
    }

    /**
     * Routes {@code CREATE DATABASE} through the SPI's
     * {@code ConnectorSchemaOps.createDatabase(session, dbName, properties)}.
     *
     * <p>The SPI signature carries no {@code ifNotExists}; this override honors it
     * FE-side. It short-circuits on the local FE cache, and, for connectors that
     * support CREATE DATABASE ({@code supportsCreateDatabase()}), also consults the
     * remote {@code databaseExists} so {@code CREATE DATABASE IF NOT EXISTS} on a
     * database that exists remotely but is not yet in this FE's cache cleanly no-ops
     * instead of surfacing a remote "already exists" error (mirroring legacy
     * {@code MaxComputeMetadataOps.createDbImpl}, which checked both). On success it
     * writes the edit log and invalidates the cached db-name list (mirroring the
     * legacy {@code metadataOps.afterCreateDb()} the plugin path no longer has).</p>
     */
    @Override
    public void createDb(String dbName, boolean ifNotExists, Map<String, String> properties) throws DdlException {
        makeSureInitialized();

        // Fast path: the db is already in the FE cache and IF NOT EXISTS was given, so there
        // is nothing to do (legacy createDbImpl: dorisDb != null).
        final boolean cachedLocally = ifNotExists && getDbNullable(dbName) != null;
        if (cachedLocally) {
            return;
        }

        final ConnectorSession session = buildConnectorSession();
        final ConnectorMetadata metadata = connector.getMetadata(session);

        // A cache miss does not rule out a db that already exists REMOTELY (created on another
        // FE, or before this FE's db-name cache was populated). Legacy
        // MaxComputeMetadataOps.createDbImpl checked both the cache and the remote side and
        // no-oped under IF NOT EXISTS; the remote half of that check happens here. It is gated
        // on supportsCreateDatabase() so connectors that cannot create databases (jdbc/es/trino)
        // keep their prior behavior (createDatabase -> "not supported"); thanks to the &&
        // short-circuit they never even issue the remote query.
        final boolean existsRemotely = ifNotExists
                && metadata.supportsCreateDatabase()
                && metadata.databaseExists(session, dbName);
        if (existsRemotely) {
            LOG.info("create database[{}] which already exists remotely, skip", dbName);
            return;
        }

        try {
            metadata.createDatabase(session, dbName, properties);
        } catch (DorisConnectorException connectorFailure) {
            throw new DdlException(connectorFailure.getMessage(), connectorFailure);
        }

        // Persist the operation, then invalidate the cached db-name list.
        final CreateDbInfo persistInfo = new CreateDbInfo(getName(), dbName, null);
        Env.getCurrentEnv().getEditLog().logCreateDb(persistInfo);
        resetMetaCacheNames();
        LOG.info("finished to create database {}.{}", getName(), dbName);
    }

    /**
     * Routes {@code DROP DATABASE} through the SPI's
     * {@code ConnectorSchemaOps.dropDatabase(session, dbName, ifExists, force)}.
     *
     * <p>The local database name is resolved to its remote name before it is handed to the
     * connector, the same way {@code createTable} and {@code dropTable} in this class resolve
     * theirs; otherwise name-mapped catalogs (where the local display name differs from the
     * remote name) would address the wrong remote schema. The edit log and the cache
     * bookkeeping keep using the local name.</p>
     *
     * <p>{@code force} is forwarded to the connector, which performs the table
     * cascade (mirroring legacy {@code MaxComputeMetadataOps.dropDbImpl}; ODPS
     * {@code schemas().delete()} does not auto-cascade). On success it writes the
     * edit log and unregisters the database from the cache (mirroring the legacy
     * {@code metadataOps.afterDropDb()}); legacy emits no per-table editlog for the
     * cascaded tables, so the single {@code logDropDb} + {@code unregisterDatabase}
     * below is the complete legacy db-level FE bookkeeping.</p>
     *
     * @param dbName local (FE-side) database name
     * @param ifExists when {@code true}, a database missing from this FE is silently ignored
     * @param force forwarded to the connector unchanged
     * @throws DdlException if the database is unknown to this FE and {@code ifExists} is
     *         {@code false}, or if the connector rejects the drop
     */
    @Override
    public void dropDb(String dbName, boolean ifExists, boolean force) throws DdlException {
        makeSureInitialized();
        ExternalDatabase<? extends ExternalTable> db = getDbNullable(dbName);
        if (db == null) {
            if (ifExists) {
                return;
            }
            throw new DdlException("Failed to get database: '" + dbName + "' in catalog: " + getName());
        }
        // Only the connector-bound name is remote-resolved. Fall back to the local name if no
        // remote name is recorded, which keeps the previous behavior in that case.
        String remoteDbName = db.getRemoteName();
        if (remoteDbName == null || remoteDbName.isEmpty()) {
            remoteDbName = dbName;
        }
        ConnectorSession session = buildConnectorSession();
        try {
            connector.getMetadata(session).dropDatabase(session, remoteDbName, ifExists, force);
        } catch (DorisConnectorException e) {
            throw new DdlException(e.getMessage(), e);
        }
        // Edit log and cache invalidation deliberately use the LOCAL db name for
        // follower-replay consistency.
        Env.getCurrentEnv().getEditLog().logDropDb(new DropDbInfo(getName(), dbName));
        unregisterDatabase(dbName);
        LOG.info("finished to drop database {}.{}", getName(), dbName);
    }

    /**
     * Routes {@code DROP TABLE} through the SPI's
     * {@code ConnectorTableOps.dropTable(session, handle)}.
     *
     * <p>The SPI takes a {@link ConnectorTableHandle} and carries no {@code ifExists};
     * this override resolves the handle first (absent = table does not exist) and
     * enforces {@code IF EXISTS} FE-side. On success it writes the edit log and
     * unregisters the table from the cache (mirroring {@code metadataOps.afterDropTable()}).</p>
     */
    @Override
    public void dropTable(String dbName, String tableName, boolean isView, boolean isMtmv, boolean isStream,
                          boolean ifExists, boolean mustTemporary, boolean force) throws DdlException {
        makeSureInitialized();

        // The connector must be addressed with the REMOTE (ODPS) db/table names, mirroring base
        // ExternalCatalog.dropTable -- the path legacy MaxComputeMetadataOps.dropTableImpl ran
        // through, which used dorisTable.getRemoteDbName() / getRemoteName(). With local names,
        // name-mapped catalogs would look up the wrong remote table (IF EXISTS silently no-ops /
        // non-IF-EXISTS wrongly reports "not found"). As in base: a missing db ALWAYS throws
        // (even with IF EXISTS), while a missing table honors IF EXISTS.
        final ExternalDatabase<? extends ExternalTable> localDb = getDbNullable(dbName);
        if (localDb == null) {
            throw new DdlException("Failed to get database: '" + dbName + "' in catalog: " + getName());
        }
        final ExternalTable localTable = localDb.getTableNullable(tableName);
        if (localTable == null) {
            if (!ifExists) {
                throw new DdlException("Failed to get table: '" + tableName + "' in database: " + dbName);
            }
            return;
        }

        final ConnectorSession session = buildConnectorSession();
        final ConnectorMetadata metadata = connector.getMetadata(session);
        final Optional<ConnectorTableHandle> remoteHandle = metadata.getTableHandle(
                session, localTable.getRemoteDbName(), localTable.getRemoteName());

        // A table that is cached on this FE may still have been dropped out-of-band on the
        // remote side; IF EXISTS is honored for that case as well.
        if (!remoteHandle.isPresent()) {
            if (!ifExists) {
                throw new DdlException("Failed to get table: '" + tableName + "' in database: " + dbName);
            }
            return;
        }

        try {
            metadata.dropTable(session, remoteHandle.get());
        } catch (DorisConnectorException connectorFailure) {
            throw new DdlException(connectorFailure.getMessage(), connectorFailure);
        }

        // Edit log and cache invalidation deliberately use the LOCAL db/table names for
        // follower-replay consistency; only the connector-bound names are remote-resolved.
        final DropInfo persistInfo = new DropInfo(getName(), dbName, tableName);
        Env.getCurrentEnv().getEditLog().logDropTable(persistInfo);
        getDbForReplay(dbName).ifPresent(cachedDb -> cachedDb.unregisterTable(tableName));
        LOG.info("finished to drop table {}.{}.{}", getName(), dbName, tableName);
    }

    /**
     * Maps a remote database name to its local name by delegating to the connector's metadata.
     *
     * @param remoteDatabaseName database name as known to the remote system
     * @return the name returned by the connector for that database
     */
    @Override
    public String fromRemoteDatabaseName(String remoteDatabaseName) {
        final ConnectorSession session = buildConnectorSession();
        final ConnectorMetadata metadata = connector.getMetadata(session);
        return metadata.fromRemoteDatabaseName(session, remoteDatabaseName);
    }

    /**
     * Maps a remote table name to its local name by delegating to the connector's metadata.
     *
     * @param remoteDatabaseName database name as known to the remote system
     * @param remoteTableName table name as known to the remote system
     * @return the name returned by the connector for that table
     */
    @Override
    public String fromRemoteTableName(String remoteDatabaseName, String remoteTableName) {
        final ConnectorSession session = buildConnectorSession();
        final ConnectorMetadata metadata = connector.getMetadata(session);
        return metadata.fromRemoteTableName(session, remoteDatabaseName, remoteTableName);
    }

    /**
     * Builds a {@link ConnectorSession} from the current thread's {@link ConnectContext}.
     */
    public ConnectorSession buildConnectorSession() {
        // Start from the thread's ConnectContext when one is bound; otherwise start from an
        // empty builder. Both paths are then stamped with the same catalog identity.
        final ConnectContext ctx = ConnectContext.get();
        final ConnectorSessionBuilder builder = (ctx != null)
                ? ConnectorSessionBuilder.from(ctx)
                : ConnectorSessionBuilder.create();
        return builder
                .withCatalogId(getId())
                .withCatalogName(getName())
                .withCatalogProperties(catalogProperty.getProperties())
                .build();
    }

    /**
     * Builds a database object through the parent implementation, forcing the PLUGIN log type.
     *
     * @param remoteDbName forwarded to the parent unchanged
     * @param localDbName forwarded to the parent unchanged
     * @param dbId forwarded to the parent unchanged
     * @param logType ignored; {@code InitCatalogLog.Type.PLUGIN} is always passed on instead
     * @param checkExists forwarded to the parent unchanged
     * @return whatever the parent implementation builds for the PLUGIN log type
     */
    @Override
    protected ExternalDatabase<? extends ExternalTable> buildDbForInit(String remoteDbName, String localDbName,
            long dbId, InitCatalogLog.Type logType, boolean checkExists) {
        // Always use PLUGIN logType regardless of what was serialized (e.g., ES from migration).
        return super.buildDbForInit(remoteDbName, localDbName, dbId, InitCatalogLog.Type.PLUGIN, checkExists);
    }

    /**
     * Post-deserialization fix-up: backfills a missing "type" property from a legacy logType
     * and then normalizes the logType to PLUGIN.
     *
     * @throws IOException if the parent's post-processing fails
     */
    @Override
    public void gsonPostProcess() throws IOException {
        super.gsonPostProcess();

        // Old resource-backed catalogs (e.g., ES, JDBC) never persisted the "type" property;
        // it was derived from the Resource object at runtime. After image deserialization with
        // registerCompatibleSubtype, they arrive here as PluginDrivenExternalCatalog with
        // logType still holding the original value (ES/JDBC). Backfill "type" from logType
        // BEFORE logType is overwritten below, so that createConnectorFromProperties() and
        // getType() can resolve the catalog type.
        final boolean carriesLegacyType = logType != null
                && logType != InitCatalogLog.Type.PLUGIN
                && logType != InitCatalogLog.Type.UNKNOWN;
        if (carriesLegacyType) {
            final String oldType = legacyLogTypeToCatalogType(logType);
            final String persistedType = catalogProperty.getOrDefault(CatalogMgr.CATALOG_TYPE_PROP, "");
            if (persistedType.isEmpty()) {
                LOG.info("Backfilling missing 'type' property for catalog '{}' from logType: {}",
                        name, oldType);
                catalogProperty.addProperty(CatalogMgr.CATALOG_TYPE_PROP, oldType);
            }
        }

        // After deserializing a migrated old catalog (e.g., ES -> PluginDriven), fix logType
        // so that buildDbForInit uses the PLUGIN path.
        if (logType != InitCatalogLog.Type.PLUGIN) {
            LOG.info("Migrating catalog '{}' logType from {} to PLUGIN", name, logType);
            logType = InitCatalogLog.Type.PLUGIN;
        }
    }

    // Maps a legacy logType to the type string used as the CatalogFactory key.
    // Those keys don't all match Type.name().toLowerCase():
    // TRINO_CONNECTOR -> "trino-connector" (hyphen), not "trino_connector".
    // Add a branch here whenever a connector's CatalogFactory key diverges from
    // the lowercase enum name.
    // MAX_COMPUTE needs no branch: the fallback yields "max_compute", which
    // already matches its CatalogFactory key -- do not add a redundant one.
    private static String legacyLogTypeToCatalogType(InitCatalogLog.Type logType) {
        if (logType == InitCatalogLog.Type.TRINO_CONNECTOR) {
            return "trino-connector";
        }
        // Locale.ROOT keeps the lowercasing independent of the JVM's default locale.
        final String enumName = logType.name();
        return enumName.toLowerCase(Locale.ROOT);
    }

    /**
     * Runs the parent's close logic, then closes the connector (if any) and clears the
     * reference to it. A failure to close is logged as a warning and otherwise ignored.
     */
    @Override
    public void onClose() {
        super.onClose();
        final Connector current = connector;
        if (current == null) {
            return;
        }
        try {
            current.close();
        } catch (IOException closeFailure) {
            LOG.warn("Failed to close connector for catalog {}", name, closeFailure);
        }
        // Clear the reference whether or not close() reported an IOException.
        connector = null;
    }
}